Skip to main content

eventcore_testing/
chaos.rs

//! Chaos testing for EventCore backends.
//!
//! This module provides [`ChaosEventStore`], a wrapper around any `EventStore`
//! that injects probabilistic read/write failures and version conflicts. Use it
//! to verify that command retry logic handles transient errors correctly.
//!
//! The canonical entry point is the [`ChaosEventStoreExt::with_chaos`] extension
//! method, which is implemented for every `EventStore`:
//!
//! ```no_run
//! use eventcore_memory::InMemoryEventStore;
//! use eventcore_testing::chaos::{ChaosConfig, ChaosEventStoreExt};
//!
//! // Deterministic seed so failures are reproducible across runs.
//! let base_store = InMemoryEventStore::new();
//! let chaos_store =
//!     base_store.with_chaos(ChaosConfig::deterministic().with_failure_probability(0.5));
//!
//! // `chaos_store` is itself an `EventStore`, so it can be passed to
//! // `eventcore::execute()` like any other backend.
//! # let _ = chaos_store;
//! ```
23
24use std::{future::Future, sync::Mutex};
25
26use eventcore_types::{
27    Event, EventStore, EventStoreError, EventStream, EventStreamSlice, Operation, StreamId,
28    StreamVersion, StreamWrites,
29};
30use nutype::nutype;
31use rand::{RngExt, SeedableRng, random, rngs::StdRng};
32
33/// Probability of injecting read/write failures for chaos testing.
34///
35/// FailureProbability represents a value in the range [0.0, 1.0] where 0.0 means
36/// never inject failures and 1.0 means always inject failures.
37///
38/// # Examples
39///
40/// ```no_run
41/// use eventcore_testing::chaos::FailureProbability;
42///
43/// let never = FailureProbability::try_new(0.0).expect("0.0 is valid");
44/// let sometimes = FailureProbability::try_new(0.5).expect("0.5 is valid");
45/// let always = FailureProbability::try_new(1.0).expect("1.0 is valid");
46/// # let _ = (never, sometimes, always);
47///
48/// // Values outside [0.0, 1.0] are rejected
49/// assert!(FailureProbability::try_new(1.5).is_err());
50/// assert!(FailureProbability::try_new(-0.1).is_err());
51/// ```
52#[nutype(
53    validate(greater_or_equal = 0.0, less_or_equal = 1.0),
54    derive(Debug, Clone, Copy, PartialEq, PartialOrd, Display, Into)
55)]
56pub struct FailureProbability(f32);
57
58/// Probability of injecting version conflicts for chaos testing.
59///
60/// VersionConflictProbability represents a value in the range [0.0, 1.0] where 0.0
61/// means never inject conflicts and 1.0 means always inject conflicts.
62///
63/// # Examples
64///
65/// ```no_run
66/// use eventcore_testing::chaos::VersionConflictProbability;
67///
68/// let never = VersionConflictProbability::try_new(0.0).expect("0.0 is valid");
69/// let sometimes = VersionConflictProbability::try_new(0.5).expect("0.5 is valid");
70/// let always = VersionConflictProbability::try_new(1.0).expect("1.0 is valid");
71/// # let _ = (never, sometimes, always);
72///
73/// // Values outside [0.0, 1.0] are rejected
74/// assert!(VersionConflictProbability::try_new(1.5).is_err());
75/// assert!(VersionConflictProbability::try_new(-0.1).is_err());
76/// ```
77#[nutype(
78    validate(greater_or_equal = 0.0, less_or_equal = 1.0),
79    derive(Debug, Clone, Copy, PartialEq, PartialOrd, Display, Into)
80)]
81pub struct VersionConflictProbability(f32);
82
83/// Configuration controlling how a [`ChaosEventStore`] injects failures.
84///
85/// A `ChaosConfig` holds an optional deterministic seed plus the probabilities
86/// of injecting store failures and version conflicts. The probabilities default
87/// to `0.0` (no injection); use the builder methods to raise them.
88///
89/// Start from either [`ChaosConfig::default`] (random seed, no injection) or
90/// [`ChaosConfig::deterministic`] (fixed seed for reproducible runs), then chain
91/// [`with_failure_probability`](ChaosConfig::with_failure_probability) and
92/// [`with_version_conflict_probability`](ChaosConfig::with_version_conflict_probability).
93///
94/// ```no_run
95/// use eventcore_testing::chaos::ChaosConfig;
96///
97/// let config = ChaosConfig::deterministic()
98///     .with_failure_probability(0.25)
99///     .with_version_conflict_probability(0.1);
100/// # let _ = config;
101/// ```
102#[derive(Debug, Clone)]
103pub struct ChaosConfig {
104    deterministic_seed: Option<u64>,
105    failure_probability: FailureProbability,
106    version_conflict_probability: VersionConflictProbability,
107}
108
109impl ChaosConfig {
110    /// Creates a config with a fixed seed so injected failures are reproducible.
111    ///
112    /// The failure and version-conflict probabilities still default to `0.0`;
113    /// chain [`with_failure_probability`](ChaosConfig::with_failure_probability)
114    /// and
115    /// [`with_version_conflict_probability`](ChaosConfig::with_version_conflict_probability)
116    /// to enable injection.
117    pub fn deterministic() -> Self {
118        Self {
119            deterministic_seed: Some(0),
120            ..Self::default()
121        }
122    }
123
124    /// Sets the probability of injecting a store failure on reads and appends.
125    ///
126    /// `probability` is clamped to the `[0.0, 1.0]` range, where `0.0` never
127    /// injects a failure and `1.0` always does.
128    pub fn with_failure_probability(mut self, probability: f32) -> Self {
129        self.failure_probability = FailureProbability::try_new(probability.clamp(0.0, 1.0))
130            .expect("clamped value is always valid");
131        self
132    }
133
134    /// Sets the probability of injecting a version conflict on appends.
135    ///
136    /// `probability` is clamped to the `[0.0, 1.0]` range, where `0.0` never
137    /// injects a conflict and `1.0` always does.
138    pub fn with_version_conflict_probability(mut self, probability: f32) -> Self {
139        self.version_conflict_probability =
140            VersionConflictProbability::try_new(probability.clamp(0.0, 1.0))
141                .expect("clamped value is always valid");
142        self
143    }
144}
145
146impl Default for ChaosConfig {
147    fn default() -> Self {
148        Self {
149            deterministic_seed: None,
150            failure_probability: FailureProbability::try_new(0.0)
151                .expect("0.0 is valid probability"),
152            version_conflict_probability: VersionConflictProbability::try_new(0.0)
153                .expect("0.0 is valid probability"),
154        }
155    }
156}
157
158/// Extension trait that wraps any `EventStore` in a [`ChaosEventStore`].
159///
160/// This is the canonical entry point for chaos testing. It is implemented for
161/// every `EventStore`, so call `with_chaos` directly on a base store:
162///
163/// ```no_run
164/// use eventcore_memory::InMemoryEventStore;
165/// use eventcore_testing::chaos::{ChaosConfig, ChaosEventStoreExt};
166///
167/// let chaos_store =
168///     InMemoryEventStore::new().with_chaos(ChaosConfig::deterministic().with_failure_probability(0.5));
169/// # let _ = chaos_store;
170/// ```
171pub trait ChaosEventStoreExt: Sized {
172    /// Wraps `self` in a [`ChaosEventStore`] configured by `config`.
173    fn with_chaos(self, config: ChaosConfig) -> ChaosEventStore<Self>;
174}
175
176/// An `EventStore` wrapper that injects probabilistic failures and conflicts.
177///
178/// `ChaosEventStore` forwards reads and appends to the wrapped store, but first
179/// rolls against the probabilities in its [`ChaosConfig`]: it may return a
180/// `StoreFailure` on either operation, or a `VersionConflict` on appends. Because
181/// it implements `EventStore`, it can be passed anywhere a backend is expected,
182/// including `eventcore::execute()`.
183///
184/// Prefer constructing one via [`ChaosEventStoreExt::with_chaos`] rather than
185/// calling [`ChaosEventStore::new`] directly.
186pub struct ChaosEventStore<S> {
187    store: S,
188    config: ChaosConfig,
189    rng: Mutex<StdRng>,
190}
191
192impl<S> ChaosEventStore<S> {
193    /// Wraps `store` with chaos injection driven by `config`.
194    ///
195    /// If the config carries a deterministic seed, the internal RNG is seeded
196    /// from it for reproducible failures; otherwise a random seed is used.
197    /// Most callers should use [`ChaosEventStoreExt::with_chaos`] instead.
198    pub fn new(store: S, config: ChaosConfig) -> Self {
199        let rng = match config.deterministic_seed {
200            Some(seed) => StdRng::seed_from_u64(seed),
201            None => StdRng::seed_from_u64(random()),
202        };
203
204        Self {
205            store,
206            config,
207            rng: Mutex::new(rng),
208        }
209    }
210
211    fn should_inject<P: Into<f32>>(&self, probability: P) -> bool {
212        let prob_f32: f32 = probability.into();
213
214        if prob_f32 <= 0.0 {
215            return false;
216        }
217
218        if prob_f32 >= 1.0 {
219            return true;
220        }
221
222        let mut rng = self
223            .rng
224            .lock()
225            .expect("chaos RNG mutex should not be poisoned");
226
227        rng.random_bool(prob_f32 as f64)
228    }
229}
230
231impl<S> EventStore for ChaosEventStore<S>
232where
233    S: EventStore + Sync,
234{
235    fn read_stream<E: Event>(
236        &self,
237        stream_id: StreamId,
238    ) -> impl Future<Output = Result<EventStream<E>, EventStoreError>> + Send {
239        let should_fail = self.should_inject(self.config.failure_probability);
240        let store = &self.store;
241
242        async move {
243            if should_fail {
244                return Err(EventStoreError::StoreFailure {
245                    operation: Operation::ReadStream,
246                });
247            }
248
249            store.read_stream(stream_id).await
250        }
251    }
252
253    fn append_events(
254        &self,
255        writes: StreamWrites,
256    ) -> impl Future<Output = Result<EventStreamSlice, EventStoreError>> + Send {
257        let should_conflict = self.should_inject(self.config.version_conflict_probability);
258        let should_fail = self.should_inject(self.config.failure_probability);
259        let store = &self.store;
260
261        async move {
262            if should_conflict {
263                return Err(EventStoreError::VersionConflict {
264                    stream_id: StreamId::try_new("chaos-conflict").expect("valid"),
265                    expected: StreamVersion::new(0),
266                    actual: StreamVersion::new(1),
267                });
268            }
269
270            if should_fail {
271                return Err(EventStoreError::StoreFailure {
272                    operation: Operation::AppendEvents,
273                });
274            }
275
276            store.append_events(writes).await
277        }
278    }
279}
280
281impl<S> ChaosEventStoreExt for S
282where
283    S: EventStore + Sync,
284{
285    fn with_chaos(self, config: ChaosConfig) -> ChaosEventStore<Self> {
286        ChaosEventStore::new(self, config)
287    }
288}
289
#[cfg(test)]
mod tests {
    use super::*;
    use eventcore_memory::InMemoryEventStore;
    use eventcore_types::StreamVersion;
    use serde::{Deserialize, Serialize};

