Skip to main content

axon/replay_token/
executor.rs

1//! [`ReplayExecutor`] — re-run an effect from a token and detect
2//! divergence.
3//!
4//! The executor is intentionally mechanism-free: it doesn't know how
5//! to invoke any specific effect (that's adopter territory). It
6//! orchestrates the **protocol**:
7//!
8//! 1. Fetch the token from a [`ReplayLog`].
//! 2. Hand the token's `effect_name`, `inputs`, `model_version`, and
//!    `sampling` to the adopter-supplied [`EffectInvoker`].
11//! 3. Canonical-hash the returned outputs.
12//! 4. Compare the recomputed hash to `token.outputs_hash_hex`.
13//! 5. Report either [`ReplayOutcome::Match`] or
14//!    [`ReplayOutcome::Diverged`].
15//!
16//! Adopters plug in whichever effect dispatcher they use — there's
17//! no assumption about provider, transport, or async runtime beyond
18//! `async_trait`.
19
20use async_trait::async_trait;
21use serde_json::Value;
22
23use crate::replay_token::log::{ReplayLog, ReplayLogError};
24use crate::replay_token::token::{canonical_hash, ReplayToken, SamplingParams};
25
26// ── Adopter-supplied effect invoker ──────────────────────────────────
27
28/// What the adopter plugs in. Given the replay inputs +
29/// model/sampling context, produce the effect's output value.
30///
31/// Implementations should be deterministic for deterministic effects
32/// (DB reads against a snapshot, pure transformations) and honour
33/// `sampling.seed` for LLM inference. Non-seedable providers return
34/// an [`EffectInvokerError::NonReplayable`] so the executor's
35/// divergence report pinpoints the cause.
36#[async_trait]
37pub trait EffectInvoker: Send + Sync {
38    async fn invoke(
39        &self,
40        effect_name: &str,
41        inputs: &Value,
42        model_version: &str,
43        sampling: &SamplingParams,
44    ) -> Result<Value, EffectInvokerError>;
45}
46
/// Failure reported by an [`EffectInvoker`].
#[derive(Debug)]
pub enum EffectInvokerError {
    /// The effect cannot be reproduced (e.g. a provider without seed support).
    NonReplayable(String),
    /// The effect was attempted but failed while running.
    Runtime(String),
}

impl std::fmt::Display for EffectInvokerError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            EffectInvokerError::NonReplayable(m) => {
                write!(f, "non-replayable: {m}")
            }
            EffectInvokerError::Runtime(m) => {
                write!(f, "invoker runtime: {m}")
            }
        }
    }
}

