Skip to main content

eventcore_testing/
deterministic.rs

1//! Deterministic testing utilities for event stores.
2//!
3//! This module provides wrapper stores that inject predictable failures
4//! for testing retry logic and conflict handling.
5
6use eventcore_types::{
7    Event, EventStore, EventStoreError, EventStreamReader, EventStreamSlice, StreamId, StreamWrites,
8};
9
/// An event store wrapper that fails a fixed number of appends with a
/// version conflict before handing writes to the wrapped store.
///
/// Only `append_events` is affected: each call consumes one pending conflict
/// and returns `EventStoreError::VersionConflict` until the configured count
/// is used up, after which appends are forwarded to the inner store.
/// `read_stream` is always forwarded to the inner store.
///
/// This is useful for testing retry logic where you need predictable
/// conflict behavior rather than the probabilistic chaos testing approach.
///
/// # Examples
///
/// ```ignore
/// use eventcore_testing::deterministic::DeterministicConflictStore;
/// use eventcore_memory::InMemoryEventStore;
///
/// // Create a store that will fail with VersionConflict twice before succeeding
/// let inner = InMemoryEventStore::new();
/// let store = DeterministicConflictStore::new(inner, 2);
///
/// // First two append_events calls will return VersionConflict
/// // Third call will delegate to inner store
/// ```
#[derive(Debug)]
pub struct DeterministicConflictStore<S> {
    /// Store that receives every read, and every append once no conflicts remain.
    inner: S,
    /// Number of conflicts still to be injected. Held in an atomic so it can
    /// be decremented through the `&self` receiver of `append_events`.
    remaining_conflicts: std::sync::atomic::AtomicU32,
}
33
34impl<S> DeterministicConflictStore<S> {
35    /// Creates a new `DeterministicConflictStore` that will inject `conflict_count`
36    /// version conflicts before delegating to the inner store.
37    ///
38    /// # Arguments
39    ///
40    /// * `store` - The inner event store to delegate to after conflicts are exhausted
41    /// * `conflict_count` - Number of conflicts to inject before delegation
42    pub fn new(store: S, conflict_count: u32) -> Self {
43        Self {
44            inner: store,
45            remaining_conflicts: std::sync::atomic::AtomicU32::new(conflict_count),
46        }
47    }
48}
49
50impl<S> EventStore for DeterministicConflictStore<S>
51where
52    S: EventStore + Sync,
53{
54    async fn read_stream<E: Event>(
55        &self,
56        stream_id: StreamId,
57    ) -> Result<EventStreamReader<E>, EventStoreError> {
58        self.inner.read_stream(stream_id).await
59    }
60
61    async fn append_events(
62        &self,
63        writes: StreamWrites,
64    ) -> Result<EventStreamSlice, EventStoreError> {
65        let remaining = self.remaining_conflicts.fetch_update(
66            std::sync::atomic::Ordering::SeqCst,
67            std::sync::atomic::Ordering::SeqCst,
68            |n| if n > 0 { Some(n - 1) } else { None },
69        );
70
71        if remaining.is_ok() {
72            return Err(EventStoreError::VersionConflict);
73        }
74
75        self.inner.append_events(writes).await
76    }
77}