eventcore_testing/
chaos.rs1use std::{future::Future, sync::Mutex};
2
3use eventcore_types::{
4 Event, EventStore, EventStoreError, EventStreamReader, EventStreamSlice, Operation, StreamId,
5 StreamVersion, StreamWrites,
6};
7use nutype::nutype;
8use rand::{Rng, SeedableRng, random, rngs::StdRng};
9
10#[nutype(
29 validate(greater_or_equal = 0.0, less_or_equal = 1.0),
30 derive(Debug, Clone, Copy, PartialEq, PartialOrd, Display, Into)
31)]
32pub struct FailureProbability(f32);
33
34#[nutype(
53 validate(greater_or_equal = 0.0, less_or_equal = 1.0),
54 derive(Debug, Clone, Copy, PartialEq, PartialOrd, Display, Into)
55)]
56pub struct VersionConflictProbability(f32);
57
58#[derive(Debug, Clone)]
59pub struct ChaosConfig {
60 deterministic_seed: Option<u64>,
61 failure_probability: FailureProbability,
62 version_conflict_probability: VersionConflictProbability,
63}
64
65impl ChaosConfig {
66 pub fn deterministic() -> Self {
67 Self {
68 deterministic_seed: Some(0),
69 ..Self::default()
70 }
71 }
72
73 pub fn with_failure_probability(mut self, probability: f32) -> Self {
74 self.failure_probability = FailureProbability::try_new(probability.clamp(0.0, 1.0))
75 .expect("clamped value is always valid");
76 self
77 }
78
79 pub fn with_version_conflict_probability(mut self, probability: f32) -> Self {
80 self.version_conflict_probability =
81 VersionConflictProbability::try_new(probability.clamp(0.0, 1.0))
82 .expect("clamped value is always valid");
83 self
84 }
85}
86
87impl Default for ChaosConfig {
88 fn default() -> Self {
89 Self {
90 deterministic_seed: None,
91 failure_probability: FailureProbability::try_new(0.0)
92 .expect("0.0 is valid probability"),
93 version_conflict_probability: VersionConflictProbability::try_new(0.0)
94 .expect("0.0 is valid probability"),
95 }
96 }
97}
98
99pub trait ChaosEventStoreExt: Sized {
100 fn with_chaos(self, config: ChaosConfig) -> ChaosEventStore<Self>;
101}
102
103pub struct ChaosEventStore<S> {
104 store: S,
105 config: ChaosConfig,
106 rng: Mutex<StdRng>,
107}
108
109impl<S> ChaosEventStore<S> {
110 pub fn new(store: S, config: ChaosConfig) -> Self {
111 let rng = match config.deterministic_seed {
112 Some(seed) => StdRng::seed_from_u64(seed),
113 None => StdRng::seed_from_u64(random()),
114 };
115
116 Self {
117 store,
118 config,
119 rng: Mutex::new(rng),
120 }
121 }
122
123 fn should_inject<P: Into<f32>>(&self, probability: P) -> bool {
124 let prob_f32: f32 = probability.into();
125
126 if prob_f32 <= 0.0 {
127 return false;
128 }
129
130 if prob_f32 >= 1.0 {
131 return true;
132 }
133
134 let mut rng = self
135 .rng
136 .lock()
137 .expect("chaos RNG mutex should not be poisoned");
138
139 rng.random_bool(prob_f32 as f64)
140 }
141}
142
143impl<S> EventStore for ChaosEventStore<S>
144where
145 S: EventStore + Sync,
146{
147 fn read_stream<E: Event>(
148 &self,
149 stream_id: StreamId,
150 ) -> impl Future<Output = Result<EventStreamReader<E>, EventStoreError>> + Send {
151 let should_fail = self.should_inject(self.config.failure_probability);
152 let store = &self.store;
153
154 async move {
155 if should_fail {
156 return Err(EventStoreError::StoreFailure {
157 operation: Operation::ReadStream,
158 });
159 }
160
161 store.read_stream(stream_id).await
162 }
163 }
164
165 fn append_events(
166 &self,
167 writes: StreamWrites,
168 ) -> impl Future<Output = Result<EventStreamSlice, EventStoreError>> + Send {
169 let should_conflict = self.should_inject(self.config.version_conflict_probability);
170 let should_fail = self.should_inject(self.config.failure_probability);
171 let store = &self.store;
172
173 async move {
174 if should_conflict {
175 return Err(EventStoreError::VersionConflict {
176 stream_id: StreamId::try_new("chaos-conflict").expect("valid"),
177 expected: StreamVersion::new(0),
178 actual: StreamVersion::new(1),
179 });
180 }
181
182 if should_fail {
183 return Err(EventStoreError::StoreFailure {
184 operation: Operation::AppendEvents,
185 });
186 }
187
188 store.append_events(writes).await
189 }
190 }
191}
192
193impl<S> ChaosEventStoreExt for S
194where
195 S: EventStore + Sync,
196{
197 fn with_chaos(self, config: ChaosConfig) -> ChaosEventStore<Self> {
198 ChaosEventStore::new(self, config)
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use eventcore_memory::InMemoryEventStore;
    use eventcore_types::StreamVersion;
    use serde::{Deserialize, Serialize};

    /// Minimal event carrying only the stream it belongs to.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct PassthroughEvent {
        stream_id: StreamId,
    }

    impl Event for PassthroughEvent {
        fn stream_id(&self) -> &StreamId {
            &self.stream_id
        }

        fn event_type_name() -> &'static str {
            "PassthroughEvent"
        }
    }

    #[test]
    fn deterministic_config_sets_seed() {
        // Separate assertions so a failure names the offending constructor.
        assert!(
            ChaosConfig::default().deterministic_seed.is_none(),
            "default config should not pin a seed"
        );
        assert!(
            ChaosConfig::deterministic().deterministic_seed.is_some(),
            "deterministic config should pin a seed"
        );
    }

    #[tokio::test]
    async fn zero_probability_passthrough_allows_normal_operations() {
        let stream_id = StreamId::try_new("zero-probability-stream").expect("valid stream id");
        let append_writes = StreamWrites::new()
            .register_stream(stream_id.clone(), StreamVersion::new(0))
            .and_then(|writes| {
                writes.append(PassthroughEvent {
                    stream_id: stream_id.clone(),
                })
            })
            .expect("writes builder should succeed");

        let base_store = InMemoryEventStore::new();
        let chaos_store = base_store.with_chaos(ChaosConfig::default());
        let append_result = chaos_store.append_events(append_writes).await;
        let read_result = chaos_store.read_stream::<PassthroughEvent>(stream_id).await;

        // Checked individually so a failure says which operation broke.
        assert!(
            append_result.is_ok(),
            "append should pass through when probabilities are zero"
        );
        assert!(
            read_result.is_ok(),
            "read should pass through when probabilities are zero"
        );
    }

    #[test]
    fn deterministic_half_probability_does_not_inject_immediately() {
        let chaos_store = ChaosEventStore::new(
            InMemoryEventStore::new(),
            ChaosConfig::deterministic().with_failure_probability(0.5),
        );

        // Pins the first draw of the RNG seeded by `ChaosConfig::deterministic()`.
        let first_draw =
            chaos_store.should_inject(FailureProbability::try_new(0.5).expect("0.5 is valid"));

        assert!(
            !first_draw,
            "first draw with the deterministic seed should not inject"
        );
    }
}