1use std::cell::RefCell;
21use std::collections::BTreeMap;
22use std::path::Path;
23use std::process::{ExitStatus, Output};
24use std::sync::atomic::{AtomicUsize, Ordering};
25use std::sync::Arc;
26use std::sync::Mutex;
27use std::time::Duration;
28
29use serde::{Deserialize, Serialize};
30
31use crate::clock_mock;
32use crate::testbench::tape::{self, TapeRecordKind};
33
/// Direction in which a [`ProcessTape`] operates.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ProcessTapeMode {
    /// Invocations are captured: [`start_recording`] hands out spans and
    /// [`intercept_spawn`] declines to answer from the tape.
    Record,
    /// Invocations are answered from previously captured entries:
    /// [`intercept_spawn`] serves them and [`start_recording`] returns `None`.
    Replay,
}
41
42#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
44pub struct TapeEntry {
45 pub program: String,
46 #[serde(default)]
47 pub args: Vec<String>,
48 #[serde(default)]
49 pub cwd: Option<String>,
50 #[serde(default)]
53 pub env: BTreeMap<String, String>,
54 #[serde(default)]
55 pub stdout: String,
56 #[serde(default)]
57 pub stderr: String,
58 #[serde(default)]
59 pub exit_code: i32,
60 #[serde(default)]
64 pub duration_ms: u64,
65}
66
67#[derive(Debug)]
68pub struct ProcessTape {
69 mode: ProcessTapeMode,
70 entries: Vec<TapeEntry>,
71 cursor: AtomicUsize,
72 captured: Mutex<Vec<TapeEntry>>,
73}
74
75impl ProcessTape {
76 pub fn recording() -> Self {
77 Self {
78 mode: ProcessTapeMode::Record,
79 entries: Vec::new(),
80 cursor: AtomicUsize::new(0),
81 captured: Mutex::new(Vec::new()),
82 }
83 }
84
85 pub fn replay_from(entries: Vec<TapeEntry>) -> Self {
86 Self {
87 mode: ProcessTapeMode::Replay,
88 entries,
89 cursor: AtomicUsize::new(0),
90 captured: Mutex::new(Vec::new()),
91 }
92 }
93
94 pub fn load(path: &Path) -> Result<Self, String> {
95 let body = std::fs::read_to_string(path)
96 .map_err(|err| format!("read {}: {err}", path.display()))?;
97 let entries = if body.trim().is_empty() {
98 Vec::new()
99 } else {
100 serde_json::from_str::<Vec<TapeEntry>>(&body)
101 .map_err(|err| format!("parse {}: {err}", path.display()))?
102 };
103 Ok(Self::replay_from(entries))
104 }
105
106 pub fn persist(&self, path: &Path) -> Result<(), String> {
107 if let Some(parent) = path.parent() {
108 if !parent.as_os_str().is_empty() {
109 std::fs::create_dir_all(parent)
110 .map_err(|err| format!("mkdir {}: {err}", parent.display()))?;
111 }
112 }
113 let recorded = self.recorded();
114 let body = serde_json::to_string_pretty(&recorded)
115 .map_err(|err| format!("serialize tape: {err}"))?;
116 std::fs::write(path, body).map_err(|err| format!("write {}: {err}", path.display()))
117 }
118
119 pub fn mode(&self) -> ProcessTapeMode {
120 self.mode
121 }
122
123 pub fn recorded(&self) -> Vec<TapeEntry> {
124 self.captured
125 .lock()
126 .expect("process tape captured mutex poisoned")
127 .clone()
128 }
129
130 pub fn fully_consumed(&self) -> bool {
132 self.cursor.load(Ordering::SeqCst) >= self.entries.len()
133 }
134
135 fn record_entry(&self, entry: TapeEntry) {
136 self.captured
137 .lock()
138 .expect("process tape captured mutex poisoned")
139 .push(entry);
140 }
141
142 fn next_replay(&self, expected: &TapeEntry) -> Result<TapeEntry, String> {
143 let idx = self.cursor.fetch_add(1, Ordering::SeqCst);
144 let candidate = self.entries.get(idx).cloned().ok_or_else(|| {
145 format!(
146 "process tape exhausted at call #{idx}: program={:?} args={:?}",
147 expected.program, expected.args
148 )
149 })?;
150 if !invocation_matches(expected, &candidate) {
151 return Err(format!(
152 "process tape diverged at call #{idx}: expected {:?}({:?}) cwd={:?}, tape has {:?}({:?}) cwd={:?}",
153 expected.program,
154 expected.args,
155 expected.cwd,
156 candidate.program,
157 candidate.args,
158 candidate.cwd
159 ));
160 }
161 Ok(candidate)
162 }
163}
164
165fn invocation_matches(expected: &TapeEntry, recorded: &TapeEntry) -> bool {
166 if expected.program != recorded.program || expected.args != recorded.args {
167 return false;
168 }
169 match &recorded.cwd {
173 None => true,
174 Some(_) => expected.cwd == recorded.cwd,
175 }
176}
177
thread_local! {
    // Per-thread slot holding the tape currently in effect: written by
    // `install_process_tape` and `ProcessTapeGuard::drop`, read by `active_tape`.
    // `None` means no tape is installed on this thread.
    static ACTIVE_TAPE: RefCell<Option<Arc<ProcessTape>>> = const { RefCell::new(None) };
}
181
182pub struct ProcessTapeGuard {
183 previous: Option<Arc<ProcessTape>>,
184}
185
186impl Drop for ProcessTapeGuard {
187 fn drop(&mut self) {
188 let prev = self.previous.take();
189 ACTIVE_TAPE.with(|slot| {
190 *slot.borrow_mut() = prev;
191 });
192 }
193}
194
195pub fn install_process_tape(tape: Arc<ProcessTape>) -> ProcessTapeGuard {
196 let previous = ACTIVE_TAPE.with(|slot| slot.replace(Some(tape)));
197 ProcessTapeGuard { previous }
198}
199
200pub fn active_tape() -> Option<Arc<ProcessTape>> {
201 ACTIVE_TAPE.with(|slot| slot.borrow().clone())
202}
203
204pub fn intercept_spawn(
219 program: &str,
220 args: &[String],
221 cwd: Option<&Path>,
222) -> Option<Result<Output, String>> {
223 #[cfg(feature = "testbench-wasi")]
224 if let Some(intercepted) = wasi_intercept(program, args, cwd) {
225 return Some(intercepted);
226 }
227
228 let tape = active_tape()?;
229 if matches!(tape.mode(), ProcessTapeMode::Record) {
230 return None;
231 }
232 let expected = TapeEntry {
233 program: program.to_string(),
234 args: args.to_vec(),
235 cwd: cwd.map(|p| p.to_string_lossy().into_owned()),
236 env: BTreeMap::new(),
237 stdout: String::new(),
238 stderr: String::new(),
239 exit_code: 0,
240 duration_ms: 0,
241 };
242 Some(match tape.next_replay(&expected) {
243 Ok(entry) => {
244 if entry.duration_ms > 0 {
245 clock_mock::advance(Duration::from_millis(entry.duration_ms));
246 }
247 record_unified_spawn(&entry);
248 Ok(synthesize_output(&entry))
249 }
250 Err(err) => Err(err),
251 })
252}
253
/// Runs `program` through the WASI test bench when `wasi_module_for` returns
/// a module for it.
///
/// Returns `None` when no module is available. Otherwise the module is run
/// with `args` and an empty environment list; a successful run is forwarded
/// to `record_unified_spawn` and converted into an [`Output`], while a failed
/// run is passed through as the error.
// NOTE(review): the exact shapes of `wasi_module_for` / `run_wasm_module`
// live in `crate::testbench::wasi_process` and are not visible here - confirm.
#[cfg(feature = "testbench-wasi")]
fn wasi_intercept(
    program: &str,
    args: &[String],
    cwd: Option<&Path>,
) -> Option<Result<Output, String>> {
    use crate::testbench::wasi_process;

    let module = wasi_process::wasi_module_for(program)?;
    let no_env: Vec<(String, String)> = Vec::new();
    let outcome = wasi_process::run_wasm_module(&module, args, &no_env).map(|out| {
        let recorded = TapeEntry {
            program: program.to_owned(),
            args: args.to_vec(),
            cwd: cwd.map(|dir| dir.to_string_lossy().into_owned()),
            env: BTreeMap::new(),
            stdout: String::from_utf8_lossy(&out.stdout).into_owned(),
            stderr: String::from_utf8_lossy(&out.stderr).into_owned(),
            exit_code: out.exit_code,
            duration_ms: out.virtual_duration_ms,
        };
        record_unified_spawn(&recorded);

        // The raw byte buffers are handed over unchanged; only the tape entry
        // uses the lossy text form.
        let status = synthesize_status(out.exit_code);
        Output {
            status,
            stdout: out.stdout,
            stderr: out.stderr,
        }
    });
    Some(outcome)
}
283
284fn record_unified_spawn(entry: &TapeEntry) {
285 tape::with_active_recorder(|recorder| {
286 let stdout_payload = recorder.payload_from_bytes(entry.stdout.as_bytes().to_vec());
287 let stderr_payload = recorder.payload_from_bytes(entry.stderr.as_bytes().to_vec());
288 Some(TapeRecordKind::ProcessSpawn {
289 program: entry.program.clone(),
290 args: entry.args.clone(),
291 cwd: entry.cwd.clone(),
292 exit_code: entry.exit_code,
293 duration_ms: entry.duration_ms,
294 stdout_payload,
295 stderr_payload,
296 })
297 });
298}
299
300pub fn start_recording(
307 program: &str,
308 args: &[String],
309 cwd: Option<&Path>,
310) -> Option<RecordingSpan> {
311 let tape = active_tape()?;
312 if !matches!(tape.mode(), ProcessTapeMode::Record) {
313 return None;
314 }
315 Some(RecordingSpan {
316 tape,
317 program: program.to_string(),
318 args: args.to_vec(),
319 cwd: cwd.map(|p| p.to_string_lossy().into_owned()),
320 started_at: clock_mock::instant_now(),
321 })
322}
323
324pub struct RecordingSpan {
329 tape: Arc<ProcessTape>,
330 program: String,
331 args: Vec<String>,
332 cwd: Option<String>,
333 started_at: clock_mock::ClockInstant,
334}
335
336impl RecordingSpan {
337 pub fn finish(self, output: &Output) {
338 let duration = clock_mock::instant_now().duration_since(self.started_at);
339 let entry = TapeEntry {
340 program: self.program,
341 args: self.args,
342 cwd: self.cwd,
343 env: BTreeMap::new(),
344 stdout: String::from_utf8_lossy(&output.stdout).into_owned(),
345 stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
346 exit_code: output.status.code().unwrap_or(-1),
347 duration_ms: duration.as_millis().min(u64::MAX as u128) as u64,
348 };
349 record_unified_spawn(&entry);
350 self.tape.record_entry(entry);
351 }
352}
353
/// Builds an [`ExitStatus`] that reports `code` as a normal exit (Unix).
///
/// Only the low eight bits of `code` are kept; they are placed in the
/// exit-code byte of the raw wait status, so `-1` is reported as `255`.
#[cfg(unix)]
fn synthesize_status(code: i32) -> ExitStatus {
    use std::os::unix::process::ExitStatusExt;

    let exit_byte = code & 0xff;
    let wait_status = exit_byte << 8;
    ExitStatus::from_raw(wait_status)
}

