evento_core/
aggregator.rs1use std::time::{SystemTime, UNIX_EPOCH};
27
28use thiserror::Error;
29use ulid::Ulid;
30
31use crate::{cursor::Args, Event, Executor, ReadAggregator};
32
33#[derive(Debug, Error)]
35pub enum WriteError {
36 #[error("invalid original version")]
38 InvalidOriginalVersion,
39
40 #[error("trying to commit event without data")]
42 MissingData,
43
44 #[error("{0}")]
46 Unknown(#[from] anyhow::Error),
47
48 #[error("systemtime >> {0}")]
50 SystemTime(#[from] std::time::SystemTimeError),
51}
52
53pub struct AggregatorBuilder {
81 aggregator_id: String,
82 aggregator_type: String,
83 routing_key: Option<String>,
84 routing_key_locked: bool,
85 original_version: u16,
86 data: Vec<(&'static str, Vec<u8>)>,
87 metadata: Option<Vec<u8>>,
88}
89
90impl AggregatorBuilder {
91 pub fn new(aggregator_id: impl Into<String>) -> AggregatorBuilder {
92 AggregatorBuilder {
93 aggregator_id: aggregator_id.into(),
94 aggregator_type: "".to_owned(),
95 routing_key: None,
96 routing_key_locked: false,
97 original_version: 0,
98 data: Vec::default(),
99 metadata: None,
100 }
101 }
102
103 pub fn original_version(mut self, v: u16) -> Self {
104 self.original_version = v;
105
106 self
107 }
108
109 pub fn routing_key(self, v: impl Into<String>) -> Self {
110 self.routing_key_opt(Some(v.into()))
111 }
112
113 pub fn routing_key_opt(mut self, v: Option<String>) -> Self {
114 if !self.routing_key_locked {
115 self.routing_key = v;
116 self.routing_key_locked = true;
117 }
118
119 self
120 }
121
122 pub fn metadata<M>(mut self, v: &M) -> Self
123 where
124 M: bitcode::Encode,
125 {
126 self.metadata = Some(bitcode::encode(v));
127 self
128 }
129
130 pub fn event<D>(mut self, v: &D) -> Self
131 where
132 D: crate::projection::Event + bitcode::Encode,
133 {
134 self.data.push((D::event_name(), bitcode::encode(v)));
135 self.aggregator_type = D::aggregator_type().to_owned();
136 self
137 }
138
139 pub async fn commit<E: Executor>(&self, executor: &E) -> Result<String, WriteError> {
140 let (mut version, routing_key) = if self.original_version == 0 {
141 let events = executor
142 .read(
143 Some(vec![ReadAggregator::id(
144 &self.aggregator_type,
145 &self.aggregator_id,
146 )]),
147 None,
148 Args::backward(1, None),
149 )
150 .await
151 .map_err(WriteError::Unknown)?;
152
153 match events.edges.first() {
154 Some(event) => (event.node.version, event.node.routing_key.to_owned()),
155 _ => (self.original_version, self.routing_key.to_owned()),
156 }
157 } else {
158 (self.original_version, self.routing_key.to_owned())
159 };
160
161 let metadata = self
162 .metadata
163 .to_owned()
164 .unwrap_or_else(|| bitcode::encode(&true));
165
166 let mut events = vec![];
167 let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
168
169 for (name, data) in &self.data {
170 version += 1;
171
172 let event = Event {
173 id: Ulid::new(),
174 name: name.to_string(),
175 data: data.to_vec(),
176 metadata: metadata.to_vec(),
177 timestamp: now.as_secs(),
178 timestamp_subsec: now.subsec_millis(),
179 aggregator_id: self.aggregator_id.to_owned(),
180 aggregator_type: self.aggregator_type.to_owned(),
181 version,
182 routing_key: routing_key.to_owned(),
183 };
184
185 events.push(event);
186 }
187
188 if events.is_empty() {
189 return Err(WriteError::MissingData);
190 }
191
192 executor.write(events).await?;
193
194 Ok(self.aggregator_id.to_owned())
195 }
196}
197
198pub fn create() -> AggregatorBuilder {
209 AggregatorBuilder::new(Ulid::new())
210}
211
212pub fn aggregator(id: impl Into<String>) -> AggregatorBuilder {
224 AggregatorBuilder::new(id)
225}