1use std::{future::Future, marker::PhantomData};
8
9use thiserror::Error;
10
11use crate::{
12 codec::{Codec, SerializableEvent},
13 concurrency::{ConcurrencyConflict, ConcurrencyStrategy, Optimistic, Unchecked},
14};
15
16pub mod inmemory;
17
/// An event that has already been serialized and is ready to be appended to
/// a store.
///
/// `M` is the metadata type carried alongside the payload (the store's
/// `Metadata` associated type when used through a transaction).
///
/// `Debug` is derived so the type can appear in logs, assertions and
/// `unwrap`/`expect` diagnostics; it is available whenever `M: Debug`.
#[derive(Clone, Debug)]
pub struct PersistableEvent<M> {
    /// Identifier of the event kind (for example `"value-added"`).
    pub kind: String,
    /// Serialized event payload bytes.
    pub data: Vec<u8>,
    /// Metadata supplied by the caller and stored with the event.
    pub metadata: M,
}
30
/// An event as returned by a store when loading.
///
/// Carries the serialized payload together with the stream it belongs to
/// (`aggregate_kind` + `aggregate_id`) and the position reported by the store.
///
/// `Debug` is derived so loaded events can be logged and inspected in test
/// failures; it is available whenever `Id`, `Pos` and `M` are `Debug`.
#[derive(Clone, Debug)]
pub struct StoredEvent<Id, Pos, M> {
    /// Kind of the aggregate whose stream holds this event.
    pub aggregate_kind: String,
    /// Identifier of the aggregate instance whose stream holds this event.
    pub aggregate_id: Id,
    /// Identifier of the event kind.
    pub kind: String,
    /// Position of the event as reported by the store.
    pub position: Pos,
    /// Serialized event payload bytes.
    pub data: Vec<u8>,
    /// Metadata stored with the event.
    pub metadata: M,
}
47
48pub type LoadEventsResult<Id, Pos, Meta, Err> = Result<Vec<StoredEvent<Id, Pos, Meta>>, Err>;
50
/// Describes which events to load from a store.
///
/// A filter always names an event kind and may additionally be narrowed to a
/// single aggregate stream and/or to events after a given position.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventFilter<Id, Pos = ()> {
    /// Event kind this filter selects.
    pub event_kind: String,
    /// Aggregate kind to restrict to; `None` leaves the aggregate kind open.
    pub aggregate_kind: Option<String>,
    /// Aggregate instance to restrict to; `None` leaves the instance open.
    pub aggregate_id: Option<Id>,
    /// Position cursor set by [`EventFilter::after`]; `None` means no cursor.
    /// The in-memory store tests in this file expect only events positioned
    /// strictly after the cursor to be returned.
    pub after_position: Option<Pos>,
}

impl<Id, Pos> EventFilter<Id, Pos> {
    /// Creates a filter for every event of `kind`, with no aggregate
    /// restriction and no position cursor.
    #[must_use]
    pub fn for_event(kind: impl Into<String>) -> Self {
        let event_kind = kind.into();
        Self {
            event_kind,
            aggregate_kind: None,
            aggregate_id: None,
            after_position: None,
        }
    }

    /// Creates a filter for events of `event_kind` restricted to the stream
    /// identified by `aggregate_kind` and `aggregate_id`, with no position
    /// cursor.
    #[must_use]
    pub fn for_aggregate(
        event_kind: impl Into<String>,
        aggregate_kind: impl Into<String>,
        aggregate_id: impl Into<Id>,
    ) -> Self {
        // Start from the unrestricted filter and narrow it to one stream.
        let mut filter = Self::for_event(event_kind);
        filter.aggregate_kind = Some(aggregate_kind.into());
        filter.aggregate_id = Some(aggregate_id.into());
        filter
    }

