sourcery_core/
store.rs

1//! Persistence layer abstractions.
2//!
3//! This module describes the storage contract (`EventStore`), wire formats
4//! (`PersistableEvent`, `StoredEvent`), transactions, and a reference
5//! in-memory implementation. Filters and positions live here to keep storage
6//! concerns together.
7use std::{future::Future, marker::PhantomData};
8
9use thiserror::Error;
10
11use crate::{
12    codec::{Codec, SerializableEvent},
13    concurrency::{ConcurrencyConflict, ConcurrencyStrategy, Optimistic, Unchecked},
14};
15
16pub mod inmemory;
17
18/// Raw event data ready to be written to a store backend.
19///
20/// This is the boundary between Repository and `EventStore`. Repository
21/// serializes events to this form, `EventStore` adds position and persistence.
22///
23/// Generic over metadata type `M` to support different metadata structures.
24#[derive(Clone)]
25pub struct PersistableEvent<M> {
26    pub kind: String,
27    pub data: Vec<u8>,
28    pub metadata: M,
29}
30
31/// Event materialized from the store, with position.
32///
33/// Generic parameters:
34/// - `Id`: Aggregate identifier type
35/// - `Pos`: Position type for ordering (`()` for no ordering, `u64` for global
36///   sequence, etc.)
37/// - `M`: Metadata type (defaults to `EventMetadata`)
38#[derive(Clone)]
39pub struct StoredEvent<Id, Pos, M> {
40    pub aggregate_kind: String,
41    pub aggregate_id: Id,
42    pub kind: String,
43    pub position: Pos,
44    pub data: Vec<u8>,
45    pub metadata: M,
46}
47
48/// Convenience alias for event batches loaded from a store.
49pub type LoadEventsResult<Id, Pos, Meta, Err> = Result<Vec<StoredEvent<Id, Pos, Meta>>, Err>;
50
51/// Filter describing which events should be loaded from the store.
52#[derive(Clone, Debug, PartialEq, Eq)]
53pub struct EventFilter<Id, Pos = ()> {
54    pub event_kind: String,
55    pub aggregate_kind: Option<String>,
56    pub aggregate_id: Option<Id>,
57    /// Only load events with position strictly greater than this value.
58    /// Used for snapshot-based loading to skip already-applied events.
59    pub after_position: Option<Pos>,
60}
61
62impl<Id, Pos> EventFilter<Id, Pos> {
63    /// Load all events of the specified kind across every aggregate.
64    #[must_use]
65    pub fn for_event(kind: impl Into<String>) -> Self {
66        Self {
67            event_kind: kind.into(),
68            aggregate_kind: None,
69            aggregate_id: None,
70            after_position: None,
71        }
72    }
73
74    /// Load events of the specified kind for a single aggregate instance.
75    #[must_use]
76    pub fn for_aggregate(
77        event_kind: impl Into<String>,
78        aggregate_kind: impl Into<String>,
79        aggregate_id: impl Into<Id>,
80    ) -> Self {
81        Self {
82            event_kind: event_kind.into(),
83            aggregate_kind: Some(aggregate_kind.into()),
84            aggregate_id: Some(aggregate_id.into()),
85            after_position: None,
86        }
87    }
88
89    /// Only load events with position strictly greater than the given value.
90    ///
91    /// This is used for snapshot-based loading: load a snapshot at position N,
92    /// then load events with `after(N)` to get only the events that occurred
93    /// after the snapshot was taken.
94    #[must_use]
95    pub fn after(mut self, position: Pos) -> Self {
96        self.after_position = Some(position);
97        self
98    }
99}
100
101/// Error from append operations with version checking.
102#[derive(Debug, Error)]
103pub enum AppendError<Pos, StoreError>
104where
105    Pos: std::fmt::Debug,
106    StoreError: std::error::Error,
107{
108    /// Concurrency conflict - another writer modified the stream.
109    #[error(transparent)]
110    Conflict(#[from] ConcurrencyConflict<Pos>),
111    /// Underlying store error.
112    #[error("store error: {0}")]
113    Store(#[source] StoreError),
114}
115
116impl<Pos: std::fmt::Debug, StoreError: std::error::Error> AppendError<Pos, StoreError> {
117    /// Create a store error variant.
118    pub const fn store(err: StoreError) -> Self {
119        Self::Store(err)
120    }
121}
122
123/// Transaction for appending events to an aggregate instance.
124///
125/// Events are accumulated in the transaction and persisted atomically when
126/// `commit()` is called. If the transaction is dropped without calling
127/// `commit()`, the events are silently discarded (rolled back). This allows
128/// errors during event serialization to be handled gracefully.
129///
130/// The `C` type parameter determines the concurrency strategy:
131/// - [`Unchecked`]: No version checking (default)
132/// - [`Optimistic`]: Version checked on commit
133pub struct Transaction<'a, S: EventStore, C: ConcurrencyStrategy = Unchecked> {
134    store: &'a mut S,
135    aggregate_kind: String,
136    aggregate_id: S::Id,
137    expected_version: Option<S::Position>,
138    events: Vec<PersistableEvent<S::Metadata>>,
139    committed: bool,
140    _concurrency: PhantomData<C>,
141}
142
143impl<'a, S: EventStore, C: ConcurrencyStrategy> Transaction<'a, S, C> {
144    pub const fn new(
145        store: &'a mut S,
146        aggregate_kind: String,
147        aggregate_id: S::Id,
148        expected_version: Option<S::Position>,
149    ) -> Self {
150        Self {
151            store,
152            aggregate_kind,
153            aggregate_id,
154            expected_version,
155            events: Vec::new(),
156            committed: false,
157            _concurrency: PhantomData,
158        }
159    }
160
161    /// Append an event to the transaction.
162    ///
163    /// For sum-type events (enums), this serializes each variant to its
164    /// persistable form.
165    ///
166    /// # Errors
167    ///
168    /// Returns a codec error if serialization fails.
169    pub fn append<E>(
170        &mut self,
171        event: E,
172        metadata: S::Metadata,
173    ) -> Result<(), <S::Codec as Codec>::Error>
174    where
175        E: SerializableEvent,
176    {
177        let persistable = event.to_persistable(self.store.codec(), metadata)?;
178        tracing::trace!(event_kind = %persistable.kind, "event appended to transaction");
179        self.events.push(persistable);
180        Ok(())
181    }
182}
183
184impl<S: EventStore> Transaction<'_, S, Unchecked> {
185    /// Commit the transaction without version checking.
186    ///
187    /// Events are persisted atomically. No conflict detection is performed.
188    ///
189    /// # Errors
190    ///
191    /// Returns a store error if persistence fails.
192    pub async fn commit(mut self) -> Result<(), S::Error> {
193        let events = std::mem::take(&mut self.events);
194        let event_count = events.len();
195        tracing::debug!(
196            aggregate_kind = %self.aggregate_kind,
197            event_count,
198            "committing transaction (unchecked)"
199        );
200        self.committed = true;
201        self.store
202            .append(&self.aggregate_kind, &self.aggregate_id, None, events)
203            .await
204            .map_err(|e| match e {
205                AppendError::Store(e) => e,
206                AppendError::Conflict(_) => unreachable!("conflict impossible without version"),
207            })
208    }
209}
210
211impl<S: EventStore> Transaction<'_, S, Optimistic> {
212    /// Commit the transaction with version checking.
213    ///
214    /// The commit will fail with a [`ConcurrencyConflict`] if the stream
215    /// version has changed since the aggregate was loaded.
216    ///
217    /// When `expected_version` is `None`, this means we expect a new aggregate
218    /// (empty stream). The commit will fail with a conflict if the stream
219    /// already has events.
220    ///
221    /// # Errors
222    ///
223    /// Returns [`AppendError::Conflict`] if another writer modified the stream,
224    /// or [`AppendError::Store`] if persistence fails.
225    pub async fn commit(mut self) -> Result<(), AppendError<S::Position, S::Error>> {
226        let events = std::mem::take(&mut self.events);
227        let event_count = events.len();
228        tracing::debug!(
229            aggregate_kind = %self.aggregate_kind,
230            event_count,
231            expected_version = ?self.expected_version,
232            "committing transaction (optimistic)"
233        );
234        self.committed = true;
235
236        match self.expected_version {
237            Some(version) => {
238                // Expected specific version - delegate to store's version checking
239                self.store
240                    .append(
241                        &self.aggregate_kind,
242                        &self.aggregate_id,
243                        Some(version),
244                        events,
245                    )
246                    .await
247            }
248            None => {
249                // Expected new stream - verify stream is actually empty
250                self.store
251                    .append_expecting_new(&self.aggregate_kind, &self.aggregate_id, events)
252                    .await
253            }
254        }
255    }
256}
257
258impl<S: EventStore, C: ConcurrencyStrategy> Drop for Transaction<'_, S, C> {
259    fn drop(&mut self) {
260        if !self.committed && !self.events.is_empty() {
261            tracing::trace!(
262                aggregate_kind = %self.aggregate_kind,
263                expected_version = ?self.expected_version,
264                event_count = self.events.len(),
265                "transaction dropped without commit; discarding buffered events"
266            );
267        }
268    }
269}
270
271/// Abstraction over the persistence layer for event streams.
272///
273/// This trait supports both stream-based and type-partitioned storage
274/// implementations.
275///
276/// Associated types allow stores to customize their behavior:
277/// - `Id`: Aggregate identifier type
278/// - `Position`: Ordering strategy (`()` for stream-based, `u64` for global
279///   ordering)
280/// - `Metadata`: Infrastructure metadata type (timestamps, causation tracking,
281///   etc.)
282/// - `Codec`: Serialization strategy for domain events
283// ANCHOR: event_store_trait
284pub trait EventStore: Send + Sync {
285    /// Aggregate identifier type.
286    ///
287    /// This type must be clonable so repositories can reuse IDs across calls.
288    /// Common choices: `String`, `Uuid`, or custom ID types.
289    type Id: Clone + Send + Sync + 'static;
290
291    /// Position type used for ordering events and version checking.
292    ///
293    /// Must be `Copy + PartialEq` to support optimistic concurrency.
294    /// Use `()` if ordering is not needed.
295    type Position: Copy + PartialEq + std::fmt::Debug + Send + Sync + 'static;
296
297    /// Store-specific error type.
298    type Error: std::error::Error + Send + Sync + 'static;
299
300    /// Serialization codec.
301    type Codec: Codec + Clone + Send + Sync + 'static;
302
303    /// Metadata type for infrastructure concerns.
304    type Metadata: Send + Sync + 'static;
305
306    fn codec(&self) -> &Self::Codec;
307
308    /// Get the current version (latest position) for an aggregate stream.
309    ///
310    /// Returns `None` for streams with no events.
311    ///
312    /// # Errors
313    ///
314    /// Returns a store-specific error when the operation fails.
315    fn stream_version<'a>(
316        &'a self,
317        aggregate_kind: &'a str,
318        aggregate_id: &'a Self::Id,
319    ) -> impl Future<Output = Result<Option<Self::Position>, Self::Error>> + Send + 'a;
320
321    /// Begin a transaction for appending events to an aggregate.
322    ///
323    /// The transaction type is determined by the concurrency strategy `C`.
324    ///
325    /// # Arguments
326    /// * `aggregate_kind` - The aggregate type identifier (`Aggregate::KIND`)
327    /// * `aggregate_id` - The aggregate instance identifier
328    /// * `expected_version` - The version expected for optimistic concurrency
329    fn begin<C: ConcurrencyStrategy>(
330        &mut self,
331        aggregate_kind: &str,
332        aggregate_id: Self::Id,
333        expected_version: Option<Self::Position>,
334    ) -> Transaction<'_, Self, C>
335    where
336        Self: Sized;
337
338    /// Append events with optional version checking.
339    ///
340    /// If `expected_version` is `Some`, the append fails with a concurrency
341    /// conflict if the current stream version doesn't match.
342    /// If `expected_version` is `None`, no version checking is performed.
343    ///
344    /// # Errors
345    ///
346    /// Returns [`AppendError::Conflict`] if the version doesn't match, or
347    /// [`AppendError::Store`] if persistence fails.
348    fn append<'a>(
349        &'a mut self,
350        aggregate_kind: &'a str,
351        aggregate_id: &'a Self::Id,
352        expected_version: Option<Self::Position>,
353        events: Vec<PersistableEvent<Self::Metadata>>,
354    ) -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a;
355
356    /// Load events matching the specified filters.
357    ///
358    /// Each filter describes an event kind and optional aggregate identity:
359    /// - [`EventFilter::for_event`] loads every event of the given kind
360    /// - [`EventFilter::for_aggregate`] narrows to a single aggregate instance
361    ///
362    /// The store optimizes based on its storage model and returns events
363    /// merged by position (if positions are available).
364    ///
365    /// # Errors
366    ///
367    /// Returns a store-specific error when loading fails.
368    fn load_events<'a>(
369        &'a self,
370        filters: &'a [EventFilter<Self::Id, Self::Position>],
371    ) -> impl Future<
372        Output = LoadEventsResult<Self::Id, Self::Position, Self::Metadata, Self::Error>,
373    > + Send
374    + 'a;
375
376    /// Append events expecting an empty stream.
377    ///
378    /// This method is used by optimistic concurrency when creating new
379    /// aggregates. It fails with a [`ConcurrencyConflict`] if the stream
380    /// already has events.
381    ///
382    /// # Errors
383    ///
384    /// Returns [`AppendError::Conflict`] if the stream is not empty,
385    /// or [`AppendError::Store`] if persistence fails.
386    fn append_expecting_new<'a>(
387        &'a mut self,
388        aggregate_kind: &'a str,
389        aggregate_id: &'a Self::Id,
390        events: Vec<PersistableEvent<Self::Metadata>>,
391    ) -> impl Future<Output = Result<(), AppendError<Self::Position, Self::Error>>> + Send + 'a;
392}
393// ANCHOR_END: event_store_trait
394
395#[derive(Clone, Debug, PartialEq, Eq, Hash)]
396pub(crate) struct StreamKey<Id> {
397    aggregate_kind: String,
398    aggregate_id: Id,
399}
400
401impl<Id> StreamKey<Id> {
402    pub(crate) fn new(aggregate_kind: impl Into<String>, aggregate_id: Id) -> Self {
403        Self {
404            aggregate_kind: aggregate_kind.into(),
405            aggregate_id,
406        }
407    }
408}
409
410/// JSON codec backed by `serde_json`.
411#[derive(Clone, Copy, Debug, Default)]
412pub struct JsonCodec;
413
414impl crate::codec::Codec for JsonCodec {
415    type Error = serde_json::Error;
416
417    fn serialize<T>(&self, value: &T) -> Result<Vec<u8>, Self::Error>
418    where
419        T: serde::Serialize,
420    {
421        serde_json::to_vec(value)
422    }
423
424    fn deserialize<T>(&self, data: &[u8]) -> Result<T, Self::Error>
425    where
426        T: serde::de::DeserializeOwned,
427    {
428        serde_json::from_slice(data)
429    }
430}
431
432#[cfg(test)]
433mod tests {
434    use super::*;
435    use crate::store::inmemory;
436
437    #[test]
438    fn event_filter_for_event_is_unrestricted() {
439        let filter: EventFilter<String> = EventFilter::for_event("my-event");
440        assert_eq!(filter.event_kind, "my-event");
441        assert_eq!(filter.aggregate_kind, None);
442        assert_eq!(filter.aggregate_id, None);
443        assert_eq!(filter.after_position, None);
444    }
445
446    #[test]
447    fn event_filter_for_aggregate_is_restricted() {
448        let filter: EventFilter<String> =
449            EventFilter::for_aggregate("my-event", "my-aggregate", "123");
450        assert_eq!(filter.event_kind, "my-event");
451        assert_eq!(filter.aggregate_kind.as_deref(), Some("my-aggregate"));
452        assert_eq!(filter.aggregate_id.as_deref(), Some("123"));
453        assert_eq!(filter.after_position, None);
454    }
455
456    #[test]
457    fn event_filter_after_sets_after_position() {
458        let filter: EventFilter<String, u64> = EventFilter::for_event("e").after(10);
459        assert_eq!(filter.after_position, Some(10));
460    }
461
462    #[test]
463    fn json_codec_roundtrips() {
464        #[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
465        struct ValueAdded {
466            amount: i32,
467        }
468
469        let codec = JsonCodec;
470        let value = ValueAdded { amount: 42 };
471        let bytes = codec.serialize(&value).unwrap();
472        let decoded: ValueAdded = codec.deserialize(&bytes).unwrap();
473        assert_eq!(decoded, value);
474    }
475
476    #[test]
477    fn json_codec_rejects_invalid_json() {
478        #[derive(Debug, PartialEq, Eq, serde::Deserialize)]
479        struct ValueAdded {
480            amount: i32,
481        }
482
483        let codec = JsonCodec;
484        let result: Result<ValueAdded, _> = codec.deserialize(b"not valid json");
485        assert!(result.is_err());
486    }
487
488    #[test]
489    fn json_codec_rejects_wrong_shape() {
490        #[derive(Debug, PartialEq, Eq, serde::Deserialize)]
491        struct ValueAdded {
492            amount: i32,
493        }
494
495        let codec = JsonCodec;
496        let result: Result<ValueAdded, _> = codec.deserialize(br#"{"wrong_field":123}"#);
497        assert!(result.is_err());
498    }
499
500    async fn append_raw_event(
501        store: &mut inmemory::Store<String, JsonCodec, ()>,
502        aggregate_kind: &str,
503        aggregate_id: &str,
504        event_kind: &str,
505        json_bytes: &[u8],
506    ) {
507        store
508            .append(
509                aggregate_kind,
510                &aggregate_id.to_string(),
511                None,
512                vec![PersistableEvent {
513                    kind: event_kind.to_string(),
514                    data: json_bytes.to_vec(),
515                    metadata: (),
516                }],
517            )
518            .await
519            .unwrap();
520    }
521
522    #[tokio::test]
523    async fn in_memory_event_store_appends_and_loads_single_event() {
524        let mut store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);
525        let data = br#"{"amount":10}"#;
526
527        append_raw_event(&mut store, "counter", "c1", "value-added", data).await;
528
529        let filters = vec![EventFilter::for_aggregate("value-added", "counter", "c1")];
530        let events = store.load_events(&filters).await.unwrap();
531
532        assert_eq!(events.len(), 1);
533        assert_eq!(events[0].aggregate_kind, "counter");
534        assert_eq!(events[0].aggregate_id, "c1");
535        assert_eq!(events[0].kind, "value-added");
536        assert_eq!(events[0].data, data);
537        assert_eq!(events[0].position, 0);
538    }
539
540    #[tokio::test]
541    async fn in_memory_event_store_loads_multiple_kinds_from_one_stream() {
542        let mut store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);
543        append_raw_event(
544            &mut store,
545            "counter",
546            "c1",
547            "value-added",
548            br#"{"amount":10}"#,
549        )
550        .await;
551        append_raw_event(
552            &mut store,
553            "counter",
554            "c1",
555            "value-subtracted",
556            br#"{"amount":5}"#,
557        )
558        .await;
559
560        let filters = vec![
561            EventFilter::for_aggregate("value-added", "counter", "c1"),
562            EventFilter::for_aggregate("value-subtracted", "counter", "c1"),
563        ];
564        let loaded = store.load_events(&filters).await.unwrap();
565
566        assert_eq!(loaded.len(), 2);
567        assert_eq!(loaded[0].kind, "value-added");
568        assert_eq!(loaded[1].kind, "value-subtracted");
569    }
570
571    #[tokio::test]
572    async fn in_memory_event_store_returns_empty_when_no_events_match() {
573        let store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);
574        let events = store
575            .load_events(&[EventFilter::for_event("nonexistent")])
576            .await
577            .unwrap();
578        assert!(events.is_empty());
579    }
580
581    #[tokio::test]
582    async fn in_memory_event_store_filters_by_event_kind_and_aggregate_id() {
583        let mut store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);
584        append_raw_event(&mut store, "counter", "c1", "value-added", b"{}").await;
585        append_raw_event(&mut store, "counter", "c2", "value-added", b"{}").await;
586
587        let filters = vec![EventFilter::for_aggregate("value-added", "counter", "c1")];
588        let events = store.load_events(&filters).await.unwrap();
589
590        assert_eq!(events.len(), 1);
591        assert_eq!(events[0].aggregate_id, "c1");
592    }
593
594    #[tokio::test]
595    async fn in_memory_event_store_orders_events_by_global_position() {
596        let mut store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);
597        append_raw_event(&mut store, "counter", "c1", "value-added", b"{}").await;
598        append_raw_event(&mut store, "counter", "c2", "value-added", b"{}").await;
599        append_raw_event(&mut store, "counter", "c1", "value-added", b"{}").await;
600
601        let events = store
602            .load_events(&[EventFilter::for_event("value-added")])
603            .await
604            .unwrap();
605
606        let positions: Vec<u64> = events.iter().map(|e| e.position).collect();
607        assert_eq!(positions, vec![0, 1, 2]);
608        assert_eq!(events[0].aggregate_id, "c1");
609        assert_eq!(events[1].aggregate_id, "c2");
610        assert_eq!(events[2].aggregate_id, "c1");
611    }
612
613    #[tokio::test]
614    async fn in_memory_event_store_deduplicates_overlapping_filters() {
615        let mut store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);
616        append_raw_event(&mut store, "counter", "c1", "value-added", b"{}").await;
617
618        let filters = vec![
619            EventFilter::for_aggregate("value-added", "counter", "c1"),
620            EventFilter::for_event("value-added"),
621        ];
622        let events = store.load_events(&filters).await.unwrap();
623        assert_eq!(events.len(), 1);
624    }
625
626    #[tokio::test]
627    async fn in_memory_event_store_applies_after_position_filter() {
628        let mut store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);
629        append_raw_event(&mut store, "counter", "c1", "value-added", b"{}").await; // pos 0
630        append_raw_event(&mut store, "counter", "c1", "value-added", b"{}").await; // pos 1
631        append_raw_event(&mut store, "counter", "c1", "value-added", b"{}").await; // pos 2
632
633        let events = store
634            .load_events(&[EventFilter::for_event("value-added").after(1)])
635            .await
636            .unwrap();
637
638        let positions: Vec<u64> = events.iter().map(|e| e.position).collect();
639        assert_eq!(positions, vec![2]);
640    }
641
642    #[tokio::test]
643    async fn in_memory_event_store_stream_version_is_none_for_empty_stream() {
644        let store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);
645        let version = store
646            .stream_version("counter", &"nonexistent".to_string())
647            .await
648            .unwrap();
649        assert_eq!(version, None);
650    }
651
652    #[tokio::test]
653    async fn in_memory_event_store_version_checking_detects_conflict() {
654        let mut store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);
655        append_raw_event(&mut store, "counter", "c1", "value-added", b"{}").await;
656
657        let ok = store
658            .append(
659                "counter",
660                &"c1".to_string(),
661                Some(0),
662                vec![PersistableEvent {
663                    kind: "value-added".to_string(),
664                    data: b"{}".to_vec(),
665                    metadata: (),
666                }],
667            )
668            .await;
669        assert!(ok.is_ok());
670
671        let conflict = store
672            .append(
673                "counter",
674                &"c1".to_string(),
675                Some(0),
676                vec![PersistableEvent {
677                    kind: "value-added".to_string(),
678                    data: b"{}".to_vec(),
679                    metadata: (),
680                }],
681            )
682            .await;
683        assert!(matches!(conflict, Err(AppendError::Conflict(_))));
684    }
685
686    #[derive(Clone, Debug, serde::Serialize)]
687    struct TestAdded {
688        amount: i32,
689    }
690
691    #[derive(Clone, Debug)]
692    enum TestEvent {
693        Added(TestAdded),
694    }
695
696    impl crate::codec::SerializableEvent for TestEvent {
697        fn to_persistable<C: crate::codec::Codec, M>(
698            self,
699            codec: &C,
700            metadata: M,
701        ) -> Result<PersistableEvent<M>, C::Error> {
702            match self {
703                Self::Added(e) => Ok(PersistableEvent {
704                    kind: "added".to_string(),
705                    data: codec.serialize(&e)?,
706                    metadata,
707                }),
708            }
709        }
710    }
711
712    #[tokio::test]
713    async fn unchecked_transaction_commit_persists_events() {
714        let mut store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);
715        let mut tx =
716            store.begin::<crate::concurrency::Unchecked>("counter", "c1".to_string(), None);
717        tx.append(TestEvent::Added(TestAdded { amount: 10 }), ())
718            .unwrap();
719        tx.commit().await.unwrap();
720
721        let events = store
722            .load_events(&[EventFilter::for_aggregate("added", "counter", "c1")])
723            .await
724            .unwrap();
725        assert_eq!(events.len(), 1);
726        assert_eq!(events[0].position, 0);
727        assert_eq!(events[0].kind, "added");
728    }
729
730    #[tokio::test]
731    async fn dropping_transaction_without_commit_discards_buffered_events() {
732        let mut store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);
733        {
734            let mut tx =
735                store.begin::<crate::concurrency::Unchecked>("counter", "c1".to_string(), None);
736            tx.append(TestEvent::Added(TestAdded { amount: 10 }), ())
737                .unwrap();
738        }
739
740        let events = store
741            .load_events(&[EventFilter::for_event("added")])
742            .await
743            .unwrap();
744        assert!(events.is_empty());
745    }
746
747    #[tokio::test]
748    async fn optimistic_transaction_detects_stale_expected_version() {
749        let mut store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);
750        append_raw_event(&mut store, "counter", "c1", "added", br#"{"amount":10}"#).await;
751
752        let mut tx =
753            store.begin::<crate::concurrency::Optimistic>("counter", "c1".to_string(), Some(999));
754        tx.append(TestEvent::Added(TestAdded { amount: 1 }), ())
755            .unwrap();
756
757        let result = tx.commit().await;
758        assert!(matches!(result, Err(AppendError::Conflict(_))));
759    }
760
761    #[tokio::test]
762    async fn optimistic_transaction_detects_non_new_stream_when_expect_new() {
763        let mut store: inmemory::Store<String, JsonCodec, ()> = inmemory::Store::new(JsonCodec);
764        append_raw_event(&mut store, "counter", "c1", "added", br#"{"amount":10}"#).await;
765
766        let mut tx =
767            store.begin::<crate::concurrency::Optimistic>("counter", "c1".to_string(), None);
768        tx.append(TestEvent::Added(TestAdded { amount: 1 }), ())
769            .unwrap();
770
771        let result = tx.commit().await;
772        assert!(matches!(result, Err(AppendError::Conflict(_))));
773    }
774}