evento_core/
aggregator.rs

//! Event creation and committing.
//!
//! This module provides the [`AggregatorBuilder`] for creating and persisting events.
//!
//! # Example
//!
//! ```rust,ignore
//! use evento::{create, aggregator};
//!
//! // Create a new aggregate with auto-generated ID
//! let id = create()
//!     .event(&AccountOpened { owner: "John".into() })
//!     .metadata_from(metadata)
//!     .routing_key("accounts")
//!     .commit(&executor)
//!     .await?;
//!
//! // Add events to existing aggregate
//! aggregator(&existing_id)
//!     .original_version(current_version)
//!     .event(&MoneyDeposited { amount: 100 })
//!     .commit(&executor)
//!     .await?;
//! ```
25
26use std::time::{SystemTime, UNIX_EPOCH};
27use thiserror::Error;
28use ulid::Ulid;
29
30use crate::{cursor::Args, metadata::Metadata, Event, Executor, ReadAggregator};
31
/// Errors that can occur when writing events.
///
/// Returned by [`AggregatorBuilder::commit`].
#[derive(Debug, Error)]
pub enum WriteError {
    /// Version conflict - another event was written concurrently.
    ///
    /// NOTE(review): this variant is not constructed anywhere in this module;
    /// presumably it is produced by the executor's write path — confirm.
    #[error("invalid original version")]
    InvalidOriginalVersion,

    /// Attempted to commit without adding any events.
    #[error("trying to commit event without data")]
    MissingData,

