Skip to main content

eventcore_testing/
deterministic.rs

//! Deterministic testing utilities for event stores.
//!
//! This module provides wrapper stores that inject predictable failures
//! for testing retry logic and conflict handling.
5
6use eventcore_types::{
7    Event, EventStore, EventStoreError, EventStream, EventStreamSlice, StreamId, StreamVersion,
8    StreamWrites,
9};
10
/// A wrapper around an event store that injects a deterministic number
/// of version conflicts before delegating to the inner store.
///
/// This is useful for testing retry logic where you need predictable
/// conflict behavior rather than the probabilistic chaos testing approach.
///
/// The type implements [`Debug`](std::fmt::Debug) whenever the wrapped store
/// does, so it can appear in assertion messages and derived `Debug` fixtures.
///
/// # Examples
///
/// ```no_run
/// use eventcore_testing::deterministic::DeterministicConflictStore;
/// use eventcore_memory::InMemoryEventStore;
///
/// // Create a store that will fail with VersionConflict twice before succeeding
/// let inner = InMemoryEventStore::new();
/// let store = DeterministicConflictStore::new(inner, 2);
/// # let _ = store;
///
/// // First two append_events calls will return VersionConflict
/// // Third call will delegate to inner store
/// ```
#[derive(Debug)]
pub struct DeterministicConflictStore<S> {
    /// The wrapped store that receives every call once no conflicts remain.
    inner: S,
    /// Number of conflicts still to be injected. Atomic so it can be
    /// decremented through a shared `&self` reference.
    remaining_conflicts: std::sync::atomic::AtomicU32,
}
35
36impl<S> DeterministicConflictStore<S> {
37    /// Creates a new `DeterministicConflictStore` that will inject `conflict_count`
38    /// version conflicts before delegating to the inner store.
39    ///
40    /// # Arguments
41    ///
42    /// * `store` - The inner event store to delegate to after conflicts are exhausted
43    /// * `conflict_count` - Number of conflicts to inject before delegation
44    pub fn new(store: S, conflict_count: u32) -> Self {
45        Self {
46            inner: store,
47            remaining_conflicts: std::sync::atomic::AtomicU32::new(conflict_count),
48        }
49    }
50}
51
52impl<S> EventStore for DeterministicConflictStore<S>
53where
54    S: EventStore + Sync,
55{
56    async fn read_stream<E: Event>(
57        &self,
58        stream_id: StreamId,
59    ) -> Result<EventStream<E>, EventStoreError> {
60        self.inner.read_stream(stream_id).await
61    }
62
63    async fn append_events(
64        &self,
65        writes: StreamWrites,
66    ) -> Result<EventStreamSlice, EventStoreError> {
67        let remaining = self.remaining_conflicts.fetch_update(
68            std::sync::atomic::Ordering::SeqCst,
69            std::sync::atomic::Ordering::SeqCst,
70            |n| if n > 0 { Some(n - 1) } else { None },
71        );
72
73        if remaining.is_ok() {
74            return Err(EventStoreError::VersionConflict {
75                stream_id: StreamId::try_new("deterministic-conflict").expect("valid"),
76                expected: StreamVersion::new(0),
77                actual: StreamVersion::new(1),
78            });
79        }
80
81        self.inner.append_events(writes).await
82    }
83}