evento_core/
aggregator.rs

1//! Event creation and committing.
2//!
3//! This module provides the [`AggregatorBuilder`] for creating and persisting events.
4//!
5//! # Example
6//!
7//! ```rust,ignore
8//! use evento::{create, aggregator};
9//!
10//! // Create a new aggregate with auto-generated ID
11//! let id = create()
12//!     .event(&AccountOpened { owner: "John".into() })
13//!     .metadata(&metadata)
14//!     .routing_key("accounts")
15//!     .commit(&executor)
16//!     .await?;
17//!
18//! // Add events to existing aggregate
19//! aggregator(&existing_id)
20//!     .original_version(current_version)
21//!     .event(&MoneyDeposited { amount: 100 })
22//!     .commit(&executor)
23//!     .await?;
24//! ```
25
26use sha3::{Digest, Sha3_256};
27use std::time::{SystemTime, UNIX_EPOCH};
28use thiserror::Error;
29use ulid::Ulid;
30
31use crate::{cursor::Args, metadata::Metadata, Event, Executor, ReadAggregator};
32
33/// Creates a new builder for the given aggregate IDs.
34pub fn hash_ids(ids: Vec<impl Into<String>>) -> String {
35    let mut hasher = Sha3_256::new();
36    for id in ids {
37        hasher.update(id.into());
38    }
39
40    hex::encode(hasher.finalize())
41}
42
43/// Errors that can occur when writing events.
44#[derive(Debug, Error)]
45pub enum WriteError {
46    /// Version conflict - another event was written concurrently
47    #[error("invalid original version")]
48    InvalidOriginalVersion,
49
50    /// Attempted to commit without adding any events
51    #[error("trying to commit event without data")]
52    MissingData,
53
54    /// Unknown error from the executor
55    #[error("{0}")]
56    Unknown(#[from] anyhow::Error),
57
58    /// System time error
59    #[error("systemtime >> {0}")]
60    SystemTime(#[from] std::time::SystemTimeError),
61}
62
63/// Trait for aggregate types.
64///
65/// Aggregates are the root entities in event sourcing. Each aggregate
66/// type has a unique identifier string used for event storage and routing.
67///
68/// This trait is typically derived using the `#[evento::aggregator]` macro.
69///
70/// # Example
71///
72/// ```rust,ignore
73/// #[evento::aggregator("myapp/Account")]
74/// #[derive(Default)]
75/// pub struct Account {
76///     pub balance: i64,
77///     pub owner: String,
78/// }
79/// ```
80pub trait Aggregator: Default {
81    /// Returns the unique type identifier for this aggregate (e.g., "myapp/Account")
82    fn aggregator_type() -> &'static str;
83}
84
85/// Trait for event types.
86///
87/// Events represent state changes that have occurred. Each event type
88/// has a name and belongs to an aggregator type.
89///
90/// This trait is typically derived using the `#[evento::aggregator]` macro.
91///
92/// # Example
93///
94/// ```rust,ignore
95/// #[evento::aggregator("myapp/Account")]
96/// #[derive(bitcode::Encode, bitcode::Decode)]
97/// pub struct AccountOpened {
98///     pub owner: String,
99/// }
100/// ```
101pub trait AggregatorEvent: Aggregator {
102    /// Returns the event name (e.g., "AccountOpened")
103    fn event_name() -> &'static str;
104}
105
106/// Builder for creating and committing events.
107///
108/// Use [`create()`] or [`aggregator()`] to create an instance, then chain
109/// method calls to add events and metadata before committing.
110///
111/// # Optimistic Concurrency
112///
113/// If `original_version` is 0 (default for new aggregates), the builder
114/// queries the current version before writing. Otherwise, it uses the
115/// provided version for optimistic concurrency control.
116///
117/// # Example
118///
119/// ```rust,ignore
120/// // New aggregate
121/// let id = create()
122///     .event(&MyEvent { ... })
123///     .commit(&executor)
124///     .await?;
125///
126/// // Existing aggregate with version check
127/// aggregator(&id)
128///     .original_version(5)
129///     .event(&AnotherEvent { ... })
130///     .commit(&executor)
131///     .await?;
132/// ```
133#[derive(Clone)]
134pub struct AggregatorBuilder {
135    aggregator_id: String,
136    aggregator_type: String,
137    routing_key: Option<String>,
138    routing_key_locked: bool,
139    original_version: u16,
140    data: Vec<(&'static str, Vec<u8>)>,
141    metadata: Metadata,
142}
143
144impl AggregatorBuilder {
145    /// Creates a new builder for the given aggregate ID.
146    pub fn new(aggregator_id: impl Into<String>) -> AggregatorBuilder {
147        AggregatorBuilder {
148            aggregator_id: aggregator_id.into(),
149            aggregator_type: "".to_owned(),
150            routing_key: None,
151            routing_key_locked: false,
152            original_version: 0,
153            data: Vec::default(),
154            metadata: Default::default(),
155        }
156    }
157
158    /// Creates a new builder for the given aggregate IDs.
159    pub fn ids(ids: Vec<impl Into<String>>) -> AggregatorBuilder {
160        Self::new(hash_ids(ids))
161    }
162
163    /// Sets the expected version for optimistic concurrency control.
164    ///
165    /// If the aggregate's current version doesn't match, the commit will fail
166    /// with [`WriteError::InvalidOriginalVersion`].
167    pub fn original_version(&mut self, v: u16) -> &mut Self {
168        self.original_version = v;
169
170        self
171    }
172
173    /// Sets the routing key for event distribution.
174    ///
175    /// The routing key is used for partitioning events across consumers.
176    /// Once set, subsequent calls are ignored (locked behavior).
177    pub fn routing_key(&mut self, v: impl Into<String>) -> &mut Self {
178        self.routing_key_opt(Some(v.into()))
179    }
180
181    /// Sets an optional routing key for event distribution.
182    ///
183    /// Pass `None` to explicitly clear the routing key.
184    /// Once set, subsequent calls are ignored (locked behavior).
185    pub fn routing_key_opt(&mut self, v: Option<String>) -> &mut Self {
186        if !self.routing_key_locked {
187            self.routing_key = v;
188            self.routing_key_locked = true;
189        }
190
191        self
192    }
193
194    /// Sets the metadata to attach to all events.
195    ///
196    /// Metadata is serialized using bitcode and stored alongside each event.
197    pub fn metadata<M>(&mut self, key: impl Into<String>, value: &M) -> &mut Self
198    where
199        M: bitcode::Encode,
200    {
201        self.metadata.insert_enc(key, value);
202        self
203    }
204
205    pub fn requested_by(&mut self, value: impl Into<String>) -> &mut Self {
206        self.metadata.set_requested_by(value);
207        self
208    }
209
210    pub fn requested_as(&mut self, value: impl Into<String>) -> &mut Self {
211        self.metadata.set_requested_as(value);
212        self
213    }
214
215    /// Sets the metadata to attach to all events.
216    ///
217    /// Metadata is serialized using bitcode and stored alongside each event.
218    pub fn metadata_from(&mut self, value: impl Into<Metadata>) -> &mut Self {
219        self.metadata = value.into();
220        self
221    }
222
223    /// Adds an event to be committed.
224    ///
225    /// Multiple events can be added and will be committed atomically.
226    /// The event data is serialized using bitcode.
227    pub fn event<D>(&mut self, v: &D) -> &mut Self
228    where
229        D: AggregatorEvent + bitcode::Encode,
230    {
231        self.data.push((D::event_name(), bitcode::encode(v)));
232        self.aggregator_type = D::aggregator_type().to_owned();
233        self
234    }
235
236    /// Commits all added events to the executor.
237    ///
238    /// Returns the aggregate ID on success.
239    ///
240    /// # Errors
241    ///
242    /// - [`WriteError::MissingData`] - No events were added
243    /// - [`WriteError::InvalidOriginalVersion`] - Version conflict occurred
244    /// - [`WriteError::Unknown`] - Executor error
245    pub async fn commit<E: Executor>(&self, executor: &E) -> Result<String, WriteError> {
246        let first_event = executor
247            .read(
248                Some(vec![ReadAggregator::id(
249                    &self.aggregator_type,
250                    &self.aggregator_id,
251                )]),
252                None,
253                Args::forward(1, None),
254            )
255            .await
256            .map_err(WriteError::Unknown)?;
257
258        let routing_key = match first_event.edges.first() {
259            Some(event) => event.node.routing_key.to_owned(),
260            _ => self.routing_key.to_owned(),
261        };
262
263        let mut version = self.original_version;
264        let mut events = vec![];
265        let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
266
267        for (name, data) in &self.data {
268            version += 1;
269
270            let event = Event {
271                id: Ulid::new(),
272                name: name.to_string(),
273                data: data.to_vec(),
274                metadata: self.metadata.clone(),
275                timestamp: now.as_secs(),
276                timestamp_subsec: now.subsec_millis(),
277                aggregator_id: self.aggregator_id.to_owned(),
278                aggregator_type: self.aggregator_type.to_owned(),
279                version,
280                routing_key: routing_key.to_owned(),
281            };
282
283            events.push(event);
284        }
285
286        if events.is_empty() {
287            return Err(WriteError::MissingData);
288        }
289
290        executor.write(events).await?;
291
292        Ok(self.aggregator_id.to_owned())
293    }
294}
295
296/// Creates a new aggregate with an auto-generated ULID.
297///
298/// # Example
299///
300/// ```rust,ignore
301/// let id = create()
302///     .event(&AccountOpened { ... })
303///     .commit(&executor)
304///     .await?;
305/// ```
306pub fn create() -> AggregatorBuilder {
307    AggregatorBuilder::new(Ulid::new())
308}
309
310/// Creates a builder for an existing aggregate.
311///
312/// # Example
313///
314/// ```rust,ignore
315/// aggregator(&existing_id)
316///     .original_version(current_version)
317///     .event(&MoneyDeposited { amount: 100 })
318///     .commit(&executor)
319///     .await?;
320/// ```
321pub fn aggregator(id: impl Into<String>) -> AggregatorBuilder {
322    AggregatorBuilder::new(id)
323}
324
325pub trait AggregatorExecutor<E: Executor> {
326    fn has_event<A: AggregatorEvent>(
327        &self,
328        id: impl Into<String>,
329    ) -> impl std::future::Future<Output = anyhow::Result<bool>> + Send;
330}
331
332impl<E: Executor> AggregatorExecutor<E> for E {
333    fn has_event<A: AggregatorEvent>(
334        &self,
335        id: impl Into<String>,
336    ) -> impl std::future::Future<Output = anyhow::Result<bool>> + Send {
337        let id = id.into();
338        Box::pin(async {
339            let result = self
340                .read(
341                    Some(vec![ReadAggregator::new(
342                        A::aggregator_type(),
343                        id,
344                        A::event_name(),
345                    )]),
346                    None,
347                    Args::backward(1, None),
348                )
349                .await?;
350
351            Ok(!result.edges.is_empty())
352        })
353    }
354}