1use async_trait::async_trait;
21use serde_json::Value;
22
23use crate::replay_token::log::{ReplayLog, ReplayLogError};
24use crate::replay_token::token::{canonical_hash, ReplayToken, SamplingParams};
25
/// Re-runs a recorded effect so that its output can be compared with what the
/// replay log stored for it.
///
/// `ReplayExecutor::verify_token` calls [`EffectInvoker::invoke`] with the
/// fields of a logged replay token. Implementors must be `Send + Sync`.
#[async_trait]
pub trait EffectInvoker: Send + Sync {
    /// Executes `effect_name` with the given `inputs`, `model_version`, and
    /// `sampling` parameters and returns the output it produced.
    ///
    /// # Errors
    ///
    /// Returns an [`EffectInvokerError`] when the effect cannot be re-run.
    async fn invoke(
        &self,
        effect_name: &str,
        inputs: &Value,
        model_version: &str,
        sampling: &SamplingParams,
    ) -> Result<Value, EffectInvokerError>;
}
46
/// Failure reported by an `EffectInvoker` while re-running an effect.
///
/// Both variants carry a free-form, human-readable message.
#[derive(Debug)]
pub enum EffectInvokerError {
    /// The effect cannot be replayed.
    NonReplayable(String),
    /// The invoker itself failed while running the effect.
    Runtime(String),
}

impl std::fmt::Display for EffectInvokerError {
    /// Renders the error as `"<category>: <message>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            EffectInvokerError::NonReplayable(m) => write!(f, "non-replayable: {m}"),
            EffectInvokerError::Runtime(m) => write!(f, "invoker runtime: {m}"),
        }
    }
}

