Skip to main content

harness_loop/
replay.rs

//! Session record + replay (DESIGN.md §15 v0.2+).
//!
//! Two halves:
//! - [`SessionRecorder`] is a [`Hook`] that captures every relevant lifecycle
//!   event to a JSONL file (one JSON object per line). Wire it via
//!   `AgentLoop::with_hook` and you get a complete trace of what the agent did.
//! - [`read_session`] + [`replay_as_mock`] reconstruct a deterministic
//!   `MockModel` from a recorded log so you can replay the run offline,
//!   verify changes, or debug failures without rerunning against a real LLM.
//!
//! The module also provides human-readable renderers for recorded events
//! ([`format_event_short`], [`format_event_verbose`]) and a
//! [`LiveProgressHook`] that prints them to stderr as they happen.
10
11use harness_core::{
12    Action, CompactionStage, Event, Hook, HookOutcome, ModelOutput, ToolResult, World,
13};
14use serde::{Deserialize, Serialize};
15use std::fs::OpenOptions;
16use std::io::Write;
17use std::path::Path;
18use std::sync::Mutex;
19
20/// One event in the recorded session. Owned (no borrows) so it round-trips
21/// through serde without lifetime gymnastics.
22#[derive(Debug, Clone, Serialize, Deserialize)]
23#[serde(tag = "kind", rename_all = "snake_case")]
24pub enum SessionEvent {
25    Start {
26        ts_ms: i64,
27        source: String,
28    },
29    PreModel {
30        ts_ms: i64,
31        history_len: usize,
32        tools_count: usize,
33    },
34    PostModel {
35        ts_ms: i64,
36        output: ModelOutput,
37    },
38    PreTool {
39        ts_ms: i64,
40        action: Action,
41    },
42    PostTool {
43        ts_ms: i64,
44        call_id: String,
45        result: ToolResult,
46    },
47    Sensor {
48        ts_ms: i64,
49        id: String,
50        signals: usize,
51    },
52    PreCompact {
53        ts_ms: i64,
54        stage: CompactionStage,
55    },
56    PostCompact {
57        ts_ms: i64,
58        stage: CompactionStage,
59    },
60    Heartbeat {
61        ts_ms: i64,
62        iter: u32,
63    },
64    /// Budget ratio crossed a high-water threshold. Currently the loop only
65    /// fires this once, at the moment the iteration budget is exhausted and
66    /// the forced final-synthesis pass is about to run.
67    BudgetWarning {
68        ts_ms: i64,
69        ratio: f32,
70    },
71    End {
72        ts_ms: i64,
73    },
74}
75
76/// Hook that serialises every relevant lifecycle event into a JSONL file.
77///
78/// Failures (locked mutex, I/O errors) are logged via `tracing::warn` but
79/// never panic — recording is a best-effort observability layer, not a
80/// correctness path.
81pub struct SessionRecorder {
82    file: Mutex<std::fs::File>,
83}
84
85impl SessionRecorder {
86    /// Open the file for append (creating it if needed).
87    pub fn new(path: &Path) -> std::io::Result<Self> {
88        if let Some(parent) = path.parent() {
89            std::fs::create_dir_all(parent)?;
90        }
91        let f = OpenOptions::new().create(true).append(true).open(path)?;
92        Ok(Self {
93            file: Mutex::new(f),
94        })
95    }
96
97    fn write(&self, ev: &SessionEvent) {
98        let Ok(mut f) = self.file.lock() else {
99            return;
100        };
101        match serde_json::to_string(ev) {
102            Ok(s) => {
103                if let Err(e) = writeln!(f, "{s}") {
104                    tracing::warn!(error=%e, "session recorder write failed");
105                }
106            }
107            Err(e) => tracing::warn!(error=%e, "session recorder serialize failed"),
108        }
109    }
110}
111
112impl Hook for SessionRecorder {
113    fn name(&self) -> &str {
114        "session-recorder"
115    }
116    fn matches(&self, _ev: &Event<'_>) -> bool {
117        true
118    }
119
120    fn fire(&self, ev: &Event<'_>, world: &mut World) -> HookOutcome {
121        let ts = world.clock.now_ms();
122        let session_ev = match ev {
123            Event::SessionStart { source } => Some(SessionEvent::Start {
124                ts_ms: ts,
125                source: format!("{source:?}"),
126            }),
127            Event::PreModel { ctx } => Some(SessionEvent::PreModel {
128                ts_ms: ts,
129                history_len: ctx.history.len(),
130                tools_count: ctx.tools.len(),
131            }),
132            Event::PostModel { out } => Some(SessionEvent::PostModel {
133                ts_ms: ts,
134                output: (*out).clone(),
135            }),
136            Event::PreToolUse { action } => Some(SessionEvent::PreTool {
137                ts_ms: ts,
138                action: (*action).clone(),
139            }),
140            Event::PostToolUse { action, result } => Some(SessionEvent::PostTool {
141                ts_ms: ts,
142                call_id: action.call_id.clone(),
143                result: (*result).clone(),
144            }),
145            Event::PostSensor { sensor, signals } => Some(SessionEvent::Sensor {
146                ts_ms: ts,
147                id: (*sensor).clone(),
148                signals: signals.len(),
149            }),
150            Event::PreCompact { stage } => Some(SessionEvent::PreCompact {
151                ts_ms: ts,
152                stage: *stage,
153            }),
154            Event::PostCompact { stage } => Some(SessionEvent::PostCompact {
155                ts_ms: ts,
156                stage: *stage,
157            }),
158            Event::Heartbeat { iter } => Some(SessionEvent::Heartbeat {
159                ts_ms: ts,
160                iter: *iter,
161            }),
162            Event::BudgetWarning { ratio } => Some(SessionEvent::BudgetWarning {
163                ts_ms: ts,
164                ratio: *ratio,
165            }),
166            Event::SessionEnd => Some(SessionEvent::End { ts_ms: ts }),
167            _ => None,
168        };
169        if let Some(e) = session_ev {
170            self.write(&e);
171        }
172        HookOutcome::Allow
173    }
174}
175
176/// Read a recorded JSONL session log back into memory.
177///
178/// Tolerates malformed lines (logged, skipped) so a partially-corrupted log
179/// still yields usable replay material.
180pub fn read_session(path: &Path) -> std::io::Result<Vec<SessionEvent>> {
181    let content = std::fs::read_to_string(path)?;
182    let mut events = Vec::new();
183    for (i, line) in content.lines().enumerate() {
184        let line = line.trim();
185        if line.is_empty() {
186            continue;
187        }
188        match serde_json::from_str(line) {
189            Ok(e) => events.push(e),
190            Err(err) => tracing::warn!(line=i+1, error=%err, "session log line skipped"),
191        }
192    }
193    Ok(events)
194}
195
196/// Build a [`harness_models::MockModel`] that returns each recorded
197/// `PostModel` output in order. Pair with a fresh `AgentLoop` to replay the
198/// run.
199pub fn replay_as_mock(events: &[SessionEvent]) -> harness_models::MockModel {
200    use harness_models::{MockModel, MockResponse};
201    let mut m = MockModel::new().with_name("replay");
202    for e in events {
203        if let SessionEvent::PostModel { output, .. } = e {
204            m = m.script(MockResponse {
205                text: output.text.clone(),
206                tool_calls: output.tool_calls.clone(),
207                stop_reason: output.stop_reason,
208                input_tokens: output.usage.input_tokens,
209                output_tokens: output.usage.output_tokens,
210                reasoning: output.reasoning.clone(),
211            });
212        }
213    }
214    m
215}
216
217/// Backwards-compatible alias.
218pub fn replay_as_mock_via_events(events: &[SessionEvent]) -> harness_models::MockModel {
219    replay_as_mock(events)
220}
221
/// Aggregate numbers for one recorded session, as computed by
/// [`SessionStats::from`]. Handy summary for the `harness trace` CLI.
#[derive(Clone, Debug, Default)]
pub struct SessionStats {
    /// Total number of events in the log.
    pub events: usize,
    /// Number of `PostModel` events (completed model calls).
    pub model_calls: usize,
    /// Number of `PreTool` events (tool invocations started).
    pub tool_calls: usize,
    /// One past the highest heartbeat iteration seen (0 if none).
    pub iters: u32,
    /// Sum of input tokens over all model calls.
    pub input_tokens: u32,
    /// Sum of output tokens over all model calls.
    pub output_tokens: u32,
    /// Number of `PostCompact` events (compaction stages that finished).
    pub stages_run: usize,
    /// Timestamp of the last event minus that of the first, in milliseconds.
    pub duration_ms: i64,
}
234
235impl SessionStats {
236    pub fn from(events: &[SessionEvent]) -> Self {
237        let mut s = Self {
238            events: events.len(),
239            ..Default::default()
240        };
241        let mut first_ts: Option<i64> = None;
242        let mut last_ts: Option<i64> = None;
243        for e in events {
244            let ts = match e {
245                SessionEvent::Start { ts_ms, .. }
246                | SessionEvent::PreModel { ts_ms, .. }
247                | SessionEvent::PostModel { ts_ms, .. }
248                | SessionEvent::PreTool { ts_ms, .. }
249                | SessionEvent::PostTool { ts_ms, .. }
250                | SessionEvent::Sensor { ts_ms, .. }
251                | SessionEvent::PreCompact { ts_ms, .. }
252                | SessionEvent::PostCompact { ts_ms, .. }
253                | SessionEvent::Heartbeat { ts_ms, .. }
254                | SessionEvent::BudgetWarning { ts_ms, .. }
255                | SessionEvent::End { ts_ms } => *ts_ms,
256            };
257            if first_ts.is_none() {
258                first_ts = Some(ts);
259            }
260            last_ts = Some(ts);
261
262            match e {
263                SessionEvent::PostModel { output, .. } => {
264                    s.model_calls += 1;
265                    s.input_tokens += output.usage.input_tokens;
266                    s.output_tokens += output.usage.output_tokens;
267                }
268                SessionEvent::PreTool { .. } => s.tool_calls += 1,
269                SessionEvent::PostCompact { .. } => s.stages_run += 1,
270                SessionEvent::Heartbeat { iter, .. } => s.iters = s.iters.max(*iter + 1),
271                _ => {}
272            }
273        }
274        s.duration_ms = match (first_ts, last_ts) {
275            (Some(a), Some(b)) => b - a,
276            _ => 0,
277        };
278        s
279    }
280}
281
282/// Multi-line, content-rich version of [`format_event_short`].
283///
284/// Surfaces what `format_event_short` hides: model text, full tool args,
285/// tool result preview, and failure reasons. Used by `harness trace --verbose`
286/// and by [`LiveProgressHook`] so operators can actually see what their agent
287/// is doing instead of guessing from `ok=false`.
288pub fn format_event_verbose(e: &SessionEvent) -> String {
289    match e {
290        SessionEvent::Start { source, .. } => format!("session start ({source})"),
291        SessionEvent::Heartbeat { iter, .. } => format!("iter {iter}"),
292        SessionEvent::PreModel {
293            history_len,
294            tools_count,
295            ..
296        } => format!("→ model (history={history_len}, tools={tools_count})"),
297        SessionEvent::PostModel { output, .. } => {
298            let mut out = format!(
299                "← model: {} tool_call(s) [{}/{} tok, stop={:?}]",
300                output.tool_calls.len(),
301                output.usage.input_tokens,
302                output.usage.output_tokens,
303                output.stop_reason,
304            );
305            if let Some(text) = output.text.as_deref().filter(|s| !s.is_empty()) {
306                out.push_str("\n  text: ");
307                out.push_str(&truncate(text, 400));
308            }
309            if let Some(reasoning) = output.reasoning.as_deref().filter(|s| !s.is_empty()) {
310                out.push_str("\n  reasoning: ");
311                out.push_str(&truncate(reasoning, 200));
312            }
313            out
314        }
315        SessionEvent::PreTool { action, .. } => {
316            let args = action.args.to_string();
317            format!("  → tool {} args={}", action.tool, truncate(&args, 240))
318        }
319        SessionEvent::PostTool {
320            call_id, result, ..
321        } => {
322            let preview = preview_tool_result(result);
323            format!(
324                "  ← tool {} ok={} {}",
325                call_id,
326                result.ok,
327                if preview.is_empty() {
328                    String::new()
329                } else {
330                    format!("\n      {preview}")
331                }
332            )
333        }
334        SessionEvent::Sensor { id, signals, .. } => {
335            format!("  ⚑ sensor {id}: {signals} signal(s)")
336        }
337        SessionEvent::PreCompact { stage, .. } => format!("  ⇩ pre-compact {stage:?}"),
338        SessionEvent::PostCompact { stage, .. } => format!("  ⇧ post-compact {stage:?}"),
339        SessionEvent::BudgetWarning { ratio, .. } => {
340            if *ratio >= 1.0 {
341                "≫ budget exhausted — forcing tool-less final-synthesis pass".into()
342            } else {
343                format!("≫ budget warning (used {:.0}%)", ratio * 100.0)
344            }
345        }
346        SessionEvent::End { .. } => "session end".into(),
347    }
348}
349
350/// Pull the most actionable text out of a [`ToolResult`] for human display.
351///
352/// For failures, prefer `errors`/`hint`/`message` keys if the tool returned a
353/// structured JSON payload (the multi-engine search tool in `investor-bot`
354/// follows this convention). Falls back to a truncated JSON dump.
355fn preview_tool_result(r: &ToolResult) -> String {
356    let v = &r.content;
357    if !r.ok {
358        // Try the common error-shape conventions first.
359        if let Some(errors) = v.get("errors").and_then(|x| x.as_array()) {
360            let joined: Vec<String> = errors
361                .iter()
362                .filter_map(|e| e.as_str().map(String::from))
363                .collect();
364            if !joined.is_empty() {
365                let hint = v
366                    .get("hint")
367                    .and_then(|x| x.as_str())
368                    .map(|h| format!(" | hint: {h}"))
369                    .unwrap_or_default();
370                return format!("errors=[{}]{hint}", truncate(&joined.join("; "), 240));
371            }
372        }
373        if let Some(msg) = v.get("message").and_then(|x| x.as_str()) {
374            return format!("message={}", truncate(msg, 240));
375        }
376        if let Some(err) = v.get("error").and_then(|x| x.as_str()) {
377            return format!("error={}", truncate(err, 240));
378        }
379    }
380    // Generic preview: serialize, trim, truncate.
381    let s = v.to_string();
382    if s == "null" || s == "{}" {
383        String::new()
384    } else {
385        format!("preview={}", truncate(&s, 240))
386    }
387}
388
/// Shorten `s` to at most `max` characters for display and flatten newlines.
///
/// Counting is per `char`, so a multi-byte UTF-8 sequence is never split.
/// Each `\n` is rendered as ` ⏎ `. This happens after the length check, so the
/// returned string may be longer than `max`. When anything was cut, the result
/// ends with `… (N chars total)`, where N is the original character count.
fn truncate(s: &str, max: usize) -> String {
    // Byte offset of the first character beyond the limit, if there is one.
    match s.char_indices().nth(max) {
        None => s.replace('\n', " ⏎ "),
        Some((cut, _)) => {
            let kept = s[..cut].replace('\n', " ⏎ ");
            format!("{}… ({} chars total)", kept, s.chars().count())
        }
    }
}
403
404/// Tiny helper used by the CLI: convert a single event to a single line of
405/// pretty-printed text (does NOT include the timestamp prefix).
406pub fn format_event_short(e: &SessionEvent) -> String {
407    match e {
408        SessionEvent::Start { source, .. } => format!("session start ({source})"),
409        SessionEvent::Heartbeat { iter, .. } => format!("iter {iter}"),
410        SessionEvent::PreModel {
411            history_len,
412            tools_count,
413            ..
414        } => {
415            format!("→ model (history={history_len}, tools={tools_count})")
416        }
417        SessionEvent::PostModel { output, .. } => {
418            let calls = output.tool_calls.len();
419            let txt = output
420                .text
421                .as_deref()
422                .unwrap_or("")
423                .chars()
424                .take(60)
425                .collect::<String>();
426            if calls > 0 {
427                format!(
428                    "← model: {} tool_call(s) [{}/{} tok]",
429                    calls, output.usage.input_tokens, output.usage.output_tokens
430                )
431            } else {
432                format!(
433                    "← model: {:?} [{}/{} tok]",
434                    txt, output.usage.input_tokens, output.usage.output_tokens
435                )
436            }
437        }
438        SessionEvent::PreTool { action, .. } => {
439            format!("  → tool {} args={}", action.tool, action.args)
440        }
441        SessionEvent::PostTool {
442            call_id, result, ..
443        } => {
444            format!("  ← tool {} ok={}", call_id, result.ok)
445        }
446        SessionEvent::Sensor { id, signals, .. } => format!("  ⚑ sensor {id}: {signals} signal(s)"),
447        SessionEvent::PreCompact { stage, .. } => format!("  ⇩ pre-compact {stage:?}"),
448        SessionEvent::PostCompact { stage, .. } => format!("  ⇧ post-compact {stage:?}"),
449        SessionEvent::BudgetWarning { ratio, .. } => {
450            format!("≫ budget warning (used {:.0}%)", ratio * 100.0)
451        }
452        SessionEvent::End { .. } => "session end".into(),
453    }
454}
455
/// [`Hook`] that streams a verbose, human-readable progress trace to stderr
/// while the agent runs.
///
/// Install it with
/// `AgentLoop::with_hook(Arc::new(LiveProgressHook::default()))` to watch model
/// calls, tool calls, and tool results as they happen, rather than staring at a
/// silent terminal and reading a JSONL file afterwards. It is independent of
/// `SessionRecorder`; both can be installed together.
///
/// Every output line starts with an `[iter=N]` prefix and holds one piece of
/// an event, which keeps the output greppable. Everything goes to stderr, so
/// stdout stays clean for the final answer.
#[derive(Default)]
pub struct LiveProgressHook {
    /// Most recent heartbeat iteration, used as the `[iter=N]` line prefix.
    iter: std::sync::atomic::AtomicU32,
}

