Skip to main content

harn_vm/testbench/
tape.rs

1//! Unified event tape for the testbench.
2//!
3//! A tape is the canonical artifact behind `harn test-bench --emit-tape`.
4//! Every non-deterministic input the script consumed — clock advances,
5//! LLM responses, FS reads/writes, subprocess spawns — lands as a typed
6//! [`TapeRecord`] with a logical sequence number, execution phase, and a
7//! virtual-time stamp. The tape is what the [`fidelity`] oracle compares;
8//! it is what `harn test-bench replay` reads to drive a deterministic
9//! re-run.
10//!
11//! [`fidelity`]: super::fidelity
12//!
13//! ## File layout
14//!
15//! ```text
//! tape.tape       # NDJSON: one header line + one record line per event
//! tape.tape.cas/  # content-addressed sidecar (tape file name + `.cas`; BLAKE3 hex names)
18//! ```
19//!
//! Small UTF-8 payloads are serialized inline. Anything over
//! [`MAX_INLINE_BYTES`] — and any payload that is not valid UTF-8 — lands
//! in `tape.tape.cas/<blake3>`, and the record carries
//! `{"content_hash": "<blake3>", "len_bytes": <n>}` instead of the text.
//! That keeps the main stream diffable when the only thing that changes
//! is a multi-MB LLM response.
24//!
25//! ## Versioning
26//!
//! Every tape carries a `version` integer in its header. The current
//! schema is [`TAPE_FORMAT_VERSION`]. Loaders accept anything `<=` the
//! current version and return a descriptive error for newer tapes. New
//! record kinds can still be added without a version bump: readers that do
//! not recognize a kind decode it as [`TapeRecordKind::Unknown`] (via
//! `#[serde(other)]`) instead of failing, so older runners keep working.
32//!
33//! ## Recording
34//!
35//! Recording is opt-in: the testbench installs a thread-local
36//! [`TapeRecorder`] when `Testbench::tape = TapeConfig::Emit { path }`.
37//! Every host-capability axis that already has a record path
38//! ([`super::process_tape`], [`super::overlay_fs`], [`crate::llm::mock`],
//! [`crate::clock_mock`]) calls into this module to push a record. When
//! no recorder is installed, the helpers return right after a thread-local
//! lookup, so production code pays next to nothing.
42
43use std::cell::RefCell;
44use std::collections::BTreeMap;
45use std::path::{Path, PathBuf};
46use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
47use std::sync::{Arc, Mutex};
48
49use serde::{Deserialize, Serialize};
50
51use crate::clock_mock;
52
53/// Format version of the on-disk tape representation. Bump on any
54/// breaking change. Loaders refuse tapes with a higher version.
55pub const TAPE_FORMAT_VERSION: u32 = 1;
56
57/// Records whose serialized payload exceeds this size are spilled into
58/// the content-addressed sidecar. Picked to be larger than typical
59/// stdout/file-read sizes but smaller than full LLM responses, so the
60/// main NDJSON stream stays diffable.
61pub const MAX_INLINE_BYTES: usize = 4 * 1024;
62
63/// Header line written first in every tape file. Captures the metadata a
64/// fidelity-checker needs to interpret the records that follow.
65#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
66pub struct TapeHeader {
67    pub version: u32,
68    /// Crate version of the producer (`harn-vm` `CARGO_PKG_VERSION`).
69    /// Surfaced so a fidelity report can attribute divergences across
70    /// runtime upgrades.
71    pub harn_version: String,
72    /// UNIX-epoch milliseconds the script was launched at — i.e. the
73    /// initial value of the testbench's paused clock. `null` when the
74    /// run used the real clock.
75    #[serde(default)]
76    pub started_at_unix_ms: Option<i64>,
77    /// Path passed to `harn test-bench run`. Informational only; replays
78    /// resolve scripts via the CLI argument, not this field.
79    #[serde(default)]
80    pub script_path: Option<String>,
81    /// Positional arguments forwarded to the script (post `--`). Captured
82    /// so two re-runs that differ only in argv are distinguishable.
83    #[serde(default)]
84    pub argv: Vec<String>,
85}
86
87impl TapeHeader {
88    pub fn current(
89        started_at_unix_ms: Option<i64>,
90        script_path: Option<String>,
91        argv: Vec<String>,
92    ) -> Self {
93        Self {
94            version: TAPE_FORMAT_VERSION,
95            harn_version: env!("CARGO_PKG_VERSION").to_string(),
96            started_at_unix_ms,
97            script_path,
98            argv,
99        }
100    }
101}
102
103/// One on-disk line of the tape. Wrapping the header and record kinds
104/// behind a single tagged enum lets us write the whole file as
105/// homogeneous NDJSON.
106#[derive(Debug, Clone, Serialize, Deserialize)]
107#[serde(tag = "type", rename_all = "snake_case")]
108enum TapeLine {
109    Header(TapeHeader),
110    Record(TapeRecord),
111}
112
113/// One captured non-deterministic event. The variant carries the record
114/// payload; the wrapping [`TapeRecord`] adds the metadata every variant
115/// shares.
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct TapeRecord {
118    /// Monotonic logical sequence number assigned at record time.
119    pub seq: u64,
120    /// Execution phase that produced the record. The fidelity oracle
121    /// uses this to keep script-visible boundaries strict while letting
122    /// runtime finalization evolve without regenerating user fixtures.
123    #[serde(default)]
124    pub phase: TapePhase,
125    /// Wall-clock value (UNIX-epoch ms) observed at record time. Reads
126    /// from the unified mock clock when one is installed.
127    pub virtual_time_ms: i64,
128    /// Monotonic ms since the testbench was activated. Independent of
129    /// `virtual_time_ms` so a paused clock that never advances still
130    /// produces an ordered stream.
131    pub monotonic_ms: i64,
132    /// The actual event.
133    pub kind: TapeRecordKind,
134}
135
136/// Coarse execution phase for host-boundary tape records.
137#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
138#[serde(rename_all = "snake_case")]
139pub enum TapePhase {
140    /// Records produced while evaluating the user-authored script body.
141    #[default]
142    UserScript,
143    /// Records produced while the runtime drains finish/resume/finalizer
144    /// lifecycle work after the script body has yielded its result.
145    RuntimeFinalize,
146}
147
148impl TapePhase {
149    fn as_u8(self) -> u8 {
150        match self {
151            Self::UserScript => 0,
152            Self::RuntimeFinalize => 1,
153        }
154    }
155
156    fn from_u8(value: u8) -> Self {
157        match value {
158            1 => Self::RuntimeFinalize,
159            _ => Self::UserScript,
160        }
161    }
162
163    pub fn label(self) -> &'static str {
164        match self {
165            Self::UserScript => "user_script",
166            Self::RuntimeFinalize => "runtime_finalize",
167        }
168    }
169}
170
171/// Discriminated union of every record kind the v1 tape captures. New
172/// kinds can be added without breaking older readers (`serde(other)`
173/// support is intentional — unknown variants surface as
174/// [`TapeRecordKind::Unknown`] so a fidelity check still runs).
175#[derive(Debug, Clone, Serialize, Deserialize)]
176#[serde(tag = "kind", rename_all = "snake_case")]
177pub enum TapeRecordKind {
178    /// Script read the wall-clock or monotonic clock. The captured value
179    /// is what the script actually saw, so a re-run that drifts (e.g.
180    /// because the operator forgot `--clock paused`) produces a
181    /// different content hash and the fidelity oracle flags it.
182    ClockRead { source: ClockSource, value_ms: i64 },
183    /// Script slept (or otherwise advanced the unified mock clock) by
184    /// `duration_ms`. The recorded virtual time is post-advance.
185    ClockSleep { duration_ms: u64 },
186    /// LLM call. `request_digest` is a deterministic hash of the call's
187    /// matchable surface (messages + system + tools + tool_choice +
188    /// thinking). `response` is the recorded mock — inline JSON for
189    /// small payloads, a CAS reference for large ones.
190    LlmCall {
191        request_digest: String,
192        response: TapePayload,
193    },
194    /// Filesystem read against the testbench overlay. The content hash
195    /// lets fidelity checks reason about read consistency without
196    /// inlining every byte.
197    FileRead {
198        path: String,
199        content_hash: String,
200        len_bytes: u64,
201    },
202    /// Filesystem write into the testbench overlay.
203    FileWrite {
204        path: String,
205        content_hash: String,
206        len_bytes: u64,
207    },
208    /// Filesystem delete in the overlay layer.
209    FileDelete { path: String },
210    /// Subprocess spawn captured by [`super::process_tape`]. Stdout and
211    /// stderr are stored under `stdout_payload`/`stderr_payload` so the
212    /// large blobs land in CAS rather than the NDJSON line.
213    ProcessSpawn {
214        program: String,
215        args: Vec<String>,
216        cwd: Option<String>,
217        exit_code: i32,
218        duration_ms: u64,
219        stdout_payload: TapePayload,
220        stderr_payload: TapePayload,
221    },
222    /// MCP JSON-RPC exchange observed by Harn's MCP client. The request
223    /// and response payloads are redacted before they are written so
224    /// cassettes and unified tapes share the same privacy boundary.
225    McpJsonRpc {
226        server: String,
227        method: String,
228        request_digest: String,
229        response_digest: String,
230        latency_ms: u64,
231        request_payload: TapePayload,
232        response_payload: TapePayload,
233    },
234    /// Catch-all for record kinds emitted by a newer producer. Lets
235    /// older fidelity checkers compare what they understand and flag
236    /// the rest as `Unknown` divergence rather than refusing to load.
237    #[serde(other)]
238    Unknown,
239}
240
241impl TapeRecordKind {
242    /// Stable, snake_case label for this kind. Mirrors the `kind` tag
243    /// `serde` writes to disk so display-side code (CLI summaries,
244    /// report headers, error messages) is consistent with the wire
245    /// format without re-deriving the string each call site.
246    pub fn label(&self) -> &'static str {
247        match self {
248            Self::ClockRead { .. } => "clock_read",
249            Self::ClockSleep { .. } => "clock_sleep",
250            Self::LlmCall { .. } => "llm_call",
251            Self::FileRead { .. } => "file_read",
252            Self::FileWrite { .. } => "file_write",
253            Self::FileDelete { .. } => "file_delete",
254            Self::ProcessSpawn { .. } => "process_spawn",
255            Self::McpJsonRpc { .. } => "mcp_json_rpc",
256            Self::Unknown => "unknown",
257        }
258    }
259}
260
261/// Which face of the unified clock the script read. Captured so a
262/// fidelity report can attribute drift back to the right axis.
263#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
264#[serde(rename_all = "snake_case")]
265pub enum ClockSource {
266    Wall,
267    Monotonic,
268}
269
270/// On-disk representation of a record payload. Inline for small values,
271/// CAS-by-hash for anything over [`MAX_INLINE_BYTES`].
272#[derive(Debug, Clone, Serialize, Deserialize)]
273#[serde(untagged)]
274pub enum TapePayload {
275    /// Inline UTF-8 text payload. Carries a content hash so a fidelity
276    /// check can compare without re-hashing.
277    Inline { content_hash: String, text: String },
278    /// CAS-stored payload. The bytes live at `<tape>.cas/<content_hash>`.
279    Cas {
280        content_hash: String,
281        len_bytes: u64,
282    },
283}
284
285impl TapePayload {
286    pub fn content_hash(&self) -> &str {
287        match self {
288            Self::Inline { content_hash, .. } | Self::Cas { content_hash, .. } => content_hash,
289        }
290    }
291
292    pub fn len_bytes(&self) -> u64 {
293        match self {
294            Self::Inline { text, .. } => text.len() as u64,
295            Self::Cas { len_bytes, .. } => *len_bytes,
296        }
297    }
298}
299
300/// Compute a stable BLAKE3 hex digest for a byte slice. Centralized so
301/// every record path keys CAS lookups identically.
302pub fn content_hash(bytes: &[u8]) -> String {
303    blake3::hash(bytes).to_hex().to_string()
304}
305
306/// Build a [`TapePayload`] from raw bytes, spilling to the sidecar map
307/// when the body is large enough to clutter the NDJSON.
308fn build_payload(bytes: Vec<u8>, cas: &mut BTreeMap<String, Vec<u8>>) -> TapePayload {
309    let hash = content_hash(&bytes);
310    if bytes.len() > MAX_INLINE_BYTES {
311        let len_bytes = bytes.len() as u64;
312        cas.entry(hash.clone()).or_insert(bytes);
313        TapePayload::Cas {
314            content_hash: hash,
315            len_bytes,
316        }
317    } else {
318        let text = match String::from_utf8(bytes) {
319            Ok(text) => text,
320            Err(error) => {
321                // Non-utf8 bytes still need to round-trip. Stash the raw
322                // bytes in CAS and inline a sentinel so a fidelity diff
323                // is still meaningful.
324                let bytes = error.into_bytes();
325                let len_bytes = bytes.len() as u64;
326                cas.entry(hash.clone()).or_insert(bytes);
327                return TapePayload::Cas {
328                    content_hash: hash,
329                    len_bytes,
330                };
331            }
332        };
333        TapePayload::Inline {
334            content_hash: hash,
335            text,
336        }
337    }
338}
339
340/// In-memory tape: header + ordered record list + sidecar bytes pending
341/// flush to disk. Built by [`TapeRecorder`] during a record run; loaded
342/// by [`EventTape::load`] for replay or fidelity comparison.
343#[derive(Debug, Clone)]
344pub struct EventTape {
345    pub header: TapeHeader,
346    pub records: Vec<TapeRecord>,
347    /// Content-addressed bodies. Populated either by the recorder (in
348    /// memory until [`EventTape::persist`]) or by [`EventTape::load`]
349    /// (read back from `<tape>.cas/`).
350    cas: BTreeMap<String, Vec<u8>>,
351}
352
353impl EventTape {
354    pub fn new(header: TapeHeader) -> Self {
355        Self {
356            header,
357            records: Vec::new(),
358            cas: BTreeMap::new(),
359        }
360    }
361
362    /// Resolve a payload to its full bytes. Inline payloads return their
363    /// text; CAS payloads look up the sidecar.
364    pub fn resolve_payload(&self, payload: &TapePayload) -> Result<Vec<u8>, String> {
365        match payload {
366            TapePayload::Inline { text, .. } => Ok(text.as_bytes().to_vec()),
367            TapePayload::Cas { content_hash, .. } => self
368                .cas
369                .get(content_hash)
370                .cloned()
371                .ok_or_else(|| format!("tape CAS missing entry for {content_hash}")),
372        }
373    }
374
375    /// Total CAS payload count. Useful for diagnostics and tests.
376    pub fn cas_len(&self) -> usize {
377        self.cas.len()
378    }
379
380    /// Persist the tape (NDJSON + sidecar) to `path`. The sidecar lives
381    /// at `<path>.cas/`; the parent directory is created if needed.
382    pub fn persist(&self, path: &Path) -> Result<(), String> {
383        if let Some(parent) = path.parent() {
384            if !parent.as_os_str().is_empty() {
385                std::fs::create_dir_all(parent)
386                    .map_err(|err| format!("mkdir {}: {err}", parent.display()))?;
387            }
388        }
389
390        let mut body = String::new();
391        let header_line = serde_json::to_string(&TapeLine::Header(self.header.clone()))
392            .map_err(|err| format!("serialize tape header: {err}"))?;
393        body.push_str(&header_line);
394        body.push('\n');
395        for record in &self.records {
396            let line = serde_json::to_string(&TapeLine::Record(record.clone()))
397                .map_err(|err| format!("serialize tape record: {err}"))?;
398            body.push_str(&line);
399            body.push('\n');
400        }
401        std::fs::write(path, body).map_err(|err| format!("write {}: {err}", path.display()))?;
402
403        if !self.cas.is_empty() {
404            let cas_dir = cas_dir_for(path);
405            std::fs::create_dir_all(&cas_dir)
406                .map_err(|err| format!("mkdir {}: {err}", cas_dir.display()))?;
407            for (hash, bytes) in &self.cas {
408                let entry = cas_dir.join(hash);
409                std::fs::write(&entry, bytes)
410                    .map_err(|err| format!("write {}: {err}", entry.display()))?;
411            }
412        }
413        Ok(())
414    }
415
416    /// Load a tape from `path`. Reads the NDJSON body and lazily fetches
417    /// any referenced CAS entries from `<path>.cas/`.
418    pub fn load(path: &Path) -> Result<Self, String> {
419        let body = std::fs::read_to_string(path)
420            .map_err(|err| format!("read {}: {err}", path.display()))?;
421        let mut lines = body.lines();
422        let first_line = lines
423            .next()
424            .ok_or_else(|| format!("empty tape file: {}", path.display()))?;
425        let header_line: TapeLine = serde_json::from_str(first_line)
426            .map_err(|err| format!("parse tape header in {}: {err}", path.display()))?;
427        let header = match header_line {
428            TapeLine::Header(header) => header,
429            TapeLine::Record(_) => {
430                return Err(format!(
431                    "tape {} is missing its header (first line is a record)",
432                    path.display()
433                ))
434            }
435        };
436        if header.version > TAPE_FORMAT_VERSION {
437            return Err(format!(
438                "tape {} declares version {} but this runtime supports up to {TAPE_FORMAT_VERSION}",
439                path.display(),
440                header.version
441            ));
442        }
443        let mut records = Vec::new();
444        for (idx, line) in lines.enumerate() {
445            let trimmed = line.trim();
446            if trimmed.is_empty() {
447                continue;
448            }
449            let parsed: TapeLine = serde_json::from_str(trimmed).map_err(|err| {
450                format!(
451                    "parse tape record at line {} in {}: {err}",
452                    idx + 2,
453                    path.display()
454                )
455            })?;
456            match parsed {
457                TapeLine::Record(record) => records.push(record),
458                TapeLine::Header(_) => {
459                    return Err(format!(
460                        "tape {} contains a second header at line {}",
461                        path.display(),
462                        idx + 2
463                    ))
464                }
465            }
466        }
467
468        let mut cas = BTreeMap::new();
469        let cas_dir = cas_dir_for(path);
470        if cas_dir.is_dir() {
471            for record in &records {
472                visit_payloads(&record.kind, |payload| {
473                    if let TapePayload::Cas { content_hash, .. } = payload {
474                        if cas.contains_key(content_hash) {
475                            return;
476                        }
477                        let entry = cas_dir.join(content_hash);
478                        if let Ok(bytes) = std::fs::read(&entry) {
479                            cas.insert(content_hash.clone(), bytes);
480                        }
481                    }
482                });
483            }
484        }
485        Ok(Self {
486            header,
487            records,
488            cas,
489        })
490    }
491}
492
493fn cas_dir_for(tape_path: &Path) -> PathBuf {
494    let mut buf = tape_path.as_os_str().to_owned();
495    buf.push(".cas");
496    PathBuf::from(buf)
497}
498
499fn visit_payloads(kind: &TapeRecordKind, mut visit: impl FnMut(&TapePayload)) {
500    match kind {
501        TapeRecordKind::LlmCall { response, .. } => visit(response),
502        TapeRecordKind::ProcessSpawn {
503            stdout_payload,
504            stderr_payload,
505            ..
506        } => {
507            visit(stdout_payload);
508            visit(stderr_payload);
509        }
510        TapeRecordKind::McpJsonRpc {
511            request_payload,
512            response_payload,
513            ..
514        } => {
515            visit(request_payload);
516            visit(response_payload);
517        }
518        TapeRecordKind::ClockRead { .. }
519        | TapeRecordKind::ClockSleep { .. }
520        | TapeRecordKind::FileRead { .. }
521        | TapeRecordKind::FileWrite { .. }
522        | TapeRecordKind::FileDelete { .. }
523        | TapeRecordKind::Unknown => {}
524    }
525}
526
527/// Recorder consulted by every host-capability axis. When installed as
528/// the [`active_recorder`], each axis's record path also pushes a
529/// [`TapeRecord`] here so the unified tape stays in sync without
530/// re-routing every capability through this module.
531#[derive(Debug)]
532pub struct TapeRecorder {
533    next_seq: AtomicU64,
534    phase: AtomicU8,
535    started_at: clock_mock::ClockInstant,
536    inner: Mutex<RecorderInner>,
537}
538
539#[derive(Debug, Default)]
540struct RecorderInner {
541    records: Vec<TapeRecord>,
542    cas: BTreeMap<String, Vec<u8>>,
543}
544
545impl Default for TapeRecorder {
546    fn default() -> Self {
547        Self::new()
548    }
549}
550
551impl TapeRecorder {
552    pub fn new() -> Self {
553        Self {
554            next_seq: AtomicU64::new(0),
555            phase: AtomicU8::new(TapePhase::UserScript.as_u8()),
556            started_at: clock_mock::instant_now(),
557            inner: Mutex::new(RecorderInner::default()),
558        }
559    }
560
561    /// Append a record built from `kind`. The recorder stamps the seq
562    /// number and timing metadata; callers only worry about the payload.
563    pub fn record(&self, kind: TapeRecordKind) {
564        let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
565        let virtual_time_ms = clock_mock::now_ms();
566        let monotonic_ms = clock_mock::instant_now()
567            .duration_since(self.started_at)
568            .as_millis()
569            .min(i64::MAX as u128) as i64;
570        let record = TapeRecord {
571            seq,
572            phase: TapePhase::from_u8(self.phase.load(Ordering::SeqCst)),
573            virtual_time_ms,
574            monotonic_ms,
575            kind,
576        };
577        self.inner
578            .lock()
579            .expect("tape recorder mutex poisoned")
580            .records
581            .push(record);
582    }
583
584    fn swap_phase(&self, phase: TapePhase) -> TapePhase {
585        TapePhase::from_u8(self.phase.swap(phase.as_u8(), Ordering::SeqCst))
586    }
587
588    /// Convenience wrapper: build a [`TapePayload`] from `bytes` (spilling
589    /// to CAS as needed) and register the bytes for persistence. Used by
590    /// axes that have raw bodies on hand (subprocess stdout, LLM
591    /// response JSON, file content).
592    pub fn payload_from_bytes(&self, bytes: Vec<u8>) -> TapePayload {
593        let mut inner = self.inner.lock().expect("tape recorder mutex poisoned");
594        build_payload(bytes, &mut inner.cas)
595    }
596
597    /// Snapshot the tape into a self-contained [`EventTape`]. Consumes
598    /// the recorder's CAS by `clone()` so a recorder can be sampled
599    /// mid-run for diagnostics — production callers usually move into
600    /// `into_tape` instead.
601    pub fn snapshot(&self, header: TapeHeader) -> EventTape {
602        let inner = self.inner.lock().expect("tape recorder mutex poisoned");
603        EventTape {
604            header,
605            records: inner.records.clone(),
606            cas: inner.cas.clone(),
607        }
608    }
609}
610
611thread_local! {
612    static ACTIVE_RECORDER: RefCell<Option<Arc<TapeRecorder>>> = const { RefCell::new(None) };
613}
614
615/// RAII guard returned by [`install_recorder`]. Restores the previous
616/// recorder (if any) on drop so nested testbench sessions stay sane.
617pub struct TapeRecorderGuard {
618    previous: Option<Arc<TapeRecorder>>,
619}
620
621impl Drop for TapeRecorderGuard {
622    fn drop(&mut self) {
623        let prev = self.previous.take();
624        ACTIVE_RECORDER.with(|slot| {
625            *slot.borrow_mut() = prev;
626        });
627    }
628}
629
630pub fn install_recorder(recorder: Arc<TapeRecorder>) -> TapeRecorderGuard {
631    let previous = ACTIVE_RECORDER.with(|slot| slot.replace(Some(recorder)));
632    TapeRecorderGuard { previous }
633}
634
635/// Currently installed recorder, if any. Production callers stay
636/// untouched because nothing installs a recorder outside testbench mode.
637pub fn active_recorder() -> Option<Arc<TapeRecorder>> {
638    ACTIVE_RECORDER.with(|slot| slot.borrow().clone())
639}
640
641/// Record an MCP JSON-RPC exchange in the active unified tape, if one
642/// is installed. Payloads are redacted here so every caller gets the
643/// same privacy behavior as MCP cassettes.
644pub fn record_mcp_json_rpc(
645    server: &str,
646    method: &str,
647    request: &serde_json::Value,
648    response: &serde_json::Value,
649    latency_ms: u64,
650) {
651    let Some(recorder) = active_recorder() else {
652        return;
653    };
654    let policy = crate::redact::current_policy();
655    let request = policy.redact_json(request);
656    let response = policy.redact_json(response);
657    let request_bytes = serde_json::to_vec(&request).unwrap_or_default();
658    let response_bytes = serde_json::to_vec(&response).unwrap_or_default();
659    let request_digest = content_hash(&request_bytes);
660    let response_digest = content_hash(&response_bytes);
661    let request_payload = recorder.payload_from_bytes(request_bytes);
662    let response_payload = recorder.payload_from_bytes(response_bytes);
663    recorder.record(TapeRecordKind::McpJsonRpc {
664        server: server.to_string(),
665        method: method.to_string(),
666        request_digest,
667        response_digest,
668        latency_ms,
669        request_payload,
670        response_payload,
671    });
672}
673
674/// RAII guard that temporarily changes the phase stamped onto records
675/// from the active recorder.
676pub struct TapePhaseGuard {
677    recorder: Arc<TapeRecorder>,
678    previous: TapePhase,
679}
680
681impl Drop for TapePhaseGuard {
682    fn drop(&mut self) {
683        self.recorder.swap_phase(self.previous);
684    }
685}
686
687/// Enter `phase` for subsequent records emitted by the active recorder.
688/// Returns `None` when tape recording is off.
689pub fn enter_phase(phase: TapePhase) -> Option<TapePhaseGuard> {
690    let recorder = active_recorder()?;
691    let previous = recorder.swap_phase(phase);
692    Some(TapePhaseGuard { recorder, previous })
693}
694
695/// Push a record if a recorder is active. The closure is only evaluated
696/// when recording is on, so the per-axis hooks pay nothing in production.
697pub fn with_active_recorder<F>(build: F)
698where
699    F: FnOnce(&Arc<TapeRecorder>) -> Option<TapeRecordKind>,
700{
701    let Some(recorder) = active_recorder() else {
702        return;
703    };
704    if let Some(kind) = build(&recorder) {
705        recorder.record(kind);
706    }
707}
708
709#[cfg(test)]
710mod tests {
711    use super::*;
712    use tempfile::TempDir;
713
714    fn small_record(seq: u64, dur: u64) -> TapeRecord {
715        TapeRecord {
716            seq,
717            phase: TapePhase::UserScript,
718            virtual_time_ms: seq as i64 * 1000,
719            monotonic_ms: seq as i64 * 1000,
720            kind: TapeRecordKind::ClockSleep { duration_ms: dur },
721        }
722    }
723
724    #[test]
725    fn round_trip_inline_records() {
726        let temp = TempDir::new().unwrap();
727        let path = temp.path().join("run.tape");
728        let mut tape = EventTape::new(TapeHeader::current(
729            Some(1_700_000_000_000),
730            Some("script.harn".to_string()),
731            vec!["a".into()],
732        ));
733        tape.records.push(small_record(0, 250));
734        tape.records.push(small_record(1, 750));
735        tape.persist(&path).unwrap();
736
737        let loaded = EventTape::load(&path).unwrap();
738        assert_eq!(loaded.header.version, TAPE_FORMAT_VERSION);
739        assert_eq!(loaded.header.argv, vec!["a".to_string()]);
740        assert_eq!(loaded.records.len(), 2);
741        match &loaded.records[0].kind {
742            TapeRecordKind::ClockSleep { duration_ms } => assert_eq!(*duration_ms, 250),
743            other => panic!("unexpected: {other:?}"),
744        }
745    }
746
747    #[test]
748    fn recorder_phase_guard_stamps_and_restores() {
749        let recorder = Arc::new(TapeRecorder::new());
750        let _recorder_guard = install_recorder(Arc::clone(&recorder));
751
752        with_active_recorder(|_| Some(TapeRecordKind::ClockSleep { duration_ms: 1 }));
753        {
754            let _phase_guard = enter_phase(TapePhase::RuntimeFinalize).unwrap();
755            with_active_recorder(|_| Some(TapeRecordKind::ClockSleep { duration_ms: 2 }));
756        }
757        with_active_recorder(|_| Some(TapeRecordKind::ClockSleep { duration_ms: 3 }));
758
759        let tape = recorder.snapshot(TapeHeader::current(None, None, Vec::new()));
760        let phases = tape
761            .records
762            .iter()
763            .map(|record| record.phase)
764            .collect::<Vec<_>>();
765        assert_eq!(
766            phases,
767            vec![
768                TapePhase::UserScript,
769                TapePhase::RuntimeFinalize,
770                TapePhase::UserScript
771            ]
772        );
773    }
774
775    #[test]
776    fn large_payloads_spill_to_cas_and_round_trip() {
777        let temp = TempDir::new().unwrap();
778        let path = temp.path().join("run.tape");
779        let mut tape = EventTape::new(TapeHeader::current(None, None, Vec::new()));
780        let big = vec![b'x'; MAX_INLINE_BYTES + 32];
781        let payload = build_payload(big.clone(), &mut tape.cas);
782        let hash = payload.content_hash().to_string();
783        let kind = TapeRecordKind::ProcessSpawn {
784            program: "/bin/echo".to_string(),
785            args: vec!["x".to_string()],
786            cwd: None,
787            exit_code: 0,
788            duration_ms: 1,
789            stdout_payload: payload,
790            stderr_payload: build_payload(Vec::new(), &mut tape.cas),
791        };
792        tape.records.push(TapeRecord {
793            seq: 0,
794            phase: TapePhase::UserScript,
795            virtual_time_ms: 0,
796            monotonic_ms: 0,
797            kind,
798        });
799        tape.persist(&path).unwrap();
800
801        // CAS sidecar exists.
802        assert!(path.with_extension("tape.cas").exists() || cas_dir_for(&path).exists());
803        let cas_dir = cas_dir_for(&path);
804        assert!(cas_dir.join(&hash).exists());
805
806        let loaded = EventTape::load(&path).unwrap();
807        let resolved = match &loaded.records[0].kind {
808            TapeRecordKind::ProcessSpawn { stdout_payload, .. } => {
809                loaded.resolve_payload(stdout_payload).unwrap()
810            }
811            other => panic!("unexpected: {other:?}"),
812        };
813        assert_eq!(resolved.len(), big.len());
814    }
815
816    #[test]
817    fn rejects_newer_version() {
818        let temp = TempDir::new().unwrap();
819        let path = temp.path().join("future.tape");
820        std::fs::write(
821            &path,
822            r#"{"type":"header","version":99,"harn_version":"x","started_at_unix_ms":null,"script_path":null,"argv":[]}
823"#,
824        )
825        .unwrap();
826        let err = EventTape::load(&path).unwrap_err();
827        assert!(err.contains("version 99"), "{err}");
828    }
829
830    #[test]
831    fn recorder_assigns_monotonic_seq() {
832        let recorder = Arc::new(TapeRecorder::new());
833        recorder.record(TapeRecordKind::ClockSleep { duration_ms: 1 });
834        recorder.record(TapeRecordKind::ClockSleep { duration_ms: 2 });
835        let snapshot = recorder.snapshot(TapeHeader::current(None, None, Vec::new()));
836        assert_eq!(snapshot.records[0].seq, 0);
837        assert_eq!(snapshot.records[1].seq, 1);
838    }
839}