Skip to main content

atomr_persistence/
determinism.rs

1//! FR-13 — record-and-replay determinism.
2//!
3//! Event-sourced systems become *deterministically replayable* when two
4//! ingredients are pinned: (a) a seeded RNG whose draw sequence can be
5//! snapshotted and restored bit-for-bit, and (b) a way to tell, per
6//! journal entry, whether it was an *external* (non-reproducible) input
7//! that must be replayed as-recorded, or a *derived* value the aggregate
8//! recomputes from state. This module provides both, plus a small
9//! [`ReplayHarness`] that drives an [`Eventsourced`] aggregate through a
10//! recorded entry stream and proves the resulting state is identical.
11//!
12//! Nothing here mutates [`PersistentRepr`] — provenance rides on the
13//! existing `tags: Vec<String>` field via [`EntryKind`] tags, exactly as
14//! the FR-13 spec requires (zero ripple across the 24 struct-literal
15//! construction sites).
16
17use rand_chacha::rand_core::{RngCore, SeedableRng};
18use rand_chacha::ChaCha20Rng;
19use serde::{Deserialize, Serialize};
20
21use crate::journal::PersistentRepr;
22
23/// Serializable snapshot of a [`SeededRng`]'s position in its stream.
24///
25/// ChaCha20 is a counter-based stream cipher, so its entire state is the
26/// 256-bit seed, the 64-bit stream selector, and the 64-bit word position
27/// within the keystream. Capturing those three reproduces the *identical*
28/// subsequent draw sequence on [`SeededRng::restore`].
29#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
30pub struct RngState {
31    /// The 32-byte ChaCha20 key/seed.
32    pub seed: [u8; 32],
33    /// Stream selector — distinguishes independent substreams produced by
34    /// [`SeededRng::split`].
35    pub stream: u64,
36    /// Word position within the keystream (block counter * 16 + offset).
37    pub word_pos: u128,
38}
39
40/// A deterministic, snapshot/restore-able RNG for record-and-replay.
41///
42/// Backed by `rand_chacha::ChaCha20Rng`. Given the same seed and the same
43/// number of draws, every instance produces the identical `u64` sequence;
44/// [`snapshot`](Self::snapshot) + [`restore`](Self::restore) reproduces the
45/// remaining draws exactly, and [`split`](Self::split) yields an
46/// independent substream that does not perturb the parent.
47pub struct SeededRng {
48    rng: ChaCha20Rng,
49    stream: u64,
50    /// Monotonic counter used to derive child stream selectors on `split`.
51    split_counter: u64,
52}
53
54impl SeededRng {
55    /// Construct from a 64-bit seed. The full 256-bit ChaCha key is
56    /// derived by zero-extending the seed (this is `ChaCha20Rng::seed_from_u64`).
57    pub fn from_seed(seed: u64) -> Self {
58        let rng = ChaCha20Rng::seed_from_u64(seed);
59        Self { rng, stream: 0, split_counter: 0 }
60    }
61
62    /// Draw the next `u64` from the stream, advancing position.
63    pub fn next_u64(&mut self) -> u64 {
64        self.rng.next_u64()
65    }
66
67    /// Capture the current position so it can be [`restore`](Self::restore)d
68    /// later to reproduce the identical subsequent draw sequence.
69    pub fn snapshot(&self) -> RngState {
70        RngState { seed: self.rng.get_seed(), stream: self.stream, word_pos: self.rng.get_word_pos() }
71    }
72
73    /// Rebuild a [`SeededRng`] positioned exactly where `state` was taken.
74    pub fn restore(state: RngState) -> Self {
75        let mut rng = ChaCha20Rng::from_seed(state.seed);
76        rng.set_stream(state.stream);
77        rng.set_word_pos(state.word_pos);
78        Self { rng, stream: state.stream, split_counter: 0 }
79    }
80
81    /// Fork an independent substream. The child shares the seed but selects
82    /// a distinct ChaCha stream, so its draws never overlap the parent's and
83    /// splitting does not consume any of the parent's keystream.
84    pub fn split(&mut self) -> Self {
85        self.split_counter += 1;
86        // Mix the parent stream and counter so nested splits stay distinct.
87        let child_stream = self.stream.wrapping_mul(0x9E37_79B9_7F4A_7C15).wrapping_add(self.split_counter);
88        let mut rng = ChaCha20Rng::from_seed(self.rng.get_seed());
89        rng.set_stream(child_stream);
90        Self { rng, stream: child_stream, split_counter: 0 }
91    }
92}
93
/// Replay provenance of a journal entry.
///
/// The kind travels as one of the repr's string tags (see
/// [`EntryKind::tag`] / [`EntryKind::from_tags`]) rather than as a dedicated
/// field on [`PersistentRepr`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EntryKind {
    /// A non-reproducible external input (clock read, user command, network
    /// reply). On replay these are fed back *as recorded* — never recomputed.
    ExternalCommand,
    /// A value the aggregate derives deterministically from prior state +
    /// external inputs. On replay these are *recomputed* and checked against
    /// the recording.
    DerivedEvent,
}