    /// Minimal event used to exercise the pass-through path.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct PassthroughEvent {
        stream_id: StreamId,
    }

    impl Event for PassthroughEvent {
        fn stream_id(&self) -> &StreamId {
            &self.stream_id
        }

        fn event_type_name() -> &'static str {
            "PassthroughEvent"
        }
    }

    /// `default()` leaves the seed unset; `deterministic()` sets one.
    #[test]
    fn deterministic_config_sets_seed() {
        let default_config = ChaosConfig::default();
        let deterministic_config = ChaosConfig::deterministic();

        assert!(default_config.deterministic_seed.is_none());
        assert!(deterministic_config.deterministic_seed.is_some());
    }

    /// With both probabilities at zero the wrapper forwards every call.
    #[tokio::test]
    async fn zero_probability_passthrough_allows_normal_operations() {
        let chaos_store = InMemoryEventStore::new().with_chaos(ChaosConfig::default());

        let stream_id = StreamId::try_new("zero-probability-stream").expect("valid stream id");
        let event = PassthroughEvent {
            stream_id: stream_id.clone(),
        };
        let append_writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(event))
            .expect("writes builder should succeed");

        let append_result = chaos_store.append_events(append_writes).await;
        let read_result = chaos_store.read_stream::<PassthroughEvent>(stream_id).await;

        assert!(append_result.is_ok());
        assert!(read_result.is_ok());
    }

    /// Pins the outcome of the first roll made with the deterministic seed.
    #[test]
    fn deterministic_half_probability_does_not_inject_immediately() {
        let config = ChaosConfig::deterministic().with_failure_probability(0.5);
        let chaos_store = ChaosEventStore::new(InMemoryEventStore::new(), config);
        let half = FailureProbability::try_new(0.5).expect("0.5 is valid");

        let injected = chaos_store.should_inject(half);

        assert!(!injected);
    }
}