Skip to main content

harn_vm/testbench/
process_tape.rs

1//! Subprocess record/replay tape for the testbench.
2//!
3//! `ProcessTape` is a thread-local override consulted by
4//! [`crate::stdlib::sandbox::command_output`] before it spawns. In record
5//! mode the subprocess actually runs, output is captured, and a tape
6//! entry is appended. In replay mode the (program, args, cwd) tuple is
7//! looked up in the loaded tape and the recorded output is returned —
8//! no real OS spawn, and the unified mock clock is advanced by the
9//! recorded duration so script-observed time stays consistent.
10//!
11//! ## What the tape does NOT cover
12//!
13//! Subprocesses cannot observe the testbench's mock clock — the kernel
14//! always reads real wall time. The tape captures the *parent-observed*
15//! duration and replays it into the parent's clock, but a script that
16//! depends on a subprocess' internal timing will see real wall-clock
17//! time. Full virtualization is the WASI-mediated subprocess child
18//! issue — see `docs/src/dev/testbench.md`.
19
20use std::cell::RefCell;
21use std::collections::BTreeMap;
22use std::path::Path;
23use std::process::{ExitStatus, Output};
24use std::sync::atomic::{AtomicUsize, Ordering};
25use std::sync::Arc;
26use std::sync::Mutex;
27use std::time::Duration;
28
29use serde::{Deserialize, Serialize};
30
31use crate::clock_mock;
32use crate::testbench::tape::{self, TapeRecordKind};
33
34/// Whether the active tape is recording new entries or replaying an
35/// existing tape.
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub enum ProcessTapeMode {
38    Record,
39    Replay,
40}
41
42/// One captured (or to-be-replayed) subprocess invocation.
43#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
44pub struct TapeEntry {
45    pub program: String,
46    #[serde(default)]
47    pub args: Vec<String>,
48    #[serde(default)]
49    pub cwd: Option<String>,
50    /// Reserved for future env-var matching. Currently unpopulated;
51    /// invocations match on `(program, args, cwd)` only.
52    #[serde(default)]
53    pub env: BTreeMap<String, String>,
54    #[serde(default)]
55    pub stdout: String,
56    #[serde(default)]
57    pub stderr: String,
58    #[serde(default)]
59    pub exit_code: i32,
60    /// Duration the parent observed via the *unified mock clock*. In
61    /// record mode this is the wall-clock duration the subprocess took.
62    /// Replay advances the testbench clock by this delta.
63    #[serde(default)]
64    pub duration_ms: u64,
65}
66
67#[derive(Debug)]
68pub struct ProcessTape {
69    mode: ProcessTapeMode,
70    entries: Vec<TapeEntry>,
71    cursor: AtomicUsize,
72    captured: Mutex<Vec<TapeEntry>>,
73}
74
75impl ProcessTape {
76    pub fn recording() -> Self {
77        Self {
78            mode: ProcessTapeMode::Record,
79            entries: Vec::new(),
80            cursor: AtomicUsize::new(0),
81            captured: Mutex::new(Vec::new()),
82        }
83    }
84
85    pub fn replay_from(entries: Vec<TapeEntry>) -> Self {
86        Self {
87            mode: ProcessTapeMode::Replay,
88            entries,
89            cursor: AtomicUsize::new(0),
90            captured: Mutex::new(Vec::new()),
91        }
92    }
93
94    pub fn load(path: &Path) -> Result<Self, String> {
95        let body = std::fs::read_to_string(path)
96            .map_err(|err| format!("read {}: {err}", path.display()))?;
97        let entries = if body.trim().is_empty() {
98            Vec::new()
99        } else {
100            serde_json::from_str::<Vec<TapeEntry>>(&body)
101                .map_err(|err| format!("parse {}: {err}", path.display()))?
102        };
103        Ok(Self::replay_from(entries))
104    }
105
106    pub fn persist(&self, path: &Path) -> Result<(), String> {
107        if let Some(parent) = path.parent() {
108            if !parent.as_os_str().is_empty() {
109                std::fs::create_dir_all(parent)
110                    .map_err(|err| format!("mkdir {}: {err}", parent.display()))?;
111            }
112        }
113        let recorded = self.recorded();
114        let body = serde_json::to_string_pretty(&recorded)
115            .map_err(|err| format!("serialize tape: {err}"))?;
116        std::fs::write(path, body).map_err(|err| format!("write {}: {err}", path.display()))
117    }
118
119    pub fn mode(&self) -> ProcessTapeMode {
120        self.mode
121    }
122
123    pub fn recorded(&self) -> Vec<TapeEntry> {
124        self.captured
125            .lock()
126            .expect("process tape captured mutex poisoned")
127            .clone()
128    }
129
130    /// Whether every replay entry has been consumed.
131    pub fn fully_consumed(&self) -> bool {
132        self.cursor.load(Ordering::SeqCst) >= self.entries.len()
133    }
134
135    fn record_entry(&self, entry: TapeEntry) {
136        self.captured
137            .lock()
138            .expect("process tape captured mutex poisoned")
139            .push(entry);
140    }
141
142    fn next_replay(&self, expected: &TapeEntry) -> Result<TapeEntry, String> {
143        let idx = self.cursor.fetch_add(1, Ordering::SeqCst);
144        let candidate = self.entries.get(idx).cloned().ok_or_else(|| {
145            format!(
146                "process tape exhausted at call #{idx}: program={:?} args={:?}",
147                expected.program, expected.args
148            )
149        })?;
150        if !invocation_matches(expected, &candidate) {
151            return Err(format!(
152                "process tape diverged at call #{idx}: expected {:?}({:?}) cwd={:?}, tape has {:?}({:?}) cwd={:?}",
153                expected.program,
154                expected.args,
155                expected.cwd,
156                candidate.program,
157                candidate.args,
158                candidate.cwd
159            ));
160        }
161        Ok(candidate)
162    }
163}
164
165fn invocation_matches(expected: &TapeEntry, recorded: &TapeEntry) -> bool {
166    if expected.program != recorded.program || expected.args != recorded.args {
167        return false;
168    }
169    // A `null` cwd in the recorded tape acts as a wildcard: portable
170    // fixtures don't pin to a particular machine layout. An explicit
171    // recorded cwd still has to match exactly.
172    match &recorded.cwd {
173        None => true,
174        Some(_) => expected.cwd == recorded.cwd,
175    }
176}
177
178thread_local! {
179    static ACTIVE_TAPE: RefCell<Option<Arc<ProcessTape>>> = const { RefCell::new(None) };
180}
181
182pub struct ProcessTapeGuard {
183    previous: Option<Arc<ProcessTape>>,
184}
185
186impl Drop for ProcessTapeGuard {
187    fn drop(&mut self) {
188        let prev = self.previous.take();
189        ACTIVE_TAPE.with(|slot| {
190            *slot.borrow_mut() = prev;
191        });
192    }
193}
194
195pub fn install_process_tape(tape: Arc<ProcessTape>) -> ProcessTapeGuard {
196    let previous = ACTIVE_TAPE.with(|slot| slot.replace(Some(tape)));
197    ProcessTapeGuard { previous }
198}
199
200pub fn active_tape() -> Option<Arc<ProcessTape>> {
201    ACTIVE_TAPE.with(|slot| slot.borrow().clone())
202}
203
204/// If a tape is active, intercept the spawn. Returns `Some(output)` when
205/// the call was satisfied by the tape; `None` means the caller should
206/// spawn a real subprocess. In record mode this returns `None` (the real
207/// subprocess will be spawned by the caller, and a follow-up call to
208/// [`start_recording`] / [`RecordingSpan::finish`] should append the
209/// entry to the tape).
210///
211/// When a WASI toolchain is active and `<toolchain>/<program>.wasm`
212/// exists, the WASM module is run under wasmtime (with the testbench
213/// clock virtualized into `clock_time_get` and `poll_oneoff`) and its
214/// output is returned without consulting the process tape. WASI hits
215/// take precedence over native record/replay so a partial `.wasm`
216/// toolchain can coexist with a recorded native tape — wasm-backed
217/// programs route through wasmtime, the rest hit the tape or host.
218pub fn intercept_spawn(
219    program: &str,
220    args: &[String],
221    cwd: Option<&Path>,
222) -> Option<Result<Output, String>> {
223    #[cfg(feature = "testbench-wasi")]
224    if let Some(intercepted) = wasi_intercept(program, args, cwd) {
225        return Some(intercepted);
226    }
227
228    let tape = active_tape()?;
229    if matches!(tape.mode(), ProcessTapeMode::Record) {
230        return None;
231    }
232    let expected = TapeEntry {
233        program: program.to_string(),
234        args: args.to_vec(),
235        cwd: cwd.map(|p| p.to_string_lossy().into_owned()),
236        env: BTreeMap::new(),
237        stdout: String::new(),
238        stderr: String::new(),
239        exit_code: 0,
240        duration_ms: 0,
241    };
242    Some(match tape.next_replay(&expected) {
243        Ok(entry) => {
244            if entry.duration_ms > 0 {
245                clock_mock::advance(Duration::from_millis(entry.duration_ms));
246            }
247            record_unified_spawn(&entry);
248            Ok(synthesize_output(&entry))
249        }
250        Err(err) => Err(err),
251    })
252}
253
254#[cfg(feature = "testbench-wasi")]
255fn wasi_intercept(
256    program: &str,
257    args: &[String],
258    cwd: Option<&Path>,
259) -> Option<Result<Output, String>> {
260    use crate::testbench::wasi_process;
261    let module = wasi_process::wasi_module_for(program)?;
262    let env: Vec<(String, String)> = Vec::new();
263    let result = wasi_process::run_wasm_module(&module, args, &env);
264    Some(result.map(|out| {
265        let entry = TapeEntry {
266            program: program.to_string(),
267            args: args.to_vec(),
268            cwd: cwd.map(|p| p.to_string_lossy().into_owned()),
269            env: BTreeMap::new(),
270            stdout: String::from_utf8_lossy(&out.stdout).into_owned(),
271            stderr: String::from_utf8_lossy(&out.stderr).into_owned(),
272            exit_code: out.exit_code,
273            duration_ms: out.virtual_duration_ms,
274        };
275        record_unified_spawn(&entry);
276        Output {
277            status: synthesize_status(out.exit_code),
278            stdout: out.stdout,
279            stderr: out.stderr,
280        }
281    }))
282}
283
284fn record_unified_spawn(entry: &TapeEntry) {
285    tape::with_active_recorder(|recorder| {
286        let stdout_payload = recorder.payload_from_bytes(entry.stdout.as_bytes().to_vec());
287        let stderr_payload = recorder.payload_from_bytes(entry.stderr.as_bytes().to_vec());
288        Some(TapeRecordKind::ProcessSpawn {
289            program: entry.program.clone(),
290            args: entry.args.clone(),
291            cwd: entry.cwd.clone(),
292            exit_code: entry.exit_code,
293            duration_ms: entry.duration_ms,
294            stdout_payload,
295            stderr_payload,
296        })
297    });
298}
299
300/// Begin recording a subprocess invocation. Returns `Some(span)` when a
301/// tape is in record mode; `None` otherwise. The span captures the
302/// invocation's start time on the injected clock so [`RecordingSpan::finish`]
303/// can stamp the elapsed delta deterministically — under a paused mock
304/// clock the recording is virtual time, matching what replay will
305/// advance.
306pub fn start_recording(
307    program: &str,
308    args: &[String],
309    cwd: Option<&Path>,
310) -> Option<RecordingSpan> {
311    let tape = active_tape()?;
312    if !matches!(tape.mode(), ProcessTapeMode::Record) {
313        return None;
314    }
315    Some(RecordingSpan {
316        tape,
317        program: program.to_string(),
318        args: args.to_vec(),
319        cwd: cwd.map(|p| p.to_string_lossy().into_owned()),
320        started_at: clock_mock::instant_now(),
321    })
322}
323
324/// Pending tape entry for a subprocess invocation that is currently
325/// running. Stamping the elapsed time happens at [`RecordingSpan::finish`]
326/// using the unified mock clock, so testbench callers see virtual time
327/// in their tapes.
328pub struct RecordingSpan {
329    tape: Arc<ProcessTape>,
330    program: String,
331    args: Vec<String>,
332    cwd: Option<String>,
333    started_at: clock_mock::ClockInstant,
334}
335
336impl RecordingSpan {
337    pub fn finish(self, output: &Output) {
338        let duration = clock_mock::instant_now().duration_since(self.started_at);
339        let entry = TapeEntry {
340            program: self.program,
341            args: self.args,
342            cwd: self.cwd,
343            env: BTreeMap::new(),
344            stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
345            stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
346            exit_code: output.status.code().unwrap_or(-1),
347            duration_ms: duration.as_millis().min(u64::MAX as u128) as u64,
348        };
349        record_unified_spawn(&entry);
350        self.tape.record_entry(entry);
351    }
352}
353
354#[cfg(unix)]
355fn synthesize_status(code: i32) -> ExitStatus {
356    use std::os::unix::process::ExitStatusExt;
357    ExitStatus::from_raw((code & 0xff) << 8)
358}
359
360#[cfg(windows)]
361fn synthesize_status(code: i32) -> ExitStatus {
362    use std::os::windows::process::ExitStatusExt;
363    ExitStatus::from_raw(code as u32)
364}
365
366fn synthesize_output(entry: &TapeEntry) -> Output {
367    Output {
368        status: synthesize_status(entry.exit_code),
369        stdout: entry.stdout.as_bytes().to_vec(),
370        stderr: entry.stderr.as_bytes().to_vec(),
371    }
372}
373
374#[cfg(test)]
375mod tests {
376    use super::*;
377    use std::process::Output;
378
379    #[test]
380    fn replay_emits_recorded_output_and_advances_clock() {
381        let tape = ProcessTape::replay_from(vec![TapeEntry {
382            program: "git".to_string(),
383            args: vec!["status".to_string()],
384            cwd: Some("/tmp/repo".to_string()),
385            env: BTreeMap::new(),
386            stdout: "On branch main\n".to_string(),
387            stderr: String::new(),
388            exit_code: 0,
389            duration_ms: 250,
390        }]);
391        let _guard = install_process_tape(Arc::new(tape));
392        let _clock = clock_mock::install_override(clock_mock::MockClock::at_wall_ms(1_000_000));
393        let before = clock_mock::now_ms();
394        let intercepted =
395            intercept_spawn("git", &["status".to_string()], Some(Path::new("/tmp/repo")))
396                .expect("tape produced output");
397        let output = intercepted.expect("replay succeeded");
398        let after = clock_mock::now_ms();
399        assert_eq!(output.stdout, b"On branch main\n");
400        assert_eq!(output.status.code(), Some(0));
401        assert_eq!(after - before, 250);
402    }
403
404    #[test]
405    fn replay_diverges_when_program_does_not_match() {
406        let tape = ProcessTape::replay_from(vec![TapeEntry {
407            program: "git".to_string(),
408            args: vec!["status".to_string()],
409            cwd: None,
410            env: BTreeMap::new(),
411            stdout: String::new(),
412            stderr: String::new(),
413            exit_code: 0,
414            duration_ms: 0,
415        }]);
416        let _guard = install_process_tape(Arc::new(tape));
417        let result = intercept_spawn("gh", &["pr".to_string()], None)
418            .expect("tape active")
419            .expect_err("divergence reported");
420        assert!(result.contains("diverged"), "{result}");
421    }
422
423    #[test]
424    fn record_mode_appends_completed_entries() {
425        let tape = Arc::new(ProcessTape::recording());
426        let _guard = install_process_tape(Arc::clone(&tape));
427        // Drive the recording span under a paused mock clock so the
428        // captured duration is deterministic — proves the duration
429        // capture honors the injected clock, not wall time.
430        let _clock = clock_mock::install_override(clock_mock::MockClock::at_wall_ms(0));
431        let span = start_recording("echo", &["hi".to_string()], None).expect("recording active");
432        clock_mock::advance(Duration::from_millis(7));
433        span.finish(&Output {
434            status: synthesize_status(0),
435            stdout: b"hi\n".to_vec(),
436            stderr: Vec::new(),
437        });
438        let recorded = tape.recorded();
439        assert_eq!(recorded.len(), 1);
440        assert_eq!(recorded[0].stdout, "hi\n");
441        assert_eq!(recorded[0].duration_ms, 7);
442    }
443
444    #[test]
445    fn replay_recorded_cwd_none_acts_as_wildcard() {
446        // A fixture authored with `cwd: null` is portable — it matches
447        // whatever cwd the conformance runner happens to set.
448        let tape = ProcessTape::replay_from(vec![TapeEntry {
449            program: "echo".to_string(),
450            args: vec!["hi".to_string()],
451            cwd: None,
452            env: BTreeMap::new(),
453            stdout: "hi\n".to_string(),
454            stderr: String::new(),
455            exit_code: 0,
456            duration_ms: 0,
457        }]);
458        let _guard = install_process_tape(Arc::new(tape));
459        let intercepted = intercept_spawn(
460            "echo",
461            &["hi".to_string()],
462            Some(Path::new("/some/runner/cwd")),
463        )
464        .expect("tape produced output")
465        .expect("replay succeeded");
466        assert_eq!(intercepted.stdout, b"hi\n");
467    }
468
469    #[test]
470    fn replay_exhausted_surfaces_clear_error() {
471        let tape = ProcessTape::replay_from(Vec::new());
472        let _guard = install_process_tape(Arc::new(tape));
473        let result = intercept_spawn("git", &[], None)
474            .expect("tape active")
475            .expect_err("exhausted");
476        assert!(result.contains("exhausted"), "{result}");
477    }
478}