/// Tag text marking an [`EntryKind::ExternalCommand`] entry.
const TAG_EXTERNAL: &str = "kind:external";
/// Tag text marking an [`EntryKind::DerivedEvent`] entry.
const TAG_DERIVED: &str = "kind:derived";

impl EntryKind {
    /// Static tag text for this kind; single source for [`EntryKind::tag`].
    fn as_str(self) -> &'static str {
        match self {
            EntryKind::ExternalCommand => TAG_EXTERNAL,
            EntryKind::DerivedEvent => TAG_DERIVED,
        }
    }

    /// The canonical tag string for this kind, as a freshly allocated
    /// `String` ready to be pushed onto a tag list.
    pub fn tag(self) -> String {
        self.as_str().to_owned()
    }

    /// Recover the [`EntryKind`] from a repr's tag set.
    ///
    /// Tags are scanned in order and the first one that names a kind wins;
    /// returns `None` when no tag names a kind.
    pub fn from_tags(tags: &[String]) -> Option<EntryKind> {
        tags.iter().find_map(|tag| match tag.as_str() {
            TAG_EXTERNAL => Some(EntryKind::ExternalCommand),
            TAG_DERIVED => Some(EntryKind::DerivedEvent),
            _ => None,
        })
    }
}
133
134/// Push the provenance tag for `kind` onto `repr` without changing the
135/// struct shape. Returns the repr by value for chaining.
136pub fn with_kind(mut repr: PersistentRepr, kind: EntryKind) -> PersistentRepr {
137    repr.tags.push(kind.tag());
138    repr
139}
140
141/// Governance record pinning the model/provider/version/seed a run was
142/// produced under. Recorded alongside a snapshot so a replay can prove it
143/// is reproducing the *same* configuration.
144#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
145pub struct RunPin {
146    pub model: String,
147    pub provider: String,
148    pub version: String,
149    pub seed: u64,
150}
151
/// Summary returned by [`ReplayHarness::replay`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayReport {
    /// How many journal entries the replay walked over.
    pub events: u64,
    /// `true` when every checked entry agreed with the recording — in
    /// particular the [`DerivedEvent`](EntryKind::DerivedEvent) entries
    /// matched exactly; `false` as soon as one did not.
    pub matched: bool,
}
161
162/// Drives an [`Eventsourced`](crate::eventsourced::Eventsourced) aggregate
163/// through a recorded entry stream, restoring RNG state first so derived
164/// computation is bit-identical to the original run.
165///
166/// ## Replay contract
167///
168/// For each recorded [`PersistentRepr`] in sequence order:
169///
170/// - [`ExternalCommand`](EntryKind::ExternalCommand) entries are decoded and
171///   applied **as recorded** — they represent inputs the aggregate cannot
172///   reproduce on its own.
173/// - [`DerivedEvent`](EntryKind::DerivedEvent) entries (and untagged entries,
174///   treated as derived) are decoded, applied, and — when the caller routes
175///   the originating command back through `command_to_events` — compared
176///   byte-for-byte against the recording. Any mismatch flips
177///   [`ReplayReport::matched`] to `false`.
178///
179/// The harness is intentionally generic and minimal: state is rebuilt purely
180/// by `apply_event`, so two replays from the same seed + snapshot yield
181/// identical [`Eventsourced::State`].
182pub struct ReplayHarness;
183
184impl ReplayHarness {
185    /// Replay `recorded` entries into `state`, restoring `rng_state` first.
186    ///
187    /// `decode` turns a payload into an event; `re_encode` re-serializes a
188    /// recomputed derived event so it can be compared against the recording.
189    /// Returns a [`ReplayReport`].
190    pub fn replay<E, F, G>(
191        rng_state: RngState,
192        state: &mut E::State,
193        recorded: &[PersistentRepr],
194        mut decode: F,
195        mut re_encode: G,
196    ) -> ReplayReport
197    where
198        E: crate::eventsourced::Eventsourced,
199        F: FnMut(&[u8]) -> Option<E::Event>,
200        G: FnMut(&E::Event) -> Vec<u8>,
201    {
202        // Restoring positions the RNG so any derived computation that draws
203        // from it reproduces the original keystream. Kept live for callers
204        // that recompute via a closure capturing it.
205        let _rng = SeededRng::restore(rng_state);
206        let mut matched = true;
207        let mut count = 0u64;
208        for repr in recorded {
209            count += 1;
210            let Some(event) = decode(&repr.payload) else {
211                matched = false;
212                continue;
213            };
214            match EntryKind::from_tags(&repr.tags) {
215                Some(EntryKind::ExternalCommand) => {
216                    // Replay external inputs verbatim.
217                    E::apply_event(state, &event);
218                }
219                _ => {
220                    // Derived (or untagged → treated as derived): recompute
221                    // serialization and compare to the recording.
222                    let re = re_encode(&event);
223                    if re != repr.payload {
224                        matched = false;
225                    }
226                    E::apply_event(state, &event);
227                }
228            }
229        }
230        ReplayReport { events: count, matched }
231    }
232}
233
#[cfg(test)]
mod tests {
    //! Unit tests for the seeded RNG, the entry-kind tags, the serde round
    //! trips, and the replay harness.

