eventcore_testing/chaos.rs
1//! Chaos testing for EventCore backends.
2//!
3//! This module provides [`ChaosEventStore`], a wrapper around any `EventStore`
4//! that injects probabilistic read/write failures and version conflicts. Use it
5//! to verify that command retry logic handles transient errors correctly.
6//!
7//! The canonical entry point is the [`ChaosEventStoreExt::with_chaos`] extension
8//! method, which is implemented for every `EventStore`:
9//!
10//! ```no_run
11//! use eventcore_memory::InMemoryEventStore;
12//! use eventcore_testing::chaos::{ChaosConfig, ChaosEventStoreExt};
13//!
14//! // Deterministic seed so failures are reproducible across runs.
15//! let base_store = InMemoryEventStore::new();
16//! let chaos_store =
17//! base_store.with_chaos(ChaosConfig::deterministic().with_failure_probability(0.5));
18//!
19//! // `chaos_store` is itself an `EventStore`, so it can be passed to
20//! // `eventcore::execute()` like any other backend.
21//! # let _ = chaos_store;
22//! ```
23
24use std::{future::Future, sync::Mutex};
25
26use eventcore_types::{
27 Event, EventStore, EventStoreError, EventStream, EventStreamSlice, Operation, StreamId,
28 StreamVersion, StreamWrites,
29};
30use nutype::nutype;
31use rand::{RngExt, SeedableRng, random, rngs::StdRng};
32
33/// Probability of injecting read/write failures for chaos testing.
34///
35/// FailureProbability represents a value in the range [0.0, 1.0] where 0.0 means
36/// never inject failures and 1.0 means always inject failures.
37///
38/// # Examples
39///
40/// ```no_run
41/// use eventcore_testing::chaos::FailureProbability;
42///
43/// let never = FailureProbability::try_new(0.0).expect("0.0 is valid");
44/// let sometimes = FailureProbability::try_new(0.5).expect("0.5 is valid");
45/// let always = FailureProbability::try_new(1.0).expect("1.0 is valid");
46/// # let _ = (never, sometimes, always);
47///
48/// // Values outside [0.0, 1.0] are rejected
49/// assert!(FailureProbability::try_new(1.5).is_err());
50/// assert!(FailureProbability::try_new(-0.1).is_err());
51/// ```
52#[nutype(
53 validate(greater_or_equal = 0.0, less_or_equal = 1.0),
54 derive(Debug, Clone, Copy, PartialEq, PartialOrd, Display, Into)
55)]
56pub struct FailureProbability(f32);
57
58/// Probability of injecting version conflicts for chaos testing.
59///
60/// VersionConflictProbability represents a value in the range [0.0, 1.0] where 0.0
61/// means never inject conflicts and 1.0 means always inject conflicts.
62///
63/// # Examples
64///
65/// ```no_run
66/// use eventcore_testing::chaos::VersionConflictProbability;
67///
68/// let never = VersionConflictProbability::try_new(0.0).expect("0.0 is valid");
69/// let sometimes = VersionConflictProbability::try_new(0.5).expect("0.5 is valid");
70/// let always = VersionConflictProbability::try_new(1.0).expect("1.0 is valid");
71/// # let _ = (never, sometimes, always);
72///
73/// // Values outside [0.0, 1.0] are rejected
74/// assert!(VersionConflictProbability::try_new(1.5).is_err());
75/// assert!(VersionConflictProbability::try_new(-0.1).is_err());
76/// ```
77#[nutype(
78 validate(greater_or_equal = 0.0, less_or_equal = 1.0),
79 derive(Debug, Clone, Copy, PartialEq, PartialOrd, Display, Into)
80)]
81pub struct VersionConflictProbability(f32);
82
83/// Configuration controlling how a [`ChaosEventStore`] injects failures.
84///
85/// A `ChaosConfig` holds an optional deterministic seed plus the probabilities
86/// of injecting store failures and version conflicts. The probabilities default
87/// to `0.0` (no injection); use the builder methods to raise them.
88///
89/// Start from either [`ChaosConfig::default`] (random seed, no injection) or
90/// [`ChaosConfig::deterministic`] (fixed seed for reproducible runs), then chain
91/// [`with_failure_probability`](ChaosConfig::with_failure_probability) and
92/// [`with_version_conflict_probability`](ChaosConfig::with_version_conflict_probability).
93///
94/// ```no_run
95/// use eventcore_testing::chaos::ChaosConfig;
96///
97/// let config = ChaosConfig::deterministic()
98/// .with_failure_probability(0.25)
99/// .with_version_conflict_probability(0.1);
100/// # let _ = config;
101/// ```
102#[derive(Debug, Clone)]
103pub struct ChaosConfig {
104 deterministic_seed: Option<u64>,
105 failure_probability: FailureProbability,
106 version_conflict_probability: VersionConflictProbability,
107}
108
109impl ChaosConfig {
110 /// Creates a config with a fixed seed so injected failures are reproducible.
111 ///
112 /// The failure and version-conflict probabilities still default to `0.0`;
113 /// chain [`with_failure_probability`](ChaosConfig::with_failure_probability)
114 /// and
115 /// [`with_version_conflict_probability`](ChaosConfig::with_version_conflict_probability)
116 /// to enable injection.
117 pub fn deterministic() -> Self {
118 Self {
119 deterministic_seed: Some(0),
120 ..Self::default()
121 }
122 }
123
124 /// Sets the probability of injecting a store failure on reads and appends.
125 ///
126 /// `probability` is clamped to the `[0.0, 1.0]` range, where `0.0` never
127 /// injects a failure and `1.0` always does.
128 pub fn with_failure_probability(mut self, probability: f32) -> Self {
129 self.failure_probability = FailureProbability::try_new(probability.clamp(0.0, 1.0))
130 .expect("clamped value is always valid");
131 self
132 }
133
134 /// Sets the probability of injecting a version conflict on appends.
135 ///
136 /// `probability` is clamped to the `[0.0, 1.0]` range, where `0.0` never
137 /// injects a conflict and `1.0` always does.
138 pub fn with_version_conflict_probability(mut self, probability: f32) -> Self {
139 self.version_conflict_probability =
140 VersionConflictProbability::try_new(probability.clamp(0.0, 1.0))
141 .expect("clamped value is always valid");
142 self
143 }
144}
145
146impl Default for ChaosConfig {
147 fn default() -> Self {
148 Self {
149 deterministic_seed: None,
150 failure_probability: FailureProbability::try_new(0.0)
151 .expect("0.0 is valid probability"),
152 version_conflict_probability: VersionConflictProbability::try_new(0.0)
153 .expect("0.0 is valid probability"),
154 }
155 }
156}
157
158/// Extension trait that wraps any `EventStore` in a [`ChaosEventStore`].
159///
160/// This is the canonical entry point for chaos testing. It is implemented for
161/// every `EventStore`, so call `with_chaos` directly on a base store:
162///
163/// ```no_run
164/// use eventcore_memory::InMemoryEventStore;
165/// use eventcore_testing::chaos::{ChaosConfig, ChaosEventStoreExt};
166///
167/// let chaos_store =
168/// InMemoryEventStore::new().with_chaos(ChaosConfig::deterministic().with_failure_probability(0.5));
169/// # let _ = chaos_store;
170/// ```
171pub trait ChaosEventStoreExt: Sized {
172 /// Wraps `self` in a [`ChaosEventStore`] configured by `config`.
173 fn with_chaos(self, config: ChaosConfig) -> ChaosEventStore<Self>;
174}
175
176/// An `EventStore` wrapper that injects probabilistic failures and conflicts.
177///
178/// `ChaosEventStore` forwards reads and appends to the wrapped store, but first
179/// rolls against the probabilities in its [`ChaosConfig`]: it may return a
180/// `StoreFailure` on either operation, or a `VersionConflict` on appends. Because
181/// it implements `EventStore`, it can be passed anywhere a backend is expected,
182/// including `eventcore::execute()`.
183///
184/// Prefer constructing one via [`ChaosEventStoreExt::with_chaos`] rather than
185/// calling [`ChaosEventStore::new`] directly.
186pub struct ChaosEventStore<S> {
187 store: S,
188 config: ChaosConfig,
189 rng: Mutex<StdRng>,
190}
191
192impl<S> ChaosEventStore<S> {
193 /// Wraps `store` with chaos injection driven by `config`.
194 ///
195 /// If the config carries a deterministic seed, the internal RNG is seeded
196 /// from it for reproducible failures; otherwise a random seed is used.
197 /// Most callers should use [`ChaosEventStoreExt::with_chaos`] instead.
198 pub fn new(store: S, config: ChaosConfig) -> Self {
199 let rng = match config.deterministic_seed {
200 Some(seed) => StdRng::seed_from_u64(seed),
201 None => StdRng::seed_from_u64(random()),
202 };
203
204 Self {
205 store,
206 config,
207 rng: Mutex::new(rng),
208 }
209 }
210
211 fn should_inject<P: Into<f32>>(&self, probability: P) -> bool {
212 let prob_f32: f32 = probability.into();
213
214 if prob_f32 <= 0.0 {
215 return false;
216 }
217
218 if prob_f32 >= 1.0 {
219 return true;
220 }
221
222 let mut rng = self
223 .rng
224 .lock()
225 .expect("chaos RNG mutex should not be poisoned");
226
227 rng.random_bool(prob_f32 as f64)
228 }
229}
230
231impl<S> EventStore for ChaosEventStore<S>
232where
233 S: EventStore + Sync,
234{
235 fn read_stream<E: Event>(
236 &self,
237 stream_id: StreamId,
238 ) -> impl Future<Output = Result<EventStream<E>, EventStoreError>> + Send {
239 let should_fail = self.should_inject(self.config.failure_probability);
240 let store = &self.store;
241
242 async move {
243 if should_fail {
244 return Err(EventStoreError::StoreFailure {
245 operation: Operation::ReadStream,
246 });
247 }
248
249 store.read_stream(stream_id).await
250 }
251 }
252
253 fn append_events(
254 &self,
255 writes: StreamWrites,
256 ) -> impl Future<Output = Result<EventStreamSlice, EventStoreError>> + Send {
257 let should_conflict = self.should_inject(self.config.version_conflict_probability);
258 let should_fail = self.should_inject(self.config.failure_probability);
259 let store = &self.store;
260
261 async move {
262 if should_conflict {
263 return Err(EventStoreError::VersionConflict {
264 stream_id: StreamId::try_new("chaos-conflict").expect("valid"),
265 expected: StreamVersion::new(0),
266 actual: StreamVersion::new(1),
267 });
268 }
269
270 if should_fail {
271 return Err(EventStoreError::StoreFailure {
272 operation: Operation::AppendEvents,
273 });
274 }
275
276 store.append_events(writes).await
277 }
278 }
279}
280
281impl<S> ChaosEventStoreExt for S
282where
283 S: EventStore + Sync,
284{
285 fn with_chaos(self, config: ChaosConfig) -> ChaosEventStore<Self> {
286 ChaosEventStore::new(self, config)
287 }
288}
289
#[cfg(test)]
mod tests {
    use super::*;
    use eventcore_memory::InMemoryEventStore;
    use eventcore_types::StreamVersion;
    use serde::{Deserialize, Serialize};