/// Builds an [`ExitStatus`] that reports `code` as the process exit code
/// (Windows). The bits of `code` are reinterpreted as `u32` unchanged.
#[cfg(windows)]
fn synthesize_status(code: i32) -> ExitStatus {
    use std::os::windows::process::ExitStatusExt;

    let raw = code as u32;
    ExitStatus::from_raw(raw)
}
365
366fn synthesize_output(entry: &TapeEntry) -> Output {
367 Output {
368 status: synthesize_status(entry.exit_code),
369 stdout: entry.stdout.as_bytes().to_vec(),
370 stderr: entry.stderr.as_bytes().to_vec(),
371 }
372}
373
#[cfg(test)]
mod tests {
    use super::*;
    use std::process::Output;

    /// Owned argument vector built from string literals.
    fn argv(items: &[&str]) -> Vec<String> {
        items.iter().map(|item| item.to_string()).collect()
    }

    /// Tape entry with an empty environment, empty stderr and exit code 0.
    fn entry(
        program: &str,
        args: &[&str],
        cwd: Option<&str>,
        stdout: &str,
        duration_ms: u64,
    ) -> TapeEntry {
        TapeEntry {
            program: program.to_string(),
            args: argv(args),
            cwd: cwd.map(String::from),
            env: BTreeMap::new(),
            stdout: stdout.to_string(),
            stderr: String::new(),
            exit_code: 0,
            duration_ms,
        }
    }

