1use rand_chacha::rand_core::{RngCore, SeedableRng};
18use rand_chacha::ChaCha20Rng;
19use serde::{Deserialize, Serialize};
20
21use crate::journal::PersistentRepr;
22
23#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
30pub struct RngState {
31 pub seed: [u8; 32],
33 pub stream: u64,
36 pub word_pos: u128,
38}
39
40pub struct SeededRng {
48 rng: ChaCha20Rng,
49 stream: u64,
50 split_counter: u64,
52}
53
54impl SeededRng {
55 pub fn from_seed(seed: u64) -> Self {
58 let rng = ChaCha20Rng::seed_from_u64(seed);
59 Self { rng, stream: 0, split_counter: 0 }
60 }
61
62 pub fn next_u64(&mut self) -> u64 {
64 self.rng.next_u64()
65 }
66
67 pub fn snapshot(&self) -> RngState {
70 RngState { seed: self.rng.get_seed(), stream: self.stream, word_pos: self.rng.get_word_pos() }
71 }
72
73 pub fn restore(state: RngState) -> Self {
75 let mut rng = ChaCha20Rng::from_seed(state.seed);
76 rng.set_stream(state.stream);
77 rng.set_word_pos(state.word_pos);
78 Self { rng, stream: state.stream, split_counter: 0 }
79 }
80
81 pub fn split(&mut self) -> Self {
85 self.split_counter += 1;
86 let child_stream = self.stream.wrapping_mul(0x9E37_79B9_7F4A_7C15).wrapping_add(self.split_counter);
88 let mut rng = ChaCha20Rng::from_seed(self.rng.get_seed());
89 rng.set_stream(child_stream);
90 Self { rng, stream: child_stream, split_counter: 0 }
91 }
92}
93
94#[derive(Debug, Clone, Copy, PartialEq, Eq)]
99pub enum EntryKind {
100 ExternalCommand,
103 DerivedEvent,
107}
108
109const TAG_EXTERNAL: &str = "kind:external";
110const TAG_DERIVED: &str = "kind:derived";
111
112impl EntryKind {
113 pub fn tag(self) -> String {
115 match self {
116 EntryKind::ExternalCommand => TAG_EXTERNAL.to_string(),
117 EntryKind::DerivedEvent => TAG_DERIVED.to_string(),
118 }
119 }
120
121 pub fn from_tags(tags: &[String]) -> Option<EntryKind> {
123 for t in tags {
124 match t.as_str() {
125 TAG_EXTERNAL => return Some(EntryKind::ExternalCommand),
126 TAG_DERIVED => return Some(EntryKind::DerivedEvent),
127 _ => {}
128 }
129 }
130 None
131 }
132}
133
134pub fn with_kind(mut repr: PersistentRepr, kind: EntryKind) -> PersistentRepr {
137 repr.tags.push(kind.tag());
138 repr
139}
140
141#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
145pub struct RunPin {
146 pub model: String,
147 pub provider: String,
148 pub version: String,
149 pub seed: u64,
150}
151
152#[derive(Debug, Clone, Copy, PartialEq, Eq)]
154pub struct ReplayReport {
155 pub events: u64,
157 pub matched: bool,
160}
161
162pub struct ReplayHarness;
183
184impl ReplayHarness {
185 pub fn replay<E, F, G>(
191 rng_state: RngState,
192 state: &mut E::State,
193 recorded: &[PersistentRepr],
194 mut decode: F,
195 mut re_encode: G,
196 ) -> ReplayReport
197 where
198 E: crate::eventsourced::Eventsourced,
199 F: FnMut(&[u8]) -> Option<E::Event>,
200 G: FnMut(&E::Event) -> Vec<u8>,
201 {
202 let _rng = SeededRng::restore(rng_state);
206 let mut matched = true;
207 let mut count = 0u64;
208 for repr in recorded {
209 count += 1;
210 let Some(event) = decode(&repr.payload) else {
211 matched = false;
212 continue;
213 };
214 match EntryKind::from_tags(&repr.tags) {
215 Some(EntryKind::ExternalCommand) => {
216 E::apply_event(state, &event);
218 }
219 _ => {
220 let re = re_encode(&event);
223 if re != repr.payload {
224 matched = false;
225 }
226 E::apply_event(state, &event);
227 }
228 }
229 }
230 ReplayReport { events: count, matched }
231 }
232}
233
234#[cfg(test)]
235mod tests {
236 use super::*;
237
238 #[test]
239 fn snapshot_restore_reproduces_identical_draws() {
240 let mut rng = SeededRng::from_seed(42);
241 let _ = rng.next_u64();
243 let _ = rng.next_u64();
244 let snap = rng.snapshot();
245
246 let expected: Vec<u64> = (0..8).map(|_| rng.next_u64()).collect();
248
249 let mut restored = SeededRng::restore(snap);
251 let got: Vec<u64> = (0..8).map(|_| restored.next_u64()).collect();
252 assert_eq!(expected, got);
253 }
254
255 #[test]
256 fn same_seed_same_sequence() {
257 let mut a = SeededRng::from_seed(7);
258 let mut b = SeededRng::from_seed(7);
259 for _ in 0..32 {
260 assert_eq!(a.next_u64(), b.next_u64());
261 }
262 }
263
264 #[test]
265 fn split_yields_independent_streams() {
266 let mut parent = SeededRng::from_seed(99);
267 let before = parent.snapshot();
269 let mut child = parent.split();
270 let after = parent.snapshot();
271 assert_eq!(before.word_pos, after.word_pos, "split must not advance parent");
272
273 let child_draws: Vec<u64> = (0..16).map(|_| child.next_u64()).collect();
275 let parent_draws: Vec<u64> = (0..16).map(|_| parent.next_u64()).collect();
276 assert_ne!(child_draws, parent_draws);
277
278 let mut p2 = SeededRng::from_seed(99);
280 let mut c1 = p2.split();
281 let mut c2 = p2.split();
282 let s1: Vec<u64> = (0..8).map(|_| c1.next_u64()).collect();
283 let s2: Vec<u64> = (0..8).map(|_| c2.next_u64()).collect();
284 assert_ne!(s1, s2);
285 }
286
287 #[test]
288 fn rng_state_serde_round_trips() {
289 let mut rng = SeededRng::from_seed(123);
290 let _ = rng.next_u64();
291 let snap = rng.snapshot();
292 let json = serde_json::to_string(&snap).unwrap();
293 let back: RngState = serde_json::from_str(&json).unwrap();
294 assert_eq!(snap, back);
295 }
296
297 #[test]
298 fn entry_kind_tag_round_trips() {
299 assert_eq!(
300 EntryKind::from_tags(&[EntryKind::ExternalCommand.tag()]),
301 Some(EntryKind::ExternalCommand)
302 );
303 assert_eq!(EntryKind::from_tags(&[EntryKind::DerivedEvent.tag()]), Some(EntryKind::DerivedEvent));
304 assert_eq!(EntryKind::from_tags(&["unrelated".to_string()]), None);
305 assert_eq!(EntryKind::from_tags(&[]), None);
306 let r = PersistentRepr { tags: vec!["existing".into()], ..Default::default() };
308 let r = with_kind(r, EntryKind::DerivedEvent);
309 assert!(r.tags.iter().any(|t| t == "existing"));
310 assert_eq!(EntryKind::from_tags(&r.tags), Some(EntryKind::DerivedEvent));
311 }
312
313 #[test]
314 fn run_pin_serde_round_trips() {
315 let pin = RunPin {
316 model: "opus".into(),
317 provider: "anthropic".into(),
318 version: "4.8".into(),
319 seed: 0xDEAD_BEEF,
320 };
321 let json = serde_json::to_string(&pin).unwrap();
322 let back: RunPin = serde_json::from_str(&json).unwrap();
323 assert_eq!(pin, back);
324 }
325
326 #[derive(Default, Debug, PartialEq, Clone)]
329 struct SumState {
330 total: i64,
331 applied: u64,
332 }
333
334 #[derive(Clone, Debug)]
335 struct Delta(i64);
336
337 #[derive(Debug, thiserror::Error)]
338 #[error("never")]
339 struct NoErr;
340
341 struct SumAgg;
342
343 #[async_trait::async_trait]
344 impl crate::eventsourced::Eventsourced for SumAgg {
345 type Command = i64;
346 type Event = Delta;
347 type State = SumState;
348 type Error = NoErr;
349 fn persistence_id(&self) -> String {
350 "sum".into()
351 }
352 fn command_to_events(&self, _s: &SumState, c: i64) -> Result<Vec<Delta>, NoErr> {
353 Ok(vec![Delta(c)])
354 }
355 fn apply_event(state: &mut SumState, e: &Delta) {
356 state.total += e.0;
357 state.applied += 1;
358 }
359 fn encode_event(e: &Delta) -> Result<Vec<u8>, String> {
360 Ok(e.0.to_le_bytes().to_vec())
361 }
362 fn decode_event(b: &[u8]) -> Result<Delta, String> {
363 let mut buf = [0u8; 8];
364 buf.copy_from_slice(b);
365 Ok(Delta(i64::from_le_bytes(buf)))
366 }
367 }
368
369 fn delta_repr(n: i64, kind: EntryKind) -> PersistentRepr {
370 with_kind(PersistentRepr { payload: n.to_le_bytes().to_vec(), ..Default::default() }, kind)
371 }
372
373 #[test]
374 fn replay_yields_identical_state_twice() {
375 let snap = SeededRng::from_seed(5).snapshot();
376 let recorded = vec![
377 delta_repr(10, EntryKind::ExternalCommand),
378 delta_repr(5, EntryKind::DerivedEvent),
379 delta_repr(-3, EntryKind::DerivedEvent),
380 ];
381 let decode = |b: &[u8]| <SumAgg as crate::eventsourced::Eventsourced>::decode_event(b).ok();
382 let re_encode = |e: &Delta| <SumAgg as crate::eventsourced::Eventsourced>::encode_event(e).unwrap();
383
384 let mut s1 = SumState::default();
385 let r1 = ReplayHarness::replay::<SumAgg, _, _>(snap.clone(), &mut s1, &recorded, decode, re_encode);
386 let mut s2 = SumState::default();
387 let r2 = ReplayHarness::replay::<SumAgg, _, _>(snap, &mut s2, &recorded, decode, re_encode);
388
389 assert_eq!(s1, s2, "two replays from same seed+snapshot must be bit-identical");
390 assert_eq!(s1.total, 12);
391 assert_eq!(s1.applied, 3);
392 assert_eq!(r1, r2);
393 assert!(r1.matched);
394 assert_eq!(r1.events, 3);
395 }
396
397 #[test]
398 fn replay_detects_derived_mismatch() {
399 let snap = SeededRng::from_seed(1).snapshot();
400 let recorded = vec![delta_repr(10, EntryKind::DerivedEvent)];
401 let decode = |b: &[u8]| <SumAgg as crate::eventsourced::Eventsourced>::decode_event(b).ok();
402 let re_encode = |_e: &Delta| 999i64.to_le_bytes().to_vec();
404 let mut s = SumState::default();
405 let r = ReplayHarness::replay::<SumAgg, _, _>(snap, &mut s, &recorded, decode, re_encode);
406 assert!(!r.matched);
407 }
408}