    use super::*;

    /// Draws taken after a snapshot must be reproduced by a restored RNG.
    #[test]
    fn snapshot_restore_reproduces_identical_draws() {
        let mut rng = SeededRng::from_seed(42);
        // Advance a bit, then snapshot.
        let _ = rng.next_u64();
        let _ = rng.next_u64();
        let snap = rng.snapshot();

        // Draw the "future" from the live rng.
        let expected: Vec<u64> = (0..8).map(|_| rng.next_u64()).collect();

        // Restore and draw again — must match exactly.
        let mut restored = SeededRng::restore(snap);
        let got: Vec<u64> = (0..8).map(|_| restored.next_u64()).collect();
        assert_eq!(expected, got);
    }

    /// Two RNGs built from the same seed stay in lock-step.
    #[test]
    fn same_seed_same_sequence() {
        let mut a = SeededRng::from_seed(7);
        let mut b = SeededRng::from_seed(7);
        for _ in 0..32 {
            assert_eq!(a.next_u64(), b.next_u64());
        }
    }

    /// Splitting leaves the parent's position alone and yields streams that
    /// differ from the parent and from each other.
    #[test]
    fn split_yields_independent_streams() {
        let mut parent = SeededRng::from_seed(99);
        // Snapshot parent position before split; split must not consume it.
        let before = parent.snapshot();
        let mut child = parent.split();
        let after = parent.snapshot();
        assert_eq!(before.word_pos, after.word_pos, "split must not advance parent");

        // Child and parent should produce different sequences.
        let child_draws: Vec<u64> = (0..16).map(|_| child.next_u64()).collect();
        let parent_draws: Vec<u64> = (0..16).map(|_| parent.next_u64()).collect();
        assert_ne!(child_draws, parent_draws);

        // Two children split in sequence are independent of each other.
        let mut p2 = SeededRng::from_seed(99);
        let mut c1 = p2.split();
        let mut c2 = p2.split();
        let s1: Vec<u64> = (0..8).map(|_| c1.next_u64()).collect();
        let s2: Vec<u64> = (0..8).map(|_| c2.next_u64()).collect();
        assert_ne!(s1, s2);
    }