    // A matching replay returns the recorded output and moves the mock clock
    // forward by the recorded duration.
    #[test]
    fn replay_emits_recorded_output_and_advances_clock() {
        let recorded = entry("git", &["status"], Some("/tmp/repo"), "On branch main\n", 250);
        let _guard = install_process_tape(Arc::new(ProcessTape::replay_from(vec![recorded])));
        let _clock = clock_mock::install_override(clock_mock::MockClock::at_wall_ms(1_000_000));

        let before = clock_mock::now_ms();
        let output = intercept_spawn("git", &argv(&["status"]), Some(Path::new("/tmp/repo")))
            .expect("tape produced output")
            .expect("replay succeeded");
        let after = clock_mock::now_ms();

        assert_eq!(output.stdout, b"On branch main\n");
        assert_eq!(output.status.code(), Some(0));
        assert_eq!(after - before, 250);
    }

    // A call that differs from the next tape entry is reported as a divergence.
    #[test]
    fn replay_diverges_when_program_does_not_match() {
        let recorded = entry("git", &["status"], None, "", 0);
        let _guard = install_process_tape(Arc::new(ProcessTape::replay_from(vec![recorded])));

        let result = intercept_spawn("gh", &argv(&["pr"]), None)
            .expect("tape active")
            .expect_err("divergence reported");

        assert!(result.contains("diverged"), "{result}");
    }