    /// Returns this filter with its position cursor set to `position`,
    /// replacing any cursor set earlier.
    #[must_use]
    pub fn after(self, position: Pos) -> Self {
        Self {
            after_position: Some(position),
            ..self
        }
    }
}
100
101#[derive(Debug, Error)]
103pub enum AppendError<Pos, StoreError>
104where
105 Pos: std::fmt::Debug,
106 StoreError: std::error::Error,
107{
108 #[error(transparent)]
110 Conflict(#[from] ConcurrencyConflict<Pos>),
111 #[error("store error: {0}")]
113 Store(#[source] StoreError),
114}
115
116impl<Pos: std::fmt::Debug, StoreError: std::error::Error> AppendError<Pos, StoreError> {
117 pub const fn store(err: StoreError) -> Self {
119 Self::Store(err)
120 }
121}
122
123pub struct Transaction<'a, S: EventStore, C: ConcurrencyStrategy = Unchecked> {
134 store: &'a mut S,
135 aggregate_kind: String,
136 aggregate_id: S::Id,
137 expected_version: Option<S::Position>,
138 events: Vec<PersistableEvent<S::Metadata>>,
139 committed: bool,
140 _concurrency: PhantomData<C>,
141}
142
143impl<'a, S: EventStore, C: ConcurrencyStrategy> Transaction<'a, S, C> {
144 pub const fn new(
145 store: &'a mut S,
146 aggregate_kind: String,
147 aggregate_id: S::Id,
148 expected_version: Option<S::Position>,
149 ) -> Self {
150 Self {
151 store,
152 aggregate_kind,
153 aggregate_id,
154 expected_version,
155 events: Vec::new(),
156 committed: false,
157 _concurrency: PhantomData,
158 }
159 }
160
161 pub fn append<E>(
170 &mut self,
171 event: E,
172 metadata: S::Metadata,
173 ) -> Result<(), <S::Codec as Codec>::Error>
174 where
175 E: SerializableEvent,
176 {
177 let persistable = event.to_persistable(self.store.codec(), metadata)?;
178 tracing::trace!(event_kind = %persistable.kind, "event appended to transaction");
179 self.events.push(persistable);
180 Ok(())
181 }
182}
183
184impl<S: EventStore> Transaction<'_, S, Unchecked> {
185 pub async fn commit(mut self) -> Result<(), S::Error> {
193 let events = std::mem::take(&mut self.events);
194 let event_count = events.len();
195 tracing::debug!(
196 aggregate_kind = %self.aggregate_kind,
197 event_count,
198 "committing transaction (unchecked)"
199 );
200 self.committed = true;
201 self.store
202 .append(&self.aggregate_kind, &self.aggregate_id, None, events)
203 .await
204 .map_err(|e| match e {
205 AppendError::Store(e) => e,
206 AppendError::Conflict(_) => unreachable!("conflict impossible without version"),
207 })
208 }
209}
210
211impl<S: EventStore> Transaction<'_, S, Optimistic> {
212 pub async fn commit(mut self) -> Result<(), AppendError<S::Position, S::Error>> {
226 let events = std::mem::take(&mut self.events);
227 let event_count = events.len();
228 tracing::debug!(
229 aggregate_kind = %self.aggregate_kind,
230 event_count,
231 expected_version = ?self.expected_version,
232 "committing transaction (optimistic)"
233 );
234 self.committed = true;
235
236 match self.expected_version {
237 Some(version) => {
238 self.store
240 .append(
241 &self.aggregate_kind,
242 &self.aggregate_id,
243 Some(version),
244 events,
245 )
246 .await
247 }
248 None => {
249 self.store
251 .append_expecting_new(&self.aggregate_kind, &self.aggregate_id, events)
252 .await
253 }
254 }
255 }
256}
257
258impl<S: EventStore, C: ConcurrencyStrategy> Drop for Transaction<'_, S, C> {
259 fn drop(&mut self) {
260 if !self.committed && !self.events.is_empty() {
261 tracing::trace!(
262 aggregate_kind = %self.aggregate_kind,
263 expected_version = ?self.expected_version,
264 event_count = self.events.len(),
265 "transaction dropped without commit; discarding buffered events"
266 );
267 }
268 }
269}
270
271pub trait EventStore: Send + Sync {
285 type Id: Clone + Send + Sync + 'static;
290
291 type Position: Copy + PartialEq + std::fmt::Debug + Send + Sync + 'static;
296
297 type Error: std::error::Error + Send + Sync + 'static;
299
300 type Codec: Codec + Clone + Send + Sync + 'static;
302
303 type Metadata: Send + Sync + 'static;
305
306 fn codec(&self) -> &Self::Codec;
307
308 fn stream_version<'a>(
316 &'a self,
317 aggregate_kind: &'a str,
318 aggregate_id: &'a Self::Id,
319 ) -> impl Future<Output = Result<Option<Self::Position>, Self::Error>> + Send + 'a;
320
321 fn begin<C: ConcurrencyStrategy>(
330 &mut self,
331 aggregate_kind: &str,
332 aggregate_id: Self::Id,
333 expected_version: Option<Self::Position>,
334 ) -> Transaction<'_, Self, C>
335 where
336 Self: Sized;
337
338 fn append<'a>(
349 &'a mut self,
350 aggregate_kind: &'a str,
351 aggregate_id: &'a Self::Id,
352 expected_version: Option<Self::Position>,
353 events: Vec<PersistableEvent<Self::Metadata>>,
354 ) -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a;
355
356 fn load_events<'a>(
369 &'a self,
370 filters: &'a [EventFilter<Self::Id, Self::Position>],
371 ) -> impl Future<
372 Output = LoadEventsResult<Self::Id, Self::Position, Self::Metadata, Self::Error>,
373 > + Send
374 + 'a;
375
376 fn append_expecting_new<'a>(
387 &'a mut self,
388 aggregate_kind: &'a str,
389 aggregate_id: &'a Self::Id,
390 events: Vec<PersistableEvent<Self::Metadata>>,
391 ) -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a;
392}
/// Crate-internal key naming one event stream: an aggregate kind paired with
/// an aggregate id. Derives `Eq` and `Hash`, so it can serve as a map key.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) struct StreamKey<Id> {
    /// Kind of the aggregate the stream belongs to.
    aggregate_kind: String,
    /// Identifier of the aggregate instance the stream belongs to.
    aggregate_id: Id,
}

