evento_core/
aggregator.rs

1//! Event creation and committing.
2//!
3//! This module provides the [`AggregatorBuilder`] for creating and persisting events.
4//!
5//! # Example
6//!
7//! ```rust,ignore
8//! use evento::{create, aggregator};
9//!
10//! // Create a new aggregate with auto-generated ID
11//! let id = create()
12//!     .event(&AccountOpened { owner: "John".into() })
13//!     .metadata(&metadata)
14//!     .routing_key("accounts")
15//!     .commit(&executor)
16//!     .await?;
17//!
18//! // Add events to existing aggregate
19//! aggregator(&existing_id)
20//!     .original_version(current_version)
21//!     .event(&MoneyDeposited { amount: 100 })
22//!     .commit(&executor)
23//!     .await?;
24//! ```
25
26use std::time::{SystemTime, UNIX_EPOCH};
27
28use thiserror::Error;
29use ulid::Ulid;
30
31use crate::{cursor::Args, Event, Executor, ReadAggregator};
32
/// Errors that can occur when writing events.
///
/// Returned by [`AggregatorBuilder::commit`]. Executor failures and system
/// clock failures are converted automatically through the `#[from]`
/// conversions, so they can be propagated with `?`.
#[derive(Debug, Error)]
pub enum WriteError {
    /// Version conflict - another event was written concurrently.
    ///
    /// NOTE(review): this variant is never constructed in this module; it is
    /// presumably returned by `Executor::write` implementations — confirm.
    #[error("invalid original version")]
    InvalidOriginalVersion,

    /// Attempted to commit without adding any events to the builder.
    #[error("trying to commit event without data")]
    MissingData,

    /// Any other failure reported by the executor, wrapped as an
    /// [`anyhow::Error`] and displayed using the inner error's message.
    #[error("{0}")]
    Unknown(#[from] anyhow::Error),

    /// The system clock could not be expressed as a duration since the Unix
    /// epoch (for example, the clock is set before 1970-01-01).
    #[error("systemtime >> {0}")]
    SystemTime(#[from] std::time::SystemTimeError),
}
52
/// Builder for creating and committing events.
///
/// Use [`create()`] or [`aggregator()`] to create an instance, then chain
/// method calls to add events and metadata before committing.
///
/// # Optimistic Concurrency
///
/// If `original_version` is 0 (default for new aggregates), the builder
/// queries the executor for an existing event of the aggregate before
/// writing and, when one is found, continues from that event's version and
/// reuses its routing key. Otherwise, it uses the provided version (and the
/// routing key set on the builder) for optimistic concurrency control.
///
/// # Example
///
/// ```rust,ignore
/// // New aggregate
/// let id = create()
///     .event(&MyEvent { ... })
///     .commit(&executor)
///     .await?;
///
/// // Existing aggregate with version check
/// aggregator(&id)
///     .original_version(5)
///     .event(&AnotherEvent { ... })
///     .commit(&executor)
///     .await?;
/// ```
pub struct AggregatorBuilder {
    // Identifier of the aggregate the events are written to.
    aggregator_id: String,
    // Aggregate type name; empty until an event is added, then taken from
    // the most recently added event.
    aggregator_type: String,
    // Routing key stamped on new events unless a stored event supplies one.
    routing_key: Option<String>,
    // Set on the first routing-key assignment; later assignments are ignored.
    routing_key_locked: bool,
    // Version the caller expects the aggregate to be at; 0 means "look it up".
    original_version: u16,
    // Pending events as (event name, bitcode-encoded payload), in insertion order.
    data: Vec<(&'static str, Vec<u8>)>,
    // Bitcode-encoded metadata shared by every event of the commit, if provided.
    metadata: Option<Vec<u8>>,
}
89
90impl AggregatorBuilder {
91    pub fn new(aggregator_id: impl Into<String>) -> AggregatorBuilder {
92        AggregatorBuilder {
93            aggregator_id: aggregator_id.into(),
94            aggregator_type: "".to_owned(),
95            routing_key: None,
96            routing_key_locked: false,
97            original_version: 0,
98            data: Vec::default(),
99            metadata: None,
100        }
101    }
102
103    pub fn original_version(mut self, v: u16) -> Self {
104        self.original_version = v;
105
106        self
107    }
108
109    pub fn routing_key(self, v: impl Into<String>) -> Self {
110        self.routing_key_opt(Some(v.into()))
111    }
112
113    pub fn routing_key_opt(mut self, v: Option<String>) -> Self {
114        if !self.routing_key_locked {
115            self.routing_key = v;
116            self.routing_key_locked = true;
117        }
118
119        self
120    }
121
122    pub fn metadata<M>(mut self, v: &M) -> Self
123    where
124        M: bitcode::Encode,
125    {
126        self.metadata = Some(bitcode::encode(v));
127        self
128    }
129
130    pub fn event<D>(mut self, v: &D) -> Self
131    where
132        D: crate::projection::Event + bitcode::Encode,
133    {
134        self.data.push((D::event_name(), bitcode::encode(v)));
135        self.aggregator_type = D::aggregator_type().to_owned();
136        self
137    }
138
139    pub async fn commit<E: Executor>(&self, executor: &E) -> Result<String, WriteError> {
140        let (mut version, routing_key) = if self.original_version == 0 {
141            let events = executor
142                .read(
143                    Some(vec![ReadAggregator::id(
144                        &self.aggregator_type,
145                        &self.aggregator_id,
146                    )]),
147                    None,
148                    Args::backward(1, None),
149                )
150                .await
151                .map_err(WriteError::Unknown)?;
152
153            match events.edges.first() {
154                Some(event) => (event.node.version, event.node.routing_key.to_owned()),
155                _ => (self.original_version, self.routing_key.to_owned()),
156            }
157        } else {
158            (self.original_version, self.routing_key.to_owned())
159        };
160
161        let metadata = self
162            .metadata
163            .to_owned()
164            .unwrap_or_else(|| bitcode::encode(&true));
165
166        let mut events = vec![];
167        let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
168
169        for (name, data) in &self.data {
170            version += 1;
171
172            let event = Event {
173                id: Ulid::new(),
174                name: name.to_string(),
175                data: data.to_vec(),
176                metadata: metadata.to_vec(),
177                timestamp: now.as_secs(),
178                timestamp_subsec: now.subsec_millis(),
179                aggregator_id: self.aggregator_id.to_owned(),
180                aggregator_type: self.aggregator_type.to_owned(),
181                version,
182                routing_key: routing_key.to_owned(),
183            };
184
185            events.push(event);
186        }
187
188        if events.is_empty() {
189            return Err(WriteError::MissingData);
190        }
191
192        executor.write(events).await?;
193
194        Ok(self.aggregator_id.to_owned())
195    }
196}
197
198/// Creates a new aggregate with an auto-generated ULID.
199///
200/// # Example
201///
202/// ```rust,ignore
203/// let id = create()
204///     .event(&AccountOpened { ... })
205///     .commit(&executor)
206///     .await?;
207/// ```
208pub fn create() -> AggregatorBuilder {
209    AggregatorBuilder::new(Ulid::new())
210}
211
212/// Creates a builder for an existing aggregate.
213///
214/// # Example
215///
216/// ```rust,ignore
217/// aggregator(&existing_id)
218///     .original_version(current_version)
219///     .event(&MoneyDeposited { amount: 100 })
220///     .commit(&executor)
221///     .await?;
222/// ```
223pub fn aggregator(id: impl Into<String>) -> AggregatorBuilder {
224    AggregatorBuilder::new(id)
225}