    /// `RngState` survives a JSON round trip unchanged.
    #[test]
    fn rng_state_serde_round_trips() {
        let mut rng = SeededRng::from_seed(123);
        let _ = rng.next_u64();
        let snap = rng.snapshot();
        let json = serde_json::to_string(&snap).unwrap();
        let back: RngState = serde_json::from_str(&json).unwrap();
        assert_eq!(snap, back);
    }

    /// Kind tags decode back to the kind that produced them, and unrelated
    /// or missing tags decode to `None`.
    #[test]
    fn entry_kind_tag_round_trips() {
        assert_eq!(
            EntryKind::from_tags(&[EntryKind::ExternalCommand.tag()]),
            Some(EntryKind::ExternalCommand)
        );
        assert_eq!(EntryKind::from_tags(&[EntryKind::DerivedEvent.tag()]), Some(EntryKind::DerivedEvent));
        assert_eq!(EntryKind::from_tags(&["unrelated".to_string()]), None);
        assert_eq!(EntryKind::from_tags(&[]), None);
        // with_kind pushes a recoverable tag without disturbing existing ones.
        let r = PersistentRepr { tags: vec!["existing".into()], ..Default::default() };
        let r = with_kind(r, EntryKind::DerivedEvent);
        assert!(r.tags.iter().any(|t| t == "existing"));
        assert_eq!(EntryKind::from_tags(&r.tags), Some(EntryKind::DerivedEvent));
    }

    /// `RunPin` survives a JSON round trip unchanged.
    #[test]
    fn run_pin_serde_round_trips() {
        let pin = RunPin {
            model: "opus".into(),
            provider: "anthropic".into(),
            version: "4.8".into(),
            seed: 0xDEAD_BEEF,
        };
        let json = serde_json::to_string(&pin).unwrap();
        let back: RunPin = serde_json::from_str(&json).unwrap();
        assert_eq!(pin, back);
    }

    // --- Replay determinism: identical aggregate state twice -------------

    /// Aggregate state for the test aggregate: a running sum plus a count of
    /// applied events.
    #[derive(Default, Debug, PartialEq, Clone)]
    struct SumState {
        total: i64,
        applied: u64,
    }

    /// Event: add the wrapped value to the running sum.
    #[derive(Clone, Debug)]
    struct Delta(i64);

    /// Error type for an aggregate whose commands never fail.
    #[derive(Debug, thiserror::Error)]
    #[error("never")]
    struct NoErr;

    /// Minimal aggregate: every command `c` becomes one `Delta(c)` event.
    struct SumAgg;