impl LiveProgressHook {
    /// Create a hook whose iteration counter starts at 0.
    pub fn new() -> Self {
        Self {
            iter: std::sync::atomic::AtomicU32::new(0),
        }
    }
}
476
477impl Hook for LiveProgressHook {
478    fn name(&self) -> &str {
479        "live-progress"
480    }
481    fn matches(&self, _ev: &Event<'_>) -> bool {
482        true
483    }
484    fn fire(&self, ev: &Event<'_>, world: &mut World) -> HookOutcome {
485        let ts = world.clock.now_ms();
486        let iter = self.iter.load(std::sync::atomic::Ordering::Relaxed);
487        // Reuse the recorder's projection + the verbose formatter so the
488        // live output is the same format you'd see post-mortem.
489        let session_ev = match ev {
490            Event::SessionStart { source } => Some(SessionEvent::Start {
491                ts_ms: ts,
492                source: format!("{source:?}"),
493            }),
494            Event::PreModel { ctx } => Some(SessionEvent::PreModel {
495                ts_ms: ts,
496                history_len: ctx.history.len(),
497                tools_count: ctx.tools.len(),
498            }),
499            Event::PostModel { out } => Some(SessionEvent::PostModel {
500                ts_ms: ts,
501                output: (*out).clone(),
502            }),
503            Event::PreToolUse { action } => Some(SessionEvent::PreTool {
504                ts_ms: ts,
505                action: (*action).clone(),
506            }),
507            Event::PostToolUse { action, result } => Some(SessionEvent::PostTool {
508                ts_ms: ts,
509                call_id: action.call_id.clone(),
510                result: (*result).clone(),
511            }),
512            Event::Heartbeat { iter: i } => {
513                self.iter.store(*i, std::sync::atomic::Ordering::Relaxed);
514                Some(SessionEvent::Heartbeat {
515                    ts_ms: ts,
516                    iter: *i,
517                })
518            }
519            Event::PreCompact { stage } => Some(SessionEvent::PreCompact {
520                ts_ms: ts,
521                stage: *stage,
522            }),
523            Event::PostCompact { stage } => Some(SessionEvent::PostCompact {
524                ts_ms: ts,
525                stage: *stage,
526            }),
527            Event::BudgetWarning { ratio } => Some(SessionEvent::BudgetWarning {
528                ts_ms: ts,
529                ratio: *ratio,
530            }),
531            Event::SessionEnd => Some(SessionEvent::End { ts_ms: ts }),
532            _ => None,
533        };
534        if let Some(e) = session_ev {
535            for line in format_event_verbose(&e).lines() {
536                eprintln!("[iter={iter}] {line}");
537            }
538        }
539        HookOutcome::Allow
540    }
541}
542
#[cfg(test)]
mod tests {
    use super::*;

