evento_core/aggregator.rs
1//! Event creation and committing.
2//!
3//! This module provides the [`AggregatorBuilder`] for creating and persisting events.
4//!
5//! # Example
6//!
7//! ```rust,ignore
8//! use evento::{create, aggregator};
9//!
10//! // Create a new aggregate with auto-generated ID
11//! let id = create()
12//! .event(&AccountOpened { owner: "John".into() })
13//! .metadata(&metadata)
14//! .routing_key("accounts")
15//! .commit(&executor)
16//! .await?;
17//!
18//! // Add events to existing aggregate
19//! aggregator(&existing_id)
20//! .original_version(current_version)
21//! .event(&MoneyDeposited { amount: 100 })
22//! .commit(&executor)
23//! .await?;
24//! ```
25
26use std::time::{SystemTime, UNIX_EPOCH};
27use thiserror::Error;
28use ulid::Ulid;
29
30use crate::{cursor::Args, metadata::Metadata, Event, Executor, ReadAggregator};
31
32/// Errors that can occur when writing events.
33#[derive(Debug, Error)]
34pub enum WriteError {
35 /// Version conflict - another event was written concurrently
36 #[error("invalid original version")]
37 InvalidOriginalVersion,
38
39 /// Attempted to commit without adding any events
40 #[error("trying to commit event without data")]
41 MissingData,
42
43 /// Unknown error from the executor
44 #[error("{0}")]
45 Unknown(#[from] anyhow::Error),
46
47 /// System time error
48 #[error("systemtime >> {0}")]
49 SystemTime(#[from] std::time::SystemTimeError),
50}
51
52/// Trait for aggregate types.
53///
54/// Aggregates are the root entities in event sourcing. Each aggregate
55/// type has a unique identifier string used for event storage and routing.
56///
57/// This trait is typically derived using the `#[evento::aggregator]` macro.
58///
59/// # Example
60///
61/// ```rust,ignore
62/// #[evento::aggregator("myapp/Account")]
63/// #[derive(Default)]
64/// pub struct Account {
65/// pub balance: i64,
66/// pub owner: String,
67/// }
68/// ```
69pub trait Aggregator: Default {
70 /// Returns the unique type identifier for this aggregate (e.g., "myapp/Account")
71 fn aggregator_type() -> &'static str;
72}
73
74/// Trait for event types.
75///
76/// Events represent state changes that have occurred. Each event type
77/// has a name and belongs to an aggregator type.
78///
79/// This trait is typically derived using the `#[evento::aggregator]` macro.
80///
81/// # Example
82///
83/// ```rust,ignore
84/// #[evento::aggregator("myapp/Account")]
85/// #[derive(bitcode::Encode, bitcode::Decode)]
86/// pub struct AccountOpened {
87/// pub owner: String,
88/// }
89/// ```
90pub trait AggregatorEvent: Aggregator {
91 /// Returns the event name (e.g., "AccountOpened")
92 fn event_name() -> &'static str;
93}
94
95/// Builder for creating and committing events.
96///
97/// Use [`create()`] or [`aggregator()`] to create an instance, then chain
98/// method calls to add events and metadata before committing.
99///
100/// # Optimistic Concurrency
101///
102/// If `original_version` is 0 (default for new aggregates), the builder
103/// queries the current version before writing. Otherwise, it uses the
104/// provided version for optimistic concurrency control.
105///
106/// # Example
107///
108/// ```rust,ignore
109/// // New aggregate
110/// let id = create()
111/// .event(&MyEvent { ... })
112/// .commit(&executor)
113/// .await?;
114///
115/// // Existing aggregate with version check
116/// aggregator(&id)
117/// .original_version(5)
118/// .event(&AnotherEvent { ... })
119/// .commit(&executor)
120/// .await?;
121/// ```
122#[derive(Clone)]
123pub struct AggregatorBuilder {
124 aggregator_id: String,
125 aggregator_type: String,
126 routing_key: Option<String>,
127 routing_key_locked: bool,
128 original_version: u16,
129 data: Vec<(&'static str, Vec<u8>)>,
130 metadata: Metadata,
131}
132
133impl AggregatorBuilder {
134 /// Creates a new builder for the given aggregate ID.
135 pub fn new(aggregator_id: impl Into<String>) -> AggregatorBuilder {
136 AggregatorBuilder {
137 aggregator_id: aggregator_id.into(),
138 aggregator_type: "".to_owned(),
139 routing_key: None,
140 routing_key_locked: false,
141 original_version: 0,
142 data: Vec::default(),
143 metadata: Default::default(),
144 }
145 }
146
147 /// Sets the expected version for optimistic concurrency control.
148 ///
149 /// If the aggregate's current version doesn't match, the commit will fail
150 /// with [`WriteError::InvalidOriginalVersion`].
151 pub fn original_version(&mut self, v: u16) -> &mut Self {
152 self.original_version = v;
153
154 self
155 }
156
157 /// Sets the routing key for event distribution.
158 ///
159 /// The routing key is used for partitioning events across consumers.
160 /// Once set, subsequent calls are ignored (locked behavior).
161 pub fn routing_key(&mut self, v: impl Into<String>) -> &mut Self {
162 self.routing_key_opt(Some(v.into()))
163 }
164
165 /// Sets an optional routing key for event distribution.
166 ///
167 /// Pass `None` to explicitly clear the routing key.
168 /// Once set, subsequent calls are ignored (locked behavior).
169 pub fn routing_key_opt(&mut self, v: Option<String>) -> &mut Self {
170 if !self.routing_key_locked {
171 self.routing_key = v;
172 self.routing_key_locked = true;
173 }
174
175 self
176 }
177
178 /// Sets the metadata to attach to all events.
179 ///
180 /// Metadata is serialized using bitcode and stored alongside each event.
181 pub fn metadata<M>(&mut self, key: impl Into<String>, value: &M) -> &mut Self
182 where
183 M: bitcode::Encode,
184 {
185 self.metadata.insert_enc(key, value);
186 self
187 }
188
189 pub fn requested_by(&mut self, value: impl Into<String>) -> &mut Self {
190 self.metadata.set_requested_by(value);
191 self
192 }
193
194 pub fn requested_as(&mut self, value: impl Into<String>) -> &mut Self {
195 self.metadata.set_requested_as(value);
196 self
197 }
198
199 /// Sets the metadata to attach to all events.
200 ///
201 /// Metadata is serialized using bitcode and stored alongside each event.
202 pub fn metadata_from(&mut self, value: impl Into<Metadata>) -> &mut Self {
203 self.metadata = value.into();
204 self
205 }
206
207 /// Adds an event to be committed.
208 ///
209 /// Multiple events can be added and will be committed atomically.
210 /// The event data is serialized using bitcode.
211 pub fn event<D>(&mut self, v: &D) -> &mut Self
212 where
213 D: AggregatorEvent + bitcode::Encode,
214 {
215 self.data.push((D::event_name(), bitcode::encode(v)));
216 self.aggregator_type = D::aggregator_type().to_owned();
217 self
218 }
219
220 /// Commits all added events to the executor.
221 ///
222 /// Returns the aggregate ID on success.
223 ///
224 /// # Errors
225 ///
226 /// - [`WriteError::MissingData`] - No events were added
227 /// - [`WriteError::InvalidOriginalVersion`] - Version conflict occurred
228 /// - [`WriteError::Unknown`] - Executor error
229 pub async fn commit<E: Executor>(&self, executor: &E) -> Result<String, WriteError> {
230 let first_event = executor
231 .read(
232 Some(vec![ReadAggregator::id(
233 &self.aggregator_type,
234 &self.aggregator_id,
235 )]),
236 None,
237 Args::forward(1, None),
238 )
239 .await
240 .map_err(WriteError::Unknown)?;
241
242 let routing_key = match first_event.edges.first() {
243 Some(event) => event.node.routing_key.to_owned(),
244 _ => self.routing_key.to_owned(),
245 };
246
247 let mut version = self.original_version;
248 let mut events = vec![];
249 let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
250
251 for (name, data) in &self.data {
252 version += 1;
253
254 let event = Event {
255 id: Ulid::new(),
256 name: name.to_string(),
257 data: data.to_vec(),
258 metadata: self.metadata.clone(),
259 timestamp: now.as_secs(),
260 timestamp_subsec: now.subsec_millis(),
261 aggregator_id: self.aggregator_id.to_owned(),
262 aggregator_type: self.aggregator_type.to_owned(),
263 version,
264 routing_key: routing_key.to_owned(),
265 };
266
267 events.push(event);
268 }
269
270 if events.is_empty() {
271 return Err(WriteError::MissingData);
272 }
273
274 executor.write(events).await?;
275
276 Ok(self.aggregator_id.to_owned())
277 }
278}
279
280/// Creates a new aggregate with an auto-generated ULID.
281///
282/// # Example
283///
284/// ```rust,ignore
285/// let id = create()
286/// .event(&AccountOpened { ... })
287/// .commit(&executor)
288/// .await?;
289/// ```
290pub fn create() -> AggregatorBuilder {
291 AggregatorBuilder::new(Ulid::new())
292}
293
294/// Creates a builder for an existing aggregate.
295///
296/// # Example
297///
298/// ```rust,ignore
299/// aggregator(&existing_id)
300/// .original_version(current_version)
301/// .event(&MoneyDeposited { amount: 100 })
302/// .commit(&executor)
303/// .await?;
304/// ```
305pub fn aggregator(id: impl Into<String>) -> AggregatorBuilder {
306 AggregatorBuilder::new(id)
307}
308
309pub trait AggregatorExecutor<E: Executor> {
310 fn has_event<A: AggregatorEvent>(
311 &self,
312 id: impl Into<String>,
313 ) -> impl std::future::Future<Output = anyhow::Result<bool>> + Send;
314}
315
316impl<E: Executor> AggregatorExecutor<E> for E {
317 fn has_event<A: AggregatorEvent>(
318 &self,
319 id: impl Into<String>,
320 ) -> impl std::future::Future<Output = anyhow::Result<bool>> + Send {
321 let id = id.into();
322 Box::pin(async {
323 let result = self
324 .read(
325 Some(vec![ReadAggregator::new(
326 A::aggregator_type(),
327 id,
328 A::event_name(),
329 )]),
330 None,
331 Args::backward(1, None),
332 )
333 .await?;
334
335 Ok(!result.edges.is_empty())
336 })
337 }
338}