Skip to main content

eventcore_testing/
chaos.rs

1use std::{future::Future, sync::Mutex};
2
3use eventcore_types::{
4    Event, EventStore, EventStoreError, EventStreamReader, EventStreamSlice, Operation, StreamId,
5    StreamWrites,
6};
7use nutype::nutype;
8use rand::{Rng, SeedableRng, random, rngs::StdRng};
9
10/// Probability of injecting read/write failures for chaos testing.
11///
12/// FailureProbability represents a value in the range [0.0, 1.0] where 0.0 means
13/// never inject failures and 1.0 means always inject failures.
14///
15/// # Examples
16///
17/// ```ignore
18/// use eventcore_testing::chaos::FailureProbability;
19///
20/// let never = FailureProbability::try_new(0.0).expect("0.0 is valid");
21/// let sometimes = FailureProbability::try_new(0.5).expect("0.5 is valid");
22/// let always = FailureProbability::try_new(1.0).expect("1.0 is valid");
23///
24/// // Values outside [0.0, 1.0] are rejected
25/// assert!(FailureProbability::try_new(1.5).is_err());
26/// assert!(FailureProbability::try_new(-0.1).is_err());
27/// ```
28#[nutype(
29    validate(greater_or_equal = 0.0, less_or_equal = 1.0),
30    derive(Debug, Clone, Copy, PartialEq, PartialOrd, Display, Into)
31)]
32pub struct FailureProbability(f32);
33
34/// Probability of injecting version conflicts for chaos testing.
35///
36/// VersionConflictProbability represents a value in the range [0.0, 1.0] where 0.0
37/// means never inject conflicts and 1.0 means always inject conflicts.
38///
39/// # Examples
40///
41/// ```ignore
42/// use eventcore_testing::chaos::VersionConflictProbability;
43///
44/// let never = VersionConflictProbability::try_new(0.0).expect("0.0 is valid");
45/// let sometimes = VersionConflictProbability::try_new(0.5).expect("0.5 is valid");
46/// let always = VersionConflictProbability::try_new(1.0).expect("1.0 is valid");
47///
48/// // Values outside [0.0, 1.0] are rejected
49/// assert!(VersionConflictProbability::try_new(1.5).is_err());
50/// assert!(VersionConflictProbability::try_new(-0.1).is_err());
51/// ```
52#[nutype(
53    validate(greater_or_equal = 0.0, less_or_equal = 1.0),
54    derive(Debug, Clone, Copy, PartialEq, PartialOrd, Display, Into)
55)]
56pub struct VersionConflictProbability(f32);
57
58#[derive(Debug, Clone)]
59pub struct ChaosConfig {
60    deterministic_seed: Option<u64>,
61    failure_probability: FailureProbability,
62    version_conflict_probability: VersionConflictProbability,
63}
64
65impl ChaosConfig {
66    pub fn deterministic() -> Self {
67        Self {
68            deterministic_seed: Some(0),
69            ..Self::default()
70        }
71    }
72
73    pub fn with_failure_probability(mut self, probability: f32) -> Self {
74        self.failure_probability = FailureProbability::try_new(probability.clamp(0.0, 1.0))
75            .expect("clamped value is always valid");
76        self
77    }
78
79    pub fn with_version_conflict_probability(mut self, probability: f32) -> Self {
80        self.version_conflict_probability =
81            VersionConflictProbability::try_new(probability.clamp(0.0, 1.0))
82                .expect("clamped value is always valid");
83        self
84    }
85}
86
87impl Default for ChaosConfig {
88    fn default() -> Self {
89        Self {
90            deterministic_seed: None,
91            failure_probability: FailureProbability::try_new(0.0)
92                .expect("0.0 is valid probability"),
93            version_conflict_probability: VersionConflictProbability::try_new(0.0)
94                .expect("0.0 is valid probability"),
95        }
96    }
97}
98
99pub trait ChaosEventStoreExt: Sized {
100    fn with_chaos(self, config: ChaosConfig) -> ChaosEventStore<Self>;
101}
102
103pub struct ChaosEventStore<S> {
104    store: S,
105    config: ChaosConfig,
106    rng: Mutex<StdRng>,
107}
108
109impl<S> ChaosEventStore<S> {
110    pub fn new(store: S, config: ChaosConfig) -> Self {
111        let rng = match config.deterministic_seed {
112            Some(seed) => StdRng::seed_from_u64(seed),
113            None => StdRng::seed_from_u64(random()),
114        };
115
116        Self {
117            store,
118            config,
119            rng: Mutex::new(rng),
120        }
121    }
122
123    fn should_inject<P: Into<f32>>(&self, probability: P) -> bool {
124        let prob_f32: f32 = probability.into();
125
126        if prob_f32 <= 0.0 {
127            return false;
128        }
129
130        if prob_f32 >= 1.0 {
131            return true;
132        }
133
134        let mut rng = self
135            .rng
136            .lock()
137            .expect("chaos RNG mutex should not be poisoned");
138
139        rng.random_bool(prob_f32 as f64)
140    }
141}
142
143impl<S> EventStore for ChaosEventStore<S>
144where
145    S: EventStore + Sync,
146{
147    fn read_stream<E: Event>(
148        &self,
149        stream_id: StreamId,
150    ) -> impl Future<Output = Result<EventStreamReader<E>, EventStoreError>> + Send {
151        let should_fail = self.should_inject(self.config.failure_probability);
152        let store = &self.store;
153
154        async move {
155            if should_fail {
156                return Err(EventStoreError::StoreFailure {
157                    operation: Operation::ReadStream,
158                });
159            }
160
161            store.read_stream(stream_id).await
162        }
163    }
164
165    fn append_events(
166        &self,
167        writes: StreamWrites,
168    ) -> impl Future<Output = Result<EventStreamSlice, EventStoreError>> + Send {
169        let should_conflict = self.should_inject(self.config.version_conflict_probability);
170        let should_fail = self.should_inject(self.config.failure_probability);
171        let store = &self.store;
172
173        async move {
174            if should_conflict {
175                return Err(EventStoreError::VersionConflict);
176            }
177
178            if should_fail {
179                return Err(EventStoreError::StoreFailure {
180                    operation: Operation::AppendEvents,
181                });
182            }
183
184            store.append_events(writes).await
185        }
186    }
187}
188
189impl<S> ChaosEventStoreExt for S
190where
191    S: EventStore + Sync,
192{
193    fn with_chaos(self, config: ChaosConfig) -> ChaosEventStore<Self> {
194        ChaosEventStore::new(self, config)
195    }
196}
197
#[cfg(test)]
mod tests {
    use super::*;
    use eventcore::StreamVersion;
    use eventcore_memory::InMemoryEventStore;
    use serde::{Deserialize, Serialize};