    fn sample_log() -> Vec<SessionEvent> {
        vec![
            SessionEvent::Start {
                ts_ms: 0,
                source: "Startup".into(),
            },
            SessionEvent::Heartbeat { ts_ms: 1, iter: 0 },
            SessionEvent::PreModel {
                ts_ms: 2,
                history_len: 1,
                tools_count: 3,
            },
            SessionEvent::PostModel {
                ts_ms: 100,
                output: ModelOutput {
                    text: Some("hi".into()),
                    tool_calls: Vec::new(),
                    usage: Default::default(),
                    stop_reason: harness_core::StopReason::EndTurn,
                    reasoning: None,
                },
            },
            SessionEvent::End { ts_ms: 110 },
        ]
    }

    /// Per-test temp path (unique by process id and test name) so parallel
    /// tests never share a file; any stale file is removed first.
    fn temp_log(name: &str) -> std::path::PathBuf {
        let p = std::env::temp_dir().join(format!(
            "harness-replay-{}-{name}.jsonl",
            std::process::id()
        ));
        let _ = std::fs::remove_file(&p);
        p
    }

    #[test]
    fn stats_compute_correctly() {
        let s = SessionStats::from(&sample_log());
        assert_eq!(s.events, 5);
        assert_eq!(s.model_calls, 1);
        assert_eq!(s.iters, 1);
        assert_eq!(s.duration_ms, 110);
    }