// No underlying cause is stored, so the default `source()` (None) is correct.
impl std::error::Error for EffectInvokerError {}
63
/// Result of re-running a single replay token and comparing output hashes.
#[derive(Debug, Clone, PartialEq)]
pub enum ReplayOutcome {
    /// The replayed output hashed to the same value recorded in the token.
    Match {
        /// The verified token's `token_hash_hex`.
        token_hash_hex: String,
    },
    /// The replayed output hashed to a different value than the one recorded.
    Diverged {
        /// The verified token's `token_hash_hex`.
        token_hash_hex: String,
        /// Expected vs. actual hashes plus the output produced during replay.
        divergence: ReplayDivergence,
    },
}
82
/// Details captured when a replayed output does not match the logged hash.
#[derive(Debug, Clone, PartialEq)]
pub struct ReplayDivergence {
    /// Output hash stored in the replay token (`outputs_hash_hex`).
    pub expected_outputs_hash_hex: String,
    /// Hex-encoded `canonical_hash` of the output produced during replay.
    pub actual_outputs_hash_hex: String,
    /// The output produced during replay.
    pub actual_outputs: Value,
}
89
90#[derive(Debug)]
93pub enum ReplayExecutorError {
94 Log(ReplayLogError),
95 Invoker(EffectInvokerError),
96}
97
98impl std::fmt::Display for ReplayExecutorError {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 match self {
101 Self::Log(e) => write!(f, "replay log: {e}"),
102 Self::Invoker(e) => write!(f, "effect invoker: {e}"),
103 }
104 }
105}
106
107impl std::error::Error for ReplayExecutorError {}
108
109impl From<ReplayLogError> for ReplayExecutorError {
110 fn from(e: ReplayLogError) -> Self {
111 Self::Log(e)
112 }
113}
114
115impl From<EffectInvokerError> for ReplayExecutorError {
116 fn from(e: EffectInvokerError) -> Self {
117 Self::Invoker(e)
118 }
119}
120
/// Replays logged effect invocations and checks their outputs against the
/// hashes recorded in a replay log.
///
/// The struct itself carries no trait bounds. The `ReplayLog` /
/// `EffectInvoker` requirements live on the `impl` block that actually uses
/// them, so merely naming or constructing the type never demands them.
pub struct ReplayExecutor<L, I> {
    /// Store that replay tokens are read from.
    pub log: L,
    /// Re-runs effects so their outputs can be compared with the log.
    pub invoker: I,
}
127
128impl<L: ReplayLog, I: EffectInvoker> ReplayExecutor<L, I> {
129 pub fn new(log: L, invoker: I) -> Self {
130 ReplayExecutor { log, invoker }
131 }
132
133 pub async fn replay_token(
135 &self,
136 token_hash_hex: &str,
137 ) -> Result<ReplayOutcome, ReplayExecutorError> {
138 let token = self.log.get(token_hash_hex).await?;
139 Ok(self.verify_token(&token).await?)
140 }
141
142 pub async fn replay_flow(
146 &self,
147 flow_id: &str,
148 ) -> Result<Vec<ReplayOutcome>, ReplayExecutorError> {
149 let tokens = self.log.tokens_for_flow(flow_id).await?;
150 let mut outcomes = Vec::with_capacity(tokens.len());
151 for t in tokens {
152 let outcome = self.verify_token(&t).await?;
153 let diverged = matches!(outcome, ReplayOutcome::Diverged { .. });
154 outcomes.push(outcome);
155 if diverged {
156 break;
157 }
158 }
159 Ok(outcomes)
160 }
161
162 pub async fn verify_token(
164 &self,
165 token: &ReplayToken,
166 ) -> Result<ReplayOutcome, EffectInvokerError> {
167 let actual = self
168 .invoker
169 .invoke(
170 &token.effect_name,
171 &token.inputs,
172 &token.model_version,
173 &token.sampling,
174 )
175 .await?;
176 let actual_hash = canonical_hash(&actual);
177 let actual_hash_hex = hex(&actual_hash);
178 if actual_hash_hex == token.outputs_hash_hex {
179 Ok(ReplayOutcome::Match {
180 token_hash_hex: token.token_hash_hex.clone(),
181 })
182 } else {
183 Ok(ReplayOutcome::Diverged {
184 token_hash_hex: token.token_hash_hex.clone(),
185 divergence: ReplayDivergence {
186 expected_outputs_hash_hex: token.outputs_hash_hex.clone(),
187 actual_outputs_hash_hex: actual_hash_hex,
188 actual_outputs: actual,
189 },
190 })
191 }
192 }
193}
194
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// Digits come straight from a lookup table. The previous
/// `push_str(&format!(...))` built and dropped a temporary `String` for every
/// byte.
fn hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, matching `{:02x}` output order.
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
202
#[cfg(test)]
mod tests {
    use super::*;
    use crate::replay_token::log::InMemoryReplayLog;
    use crate::replay_token::token::{ReplayTokenBuilder, SamplingParams};
    use chrono::{TimeZone, Utc};
    use serde_json::json;

    /// Invoker stub that ignores its arguments and always returns `returns`.
    struct FixedInvoker {
        returns: Value,
    }

    #[async_trait]
    impl EffectInvoker for FixedInvoker {
        async fn invoke(
            &self,
            _effect_name: &str,
            _inputs: &Value,
            _model_version: &str,
            _sampling: &SamplingParams,
        ) -> Result<Value, EffectInvokerError> {
            Ok(self.returns.clone())
        }
    }

    /// Invoker stub that always fails with a runtime error.
    struct FailingInvoker;

    #[async_trait]
    impl EffectInvoker for FailingInvoker {
        async fn invoke(
            &self,
            _effect_name: &str,
            _inputs: &Value,
            _model_version: &str,
            _sampling: &SamplingParams,
        ) -> Result<Value, EffectInvokerError> {
            Err(EffectInvokerError::Runtime("boom".to_string()))
        }
    }