    /// Minimal event type used to push data through the chaos wrapper.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct PassthroughEvent {
        stream_id: StreamId,
    }

    impl Event for PassthroughEvent {
        fn stream_id(&self) -> &StreamId {
            &self.stream_id
        }

        fn event_type_name() -> &'static str {
            "PassthroughEvent"
        }
    }

    /// `default()` leaves the seed unset while `deterministic()` pins one.
    #[test]
    fn deterministic_config_sets_seed() {
        let unseeded = ChaosConfig::default();
        let seeded = ChaosConfig::deterministic();

        assert!(unseeded.deterministic_seed.is_none() && seeded.deterministic_seed.is_some());
    }

    /// With both probabilities at zero the wrapper forwards appends and reads.
    #[tokio::test]
    async fn zero_probability_passthrough_allows_normal_operations() {
        let stream_id = StreamId::try_new("zero-probability-stream").expect("valid stream id");
        let event = PassthroughEvent {
            stream_id: stream_id.clone(),
        };
        let pending_writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| writes.append(event))
            .expect("writes builder should succeed");

        let chaos_store = InMemoryEventStore::new().with_chaos(ChaosConfig::default());
        let appended = chaos_store.append_events(pending_writes).await;
        let read_back = chaos_store.read_stream::<PassthroughEvent>(stream_id).await;

        assert!(appended.is_ok() && read_back.is_ok());
    }

    /// Pins the first draw of the seeded RNG at probability 0.5: no injection.
    #[test]
    fn deterministic_half_probability_does_not_inject_immediately() {
        let config = ChaosConfig::deterministic().with_failure_probability(0.5);
        let chaos_store = ChaosEventStore::new(InMemoryEventStore::new(), config);
        let half = FailureProbability::try_new(0.5).expect("0.5 is valid");

        assert!(!chaos_store.should_inject(half));
    }
}