evento_core/
aggregator.rs1use std::time::{SystemTime, UNIX_EPOCH};
27
28use thiserror::Error;
29use ulid::Ulid;
30
31use crate::{cursor::Args, Event, Executor, ReadAggregator};
32
33#[derive(Debug, Error)]
35pub enum WriteError {
36 #[error("invalid original version")]
38 InvalidOriginalVersion,
39
40 #[error("trying to commit event without data")]
42 MissingData,
43
44 #[error("{0}")]
46 Unknown(#[from] anyhow::Error),
47
48 #[error("rkyv.encode >> {0}")]
50 RkyvEncode(String),
51
52 #[error("systemtime >> {0}")]
54 SystemTime(#[from] std::time::SystemTimeError),
55}
56
/// Accumulates the events (and optional metadata) that will be written for a
/// single aggregate when `commit` is called.
///
/// Derives `Debug` so the builder can be logged and used in assertion
/// messages like other public types.
#[derive(Debug)]
pub struct AggregatorBuilder {
    /// Identifier of the aggregate the events belong to.
    aggregator_id: String,
    /// Aggregate type name; empty until an event is added, then taken from
    /// the most recently added event type.
    aggregator_type: String,
    /// Routing key to stamp on the events, if any.
    routing_key: Option<String>,
    /// Set once a routing key has been supplied so later calls are ignored.
    routing_key_locked: bool,
    /// Version the caller expects the aggregate to be at; `0` makes `commit`
    /// look the current version up through the executor.
    original_version: u16,
    /// Pending events as `(event name, serialized payload)` pairs, in
    /// insertion order.
    data: Vec<(&'static str, Vec<u8>)>,
    /// Serialized metadata shared by every pending event, if provided.
    metadata: Option<Vec<u8>>,
}
93
94impl AggregatorBuilder {
95 pub fn new(aggregator_id: impl Into<String>) -> AggregatorBuilder {
96 AggregatorBuilder {
97 aggregator_id: aggregator_id.into(),
98 aggregator_type: "".to_owned(),
99 routing_key: None,
100 routing_key_locked: false,
101 original_version: 0,
102 data: Vec::default(),
103 metadata: None,
104 }
105 }
106
107 pub fn original_version(mut self, v: u16) -> Self {
108 self.original_version = v;
109
110 self
111 }
112
113 pub fn routing_key(self, v: impl Into<String>) -> Self {
114 self.routing_key_opt(Some(v.into()))
115 }
116
117 pub fn routing_key_opt(mut self, v: Option<String>) -> Self {
118 if !self.routing_key_locked {
119 self.routing_key = v;
120 self.routing_key_locked = true;
121 }
122
123 self
124 }
125
126 pub fn metadata<M>(mut self, v: &M) -> Result<Self, WriteError>
127 where
128 M: for<'a> rkyv::Serialize<
129 rkyv::rancor::Strategy<
130 rkyv::ser::Serializer<
131 rkyv::util::AlignedVec,
132 rkyv::ser::allocator::ArenaHandle<'a>,
133 rkyv::ser::sharing::Share,
134 >,
135 rkyv::rancor::Error,
136 >,
137 >,
138 {
139 let metadata = rkyv::to_bytes::<rkyv::rancor::Error>(v)
140 .map_err(|e| WriteError::RkyvEncode(e.to_string()))?;
141 self.metadata = Some(metadata.to_vec());
142
143 Ok(self)
144 }
145
146 pub fn event<D>(mut self, v: &D) -> Result<Self, WriteError>
147 where
148 D: crate::projection::Event
149 + for<'a> rkyv::Serialize<
150 rkyv::rancor::Strategy<
151 rkyv::ser::Serializer<
152 rkyv::util::AlignedVec,
153 rkyv::ser::allocator::ArenaHandle<'a>,
154 rkyv::ser::sharing::Share,
155 >,
156 rkyv::rancor::Error,
157 >,
158 >,
159 {
160 let data = rkyv::to_bytes::<rkyv::rancor::Error>(v)
161 .map_err(|e| WriteError::RkyvEncode(e.to_string()))?;
162 self.data.push((D::event_name(), data.to_vec()));
163 self.aggregator_type = D::aggregator_type().to_owned();
164
165 Ok(self)
166 }
167
168 pub async fn commit<E: Executor>(&self, executor: &E) -> Result<String, WriteError> {
169 let (mut version, routing_key) = if self.original_version == 0 {
170 let events = executor
171 .read(
172 Some(vec![ReadAggregator::id(
173 &self.aggregator_type,
174 &self.aggregator_id,
175 )]),
176 None,
177 Args::backward(1, None),
178 )
179 .await
180 .map_err(WriteError::Unknown)?;
181
182 match events.edges.first() {
183 Some(event) => (event.node.version, event.node.routing_key.to_owned()),
184 _ => (self.original_version, self.routing_key.to_owned()),
185 }
186 } else {
187 (self.original_version, self.routing_key.to_owned())
188 };
189
190 let metadata = self.metadata.to_owned().unwrap_or_else(|| {
191 rkyv::to_bytes::<rkyv::rancor::Error>(&true)
192 .expect("Should never fail")
193 .to_vec()
194 });
195
196 let mut events = vec![];
197 let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
198
199 for (name, data) in &self.data {
200 version += 1;
201
202 let event = Event {
203 id: Ulid::new(),
204 name: name.to_string(),
205 data: data.to_vec(),
206 metadata: metadata.to_vec(),
207 timestamp: now.as_secs(),
208 timestamp_subsec: now.subsec_millis(),
209 aggregator_id: self.aggregator_id.to_owned(),
210 aggregator_type: self.aggregator_type.to_owned(),
211 version,
212 routing_key: routing_key.to_owned(),
213 };
214
215 events.push(event);
216 }
217
218 if events.is_empty() {
219 return Err(WriteError::MissingData);
220 }
221
222 executor.write(events).await?;
223
224 Ok(self.aggregator_id.to_owned())
225 }
226}
227
228pub fn create() -> AggregatorBuilder {
239 AggregatorBuilder::new(Ulid::new())
240}
241
242pub fn aggregator(id: impl Into<String>) -> AggregatorBuilder {
254 AggregatorBuilder::new(id)
255}