1use std::collections::HashMap;
4use std::fmt::Debug;
5use std::hash::Hash;
6use std::sync::atomic::{AtomicU32, Ordering};
7use std::sync::Arc;
8
9use eventually_core::aggregate::Aggregate;
10use eventually_core::store::persistent::EventBuilderWithVersion;
11use eventually_core::store::{AppendError, EventStream, Expected, Persisted, Select};
12use eventually_core::subscription::EventSubscriber;
13use eventually_core::versioning::Versioned;
14
15use futures::future::BoxFuture;
16use futures::stream::{empty, iter, StreamExt, TryStreamExt};
17
18use parking_lot::RwLock;
19
20use tokio::sync::broadcast::{channel, RecvError, Sender};
21
22#[cfg(feature = "with-tracing")]
23use tracing_futures::Instrument;
24
25const SUBSCRIBE_CHANNEL_DEFAULT_CAP: usize = 128;
26
27#[derive(Debug, thiserror::Error, PartialEq, Eq)]
31#[error("conflicting versions, expected {expected}, got instead {actual}")]
32pub struct ConflictError {
33 pub expected: u32,
35 pub actual: u32,
37}
38
39impl AppendError for ConflictError {
40 fn is_conflict_error(&self) -> bool {
41 true
42 }
43}
44
45#[derive(Debug, thiserror::Error)]
52#[error("failed to read event from subscription watch channel: {0}")]
53pub struct SubscriberError(#[from] RecvError);
54
55pub struct EventStoreBuilder;
59
60impl EventStoreBuilder {
61 #[inline]
65 pub fn for_aggregate<T>(_: &T) -> EventStore<T::Id, T::Event>
66 where
67 T: Aggregate,
68 T::Id: Hash + Eq,
69 {
70 Default::default()
71 }
72}
73
74#[derive(Debug, Clone)]
79pub struct EventStore<Id, Event>
80where
81 Id: Hash + Eq,
82{
83 global_offset: Arc<AtomicU32>,
84 tx: Sender<Persisted<Id, Event>>,
85 backend: Arc<RwLock<HashMap<Id, Vec<Persisted<Id, Event>>>>>,
86}
87
88impl<Id, Event> EventStore<Id, Event>
89where
90 Id: Hash + Eq,
91{
92 pub fn new(subscribe_capacity: usize) -> Self {
99 let (tx, _) = channel(subscribe_capacity);
102
103 Self {
104 tx,
105 global_offset: Arc::new(AtomicU32::new(0)),
106 backend: Arc::new(RwLock::new(HashMap::new())),
107 }
108 }
109}
110
111impl<Id, Event> Default for EventStore<Id, Event>
112where
113 Id: Hash + Eq,
114{
115 #[inline]
116 fn default() -> Self {
117 Self::new(SUBSCRIBE_CHANNEL_DEFAULT_CAP)
118 }
119}
120
121impl<Id, Event> EventSubscriber for EventStore<Id, Event>
122where
123 Id: Hash + Eq + Sync + Send + Clone,
124 Event: Sync + Send + Clone,
125{
126 type SourceId = Id;
127 type Event = Event;
128 type Error = SubscriberError;
129
130 fn subscribe_all(
131 &self,
132 ) -> BoxFuture<Result<eventually_core::subscription::EventStream<Self>, Self::Error>> {
133 let rx = self.tx.subscribe();
138
139 Box::pin(async move { Ok(rx.into_stream().map_err(SubscriberError).boxed()) })
140 }
141}
142
143impl<Id, Event> eventually_core::store::EventStore for EventStore<Id, Event>
144where
145 Id: Hash + Eq + Sync + Send + Debug + Clone,
146 Event: Sync + Send + Debug + Clone,
147{
148 type SourceId = Id;
149 type Event = Event;
150 type Error = ConflictError;
151
152 fn append(
153 &mut self,
154 id: Self::SourceId,
155 version: Expected,
156 events: Vec<Self::Event>,
157 ) -> BoxFuture<Result<u32, Self::Error>> {
158 #[cfg(feature = "with-tracing")]
159 let span = tracing::info_span!(
160 "EventStore::append",
161 id = ?id,
162 version = ?version,
163 events = ?events
164 );
165
166 let fut = async move {
167 let expected = self
168 .backend
169 .read()
170 .get(&id)
171 .and_then(|events| events.last())
172 .map(|event| event.version())
173 .unwrap_or(0);
174
175 if let Expected::Exact(actual) = version {
176 if expected != actual {
177 return Err(ConflictError { expected, actual });
178 }
179 }
180
181 let mut persisted_events: Vec<Persisted<Id, Event>> =
182 into_persisted_events(expected, id.clone(), events)
183 .into_iter()
184 .map(|event| {
185 let offset = self.global_offset.fetch_add(1, Ordering::SeqCst);
186 event.sequence_number(offset)
187 })
188 .collect();
189
190 let broadcast_copy = persisted_events.clone();
192
193 let last_version = persisted_events
194 .last()
195 .map(Persisted::version)
196 .unwrap_or(expected);
197
198 self.backend
199 .write()
200 .entry(id)
201 .and_modify(|events| events.append(&mut persisted_events))
202 .or_insert_with(|| persisted_events);
203
204 #[allow(unused_must_use)]
210 {
211 broadcast_copy.into_iter().for_each(|event| {
213 self.tx.send(event);
214 });
215 }
216
217 Ok(last_version)
218 };
219
220 #[cfg(feature = "with-tracing")]
221 let fut = fut.instrument(span);
222
223 Box::pin(fut)
224 }
225
226 fn stream(
227 &self,
228 id: Self::SourceId,
229 select: Select,
230 ) -> BoxFuture<Result<EventStream<Self>, Self::Error>> {
231 #[cfg(feature = "with-tracing")]
232 let span = tracing::info_span!(
233 "EventStore::stream",
234 id = ?id,
235 select = ?select
236 );
237
238 let fut = async move {
239 Ok(self
240 .backend
241 .read()
242 .get(&id)
243 .map(move |events| {
244 let stream = events
245 .clone()
246 .into_iter()
247 .filter(move |event| match select {
248 Select::All => true,
249 Select::From(v) => event.version() >= v,
250 });
251
252 iter(stream).map(Ok).boxed()
253 })
254 .unwrap_or_else(|| empty().boxed()))
255 };
256
257 #[cfg(feature = "with-tracing")]
258 let fut = fut.instrument(span);
259
260 Box::pin(fut)
261 }
262
263 fn stream_all(&self, select: Select) -> BoxFuture<Result<EventStream<Self>, Self::Error>> {
264 #[cfg(feature = "with-tracing")]
265 let span = tracing::info_span!(
266 "EventStore::stream_all",
267 select = ?select
268 );
269
270 let mut events: Vec<Persisted<Id, Event>> = self
271 .backend
272 .read()
273 .values()
274 .flatten()
275 .cloned()
276 .filter(move |event| match select {
277 Select::All => true,
278 Select::From(sequence_number) => event.sequence_number() >= sequence_number,
279 })
280 .collect();
281
282 events.sort_by(|a, b| a.sequence_number().cmp(&b.sequence_number()));
284
285 let fut = futures::future::ok(iter(events).map(Ok).boxed());
286
287 #[cfg(feature = "with-tracing")]
288 let fut = fut.instrument(span);
289
290 Box::pin(fut)
291 }
292
293 fn remove(&mut self, id: Self::SourceId) -> BoxFuture<Result<(), Self::Error>> {
294 #[cfg(feature = "with-tracing")]
295 let span = tracing::info_span!(
296 "EventStore::remove",
297 id = ?id
298 );
299
300 let fut = async move {
301 self.backend.write().remove(&id);
302
303 Ok(())
304 };
305
306 #[cfg(feature = "with-tracing")]
307 let fut = fut.instrument(span);
308
309 Box::pin(fut)
310 }
311}
312
313fn into_persisted_events<Id, T>(
314 last_version: u32,
315 id: Id,
316 events: Vec<T>,
317) -> Vec<EventBuilderWithVersion<Id, T>>
318where
319 Id: Clone,
320{
321 events
322 .into_iter()
323 .enumerate()
324 .map(|(i, event)| Persisted::from(id.clone(), event).version(last_version + (i as u32) + 1))
325 .collect()
326}
327
328#[cfg(test)]
329mod tests {
330 use super::{ConflictError, EventStore as InMemoryStore};
331
332 use std::cell::RefCell;
333 use std::sync::Arc;
334
335 use eventually_core::store::{EventStore, Expected, Persisted, Select};
336 use eventually_core::subscription::EventSubscriber;
337
338 use futures::{StreamExt, TryStreamExt};
339
340 use tokio::sync::Barrier;
341
342 #[derive(Debug, PartialEq, Eq, Clone, Copy)]
343 enum Event {
344 A,
345 B,
346 C,
347 }
348
349 #[tokio::test]
350 async fn subscribe_returns_all_the_latest_events() {
351 let id_1 = "test-subscribe-1";
352 let id_2 = "test-subscribe-2";
353
354 let store = InMemoryStore::<&'static str, Event>::default();
356 let store_1 = store.clone();
357
358 let barrier = Arc::new(Barrier::new(2));
361 let barrier_1 = barrier.clone();
362
363 let join_handle_1 = tokio::spawn(async move {
365 let mut events = store_1.subscribe_all().await.unwrap().enumerate();
366 barrier_1.wait().await;
367
368 while let Some((i, res)) = events.next().await {
369 assert!(res.is_ok());
370 let event = res.unwrap();
371
372 match i {
373 0 => assert_eq!(
374 Persisted::from(id_1, Event::A)
375 .version(1)
376 .sequence_number(0),
377 event
378 ),
379 1 => assert_eq!(
380 Persisted::from(id_1, Event::B)
381 .version(2)
382 .sequence_number(1),
383 event
384 ),
385 2 => assert_eq!(
386 Persisted::from(id_1, Event::C)
387 .version(3)
388 .sequence_number(2),
389 event
390 ),
391 3 => {
392 assert_eq!(
393 Persisted::from(id_2, Event::A)
394 .version(1)
395 .sequence_number(3),
396 event
397 );
398 break;
401 }
402 _ => panic!("should not reach this point"),
403 };
404 }
405 });
406
407 let store = RefCell::new(store);
410 barrier.wait().await;
411
412 assert!(store
413 .borrow_mut()
414 .append(id_1, Expected::Exact(0), vec![Event::A])
415 .await
416 .is_ok());
417
418 assert!(store
419 .borrow_mut()
420 .append(id_1, Expected::Exact(1), vec![Event::B])
421 .await
422 .is_ok());
423
424 let store = store.into_inner();
425 let store_2 = store.clone();
426
427 let barrier = Arc::new(Barrier::new(2));
430 let barrier_2 = barrier.clone();
431
432 let join_handle_2 = tokio::spawn(async move {
435 let mut events = store_2.subscribe_all().await.unwrap().enumerate();
436 barrier_2.wait().await;
437
438 while let Some((i, res)) = events.next().await {
439 assert!(res.is_ok());
440 let event = res.unwrap();
441
442 match i {
443 0 => assert_eq!(
444 Persisted::from(id_1, Event::C)
445 .version(3)
446 .sequence_number(2),
447 event
448 ),
449 1 => {
450 assert_eq!(
451 Persisted::from(id_2, Event::A)
452 .version(1)
453 .sequence_number(3),
454 event
455 );
456 break;
457 }
458 _ => panic!("should not reach this point"),
459 };
460 }
461 });
462
463 let store = RefCell::new(store);
464 barrier.wait().await;
465
466 assert!(store
467 .borrow_mut()
468 .append(id_1, Expected::Exact(2), vec![Event::C])
469 .await
470 .is_ok());
471
472 assert!(store
473 .borrow_mut()
474 .append(id_2, Expected::Exact(0), vec![Event::A])
475 .await
476 .is_ok());
477
478 tokio::join!(join_handle_1, join_handle_2);
480 }
481
482 #[tokio::test]
483 async fn append_with_any_versions_works() {
484 let mut store = InMemoryStore::<&'static str, Event>::default();
485 let id = "test-append";
486
487 let events = vec![Event::A, Event::B, Event::C];
488
489 assert!(store
490 .append(id, Expected::Any, events.clone())
491 .await
492 .is_ok());
493
494 assert!(store
495 .append(id, Expected::Any, events.clone())
496 .await
497 .is_ok());
498 }
499
500 #[tokio::test]
501 async fn append_with_expected_versions_works() {
502 let mut store = InMemoryStore::<&'static str, Event>::default();
503 let id = "test-append";
504
505 let events = vec![Event::A, Event::B, Event::C];
506
507 let result = store.append(id, Expected::Exact(0), events.clone()).await;
508 assert!(result.is_ok());
509
510 let last_version = result.unwrap();
511
512 assert!(store
513 .append(id, Expected::Exact(last_version), events.clone())
514 .await
515 .is_ok());
516 }
517
518 #[tokio::test]
519 async fn append_with_wrong_expected_versions_fails() {
520 let id = "test-append";
521 let mut store = InMemoryStore::<&'static str, Event>::default();
522
523 let events = vec![Event::A, Event::B, Event::C];
524
525 let result = store.append(id, Expected::Exact(0), events.clone()).await;
526 assert!(result.is_ok());
527
528 let last_version = result.unwrap();
529 let poisoned_last_version = last_version + 1; let result = store
532 .append(id, Expected::Exact(poisoned_last_version), events.clone())
533 .await;
534
535 assert_eq!(
536 Err(ConflictError {
537 expected: last_version,
538 actual: poisoned_last_version
539 }),
540 result
541 );
542 }
543
544 #[tokio::test]
545 async fn remove() {
546 let id = "test-remove";
547 let mut store = InMemoryStore::<&'static str, Event>::default();
548
549 assert!(store.remove(id).await.is_ok());
551 assert!(stream_to_vec(&store, id, Select::All)
552 .await
553 .unwrap()
554 .is_empty());
555
556 let events = vec![Event::A, Event::B, Event::C];
558 assert!(store
559 .append(id, Expected::Exact(0), events.clone())
560 .await
561 .is_ok());
562
563 assert!(store.remove(id).await.is_ok());
564 assert!(stream_to_vec(&store, id, Select::All)
565 .await
566 .unwrap()
567 .is_empty());
568 }
569
570 #[tokio::test]
571 async fn stream() {
572 let id = "test-stream";
573 let mut store = InMemoryStore::<&'static str, Event>::default();
574
575 let events_1 = vec![Event::A, Event::B, Event::C];
576 let events_2 = vec![Event::B, Event::A];
577
578 let result = store.append(id, Expected::Exact(0), events_1).await;
579 assert!(result.is_ok());
580
581 let last_version = result.unwrap();
582 assert!(store
583 .append(id, Expected::Exact(last_version), events_2)
584 .await
585 .is_ok());
586
587 assert_eq!(
589 stream_to_vec(&store, id, Select::All).await.unwrap(),
590 vec![
591 Persisted::from(id, Event::A).version(1).sequence_number(0),
592 Persisted::from(id, Event::B).version(2).sequence_number(1),
593 Persisted::from(id, Event::C).version(3).sequence_number(2),
594 Persisted::from(id, Event::B).version(4).sequence_number(3),
595 Persisted::from(id, Event::A).version(5).sequence_number(4)
596 ]
597 );
598
599 assert_eq!(
601 stream_to_vec(&store, id, Select::From(4)).await.unwrap(),
602 vec![
603 Persisted::from(id, Event::B).version(4).sequence_number(3),
604 Persisted::from(id, Event::A).version(5).sequence_number(4)
605 ]
606 );
607
608 assert!(stream_to_vec(&store, id, Select::From(10))
610 .await
611 .unwrap()
612 .is_empty());
613 }
614
615 #[tokio::test]
616 async fn stream_all() {
617 let id_1 = "test-stream-all-1";
618 let id_2 = "test-stream-all-2";
619
620 let mut store = InMemoryStore::<&'static str, Event>::default();
621
622 assert!(store
623 .append(id_1, Expected::Any, vec![Event::A])
624 .await
625 .is_ok());
626
627 assert!(store
628 .append(id_2, Expected::Any, vec![Event::B])
629 .await
630 .is_ok());
631
632 assert!(store
633 .append(id_1, Expected::Any, vec![Event::C])
634 .await
635 .is_ok());
636
637 assert!(store
638 .append(id_2, Expected::Any, vec![Event::A])
639 .await
640 .is_ok());
641
642 let result: anyhow::Result<Vec<Persisted<&str, Event>>> = store
644 .stream_all(Select::All)
645 .await
646 .unwrap()
647 .try_collect()
648 .await
649 .map_err(anyhow::Error::from);
650
651 assert!(result.is_ok());
652 let result = result.unwrap();
653
654 assert_eq!(
655 vec![
656 Persisted::from(id_1, Event::A)
657 .version(1)
658 .sequence_number(0),
659 Persisted::from(id_2, Event::B)
660 .version(1)
661 .sequence_number(1),
662 Persisted::from(id_1, Event::C)
663 .version(2)
664 .sequence_number(2),
665 Persisted::from(id_2, Event::A)
666 .version(2)
667 .sequence_number(3)
668 ],
669 result
670 );
671
672 let result: anyhow::Result<Vec<Persisted<&str, Event>>> = store
674 .stream_all(Select::From(2))
675 .await
676 .unwrap()
677 .try_collect()
678 .await
679 .map_err(anyhow::Error::from);
680
681 assert!(result.is_ok());
682 let result = result.unwrap();
683
684 assert_eq!(
685 vec![
686 Persisted::from(id_1, Event::C)
687 .version(2)
688 .sequence_number(2),
689 Persisted::from(id_2, Event::A)
690 .version(2)
691 .sequence_number(3)
692 ],
693 result
694 );
695 }
696
697 async fn stream_to_vec(
698 store: &InMemoryStore<&'static str, Event>,
699 id: &'static str,
700 select: Select,
701 ) -> anyhow::Result<Vec<Persisted<&'static str, Event>>> {
702 store
703 .stream(id, select)
704 .await
705 .map_err(anyhow::Error::from)?
706 .try_collect()
707 .await
708 .map_err(anyhow::Error::from)
709 }
710}