eventcore_testing/deterministic.rs
1//! Deterministic testing utilities for event stores.
2//!
3//! This module provides wrapper stores that inject predictable failures
4//! for testing retry logic and conflict handling.
5
6use eventcore_types::{
7 Event, EventStore, EventStoreError, EventStreamReader, EventStreamSlice, StreamId,
8 StreamVersion, StreamWrites,
9};
10
/// An event store wrapper that fails a fixed number of appends with a
/// version conflict before handing work over to the wrapped store.
///
/// Only `append_events` is affected: each call consumes one pending conflict
/// and returns `EventStoreError::VersionConflict` without touching the inner
/// store. Once the configured count is used up, appends are forwarded
/// unchanged. Reads are always forwarded to the inner store.
///
/// This is useful for testing retry logic where you need predictable
/// conflict behavior rather than the probabilistic chaos testing approach.
///
/// The pending-conflict counter is an atomic that is decremented with a
/// single atomic update, so each injected conflict is handed to exactly one
/// `append_events` call.
///
/// # Examples
///
/// ```ignore
/// use eventcore_testing::deterministic::DeterministicConflictStore;
/// use eventcore_memory::InMemoryEventStore;
///
/// // Create a store that will fail with VersionConflict twice before succeeding
/// let inner = InMemoryEventStore::new();
/// let store = DeterministicConflictStore::new(inner, 2);
///
/// // First two append_events calls will return VersionConflict
/// // Third call will delegate to inner store
/// ```
#[derive(Debug)]
pub struct DeterministicConflictStore<S> {
    /// The wrapped store that receives every read, and every append once the
    /// conflict budget is exhausted.
    inner: S,
    /// How many more `append_events` calls should fail with a conflict.
    remaining_conflicts: std::sync::atomic::AtomicU32,
}

impl<S> DeterministicConflictStore<S> {
    /// Creates a new `DeterministicConflictStore` that will inject `conflict_count`
    /// version conflicts before delegating to the inner store.
    ///
    /// Passing `0` yields a wrapper that forwards every call immediately.
    ///
    /// # Arguments
    ///
    /// * `store` - The inner event store to delegate to after conflicts are exhausted
    /// * `conflict_count` - Number of conflicts to inject before delegation
    pub fn new(store: S, conflict_count: u32) -> Self {
        Self {
            inner: store,
            remaining_conflicts: std::sync::atomic::AtomicU32::new(conflict_count),
        }
    }
}
50
51impl<S> EventStore for DeterministicConflictStore<S>
52where
53 S: EventStore + Sync,
54{
55 async fn read_stream<E: Event>(
56 &self,
57 stream_id: StreamId,
58 ) -> Result<EventStreamReader<E>, EventStoreError> {
59 self.inner.read_stream(stream_id).await
60 }
61
62 async fn append_events(
63 &self,
64 writes: StreamWrites,
65 ) -> Result<EventStreamSlice, EventStoreError> {
66 let remaining = self.remaining_conflicts.fetch_update(
67 std::sync::atomic::Ordering::SeqCst,
68 std::sync::atomic::Ordering::SeqCst,
69 |n| if n > 0 { Some(n - 1) } else { None },
70 );
71
72 if remaining.is_ok() {
73 return Err(EventStoreError::VersionConflict {
74 stream_id: StreamId::try_new("deterministic-conflict").expect("valid"),
75 expected: StreamVersion::new(0),
76 actual: StreamVersion::new(1),
77 });
78 }
79
80 self.inner.append_events(writes).await
81 }
82}