impl std::error::Error for EffectInvokerError {}
63
64// ── Outcome ──────────────────────────────────────────────────────────
65
66/// What a single-token replay produced.
67#[derive(Debug, Clone, PartialEq)]
68pub enum ReplayOutcome {
69    /// The recomputed outputs hashed identically to the token's
70    /// recorded hash — the effect is deterministically reproducible
71    /// under the conditions the token recorded.
72    Match {
73        token_hash_hex: String,
74    },
75    /// The outputs diverged. `divergence` carries the full report so
76    /// an operator can pinpoint what differs without re-running.
77    Diverged {
78        token_hash_hex: String,
79        divergence: ReplayDivergence,
80    },
81}
82
83#[derive(Debug, Clone, PartialEq)]
84pub struct ReplayDivergence {
85    pub expected_outputs_hash_hex: String,
86    pub actual_outputs_hash_hex: String,
87    pub actual_outputs: Value,
88}
89
90// ── Executor errors ──────────────────────────────────────────────────
91
92#[derive(Debug)]
93pub enum ReplayExecutorError {
94    Log(ReplayLogError),
95    Invoker(EffectInvokerError),
96}
97
98impl std::fmt::Display for ReplayExecutorError {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        match self {
101            Self::Log(e) => write!(f, "replay log: {e}"),
102            Self::Invoker(e) => write!(f, "effect invoker: {e}"),
103        }
104    }
105}
106
107impl std::error::Error for ReplayExecutorError {}
108
109impl From<ReplayLogError> for ReplayExecutorError {
110    fn from(e: ReplayLogError) -> Self {
111        Self::Log(e)
112    }
113}
114
115impl From<EffectInvokerError> for ReplayExecutorError {
116    fn from(e: EffectInvokerError) -> Self {
117        Self::Invoker(e)
118    }
119}
120
121// ── Executor ─────────────────────────────────────────────────────────
122
/// Replays recorded effects and checks them against their tokens.
///
/// `log` is the [`ReplayLog`] tokens are read from; `invoker` is the
/// adopter-supplied [`EffectInvoker`] that re-runs each effect.
///
/// The `ReplayLog` / `EffectInvoker` bounds live on the `impl` block rather
/// than on the struct, so code that merely names or stores a
/// `ReplayExecutor` does not have to repeat them.
pub struct ReplayExecutor<L, I> {
    pub log: L,
    pub invoker: I,
}
127
128impl<L: ReplayLog, I: EffectInvoker> ReplayExecutor<L, I> {
129    pub fn new(log: L, invoker: I) -> Self {
130        ReplayExecutor { log, invoker }
131    }
132
133    /// Replay a single token; return the outcome.
134    pub async fn replay_token(
135        &self,
136        token_hash_hex: &str,
137    ) -> Result<ReplayOutcome, ReplayExecutorError> {
138        let token = self.log.get(token_hash_hex).await?;
139        Ok(self.verify_token(&token).await?)
140    }
141
142    /// Replay every token for a flow, short-circuiting at the first
143    /// divergence. Adopters that prefer to collect every divergence
144    /// can call [`Self::verify_token`] in a custom loop.
145    pub async fn replay_flow(
146        &self,
147        flow_id: &str,
148    ) -> Result<Vec<ReplayOutcome>, ReplayExecutorError> {
149        let tokens = self.log.tokens_for_flow(flow_id).await?;
150        let mut outcomes = Vec::with_capacity(tokens.len());
151        for t in tokens {
152            let outcome = self.verify_token(&t).await?;
153            let diverged = matches!(outcome, ReplayOutcome::Diverged { .. });
154            outcomes.push(outcome);
155            if diverged {
156                break;
157            }
158        }
159        Ok(outcomes)
160    }
161
162    /// Low-level — given a concrete token, re-invoke + compare.
163    pub async fn verify_token(
164        &self,
165        token: &ReplayToken,
166    ) -> Result<ReplayOutcome, EffectInvokerError> {
167        let actual = self
168            .invoker
169            .invoke(
170                &token.effect_name,
171                &token.inputs,
172                &token.model_version,
173                &token.sampling,
174            )
175            .await?;
176        let actual_hash = canonical_hash(&actual);
177        let actual_hash_hex = hex(&actual_hash);
178        if actual_hash_hex == token.outputs_hash_hex {
179            Ok(ReplayOutcome::Match {
180                token_hash_hex: token.token_hash_hex.clone(),
181            })
182        } else {
183            Ok(ReplayOutcome::Diverged {
184                token_hash_hex: token.token_hash_hex.clone(),
185                divergence: ReplayDivergence {
186                    expected_outputs_hash_hex: token.outputs_hash_hex.clone(),
187                    actual_outputs_hash_hex: actual_hash_hex,
188                    actual_outputs: actual,
189                },
190            })
191        }
192    }
193}
194
/// Lowercase hex encoding of `bytes`, two characters per byte.
///
/// Digits are pushed straight into a pre-sized `String` via a nibble lookup
/// table. This avoids allocating a temporary `String` per byte, as
/// `push_str(&format!(..))` would.
fn hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
202
#[cfg(test)]
mod tests {
    use super::*;
    use crate::replay_token::log::InMemoryReplayLog;
    use crate::replay_token::token::{ReplayTokenBuilder, SamplingParams};
    use chrono::{TimeZone, Utc};
    use serde_json::json;

