evento_core/
aggregator.rs

1//! Event creation and committing.
2//!
3//! This module provides the [`AggregatorBuilder`] for creating and persisting events.
4//!
5//! # Example
6//!
7//! ```rust,ignore
8//! use evento::{create, aggregator};
9//!
10//! // Create a new aggregate with auto-generated ID
11//! let id = create()
12//!     .event(&AccountOpened { owner: "John".into() })?
13//!     .metadata(&metadata)?
14//!     .routing_key("accounts")
15//!     .commit(&executor)
16//!     .await?;
17//!
18//! // Add events to existing aggregate
19//! aggregator(&existing_id)
20//!     .original_version(current_version)
21//!     .event(&MoneyDeposited { amount: 100 })?
22//!     .commit(&executor)
23//!     .await?;
24//! ```
25
26use std::time::{SystemTime, UNIX_EPOCH};
27
28use thiserror::Error;
29use ulid::Ulid;
30
31use crate::{cursor::Args, Event, Executor, ReadAggregator};
32
33/// Errors that can occur when writing events.
34#[derive(Debug, Error)]
35pub enum WriteError {
36    /// Version conflict - another event was written concurrently
37    #[error("invalid original version")]
38    InvalidOriginalVersion,
39
40    /// Attempted to commit without adding any events
41    #[error("trying to commit event without data")]
42    MissingData,
43
44    /// Unknown error from the executor
45    #[error("{0}")]
46    Unknown(#[from] anyhow::Error),
47
48    /// Failed to serialize event data with rkyv
49    #[error("rkyv.encode >> {0}")]
50    RkyvEncode(String),
51
52    /// System time error
53    #[error("systemtime >> {0}")]
54    SystemTime(#[from] std::time::SystemTimeError),
55}
56
57/// Builder for creating and committing events.
58///
59/// Use [`create()`] or [`aggregator()`] to create an instance, then chain
60/// method calls to add events and metadata before committing.
61///
62/// # Optimistic Concurrency
63///
64/// If `original_version` is 0 (default for new aggregates), the builder
65/// queries the current version before writing. Otherwise, it uses the
66/// provided version for optimistic concurrency control.
67///
68/// # Example
69///
70/// ```rust,ignore
71/// // New aggregate
72/// let id = create()
73///     .event(&MyEvent { ... })?
74///     .commit(&executor)
75///     .await?;
76///
77/// // Existing aggregate with version check
78/// aggregator(&id)
79///     .original_version(5)
80///     .event(&AnotherEvent { ... })?
81///     .commit(&executor)
82///     .await?;
83/// ```
84pub struct AggregatorBuilder {
85    aggregator_id: String,
86    aggregator_type: String,
87    routing_key: Option<String>,
88    routing_key_locked: bool,
89    original_version: u16,
90    data: Vec<(&'static str, Vec<u8>)>,
91    metadata: Option<Vec<u8>>,
92}
93
94impl AggregatorBuilder {
95    pub fn new(aggregator_id: impl Into<String>) -> AggregatorBuilder {
96        AggregatorBuilder {
97            aggregator_id: aggregator_id.into(),
98            aggregator_type: "".to_owned(),
99            routing_key: None,
100            routing_key_locked: false,
101            original_version: 0,
102            data: Vec::default(),
103            metadata: None,
104        }
105    }
106
107    pub fn original_version(mut self, v: u16) -> Self {
108        self.original_version = v;
109
110        self
111    }
112
113    pub fn routing_key(self, v: impl Into<String>) -> Self {
114        self.routing_key_opt(Some(v.into()))
115    }
116
117    pub fn routing_key_opt(mut self, v: Option<String>) -> Self {
118        if !self.routing_key_locked {
119            self.routing_key = v;
120            self.routing_key_locked = true;
121        }
122
123        self
124    }
125
126    pub fn metadata<M>(mut self, v: &M) -> Result<Self, WriteError>
127    where
128        M: for<'a> rkyv::Serialize<
129            rkyv::rancor::Strategy<
130                rkyv::ser::Serializer<
131                    rkyv::util::AlignedVec,
132                    rkyv::ser::allocator::ArenaHandle<'a>,
133                    rkyv::ser::sharing::Share,
134                >,
135                rkyv::rancor::Error,
136            >,
137        >,
138    {
139        let metadata = rkyv::to_bytes::<rkyv::rancor::Error>(v)
140            .map_err(|e| WriteError::RkyvEncode(e.to_string()))?;
141        self.metadata = Some(metadata.to_vec());
142
143        Ok(self)
144    }
145
146    pub fn event<D>(mut self, v: &D) -> Result<Self, WriteError>
147    where
148        D: crate::projection::Event
149            + for<'a> rkyv::Serialize<
150                rkyv::rancor::Strategy<
151                    rkyv::ser::Serializer<
152                        rkyv::util::AlignedVec,
153                        rkyv::ser::allocator::ArenaHandle<'a>,
154                        rkyv::ser::sharing::Share,
155                    >,
156                    rkyv::rancor::Error,
157                >,
158            >,
159    {
160        let data = rkyv::to_bytes::<rkyv::rancor::Error>(v)
161            .map_err(|e| WriteError::RkyvEncode(e.to_string()))?;
162        self.data.push((D::event_name(), data.to_vec()));
163        self.aggregator_type = D::aggregator_type().to_owned();
164
165        Ok(self)
166    }
167
168    pub async fn commit<E: Executor>(&self, executor: &E) -> Result<String, WriteError> {
169        let (mut version, routing_key) = if self.original_version == 0 {
170            let events = executor
171                .read(
172                    Some(vec![ReadAggregator::id(
173                        &self.aggregator_type,
174                        &self.aggregator_id,
175                    )]),
176                    None,
177                    Args::backward(1, None),
178                )
179                .await
180                .map_err(WriteError::Unknown)?;
181
182            match events.edges.first() {
183                Some(event) => (event.node.version, event.node.routing_key.to_owned()),
184                _ => (self.original_version, self.routing_key.to_owned()),
185            }
186        } else {
187            (self.original_version, self.routing_key.to_owned())
188        };
189
190        let metadata = self.metadata.to_owned().unwrap_or_else(|| {
191            rkyv::to_bytes::<rkyv::rancor::Error>(&true)
192                .expect("Should never fail")
193                .to_vec()
194        });
195
196        let mut events = vec![];
197        let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
198
199        for (name, data) in &self.data {
200            version += 1;
201
202            let event = Event {
203                id: Ulid::new(),
204                name: name.to_string(),
205                data: data.to_vec(),
206                metadata: metadata.to_vec(),
207                timestamp: now.as_secs(),
208                timestamp_subsec: now.subsec_millis(),
209                aggregator_id: self.aggregator_id.to_owned(),
210                aggregator_type: self.aggregator_type.to_owned(),
211                version,
212                routing_key: routing_key.to_owned(),
213            };
214
215            events.push(event);
216        }
217
218        if events.is_empty() {
219            return Err(WriteError::MissingData);
220        }
221
222        executor.write(events).await?;
223
224        Ok(self.aggregator_id.to_owned())
225    }
226}
227
228/// Creates a new aggregate with an auto-generated ULID.
229///
230/// # Example
231///
232/// ```rust,ignore
233/// let id = create()
234///     .event(&AccountOpened { ... })?
235///     .commit(&executor)
236///     .await?;
237/// ```
238pub fn create() -> AggregatorBuilder {
239    AggregatorBuilder::new(Ulid::new())
240}
241
242/// Creates a builder for an existing aggregate.
243///
244/// # Example
245///
246/// ```rust,ignore
247/// aggregator(&existing_id)
248///     .original_version(current_version)
249///     .event(&MoneyDeposited { amount: 100 })?
250///     .commit(&executor)
251///     .await?;
252/// ```
253pub fn aggregator(id: impl Into<String>) -> AggregatorBuilder {
254    AggregatorBuilder::new(id)
255}