    // Finishing a span in record mode appends one entry with the measured
    // mock-clock duration.
    #[test]
    fn record_mode_appends_completed_entries() {
        let tape = Arc::new(ProcessTape::recording());
        let _guard = install_process_tape(Arc::clone(&tape));
        let _clock = clock_mock::install_override(clock_mock::MockClock::at_wall_ms(0));

        let span = start_recording("echo", &argv(&["hi"]), None).expect("recording active");
        clock_mock::advance(Duration::from_millis(7));
        let finished = Output {
            status: synthesize_status(0),
            stdout: b"hi\n".to_vec(),
            stderr: Vec::new(),
        };
        span.finish(&finished);

        let recorded = tape.recorded();
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].stdout, "hi\n");
        assert_eq!(recorded[0].duration_ms, 7);
    }

    // A tape entry without a cwd matches a call made from any directory.
    #[test]
    fn replay_recorded_cwd_none_acts_as_wildcard() {
        let recorded = entry("echo", &["hi"], None, "hi\n", 0);
        let _guard = install_process_tape(Arc::new(ProcessTape::replay_from(vec![recorded])));

        let intercepted = intercept_spawn(
            "echo",
            &argv(&["hi"]),
            Some(Path::new("/some/runner/cwd")),
        )
        .expect("tape produced output")
        .expect("replay succeeded");

        assert_eq!(intercepted.stdout, b"hi\n");
    }

    // Asking an empty replay tape for an entry yields an "exhausted" error.
    #[test]
    fn replay_exhausted_surfaces_clear_error() {
        let _guard = install_process_tape(Arc::new(ProcessTape::replay_from(Vec::new())));

        let result = intercept_spawn("git", &[], None)
            .expect("tape active")
            .expect_err("exhausted");

        assert!(result.contains("exhausted"), "{result}");
    }
}
478}