Skip to main content

shadow_core/replay/
engine.rs

1//! Replay engine.
2//!
3//! Given a baseline trace and an [`LlmBackend`], produce a candidate trace
4//! with the same request order but LLM-derived responses freshly generated
5//! by the backend. See SPEC §10 for the algorithm.
6
7use serde_json::json;
8use thiserror::Error;
9
10use crate::agentlog::{Kind, Record};
11use crate::replay::backend::{LlmBackend, LlmError};
12
13/// Errors from [`run_replay`].
14#[derive(Debug, Error)]
15pub enum ReplayError {
16    /// The baseline trace is empty — no root metadata record.
17    #[error("baseline trace is empty\nhint: a baseline trace must start with a metadata record (SPEC §3.3)")]
18    EmptyBaseline,
19
20    /// The baseline trace does not start with a `metadata` record.
21    #[error("baseline trace root is {found:?}, expected Metadata\nhint: SPEC §3.3 requires the first record to be of kind metadata")]
22    BadBaselineRoot {
23        /// The actually-encountered kind.
24        found: Kind,
25    },
26
27    /// The backend failed on at least one request.
28    #[error("backend error: {0}")]
29    Backend(#[from] LlmError),
30}
31
/// Clock abstraction so tests can pin timestamps.
///
/// The `Send + Sync` bound lets a single clock be shared across threads and
/// tasks. Because `now_iso` takes `&self`, any per-call state an
/// implementation keeps (such as a counter) needs interior mutability.
pub trait Clock: Send + Sync {
    /// Return the current RFC 3339 UTC timestamp string (millisecond precision).
    fn now_iso(&self) -> String;
}

/// A deterministic [`Clock`] for tests: the n-th call (counting from 0)
/// returns `"<base>#<n>"`.
///
/// These strings are deliberately *not* RFC 3339 timestamps, despite the
/// [`Clock::now_iso`] contract. They exist only to be stable across runs and
/// distinct between calls on the same instance.
#[derive(Debug)]
pub struct FixedClock {
    /// Prefix shared by every returned string.
    base: String,
    /// Number of `now_iso` calls made so far.
    counter: std::sync::atomic::AtomicU64,
}

impl FixedClock {
    /// Build a clock that returns `"<base>#<n>"` for each call, starting at `n = 0`.
    pub fn new(base: impl Into<String>) -> Self {
        Self {
            base: base.into(),
            counter: std::sync::atomic::AtomicU64::new(0),
        }
    }
}

impl Clock for FixedClock {
    fn now_iso(&self) -> String {
        // `fetch_add` returns the value *before* the increment, so the first
        // call yields `#0`.
        let n = self
            .counter
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        format!("{}#{n}", self.base)
    }
}
62
63/// Run a replay: walk `baseline`, dispatch every `chat_request` to `backend`,
64/// and produce a fresh trace with the same structure.
65///
66/// Algorithm (SPEC §10.1):
67/// 1. Emit a new `metadata` record with `parent = None` and an envelope
68///    `meta.baseline_of = baseline_root_id`.
69/// 2. For each baseline record in file order:
70///    - `chat_request`: re-emit with a fresh ts and parent = previous
71///      output record id; then call `backend.complete(request.payload)`
72///      and emit a `chat_response` whose parent = the re-emitted request.
73///    - `tool_call`, `tool_result`, `error`: copy-through with fresh ts
74///      and relinked parent.
75///    - `chat_response`, `metadata`, `replay_summary`: skipped (the
76///      backend produces responses; replay_summary is added at the end;
77///      a baseline can only have one metadata record and it's the root).
78/// 3. Emit a `replay_summary` at the end.
79pub async fn run_replay<B: LlmBackend + ?Sized>(
80    baseline: &[Record],
81    backend: &B,
82    clock: &dyn Clock,
83) -> Result<Vec<Record>, ReplayError> {
84    let baseline_root = baseline.first().ok_or(ReplayError::EmptyBaseline)?;
85    if baseline_root.kind != Kind::Metadata {
86        return Err(ReplayError::BadBaselineRoot {
87            found: baseline_root.kind,
88        });
89    }
90
91    let mut out = Vec::with_capacity(baseline.len() + 1);
92
93    // 1. New metadata root with baseline_of pointer.
94    let meta_payload = if let Some(obj) = baseline_root.payload.as_object() {
95        let mut new_payload = obj.clone();
96        new_payload.insert(
97            "baseline_of".to_string(),
98            serde_json::Value::String(baseline_root.id.clone()),
99        );
100        serde_json::Value::Object(new_payload)
101    } else {
102        json!({ "baseline_of": baseline_root.id })
103    };
104    let new_root = Record::new(Kind::Metadata, meta_payload, clock.now_iso(), None);
105    let mut last_parent = new_root.id.clone();
106    out.push(new_root);
107
108    let mut input_count: u64 = 0;
109    let mut output_count: u64 = 0;
110    let mut error_count: u64 = 0;
111    let start = std::time::Instant::now();
112
113    // 2. Walk baseline.
114    for (i, record) in baseline.iter().enumerate() {
115        match record.kind {
116            Kind::Metadata => {
117                if i == 0 {
118                    continue; // root already handled
119                }
120                // Multiple metadata records are an invariant violation, but
121                // SPEC §3.3 already forbids this; we defensively copy-through.
122                let copy = Record::new(
123                    Kind::Metadata,
124                    record.payload.clone(),
125                    clock.now_iso(),
126                    Some(last_parent.clone()),
127                );
128                last_parent = copy.id.clone();
129                out.push(copy);
130            }
131            Kind::ChatRequest => {
132                input_count += 1;
133                let req = Record::new(
134                    Kind::ChatRequest,
135                    record.payload.clone(),
136                    clock.now_iso(),
137                    Some(last_parent.clone()),
138                );
139                let req_id = req.id.clone();
140                out.push(req);
141                match backend.complete(&record.payload).await {
142                    Ok(response_payload) => {
143                        let resp = Record::new(
144                            Kind::ChatResponse,
145                            response_payload,
146                            clock.now_iso(),
147                            Some(req_id.clone()),
148                        );
149                        last_parent = resp.id.clone();
150                        out.push(resp);
151                        output_count += 1;
152                    }
153                    Err(e) => {
154                        error_count += 1;
155                        let err = Record::new(
156                            Kind::Error,
157                            json!({
158                                "source": "llm",
159                                "code": "backend_error",
160                                "message": e.to_string(),
161                                "retriable": matches!(e, LlmError::Io(_)),
162                            }),
163                            clock.now_iso(),
164                            Some(req_id.clone()),
165                        );
166                        last_parent = err.id.clone();
167                        out.push(err);
168                    }
169                }
170            }
171            Kind::ChatResponse => {
172                // Baseline responses are discarded; the backend produces
173                // the candidate response. (Technically the baseline trace
174                // has a chat_response for every chat_request; skipping
175                // here keeps the output one-response-per-request.)
176                continue;
177            }
178            Kind::ToolCall | Kind::ToolResult | Kind::Error => {
179                let copy = Record::new(
180                    record.kind,
181                    record.payload.clone(),
182                    clock.now_iso(),
183                    Some(last_parent.clone()),
184                );
185                last_parent = copy.id.clone();
186                out.push(copy);
187            }
188            // v0.2 record kinds: chunk / harness_event / blob_ref are
189            // first-class but the chat-replay engine doesn't synthesize
190            // them itself. The Python-side replay loop handles streams
191            // and harness events; here we copy-through so a baseline
192            // trace that contains them survives a `shadow replay` pass
193            // without losing data.
194            Kind::Chunk | Kind::HarnessEvent | Kind::BlobRef => {
195                let copy = Record::new(
196                    record.kind,
197                    record.payload.clone(),
198                    clock.now_iso(),
199                    Some(last_parent.clone()),
200                );
201                last_parent = copy.id.clone();
202                out.push(copy);
203            }
204            Kind::ReplaySummary => continue, // never copy-through
205        }
206    }
207
208    // 3. Replay summary.
209    let duration_ms = start.elapsed().as_millis() as u64;
210    let baseline_id = baseline_root.id.clone();
211    let summary = Record::new(
212        Kind::ReplaySummary,
213        json!({
214            "baseline_trace_id": baseline_id,
215            "backend_id": backend.id(),
216            "input_count": input_count,
217            "output_count": output_count,
218            "error_count": error_count,
219            "duration_ms": duration_ms,
220        }),
221        clock.now_iso(),
222        Some(last_parent),
223    );
224    out.push(summary);
225
226    Ok(out)
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use crate::replay::mock::MockLlm;
    use serde_json::json;

    /// Minimal valid baseline: metadata root, one chat_request, one chat_response.
    fn baseline_trace() -> Vec<Record> {
        let meta = Record::new(
            Kind::Metadata,
            json!({"sdk": {"name": "shadow", "version": "0.1.0"}, "tags": {"env": "demo"}}),
            "2026-04-21T10:00:00Z",
            None,
        );
        let req = Record::new(
            Kind::ChatRequest,
            json!({"model": "claude-opus-4-7", "messages": [{"role": "user", "content": "hi"}], "params": {}}),
            "2026-04-21T10:00:00.100Z",
            Some(meta.id.clone()),
        );
        let resp = Record::new(
            Kind::ChatResponse,
            json!({"model": "claude-opus-4-7", "content": [{"text": "Hello!", "type": "text"}], "stop_reason": "end_turn", "latency_ms": 100, "usage": {"input_tokens": 5, "output_tokens": 3, "thinking_tokens": 0}}),
            "2026-04-21T10:00:00.500Z",
            Some(req.id.clone()),
        );
        vec![meta, req, resp]
    }

    /// The kinds of `records`, in order.
    fn kinds(records: &[Record]) -> Vec<Kind> {
        records.iter().map(|r| r.kind).collect()
    }

    /// Read an unsigned integer field from the trailing `replay_summary`.
    fn summary_u64(candidate: &[Record], key: &str) -> Option<u64> {
        let summary = candidate
            .last()
            .expect("a successful replay always emits at least root + summary");
        assert_eq!(summary.kind, Kind::ReplaySummary);
        summary.payload.get(key).and_then(|v| v.as_u64())
    }

    /// Replay `baseline` against a mock built from `baseline` itself, using a
    /// deterministic clock.
    async fn replay_against_self(baseline: &[Record]) -> Vec<Record> {
        let backend = MockLlm::from_trace(baseline);
        let clock = FixedClock::new("ts");
        run_replay(baseline, &backend, &clock).await.unwrap()
    }

    #[tokio::test]
    async fn happy_path_produces_parallel_structure() {
        let baseline = baseline_trace();
        let backend = MockLlm::from_trace(&baseline);
        let clock = FixedClock::new("2026-04-22T00:00:00Z");

        let candidate = run_replay(&baseline, &backend, &clock).await.unwrap();

        assert_eq!(
            kinds(&candidate),
            vec![
                Kind::Metadata,
                Kind::ChatRequest,
                Kind::ChatResponse,
                Kind::ReplaySummary,
            ]
        );
    }

    #[tokio::test]
    async fn parent_chain_is_monotonically_linked() {
        let baseline = baseline_trace();
        let candidate = replay_against_self(&baseline).await;

        // First record is root (parent=None).
        assert!(candidate[0].parent.is_none());
        // Every subsequent record's parent must be an earlier record's id.
        let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
        seen.insert(candidate[0].id.clone());
        for r in &candidate[1..] {
            let parent = r.parent.as_ref().expect("non-root must have parent");
            assert!(
                seen.contains(parent),
                "unknown parent {parent} for {:?}",
                r.kind
            );
            seen.insert(r.id.clone());
        }
    }

    #[tokio::test]
    async fn baseline_of_pointer_survives_in_candidate_metadata() {
        let baseline = baseline_trace();
        let candidate = replay_against_self(&baseline).await;

        let meta = &candidate[0];
        let baseline_of = meta.payload.get("baseline_of").and_then(|v| v.as_str());
        assert_eq!(baseline_of, Some(baseline[0].id.as_str()));
        // The rest of the root payload is carried over unchanged.
        assert_eq!(
            meta.payload.pointer("/tags/env").and_then(|v| v.as_str()),
            Some("demo")
        );
    }

    #[tokio::test]
    async fn non_object_root_payload_gets_minimal_envelope() {
        let meta = Record::new(Kind::Metadata, json!(null), "2026-04-21T10:00:00Z", None);
        let baseline = vec![meta];
        let candidate = replay_against_self(&baseline).await;

        assert_eq!(kinds(&candidate), vec![Kind::Metadata, Kind::ReplaySummary]);
        assert_eq!(candidate[0].payload, json!({"baseline_of": baseline[0].id}));
        assert_eq!(summary_u64(&candidate, "input_count"), Some(0));
    }

    #[tokio::test]
    async fn summary_reports_correct_counts() {
        let baseline = baseline_trace();
        let candidate = replay_against_self(&baseline).await;

        assert_eq!(summary_u64(&candidate, "input_count"), Some(1));
        assert_eq!(summary_u64(&candidate, "output_count"), Some(1));
        assert_eq!(summary_u64(&candidate, "error_count"), Some(0));
        let summary = candidate.last().unwrap();
        assert_eq!(
            summary.payload.get("backend_id").and_then(|v| v.as_str()),
            Some("mock")
        );
    }

    #[tokio::test]
    async fn empty_baseline_errors() {
        let backend = MockLlm::from_trace(&[]);
        let clock = FixedClock::new("ts");
        let err = run_replay(&[], &backend, &clock).await.unwrap_err();
        assert!(matches!(err, ReplayError::EmptyBaseline));
    }

    #[tokio::test]
    async fn non_metadata_root_errors() {
        let req = Record::new(
            Kind::ChatRequest,
            json!({"model": "x"}),
            "2026-04-21T10:00:00Z",
            None,
        );
        let backend = MockLlm::from_trace(&[]);
        let clock = FixedClock::new("ts");
        let err = run_replay(&[req], &backend, &clock).await.unwrap_err();
        match err {
            ReplayError::BadBaselineRoot { found } => assert_eq!(found, Kind::ChatRequest),
            other => panic!("expected BadBaselineRoot, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn missing_response_is_captured_as_error_record() {
        // Baseline has two requests; the mock only knows a response for the first.
        let mut baseline = baseline_trace();
        let extra_req = Record::new(
            Kind::ChatRequest,
            json!({"model": "claude-opus-4-7", "messages": [{"role": "user", "content": "second"}], "params": {}}),
            "2026-04-21T10:01:00Z",
            Some(baseline[2].id.clone()),
        );
        baseline.push(extra_req);

        let candidate = replay_against_self(&baseline).await;

        let errors: Vec<_> = candidate.iter().filter(|r| r.kind == Kind::Error).collect();
        assert_eq!(errors.len(), 1);
        assert_eq!(
            errors[0].payload.get("source").and_then(|v| v.as_str()),
            Some("llm")
        );
        // The summary counts the error and both inputs.
        assert_eq!(summary_u64(&candidate, "error_count"), Some(1));
        assert_eq!(summary_u64(&candidate, "input_count"), Some(2));
    }

    #[tokio::test]
    async fn tool_records_are_copied_through() {
        let meta = Record::new(
            Kind::Metadata,
            json!({"sdk": {"name": "shadow"}}),
            "2026-04-21T10:00:00Z",
            None,
        );
        let req = Record::new(
            Kind::ChatRequest,
            json!({"model": "x", "messages": [], "params": {}}),
            "2026-04-21T10:00:00.100Z",
            Some(meta.id.clone()),
        );
        let resp = Record::new(
            Kind::ChatResponse,
            json!({"model": "x", "content": [], "stop_reason": "tool_use", "latency_ms": 1, "usage": {"input_tokens": 1, "output_tokens": 1, "thinking_tokens": 0}}),
            "2026-04-21T10:00:00.500Z",
            Some(req.id.clone()),
        );
        let tool_call = Record::new(
            Kind::ToolCall,
            json!({"tool_name": "search", "tool_call_id": "t1", "arguments": {}}),
            "2026-04-21T10:00:00.600Z",
            Some(resp.id.clone()),
        );
        let tool_result = Record::new(
            Kind::ToolResult,
            json!({"tool_call_id": "t1", "output": "done", "is_error": false, "latency_ms": 10}),
            "2026-04-21T10:00:00.700Z",
            Some(tool_call.id.clone()),
        );
        let baseline = vec![meta, req, resp, tool_call, tool_result];
        let candidate = replay_against_self(&baseline).await;
        assert_eq!(
            kinds(&candidate),
            vec![
                Kind::Metadata,
                Kind::ChatRequest,
                Kind::ChatResponse,
                Kind::ToolCall,
                Kind::ToolResult,
                Kind::ReplaySummary,
            ]
        );
    }

    #[tokio::test]
    async fn non_root_metadata_and_v02_kinds_are_copied_through() {
        let mut baseline = baseline_trace();
        let mut parent = baseline.last().unwrap().id.clone();
        let extras = vec![
            (Kind::Chunk, json!({"chunk_index": 0, "delta": "Hel"})),
            (Kind::HarnessEvent, json!({"event": "step"})),
            (Kind::BlobRef, json!({"blob_id": "sha256:00"})),
            (Kind::Metadata, json!({"note": "stray"})),
            // A stale summary in the baseline must not leak into the candidate.
            (Kind::ReplaySummary, json!({"backend_id": "stale"})),
        ];
        for (kind, payload) in extras {
            let rec = Record::new(kind, payload, "2026-04-21T10:00:01Z", Some(parent.clone()));
            parent = rec.id.clone();
            baseline.push(rec);
        }

        let candidate = replay_against_self(&baseline).await;

        assert_eq!(
            kinds(&candidate),
            vec![
                Kind::Metadata,
                Kind::ChatRequest,
                Kind::ChatResponse,
                Kind::Chunk,
                Kind::HarnessEvent,
                Kind::BlobRef,
                Kind::Metadata,
                Kind::ReplaySummary,
            ]
        );
        // Copied-through payloads are verbatim.
        for (out, src) in candidate[3..7].iter().zip(&baseline[3..7]) {
            assert_eq!(out.payload, src.payload);
        }
        // The only summary is the freshly generated one.
        assert_eq!(
            candidate
                .last()
                .unwrap()
                .payload
                .get("backend_id")
                .and_then(|v| v.as_str()),
            Some("mock")
        );
        // Output forms one linear chain: each record's parent is its predecessor.
        for pair in candidate.windows(2) {
            assert_eq!(pair[1].parent.as_deref(), Some(pair[0].id.as_str()));
        }
    }
}