    /// Mints a deterministic token for flow `f1` (fixed timestamp and nonce).
    fn mk_token(effect: &str, outputs: Value) -> ReplayToken {
        ReplayTokenBuilder::new()
            .effect_name(effect)
            .inputs(json!({"flow_id": "f1"}))
            .outputs(outputs)
            .model_version("axon.builtin.test.v1")
            .sampling(SamplingParams::default())
            .timestamp(Utc.with_ymd_and_hms(2026, 4, 22, 12, 0, 0).unwrap())
            .nonce([0u8; 16])
            .mint()
    }

    // Key order differs between the logged and replayed objects; the canonical
    // hash is expected to treat them as equal.
    #[tokio::test]
    async fn match_when_outputs_are_canonically_equal() {
        let log = InMemoryReplayLog::new();
        let t = mk_token("call_tool:x", json!({"a": 1, "b": 2}));
        log.append(t.clone()).await.unwrap();

        let invoker = FixedInvoker {
            returns: json!({"b": 2, "a": 1}),
        };
        let executor = ReplayExecutor::new(log, invoker);
        let outcome = executor.replay_token(&t.token_hash_hex).await.unwrap();
        assert_eq!(
            outcome,
            ReplayOutcome::Match {
                token_hash_hex: t.token_hash_hex.clone(),
            }
        );
    }

    #[tokio::test]
    async fn diverge_when_outputs_differ() {
        let log = InMemoryReplayLog::new();
        let t = mk_token("call_tool:x", json!({"a": 1}));
        log.append(t.clone()).await.unwrap();

        let invoker = FixedInvoker {
            returns: json!({"a": 999}),
        };
        let executor = ReplayExecutor::new(log, invoker);
        let outcome = executor.replay_token(&t.token_hash_hex).await.unwrap();
        match outcome {
            ReplayOutcome::Diverged {
                token_hash_hex,
                divergence,
            } => {
                assert_eq!(token_hash_hex, t.token_hash_hex);
                assert_eq!(divergence.expected_outputs_hash_hex, t.outputs_hash_hex);
                assert_ne!(divergence.actual_outputs_hash_hex, t.outputs_hash_hex);
                assert_eq!(divergence.actual_outputs, json!({"a": 999}));
            }
            other => panic!("expected Diverged, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn invoker_errors_are_propagated() {
        let log = InMemoryReplayLog::new();
        let t = mk_token("call_tool:x", json!({"a": 1}));
        log.append(t.clone()).await.unwrap();

        let executor = ReplayExecutor::new(log, FailingInvoker);
        let err = executor
            .replay_token(&t.token_hash_hex)
            .await
            .unwrap_err();
        assert!(
            matches!(
                &err,
                ReplayExecutorError::Invoker(EffectInvokerError::Runtime(m)) if m == "boom"
            ),
            "expected Invoker(Runtime(\"boom\")), got {err:?}"
        );
    }

    #[tokio::test]
    async fn replay_flow_short_circuits_at_first_divergence() {
        let log = InMemoryReplayLog::new();
        let t1 = mk_token("step1", json!({"x": 1}));
        let t2 = mk_token("step2", json!({"x": 2}));
        let t3 = mk_token("step3", json!({"x": 3}));
        log.append(t1.clone()).await.unwrap();
        log.append(t2).await.unwrap();
        log.append(t3).await.unwrap();

        // Only step1's recorded output matches what the invoker returns.
        let invoker = FixedInvoker {
            returns: json!({"x": 1}),
        };
        let executor = ReplayExecutor::new(log, invoker);
        let outcomes = executor.replay_flow("f1").await.unwrap();
        assert_eq!(outcomes.len(), 2, "replay should stop at the first divergence");
        assert_eq!(
            outcomes[0],
            ReplayOutcome::Match {
                token_hash_hex: t1.token_hash_hex.clone(),
            }
        );
        assert!(
            matches!(outcomes[1], ReplayOutcome::Diverged { .. }),
            "expected Diverged, got {:?}",
            outcomes[1]
        );
    }
}