Skip to main content

harn_vm/testbench/
tape.rs

1//! Unified event tape for the testbench.
2//!
3//! A tape is the canonical artifact behind `harn test-bench --emit-tape`.
4//! Every non-deterministic input the script consumed — clock advances,
5//! LLM responses, FS reads/writes, subprocess spawns — lands as a typed
6//! [`TapeRecord`] with a logical sequence number, execution phase, and a
7//! virtual-time stamp. The tape is what the [`fidelity`] oracle compares;
8//! it is what `harn test-bench replay` reads to drive a deterministic
9//! re-run.
10//!
11//! [`fidelity`]: super::fidelity
12//!
13//! ## File layout
14//!
15//! ```text
16//! tape.tape       # NDJSON: one header line + one record line per event
17//! tape.cas/       # content-addressed sidecar (BLAKE3 hex names)
18//! ```
19//!
//! Small UTF-8 payloads are serialized inline. Anything over
//! [`MAX_INLINE_BYTES`], and any payload that is not valid UTF-8, lands in
//! `tape.cas/<blake3>` and the record carries
//! `{"content_hash": "<blake3>", "len_bytes": <n>}` instead of the body.
//! That keeps the main stream diffable when the only thing that changes
//! is a multi-MB LLM response.
24//!
25//! ## Versioning
26//!
27//! Every tape carries a `version` integer in its header. The current
28//! schema is [`TAPE_FORMAT_VERSION`]. Loaders accept anything `<=` the
29//! current version and emit a structured error for newer tapes; this
30//! gives us room to add record kinds (under `#[serde(other)]`) without
31//! silently breaking older runners.
32//!
33//! ## Recording
34//!
35//! Recording is opt-in: the testbench installs a thread-local
36//! [`TapeRecorder`] when `Testbench::tape = TapeConfig::Emit { path }`.
37//! Every host-capability axis that already has a record path
38//! ([`super::process_tape`], [`super::overlay_fs`], [`crate::llm::mock`],
39//! [`crate::clock_mock`]) calls into this module to push a record. When
40//! no recorder is installed, the helpers are no-ops — production code
41//! pays nothing.
42
43use std::cell::RefCell;
44use std::collections::BTreeMap;
45use std::path::{Path, PathBuf};
46use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
47use std::sync::{Arc, Mutex};
48
49use serde::{Deserialize, Serialize};
50
51use crate::clock_mock;
52
/// Schema version stamped into every tape header this build writes.
/// Increase it whenever the on-disk format changes incompatibly;
/// [`EventTape::load`] rejects tapes that declare a larger value.
pub const TAPE_FORMAT_VERSION: u32 = 1;
56
/// Inline-size ceiling, in bytes. A payload whose raw body is longer
/// than this is written to the content-addressed sidecar instead of the
/// NDJSON line. Chosen to sit above typical stdout/file-read sizes and
/// below full LLM responses so the main stream stays diffable.
pub const MAX_INLINE_BYTES: usize = 4096;
62
63/// Header line written first in every tape file. Captures the metadata a
64/// fidelity-checker needs to interpret the records that follow.
65#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
66pub struct TapeHeader {
67    pub version: u32,
68    /// Crate version of the producer (`harn-vm` `CARGO_PKG_VERSION`).
69    /// Surfaced so a fidelity report can attribute divergences across
70    /// runtime upgrades.
71    pub harn_version: String,
72    /// UNIX-epoch milliseconds the script was launched at — i.e. the
73    /// initial value of the testbench's paused clock. `null` when the
74    /// run used the real clock.
75    #[serde(default)]
76    pub started_at_unix_ms: Option<i64>,
77    /// Path passed to `harn test-bench run`. Informational only; replays
78    /// resolve scripts via the CLI argument, not this field.
79    #[serde(default)]
80    pub script_path: Option<String>,
81    /// Positional arguments forwarded to the script (post `--`). Captured
82    /// so two re-runs that differ only in argv are distinguishable.
83    #[serde(default)]
84    pub argv: Vec<String>,
85}
86
87impl TapeHeader {
88    pub fn current(
89        started_at_unix_ms: Option<i64>,
90        script_path: Option<String>,
91        argv: Vec<String>,
92    ) -> Self {
93        Self {
94            version: TAPE_FORMAT_VERSION,
95            harn_version: env!("CARGO_PKG_VERSION").to_string(),
96            started_at_unix_ms,
97            script_path,
98            argv,
99        }
100    }
101}
102
103/// One on-disk line of the tape. Wrapping the header and record kinds
104/// behind a single tagged enum lets us write the whole file as
105/// homogeneous NDJSON.
106#[derive(Debug, Clone, Serialize, Deserialize)]
107#[serde(tag = "type", rename_all = "snake_case")]
108enum TapeLine {
109    Header(TapeHeader),
110    Record(TapeRecord),
111}
112
113/// One captured non-deterministic event. The variant carries the record
114/// payload; the wrapping [`TapeRecord`] adds the metadata every variant
115/// shares.
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct TapeRecord {
118    /// Monotonic logical sequence number assigned at record time.
119    pub seq: u64,
120    /// Execution phase that produced the record. The fidelity oracle
121    /// uses this to keep script-visible boundaries strict while letting
122    /// runtime finalization evolve without regenerating user fixtures.
123    #[serde(default)]
124    pub phase: TapePhase,
125    /// Wall-clock value (UNIX-epoch ms) observed at record time. Reads
126    /// from the unified mock clock when one is installed.
127    pub virtual_time_ms: i64,
128    /// Monotonic ms since the testbench was activated. Independent of
129    /// `virtual_time_ms` so a paused clock that never advances still
130    /// produces an ordered stream.
131    pub monotonic_ms: i64,
132    /// The actual event.
133    pub kind: TapeRecordKind,
134}
135
136/// Coarse execution phase for host-boundary tape records.
137#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
138#[serde(rename_all = "snake_case")]
139pub enum TapePhase {
140    /// Records produced while evaluating the user-authored script body.
141    #[default]
142    UserScript,
143    /// Records produced while the runtime drains finish/resume/finalizer
144    /// lifecycle work after the script body has yielded its result.
145    RuntimeFinalize,
146}
147
148impl TapePhase {
149    fn as_u8(self) -> u8 {
150        match self {
151            Self::UserScript => 0,
152            Self::RuntimeFinalize => 1,
153        }
154    }
155
156    fn from_u8(value: u8) -> Self {
157        match value {
158            1 => Self::RuntimeFinalize,
159            _ => Self::UserScript,
160        }
161    }
162
163    pub fn label(self) -> &'static str {
164        match self {
165            Self::UserScript => "user_script",
166            Self::RuntimeFinalize => "runtime_finalize",
167        }
168    }
169}
170
171/// Discriminated union of every record kind the v1 tape captures. New
172/// kinds can be added without breaking older readers (`serde(other)`
173/// support is intentional — unknown variants surface as
174/// [`TapeRecordKind::Unknown`] so a fidelity check still runs).
175#[derive(Debug, Clone, Serialize, Deserialize)]
176#[serde(tag = "kind", rename_all = "snake_case")]
177pub enum TapeRecordKind {
178    /// Script read the wall-clock or monotonic clock. The captured value
179    /// is what the script actually saw, so a re-run that drifts (e.g.
180    /// because the operator forgot `--clock paused`) produces a
181    /// different content hash and the fidelity oracle flags it.
182    ClockRead { source: ClockSource, value_ms: i64 },
183    /// Script slept (or otherwise advanced the unified mock clock) by
184    /// `duration_ms`. The recorded virtual time is post-advance.
185    ClockSleep { duration_ms: u64 },
186    /// LLM call. `request_digest` is a deterministic hash of the call's
187    /// matchable surface (messages + system + tools + tool_choice +
188    /// thinking). `response` is the recorded mock — inline JSON for
189    /// small payloads, a CAS reference for large ones.
190    LlmCall {
191        request_digest: String,
192        response: TapePayload,
193    },
194    /// Filesystem read against the testbench overlay. The content hash
195    /// lets fidelity checks reason about read consistency without
196    /// inlining every byte.
197    FileRead {
198        path: String,
199        content_hash: String,
200        len_bytes: u64,
201    },
202    /// Filesystem write into the testbench overlay.
203    FileWrite {
204        path: String,
205        content_hash: String,
206        len_bytes: u64,
207    },
208    /// Filesystem delete in the overlay layer.
209    FileDelete { path: String },
210    /// Subprocess spawn captured by [`super::process_tape`]. Stdout and
211    /// stderr are stored under `stdout_payload`/`stderr_payload` so the
212    /// large blobs land in CAS rather than the NDJSON line.
213    ProcessSpawn {
214        program: String,
215        args: Vec<String>,
216        cwd: Option<String>,
217        exit_code: i32,
218        duration_ms: u64,
219        stdout_payload: TapePayload,
220        stderr_payload: TapePayload,
221    },
222    /// Catch-all for record kinds emitted by a newer producer. Lets
223    /// older fidelity checkers compare what they understand and flag
224    /// the rest as `Unknown` divergence rather than refusing to load.
225    #[serde(other)]
226    Unknown,
227}
228
229impl TapeRecordKind {
230    /// Stable, snake_case label for this kind. Mirrors the `kind` tag
231    /// `serde` writes to disk so display-side code (CLI summaries,
232    /// report headers, error messages) is consistent with the wire
233    /// format without re-deriving the string each call site.
234    pub fn label(&self) -> &'static str {
235        match self {
236            Self::ClockRead { .. } => "clock_read",
237            Self::ClockSleep { .. } => "clock_sleep",
238            Self::LlmCall { .. } => "llm_call",
239            Self::FileRead { .. } => "file_read",
240            Self::FileWrite { .. } => "file_write",
241            Self::FileDelete { .. } => "file_delete",
242            Self::ProcessSpawn { .. } => "process_spawn",
243            Self::Unknown => "unknown",
244        }
245    }
246}
247
248/// Which face of the unified clock the script read. Captured so a
249/// fidelity report can attribute drift back to the right axis.
250#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
251#[serde(rename_all = "snake_case")]
252pub enum ClockSource {
253    Wall,
254    Monotonic,
255}
256
257/// On-disk representation of a record payload. Inline for small values,
258/// CAS-by-hash for anything over [`MAX_INLINE_BYTES`].
259#[derive(Debug, Clone, Serialize, Deserialize)]
260#[serde(untagged)]
261pub enum TapePayload {
262    /// Inline UTF-8 text payload. Carries a content hash so a fidelity
263    /// check can compare without re-hashing.
264    Inline { content_hash: String, text: String },
265    /// CAS-stored payload. The bytes live at `<tape>.cas/<content_hash>`.
266    Cas {
267        content_hash: String,
268        len_bytes: u64,
269    },
270}
271
272impl TapePayload {
273    pub fn content_hash(&self) -> &str {
274        match self {
275            Self::Inline { content_hash, .. } | Self::Cas { content_hash, .. } => content_hash,
276        }
277    }
278
279    pub fn len_bytes(&self) -> u64 {
280        match self {
281            Self::Inline { text, .. } => text.len() as u64,
282            Self::Cas { len_bytes, .. } => *len_bytes,
283        }
284    }
285}
286
287/// Compute a stable BLAKE3 hex digest for a byte slice. Centralized so
288/// every record path keys CAS lookups identically.
289pub fn content_hash(bytes: &[u8]) -> String {
290    blake3::hash(bytes).to_hex().to_string()
291}
292
293/// Build a [`TapePayload`] from raw bytes, spilling to the sidecar map
294/// when the body is large enough to clutter the NDJSON.
295fn build_payload(bytes: Vec<u8>, cas: &mut BTreeMap<String, Vec<u8>>) -> TapePayload {
296    let hash = content_hash(&bytes);
297    if bytes.len() > MAX_INLINE_BYTES {
298        let len_bytes = bytes.len() as u64;
299        cas.entry(hash.clone()).or_insert(bytes);
300        TapePayload::Cas {
301            content_hash: hash,
302            len_bytes,
303        }
304    } else {
305        let text = match String::from_utf8(bytes) {
306            Ok(text) => text,
307            Err(error) => {
308                // Non-utf8 bytes still need to round-trip. Stash the raw
309                // bytes in CAS and inline a sentinel so a fidelity diff
310                // is still meaningful.
311                let bytes = error.into_bytes();
312                let len_bytes = bytes.len() as u64;
313                cas.entry(hash.clone()).or_insert(bytes);
314                return TapePayload::Cas {
315                    content_hash: hash,
316                    len_bytes,
317                };
318            }
319        };
320        TapePayload::Inline {
321            content_hash: hash,
322            text,
323        }
324    }
325}
326
327/// In-memory tape: header + ordered record list + sidecar bytes pending
328/// flush to disk. Built by [`TapeRecorder`] during a record run; loaded
329/// by [`EventTape::load`] for replay or fidelity comparison.
330#[derive(Debug, Clone)]
331pub struct EventTape {
332    pub header: TapeHeader,
333    pub records: Vec<TapeRecord>,
334    /// Content-addressed bodies. Populated either by the recorder (in
335    /// memory until [`EventTape::persist`]) or by [`EventTape::load`]
336    /// (read back from `<tape>.cas/`).
337    cas: BTreeMap<String, Vec<u8>>,
338}
339
340impl EventTape {
341    pub fn new(header: TapeHeader) -> Self {
342        Self {
343            header,
344            records: Vec::new(),
345            cas: BTreeMap::new(),
346        }
347    }
348
349    /// Resolve a payload to its full bytes. Inline payloads return their
350    /// text; CAS payloads look up the sidecar.
351    pub fn resolve_payload(&self, payload: &TapePayload) -> Result<Vec<u8>, String> {
352        match payload {
353            TapePayload::Inline { text, .. } => Ok(text.as_bytes().to_vec()),
354            TapePayload::Cas { content_hash, .. } => self
355                .cas
356                .get(content_hash)
357                .cloned()
358                .ok_or_else(|| format!("tape CAS missing entry for {content_hash}")),
359        }
360    }
361
362    /// Total CAS payload count. Useful for diagnostics and tests.
363    pub fn cas_len(&self) -> usize {
364        self.cas.len()
365    }
366
367    /// Persist the tape (NDJSON + sidecar) to `path`. The sidecar lives
368    /// at `<path>.cas/`; the parent directory is created if needed.
369    pub fn persist(&self, path: &Path) -> Result<(), String> {
370        if let Some(parent) = path.parent() {
371            if !parent.as_os_str().is_empty() {
372                std::fs::create_dir_all(parent)
373                    .map_err(|err| format!("mkdir {}: {err}", parent.display()))?;
374            }
375        }
376
377        let mut body = String::new();
378        let header_line = serde_json::to_string(&TapeLine::Header(self.header.clone()))
379            .map_err(|err| format!("serialize tape header: {err}"))?;
380        body.push_str(&header_line);
381        body.push('\n');
382        for record in &self.records {
383            let line = serde_json::to_string(&TapeLine::Record(record.clone()))
384                .map_err(|err| format!("serialize tape record: {err}"))?;
385            body.push_str(&line);
386            body.push('\n');
387        }
388        std::fs::write(path, body).map_err(|err| format!("write {}: {err}", path.display()))?;
389
390        if !self.cas.is_empty() {
391            let cas_dir = cas_dir_for(path);
392            std::fs::create_dir_all(&cas_dir)
393                .map_err(|err| format!("mkdir {}: {err}", cas_dir.display()))?;
394            for (hash, bytes) in &self.cas {
395                let entry = cas_dir.join(hash);
396                std::fs::write(&entry, bytes)
397                    .map_err(|err| format!("write {}: {err}", entry.display()))?;
398            }
399        }
400        Ok(())
401    }
402
403    /// Load a tape from `path`. Reads the NDJSON body and lazily fetches
404    /// any referenced CAS entries from `<path>.cas/`.
405    pub fn load(path: &Path) -> Result<Self, String> {
406        let body = std::fs::read_to_string(path)
407            .map_err(|err| format!("read {}: {err}", path.display()))?;
408        let mut lines = body.lines();
409        let first_line = lines
410            .next()
411            .ok_or_else(|| format!("empty tape file: {}", path.display()))?;
412        let header_line: TapeLine = serde_json::from_str(first_line)
413            .map_err(|err| format!("parse tape header in {}: {err}", path.display()))?;
414        let header = match header_line {
415            TapeLine::Header(header) => header,
416            TapeLine::Record(_) => {
417                return Err(format!(
418                    "tape {} is missing its header (first line is a record)",
419                    path.display()
420                ))
421            }
422        };
423        if header.version > TAPE_FORMAT_VERSION {
424            return Err(format!(
425                "tape {} declares version {} but this runtime supports up to {TAPE_FORMAT_VERSION}",
426                path.display(),
427                header.version
428            ));
429        }
430        let mut records = Vec::new();
431        for (idx, line) in lines.enumerate() {
432            let trimmed = line.trim();
433            if trimmed.is_empty() {
434                continue;
435            }
436            let parsed: TapeLine = serde_json::from_str(trimmed).map_err(|err| {
437                format!(
438                    "parse tape record at line {} in {}: {err}",
439                    idx + 2,
440                    path.display()
441                )
442            })?;
443            match parsed {
444                TapeLine::Record(record) => records.push(record),
445                TapeLine::Header(_) => {
446                    return Err(format!(
447                        "tape {} contains a second header at line {}",
448                        path.display(),
449                        idx + 2
450                    ))
451                }
452            }
453        }
454
455        let mut cas = BTreeMap::new();
456        let cas_dir = cas_dir_for(path);
457        if cas_dir.is_dir() {
458            for record in &records {
459                visit_payloads(&record.kind, |payload| {
460                    if let TapePayload::Cas { content_hash, .. } = payload {
461                        if cas.contains_key(content_hash) {
462                            return;
463                        }
464                        let entry = cas_dir.join(content_hash);
465                        if let Ok(bytes) = std::fs::read(&entry) {
466                            cas.insert(content_hash.clone(), bytes);
467                        }
468                    }
469                });
470            }
471        }
472        Ok(Self {
473            header,
474            records,
475            cas,
476        })
477    }
478}
479
/// Sidecar directory for a tape file: the tape path with `.cas`
/// appended to it verbatim (so `run.tape` maps to `run.tape.cas`).
fn cas_dir_for(tape_path: &Path) -> PathBuf {
    let mut sidecar = tape_path.to_path_buf().into_os_string();
    sidecar.push(".cas");
    sidecar.into()
}
485
486fn visit_payloads(kind: &TapeRecordKind, mut visit: impl FnMut(&TapePayload)) {
487    match kind {
488        TapeRecordKind::LlmCall { response, .. } => visit(response),
489        TapeRecordKind::ProcessSpawn {
490            stdout_payload,
491            stderr_payload,
492            ..
493        } => {
494            visit(stdout_payload);
495            visit(stderr_payload);
496        }
497        TapeRecordKind::ClockRead { .. }
498        | TapeRecordKind::ClockSleep { .. }
499        | TapeRecordKind::FileRead { .. }
500        | TapeRecordKind::FileWrite { .. }
501        | TapeRecordKind::FileDelete { .. }
502        | TapeRecordKind::Unknown => {}
503    }
504}
505
506/// Recorder consulted by every host-capability axis. When installed as
507/// the [`active_recorder`], each axis's record path also pushes a
508/// [`TapeRecord`] here so the unified tape stays in sync without
509/// re-routing every capability through this module.
510#[derive(Debug)]
511pub struct TapeRecorder {
512    next_seq: AtomicU64,
513    phase: AtomicU8,
514    started_at: clock_mock::ClockInstant,
515    inner: Mutex<RecorderInner>,
516}
517
518#[derive(Debug, Default)]
519struct RecorderInner {
520    records: Vec<TapeRecord>,
521    cas: BTreeMap<String, Vec<u8>>,
522}
523
524impl Default for TapeRecorder {
525    fn default() -> Self {
526        Self::new()
527    }
528}
529
530impl TapeRecorder {
531    pub fn new() -> Self {
532        Self {
533            next_seq: AtomicU64::new(0),
534            phase: AtomicU8::new(TapePhase::UserScript.as_u8()),
535            started_at: clock_mock::instant_now(),
536            inner: Mutex::new(RecorderInner::default()),
537        }
538    }
539
540    /// Append a record built from `kind`. The recorder stamps the seq
541    /// number and timing metadata; callers only worry about the payload.
542    pub fn record(&self, kind: TapeRecordKind) {
543        let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
544        let virtual_time_ms = clock_mock::now_ms();
545        let monotonic_ms = clock_mock::instant_now()
546            .duration_since(self.started_at)
547            .as_millis()
548            .min(i64::MAX as u128) as i64;
549        let record = TapeRecord {
550            seq,
551            phase: TapePhase::from_u8(self.phase.load(Ordering::SeqCst)),
552            virtual_time_ms,
553            monotonic_ms,
554            kind,
555        };
556        self.inner
557            .lock()
558            .expect("tape recorder mutex poisoned")
559            .records
560            .push(record);
561    }
562
563    fn swap_phase(&self, phase: TapePhase) -> TapePhase {
564        TapePhase::from_u8(self.phase.swap(phase.as_u8(), Ordering::SeqCst))
565    }
566
567    /// Convenience wrapper: build a [`TapePayload`] from `bytes` (spilling
568    /// to CAS as needed) and register the bytes for persistence. Used by
569    /// axes that have raw bodies on hand (subprocess stdout, LLM
570    /// response JSON, file content).
571    pub fn payload_from_bytes(&self, bytes: Vec<u8>) -> TapePayload {
572        let mut inner = self.inner.lock().expect("tape recorder mutex poisoned");
573        build_payload(bytes, &mut inner.cas)
574    }
575
576    /// Snapshot the tape into a self-contained [`EventTape`]. Consumes
577    /// the recorder's CAS by `clone()` so a recorder can be sampled
578    /// mid-run for diagnostics — production callers usually move into
579    /// `into_tape` instead.
580    pub fn snapshot(&self, header: TapeHeader) -> EventTape {
581        let inner = self.inner.lock().expect("tape recorder mutex poisoned");
582        EventTape {
583            header,
584            records: inner.records.clone(),
585            cas: inner.cas.clone(),
586        }
587    }
588}
589
590thread_local! {
591    static ACTIVE_RECORDER: RefCell<Option<Arc<TapeRecorder>>> = const { RefCell::new(None) };
592}
593
594/// RAII guard returned by [`install_recorder`]. Restores the previous
595/// recorder (if any) on drop so nested testbench sessions stay sane.
596pub struct TapeRecorderGuard {
597    previous: Option<Arc<TapeRecorder>>,
598}
599
600impl Drop for TapeRecorderGuard {
601    fn drop(&mut self) {
602        let prev = self.previous.take();
603        ACTIVE_RECORDER.with(|slot| {
604            *slot.borrow_mut() = prev;
605        });
606    }
607}
608
609pub fn install_recorder(recorder: Arc<TapeRecorder>) -> TapeRecorderGuard {
610    let previous = ACTIVE_RECORDER.with(|slot| slot.replace(Some(recorder)));
611    TapeRecorderGuard { previous }
612}
613
614/// Currently installed recorder, if any. Production callers stay
615/// untouched because nothing installs a recorder outside testbench mode.
616pub fn active_recorder() -> Option<Arc<TapeRecorder>> {
617    ACTIVE_RECORDER.with(|slot| slot.borrow().clone())
618}
619
620/// RAII guard that temporarily changes the phase stamped onto records
621/// from the active recorder.
622pub struct TapePhaseGuard {
623    recorder: Arc<TapeRecorder>,
624    previous: TapePhase,
625}
626
627impl Drop for TapePhaseGuard {
628    fn drop(&mut self) {
629        self.recorder.swap_phase(self.previous);
630    }
631}
632
633/// Enter `phase` for subsequent records emitted by the active recorder.
634/// Returns `None` when tape recording is off.
635pub fn enter_phase(phase: TapePhase) -> Option<TapePhaseGuard> {
636    let recorder = active_recorder()?;
637    let previous = recorder.swap_phase(phase);
638    Some(TapePhaseGuard { recorder, previous })
639}
640
641/// Push a record if a recorder is active. The closure is only evaluated
642/// when recording is on, so the per-axis hooks pay nothing in production.
643pub fn with_active_recorder<F>(build: F)
644where
645    F: FnOnce(&Arc<TapeRecorder>) -> Option<TapeRecordKind>,
646{
647    let Some(recorder) = active_recorder() else {
648        return;
649    };
650    if let Some(kind) = build(&recorder) {
651        recorder.record(kind);
652    }
653}
654
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A minimal `ClockSleep` record whose timestamps are derived from
    /// its sequence number.
    fn small_record(seq: u64, dur: u64) -> TapeRecord {
        TapeRecord {
            seq,
            phase: TapePhase::UserScript,
            virtual_time_ms: seq as i64 * 1000,
            monotonic_ms: seq as i64 * 1000,
            kind: TapeRecordKind::ClockSleep { duration_ms: dur },
        }
    }

    #[test]
    fn round_trip_inline_records() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("run.tape");
        let mut tape = EventTape::new(TapeHeader::current(
            Some(1_700_000_000_000),
            Some("script.harn".to_string()),
            vec!["a".into()],
        ));
        tape.records.push(small_record(0, 250));
        tape.records.push(small_record(1, 750));
        tape.persist(&path).unwrap();

        let loaded = EventTape::load(&path).unwrap();
        assert_eq!(loaded.header.version, TAPE_FORMAT_VERSION);
        assert_eq!(loaded.header.argv, vec!["a".to_string()]);
        assert_eq!(loaded.records.len(), 2);
        match &loaded.records[0].kind {
            TapeRecordKind::ClockSleep { duration_ms } => assert_eq!(*duration_ms, 250),
            other => panic!("unexpected: {other:?}"),
        }
    }

    #[test]
    fn recorder_phase_guard_stamps_and_restores() {
        let recorder = Arc::new(TapeRecorder::new());
        let _recorder_guard = install_recorder(Arc::clone(&recorder));

        with_active_recorder(|_| Some(TapeRecordKind::ClockSleep { duration_ms: 1 }));
        {
            let _phase_guard = enter_phase(TapePhase::RuntimeFinalize).unwrap();
            with_active_recorder(|_| Some(TapeRecordKind::ClockSleep { duration_ms: 2 }));
        }
        with_active_recorder(|_| Some(TapeRecordKind::ClockSleep { duration_ms: 3 }));

        let tape = recorder.snapshot(TapeHeader::current(None, None, Vec::new()));
        let phases = tape
            .records
            .iter()
            .map(|record| record.phase)
            .collect::<Vec<_>>();
        assert_eq!(
            phases,
            vec![
                TapePhase::UserScript,
                TapePhase::RuntimeFinalize,
                TapePhase::UserScript
            ]
        );
    }

    #[test]
    fn large_payloads_spill_to_cas_and_round_trip() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("run.tape");
        let mut tape = EventTape::new(TapeHeader::current(None, None, Vec::new()));
        let big = vec![b'x'; MAX_INLINE_BYTES + 32];
        let payload = build_payload(big.clone(), &mut tape.cas);
        assert!(matches!(payload, TapePayload::Cas { .. }));
        let hash = payload.content_hash().to_string();
        let kind = TapeRecordKind::ProcessSpawn {
            program: "/bin/echo".to_string(),
            args: vec!["x".to_string()],
            cwd: None,
            exit_code: 0,
            duration_ms: 1,
            stdout_payload: payload,
            stderr_payload: build_payload(Vec::new(), &mut tape.cas),
        };
        tape.records.push(TapeRecord {
            seq: 0,
            phase: TapePhase::UserScript,
            virtual_time_ms: 0,
            monotonic_ms: 0,
            kind,
        });
        tape.persist(&path).unwrap();

        // The sidecar directory exists and holds the spilled body.
        let cas_dir = cas_dir_for(&path);
        assert!(cas_dir.is_dir());
        assert!(cas_dir.join(&hash).exists());

        let loaded = EventTape::load(&path).unwrap();
        // Only stdout was large enough to spill; empty stderr is inline.
        assert_eq!(loaded.cas_len(), 1);
        let resolved = match &loaded.records[0].kind {
            TapeRecordKind::ProcessSpawn { stdout_payload, .. } => {
                loaded.resolve_payload(stdout_payload).unwrap()
            }
            other => panic!("unexpected: {other:?}"),
        };
        // Compare the bytes themselves, not just the length.
        assert_eq!(resolved, big);
    }

    #[test]
    fn small_non_utf8_payloads_spill_to_cas() {
        let mut tape = EventTape::new(TapeHeader::current(None, None, Vec::new()));
        let raw = vec![0xff, 0xfe, 0xfd];
        let payload = build_payload(raw.clone(), &mut tape.cas);
        assert!(matches!(payload, TapePayload::Cas { len_bytes: 3, .. }));
        assert_eq!(tape.cas_len(), 1);
        assert_eq!(tape.resolve_payload(&payload).unwrap(), raw);
    }

    #[test]
    fn rejects_newer_version() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("future.tape");
        std::fs::write(
            &path,
            r#"{"type":"header","version":99,"harn_version":"x","started_at_unix_ms":null,"script_path":null,"argv":[]}
"#,
        )
        .unwrap();
        let err = EventTape::load(&path).unwrap_err();
        assert!(err.contains("version 99"), "{err}");
    }

    #[test]
    fn recorder_assigns_monotonic_seq() {
        let recorder = Arc::new(TapeRecorder::new());
        recorder.record(TapeRecordKind::ClockSleep { duration_ms: 1 });
        recorder.record(TapeRecordKind::ClockSleep { duration_ms: 2 });
        let snapshot = recorder.snapshot(TapeHeader::current(None, None, Vec::new()));
        assert_eq!(snapshot.records[0].seq, 0);
        assert_eq!(snapshot.records[1].seq, 1);
    }
}