1use crate::adapter::EventStoreAdapter;
2use crate::adapter::NotificationAdapter;
3use crate::error::create_error::{
4 AdapterSnafu, AggregateAlreadyExistsSnafu, EmptyEventListSnafu, ExternalIdAlreadyExistsSnafu,
5 InconsistentAggregateIdSnafu, InconsistentEventOrderingSnafu, NilUuidSnafu,
6};
7use crate::error::{AdapterError, CreateError, StoreError};
8use crate::{Aggregate, Event, EventStoreBuilder, IntoEventList};
9use alloc::string::{String, ToString};
10use alloc::sync::Arc;
11use alloc::vec::Vec;
12use core::marker::PhantomData;
13use core::time::Duration;
14use futures::future::{try_join_all};
15use futures::stream::BoxStream;
16use futures::{StreamExt, TryStreamExt};
17use snafu::{ensure, ResultExt};
18use uuid::Uuid;
19
// Prometheus metrics, compiled in only when the `prometheus` feature is enabled.
// Each `unwrap()` panics if the corresponding `register_*!` macro reports an error.
#[cfg(feature = "prometheus")]
lazy_static::lazy_static! {
    // Incremented by `store`/`try_store` with the number of events a successful save wrote.
    static ref STORED_EVENT_COUNTER: prometheus::IntCounter = prometheus::register_int_counter!(
        "num_stored_events_count",
        "Number of stored events"
    )
    .unwrap();

    // Incremented by `create`.
    static ref CREATED_AGGREGATES_COUNTER: prometheus::IntCounter = prometheus::register_int_counter!(
        "num_aggregates_created_count",
        "Number of newly created aggregates"
    )
    .unwrap();

    // Incremented by the aggregate loaders with the number of events they replayed.
    static ref READ_EVENTS_COUNTER: prometheus::IntCounter = prometheus::register_int_counter!(
        "num_read_events_count",
        "Number of read events"
    )
    .unwrap();

    // Load duration, labelled by whether the snapshot path was requested and by `A::name()`.
    static ref AGGREGATE_APPLY_TIME_HISTOGRAM: prometheus::HistogramVec = prometheus::register_histogram_vec!(
        "aggregate_apply_time",
        "Time fully build an aggregate",
        &["snapshot", "aggregate_name"],
        vec![
            0.001, 0.005, 0.010, 0.025, 0.05, 0.075, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0
        ]
    )
    .unwrap();
}
31
32#[derive(Debug, Clone)]
35pub struct EventStore<A: Aggregate<E>, E> {
36 adapter: Arc<dyn EventStoreAdapter<A, E>>,
37 notification_adapter: Arc<dyn NotificationAdapter<A, E>>,
38 store_attempts: usize,
39 phantom_data: PhantomData<A>,
40}
41
42impl<A: Aggregate<E> + Send + Sync + Clone, E: std::marker::Send> EventStore<A, E> {
43 pub(crate) fn new<T: EventStoreAdapter<A, E> + 'static, NT: NotificationAdapter<A, E> + 'static>(
44 adapter: T,
45 notification_adapter: NT,
46 store_attempts: usize
47 ) -> Self {
48 Self {
49 adapter: Arc::new(adapter),
50 notification_adapter: Arc::new(notification_adapter),
51 store_attempts,
52 phantom_data: PhantomData,
53 }
54 }
55
56 pub fn builder() -> EventStoreBuilder<A, E, (), ()> {
57 EventStoreBuilder::new()
58 }
59
60 pub async fn aggregate(&self, aggregate_id: Uuid) -> Result<Option<A>, AdapterError> {
62 #[cfg(feature = "prometheus")]
63 let timer = AGGREGATE_APPLY_TIME_HISTOGRAM.with_label_values(&["true", A::name()]).start_timer();
64
65 let result = self.aggregate_inner(aggregate_id).await;
66
67 #[cfg(feature = "prometheus")]
68 timer.observe_duration();
69
70 result
71 }
72
73 pub async fn aggregate_without_snapshot(&self, aggregate_id: Uuid) -> Result<Option<A>, AdapterError> {
75 #[cfg(feature = "prometheus")]
76 let timer = AGGREGATE_APPLY_TIME_HISTOGRAM.with_label_values(&["false", A::name()]).start_timer();
77
78 let result = self.aggregate_inner_without_snapshot(aggregate_id).await;
79
80 #[cfg(feature = "prometheus")]
81 timer.observe_duration();
82
83 result
84 }
85 async fn aggregate_inner(&self, aggregate_id: Uuid) -> Result<Option<A>, AdapterError> {
86 match self.adapter.get_snapshot(aggregate_id).await? {
87 Some(mut aggregate) => {
88 let start_version = aggregate.version();
89 let stream = self.adapter.get_events(aggregate_id, Some(aggregate.version())).await?;
90
91 aggregate = stream.try_fold(aggregate, |mut a, event| async move {
92 ensure!(
93 event.event_id == (a.version() + 1) as u64,
94 crate::error::adapter_error::InconsistentEventOrderingSnafu
95 );
96 a.apply(&event);
97 Ok(a)
98 }).await?;
99
100 #[cfg(feature = "prometheus")]
101 READ_EVENTS_COUNTER.inc_by(aggregate.version() - start_version);
102
103 if aggregate.version() == 0 {
104 Ok(None)
105 } else {
106 if (aggregate.version() - start_version) > 10 {
107 self.adapter.save_snapshot(&aggregate).await?;
108 }
109 Ok(Some(aggregate))
110 }
111 }
112 None => {
113 self.aggregate_inner_without_snapshot(aggregate_id).await
114 }
115 }
116 }
117 async fn aggregate_inner_without_snapshot(&self, aggregate_id: Uuid) -> Result<Option<A>, AdapterError> {
118 let stream = self.adapter.get_events(aggregate_id, None).await?;
119 let mut aggregate = A::new_with_aggregate_id(aggregate_id);
120
121 aggregate = stream.try_fold(aggregate, |mut a, event| async move {
122 ensure!(
123 event.event_id == (a.version() + 1) as u64,
124 crate::error::adapter_error::InconsistentEventOrderingSnafu
125 );
126 a.apply(&event);
127 Ok(a)
128 }).await?;
129
130 #[cfg(feature = "prometheus")]
131 READ_EVENTS_COUNTER.inc_by(aggregate.version());
132
133 if aggregate.version() == 0 {
134 Ok(None)
135 } else {
136 self.adapter.save_snapshot(&aggregate).await?;
137 Ok(Some(aggregate))
138 }
139 }
140
141 pub async fn ids(&self) -> Result<BoxStream<Uuid>, AdapterError> {
143 self.adapter.stream_ids().await
144 }
145
146 pub async fn all(&self) -> Result<BoxStream<Result<A, AdapterError>>, AdapterError> {
148 Ok(self
149 .ids()
150 .await?
151 .then(move |id| async move { self.aggregate(id).await })
152 .try_filter_map(|i| async move { Ok(i) })
153 .boxed())
154 }
155
156 pub async fn create<T: IntoEventList<E>>(&self, events: T) -> Result<A, CreateError> {
158 let events = events.into_list();
159 ensure!(!events.is_empty(), EmptyEventListSnafu);
160
161 let first_aggregate_id = events.first().unwrap().aggregate_id;
162
163 ensure!(first_aggregate_id != Uuid::nil(), NilUuidSnafu);
164
165 for (index, event) in events.iter().enumerate() {
166 ensure!(
167 event.aggregate_id == first_aggregate_id,
168 InconsistentAggregateIdSnafu
169 );
170 ensure!(
171 event.event_id == (index + 1) as u64,
172 InconsistentEventOrderingSnafu
173 );
174 }
175
176 ensure!(
177 self.aggregate(first_aggregate_id)
178 .await
179 .context(crate::error::create_error::AdapterSnafu {})?
180 .is_none(),
181 AggregateAlreadyExistsSnafu {
182 id: first_aggregate_id
183 }
184 );
185
186 self.adapter
187 .save_events(&events)
188 .await
189 .context(AdapterSnafu)?;
190
191 let mut aggregate = A::new_with_aggregate_id(first_aggregate_id);
192
193 for event in events {
194 let old_aggregate = if aggregate.version() == 0 {
195 None
196 } else {
197 Some(aggregate.clone())
198 };
199
200 aggregate.apply(&event);
201 self.notification_adapter
202 .send_event(&event, &aggregate, old_aggregate.as_ref())
203 .await
204 .context(AdapterSnafu {})?;
205 }
206
207
208 #[cfg(feature = "prometheus")]
209 CREATED_AGGREGATES_COUNTER.inc_by(1);
210
211 Ok(aggregate)
212 }
213
214 pub async fn create_with_external_ids(
216 &self,
217 events: Vec<Event<E>>,
218 external_ids: Vec<String>,
219 ) -> Result<A, CreateError> {
220 ensure!(!events.is_empty(), EmptyEventListSnafu);
221
222 let first_aggregate_id = events.first().unwrap().aggregate_id;
223
224 let results: Vec<(Option<Uuid>, String)> = try_join_all(
225 external_ids
226 .iter()
227 .map(|external_id| async move {
228 let aggregate_id = self
229 .adapter
230 .aggregate_id_from_external_id(external_id)
231 .await?;
232 Ok((aggregate_id, external_id.to_string()))
233 })
234 .collect::<Vec<_>>(),
235 )
236 .await
237 .context(AdapterSnafu {})?;
238
239 for (aggregate_id_opt, external_id) in results {
240 ensure!(
241 aggregate_id_opt.is_none() || aggregate_id_opt == Some(first_aggregate_id),
242 ExternalIdAlreadyExistsSnafu {
243 id: aggregate_id_opt.unwrap(),
244 external_id
245 }
246 );
247 }
248
249 self.adapter
250 .save_aggregate_id_to_external_ids(first_aggregate_id, &external_ids)
251 .await
252 .context(AdapterSnafu {})?;
253
254 self.create(events).await
255 }
256
257 pub async fn store<T: IntoEventList<E>, F: Fn(&A) -> T + Send + Sync>(
259 &self,
260 aggregate_id: Uuid,
261 callback: F,
262 ) -> Result<A, StoreError> {
263 let mut error = None;
264 for _i in 0..self.store_attempts {
265 match self
266 .aggregate(aggregate_id)
267 .await
268 .context(crate::error::store_error::AdapterSnafu {})?
269 {
270 Some(mut aggregate) => {
271 let events = callback(&aggregate);
272 let events = events.into_list();
273 if events.is_empty() {
274 return Ok(aggregate);
275 }
276 match self.adapter.save_events(&events).await {
277 Ok(_) => {
278
279 #[cfg(feature = "prometheus")]
280 STORED_EVENT_COUNTER.inc_by(events.len() as u64);
281
282 for event in events {
283 let old_aggregate = aggregate.clone();
284 aggregate.apply(&event);
285 self.notification_adapter
286 .send_event(&event, &aggregate, Some(&old_aggregate))
287 .await
288 .context(crate::error::store_error::AdapterSnafu {})?;
289 }
290 return Ok(aggregate);
291 }
292 Err(err) => {
293 error = Some(err);
294 tokio::time::sleep(Duration::from_millis(50)).await;
295 continue;
296 }
297 }
298 }
299 None => {
300 return Err(StoreError::AggregateDoesNotExist { aggregate_id });
301 }
302 }
303 }
304 match error {
305 None => Err(StoreError::Unknown),
306 Some(err) => Err(StoreError::AdapterError { source: err }),
307 }
308 }
309
310 pub async fn try_store<
312 T: IntoEventList<E>,
313 F: Fn(&A) -> Result<T, I> + Send + Sync,
314 I: Send + Sync,
315 >(
316 &self,
317 aggregate_id: Uuid,
318 callback: F,
319 ) -> Result<Result<A, I>, StoreError> {
320 let mut error = None;
321 for _i in 0..self.store_attempts {
322 match self
323 .aggregate(aggregate_id)
324 .await
325 .context(crate::error::store_error::AdapterSnafu {})?
326 {
327 Some(mut aggregate) => match callback(&aggregate) {
328 Ok(events) => {
329 let events = events.into_list();
330 if events.is_empty() {
331 return Ok(Ok(aggregate));
332 }
333 match self.adapter.save_events(&events).await {
334 Ok(_) => {
335
336 #[cfg(feature = "prometheus")]
337 STORED_EVENT_COUNTER.inc_by(events.len() as u64);
338
339 for event in events {
340 let old_aggregate = aggregate.clone();
341 aggregate.apply(&event);
342 self.notification_adapter
343 .send_event(&event, &aggregate, Some(&old_aggregate))
344 .await
345 .context(crate::error::store_error::AdapterSnafu {})?;
346 }
347 return Ok(Ok(aggregate));
348 }
349 Err(err) => {
350 error = Some(err);
351 tokio::time::sleep(Duration::from_millis(50)).await;
352 continue;
353 }
354 }
355 }
356 Err(err) => return Ok(Err(err)),
357 },
358 None => {
359 return Err(StoreError::AggregateDoesNotExist { aggregate_id });
360 }
361 }
362 }
363
364 match error {
365 None => Err(StoreError::Unknown),
366 Some(err) => Err(StoreError::AdapterError { source: err }),
367 }
368 }
369
370 pub async fn stream_realtime(
371 &self,
372 ) -> Result<BoxStream<Result<RealtimeStreamData<A, E>, AdapterError>>, AdapterError> {
373 let stream = self.notification_adapter.listen_for_events().await?;
374
375 let mapped_stream = stream
376 .map_ok(|data| RealtimeStreamData {
377 event: data.event,
378 new_aggregate: data.new_aggregate,
379 old_aggregate: data.old_aggregate,
380 })
381 .boxed();
382
383 Ok(mapped_stream)
384 }
385
386 pub async fn remove(&self, aggregate_id: Uuid) -> Result<(), AdapterError> {
388 self.adapter.remove(aggregate_id).await
389 }
390}
391
/// One item of the stream returned by `EventStore::stream_realtime`.
///
/// The fields are copied unchanged from what the notification adapter delivers.
pub struct RealtimeStreamData<A, E> {
    /// The event the notification is about.
    pub event: Event<E>,
    /// Aggregate state before the event. When `EventStore` sends notifications it passes
    /// `None` here for an event applied to an aggregate that is still at version 0.
    pub old_aggregate: Option<A>,
    /// Aggregate state after the event.
    pub new_aggregate: A,
}
397
/// Tests for `EventStore`, run against the in-memory adapter used as both event store and
/// notification channel.
#[cfg(test)]
mod tests {
    extern crate std;

