eventually_util/inmemory/
store.rs

1//! Contains supporting entities using an in-memory backend.
2
3use std::collections::HashMap;
4use std::fmt::Debug;
5use std::hash::Hash;
6use std::sync::atomic::{AtomicU32, Ordering};
7use std::sync::Arc;
8
9use eventually_core::aggregate::Aggregate;
10use eventually_core::store::persistent::EventBuilderWithVersion;
11use eventually_core::store::{AppendError, EventStream, Expected, Persisted, Select};
12use eventually_core::subscription::EventSubscriber;
13use eventually_core::versioning::Versioned;
14
15use futures::future::BoxFuture;
16use futures::stream::{empty, iter, StreamExt, TryStreamExt};
17
18use parking_lot::RwLock;
19
20use tokio::sync::broadcast::{channel, RecvError, Sender};
21
22#[cfg(feature = "with-tracing")]
23use tracing_futures::Instrument;
24
25const SUBSCRIBE_CHANNEL_DEFAULT_CAP: usize = 128;
26
27/// Error returned by the [`EventStore::append`] when a conflict has been detected.
28///
29/// [`EventStore::append`]: trait.EventStore.html#method.append
30#[derive(Debug, thiserror::Error, PartialEq, Eq)]
31#[error("conflicting versions, expected {expected}, got instead {actual}")]
32pub struct ConflictError {
33    /// The last version value found the Store.
34    pub expected: u32,
35    /// The actual version passed by the caller to the Store.
36    pub actual: u32,
37}
38
39impl AppendError for ConflictError {
40    fn is_conflict_error(&self) -> bool {
41        true
42    }
43}
44
45/// Error returned by the [`EventSubscriber`] when reading elements
46/// from the [`EventStream`] produced by [`subscribe_all`].
47///
48/// [`EventSubscriber`]: struct.EventSubscriber.html
49/// [`EventStream`]: ../../eventually-core/subscription/type.EventStream.html
50/// [`subscribe_all`]: struct.EventSubscriber.html#method.subscribe_all
51#[derive(Debug, thiserror::Error)]
52#[error("failed to read event from subscription watch channel: {0}")]
53pub struct SubscriberError(#[from] RecvError);
54
55/// Builder for [`EventStore`] instances.
56///
57/// [`EventStore`]: struct.EventStore.html
58pub struct EventStoreBuilder;
59
60impl EventStoreBuilder {
61    /// Builds a new [`EventStore`] instance compatible with the provided [`Aggregate`].
62    ///
63    /// [`Aggregate`]: ../../eventually-core/aggregate/trait.Aggregate.html
64    #[inline]
65    pub fn for_aggregate<T>(_: &T) -> EventStore<T::Id, T::Event>
66    where
67        T: Aggregate,
68        T::Id: Hash + Eq,
69    {
70        Default::default()
71    }
72}
73
74/// An in-memory [`EventStore`] implementation, backed by an [`HashMap`].
75///
76/// [`EventStore`]: ../../eventually_core/store/trait.EventStore.html
77/// [`HashMap`]: something
78#[derive(Debug, Clone)]
79pub struct EventStore<Id, Event>
80where
81    Id: Hash + Eq,
82{
83    global_offset: Arc<AtomicU32>,
84    tx: Sender<Persisted<Id, Event>>,
85    backend: Arc<RwLock<HashMap<Id, Vec<Persisted<Id, Event>>>>>,
86}
87
88impl<Id, Event> EventStore<Id, Event>
89where
90    Id: Hash + Eq,
91{
92    /// Creates a new EventStore with a specified in-memory broadcast channel
93    /// size, which will used by the [`subscribe_all`] method to notify
94    /// of newly [`append`] events.
95    ///
96    /// [`subscribe_all`]: struct.EventStore.html#method.subscribe_all
97    /// [`append`]: struct.EventStore.html#method.append
98    pub fn new(subscribe_capacity: usize) -> Self {
99        // Use this broadcast channel to send append events to
100        // subscriptions from .subscribe_all()
101        let (tx, _) = channel(subscribe_capacity);
102
103        Self {
104            tx,
105            global_offset: Arc::new(AtomicU32::new(0)),
106            backend: Arc::new(RwLock::new(HashMap::new())),
107        }
108    }
109}
110
111impl<Id, Event> Default for EventStore<Id, Event>
112where
113    Id: Hash + Eq,
114{
115    #[inline]
116    fn default() -> Self {
117        Self::new(SUBSCRIBE_CHANNEL_DEFAULT_CAP)
118    }
119}
120
121impl<Id, Event> EventSubscriber for EventStore<Id, Event>
122where
123    Id: Hash + Eq + Sync + Send + Clone,
124    Event: Sync + Send + Clone,
125{
126    type SourceId = Id;
127    type Event = Event;
128    type Error = SubscriberError;
129
130    fn subscribe_all(
131        &self,
132    ) -> BoxFuture<Result<eventually_core::subscription::EventStream<Self>, Self::Error>> {
133        // Create a new Receiver from the store Sender.
134        //
135        // This receiver implements the TryStream trait, which works perfectly
136        // with the definition of the EventStream.
137        let rx = self.tx.subscribe();
138
139        Box::pin(async move { Ok(rx.into_stream().map_err(SubscriberError).boxed()) })
140    }
141}
142
143impl<Id, Event> eventually_core::store::EventStore for EventStore<Id, Event>
144where
145    Id: Hash + Eq + Sync + Send + Debug + Clone,
146    Event: Sync + Send + Debug + Clone,
147{
148    type SourceId = Id;
149    type Event = Event;
150    type Error = ConflictError;
151
152    fn append(
153        &mut self,
154        id: Self::SourceId,
155        version: Expected,
156        events: Vec<Self::Event>,
157    ) -> BoxFuture<Result<u32, Self::Error>> {
158        #[cfg(feature = "with-tracing")]
159        let span = tracing::info_span!(
160            "EventStore::append",
161            id = ?id,
162            version = ?version,
163            events = ?events
164        );
165
166        let fut = async move {
167            let expected = self
168                .backend
169                .read()
170                .get(&id)
171                .and_then(|events| events.last())
172                .map(|event| event.version())
173                .unwrap_or(0);
174
175            if let Expected::Exact(actual) = version {
176                if expected != actual {
177                    return Err(ConflictError { expected, actual });
178                }
179            }
180
181            let mut persisted_events: Vec<Persisted<Id, Event>> =
182                into_persisted_events(expected, id.clone(), events)
183                    .into_iter()
184                    .map(|event| {
185                        let offset = self.global_offset.fetch_add(1, Ordering::SeqCst);
186                        event.sequence_number(offset)
187                    })
188                    .collect();
189
190            // Copy of the events for broadcasting.
191            let broadcast_copy = persisted_events.clone();
192
193            let last_version = persisted_events
194                .last()
195                .map(Persisted::version)
196                .unwrap_or(expected);
197
198            self.backend
199                .write()
200                .entry(id)
201                .and_modify(|events| events.append(&mut persisted_events))
202                .or_insert_with(|| persisted_events);
203
204            // From tokio documentation, the send() operation can only
205            // fail if there are no active receivers to listen to these events.
206            //
207            // From our perspective, this is not an issue, since appends
208            // might be done without any active subscription.
209            #[allow(unused_must_use)]
210            {
211                // Broadcast events into the store's Sender channel.
212                broadcast_copy.into_iter().for_each(|event| {
213                    self.tx.send(event);
214                });
215            }
216
217            Ok(last_version)
218        };
219
220        #[cfg(feature = "with-tracing")]
221        let fut = fut.instrument(span);
222
223        Box::pin(fut)
224    }
225
226    fn stream(
227        &self,
228        id: Self::SourceId,
229        select: Select,
230    ) -> BoxFuture<Result<EventStream<Self>, Self::Error>> {
231        #[cfg(feature = "with-tracing")]
232        let span = tracing::info_span!(
233            "EventStore::stream",
234            id = ?id,
235            select = ?select
236        );
237
238        let fut = async move {
239            Ok(self
240                .backend
241                .read()
242                .get(&id)
243                .map(move |events| {
244                    let stream = events
245                        .clone()
246                        .into_iter()
247                        .filter(move |event| match select {
248                            Select::All => true,
249                            Select::From(v) => event.version() >= v,
250                        });
251
252                    iter(stream).map(Ok).boxed()
253                })
254                .unwrap_or_else(|| empty().boxed()))
255        };
256
257        #[cfg(feature = "with-tracing")]
258        let fut = fut.instrument(span);
259
260        Box::pin(fut)
261    }
262
263    fn stream_all(&self, select: Select) -> BoxFuture<Result<EventStream<Self>, Self::Error>> {
264        #[cfg(feature = "with-tracing")]
265        let span = tracing::info_span!(
266            "EventStore::stream_all",
267            select = ?select
268        );
269
270        let mut events: Vec<Persisted<Id, Event>> = self
271            .backend
272            .read()
273            .values()
274            .flatten()
275            .cloned()
276            .filter(move |event| match select {
277                Select::All => true,
278                Select::From(sequence_number) => event.sequence_number() >= sequence_number,
279            })
280            .collect();
281
282        // Events must be sorted by the sequence number when using $all.
283        events.sort_by(|a, b| a.sequence_number().cmp(&b.sequence_number()));
284
285        let fut = futures::future::ok(iter(events).map(Ok).boxed());
286
287        #[cfg(feature = "with-tracing")]
288        let fut = fut.instrument(span);
289
290        Box::pin(fut)
291    }
292
293    fn remove(&mut self, id: Self::SourceId) -> BoxFuture<Result<(), Self::Error>> {
294        #[cfg(feature = "with-tracing")]
295        let span = tracing::info_span!(
296            "EventStore::remove",
297            id = ?id
298        );
299
300        let fut = async move {
301            self.backend.write().remove(&id);
302
303            Ok(())
304        };
305
306        #[cfg(feature = "with-tracing")]
307        let fut = fut.instrument(span);
308
309        Box::pin(fut)
310    }
311}
312
313fn into_persisted_events<Id, T>(
314    last_version: u32,
315    id: Id,
316    events: Vec<T>,
317) -> Vec<EventBuilderWithVersion<Id, T>>
318where
319    Id: Clone,
320{
321    events
322        .into_iter()
323        .enumerate()
324        .map(|(i, event)| Persisted::from(id.clone(), event).version(last_version + (i as u32) + 1))
325        .collect()
326}
327
328#[cfg(test)]
329mod tests {
330    use super::{ConflictError, EventStore as InMemoryStore};
331
332    use std::cell::RefCell;
333    use std::sync::Arc;
334
335    use eventually_core::store::{EventStore, Expected, Persisted, Select};
336    use eventually_core::subscription::EventSubscriber;
337
338    use futures::{StreamExt, TryStreamExt};
339
340    use tokio::sync::Barrier;
341
342    #[derive(Debug, PartialEq, Eq, Clone, Copy)]
343    enum Event {
344        A,
345        B,
346        C,
347    }
348
349    #[tokio::test]
350    async fn subscribe_returns_all_the_latest_events() {
351        let id_1 = "test-subscribe-1";
352        let id_2 = "test-subscribe-2";
353
354        // Create the store and the clone to move into the async closure.
355        let store = InMemoryStore::<&'static str, Event>::default();
356        let store_1 = store.clone();
357
358        // Use a barrier to synchronize the start of the first 2 events
359        // and the first subscription start.
360        let barrier = Arc::new(Barrier::new(2));
361        let barrier_1 = barrier.clone();
362
363        // First subscription.
364        let join_handle_1 = tokio::spawn(async move {
365            let mut events = store_1.subscribe_all().await.unwrap().enumerate();
366            barrier_1.wait().await;
367
368            while let Some((i, res)) = events.next().await {
369                assert!(res.is_ok());
370                let event = res.unwrap();
371
372                match i {
373                    0 => assert_eq!(
374                        Persisted::from(id_1, Event::A)
375                            .version(1)
376                            .sequence_number(0),
377                        event
378                    ),
379                    1 => assert_eq!(
380                        Persisted::from(id_1, Event::B)
381                            .version(2)
382                            .sequence_number(1),
383                        event
384                    ),
385                    2 => assert_eq!(
386                        Persisted::from(id_1, Event::C)
387                            .version(3)
388                            .sequence_number(2),
389                        event
390                    ),
391                    3 => {
392                        assert_eq!(
393                            Persisted::from(id_2, Event::A)
394                                .version(1)
395                                .sequence_number(3),
396                            event
397                        );
398                        // Break out of the stream looping after the last expected
399                        // event has been received.
400                        break;
401                    }
402                    _ => panic!("should not reach this point"),
403                };
404            }
405        });
406
407        // Use internal mutability to escape the borrow checker rules for the test,
408        // because append() requires &mut self, but subscribe() requires &self first.
409        let store = RefCell::new(store);
410        barrier.wait().await;
411
412        assert!(store
413            .borrow_mut()
414            .append(id_1, Expected::Exact(0), vec![Event::A])
415            .await
416            .is_ok());
417
418        assert!(store
419            .borrow_mut()
420            .append(id_1, Expected::Exact(1), vec![Event::B])
421            .await
422            .is_ok());
423
424        let store = store.into_inner();
425        let store_2 = store.clone();
426
427        // Same thing as above, but to wait the second batch of the events
428        // appended to the store.
429        let barrier = Arc::new(Barrier::new(2));
430        let barrier_2 = barrier.clone();
431
432        // Second subscriber, it will only see events of the second batch,
433        // which is when it started listening to events.
434        let join_handle_2 = tokio::spawn(async move {
435            let mut events = store_2.subscribe_all().await.unwrap().enumerate();
436            barrier_2.wait().await;
437
438            while let Some((i, res)) = events.next().await {
439                assert!(res.is_ok());
440                let event = res.unwrap();
441
442                match i {
443                    0 => assert_eq!(
444                        Persisted::from(id_1, Event::C)
445                            .version(3)
446                            .sequence_number(2),
447                        event
448                    ),
449                    1 => {
450                        assert_eq!(
451                            Persisted::from(id_2, Event::A)
452                                .version(1)
453                                .sequence_number(3),
454                            event
455                        );
456                        break;
457                    }
458                    _ => panic!("should not reach this point"),
459                };
460            }
461        });
462
463        let store = RefCell::new(store);
464        barrier.wait().await;
465
466        assert!(store
467            .borrow_mut()
468            .append(id_1, Expected::Exact(2), vec![Event::C])
469            .await
470            .is_ok());
471
472        assert!(store
473            .borrow_mut()
474            .append(id_2, Expected::Exact(0), vec![Event::A])
475            .await
476            .is_ok());
477
478        // Wait for both subscribers to be done.
479        tokio::join!(join_handle_1, join_handle_2);
480    }
481
482    #[tokio::test]
483    async fn append_with_any_versions_works() {
484        let mut store = InMemoryStore::<&'static str, Event>::default();
485        let id = "test-append";
486
487        let events = vec![Event::A, Event::B, Event::C];
488
489        assert!(store
490            .append(id, Expected::Any, events.clone())
491            .await
492            .is_ok());
493
494        assert!(store
495            .append(id, Expected::Any, events.clone())
496            .await
497            .is_ok());
498    }
499
500    #[tokio::test]
501    async fn append_with_expected_versions_works() {
502        let mut store = InMemoryStore::<&'static str, Event>::default();
503        let id = "test-append";
504
505        let events = vec![Event::A, Event::B, Event::C];
506
507        let result = store.append(id, Expected::Exact(0), events.clone()).await;
508        assert!(result.is_ok());
509
510        let last_version = result.unwrap();
511
512        assert!(store
513            .append(id, Expected::Exact(last_version), events.clone())
514            .await
515            .is_ok());
516    }
517
518    #[tokio::test]
519    async fn append_with_wrong_expected_versions_fails() {
520        let id = "test-append";
521        let mut store = InMemoryStore::<&'static str, Event>::default();
522
523        let events = vec![Event::A, Event::B, Event::C];
524
525        let result = store.append(id, Expected::Exact(0), events.clone()).await;
526        assert!(result.is_ok());
527
528        let last_version = result.unwrap();
529        let poisoned_last_version = last_version + 1; // Poison the last version on purpose
530
531        let result = store
532            .append(id, Expected::Exact(poisoned_last_version), events.clone())
533            .await;
534
535        assert_eq!(
536            Err(ConflictError {
537                expected: last_version,
538                actual: poisoned_last_version
539            }),
540            result
541        );
542    }
543
544    #[tokio::test]
545    async fn remove() {
546        let id = "test-remove";
547        let mut store = InMemoryStore::<&'static str, Event>::default();
548
549        // Removing an empty stream works.
550        assert!(store.remove(id).await.is_ok());
551        assert!(stream_to_vec(&store, id, Select::All)
552            .await
553            .unwrap()
554            .is_empty());
555
556        // Add some events and lets remove them after
557        let events = vec![Event::A, Event::B, Event::C];
558        assert!(store
559            .append(id, Expected::Exact(0), events.clone())
560            .await
561            .is_ok());
562
563        assert!(store.remove(id).await.is_ok());
564        assert!(stream_to_vec(&store, id, Select::All)
565            .await
566            .unwrap()
567            .is_empty());
568    }
569
570    #[tokio::test]
571    async fn stream() {
572        let id = "test-stream";
573        let mut store = InMemoryStore::<&'static str, Event>::default();
574
575        let events_1 = vec![Event::A, Event::B, Event::C];
576        let events_2 = vec![Event::B, Event::A];
577
578        let result = store.append(id, Expected::Exact(0), events_1).await;
579        assert!(result.is_ok());
580
581        let last_version = result.unwrap();
582        assert!(store
583            .append(id, Expected::Exact(last_version), events_2)
584            .await
585            .is_ok());
586
587        // Stream from the start.
588        assert_eq!(
589            stream_to_vec(&store, id, Select::All).await.unwrap(),
590            vec![
591                Persisted::from(id, Event::A).version(1).sequence_number(0),
592                Persisted::from(id, Event::B).version(2).sequence_number(1),
593                Persisted::from(id, Event::C).version(3).sequence_number(2),
594                Persisted::from(id, Event::B).version(4).sequence_number(3),
595                Persisted::from(id, Event::A).version(5).sequence_number(4)
596            ]
597        );
598
599        // Stream from another offset.
600        assert_eq!(
601            stream_to_vec(&store, id, Select::From(4)).await.unwrap(),
602            vec![
603                Persisted::from(id, Event::B).version(4).sequence_number(3),
604                Persisted::from(id, Event::A).version(5).sequence_number(4)
605            ]
606        );
607
608        // Stream from an unexistent offset.
609        assert!(stream_to_vec(&store, id, Select::From(10))
610            .await
611            .unwrap()
612            .is_empty());
613    }
614
615    #[tokio::test]
616    async fn stream_all() {
617        let id_1 = "test-stream-all-1";
618        let id_2 = "test-stream-all-2";
619
620        let mut store = InMemoryStore::<&'static str, Event>::default();
621
622        assert!(store
623            .append(id_1, Expected::Any, vec![Event::A])
624            .await
625            .is_ok());
626
627        assert!(store
628            .append(id_2, Expected::Any, vec![Event::B])
629            .await
630            .is_ok());
631
632        assert!(store
633            .append(id_1, Expected::Any, vec![Event::C])
634            .await
635            .is_ok());
636
637        assert!(store
638            .append(id_2, Expected::Any, vec![Event::A])
639            .await
640            .is_ok());
641
642        // Stream from the start.
643        let result: anyhow::Result<Vec<Persisted<&str, Event>>> = store
644            .stream_all(Select::All)
645            .await
646            .unwrap()
647            .try_collect()
648            .await
649            .map_err(anyhow::Error::from);
650
651        assert!(result.is_ok());
652        let result = result.unwrap();
653
654        assert_eq!(
655            vec![
656                Persisted::from(id_1, Event::A)
657                    .version(1)
658                    .sequence_number(0),
659                Persisted::from(id_2, Event::B)
660                    .version(1)
661                    .sequence_number(1),
662                Persisted::from(id_1, Event::C)
663                    .version(2)
664                    .sequence_number(2),
665                Persisted::from(id_2, Event::A)
666                    .version(2)
667                    .sequence_number(3)
668            ],
669            result
670        );
671
672        // Stream from a specified sequence number.
673        let result: anyhow::Result<Vec<Persisted<&str, Event>>> = store
674            .stream_all(Select::From(2))
675            .await
676            .unwrap()
677            .try_collect()
678            .await
679            .map_err(anyhow::Error::from);
680
681        assert!(result.is_ok());
682        let result = result.unwrap();
683
684        assert_eq!(
685            vec![
686                Persisted::from(id_1, Event::C)
687                    .version(2)
688                    .sequence_number(2),
689                Persisted::from(id_2, Event::A)
690                    .version(2)
691                    .sequence_number(3)
692            ],
693            result
694        );
695    }
696
697    async fn stream_to_vec(
698        store: &InMemoryStore<&'static str, Event>,
699        id: &'static str,
700        select: Select,
701    ) -> anyhow::Result<Vec<Persisted<&'static str, Event>>> {
702        store
703            .stream(id, select)
704            .await
705            .map_err(anyhow::Error::from)?
706            .try_collect()
707            .await
708            .map_err(anyhow::Error::from)
709    }
710}