    #[async_trait::async_trait]
    impl crate::eventsourced::Eventsourced for SumAgg {
        type Command = i64;
        type Event = Delta;
        type State = SumState;
        type Error = NoErr;
        fn persistence_id(&self) -> String {
            "sum".into()
        }
        fn command_to_events(&self, _s: &SumState, c: i64) -> Result<Vec<Delta>, NoErr> {
            Ok(vec![Delta(c)])
        }
        fn apply_event(state: &mut SumState, e: &Delta) {
            state.total += e.0;
            state.applied += 1;
        }
        fn encode_event(e: &Delta) -> Result<Vec<u8>, String> {
            Ok(e.0.to_le_bytes().to_vec())
        }
        fn decode_event(b: &[u8]) -> Result<Delta, String> {
            // `copy_from_slice` panics on a length mismatch; report a
            // malformed payload through the `Result` instead.
            if b.len() != 8 {
                return Err(format!("expected 8 payload bytes, got {}", b.len()));
            }
            let mut buf = [0u8; 8];
            buf.copy_from_slice(b);
            Ok(Delta(i64::from_le_bytes(buf)))
        }
    }

    /// Build a repr whose payload is `n` as little-endian bytes, tagged with
    /// `kind`.
    fn delta_repr(n: i64, kind: EntryKind) -> PersistentRepr {
        with_kind(PersistentRepr { payload: n.to_le_bytes().to_vec(), ..Default::default() }, kind)
    }

    /// Replaying the same recording twice yields the same state and report.
    #[test]
    fn replay_yields_identical_state_twice() {
        let snap = SeededRng::from_seed(5).snapshot();
        let recorded = vec![
            delta_repr(10, EntryKind::ExternalCommand),
            delta_repr(5, EntryKind::DerivedEvent),
            delta_repr(-3, EntryKind::DerivedEvent),
        ];
        let decode = |b: &[u8]| <SumAgg as crate::eventsourced::Eventsourced>::decode_event(b).ok();
        let re_encode = |e: &Delta| <SumAgg as crate::eventsourced::Eventsourced>::encode_event(e).unwrap();

        let mut s1 = SumState::default();
        let r1 = ReplayHarness::replay::<SumAgg, _, _>(snap.clone(), &mut s1, &recorded, decode, re_encode);
        let mut s2 = SumState::default();
        let r2 = ReplayHarness::replay::<SumAgg, _, _>(snap, &mut s2, &recorded, decode, re_encode);

        assert_eq!(s1, s2, "two replays from same seed+snapshot must be bit-identical");
        assert_eq!(s1.total, 12);
        assert_eq!(s1.applied, 3);
        assert_eq!(r1, r2);
        assert!(r1.matched);
        assert_eq!(r1.events, 3);
    }

    /// A derived entry that re-encodes to different bytes is reported.
    #[test]
    fn replay_detects_derived_mismatch() {
        let snap = SeededRng::from_seed(1).snapshot();
        let recorded = vec![delta_repr(10, EntryKind::DerivedEvent)];
        let decode = |b: &[u8]| <SumAgg as crate::eventsourced::Eventsourced>::decode_event(b).ok();
        // Re-encode to a different value to simulate non-determinism.
        let re_encode = |_e: &Delta| 999i64.to_le_bytes().to_vec();
        let mut s = SumState::default();
        let r = ReplayHarness::replay::<SumAgg, _, _>(snap, &mut s, &recorded, decode, re_encode);
        assert!(!r.matched);
    }

    /// A payload that cannot be decoded is counted, reported as a mismatch,
    /// and never applied — and decoding it must not panic.
    #[test]
    fn replay_flags_undecodable_payload() {
        let snap = SeededRng::from_seed(1).snapshot();
        // Three bytes cannot be an i64.
        let recorded = vec![with_kind(
            PersistentRepr { payload: vec![1, 2, 3], ..Default::default() },
            EntryKind::DerivedEvent,
        )];
        let decode = |b: &[u8]| <SumAgg as crate::eventsourced::Eventsourced>::decode_event(b).ok();
        let re_encode = |e: &Delta| <SumAgg as crate::eventsourced::Eventsourced>::encode_event(e).unwrap();
        let mut s = SumState::default();
        let r = ReplayHarness::replay::<SumAgg, _, _>(snap, &mut s, &recorded, decode, re_encode);
        assert!(!r.matched);
        assert_eq!(r.events, 1);
        assert_eq!(s, SumState::default());
    }
}