evento_core/aggregator.rs
1//! Event creation and committing.
2//!
3//! This module provides the [`AggregatorBuilder`] for creating and persisting events.
4//!
5//! # Example
6//!
7//! ```rust,ignore
8//! use evento::{create, aggregator};
9//!
10//! // Create a new aggregate with auto-generated ID
11//! let id = create()
12//! .event(&AccountOpened { owner: "John".into() })
13//! .metadata(&metadata)
14//! .routing_key("accounts")
15//! .commit(&executor)
16//! .await?;
17//!
18//! // Add events to existing aggregate
19//! aggregator(&existing_id)
20//! .original_version(current_version)
21//! .event(&MoneyDeposited { amount: 100 })
22//! .commit(&executor)
23//! .await?;
24//! ```
25
26use sha3::{Digest, Sha3_256};
27use std::time::{SystemTime, UNIX_EPOCH};
28use thiserror::Error;
29use ulid::Ulid;
30
31use crate::{cursor::Args, metadata::Metadata, Event, Executor, ReadAggregator};
32
33/// Creates a new builder for the given aggregate IDs.
34pub fn hash_ids(ids: Vec<impl Into<String>>) -> String {
35 let mut hasher = Sha3_256::new();
36 for id in ids {
37 hasher.update(id.into());
38 }
39
40 hex::encode(hasher.finalize())
41}
42
43/// Errors that can occur when writing events.
44#[derive(Debug, Error)]
45pub enum WriteError {
46 /// Version conflict - another event was written concurrently
47 #[error("invalid original version")]
48 InvalidOriginalVersion,
49
50 /// Attempted to commit without adding any events
51 #[error("trying to commit event without data")]
52 MissingData,
53
54 /// Unknown error from the executor
55 #[error("{0}")]
56 Unknown(#[from] anyhow::Error),
57
58 /// System time error
59 #[error("systemtime >> {0}")]
60 SystemTime(#[from] std::time::SystemTimeError),
61}
62
63/// Trait for aggregate types.
64///
65/// Aggregates are the root entities in event sourcing. Each aggregate
66/// type has a unique identifier string used for event storage and routing.
67///
68/// This trait is typically derived using the `#[evento::aggregator]` macro.
69///
70/// # Example
71///
72/// ```rust,ignore
73/// #[evento::aggregator("myapp/Account")]
74/// #[derive(Default)]
75/// pub struct Account {
76/// pub balance: i64,
77/// pub owner: String,
78/// }
79/// ```
80pub trait Aggregator: Default {
81 /// Returns the unique type identifier for this aggregate (e.g., "myapp/Account")
82 fn aggregator_type() -> &'static str;
83}
84
85/// Trait for event types.
86///
87/// Events represent state changes that have occurred. Each event type
88/// has a name and belongs to an aggregator type.
89///
90/// This trait is typically derived using the `#[evento::aggregator]` macro.
91///
92/// # Example
93///
94/// ```rust,ignore
95/// #[evento::aggregator("myapp/Account")]
96/// #[derive(bitcode::Encode, bitcode::Decode)]
97/// pub struct AccountOpened {
98/// pub owner: String,
99/// }
100/// ```
101pub trait AggregatorEvent: Aggregator {
102 /// Returns the event name (e.g., "AccountOpened")
103 fn event_name() -> &'static str;
104}
105
106/// Builder for creating and committing events.
107///
108/// Use [`create()`] or [`aggregator()`] to create an instance, then chain
109/// method calls to add events and metadata before committing.
110///
111/// # Optimistic Concurrency
112///
113/// If `original_version` is 0 (default for new aggregates), the builder
114/// queries the current version before writing. Otherwise, it uses the
115/// provided version for optimistic concurrency control.
116///
117/// # Example
118///
119/// ```rust,ignore
120/// // New aggregate
121/// let id = create()
122/// .event(&MyEvent { ... })
123/// .commit(&executor)
124/// .await?;
125///
126/// // Existing aggregate with version check
127/// aggregator(&id)
128/// .original_version(5)
129/// .event(&AnotherEvent { ... })
130/// .commit(&executor)
131/// .await?;
132/// ```
133#[derive(Clone)]
134pub struct AggregatorBuilder {
135 aggregator_id: String,
136 aggregator_type: String,
137 routing_key: Option<String>,
138 routing_key_locked: bool,
139 original_version: u16,
140 data: Vec<(&'static str, Vec<u8>)>,
141 metadata: Metadata,
142}
143
144impl AggregatorBuilder {
145 /// Creates a new builder for the given aggregate ID.
146 pub fn new(aggregator_id: impl Into<String>) -> AggregatorBuilder {
147 AggregatorBuilder {
148 aggregator_id: aggregator_id.into(),
149 aggregator_type: "".to_owned(),
150 routing_key: None,
151 routing_key_locked: false,
152 original_version: 0,
153 data: Vec::default(),
154 metadata: Default::default(),
155 }
156 }
157
158 /// Creates a new builder for the given aggregate IDs.
159 pub fn ids(ids: Vec<impl Into<String>>) -> AggregatorBuilder {
160 Self::new(hash_ids(ids))
161 }
162
163 /// Sets the expected version for optimistic concurrency control.
164 ///
165 /// If the aggregate's current version doesn't match, the commit will fail
166 /// with [`WriteError::InvalidOriginalVersion`].
167 pub fn original_version(&mut self, v: u16) -> &mut Self {
168 self.original_version = v;
169
170 self
171 }
172
173 /// Sets the routing key for event distribution.
174 ///
175 /// The routing key is used for partitioning events across consumers.
176 /// Once set, subsequent calls are ignored (locked behavior).
177 pub fn routing_key(&mut self, v: impl Into<String>) -> &mut Self {
178 self.routing_key_opt(Some(v.into()))
179 }
180
181 /// Sets an optional routing key for event distribution.
182 ///
183 /// Pass `None` to explicitly clear the routing key.
184 /// Once set, subsequent calls are ignored (locked behavior).
185 pub fn routing_key_opt(&mut self, v: Option<String>) -> &mut Self {
186 if !self.routing_key_locked {
187 self.routing_key = v;
188 self.routing_key_locked = true;
189 }
190
191 self
192 }
193
194 /// Sets the metadata to attach to all events.
195 ///
196 /// Metadata is serialized using bitcode and stored alongside each event.
197 pub fn metadata<M>(&mut self, key: impl Into<String>, value: &M) -> &mut Self
198 where
199 M: bitcode::Encode,
200 {
201 self.metadata.insert_enc(key, value);
202 self
203 }
204
205 pub fn requested_by(&mut self, value: impl Into<String>) -> &mut Self {
206 self.metadata.set_requested_by(value);
207 self
208 }
209
210 pub fn requested_as(&mut self, value: impl Into<String>) -> &mut Self {
211 self.metadata.set_requested_as(value);
212 self
213 }
214
215 /// Sets the metadata to attach to all events.
216 ///
217 /// Metadata is serialized using bitcode and stored alongside each event.
218 pub fn metadata_from(&mut self, value: impl Into<Metadata>) -> &mut Self {
219 self.metadata = value.into();
220 self
221 }
222
223 /// Adds an event to be committed.
224 ///
225 /// Multiple events can be added and will be committed atomically.
226 /// The event data is serialized using bitcode.
227 pub fn event<D>(&mut self, v: &D) -> &mut Self
228 where
229 D: AggregatorEvent + bitcode::Encode,
230 {
231 self.data.push((D::event_name(), bitcode::encode(v)));
232 self.aggregator_type = D::aggregator_type().to_owned();
233 self
234 }
235
236 /// Commits all added events to the executor.
237 ///
238 /// Returns the aggregate ID on success.
239 ///
240 /// # Errors
241 ///
242 /// - [`WriteError::MissingData`] - No events were added
243 /// - [`WriteError::InvalidOriginalVersion`] - Version conflict occurred
244 /// - [`WriteError::Unknown`] - Executor error
245 pub async fn commit<E: Executor>(&self, executor: &E) -> Result<String, WriteError> {
246 let first_event = executor
247 .read(
248 Some(vec![ReadAggregator::id(
249 &self.aggregator_type,
250 &self.aggregator_id,
251 )]),
252 None,
253 Args::forward(1, None),
254 )
255 .await
256 .map_err(WriteError::Unknown)?;
257
258 let routing_key = match first_event.edges.first() {
259 Some(event) => event.node.routing_key.to_owned(),
260 _ => self.routing_key.to_owned(),
261 };
262
263 let mut version = self.original_version;
264 let mut events = vec![];
265 let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
266
267 for (name, data) in &self.data {
268 version += 1;
269
270 let event = Event {
271 id: Ulid::new(),
272 name: name.to_string(),
273 data: data.to_vec(),
274 metadata: self.metadata.clone(),
275 timestamp: now.as_secs(),
276 timestamp_subsec: now.subsec_millis(),
277 aggregator_id: self.aggregator_id.to_owned(),
278 aggregator_type: self.aggregator_type.to_owned(),
279 version,
280 routing_key: routing_key.to_owned(),
281 };
282
283 events.push(event);
284 }
285
286 if events.is_empty() {
287 return Err(WriteError::MissingData);
288 }
289
290 executor.write(events).await?;
291
292 Ok(self.aggregator_id.to_owned())
293 }
294}
295
296/// Creates a new aggregate with an auto-generated ULID.
297///
298/// # Example
299///
300/// ```rust,ignore
301/// let id = create()
302/// .event(&AccountOpened { ... })
303/// .commit(&executor)
304/// .await?;
305/// ```
306pub fn create() -> AggregatorBuilder {
307 AggregatorBuilder::new(Ulid::new())
308}
309
310/// Creates a builder for an existing aggregate.
311///
312/// # Example
313///
314/// ```rust,ignore
315/// aggregator(&existing_id)
316/// .original_version(current_version)
317/// .event(&MoneyDeposited { amount: 100 })
318/// .commit(&executor)
319/// .await?;
320/// ```
321pub fn aggregator(id: impl Into<String>) -> AggregatorBuilder {
322 AggregatorBuilder::new(id)
323}
324
325pub trait AggregatorExecutor<E: Executor> {
326 fn has_event<A: AggregatorEvent>(
327 &self,
328 id: impl Into<String>,
329 ) -> impl std::future::Future<Output = anyhow::Result<bool>> + Send;
330
331 fn original_version<A: AggregatorEvent>(
332 &self,
333 id: impl Into<String>,
334 ) -> impl std::future::Future<Output = anyhow::Result<Option<u16>>> + Send;
335}
336
337impl<E: Executor> AggregatorExecutor<E> for E {
338 fn has_event<A: AggregatorEvent>(
339 &self,
340 id: impl Into<String>,
341 ) -> impl std::future::Future<Output = anyhow::Result<bool>> + Send {
342 let id = id.into();
343 Box::pin(async {
344 let result = self
345 .read(
346 Some(vec![ReadAggregator::new(
347 A::aggregator_type(),
348 id,
349 A::event_name(),
350 )]),
351 None,
352 Args::backward(1, None),
353 )
354 .await?;
355
356 Ok(!result.edges.is_empty())
357 })
358 }
359
360 fn original_version<A: AggregatorEvent>(
361 &self,
362 id: impl Into<String>,
363 ) -> impl std::future::Future<Output = anyhow::Result<Option<u16>>> + Send {
364 let id = id.into();
365 Box::pin(async {
366 let result = self
367 .read(
368 Some(vec![ReadAggregator::id(A::aggregator_type(), id)]),
369 None,
370 Args::backward(1, None),
371 )
372 .await?;
373
374 Ok(result.edges.first().map(|e| e.node.version))
375 })
376 }
377}