event_sourcing/
event_store.rs

1use crate::adapter::EventStoreAdapter;
2use crate::adapter::NotificationAdapter;
3use crate::error::create_error::{
4    AdapterSnafu, AggregateAlreadyExistsSnafu, EmptyEventListSnafu, ExternalIdAlreadyExistsSnafu,
5    InconsistentAggregateIdSnafu, InconsistentEventOrderingSnafu, NilUuidSnafu,
6};
7use crate::error::{AdapterError, CreateError, StoreError};
8use crate::{Aggregate, Event, EventStoreBuilder, IntoEventList};
9use alloc::string::{String, ToString};
10use alloc::sync::Arc;
11use alloc::vec::Vec;
12use core::marker::PhantomData;
13use core::time::Duration;
14use futures::future::{try_join_all};
15use futures::stream::BoxStream;
16use futures::{StreamExt, TryStreamExt};
17use snafu::{ensure, ResultExt};
18use uuid::Uuid;
19
20#[cfg(feature = "prometheus")]
21lazy_static::lazy_static! {
22    static ref STORED_EVENT_COUNTER: prometheus::IntCounter =
23        prometheus::register_int_counter!("num_stored_events_count", "Number of stored events").unwrap();
24    static ref CREATED_AGGREGATES_COUNTER: prometheus::IntCounter =
25        prometheus::register_int_counter!("num_aggregates_created_count", "Number of newly created aggregates").unwrap();
26    static ref READ_EVENTS_COUNTER: prometheus::IntCounter =
27        prometheus::register_int_counter!("num_read_events_count", "Number of read events").unwrap();
28    static ref AGGREGATE_APPLY_TIME_HISTOGRAM: prometheus::HistogramVec =
29        prometheus::register_histogram_vec!("aggregate_apply_time", "Time fully build an aggregate", &["snapshot", "aggregate_name"], vec![0.001, 0.005, 0.010, 0.025, 0.05, 0.075, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0]).unwrap();
30}
31
32/// The event store used to persisting events. Besides from using the store, to well, store events,
33/// it can also be used to fetch them, and to stream updates synced between different instances
34#[derive(Debug, Clone)]
35pub struct EventStore<A: Aggregate<E>, E> {
36    adapter: Arc<dyn EventStoreAdapter<A, E>>,
37    notification_adapter: Arc<dyn NotificationAdapter<A, E>>,
38    store_attempts: usize,
39    phantom_data: PhantomData<A>,
40}
41
42impl<A: Aggregate<E> + Send + Sync + Clone, E: std::marker::Send> EventStore<A, E> {
43    pub(crate) fn new<T: EventStoreAdapter<A, E> + 'static, NT: NotificationAdapter<A, E> + 'static>(
44        adapter: T,
45        notification_adapter: NT,
46        store_attempts: usize
47    ) -> Self {
48        Self {
49            adapter: Arc::new(adapter),
50            notification_adapter: Arc::new(notification_adapter),
51            store_attempts,
52            phantom_data: PhantomData,
53        }
54    }
55
56    pub fn builder() -> EventStoreBuilder<A, E, (), ()> {
57        EventStoreBuilder::new()
58    }
59
60    /// Fetches a single aggregate
61    pub async fn aggregate(&self, aggregate_id: Uuid) -> Result<Option<A>, AdapterError> {
62        #[cfg(feature = "prometheus")]
63        let timer = AGGREGATE_APPLY_TIME_HISTOGRAM.with_label_values(&["true", A::name()]).start_timer();
64
65        let result = self.aggregate_inner(aggregate_id).await;
66
67        #[cfg(feature = "prometheus")]
68        timer.observe_duration();
69
70        result
71    }
72
73    /// Fetches a single aggregate
74    pub async fn aggregate_without_snapshot(&self, aggregate_id: Uuid) -> Result<Option<A>, AdapterError> {
75        #[cfg(feature = "prometheus")]
76            let timer = AGGREGATE_APPLY_TIME_HISTOGRAM.with_label_values(&["false", A::name()]).start_timer();
77
78        let result = self.aggregate_inner_without_snapshot(aggregate_id).await;
79
80        #[cfg(feature = "prometheus")]
81        timer.observe_duration();
82
83        result
84    }
85    async fn aggregate_inner(&self, aggregate_id: Uuid) -> Result<Option<A>, AdapterError> {
86        match self.adapter.get_snapshot(aggregate_id).await? {
87            Some(mut aggregate) => {
88                let start_version = aggregate.version();
89                let stream = self.adapter.get_events(aggregate_id, Some(aggregate.version())).await?;
90
91                aggregate = stream.try_fold(aggregate, |mut a, event| async move {
92                    ensure!(
93                        event.event_id == (a.version() + 1) as u64,
94                        crate::error::adapter_error::InconsistentEventOrderingSnafu
95                    );
96                    a.apply(&event);
97                    Ok(a)
98                }).await?;
99
100                #[cfg(feature = "prometheus")]
101                READ_EVENTS_COUNTER.inc_by(aggregate.version() - start_version);
102
103                if aggregate.version() == 0 {
104                    Ok(None)
105                } else {
106                    if (aggregate.version() - start_version) > 10 {
107                        self.adapter.save_snapshot(&aggregate).await?;
108                    }
109                    Ok(Some(aggregate))
110                }
111            }
112            None => {
113                self.aggregate_inner_without_snapshot(aggregate_id).await
114            }
115        }
116    }
117    async fn aggregate_inner_without_snapshot(&self, aggregate_id: Uuid) -> Result<Option<A>, AdapterError> {
118        let stream = self.adapter.get_events(aggregate_id, None).await?;
119        let mut aggregate = A::new_with_aggregate_id(aggregate_id);
120
121        aggregate = stream.try_fold(aggregate, |mut a, event| async move {
122            ensure!(
123                event.event_id == (a.version() + 1) as u64,
124                crate::error::adapter_error::InconsistentEventOrderingSnafu
125            );
126            a.apply(&event);
127            Ok(a)
128        }).await?;
129
130        #[cfg(feature = "prometheus")]
131        READ_EVENTS_COUNTER.inc_by(aggregate.version());
132
133        if aggregate.version() == 0 {
134            Ok(None)
135        } else {
136            self.adapter.save_snapshot(&aggregate).await?;
137            Ok(Some(aggregate))
138        }
139    }
140
141    /// Returns all ids in the store
142    pub async fn ids(&self) -> Result<BoxStream<Uuid>, AdapterError> {
143        self.adapter.stream_ids().await
144    }
145
146    /// Returns all aggregates in the store
147    pub async fn all(&self) -> Result<BoxStream<Result<A, AdapterError>>, AdapterError> {
148        Ok(self
149            .ids()
150            .await?
151            .then(move |id| async move { self.aggregate(id).await })
152            .try_filter_map(|i| async move { Ok(i) })
153            .boxed())
154    }
155
156    /// Creates a new aggregate
157    pub async fn create<T: IntoEventList<E>>(&self, events: T) -> Result<A, CreateError> {
158        let events = events.into_list();
159        ensure!(!events.is_empty(), EmptyEventListSnafu);
160
161        let first_aggregate_id = events.first().unwrap().aggregate_id;
162
163        ensure!(first_aggregate_id != Uuid::nil(), NilUuidSnafu);
164
165        for (index, event) in events.iter().enumerate() {
166            ensure!(
167                event.aggregate_id == first_aggregate_id,
168                InconsistentAggregateIdSnafu
169            );
170            ensure!(
171                event.event_id == (index + 1) as u64,
172                InconsistentEventOrderingSnafu
173            );
174        }
175
176        ensure!(
177            self.aggregate(first_aggregate_id)
178                .await
179                .context(crate::error::create_error::AdapterSnafu {})?
180                .is_none(),
181            AggregateAlreadyExistsSnafu {
182                id: first_aggregate_id
183            }
184        );
185
186        self.adapter
187            .save_events(&events)
188            .await
189            .context(AdapterSnafu)?;
190
191        let mut aggregate = A::new_with_aggregate_id(first_aggregate_id);
192
193        for event in events {
194            let old_aggregate = if aggregate.version() == 0 {
195                None
196            } else {
197                Some(aggregate.clone())
198            };
199
200            aggregate.apply(&event);
201            self.notification_adapter
202                .send_event(&event, &aggregate, old_aggregate.as_ref())
203                .await
204                .context(AdapterSnafu {})?;
205        }
206
207
208        #[cfg(feature = "prometheus")]
209        CREATED_AGGREGATES_COUNTER.inc_by(1);
210
211        Ok(aggregate)
212    }
213
214    /// Creates a new aggregate
215    pub async fn create_with_external_ids(
216        &self,
217        events: Vec<Event<E>>,
218        external_ids: Vec<String>,
219    ) -> Result<A, CreateError> {
220        ensure!(!events.is_empty(), EmptyEventListSnafu);
221
222        let first_aggregate_id = events.first().unwrap().aggregate_id;
223
224        let results: Vec<(Option<Uuid>, String)> = try_join_all(
225            external_ids
226                .iter()
227                .map(|external_id| async move {
228                    let aggregate_id = self
229                        .adapter
230                        .aggregate_id_from_external_id(external_id)
231                        .await?;
232                    Ok((aggregate_id, external_id.to_string()))
233                })
234                .collect::<Vec<_>>(),
235        )
236        .await
237        .context(AdapterSnafu {})?;
238
239        for (aggregate_id_opt, external_id) in results {
240            ensure!(
241                aggregate_id_opt.is_none() || aggregate_id_opt == Some(first_aggregate_id),
242                ExternalIdAlreadyExistsSnafu {
243                    id: aggregate_id_opt.unwrap(),
244                    external_id
245                }
246            );
247        }
248
249        self.adapter
250            .save_aggregate_id_to_external_ids(first_aggregate_id, &external_ids)
251            .await
252            .context(AdapterSnafu {})?;
253
254        self.create(events).await
255    }
256
257    /// Stores events to an existing aggregate
258    pub async fn store<T: IntoEventList<E>, F: Fn(&A) -> T + Send + Sync>(
259        &self,
260        aggregate_id: Uuid,
261        callback: F,
262    ) -> Result<A, StoreError> {
263        let mut error = None;
264        for _i in 0..self.store_attempts {
265            match self
266                .aggregate(aggregate_id)
267                .await
268                .context(crate::error::store_error::AdapterSnafu {})?
269            {
270                Some(mut aggregate) => {
271                    let events = callback(&aggregate);
272                    let events = events.into_list();
273                    if events.is_empty() {
274                        return Ok(aggregate);
275                    }
276                    match self.adapter.save_events(&events).await {
277                        Ok(_) => {
278
279                            #[cfg(feature = "prometheus")]
280                            STORED_EVENT_COUNTER.inc_by(events.len() as u64);
281
282                            for event in events {
283                                let old_aggregate = aggregate.clone();
284                                aggregate.apply(&event);
285                                self.notification_adapter
286                                    .send_event(&event, &aggregate, Some(&old_aggregate))
287                                    .await
288                                    .context(crate::error::store_error::AdapterSnafu {})?;
289                            }
290                            return Ok(aggregate);
291                        }
292                        Err(err) => {
293                            error = Some(err);
294                            tokio::time::sleep(Duration::from_millis(50)).await;
295                            continue;
296                        }
297                    }
298                }
299                None => {
300                    return Err(StoreError::AggregateDoesNotExist { aggregate_id });
301                }
302            }
303        }
304        match error {
305            None => Err(StoreError::Unknown),
306            Some(err) => Err(StoreError::AdapterError { source: err }),
307        }
308    }
309
310    /// Stores events to an existing aggregate with the option to throw a validation error
311    pub async fn try_store<
312        T: IntoEventList<E>,
313        F: Fn(&A) -> Result<T, I> + Send + Sync,
314        I: Send + Sync,
315    >(
316        &self,
317        aggregate_id: Uuid,
318        callback: F,
319    ) -> Result<Result<A, I>, StoreError> {
320        let mut error = None;
321        for _i in 0..self.store_attempts {
322            match self
323                .aggregate(aggregate_id)
324                .await
325                .context(crate::error::store_error::AdapterSnafu {})?
326            {
327                Some(mut aggregate) => match callback(&aggregate) {
328                    Ok(events) => {
329                        let events = events.into_list();
330                        if events.is_empty() {
331                            return Ok(Ok(aggregate));
332                        }
333                        match self.adapter.save_events(&events).await {
334                            Ok(_) => {
335
336                                #[cfg(feature = "prometheus")]
337                                STORED_EVENT_COUNTER.inc_by(events.len() as u64);
338
339                                for event in events {
340                                    let old_aggregate = aggregate.clone();
341                                    aggregate.apply(&event);
342                                    self.notification_adapter
343                                        .send_event(&event, &aggregate, Some(&old_aggregate))
344                                        .await
345                                        .context(crate::error::store_error::AdapterSnafu {})?;
346                                }
347                                return Ok(Ok(aggregate));
348                            }
349                            Err(err) => {
350                                error = Some(err);
351                                tokio::time::sleep(Duration::from_millis(50)).await;
352                                continue;
353                            }
354                        }
355                    }
356                    Err(err) => return Ok(Err(err)),
357                },
358                None => {
359                    return Err(StoreError::AggregateDoesNotExist { aggregate_id });
360                }
361            }
362        }
363
364        match error {
365            None => Err(StoreError::Unknown),
366            Some(err) => Err(StoreError::AdapterError { source: err }),
367        }
368    }
369
370    pub async fn stream_realtime(
371        &self,
372    ) -> Result<BoxStream<Result<RealtimeStreamData<A, E>, AdapterError>>, AdapterError> {
373        let stream = self.notification_adapter.listen_for_events().await?;
374
375        let mapped_stream = stream
376            .map_ok(|data| RealtimeStreamData {
377                event: data.event,
378                new_aggregate: data.new_aggregate,
379                old_aggregate: data.old_aggregate,
380            })
381            .boxed();
382
383        Ok(mapped_stream)
384    }
385
386    /// Removes the aggregate permanently
387    pub async fn remove(&self, aggregate_id: Uuid) -> Result<(), AdapterError> {
388        self.adapter.remove(aggregate_id).await
389    }
390}
391
392pub struct RealtimeStreamData<A, E> {
393    pub event: Event<E>,
394    pub old_aggregate: Option<A>,
395    pub new_aggregate: A,
396}
397
398#[cfg(test)]
399mod tests {
400    extern crate std;
401
402    use crate::adapter::in_memory::InMemoryAdapter;
403    use crate::adapter::{EventStoreAdapter, NotificationAdapter};
404    use crate::{Aggregate, Event, EventStore};
405    use alloc::string::ToString;
406    use alloc::vec;
407    use alloc::vec::Vec;
408    use core::convert::Infallible;
409    use futures::{StreamExt, TryStreamExt};
410    use uuid::Uuid;
411
412    #[derive(Debug, Clone, PartialEq)]
413    enum TestEvent {
414        Test(),
415    }
416
417    #[derive(Debug, Clone, Default, Eq, PartialEq)]
418    struct TestAggregate {
419        aggregate_id: Uuid,
420        version: u64,
421    }
422
423    impl Aggregate<TestEvent> for TestAggregate {
424        fn version(&self) -> u64 {
425            self.version
426        }
427
428        fn aggregate_id(&self) -> Uuid {
429            self.aggregate_id
430        }
431
432        fn apply(&mut self, event: &Event<TestEvent>) {
433            self.version = event.event_id
434        }
435
436        fn new_with_aggregate_id(aggregate_id: Uuid) -> Self {
437            Self {
438                aggregate_id,
439                version: 0,
440            }
441        }
442
443        fn name() -> &'static str {
444            "test"
445        }
446    }
447
448    #[tokio::test]
449    async fn get_none_existing_aggregate() {
450        let adapter = InMemoryAdapter::new();
451        let store = EventStore::<TestAggregate, _>::builder()
452            .event_store_adapter(adapter.clone())
453            .notification_adapter(adapter.clone())
454            .build();
455
456        let id = Uuid::new_v4();
457
458        let aggregate = store.aggregate(id).await.unwrap();
459        assert_eq!(aggregate, None);
460    }
461
462    #[tokio::test]
463    async fn get_existing_aggregate() {
464        let adapter = InMemoryAdapter::new();
465        let store = EventStore::<TestAggregate, _>::builder()
466            .event_store_adapter(adapter.clone())
467            .notification_adapter(adapter.clone())
468            .build();
469
470        let id = Uuid::new_v4();
471
472        adapter
473            .save_events(
474                &Event::list_builder()
475                    .add_event(TestEvent::Test(), None)
476                    .build_new(id),
477            )
478            .await
479            .unwrap();
480
481        let aggregate = store.aggregate(id).await.unwrap();
482        assert!(aggregate.is_some());
483    }
484
485    #[tokio::test]
486    async fn get_ids_should_return_all_ids_in_store() {
487        let adapter = InMemoryAdapter::new();
488        let store = EventStore::<TestAggregate, _>::builder()
489            .event_store_adapter(adapter.clone())
490            .notification_adapter(adapter.clone())
491            .build();
492
493        let id = Uuid::new_v4();
494        adapter
495            .save_events(
496                &Event::list_builder()
497                    .add_event(TestEvent::Test(), None)
498                    .build_new(id),
499            )
500            .await
501            .unwrap();
502
503        let aggregate: Vec<_> = store.ids().await.unwrap().collect().await;
504        assert_eq!(aggregate, vec![id]);
505    }
506
507    #[tokio::test]
508    async fn get_all_aggregates_should_return_all_aggregates_in_store() {
509        let adapter = InMemoryAdapter::new();
510        let store = EventStore::<TestAggregate, _>::builder()
511            .event_store_adapter(adapter.clone())
512            .notification_adapter(adapter.clone())
513            .build();
514
515        let id = Uuid::new_v4();
516        adapter
517            .save_events(
518                &Event::list_builder()
519                    .add_event(TestEvent::Test(), None)
520                    .build_new(id),
521            )
522            .await
523            .unwrap();
524
525        let aggregate: Vec<_> = store.all().await.unwrap().try_collect().await.unwrap();
526        assert_eq!(aggregate[0].aggregate_id(), id);
527    }
528
529    #[tokio::test]
530    async fn create_without_external_ids_should_work() {
531        let adapter = InMemoryAdapter::new();
532        let store = EventStore::<TestAggregate, _>::builder()
533            .event_store_adapter(adapter.clone())
534            .notification_adapter(adapter.clone())
535            .build();
536
537        let id = Uuid::new_v4();
538        store
539            .create(
540                Event::list_builder()
541                    .add_event(TestEvent::Test(), None)
542                    .build_new(id),
543            )
544            .await
545            .unwrap();
546
547        let events = adapter.get_events(id, None).await.unwrap().try_collect::<Vec<_>>().await.unwrap();
548        assert_eq!(events.len(), 1);
549    }
550
551    #[tokio::test]
552    async fn create_without_external_ids_should_not_work_twice() {
553        let adapter = InMemoryAdapter::new();
554        let store = EventStore::<TestAggregate, _>::builder()
555            .event_store_adapter(adapter.clone())
556            .notification_adapter(adapter.clone())
557            .build();
558
559        let id = Uuid::new_v4();
560        store
561            .create(
562                Event::list_builder()
563                    .add_event(TestEvent::Test(), None)
564                    .build_new(id),
565            )
566            .await
567            .unwrap();
568        store
569            .create(
570                Event::list_builder()
571                    .add_event(TestEvent::Test(), None)
572                    .build_new(id),
573            )
574            .await
575            .unwrap_err();
576    }
577
578    #[tokio::test]
579    async fn create_without_external_ids_should_send_notifications() {
580        let adapter = InMemoryAdapter::new();
581        let store = EventStore::<TestAggregate, _>::builder()
582            .event_store_adapter(adapter.clone())
583            .notification_adapter(adapter.clone())
584            .build();
585
586        let stream = adapter.listen_for_events().await.unwrap();
587
588        let id = Uuid::new_v4();
589        store
590            .create(
591                Event::list_builder()
592                    .add_event(TestEvent::Test(), None)
593                    .build_new(id),
594            )
595            .await
596            .unwrap();
597
598        let event = stream.into_future().await.0.unwrap().unwrap().event.payload;
599
600        assert_eq!(event, TestEvent::Test())
601    }
602
603    #[tokio::test]
604    async fn create_with_external_ids_should_work() {
605        let adapter = InMemoryAdapter::new();
606        let store = EventStore::<TestAggregate, _>::builder()
607            .event_store_adapter(adapter.clone())
608            .notification_adapter(adapter.clone())
609            .build();
610
611        let id = Uuid::new_v4();
612        store
613            .create_with_external_ids(
614                Event::list_builder()
615                    .add_event(TestEvent::Test(), None)
616                    .build_new(id),
617                vec!["123".to_string()],
618            )
619            .await
620            .unwrap();
621
622        let events = adapter.get_events(id, None).await.unwrap().try_collect::<Vec<_>>().await.unwrap();
623        assert_eq!(events.len(), 1);
624    }
625
626    #[tokio::test]
627    async fn create_with_external_ids_should_not_work_twice() {
628        let adapter = InMemoryAdapter::new();
629        let store = EventStore::<TestAggregate, _>::builder()
630            .event_store_adapter(adapter.clone())
631            .notification_adapter(adapter.clone())
632            .build();
633
634        store
635            .create_with_external_ids(
636                Event::list_builder()
637                    .add_event(TestEvent::Test(), None)
638                    .build_new(Uuid::new_v4()),
639                vec!["123".to_string()],
640            )
641            .await
642            .unwrap();
643        store
644            .create_with_external_ids(
645                Event::list_builder()
646                    .add_event(TestEvent::Test(), None)
647                    .build_new(Uuid::new_v4()),
648                vec!["123".to_string()],
649            )
650            .await
651            .unwrap_err();
652    }
653
654    #[tokio::test]
655    async fn store_should_store_events() {
656        let adapter = InMemoryAdapter::new();
657        let store = EventStore::<TestAggregate, _>::builder()
658            .event_store_adapter(adapter.clone())
659            .notification_adapter(adapter.clone())
660            .build();
661
662        let id = Uuid::new_v4();
663        store
664            .create(
665                Event::list_builder()
666                    .add_event(TestEvent::Test(), None)
667                    .build_new(id),
668            )
669            .await
670            .unwrap();
671
672        store
673            .store(id, |aggregate| {
674                Event::list_builder()
675                    .add_event(TestEvent::Test(), None)
676                    .build(aggregate)
677            })
678            .await
679            .unwrap();
680
681        let events = adapter.get_events(id, None).await.unwrap().try_collect::<Vec<_>>().await.unwrap();
682
683        assert_eq!(events.len(), 2)
684    }
685
686    #[tokio::test]
687    async fn store_should_fail_if_aggregate_does_not_exist() {
688        let adapter = InMemoryAdapter::new();
689        let store = EventStore::<TestAggregate, _>::builder()
690            .event_store_adapter(adapter.clone())
691            .notification_adapter(adapter.clone())
692            .build();
693
694        let id = Uuid::new_v4();
695
696        store
697            .store(id, |aggregate| {
698                Event::list_builder()
699                    .add_event(TestEvent::Test(), None)
700                    .build(aggregate)
701            })
702            .await
703            .unwrap_err();
704    }
705
706    #[tokio::test]
707    async fn store_should_fail_if_the_event_sequence_is_invalid() {
708        let adapter = InMemoryAdapter::new();
709        let store = EventStore::<TestAggregate, _>::builder()
710            .event_store_adapter(adapter.clone())
711            .notification_adapter(adapter.clone())
712            .build();
713
714        let id = Uuid::new_v4();
715
716        store
717            .store(id, |_aggregate| {
718                vec![
719                    Event {
720                        aggregate_id: id,
721                        event_id: 1,
722                        created_at: Default::default(),
723                        user_id: None,
724                        payload: TestEvent::Test(),
725                    },
726                    Event {
727                        aggregate_id: id,
728                        event_id: 1, // Duplicated event id
729                        created_at: Default::default(),
730                        user_id: None,
731                        payload: TestEvent::Test(),
732                    },
733                ]
734            })
735            .await
736            .unwrap_err();
737    }
738
739    #[tokio::test]
740    async fn store_should_send_events() {
741        let adapter = InMemoryAdapter::new();
742        let store = EventStore::<TestAggregate, _>::builder()
743            .event_store_adapter(adapter.clone())
744            .notification_adapter(adapter.clone())
745            .build();
746
747        let id = Uuid::new_v4();
748        store
749            .create(
750                Event::list_builder()
751                    .add_event(TestEvent::Test(), None)
752                    .build_new(id),
753            )
754            .await
755            .unwrap();
756
757        let stream = adapter.listen_for_events().await.unwrap();
758
759        store
760            .store(id, |aggregate| {
761                Event::list_builder()
762                    .add_event(TestEvent::Test(), None)
763                    .build(aggregate)
764            })
765            .await
766            .unwrap();
767
768        let event = stream.into_future().await.0.unwrap().unwrap().event.payload;
769
770        assert_eq!(event, TestEvent::Test())
771    }
772
773    #[tokio::test]
774    async fn try_store_should_store_events() {
775        let adapter = InMemoryAdapter::new();
776        let store = EventStore::<TestAggregate, _>::builder()
777            .event_store_adapter(adapter.clone())
778            .notification_adapter(adapter.clone())
779            .build();
780
781        let id = Uuid::new_v4();
782        store
783            .create(
784                Event::list_builder()
785                    .add_event(TestEvent::Test(), None)
786                    .build_new(id),
787            )
788            .await
789            .unwrap();
790
791        store
792            .try_store(id, |aggregate| {
793                Result::<_, Infallible>::Ok(
794                    Event::list_builder()
795                        .add_event(TestEvent::Test(), None)
796                        .build(aggregate),
797                )
798            })
799            .await
800            .unwrap()
801            .unwrap();
802
803        let events = adapter.get_events(id, None).await.unwrap().try_collect::<Vec<_>>().await.unwrap();
804
805        assert_eq!(events.len(), 2)
806    }
807
808    #[tokio::test]
809    async fn try_store_should_fail_if_aggregate_does_not_exist() {
810        let adapter = InMemoryAdapter::new();
811        let store = EventStore::<TestAggregate, _>::builder()
812            .event_store_adapter(adapter.clone())
813            .notification_adapter(adapter.clone())
814            .build();
815
816        let id = Uuid::new_v4();
817
818        store
819            .try_store(id, |aggregate| {
820                Result::<_, Infallible>::Ok(
821                    Event::list_builder()
822                        .add_event(TestEvent::Test(), None)
823                        .build(aggregate),
824                )
825            })
826            .await
827            .unwrap_err();
828    }
829
830    #[tokio::test]
831    async fn try_store_should_fail_if_the_event_sequence_is_invalid() {
832        let adapter = InMemoryAdapter::new();
833        let store = EventStore::<TestAggregate, _>::builder()
834            .event_store_adapter(adapter.clone())
835            .notification_adapter(adapter.clone())
836            .build();
837
838        let id = Uuid::new_v4();
839
840        store
841            .try_store(id, |_aggregate| {
842                Result::<_, Infallible>::Ok(vec![
843                    Event {
844                        aggregate_id: id,
845                        event_id: 1,
846                        created_at: Default::default(),
847                        user_id: None,
848                        payload: TestEvent::Test(),
849                    },
850                    Event {
851                        aggregate_id: id,
852                        event_id: 1, // Duplicated event id
853                        created_at: Default::default(),
854                        user_id: None,
855                        payload: TestEvent::Test(),
856                    },
857                ])
858            })
859            .await
860            .unwrap_err();
861    }
862
863    #[tokio::test]
864    async fn try_store_error_thrown_inside_should_be_propegated() {
865        let adapter = InMemoryAdapter::new();
866        let store = EventStore::<TestAggregate, _>::builder()
867            .event_store_adapter(adapter.clone())
868            .notification_adapter(adapter.clone())
869            .build();
870
871        let id = Uuid::new_v4();
872        store
873            .create(
874                Event::list_builder()
875                    .add_event(TestEvent::Test(), None)
876                    .build_new(id),
877            )
878            .await
879            .unwrap();
880
881        let err = store
882            .try_store(id, |_aggregate| {
883                Err::<Vec<Event<TestEvent>>, &str>("Failure")
884            })
885            .await
886            .unwrap()
887            .unwrap_err();
888
889        assert_eq!(err, "Failure")
890    }
891
892    #[tokio::test]
893    async fn try_store_send_events() {
894        let adapter = InMemoryAdapter::new();
895        let store = EventStore::<TestAggregate, _>::builder()
896            .event_store_adapter(adapter.clone())
897            .notification_adapter(adapter.clone())
898            .build();
899
900        let id = Uuid::new_v4();
901        store
902            .create(
903                Event::list_builder()
904                    .add_event(TestEvent::Test(), None)
905                    .build_new(id),
906            )
907            .await
908            .unwrap();
909
910        store
911            .try_store(id, |aggregate| {
912                Result::<_, Infallible>::Ok(
913                    Event::list_builder()
914                        .add_event(TestEvent::Test(), None)
915                        .build(aggregate),
916                )
917            })
918            .await
919            .unwrap()
920            .unwrap();
921
922        let events = adapter.get_events(id, None).await.unwrap().try_collect::<Vec<_>>().await.unwrap();
923
924        assert_eq!(events.len(), 2)
925    }
926
927    #[tokio::test]
928    async fn remove_should_remove_all_events_for_aggregate() {
929        let adapter = InMemoryAdapter::new();
930        let store = EventStore::<TestAggregate, _>::builder()
931            .event_store_adapter(adapter.clone())
932            .notification_adapter(adapter.clone())
933            .build();
934
935        let id = Uuid::new_v4();
936        store
937            .create(
938                Event::list_builder()
939                    .add_event(TestEvent::Test(), None)
940                    .build_new(id),
941            )
942            .await
943            .unwrap();
944
945        let events = adapter.get_events(id, None).await.unwrap().try_collect::<Vec<_>>().await.unwrap();
946
947        assert_eq!(events.len(), 1);
948
949        store.remove(id).await.unwrap();
950
951        let events = adapter.get_events(id, None).await.unwrap().try_collect::<Vec<_>>().await.unwrap();
952
953        assert_eq!(events.len(), 0);
954    }
955}