eventcore_testing/
chaos.rs1use std::{future::Future, sync::Mutex};
2
3use eventcore_types::{
4 Event, EventStore, EventStoreError, EventStreamReader, EventStreamSlice, Operation, StreamId,
5 StreamWrites,
6};
7use nutype::nutype;
8use rand::{Rng, SeedableRng, random, rngs::StdRng};
9
10#[nutype(
29 validate(greater_or_equal = 0.0, less_or_equal = 1.0),
30 derive(Debug, Clone, Copy, PartialEq, PartialOrd, Display, Into)
31)]
32pub struct FailureProbability(f32);
33
34#[nutype(
53 validate(greater_or_equal = 0.0, less_or_equal = 1.0),
54 derive(Debug, Clone, Copy, PartialEq, PartialOrd, Display, Into)
55)]
56pub struct VersionConflictProbability(f32);
57
58#[derive(Debug, Clone)]
59pub struct ChaosConfig {
60 deterministic_seed: Option<u64>,
61 failure_probability: FailureProbability,
62 version_conflict_probability: VersionConflictProbability,
63}
64
65impl ChaosConfig {
66 pub fn deterministic() -> Self {
67 Self {
68 deterministic_seed: Some(0),
69 ..Self::default()
70 }
71 }
72
73 pub fn with_failure_probability(mut self, probability: f32) -> Self {
74 self.failure_probability = FailureProbability::try_new(probability.clamp(0.0, 1.0))
75 .expect("clamped value is always valid");
76 self
77 }
78
79 pub fn with_version_conflict_probability(mut self, probability: f32) -> Self {
80 self.version_conflict_probability =
81 VersionConflictProbability::try_new(probability.clamp(0.0, 1.0))
82 .expect("clamped value is always valid");
83 self
84 }
85}
86
87impl Default for ChaosConfig {
88 fn default() -> Self {
89 Self {
90 deterministic_seed: None,
91 failure_probability: FailureProbability::try_new(0.0)
92 .expect("0.0 is valid probability"),
93 version_conflict_probability: VersionConflictProbability::try_new(0.0)
94 .expect("0.0 is valid probability"),
95 }
96 }
97}
98
99pub trait ChaosEventStoreExt: Sized {
100 fn with_chaos(self, config: ChaosConfig) -> ChaosEventStore<Self>;
101}
102
103pub struct ChaosEventStore<S> {
104 store: S,
105 config: ChaosConfig,
106 rng: Mutex<StdRng>,
107}
108
109impl<S> ChaosEventStore<S> {
110 pub fn new(store: S, config: ChaosConfig) -> Self {
111 let rng = match config.deterministic_seed {
112 Some(seed) => StdRng::seed_from_u64(seed),
113 None => StdRng::seed_from_u64(random()),
114 };
115
116 Self {
117 store,
118 config,
119 rng: Mutex::new(rng),
120 }
121 }
122
123 fn should_inject<P: Into<f32>>(&self, probability: P) -> bool {
124 let prob_f32: f32 = probability.into();
125
126 if prob_f32 <= 0.0 {
127 return false;
128 }
129
130 if prob_f32 >= 1.0 {
131 return true;
132 }
133
134 let mut rng = self
135 .rng
136 .lock()
137 .expect("chaos RNG mutex should not be poisoned");
138
139 rng.random_bool(prob_f32 as f64)
140 }
141}
142
143impl<S> EventStore for ChaosEventStore<S>
144where
145 S: EventStore + Sync,
146{
147 fn read_stream<E: Event>(
148 &self,
149 stream_id: StreamId,
150 ) -> impl Future<Output = Result<EventStreamReader<E>, EventStoreError>> + Send {
151 let should_fail = self.should_inject(self.config.failure_probability);
152 let store = &self.store;
153
154 async move {
155 if should_fail {
156 return Err(EventStoreError::StoreFailure {
157 operation: Operation::ReadStream,
158 });
159 }
160
161 store.read_stream(stream_id).await
162 }
163 }
164
165 fn append_events(
166 &self,
167 writes: StreamWrites,
168 ) -> impl Future<Output = Result<EventStreamSlice, EventStoreError>> + Send {
169 let should_conflict = self.should_inject(self.config.version_conflict_probability);
170 let should_fail = self.should_inject(self.config.failure_probability);
171 let store = &self.store;
172
173 async move {
174 if should_conflict {
175 return Err(EventStoreError::VersionConflict);
176 }
177
178 if should_fail {
179 return Err(EventStoreError::StoreFailure {
180 operation: Operation::AppendEvents,
181 });
182 }
183
184 store.append_events(writes).await
185 }
186 }
187}
188
189impl<S> ChaosEventStoreExt for S
190where
191 S: EventStore + Sync,
192{
193 fn with_chaos(self, config: ChaosConfig) -> ChaosEventStore<Self> {
194 ChaosEventStore::new(self, config)
195 }
196}
197
198#[cfg(test)]
199mod tests {
200 use super::*;
201 use eventcore::StreamVersion;
202 use eventcore_memory::InMemoryEventStore;
203 use serde::{Deserialize, Serialize};
204
205 #[derive(Debug, Clone, Serialize, Deserialize)]
206 struct PassthroughEvent {
207 stream_id: StreamId,
208 }
209
210 impl Event for PassthroughEvent {
211 fn stream_id(&self) -> &StreamId {
212 &self.stream_id
213 }
214 }
215
216 #[test]
217 fn deterministic_config_sets_seed() {
218 let default_is_none = ChaosConfig::default().deterministic_seed.is_none();
219 let deterministic_is_some = ChaosConfig::deterministic().deterministic_seed.is_some();
220
221 assert!(default_is_none && deterministic_is_some);
222 }
223
224 #[tokio::test]
225 async fn zero_probability_passthrough_allows_normal_operations() {
226 let stream_id = StreamId::try_new("zero-probability-stream").expect("valid stream id");
227 let append_writes = StreamWrites::new()
228 .register_stream(stream_id.clone(), StreamVersion::new(0))
229 .and_then(|writes| {
230 writes.append(PassthroughEvent {
231 stream_id: stream_id.clone(),
232 })
233 })
234 .expect("writes builder should succeed");
235
236 let base_store = InMemoryEventStore::new();
237 let chaos_store = base_store.with_chaos(ChaosConfig::default());
238 let append_result = chaos_store.append_events(append_writes).await;
239 let read_result = chaos_store.read_stream::<PassthroughEvent>(stream_id).await;
240
241 assert!(append_result.is_ok() && read_result.is_ok());
242 }
243
244 #[test]
245 fn deterministic_half_probability_does_not_inject_immediately() {
246 let chaos_store = ChaosEventStore::new(
247 InMemoryEventStore::new(),
248 ChaosConfig::deterministic().with_failure_probability(0.5),
249 );
250
251 assert!(
252 !chaos_store.should_inject(FailureProbability::try_new(0.5).expect("0.5 is valid"))
253 );
254 }
255}