    /// Invoker stub that ignores every argument and returns `returns`.
    struct FixedInvoker {
        returns: Value,
    }

    #[async_trait]
    impl EffectInvoker for FixedInvoker {
        async fn invoke(
            &self,
            _effect_name: &str,
            _inputs: &Value,
            _model_version: &str,
            _sampling: &SamplingParams,
        ) -> Result<Value, EffectInvokerError> {
            Ok(self.returns.clone())
        }
    }

    /// Mint a token for `effect` recording `outputs`. Inputs, model version,
    /// sampling, timestamp and nonce are fixed, so tokens minted here differ
    /// only in effect name and outputs.
    fn mk_token(effect: &str, outputs: Value) -> ReplayToken {
        ReplayTokenBuilder::new()
            .effect_name(effect)
            .inputs(json!({"flow_id": "f1"}))
            .outputs(outputs)
            .model_version("axon.builtin.test.v1")
            .sampling(SamplingParams::default())
            .timestamp(Utc.with_ymd_and_hms(2026, 4, 22, 12, 0, 0).unwrap())
            .nonce([0u8; 16])
            .mint()
    }

    #[tokio::test]
    async fn match_when_outputs_are_bit_identical() {
        let log = InMemoryReplayLog::new();
        let t = mk_token("call_tool:x", json!({"a": 1, "b": 2}));
        log.append(t.clone()).await.unwrap();

        let invoker = FixedInvoker {
            returns: json!({"b": 2, "a": 1}), // keys reordered — canonical hash is key-order-independent
        };
        let executor = ReplayExecutor::new(log, invoker);
        let outcome = executor.replay_token(&t.token_hash_hex).await.unwrap();
        // A bare `matches!(..);` statement only evaluates to a bool and can
        // never fail the test, so compare the whole outcome instead.
        assert_eq!(
            outcome,
            ReplayOutcome::Match {
                token_hash_hex: t.token_hash_hex.clone(),
            }
        );
    }

    #[tokio::test]
    async fn diverge_when_outputs_differ() {
        let log = InMemoryReplayLog::new();
        let t = mk_token("call_tool:x", json!({"a": 1}));
        log.append(t.clone()).await.unwrap();

        let invoker = FixedInvoker {
            returns: json!({"a": 999}),
        };
        let executor = ReplayExecutor::new(log, invoker);
        let outcome = executor.replay_token(&t.token_hash_hex).await.unwrap();
        match outcome {
            ReplayOutcome::Diverged { divergence, .. } => {
                assert_eq!(divergence.expected_outputs_hash_hex, t.outputs_hash_hex);
                assert_ne!(divergence.actual_outputs_hash_hex, t.outputs_hash_hex);
                assert_eq!(divergence.actual_outputs, json!({"a": 999}));
            }
            other => panic!("expected Diverged, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn replay_flow_short_circuits_at_first_divergence() {
        let log = InMemoryReplayLog::new();
        let t1 = mk_token("step1", json!({"x": 1}));
        let t2 = mk_token("step2", json!({"x": 2}));
        let t3 = mk_token("step3", json!({"x": 3}));
        log.append(t1.clone()).await.unwrap();
        log.append(t2.clone()).await.unwrap();
        log.append(t3.clone()).await.unwrap();

        // The invoker always returns {x: 1}, so step1 matches, step2 is the
        // first divergence, and step3 must never be attempted.
        let invoker = FixedInvoker {
            returns: json!({"x": 1}),
        };
        let executor = ReplayExecutor::new(log, invoker);
        let outcomes = executor.replay_flow("f1").await.unwrap();
        assert_eq!(outcomes.len(), 2);
        assert_eq!(
            outcomes[0],
            ReplayOutcome::Match {
                token_hash_hex: t1.token_hash_hex.clone(),
            }
        );
        match &outcomes[1] {
            ReplayOutcome::Diverged { token_hash_hex, .. } => {
                assert_eq!(token_hash_hex, &t2.token_hash_hex);
            }
            other => panic!("expected step2 to diverge, got {other:?}"),
        }
    }
}