impl<Id> StreamKey<Id> {
    /// Builds a key from anything convertible into the aggregate kind string
    /// plus the aggregate id.
    pub(crate) fn new(aggregate_kind: impl Into<String>, aggregate_id: Id) -> Self {
        let aggregate_kind = aggregate_kind.into();
        Self {
            aggregate_kind,
            aggregate_id,
        }
    }
}
409
410#[derive(Clone, Copy, Debug, Default)]
412pub struct JsonCodec;
413
414impl crate::codec::Codec for JsonCodec {
415 type Error = serde_json::Error;
416
417 fn serialize<T>(&self, value: &T) -> Result<Vec<u8>, Self::Error>
418 where
419 T: serde::Serialize,
420 {
421 serde_json::to_vec(value)
422 }
423
424 fn deserialize<T>(&self, data: &[u8]) -> Result<T, Self::Error>
425 where
426 T: serde::de::DeserializeOwned,
427 {
428 serde_json::from_slice(data)
429 }
430}
431
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::inmemory;

    /// Concrete in-memory store shape shared by the store tests.
    type TestStore = inmemory::Store<String, JsonCodec, ()>;

    /// Builds an empty in-memory store backed by [`JsonCodec`].
    fn new_store() -> TestStore {
        inmemory::Store::new(JsonCodec)
    }

    /// Builds an already-serialized event with unit metadata.
    fn raw_event(kind: &str, data: &[u8]) -> PersistableEvent<()> {
        PersistableEvent {
            kind: kind.to_string(),
            data: data.to_vec(),
            metadata: (),
        }
    }

    #[test]
    fn event_filter_for_event_is_unrestricted() {
        let unrestricted: EventFilter<String> = EventFilter::for_event("my-event");
        assert_eq!(unrestricted.event_kind, "my-event");
        assert!(unrestricted.aggregate_kind.is_none());
        assert!(unrestricted.aggregate_id.is_none());
        assert!(unrestricted.after_position.is_none());
    }

    #[test]
    fn event_filter_for_aggregate_is_restricted() {
        let scoped: EventFilter<String> =
            EventFilter::for_aggregate("my-event", "my-aggregate", "123");
        assert_eq!(scoped.event_kind, "my-event");
        assert_eq!(scoped.aggregate_kind.as_deref(), Some("my-aggregate"));
        assert_eq!(scoped.aggregate_id.as_deref(), Some("123"));
        assert!(scoped.after_position.is_none());
    }

    #[test]
    fn event_filter_after_sets_after_position() {
        let base: EventFilter<String, u64> = EventFilter::for_event("e");
        let advanced = base.after(10);
        assert_eq!(advanced.after_position, Some(10));
    }

    #[test]
    fn json_codec_roundtrips() {
        #[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
        struct ValueAdded {
            amount: i32,
        }

        let original = ValueAdded { amount: 42 };
        let encoded = JsonCodec.serialize(&original).unwrap();
        let roundtripped: ValueAdded = JsonCodec.deserialize(&encoded).unwrap();
        assert_eq!(roundtripped, original);
    }

    #[test]
    fn json_codec_rejects_invalid_json() {
        #[derive(Debug, PartialEq, Eq, serde::Deserialize)]
        struct ValueAdded {
            amount: i32,
        }

        let outcome = JsonCodec.deserialize::<ValueAdded>(b"not valid json");
        assert!(outcome.is_err());
    }

    #[test]
    fn json_codec_rejects_wrong_shape() {
        #[derive(Debug, PartialEq, Eq, serde::Deserialize)]
        struct ValueAdded {
            amount: i32,
        }

        let outcome = JsonCodec.deserialize::<ValueAdded>(br#"{"wrong_field":123}"#);
        assert!(outcome.is_err());
    }

    /// Appends one pre-serialized event to a stream without a version check,
    /// panicking if the store rejects it.
    async fn append_raw_event(
        store: &mut TestStore,
        aggregate_kind: &str,
        aggregate_id: &str,
        event_kind: &str,
        json_bytes: &[u8],
    ) {
        let id = aggregate_id.to_string();
        store
            .append(
                aggregate_kind,
                &id,
                None,
                vec![raw_event(event_kind, json_bytes)],
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn in_memory_event_store_appends_and_loads_single_event() {
        let mut store = new_store();
        let payload = br#"{"amount":10}"#;

        append_raw_event(&mut store, "counter", "c1", "value-added", payload).await;

        let filters = [EventFilter::for_aggregate("value-added", "counter", "c1")];
        let loaded = store.load_events(&filters).await.unwrap();

        assert_eq!(loaded.len(), 1);
        let event = &loaded[0];
        assert_eq!(event.aggregate_kind, "counter");
        assert_eq!(event.aggregate_id, "c1");
        assert_eq!(event.kind, "value-added");
        assert_eq!(event.data, payload);
        assert_eq!(event.position, 0);
    }

    #[tokio::test]
    async fn in_memory_event_store_loads_multiple_kinds_from_one_stream() {
        let mut store = new_store();
        append_raw_event(
            &mut store,
            "counter",
            "c1",
            "value-added",
            br#"{"amount":10}"#,
        )
        .await;
        append_raw_event(
            &mut store,
            "counter",
            "c1",
            "value-subtracted",
            br#"{"amount":5}"#,
        )
        .await;

        let filters = [
            EventFilter::for_aggregate("value-added", "counter", "c1"),
            EventFilter::for_aggregate("value-subtracted", "counter", "c1"),
        ];
        let loaded = store.load_events(&filters).await.unwrap();

        let kinds: Vec<&str> = loaded.iter().map(|e| e.kind.as_str()).collect();
        assert_eq!(kinds, ["value-added", "value-subtracted"]);
    }

    #[tokio::test]
    async fn in_memory_event_store_returns_empty_when_no_events_match() {
        let store = new_store();
        let filters = [EventFilter::for_event("nonexistent")];
        let loaded = store.load_events(&filters).await.unwrap();
        assert!(loaded.is_empty());
    }

    #[tokio::test]
    async fn in_memory_event_store_filters_by_event_kind_and_aggregate_id() {
        let mut store = new_store();
        for id in ["c1", "c2"] {
            append_raw_event(&mut store, "counter", id, "value-added", b"{}").await;
        }

        let filters = [EventFilter::for_aggregate("value-added", "counter", "c1")];
        let loaded = store.load_events(&filters).await.unwrap();

        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].aggregate_id, "c1");
    }

    #[tokio::test]
    async fn in_memory_event_store_orders_events_by_global_position() {
        let mut store = new_store();
        for id in ["c1", "c2", "c1"] {
            append_raw_event(&mut store, "counter", id, "value-added", b"{}").await;
        }

        let filters = [EventFilter::for_event("value-added")];
        let loaded = store.load_events(&filters).await.unwrap();

        let positions: Vec<u64> = loaded.iter().map(|e| e.position).collect();
        assert_eq!(positions, [0, 1, 2]);
        let ids: Vec<&str> = loaded.iter().map(|e| e.aggregate_id.as_str()).collect();
        assert_eq!(ids, ["c1", "c2", "c1"]);
    }

    #[tokio::test]
    async fn in_memory_event_store_deduplicates_overlapping_filters() {
        let mut store = new_store();
        append_raw_event(&mut store, "counter", "c1", "value-added", b"{}").await;

        // Both filters select the same single event.
        let filters = [
            EventFilter::for_aggregate("value-added", "counter", "c1"),
            EventFilter::for_event("value-added"),
        ];
        let loaded = store.load_events(&filters).await.unwrap();
        assert_eq!(loaded.len(), 1);
    }

    #[tokio::test]
    async fn in_memory_event_store_applies_after_position_filter() {
        let mut store = new_store();
        for _ in 0..3 {
            append_raw_event(&mut store, "counter", "c1", "value-added", b"{}").await;
        }

        let filters = [EventFilter::for_event("value-added").after(1)];
        let loaded = store.load_events(&filters).await.unwrap();

        let positions: Vec<u64> = loaded.iter().map(|e| e.position).collect();
        assert_eq!(positions, [2]);
    }

    #[tokio::test]
    async fn in_memory_event_store_stream_version_is_none_for_empty_stream() {
        let store = new_store();
        let missing = "nonexistent".to_string();
        let version = store.stream_version("counter", &missing).await.unwrap();
        assert_eq!(version, None);
    }

    #[tokio::test]
    async fn in_memory_event_store_version_checking_detects_conflict() {
        let mut store = new_store();
        append_raw_event(&mut store, "counter", "c1", "value-added", b"{}").await;

        let stream_id = "c1".to_string();

        // The first append at the current version succeeds.
        let first = store
            .append(
                "counter",
                &stream_id,
                Some(0),
                vec![raw_event("value-added", b"{}")],
            )
            .await;
        assert!(first.is_ok());

        // Re-using the now stale version must be rejected.
        let second = store
            .append(
                "counter",
                &stream_id,
                Some(0),
                vec![raw_event("value-added", b"{}")],
            )
            .await;
        assert!(matches!(second, Err(AppendError::Conflict(_))));
    }

    #[derive(Clone, Debug, serde::Serialize)]
    struct TestAdded {
        amount: i32,
    }

    #[derive(Clone, Debug)]
    enum TestEvent {
        Added(TestAdded),
    }

    impl crate::codec::SerializableEvent for TestEvent {
        fn to_persistable<C: crate::codec::Codec, M>(
            self,
            codec: &C,
            metadata: M,
        ) -> Result<PersistableEvent<M>, C::Error> {
            // Irrefutable while the enum has a single variant; adding a
            // variant turns this into a compile error.
            let Self::Added(added) = self;
            let data = codec.serialize(&added)?;
            Ok(PersistableEvent {
                kind: "added".to_string(),
                data,
                metadata,
            })
        }
    }

    #[tokio::test]
    async fn unchecked_transaction_commit_persists_events() {
        let mut store = new_store();
        let mut tx =
            store.begin::<crate::concurrency::Unchecked>("counter", "c1".to_string(), None);
        tx.append(TestEvent::Added(TestAdded { amount: 10 }), ())
            .unwrap();
        tx.commit().await.unwrap();

        let filters = [EventFilter::for_aggregate("added", "counter", "c1")];
        let persisted = store.load_events(&filters).await.unwrap();
        assert_eq!(persisted.len(), 1);
        assert_eq!(persisted[0].position, 0);
        assert_eq!(persisted[0].kind, "added");
    }

    #[tokio::test]
    async fn dropping_transaction_without_commit_discards_buffered_events() {
        let mut store = new_store();
        {
            let mut tx =
                store.begin::<crate::concurrency::Unchecked>("counter", "c1".to_string(), None);
            tx.append(TestEvent::Added(TestAdded { amount: 10 }), ())
                .unwrap();
            // `tx` goes out of scope here without `commit`.
        }

        let filters = [EventFilter::for_event("added")];
        let persisted = store.load_events(&filters).await.unwrap();
        assert!(persisted.is_empty());
    }

    #[tokio::test]
    async fn optimistic_transaction_detects_stale_expected_version() {
        let mut store = new_store();
        append_raw_event(&mut store, "counter", "c1", "added", br#"{"amount":10}"#).await;

        let mut tx =
            store.begin::<crate::concurrency::Optimistic>("counter", "c1".to_string(), Some(999));
        tx.append(TestEvent::Added(TestAdded { amount: 1 }), ())
            .unwrap();

        let outcome = tx.commit().await;
        assert!(matches!(outcome, Err(AppendError::Conflict(_))));
    }

    #[tokio::test]
    async fn optimistic_transaction_detects_non_new_stream_when_expect_new() {
        let mut store = new_store();
        append_raw_event(&mut store, "counter", "c1", "added", br#"{"amount":10}"#).await;

        let mut tx =
            store.begin::<crate::concurrency::Optimistic>("counter", "c1".to_string(), None);
        tx.append(TestEvent::Added(TestAdded { amount: 1 }), ())
            .unwrap();

        let outcome = tx.commit().await;
        assert!(matches!(outcome, Err(AppendError::Conflict(_))));
    }
}