    /// Minimal event that carries nothing but the stream it belongs to.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct PassthroughEvent {
        stream_id: StreamId,
    }

    impl Event for PassthroughEvent {
        fn stream_id(&self) -> &StreamId {
            &self.stream_id
        }
    }

    /// `default()` leaves the seed unset while `deterministic()` pins one.
    #[test]
    fn deterministic_config_sets_seed() {
        let default_seed = ChaosConfig::default().deterministic_seed;
        let deterministic_seed = ChaosConfig::deterministic().deterministic_seed;

        assert!(matches!(
            (default_seed, deterministic_seed),
            (None, Some(_))
        ));
    }

    /// With both probabilities at zero the wrapper must be transparent.
    #[tokio::test]
    async fn zero_probability_passthrough_allows_normal_operations() {
        let stream_id = StreamId::try_new("zero-probability-stream").expect("valid stream id");
        let event = PassthroughEvent {
            stream_id: stream_id.clone(),
        };
        let append_writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(event))
            .expect("writes builder should succeed");

        let chaos_store = InMemoryEventStore::new().with_chaos(ChaosConfig::default());

        // Run both operations before asserting so neither is skipped.
        let append_result = chaos_store.append_events(append_writes).await;
        let read_result = chaos_store.read_stream::<PassthroughEvent>(stream_id).await;

        assert!(append_result.is_ok() && read_result.is_ok());
    }

    /// Pins the first draw of the seed-0 RNG at probability 0.5.
    #[test]
    fn deterministic_half_probability_does_not_inject_immediately() {
        let config = ChaosConfig::deterministic().with_failure_probability(0.5);
        let chaos_store = ChaosEventStore::new(InMemoryEventStore::new(), config);

        let half = FailureProbability::try_new(0.5).expect("0.5 is valid");
        let injected = chaos_store.should_inject(half);

        assert!(!injected);
    }
}