    #[test]
    fn stats_sum_tokens_across_model_calls() {
        let mut log = sample_log();
        if let SessionEvent::PostModel { output, .. } = &mut log[3] {
            output.usage.input_tokens = 7;
            output.usage.output_tokens = 3;
        }
        let extra = log[3].clone();
        log.insert(4, extra);
        let s = SessionStats::from(&log);
        assert_eq!(s.model_calls, 2);
        assert_eq!(s.input_tokens, 14);
        assert_eq!(s.output_tokens, 6);
        assert_eq!(s.duration_ms, 110);
    }

    #[test]
    fn stats_of_empty_log_are_zero() {
        let s = SessionStats::from(&[]);
        assert_eq!(s.events, 0);
        assert_eq!(s.model_calls, 0);
        assert_eq!(s.iters, 0);
        assert_eq!(s.duration_ms, 0);
    }

    #[test]
    fn round_trip_via_serde() {
        let original = sample_log();
        let json: Vec<String> = original
            .iter()
            .map(|e| serde_json::to_string(e).unwrap())
            .collect();
        let parsed: Vec<SessionEvent> = json
            .iter()
            .map(|s| serde_json::from_str::<SessionEvent>(s).unwrap())
            .collect();
        assert_eq!(parsed.len(), original.len());
        assert!(
            matches!(parsed[3], SessionEvent::PostModel { ref output, .. } if output.text.as_deref() == Some("hi"))
        );
    }

    #[test]
    fn recorder_output_reads_back_and_skips_garbage() {
        use std::io::Write;

        let path = temp_log("roundtrip");
        {
            let rec = SessionRecorder::new(&path).unwrap();
            for e in &sample_log() {
                rec.write(e);
            }
        }
        // Simulate corruption: one malformed line and one blank line.
        {
            let mut f = std::fs::OpenOptions::new()
                .append(true)
                .open(&path)
                .unwrap();
            writeln!(f, "{{not json").unwrap();
            writeln!(f).unwrap();
        }
        let events = read_session(&path).unwrap();
        let _ = std::fs::remove_file(&path);
        assert_eq!(events.len(), sample_log().len());
        assert!(matches!(events.last(), Some(SessionEvent::End { ts_ms: 110 })));
    }

    #[test]
    fn truncate_is_char_safe_and_flattens_newlines() {
        assert_eq!(truncate("a\nb", 10), "a ⏎ b");
        assert_eq!(truncate("héllo", 2), "hé… (5 chars total)");
    }
}