Skip to main content

harness_loop/
replay.rs

//! Session record + replay (DESIGN.md §15 v0.2+).
//!
//! Two halves:
//! - [`SessionRecorder`] is a [`Hook`] that captures every lifecycle event
//!   to a JSONL file. Wire it via `AgentLoop::with_hook` and you get a
//!   complete trace of what the agent did.
//! - [`read_session`] + [`replay_as_mock`] reconstruct a deterministic
//!   `MockModel` from a recorded log so you can replay the run offline,
//!   verify changes, or debug failures without rerunning against a real LLM.
10
11use harness_core::{
12    Action, CompactionStage, Event, Hook, HookOutcome, ModelOutput, ToolResult, World,
13};
14use serde::{Deserialize, Serialize};
15use std::fs::OpenOptions;
16use std::io::Write;
17use std::path::Path;
18use std::sync::Mutex;
19
20/// One event in the recorded session. Owned (no borrows) so it round-trips
21/// through serde without lifetime gymnastics.
22#[derive(Debug, Clone, Serialize, Deserialize)]
23#[serde(tag = "kind", rename_all = "snake_case")]
24pub enum SessionEvent {
25    Start {
26        ts_ms: i64,
27        source: String,
28    },
29    PreModel {
30        ts_ms: i64,
31        history_len: usize,
32        tools_count: usize,
33    },
34    PostModel {
35        ts_ms: i64,
36        output: ModelOutput,
37    },
38    PreTool {
39        ts_ms: i64,
40        action: Action,
41    },
42    PostTool {
43        ts_ms: i64,
44        call_id: String,
45        result: ToolResult,
46    },
47    Sensor {
48        ts_ms: i64,
49        id: String,
50        signals: usize,
51    },
52    PreCompact {
53        ts_ms: i64,
54        stage: CompactionStage,
55    },
56    PostCompact {
57        ts_ms: i64,
58        stage: CompactionStage,
59    },
60    Heartbeat {
61        ts_ms: i64,
62        iter: u32,
63    },
64    End {
65        ts_ms: i64,
66    },
67}
68
69/// Hook that serialises every relevant lifecycle event into a JSONL file.
70///
71/// Failures (locked mutex, I/O errors) are logged via `tracing::warn` but
72/// never panic — recording is a best-effort observability layer, not a
73/// correctness path.
74pub struct SessionRecorder {
75    file: Mutex<std::fs::File>,
76}
77
78impl SessionRecorder {
79    /// Open the file for append (creating it if needed).
80    pub fn new(path: &Path) -> std::io::Result<Self> {
81        if let Some(parent) = path.parent() {
82            std::fs::create_dir_all(parent)?;
83        }
84        let f = OpenOptions::new().create(true).append(true).open(path)?;
85        Ok(Self {
86            file: Mutex::new(f),
87        })
88    }
89
90    fn write(&self, ev: &SessionEvent) {
91        let Ok(mut f) = self.file.lock() else {
92            return;
93        };
94        match serde_json::to_string(ev) {
95            Ok(s) => {
96                if let Err(e) = writeln!(f, "{s}") {
97                    tracing::warn!(error=%e, "session recorder write failed");
98                }
99            }
100            Err(e) => tracing::warn!(error=%e, "session recorder serialize failed"),
101        }
102    }
103}
104
105impl Hook for SessionRecorder {
106    fn name(&self) -> &str {
107        "session-recorder"
108    }
109    fn matches(&self, _ev: &Event<'_>) -> bool {
110        true
111    }
112
113    fn fire(&self, ev: &Event<'_>, world: &mut World) -> HookOutcome {
114        let ts = world.clock.now_ms();
115        let session_ev = match ev {
116            Event::SessionStart { source } => Some(SessionEvent::Start {
117                ts_ms: ts,
118                source: format!("{source:?}"),
119            }),
120            Event::PreModel { ctx } => Some(SessionEvent::PreModel {
121                ts_ms: ts,
122                history_len: ctx.history.len(),
123                tools_count: ctx.tools.len(),
124            }),
125            Event::PostModel { out } => Some(SessionEvent::PostModel {
126                ts_ms: ts,
127                output: (*out).clone(),
128            }),
129            Event::PreToolUse { action } => Some(SessionEvent::PreTool {
130                ts_ms: ts,
131                action: (*action).clone(),
132            }),
133            Event::PostToolUse { action, result } => Some(SessionEvent::PostTool {
134                ts_ms: ts,
135                call_id: action.call_id.clone(),
136                result: (*result).clone(),
137            }),
138            Event::PostSensor { sensor, signals } => Some(SessionEvent::Sensor {
139                ts_ms: ts,
140                id: (*sensor).clone(),
141                signals: signals.len(),
142            }),
143            Event::PreCompact { stage } => Some(SessionEvent::PreCompact {
144                ts_ms: ts,
145                stage: *stage,
146            }),
147            Event::PostCompact { stage } => Some(SessionEvent::PostCompact {
148                ts_ms: ts,
149                stage: *stage,
150            }),
151            Event::Heartbeat { iter } => Some(SessionEvent::Heartbeat {
152                ts_ms: ts,
153                iter: *iter,
154            }),
155            Event::SessionEnd => Some(SessionEvent::End { ts_ms: ts }),
156            _ => None,
157        };
158        if let Some(e) = session_ev {
159            self.write(&e);
160        }
161        HookOutcome::Allow
162    }
163}
164
165/// Read a recorded JSONL session log back into memory.
166///
167/// Tolerates malformed lines (logged, skipped) so a partially-corrupted log
168/// still yields usable replay material.
169pub fn read_session(path: &Path) -> std::io::Result<Vec<SessionEvent>> {
170    let content = std::fs::read_to_string(path)?;
171    let mut events = Vec::new();
172    for (i, line) in content.lines().enumerate() {
173        let line = line.trim();
174        if line.is_empty() {
175            continue;
176        }
177        match serde_json::from_str(line) {
178            Ok(e) => events.push(e),
179            Err(err) => tracing::warn!(line=i+1, error=%err, "session log line skipped"),
180        }
181    }
182    Ok(events)
183}
184
185/// Build a [`harness_models::MockModel`] that returns each recorded
186/// `PostModel` output in order. Pair with a fresh `AgentLoop` to replay the
187/// run.
188pub fn replay_as_mock(events: &[SessionEvent]) -> harness_models::MockModel {
189    use harness_models::{MockModel, MockResponse};
190    let mut m = MockModel::new().with_name("replay");
191    for e in events {
192        if let SessionEvent::PostModel { output, .. } = e {
193            m = m.script(MockResponse {
194                text: output.text.clone(),
195                tool_calls: output.tool_calls.clone(),
196                stop_reason: output.stop_reason,
197                input_tokens: output.usage.input_tokens,
198                output_tokens: output.usage.output_tokens,
199                reasoning: output.reasoning.clone(),
200            });
201        }
202    }
203    m
204}
205
206/// Backwards-compatible alias.
207pub fn replay_as_mock_via_events(events: &[SessionEvent]) -> harness_models::MockModel {
208    replay_as_mock(events)
209}
210
211/// Stats from a single session — handy summary for the `harness trace` CLI.
212#[derive(Debug, Clone, Default)]
213pub struct SessionStats {
214    pub events: usize,
215    pub model_calls: usize,
216    pub tool_calls: usize,
217    pub iters: u32,
218    pub input_tokens: u32,
219    pub output_tokens: u32,
220    pub stages_run: usize,
221    pub duration_ms: i64,
222}
223
224impl SessionStats {
225    pub fn from(events: &[SessionEvent]) -> Self {
226        let mut s = Self {
227            events: events.len(),
228            ..Default::default()
229        };
230        let mut first_ts: Option<i64> = None;
231        let mut last_ts: Option<i64> = None;
232        for e in events {
233            let ts = match e {
234                SessionEvent::Start { ts_ms, .. }
235                | SessionEvent::PreModel { ts_ms, .. }
236                | SessionEvent::PostModel { ts_ms, .. }
237                | SessionEvent::PreTool { ts_ms, .. }
238                | SessionEvent::PostTool { ts_ms, .. }
239                | SessionEvent::Sensor { ts_ms, .. }
240                | SessionEvent::PreCompact { ts_ms, .. }
241                | SessionEvent::PostCompact { ts_ms, .. }
242                | SessionEvent::Heartbeat { ts_ms, .. }
243                | SessionEvent::End { ts_ms } => *ts_ms,
244            };
245            if first_ts.is_none() {
246                first_ts = Some(ts);
247            }
248            last_ts = Some(ts);
249
250            match e {
251                SessionEvent::PostModel { output, .. } => {
252                    s.model_calls += 1;
253                    s.input_tokens += output.usage.input_tokens;
254                    s.output_tokens += output.usage.output_tokens;
255                }
256                SessionEvent::PreTool { .. } => s.tool_calls += 1,
257                SessionEvent::PostCompact { .. } => s.stages_run += 1,
258                SessionEvent::Heartbeat { iter, .. } => s.iters = s.iters.max(*iter + 1),
259                _ => {}
260            }
261        }
262        s.duration_ms = match (first_ts, last_ts) {
263            (Some(a), Some(b)) => b - a,
264            _ => 0,
265        };
266        s
267    }
268}
269
270/// Tiny helper used by the CLI: convert a single event to a single line of
271/// pretty-printed text (does NOT include the timestamp prefix).
272pub fn format_event_short(e: &SessionEvent) -> String {
273    match e {
274        SessionEvent::Start { source, .. } => format!("session start ({source})"),
275        SessionEvent::Heartbeat { iter, .. } => format!("iter {iter}"),
276        SessionEvent::PreModel {
277            history_len,
278            tools_count,
279            ..
280        } => {
281            format!("→ model (history={history_len}, tools={tools_count})")
282        }
283        SessionEvent::PostModel { output, .. } => {
284            let calls = output.tool_calls.len();
285            let txt = output
286                .text
287                .as_deref()
288                .unwrap_or("")
289                .chars()
290                .take(60)
291                .collect::<String>();
292            if calls > 0 {
293                format!(
294                    "← model: {} tool_call(s) [{}/{} tok]",
295                    calls, output.usage.input_tokens, output.usage.output_tokens
296                )
297            } else {
298                format!(
299                    "← model: {:?} [{}/{} tok]",
300                    txt, output.usage.input_tokens, output.usage.output_tokens
301                )
302            }
303        }
304        SessionEvent::PreTool { action, .. } => {
305            format!("  → tool {} args={}", action.tool, action.args)
306        }
307        SessionEvent::PostTool {
308            call_id, result, ..
309        } => {
310            format!("  ← tool {} ok={}", call_id, result.ok)
311        }
312        SessionEvent::Sensor { id, signals, .. } => format!("  ⚑ sensor {id}: {signals} signal(s)"),
313        SessionEvent::PreCompact { stage, .. } => format!("  ⇩ pre-compact {stage:?}"),
314        SessionEvent::PostCompact { stage, .. } => format!("  ⇧ post-compact {stage:?}"),
315        SessionEvent::End { .. } => "session end".into(),
316    }
317}
318
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal five-event session: start, one heartbeat, one model
    /// round-trip (text reply "hi", no tool calls), end.
    fn sample_log() -> Vec<SessionEvent> {
        let reply = ModelOutput {
            text: Some("hi".into()),
            tool_calls: Vec::new(),
            usage: Default::default(),
            stop_reason: harness_core::StopReason::EndTurn,
            reasoning: None,
        };
        vec![
            SessionEvent::Start {
                ts_ms: 0,
                source: "Startup".into(),
            },
            SessionEvent::Heartbeat { ts_ms: 1, iter: 0 },
            SessionEvent::PreModel {
                ts_ms: 2,
                history_len: 1,
                tools_count: 3,
            },
            SessionEvent::PostModel {
                ts_ms: 100,
                output: reply,
            },
            SessionEvent::End { ts_ms: 110 },
        ]
    }

    #[test]
    fn stats_compute_correctly() {
        let log = sample_log();
        let stats = SessionStats::from(&log);
        assert_eq!(stats.events, 5);
        assert_eq!(stats.model_calls, 1);
        assert_eq!(stats.iters, 1);
        assert_eq!(stats.duration_ms, 110);
    }

    #[test]
    fn round_trip_via_serde() {
        let original = sample_log();
        // Serialise each event to a JSON line and parse it straight back.
        let parsed: Vec<SessionEvent> = original
            .iter()
            .map(|event| {
                let line = serde_json::to_string(event).unwrap();
                serde_json::from_str::<SessionEvent>(&line).unwrap()
            })
            .collect();
        assert_eq!(parsed.len(), original.len());

        let text_survived = matches!(
            parsed[3],
            SessionEvent::PostModel { ref output, .. } if output.text.as_deref() == Some("hi")
        );
        assert!(text_survived);
    }
}
374}