1use std::cell::RefCell;
43use std::collections::BTreeMap;
44use std::path::{Path, PathBuf};
45use std::sync::atomic::{AtomicU64, Ordering};
46use std::sync::{Arc, Mutex};
47
48use serde::{Deserialize, Serialize};
49
50use crate::clock_mock;
51
/// Tape format version written into every header; [`EventTape::load`] rejects
/// tapes that declare a newer version than this.
pub const TAPE_FORMAT_VERSION: u32 = 1;

/// Payloads longer than this many bytes are spilled into the tape's CAS
/// sidecar instead of being stored inline in the JSON line.
pub const MAX_INLINE_BYTES: usize = 4_096;
61
/// Metadata written as the first line of every tape file: the format version,
/// the version of the crate that wrote it, and optional details about the run.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TapeHeader {
    /// Tape format version; `EventTape::load` compares it with `TAPE_FORMAT_VERSION`.
    pub version: u32,
    /// Crate version (`CARGO_PKG_VERSION`) of the writer, as set by `TapeHeader::current`.
    pub harn_version: String,
    /// Run start as Unix epoch milliseconds, when the caller supplied one.
    /// Defaults to `None` when absent from the JSON.
    #[serde(default)]
    pub started_at_unix_ms: Option<i64>,
    /// Script path associated with the run, when the caller supplied one.
    /// Defaults to `None` when absent from the JSON.
    #[serde(default)]
    pub script_path: Option<String>,
    /// Arguments associated with the run; defaults to empty when absent from the JSON.
    #[serde(default)]
    pub argv: Vec<String>,
}
85
86impl TapeHeader {
87 pub fn current(
88 started_at_unix_ms: Option<i64>,
89 script_path: Option<String>,
90 argv: Vec<String>,
91 ) -> Self {
92 Self {
93 version: TAPE_FORMAT_VERSION,
94 harn_version: env!("CARGO_PKG_VERSION").to_string(),
95 started_at_unix_ms,
96 script_path,
97 argv,
98 }
99 }
100}
101
/// One JSON line of a tape file. The variant is stored in a `type` field
/// (`"header"` or `"record"`) next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum TapeLine {
    /// Must be the first line; `EventTape::load` rejects it anywhere else.
    Header(TapeHeader),
    /// Every following non-empty line.
    Record(TapeRecord),
}
111
/// A single recorded event together with its ordering and timing metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TapeRecord {
    /// Sequence number; `TapeRecorder` assigns these starting from 0.
    pub seq: u64,
    /// Value of `clock_mock::now_ms()` when `TapeRecorder::record` stamped the event.
    pub virtual_time_ms: i64,
    /// Milliseconds between the recorder's creation and the event, measured
    /// with `clock_mock::instant_now()`.
    pub monotonic_ms: i64,
    /// What happened; serialized as a nested object tagged by `kind`.
    pub kind: TapeRecordKind,
}
129
/// What a [`TapeRecord`] captured. Serialized as an object whose `kind` field
/// holds the snake_case variant name (for example `"process_spawn"`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum TapeRecordKind {
    /// A read of the `source` clock that produced `value_ms`.
    ClockRead { source: ClockSource, value_ms: i64 },
    /// A sleep of `duration_ms` milliseconds.
    ClockSleep { duration_ms: u64 },
    /// An LLM call. `request_digest` identifies the request (presumably a hash
    /// of it; confirm at the call site) and `response` holds the response bytes.
    LlmCall {
        request_digest: String,
        response: TapePayload,
    },
    /// A file read. Only the path, content hash and length are kept, not the content.
    FileRead {
        path: String,
        content_hash: String,
        len_bytes: u64,
    },
    /// A file write. Only the path, content hash and length are kept, not the content.
    FileWrite {
        path: String,
        content_hash: String,
        len_bytes: u64,
    },
    /// Removal of the file at `path`.
    FileDelete { path: String },
    /// A child process invocation, with its exit code, duration and captured
    /// stdout/stderr payloads.
    ProcessSpawn {
        program: String,
        args: Vec<String>,
        cwd: Option<String>,
        exit_code: i32,
        duration_ms: u64,
        stdout_payload: TapePayload,
        stderr_payload: TapePayload,
    },
    /// Any `kind` tag this runtime does not recognize (for example one written
    /// by a newer version). The original tag and fields are not retained, so
    /// re-serializing produces `"unknown"`.
    #[serde(other)]
    Unknown,
}
187
188impl TapeRecordKind {
189 pub fn label(&self) -> &'static str {
194 match self {
195 Self::ClockRead { .. } => "clock_read",
196 Self::ClockSleep { .. } => "clock_sleep",
197 Self::LlmCall { .. } => "llm_call",
198 Self::FileRead { .. } => "file_read",
199 Self::FileWrite { .. } => "file_write",
200 Self::FileDelete { .. } => "file_delete",
201 Self::ProcessSpawn { .. } => "process_spawn",
202 Self::Unknown => "unknown",
203 }
204 }
205}
206
/// Which clock a [`TapeRecordKind::ClockRead`] read. Serialized as `"wall"`
/// or `"monotonic"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ClockSource {
    Wall,
    Monotonic,
}
215
/// Bytes captured for an event (an LLM response, process stdout/stderr).
/// They are stored either inline as text or by reference into the tape's CAS.
///
/// Serialized untagged: on load the variant is inferred from which fields are
/// present, trying `Inline` (needs `text`) before `Cas` (needs `len_bytes`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum TapePayload {
    /// UTF-8 content kept in the tape line itself. When produced by
    /// `build_payload` it is at most `MAX_INLINE_BYTES` bytes.
    Inline { content_hash: String, text: String },
    /// Content stored out of line in the CAS under `content_hash`;
    /// `len_bytes` is its size.
    Cas {
        content_hash: String,
        len_bytes: u64,
    },
}
230
231impl TapePayload {
232 pub fn content_hash(&self) -> &str {
233 match self {
234 Self::Inline { content_hash, .. } | Self::Cas { content_hash, .. } => content_hash,
235 }
236 }
237
238 pub fn len_bytes(&self) -> u64 {
239 match self {
240 Self::Inline { text, .. } => text.len() as u64,
241 Self::Cas { len_bytes, .. } => *len_bytes,
242 }
243 }
244}
245
246pub fn content_hash(bytes: &[u8]) -> String {
249 blake3::hash(bytes).to_hex().to_string()
250}
251
252fn build_payload(bytes: Vec<u8>, cas: &mut BTreeMap<String, Vec<u8>>) -> TapePayload {
255 let hash = content_hash(&bytes);
256 if bytes.len() > MAX_INLINE_BYTES {
257 let len_bytes = bytes.len() as u64;
258 cas.entry(hash.clone()).or_insert(bytes);
259 TapePayload::Cas {
260 content_hash: hash,
261 len_bytes,
262 }
263 } else {
264 let text = match String::from_utf8(bytes) {
265 Ok(text) => text,
266 Err(error) => {
267 let bytes = error.into_bytes();
271 let len_bytes = bytes.len() as u64;
272 cas.entry(hash.clone()).or_insert(bytes);
273 return TapePayload::Cas {
274 content_hash: hash,
275 len_bytes,
276 };
277 }
278 };
279 TapePayload::Inline {
280 content_hash: hash,
281 text,
282 }
283 }
284}
285
/// An in-memory tape: the header, the ordered records, and the
/// content-addressed blobs behind `TapePayload::Cas` payloads.
#[derive(Debug, Clone)]
pub struct EventTape {
    pub header: TapeHeader,
    pub records: Vec<TapeRecord>,
    // Blob store keyed by the hex digest produced by `content_hash`. It is
    // filled by `build_payload` or by `EventTape::load`, and persisted as
    // `<tape path>.cas/<hash>` files.
    cas: BTreeMap<String, Vec<u8>>,
}
298
299impl EventTape {
300 pub fn new(header: TapeHeader) -> Self {
301 Self {
302 header,
303 records: Vec::new(),
304 cas: BTreeMap::new(),
305 }
306 }
307
308 pub fn resolve_payload(&self, payload: &TapePayload) -> Result<Vec<u8>, String> {
311 match payload {
312 TapePayload::Inline { text, .. } => Ok(text.as_bytes().to_vec()),
313 TapePayload::Cas { content_hash, .. } => self
314 .cas
315 .get(content_hash)
316 .cloned()
317 .ok_or_else(|| format!("tape CAS missing entry for {content_hash}")),
318 }
319 }
320
321 pub fn cas_len(&self) -> usize {
323 self.cas.len()
324 }
325
326 pub fn persist(&self, path: &Path) -> Result<(), String> {
329 if let Some(parent) = path.parent() {
330 if !parent.as_os_str().is_empty() {
331 std::fs::create_dir_all(parent)
332 .map_err(|err| format!("mkdir {}: {err}", parent.display()))?;
333 }
334 }
335
336 let mut body = String::new();
337 let header_line = serde_json::to_string(&TapeLine::Header(self.header.clone()))
338 .map_err(|err| format!("serialize tape header: {err}"))?;
339 body.push_str(&header_line);
340 body.push('\n');
341 for record in &self.records {
342 let line = serde_json::to_string(&TapeLine::Record(record.clone()))
343 .map_err(|err| format!("serialize tape record: {err}"))?;
344 body.push_str(&line);
345 body.push('\n');
346 }
347 std::fs::write(path, body).map_err(|err| format!("write {}: {err}", path.display()))?;
348
349 if !self.cas.is_empty() {
350 let cas_dir = cas_dir_for(path);
351 std::fs::create_dir_all(&cas_dir)
352 .map_err(|err| format!("mkdir {}: {err}", cas_dir.display()))?;
353 for (hash, bytes) in &self.cas {
354 let entry = cas_dir.join(hash);
355 std::fs::write(&entry, bytes)
356 .map_err(|err| format!("write {}: {err}", entry.display()))?;
357 }
358 }
359 Ok(())
360 }
361
362 pub fn load(path: &Path) -> Result<Self, String> {
365 let body = std::fs::read_to_string(path)
366 .map_err(|err| format!("read {}: {err}", path.display()))?;
367 let mut lines = body.lines();
368 let first_line = lines
369 .next()
370 .ok_or_else(|| format!("empty tape file: {}", path.display()))?;
371 let header_line: TapeLine = serde_json::from_str(first_line)
372 .map_err(|err| format!("parse tape header in {}: {err}", path.display()))?;
373 let header = match header_line {
374 TapeLine::Header(header) => header,
375 TapeLine::Record(_) => {
376 return Err(format!(
377 "tape {} is missing its header (first line is a record)",
378 path.display()
379 ))
380 }
381 };
382 if header.version > TAPE_FORMAT_VERSION {
383 return Err(format!(
384 "tape {} declares version {} but this runtime supports up to {TAPE_FORMAT_VERSION}",
385 path.display(),
386 header.version
387 ));
388 }
389 let mut records = Vec::new();
390 for (idx, line) in lines.enumerate() {
391 let trimmed = line.trim();
392 if trimmed.is_empty() {
393 continue;
394 }
395 let parsed: TapeLine = serde_json::from_str(trimmed).map_err(|err| {
396 format!(
397 "parse tape record at line {} in {}: {err}",
398 idx + 2,
399 path.display()
400 )
401 })?;
402 match parsed {
403 TapeLine::Record(record) => records.push(record),
404 TapeLine::Header(_) => {
405 return Err(format!(
406 "tape {} contains a second header at line {}",
407 path.display(),
408 idx + 2
409 ))
410 }
411 }
412 }
413
414 let mut cas = BTreeMap::new();
415 let cas_dir = cas_dir_for(path);
416 if cas_dir.is_dir() {
417 for record in &records {
418 visit_payloads(&record.kind, |payload| {
419 if let TapePayload::Cas { content_hash, .. } = payload {
420 if cas.contains_key(content_hash) {
421 return;
422 }
423 let entry = cas_dir.join(content_hash);
424 if let Ok(bytes) = std::fs::read(&entry) {
425 cas.insert(content_hash.clone(), bytes);
426 }
427 }
428 });
429 }
430 }
431 Ok(Self {
432 header,
433 records,
434 cas,
435 })
436 }
437}
438
/// Directory holding the CAS blobs for the tape at `tape_path`: the tape path
/// with `.cas` appended (for example `run.tape` becomes `run.tape.cas`).
fn cas_dir_for(tape_path: &Path) -> PathBuf {
    let mut dir = tape_path.as_os_str().to_os_string();
    dir.push(".cas");
    dir.into()
}
444
445fn visit_payloads(kind: &TapeRecordKind, mut visit: impl FnMut(&TapePayload)) {
446 match kind {
447 TapeRecordKind::LlmCall { response, .. } => visit(response),
448 TapeRecordKind::ProcessSpawn {
449 stdout_payload,
450 stderr_payload,
451 ..
452 } => {
453 visit(stdout_payload);
454 visit(stderr_payload);
455 }
456 TapeRecordKind::ClockRead { .. }
457 | TapeRecordKind::ClockSleep { .. }
458 | TapeRecordKind::FileRead { .. }
459 | TapeRecordKind::FileWrite { .. }
460 | TapeRecordKind::FileDelete { .. }
461 | TapeRecordKind::Unknown => {}
462 }
463}
464
/// Collects tape records and CAS blobs during a run. It uses an atomic
/// counter and a `Mutex`, so every method works through `&self`.
/// [`TapeRecorder::snapshot`] turns what has been collected into an [`EventTape`].
#[derive(Debug)]
pub struct TapeRecorder {
    // Next sequence number to assign to a record.
    next_seq: AtomicU64,
    // Origin from which each record's `monotonic_ms` is measured.
    started_at: clock_mock::ClockInstant,
    // Records and blobs gathered so far.
    inner: Mutex<RecorderInner>,
}
475
/// Mutable state of a [`TapeRecorder`], guarded by its mutex.
#[derive(Debug, Default)]
struct RecorderInner {
    // Records in the order they were pushed.
    records: Vec<TapeRecord>,
    // Blobs for CAS payloads created via `payload_from_bytes`, keyed by content hash.
    cas: BTreeMap<String, Vec<u8>>,
}
481
482impl Default for TapeRecorder {
483 fn default() -> Self {
484 Self::new()
485 }
486}
487
488impl TapeRecorder {
489 pub fn new() -> Self {
490 Self {
491 next_seq: AtomicU64::new(0),
492 started_at: clock_mock::instant_now(),
493 inner: Mutex::new(RecorderInner::default()),
494 }
495 }
496
497 pub fn record(&self, kind: TapeRecordKind) {
500 let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
501 let virtual_time_ms = clock_mock::now_ms();
502 let monotonic_ms = clock_mock::instant_now()
503 .duration_since(self.started_at)
504 .as_millis()
505 .min(i64::MAX as u128) as i64;
506 let record = TapeRecord {
507 seq,
508 virtual_time_ms,
509 monotonic_ms,
510 kind,
511 };
512 self.inner
513 .lock()
514 .expect("tape recorder mutex poisoned")
515 .records
516 .push(record);
517 }
518
519 pub fn payload_from_bytes(&self, bytes: Vec<u8>) -> TapePayload {
524 let mut inner = self.inner.lock().expect("tape recorder mutex poisoned");
525 build_payload(bytes, &mut inner.cas)
526 }
527
528 pub fn snapshot(&self, header: TapeHeader) -> EventTape {
533 let inner = self.inner.lock().expect("tape recorder mutex poisoned");
534 EventTape {
535 header,
536 records: inner.records.clone(),
537 cas: inner.cas.clone(),
538 }
539 }
540}
541
thread_local! {
    /// Recorder that `with_active_recorder` reports to on this thread, or `None`
    /// when recording is off. It is set by `install_recorder` and restored by
    /// `TapeRecorderGuard` on drop.
    static ACTIVE_RECORDER: RefCell<Option<Arc<TapeRecorder>>> = const { RefCell::new(None) };
}
545
/// Guard returned by [`install_recorder`]. Dropping it restores whichever
/// recorder (if any) was active on this thread before the install.
pub struct TapeRecorderGuard {
    // Recorder to reinstate when the guard is dropped.
    previous: Option<Arc<TapeRecorder>>,
}
551
552impl Drop for TapeRecorderGuard {
553 fn drop(&mut self) {
554 let prev = self.previous.take();
555 ACTIVE_RECORDER.with(|slot| {
556 *slot.borrow_mut() = prev;
557 });
558 }
559}
560
561pub fn install_recorder(recorder: Arc<TapeRecorder>) -> TapeRecorderGuard {
562 let previous = ACTIVE_RECORDER.with(|slot| slot.replace(Some(recorder)));
563 TapeRecorderGuard { previous }
564}
565
566pub fn active_recorder() -> Option<Arc<TapeRecorder>> {
569 ACTIVE_RECORDER.with(|slot| slot.borrow().clone())
570}
571
572pub fn with_active_recorder<F>(build: F)
575where
576 F: FnOnce(&Arc<TapeRecorder>) -> Option<TapeRecordKind>,
577{
578 let Some(recorder) = active_recorder() else {
579 return;
580 };
581 if let Some(kind) = build(&recorder) {
582 recorder.record(kind);
583 }
584}
585
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a `ClockSleep` record whose timestamps are `seq` seconds, in milliseconds.
    fn small_record(seq: u64, dur: u64) -> TapeRecord {
        TapeRecord {
            seq,
            virtual_time_ms: seq as i64 * 1000,
            monotonic_ms: seq as i64 * 1000,
            kind: TapeRecordKind::ClockSleep { duration_ms: dur },
        }
    }

    #[test]
    fn round_trip_inline_records() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("run.tape");
        let mut tape = EventTape::new(TapeHeader::current(
            Some(1_700_000_000_000),
            Some("script.harn".to_string()),
            vec!["a".into()],
        ));
        tape.records.push(small_record(0, 250));
        tape.records.push(small_record(1, 750));
        tape.persist(&path).unwrap();

        let loaded = EventTape::load(&path).unwrap();
        assert_eq!(loaded.header.version, TAPE_FORMAT_VERSION);
        assert_eq!(loaded.header.argv, vec!["a".to_string()]);
        assert_eq!(loaded.records.len(), 2);
        match &loaded.records[0].kind {
            TapeRecordKind::ClockSleep { duration_ms } => assert_eq!(*duration_ms, 250),
            other => panic!("unexpected: {other:?}"),
        }
    }

    #[test]
    fn large_payloads_spill_to_cas_and_round_trip() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("run.tape");
        let mut tape = EventTape::new(TapeHeader::current(None, None, Vec::new()));
        let big = vec![b'x'; MAX_INLINE_BYTES + 32];
        let payload = build_payload(big.clone(), &mut tape.cas);
        let hash = payload.content_hash().to_string();
        let kind = TapeRecordKind::ProcessSpawn {
            program: "/bin/echo".to_string(),
            args: vec!["x".to_string()],
            cwd: None,
            exit_code: 0,
            duration_ms: 1,
            stdout_payload: payload,
            stderr_payload: build_payload(Vec::new(), &mut tape.cas),
        };
        tape.records.push(TapeRecord {
            seq: 0,
            virtual_time_ms: 0,
            monotonic_ms: 0,
            kind,
        });
        tape.persist(&path).unwrap();

        // Blobs are stored next to the tape as `<tape path>.cas/<hash>`.
        assert!(cas_dir_for(&path).join(&hash).exists());

        let loaded = EventTape::load(&path).unwrap();
        assert_eq!(loaded.cas_len(), 1);
        let resolved = match &loaded.records[0].kind {
            TapeRecordKind::ProcessSpawn { stdout_payload, .. } => {
                loaded.resolve_payload(stdout_payload).unwrap()
            }
            other => panic!("unexpected: {other:?}"),
        };
        assert_eq!(resolved, big);
    }

    #[test]
    fn small_utf8_payloads_stay_inline_and_binary_spills_to_cas() {
        let mut cas = BTreeMap::new();

        let inline = build_payload(b"hello".to_vec(), &mut cas);
        match &inline {
            TapePayload::Inline {
                content_hash: hash,
                text,
            } => {
                assert_eq!(text, "hello");
                assert_eq!(hash, &content_hash(b"hello"));
            }
            other => panic!("unexpected: {other:?}"),
        }
        assert!(cas.is_empty());

        let binary = vec![0xff_u8, 0xfe, 0x00];
        let spilled = build_payload(binary.clone(), &mut cas);
        assert!(matches!(spilled, TapePayload::Cas { len_bytes: 3, .. }));
        assert_eq!(cas.get(spilled.content_hash()), Some(&binary));
    }

    #[test]
    fn rejects_newer_version() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("future.tape");
        std::fs::write(
            &path,
            r#"{"type":"header","version":99,"harn_version":"x","started_at_unix_ms":null,"script_path":null,"argv":[]}
"#,
        )
        .unwrap();
        let err = EventTape::load(&path).unwrap_err();
        assert!(err.contains("version 99"), "{err}");
    }

    #[test]
    fn rejects_tape_starting_with_record() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("headless.tape");
        std::fs::write(
            &path,
            concat!(
                r#"{"type":"record","seq":0,"virtual_time_ms":0,"monotonic_ms":0,"kind":{"kind":"clock_sleep","duration_ms":1}}"#,
                "\n"
            ),
        )
        .unwrap();
        let err = EventTape::load(&path).unwrap_err();
        assert!(err.contains("missing its header"), "{err}");
    }

    #[test]
    fn unrecognized_record_kinds_load_as_unknown() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("newer-kinds.tape");
        std::fs::write(
            &path,
            concat!(
                r#"{"type":"header","version":1,"harn_version":"x"}"#,
                "\n",
                r#"{"type":"record","seq":0,"virtual_time_ms":0,"monotonic_ms":0,"kind":{"kind":"from_the_future","extra":true}}"#,
                "\n"
            ),
        )
        .unwrap();
        let loaded = EventTape::load(&path).unwrap();
        assert_eq!(loaded.records.len(), 1);
        assert!(matches!(loaded.records[0].kind, TapeRecordKind::Unknown));
        assert_eq!(loaded.records[0].kind.label(), "unknown");
    }

    #[test]
    fn recorder_assigns_monotonic_seq() {
        let recorder = Arc::new(TapeRecorder::new());
        recorder.record(TapeRecordKind::ClockSleep { duration_ms: 1 });
        recorder.record(TapeRecordKind::ClockSleep { duration_ms: 2 });
        let snapshot = recorder.snapshot(TapeHeader::current(None, None, Vec::new()));
        assert_eq!(snapshot.records[0].seq, 0);
        assert_eq!(snapshot.records[1].seq, 1);
    }
}