    use crate::adapter::in_memory::InMemoryAdapter;
    use crate::adapter::{EventStoreAdapter, NotificationAdapter};
    use crate::error::StoreError;
    use crate::{Aggregate, Event, EventStore};
    use alloc::string::ToString;
    use alloc::vec;
    use alloc::vec::Vec;
    use core::convert::Infallible;
    use futures::{StreamExt, TryStreamExt};
    use uuid::Uuid;

    /// Minimal payload type: a single variant is all the tests need.
    #[derive(Debug, Clone, PartialEq)]
    enum TestEvent {
        Test(),
    }

    /// Aggregate that only tracks its id and the id of the last applied event.
    #[derive(Debug, Clone, Default, Eq, PartialEq)]
    struct TestAggregate {
        aggregate_id: Uuid,
        version: u64,
    }

    impl Aggregate<TestEvent> for TestAggregate {
        fn version(&self) -> u64 {
            self.version
        }

        fn aggregate_id(&self) -> Uuid {
            self.aggregate_id
        }

        fn apply(&mut self, event: &Event<TestEvent>) {
            self.version = event.event_id
        }

        fn new_with_aggregate_id(aggregate_id: Uuid) -> Self {
            Self {
                aggregate_id,
                version: 0,
            }
        }

        fn name() -> &'static str {
            "test"
        }
    }

    // Expands to `(adapter, store)`: a fresh `InMemoryAdapter` plus an `EventStore` that
    // uses clones of it as event store and notification adapter. A macro rather than a
    // function so the adapter's concrete type stays inferred.
    macro_rules! setup {
        () => {{
            let adapter = InMemoryAdapter::new();
            let store = EventStore::<TestAggregate, _>::builder()
                .event_store_adapter(adapter.clone())
                .notification_adapter(adapter.clone())
                .build();
            (adapter, store)
        }};
    }

    /// The one-event list that starts a new aggregate with the given id.
    fn single_event(aggregate_id: Uuid) -> Vec<Event<TestEvent>> {
        Event::list_builder()
            .add_event(TestEvent::Test(), None)
            .build_new(aggregate_id)
    }

    /// Two events that both claim event id 1, which is never a valid continuation.
    fn conflicting_events(aggregate_id: Uuid) -> Vec<Event<TestEvent>> {
        let event = || Event {
            aggregate_id,
            event_id: 1,
            created_at: Default::default(),
            user_id: None,
            payload: TestEvent::Test(),
        };
        vec![event(), event()]
    }

    #[tokio::test]
    async fn get_none_existing_aggregate() {
        let (_adapter, store) = setup!();

        let id = Uuid::new_v4();

        let aggregate = store.aggregate(id).await.unwrap();
        assert_eq!(aggregate, None);
    }

    #[tokio::test]
    async fn get_existing_aggregate() {
        let (adapter, store) = setup!();

        let id = Uuid::new_v4();
        adapter.save_events(&single_event(id)).await.unwrap();

        let aggregate = store.aggregate(id).await.unwrap();
        assert!(aggregate.is_some());
    }

    #[tokio::test]
    async fn get_ids_should_return_all_ids_in_store() {
        let (adapter, store) = setup!();

        let id = Uuid::new_v4();
        adapter.save_events(&single_event(id)).await.unwrap();

        let ids: Vec<_> = store.ids().await.unwrap().collect().await;
        assert_eq!(ids, vec![id]);
    }

    #[tokio::test]
    async fn get_all_aggregates_should_return_all_aggregates_in_store() {
        let (adapter, store) = setup!();

        let id = Uuid::new_v4();
        adapter.save_events(&single_event(id)).await.unwrap();

        let aggregates: Vec<_> = store.all().await.unwrap().try_collect().await.unwrap();
        assert_eq!(aggregates[0].aggregate_id(), id);
    }

    #[tokio::test]
    async fn create_without_external_ids_should_work() {
        let (adapter, store) = setup!();

        let id = Uuid::new_v4();
        store.create(single_event(id)).await.unwrap();

        let events = adapter.get_events(id, None).await.unwrap().try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(events.len(), 1);
    }

    #[tokio::test]
    async fn create_without_external_ids_should_not_work_twice() {
        let (_adapter, store) = setup!();

        let id = Uuid::new_v4();
        store.create(single_event(id)).await.unwrap();
        store.create(single_event(id)).await.unwrap_err();
    }

    #[tokio::test]
    async fn create_without_external_ids_should_send_notifications() {
        let (adapter, store) = setup!();

        let stream = adapter.listen_for_events().await.unwrap();

        let id = Uuid::new_v4();
        store.create(single_event(id)).await.unwrap();

        let event = stream.into_future().await.0.unwrap().unwrap().event.payload;

        assert_eq!(event, TestEvent::Test())
    }

    #[tokio::test]
    async fn create_with_external_ids_should_work() {
        let (adapter, store) = setup!();

        let id = Uuid::new_v4();
        store
            .create_with_external_ids(single_event(id), vec!["123".to_string()])
            .await
            .unwrap();

        let events = adapter.get_events(id, None).await.unwrap().try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(events.len(), 1);
    }

    #[tokio::test]
    async fn create_with_external_ids_should_not_work_twice() {
        let (_adapter, store) = setup!();

        // Two different aggregates competing for the same external id.
        store
            .create_with_external_ids(single_event(Uuid::new_v4()), vec!["123".to_string()])
            .await
            .unwrap();
        store
            .create_with_external_ids(single_event(Uuid::new_v4()), vec!["123".to_string()])
            .await
            .unwrap_err();
    }

    #[tokio::test]
    async fn store_should_store_events() {
        let (adapter, store) = setup!();

        let id = Uuid::new_v4();
        store.create(single_event(id)).await.unwrap();

        store
            .store(id, |aggregate| {
                Event::list_builder()
                    .add_event(TestEvent::Test(), None)
                    .build(aggregate)
            })
            .await
            .unwrap();

        let events = adapter.get_events(id, None).await.unwrap().try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(events.len(), 2)
    }

    #[tokio::test]
    async fn store_should_fail_if_aggregate_does_not_exist() {
        let (_adapter, store) = setup!();

        let id = Uuid::new_v4();

        store
            .store(id, |aggregate| {
                Event::list_builder()
                    .add_event(TestEvent::Test(), None)
                    .build(aggregate)
            })
            .await
            .unwrap_err();
    }

    #[tokio::test]
    async fn store_should_fail_if_the_event_sequence_is_invalid() {
        let (_adapter, store) = setup!();

        let id = Uuid::new_v4();
        // The aggregate has to exist first: for an unknown id `store` returns
        // `AggregateDoesNotExist` before the callback is ever invoked, and the invalid
        // sequence would never be looked at.
        store.create(single_event(id)).await.unwrap();

        let err = store
            .store(id, |_aggregate| conflicting_events(id))
            .await
            .unwrap_err();

        // Relies on the adapter refusing to save the duplicated event id.
        assert!(matches!(err, StoreError::AdapterError { .. }));
    }

    #[tokio::test]
    async fn store_should_send_events() {
        let (adapter, store) = setup!();

        let id = Uuid::new_v4();
        store.create(single_event(id)).await.unwrap();

        let stream = adapter.listen_for_events().await.unwrap();

        store
            .store(id, |aggregate| {
                Event::list_builder()
                    .add_event(TestEvent::Test(), None)
                    .build(aggregate)
            })
            .await
            .unwrap();

        let event = stream.into_future().await.0.unwrap().unwrap().event.payload;

        assert_eq!(event, TestEvent::Test())
    }

    #[tokio::test]
    async fn try_store_should_store_events() {
        let (adapter, store) = setup!();

        let id = Uuid::new_v4();
        store.create(single_event(id)).await.unwrap();

        store
            .try_store(id, |aggregate| {
                Result::<_, Infallible>::Ok(
                    Event::list_builder()
                        .add_event(TestEvent::Test(), None)
                        .build(aggregate),
                )
            })
            .await
            .unwrap()
            .unwrap();

        let events = adapter.get_events(id, None).await.unwrap().try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(events.len(), 2)
    }

    #[tokio::test]
    async fn try_store_should_fail_if_aggregate_does_not_exist() {
        let (_adapter, store) = setup!();

        let id = Uuid::new_v4();

        store
            .try_store(id, |aggregate| {
                Result::<_, Infallible>::Ok(
                    Event::list_builder()
                        .add_event(TestEvent::Test(), None)
                        .build(aggregate),
                )
            })
            .await
            .unwrap_err();
    }

    #[tokio::test]
    async fn try_store_should_fail_if_the_event_sequence_is_invalid() {
        let (_adapter, store) = setup!();

        let id = Uuid::new_v4();
        // The aggregate has to exist first: for an unknown id `try_store` returns
        // `AggregateDoesNotExist` before the callback is ever invoked, and the invalid
        // sequence would never be looked at.
        store.create(single_event(id)).await.unwrap();

        let err = store
            .try_store(id, |_aggregate| {
                Result::<_, Infallible>::Ok(conflicting_events(id))
            })
            .await
            .unwrap_err();

        // Relies on the adapter refusing to save the duplicated event id.
        assert!(matches!(err, StoreError::AdapterError { .. }));
    }

    #[tokio::test]
    async fn try_store_error_thrown_inside_should_be_propegated() {
        let (_adapter, store) = setup!();

        let id = Uuid::new_v4();
        store.create(single_event(id)).await.unwrap();

        let err = store
            .try_store(id, |_aggregate| {
                Err::<Vec<Event<TestEvent>>, &str>("Failure")
            })
            .await
            .unwrap()
            .unwrap_err();

        assert_eq!(err, "Failure")
    }

    #[tokio::test]
    async fn try_store_send_events() {
        let (adapter, store) = setup!();

        let id = Uuid::new_v4();
        store.create(single_event(id)).await.unwrap();

        // Subscribe only now, as `store_should_send_events` does, so the notification
        // read below is the one produced by `try_store`.
        let stream = adapter.listen_for_events().await.unwrap();

        store
            .try_store(id, |aggregate| {
                Result::<_, Infallible>::Ok(
                    Event::list_builder()
                        .add_event(TestEvent::Test(), None)
                        .build(aggregate),
                )
            })
            .await
            .unwrap()
            .unwrap();

        let event = stream.into_future().await.0.unwrap().unwrap().event.payload;

        assert_eq!(event, TestEvent::Test())
    }

    #[tokio::test]
    async fn remove_should_remove_all_events_for_aggregate() {
        let (adapter, store) = setup!();

        let id = Uuid::new_v4();
        store.create(single_event(id)).await.unwrap();

        let events = adapter.get_events(id, None).await.unwrap().try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(events.len(), 1);

        store.remove(id).await.unwrap();

        let events = adapter.get_events(id, None).await.unwrap().try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(events.len(), 0);
    }
}