1use std::{hash::Hash, sync::Arc};
16use ulid::Ulid;
17
18use crate::{
19 cursor::{Args, ReadResult, Value},
20 Event, RoutingKey, WriteError,
21};
22
23#[derive(Clone, PartialEq, Eq)]
40pub struct ReadAggregator {
41 pub aggregator_type: String,
43 pub aggregator_id: Option<String>,
45 pub name: Option<String>,
47}
48
49impl ReadAggregator {
50 pub fn new(
54 aggregator_type: impl Into<String>,
55 id: impl Into<String>,
56 name: impl Into<String>,
57 ) -> Self {
58 Self {
59 aggregator_type: aggregator_type.into(),
60 aggregator_id: Some(id.into()),
61 name: Some(name.into()),
62 }
63 }
64
65 pub fn aggregator(value: impl Into<String>) -> Self {
69 Self {
70 aggregator_type: value.into(),
71 aggregator_id: None,
72 name: None,
73 }
74 }
75
76 pub fn id(aggregator_type: impl Into<String>, id: impl Into<String>) -> Self {
80 Self {
81 aggregator_type: aggregator_type.into(),
82 aggregator_id: Some(id.into()),
83 name: None,
84 }
85 }
86
87 pub fn event(aggregator_type: impl Into<String>, name: impl Into<String>) -> Self {
91 Self {
92 aggregator_type: aggregator_type.into(),
93 aggregator_id: None,
94 name: Some(name.into()),
95 }
96 }
97}
98
99impl Hash for ReadAggregator {
100 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
101 self.aggregator_type.hash(state);
102 self.aggregator_id.hash(state);
103 self.name.hash(state);
104 }
105}
106
107#[async_trait::async_trait]
121pub trait Executor: Send + Sync + 'static {
122 async fn write(&self, events: Vec<Event>) -> Result<(), WriteError>;
126
127 async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>>;
129
130 async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool>;
132
133 async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()>;
135
136 async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()>;
138
139 async fn read(
141 &self,
142 aggregators: Option<Vec<ReadAggregator>>,
143 routing_key: Option<RoutingKey>,
144 args: Args,
145 ) -> anyhow::Result<ReadResult<Event>>;
146
147 async fn get_snapshot(
152 &self,
153 aggregator_type: String,
154 aggregator_revision: String,
155 id: String,
156 ) -> anyhow::Result<Option<(Vec<u8>, Value)>>;
157
158 async fn save_snapshot(
163 &self,
164 aggregator_type: String,
165 aggregator_revision: String,
166 id: String,
167 data: Vec<u8>,
168 cursor: Value,
169 ) -> anyhow::Result<()>;
170}
171
172pub struct Evento(Arc<Box<dyn Executor>>);
187
188impl Clone for Evento {
189 fn clone(&self) -> Self {
190 Self(self.0.clone())
191 }
192}
193
194#[async_trait::async_trait]
195impl Executor for Evento {
196 async fn write(&self, events: Vec<Event>) -> Result<(), WriteError> {
197 self.0.write(events).await
198 }
199
200 async fn read(
201 &self,
202 aggregators: Option<Vec<ReadAggregator>>,
203 routing_key: Option<RoutingKey>,
204 args: Args,
205 ) -> anyhow::Result<ReadResult<Event>> {
206 self.0.read(aggregators, routing_key, args).await
207 }
208
209 async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
210 self.0.get_subscriber_cursor(key).await
211 }
212
213 async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
214 self.0.is_subscriber_running(key, worker_id).await
215 }
216
217 async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
218 self.0.upsert_subscriber(key, worker_id).await
219 }
220
221 async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
222 self.0.acknowledge(key, cursor, lag).await
223 }
224
225 async fn get_snapshot(
226 &self,
227 aggregator_type: String,
228 aggregator_revision: String,
229 id: String,
230 ) -> anyhow::Result<Option<(Vec<u8>, Value)>> {
231 self.0
232 .get_snapshot(aggregator_type, aggregator_revision, id)
233 .await
234 }
235
236 async fn save_snapshot(
237 &self,
238 aggregator_type: String,
239 aggregator_revision: String,
240 id: String,
241 data: Vec<u8>,
242 cursor: Value,
243 ) -> anyhow::Result<()> {
244 self.0
245 .save_snapshot(aggregator_type, aggregator_revision, id, data, cursor)
246 .await
247 }
248}
249
250impl Evento {
251 pub fn new<E: Executor>(executor: E) -> Self {
253 Self(Arc::new(Box::new(executor)))
254 }
255}
256
257#[cfg(feature = "group")]
264#[derive(Clone, Default)]
265pub struct EventoGroup {
266 executors: Vec<Evento>,
267}
268
269#[cfg(feature = "group")]
270impl EventoGroup {
271 pub fn executor(mut self, executor: impl Into<Evento>) -> Self {
275 self.executors.push(executor.into());
276
277 self
278 }
279
280 pub fn first(&self) -> &Evento {
286 self.executors
287 .first()
288 .expect("EventoGroup must have at least one executor")
289 }
290}
291
292#[cfg(feature = "group")]
293#[async_trait::async_trait]
294impl Executor for EventoGroup {
295 async fn write(&self, events: Vec<Event>) -> Result<(), WriteError> {
296 self.first().write(events).await
297 }
298
299 async fn read(
300 &self,
301 aggregators: Option<Vec<ReadAggregator>>,
302 routing_key: Option<RoutingKey>,
303 args: Args,
304 ) -> anyhow::Result<ReadResult<Event>> {
305 use crate::cursor;
306 let futures = self
307 .executors
308 .iter()
309 .map(|e| e.read(aggregators.to_owned(), routing_key.to_owned(), args.clone()));
310
311 let results = futures_util::future::join_all(futures).await;
312 let mut events = vec![];
313 for res in results {
314 for edge in res?.edges {
315 events.push(edge.node);
316 }
317 }
318
319 Ok(cursor::Reader::new(events).args(args).execute()?)
320 }
321
322 async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
323 self.first().get_subscriber_cursor(key).await
324 }
325
326 async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
327 self.first().is_subscriber_running(key, worker_id).await
328 }
329
330 async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
331 self.first().upsert_subscriber(key, worker_id).await
332 }
333
334 async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
335 self.first().acknowledge(key, cursor, lag).await
336 }
337
338 async fn get_snapshot(
339 &self,
340 aggregator_type: String,
341 aggregator_revision: String,
342 id: String,
343 ) -> anyhow::Result<Option<(Vec<u8>, Value)>> {
344 self.first()
345 .get_snapshot(aggregator_type, aggregator_revision, id)
346 .await
347 }
348
349 async fn save_snapshot(
350 &self,
351 aggregator_type: String,
352 aggregator_revision: String,
353 id: String,
354 data: Vec<u8>,
355 cursor: Value,
356 ) -> anyhow::Result<()> {
357 self.first()
358 .save_snapshot(aggregator_type, aggregator_revision, id, data, cursor)
359 .await
360 }
361}
362
363#[cfg(feature = "rw")]
374pub struct Rw<R: Executor, W: Executor> {
375 r: R,
376 w: W,
377}
378
379#[cfg(feature = "rw")]
380impl<R: Executor + Clone, W: Executor + Clone> Clone for Rw<R, W> {
381 fn clone(&self) -> Self {
382 Self {
383 r: self.r.clone(),
384 w: self.w.clone(),
385 }
386 }
387}
388
389#[cfg(feature = "rw")]
390#[async_trait::async_trait]
391impl<R: Executor, W: Executor> Executor for Rw<R, W> {
392 async fn write(&self, events: Vec<Event>) -> Result<(), WriteError> {
393 self.w.write(events).await
394 }
395
396 async fn read(
397 &self,
398 aggregators: Option<Vec<ReadAggregator>>,
399 routing_key: Option<RoutingKey>,
400 args: Args,
401 ) -> anyhow::Result<ReadResult<Event>> {
402 self.r.read(aggregators, routing_key, args).await
403 }
404
405 async fn get_subscriber_cursor(&self, key: String) -> anyhow::Result<Option<Value>> {
406 self.r.get_subscriber_cursor(key).await
407 }
408
409 async fn is_subscriber_running(&self, key: String, worker_id: Ulid) -> anyhow::Result<bool> {
410 self.r.is_subscriber_running(key, worker_id).await
411 }
412
413 async fn upsert_subscriber(&self, key: String, worker_id: Ulid) -> anyhow::Result<()> {
414 self.w.upsert_subscriber(key, worker_id).await
415 }
416
417 async fn acknowledge(&self, key: String, cursor: Value, lag: u64) -> anyhow::Result<()> {
418 self.w.acknowledge(key, cursor, lag).await
419 }
420
421 async fn get_snapshot(
422 &self,
423 aggregator_type: String,
424 aggregator_revision: String,
425 id: String,
426 ) -> anyhow::Result<Option<(Vec<u8>, Value)>> {
427 self.r
428 .get_snapshot(aggregator_type, aggregator_revision, id)
429 .await
430 }
431
432 async fn save_snapshot(
433 &self,
434 aggregator_type: String,
435 aggregator_revision: String,
436 id: String,
437 data: Vec<u8>,
438 cursor: Value,
439 ) -> anyhow::Result<()> {
440 self.w
441 .save_snapshot(aggregator_type, aggregator_revision, id, data, cursor)
442 .await
443 }
444}
445
446#[cfg(feature = "rw")]
447impl<R: Executor, W: Executor> From<(R, W)> for Rw<R, W> {
448 fn from((r, w): (R, W)) -> Self {
449 Self { r, w }
450 }
451}