Skip to main content

eventcore_testing/
chaos.rs

1use std::{future::Future, sync::Mutex};
2
3use eventcore_types::{
4    Event, EventStore, EventStoreError, EventStreamReader, EventStreamSlice, Operation, StreamId,
5    StreamVersion, StreamWrites,
6};
7use nutype::nutype;
8use rand::{Rng, SeedableRng, random, rngs::StdRng};
9
10/// Probability of injecting read/write failures for chaos testing.
11///
12/// FailureProbability represents a value in the range [0.0, 1.0] where 0.0 means
13/// never inject failures and 1.0 means always inject failures.
14///
15/// # Examples
16///
17/// ```ignore
18/// use eventcore_testing::chaos::FailureProbability;
19///
20/// let never = FailureProbability::try_new(0.0).expect("0.0 is valid");
21/// let sometimes = FailureProbability::try_new(0.5).expect("0.5 is valid");
22/// let always = FailureProbability::try_new(1.0).expect("1.0 is valid");
23///
24/// // Values outside [0.0, 1.0] are rejected
25/// assert!(FailureProbability::try_new(1.5).is_err());
26/// assert!(FailureProbability::try_new(-0.1).is_err());
27/// ```
28#[nutype(
29    validate(greater_or_equal = 0.0, less_or_equal = 1.0),
30    derive(Debug, Clone, Copy, PartialEq, PartialOrd, Display, Into)
31)]
32pub struct FailureProbability(f32);
33
34/// Probability of injecting version conflicts for chaos testing.
35///
36/// VersionConflictProbability represents a value in the range [0.0, 1.0] where 0.0
37/// means never inject conflicts and 1.0 means always inject conflicts.
38///
39/// # Examples
40///
41/// ```ignore
42/// use eventcore_testing::chaos::VersionConflictProbability;
43///
44/// let never = VersionConflictProbability::try_new(0.0).expect("0.0 is valid");
45/// let sometimes = VersionConflictProbability::try_new(0.5).expect("0.5 is valid");
46/// let always = VersionConflictProbability::try_new(1.0).expect("1.0 is valid");
47///
48/// // Values outside [0.0, 1.0] are rejected
49/// assert!(VersionConflictProbability::try_new(1.5).is_err());
50/// assert!(VersionConflictProbability::try_new(-0.1).is_err());
51/// ```
52#[nutype(
53    validate(greater_or_equal = 0.0, less_or_equal = 1.0),
54    derive(Debug, Clone, Copy, PartialEq, PartialOrd, Display, Into)
55)]
56pub struct VersionConflictProbability(f32);
57
58#[derive(Debug, Clone)]
59pub struct ChaosConfig {
60    deterministic_seed: Option<u64>,
61    failure_probability: FailureProbability,
62    version_conflict_probability: VersionConflictProbability,
63}
64
65impl ChaosConfig {
66    pub fn deterministic() -> Self {
67        Self {
68            deterministic_seed: Some(0),
69            ..Self::default()
70        }
71    }
72
73    pub fn with_failure_probability(mut self, probability: f32) -> Self {
74        self.failure_probability = FailureProbability::try_new(probability.clamp(0.0, 1.0))
75            .expect("clamped value is always valid");
76        self
77    }
78
79    pub fn with_version_conflict_probability(mut self, probability: f32) -> Self {
80        self.version_conflict_probability =
81            VersionConflictProbability::try_new(probability.clamp(0.0, 1.0))
82                .expect("clamped value is always valid");
83        self
84    }
85}
86
87impl Default for ChaosConfig {
88    fn default() -> Self {
89        Self {
90            deterministic_seed: None,
91            failure_probability: FailureProbability::try_new(0.0)
92                .expect("0.0 is valid probability"),
93            version_conflict_probability: VersionConflictProbability::try_new(0.0)
94                .expect("0.0 is valid probability"),
95        }
96    }
97}
98
99pub trait ChaosEventStoreExt: Sized {
100    fn with_chaos(self, config: ChaosConfig) -> ChaosEventStore<Self>;
101}
102
103pub struct ChaosEventStore<S> {
104    store: S,
105    config: ChaosConfig,
106    rng: Mutex<StdRng>,
107}
108
109impl<S> ChaosEventStore<S> {
110    pub fn new(store: S, config: ChaosConfig) -> Self {
111        let rng = match config.deterministic_seed {
112            Some(seed) => StdRng::seed_from_u64(seed),
113            None => StdRng::seed_from_u64(random()),
114        };
115
116        Self {
117            store,
118            config,
119            rng: Mutex::new(rng),
120        }
121    }
122
123    fn should_inject<P: Into<f32>>(&self, probability: P) -> bool {
124        let prob_f32: f32 = probability.into();
125
126        if prob_f32 <= 0.0 {
127            return false;
128        }
129
130        if prob_f32 >= 1.0 {
131            return true;
132        }
133
134        let mut rng = self
135            .rng
136            .lock()
137            .expect("chaos RNG mutex should not be poisoned");
138
139        rng.random_bool(prob_f32 as f64)
140    }
141}
142
143impl<S> EventStore for ChaosEventStore<S>
144where
145    S: EventStore + Sync,
146{
147    fn read_stream<E: Event>(
148        &self,
149        stream_id: StreamId,
150    ) -> impl Future<Output = Result<EventStreamReader<E>, EventStoreError>> + Send {
151        let should_fail = self.should_inject(self.config.failure_probability);
152        let store = &self.store;
153
154        async move {
155            if should_fail {
156                return Err(EventStoreError::StoreFailure {
157                    operation: Operation::ReadStream,
158                });
159            }
160
161            store.read_stream(stream_id).await
162        }
163    }
164
165    fn append_events(
166        &self,
167        writes: StreamWrites,
168    ) -> impl Future<Output = Result<EventStreamSlice, EventStoreError>> + Send {
169        let should_conflict = self.should_inject(self.config.version_conflict_probability);
170        let should_fail = self.should_inject(self.config.failure_probability);
171        let store = &self.store;
172
173        async move {
174            if should_conflict {
175                return Err(EventStoreError::VersionConflict {
176                    stream_id: StreamId::try_new("chaos-conflict").expect("valid"),
177                    expected: StreamVersion::new(0),
178                    actual: StreamVersion::new(1),
179                });
180            }
181
182            if should_fail {
183                return Err(EventStoreError::StoreFailure {
184                    operation: Operation::AppendEvents,
185                });
186            }
187
188            store.append_events(writes).await
189        }
190    }
191}
192
193impl<S> ChaosEventStoreExt for S
194where
195    S: EventStore + Sync,
196{
197    fn with_chaos(self, config: ChaosConfig) -> ChaosEventStore<Self> {
198        ChaosEventStore::new(self, config)
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use eventcore_memory::InMemoryEventStore;
    use eventcore_types::StreamVersion;
    use serde::{Deserialize, Serialize};

    /// Minimal event type used to push a write through the chaos wrapper.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct PassthroughEvent {
        stream_id: StreamId,
    }

    impl Event for PassthroughEvent {
        fn stream_id(&self) -> &StreamId {
            &self.stream_id
        }

        fn event_type_name() -> &'static str {
            "PassthroughEvent"
        }
    }

    /// The default config has no seed; the deterministic config has one.
    #[test]
    fn deterministic_config_sets_seed() {
        let default_config = ChaosConfig::default();
        let deterministic_config = ChaosConfig::deterministic();

        assert!(
            default_config.deterministic_seed.is_none()
                && deterministic_config.deterministic_seed.is_some()
        );
    }

    /// With both probabilities at zero, append and read go through to the
    /// wrapped store.
    #[tokio::test]
    async fn zero_probability_passthrough_allows_normal_operations() {
        let stream_id = StreamId::try_new("zero-probability-stream").expect("valid stream id");

        let registered =
            StreamWrites::new().register_stream(stream_id.clone(), StreamVersion::new(0));
        let append_writes = registered
            .and_then(|writes| {
                let event = PassthroughEvent {
                    stream_id: stream_id.clone(),
                };
                writes.append(event)
            })
            .expect("writes builder should succeed");

        let chaos_store = InMemoryEventStore::new().with_chaos(ChaosConfig::default());

        let append_result = chaos_store.append_events(append_writes).await;
        let read_result = chaos_store.read_stream::<PassthroughEvent>(stream_id).await;

        assert!(append_result.is_ok() && read_result.is_ok());
    }

    /// With the deterministic seed, the first draw at probability 0.5 does
    /// not inject.
    #[test]
    fn deterministic_half_probability_does_not_inject_immediately() {
        let config = ChaosConfig::deterministic().with_failure_probability(0.5);
        let chaos_store = ChaosEventStore::new(InMemoryEventStore::new(), config);
        let half = FailureProbability::try_new(0.5).expect("0.5 is valid");

        assert!(!chaos_store.should_inject(half));
    }
}