    /// Any other failure, carried as an `anyhow::Error`.
    ///
    /// `#[from]` lets `?` convert an `anyhow::Error` into this variant.
    #[error("{0}")]
    Unknown(#[from] anyhow::Error),

    /// The system clock could not be expressed as a duration since the Unix
    /// epoch (the clock reads earlier than the epoch).
    #[error("systemtime >> {0}")]
    SystemTime(#[from] std::time::SystemTimeError),
}
51
/// Trait for aggregate types.
///
/// Aggregates are the root entities in event sourcing. Each aggregate
/// type has a unique identifier string used for event storage and routing.
///
/// Implementors must also implement [`Default`] (it is a supertrait).
///
/// This trait is typically derived using the `#[evento::aggregator]` macro.
///
/// # Example
///
/// ```rust,ignore
/// #[evento::aggregator("myapp/Account")]
/// #[derive(Default)]
/// pub struct Account {
///     pub balance: i64,
///     pub owner: String,
/// }
/// ```
pub trait Aggregator: Default {
    /// Returns the unique type identifier for this aggregate (e.g., "myapp/Account").
    ///
    /// This is an associated function: it is called on the type, not on a value.
    fn aggregator_type() -> &'static str;
}
73
/// Trait for event types.
///
/// Events represent state changes that have occurred. Each event type
/// has a name and belongs to an aggregator type.
///
/// Because [`Aggregator`] is a supertrait, an event type exposes both
/// `event_name()` and `aggregator_type()`; [`AggregatorBuilder::event`] reads
/// both when an event is added.
///
/// This trait is typically derived using the `#[evento::aggregator]` macro.
///
/// # Example
///
/// ```rust,ignore
/// #[evento::aggregator("myapp/Account")]
/// #[derive(bitcode::Encode, bitcode::Decode)]
/// pub struct AccountOpened {
///     pub owner: String,
/// }
/// ```
pub trait AggregatorEvent: Aggregator {
    /// Returns the event name (e.g., "AccountOpened").
    ///
    /// This is an associated function: it is called on the type, not on a value.
    fn event_name() -> &'static str;
}
94
/// Builder for creating and committing events.
///
/// Use [`create()`] or [`aggregator()`] to create an instance, then chain
/// method calls to add events and metadata before committing.
///
/// # Optimistic Concurrency
///
/// `original_version` defaults to 0, the value to use for a brand-new
/// aggregate. On commit, the added events are numbered consecutively starting
/// at `original_version + 1`. The builder does not look up the aggregate's
/// current version itself.
///
/// NOTE(review): a version conflict is presumably detected by the executor's
/// `write` and reported as [`WriteError::InvalidOriginalVersion`] — confirm
/// against the executor implementation.
///
/// # Example
///
/// ```rust,ignore
/// // New aggregate
/// let id = create()
///     .event(&MyEvent { ... })
///     .commit(&executor)
///     .await?;
///
/// // Existing aggregate with version check
/// aggregator(&id)
///     .original_version(5)
///     .event(&AnotherEvent { ... })
///     .commit(&executor)
///     .await?;
/// ```
#[derive(Clone)]
pub struct AggregatorBuilder {
    /// ID of the aggregate the events belong to.
    aggregator_id: String,
    /// Aggregate type string; empty until `event` is called, then taken from
    /// the most recently added event type.
    aggregator_type: String,
    /// Routing key requested through `routing_key` / `routing_key_opt`.
    routing_key: Option<String>,
    /// Set to `true` by the first `routing_key_opt` call; later calls are ignored.
    routing_key_locked: bool,
    /// Version the new events are numbered after (first new event gets this + 1).
    original_version: u16,
    /// Pending events as `(event name, encoded payload)` pairs, in insertion order.
    data: Vec<(&'static str, Vec<u8>)>,
    /// Metadata cloned into every event at commit time.
    metadata: Metadata,
}
132
133impl AggregatorBuilder {
134    /// Creates a new builder for the given aggregate ID.
135    pub fn new(aggregator_id: impl Into<String>) -> AggregatorBuilder {
136        AggregatorBuilder {
137            aggregator_id: aggregator_id.into(),
138            aggregator_type: "".to_owned(),
139            routing_key: None,
140            routing_key_locked: false,
141            original_version: 0,
142            data: Vec::default(),
143            metadata: Default::default(),
144        }
145    }
146
147    /// Sets the expected version for optimistic concurrency control.
148    ///
149    /// If the aggregate's current version doesn't match, the commit will fail
150    /// with [`WriteError::InvalidOriginalVersion`].
151    pub fn original_version(&mut self, v: u16) -> &mut Self {
152        self.original_version = v;
153
154        self
155    }
156
157    /// Sets the routing key for event distribution.
158    ///
159    /// The routing key is used for partitioning events across consumers.
160    /// Once set, subsequent calls are ignored (locked behavior).
161    pub fn routing_key(&mut self, v: impl Into<String>) -> &mut Self {
162        self.routing_key_opt(Some(v.into()))
163    }
164
165    /// Sets an optional routing key for event distribution.
166    ///
167    /// Pass `None` to explicitly clear the routing key.
168    /// Once set, subsequent calls are ignored (locked behavior).
169    pub fn routing_key_opt(&mut self, v: Option<String>) -> &mut Self {
170        if !self.routing_key_locked {
171            self.routing_key = v;
172            self.routing_key_locked = true;
173        }
174
175        self
176    }
177
178    /// Sets the metadata to attach to all events.
179    ///
180    /// Metadata is serialized using bitcode and stored alongside each event.
181    pub fn metadata<M>(&mut self, key: impl Into<String>, value: &M) -> &mut Self
182    where
183        M: bitcode::Encode,
184    {
185        self.metadata.insert_enc(key, value);
186        self
187    }
188
189    pub fn requested_by(&mut self, value: impl Into<String>) -> &mut Self {
190        self.metadata.set_requested_by(value);
191        self
192    }
193
194    pub fn requested_as(&mut self, value: impl Into<String>) -> &mut Self {
195        self.metadata.set_requested_as(value);
196        self
197    }
198
199    /// Sets the metadata to attach to all events.
200    ///
201    /// Metadata is serialized using bitcode and stored alongside each event.
202    pub fn metadata_from(&mut self, value: impl Into<Metadata>) -> &mut Self {
203        self.metadata = value.into();
204        self
205    }
206
207    /// Adds an event to be committed.
208    ///
209    /// Multiple events can be added and will be committed atomically.
210    /// The event data is serialized using bitcode.
211    pub fn event<D>(&mut self, v: &D) -> &mut Self
212    where
213        D: AggregatorEvent + bitcode::Encode,
214    {
215        self.data.push((D::event_name(), bitcode::encode(v)));
216        self.aggregator_type = D::aggregator_type().to_owned();
217        self
218    }
219
220    /// Commits all added events to the executor.
221    ///
222    /// Returns the aggregate ID on success.
223    ///
224    /// # Errors
225    ///
226    /// - [`WriteError::MissingData`] - No events were added
227    /// - [`WriteError::InvalidOriginalVersion`] - Version conflict occurred
228    /// - [`WriteError::Unknown`] - Executor error
229    pub async fn commit<E: Executor>(&self, executor: &E) -> Result<String, WriteError> {
230        let first_event = executor
231            .read(
232                Some(vec![ReadAggregator::id(
233                    &self.aggregator_type,
234                    &self.aggregator_id,
235                )]),
236                None,
237                Args::forward(1, None),
238            )
239            .await
240            .map_err(WriteError::Unknown)?;
241
242        let routing_key = match first_event.edges.first() {
243            Some(event) => event.node.routing_key.to_owned(),
244            _ => self.routing_key.to_owned(),
245        };
246
247        let mut version = self.original_version;
248        let mut events = vec![];
249        let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
250
251        for (name, data) in &self.data {
252            version += 1;
253
254            let event = Event {
255                id: Ulid::new(),
256                name: name.to_string(),
257                data: data.to_vec(),
258                metadata: self.metadata.clone(),
259                timestamp: now.as_secs(),
260                timestamp_subsec: now.subsec_millis(),
261                aggregator_id: self.aggregator_id.to_owned(),
262                aggregator_type: self.aggregator_type.to_owned(),
263                version,
264                routing_key: routing_key.to_owned(),
265            };
266
267            events.push(event);
268        }
269
270        if events.is_empty() {
271            return Err(WriteError::MissingData);
272        }
273
274        executor.write(events).await?;
275
276        Ok(self.aggregator_id.to_owned())
277    }
278}
279
280/// Creates a new aggregate with an auto-generated ULID.
281///
282/// # Example
283///
284/// ```rust,ignore
285/// let id = create()
286///     .event(&AccountOpened { ... })
287///     .commit(&executor)
288///     .await?;
289/// ```
290pub fn create() -> AggregatorBuilder {
291    AggregatorBuilder::new(Ulid::new())
292}
293
294/// Creates a builder for an existing aggregate.
295///
296/// # Example
297///
298/// ```rust,ignore
299/// aggregator(&existing_id)
300///     .original_version(current_version)
301///     .event(&MoneyDeposited { amount: 100 })
302///     .commit(&executor)
303///     .await?;
304/// ```
305pub fn aggregator(id: impl Into<String>) -> AggregatorBuilder {
306    AggregatorBuilder::new(id)
307}
308
/// Extension trait that adds aggregate-level queries to executors.
///
/// NOTE(review): the type parameter `E` is not referenced by the method
/// signature; it appears to exist only to tie the trait to an executor type.
pub trait AggregatorExecutor<E: Executor> {
    /// Checks whether the aggregate `id` has an event of type `A`.
    ///
    /// `A` supplies both the aggregate type and the event name to look for.
    /// The returned future resolves to `Ok(true)` or `Ok(false)`, or to an
    /// error if the lookup itself fails. The future is `Send`.
    fn has_event<A: AggregatorEvent>(
        &self,
        id: impl Into<String>,
    ) -> impl std::future::Future<Output = anyhow::Result<bool>> + Send;
}
315
316impl<E: Executor> AggregatorExecutor<E> for E {
317    fn has_event<A: AggregatorEvent>(
318        &self,
319        id: impl Into<String>,
320    ) -> impl std::future::Future<Output = anyhow::Result<bool>> + Send {
321        let id = id.into();
322        Box::pin(async {
323            let result = self
324                .read(
325                    Some(vec![ReadAggregator::new(
326                        A::aggregator_type(),
327                        id,
328                        A::event_name(),
329                    )]),
330                    None,
331                    Args::backward(1, None),
332                )
333                .await?;
334
335            Ok(!result.edges.is_empty())
336        })
337    }
338}