1use std::cell::RefCell;
44use std::collections::BTreeMap;
45use std::path::{Path, PathBuf};
46use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
47use std::sync::{Arc, Mutex};
48
49use serde::{Deserialize, Serialize};
50
51use crate::clock_mock;
52
/// On-disk format version that [`TapeHeader::current`] stamps into every header.
/// [`EventTape::load`] rejects tapes declaring a version higher than this one.
pub const TAPE_FORMAT_VERSION: u32 = 1;

/// Inline-size threshold in bytes. Payloads longer than this, as well as payloads that are
/// not valid UTF-8, are stored in the tape's content-addressed store (CAS) instead of
/// inline in the JSON; see `build_payload`.
pub const MAX_INLINE_BYTES: usize = 4 * 1024;
62
/// First line of every tape file: identifies the format version and the run that wrote it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TapeHeader {
    /// Tape format version; compared against [`TAPE_FORMAT_VERSION`] when loading.
    pub version: u32,
    /// Version of the crate that wrote the tape (`CARGO_PKG_VERSION` at build time).
    pub harn_version: String,
    /// Optional start timestamp of the run. NOTE(review): by its name this is Unix-epoch
    /// milliseconds — confirm against the caller that fills it in.
    #[serde(default)]
    pub started_at_unix_ms: Option<i64>,
    /// Optional path of the script that was run; `None` when absent from the file.
    #[serde(default)]
    pub script_path: Option<String>,
    /// Argument vector of the run; empty when absent from the file.
    #[serde(default)]
    pub argv: Vec<String>,
}
86
87impl TapeHeader {
88 pub fn current(
89 started_at_unix_ms: Option<i64>,
90 script_path: Option<String>,
91 argv: Vec<String>,
92 ) -> Self {
93 Self {
94 version: TAPE_FORMAT_VERSION,
95 harn_version: env!("CARGO_PKG_VERSION").to_string(),
96 started_at_unix_ms,
97 script_path,
98 argv,
99 }
100 }
101}
102
/// One JSON line of a tape file. The `type` field distinguishes the single header line
/// (`"header"`) from the record lines (`"record"`) that follow it.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum TapeLine {
    Header(TapeHeader),
    Record(TapeRecord),
}
112
/// A single recorded event together with its ordering and timing metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TapeRecord {
    /// Sequence number; [`TapeRecorder`] hands these out from a counter starting at 0.
    pub seq: u64,
    /// Phase that was active when the event was recorded. When the field is absent from
    /// the JSON it defaults to [`TapePhase::UserScript`].
    #[serde(default)]
    pub phase: TapePhase,
    /// Value of `clock_mock::now_ms()` at the time the event was recorded.
    pub virtual_time_ms: i64,
    /// Milliseconds elapsed since the recorder was created, measured with
    /// `clock_mock::instant_now()` and saturated at `i64::MAX`.
    pub monotonic_ms: i64,
    /// The event itself.
    pub kind: TapeRecordKind,
}
135
/// Which part of the run an event belongs to; serialized as `"user_script"` or
/// `"runtime_finalize"`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TapePhase {
    /// Default phase; in effect unless `enter_phase` has switched the recorder elsewhere.
    #[default]
    UserScript,
    /// NOTE(review): by its name this marks work done by the runtime while finishing the
    /// run, after the user script — confirm with the code that enters this phase.
    RuntimeFinalize,
}
147
148impl TapePhase {
149 fn as_u8(self) -> u8 {
150 match self {
151 Self::UserScript => 0,
152 Self::RuntimeFinalize => 1,
153 }
154 }
155
156 fn from_u8(value: u8) -> Self {
157 match value {
158 1 => Self::RuntimeFinalize,
159 _ => Self::UserScript,
160 }
161 }
162
163 pub fn label(self) -> &'static str {
164 match self {
165 Self::UserScript => "user_script",
166 Self::RuntimeFinalize => "runtime_finalize",
167 }
168 }
169}
170
/// The event captured by a [`TapeRecord`]. Serialized as an object whose `kind` field holds
/// the snake_case variant name, next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum TapeRecordKind {
    /// A clock sample: which clock was read and the value it returned.
    ClockRead { source: ClockSource, value_ms: i64 },
    /// A sleep of `duration_ms` milliseconds.
    ClockSleep { duration_ms: u64 },
    /// An LLM call and its response body, stored as a [`TapePayload`].
    /// NOTE(review): `request_digest` presumably identifies the request for matching on
    /// replay — confirm with the producer of these records.
    LlmCall {
        request_digest: String,
        response: TapePayload,
    },
    /// A file read: the path, a hash of the content, and the content length in bytes.
    FileRead {
        path: String,
        content_hash: String,
        len_bytes: u64,
    },
    /// A file write: the path, a hash of the written content, and its length in bytes.
    FileWrite {
        path: String,
        content_hash: String,
        len_bytes: u64,
    },
    /// Deletion of the file at `path`.
    FileDelete { path: String },
    /// A child process run: program, arguments, optional working directory, exit code,
    /// duration in milliseconds, and its stdout/stderr output as payloads.
    ProcessSpawn {
        program: String,
        args: Vec<String>,
        cwd: Option<String>,
        exit_code: i32,
        duration_ms: u64,
        stdout_payload: TapePayload,
        stderr_payload: TapePayload,
    },
    /// Catch-all for any `kind` tag this build does not recognise, so tapes that contain
    /// unknown event kinds still load.
    #[serde(other)]
    Unknown,
}
228
229impl TapeRecordKind {
230 pub fn label(&self) -> &'static str {
235 match self {
236 Self::ClockRead { .. } => "clock_read",
237 Self::ClockSleep { .. } => "clock_sleep",
238 Self::LlmCall { .. } => "llm_call",
239 Self::FileRead { .. } => "file_read",
240 Self::FileWrite { .. } => "file_write",
241 Self::FileDelete { .. } => "file_delete",
242 Self::ProcessSpawn { .. } => "process_spawn",
243 Self::Unknown => "unknown",
244 }
245 }
246}
247
/// Which clock a [`TapeRecordKind::ClockRead`] sampled; serialized as `"wall"` or
/// `"monotonic"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ClockSource {
    Wall,
    Monotonic,
}
256
/// Bytes captured by a record, stored either inline in the tape or in the tape's CAS.
///
/// Serialized untagged: the variant is recognised by its fields (`text` for inline payloads,
/// `len_bytes` for CAS references).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum TapePayload {
    /// UTF-8 text stored directly in the tape line, together with its content hash.
    Inline { content_hash: String, text: String },
    /// Bytes stored in the CAS under `content_hash`; `len_bytes` is their length.
    Cas {
        content_hash: String,
        len_bytes: u64,
    },
}
271
272impl TapePayload {
273 pub fn content_hash(&self) -> &str {
274 match self {
275 Self::Inline { content_hash, .. } | Self::Cas { content_hash, .. } => content_hash,
276 }
277 }
278
279 pub fn len_bytes(&self) -> u64 {
280 match self {
281 Self::Inline { text, .. } => text.len() as u64,
282 Self::Cas { len_bytes, .. } => *len_bytes,
283 }
284 }
285}
286
287pub fn content_hash(bytes: &[u8]) -> String {
290 blake3::hash(bytes).to_hex().to_string()
291}
292
293fn build_payload(bytes: Vec<u8>, cas: &mut BTreeMap<String, Vec<u8>>) -> TapePayload {
296 let hash = content_hash(&bytes);
297 if bytes.len() > MAX_INLINE_BYTES {
298 let len_bytes = bytes.len() as u64;
299 cas.entry(hash.clone()).or_insert(bytes);
300 TapePayload::Cas {
301 content_hash: hash,
302 len_bytes,
303 }
304 } else {
305 let text = match String::from_utf8(bytes) {
306 Ok(text) => text,
307 Err(error) => {
308 let bytes = error.into_bytes();
312 let len_bytes = bytes.len() as u64;
313 cas.entry(hash.clone()).or_insert(bytes);
314 return TapePayload::Cas {
315 content_hash: hash,
316 len_bytes,
317 };
318 }
319 };
320 TapePayload::Inline {
321 content_hash: hash,
322 text,
323 }
324 }
325}
326
/// An in-memory tape: the header, the recorded events, and the CAS blobs their payloads
/// reference.
#[derive(Debug, Clone)]
pub struct EventTape {
    pub header: TapeHeader,
    pub records: Vec<TapeRecord>,
    /// Blobs keyed by content hash; read back through `resolve_payload`.
    cas: BTreeMap<String, Vec<u8>>,
}
339
340impl EventTape {
341 pub fn new(header: TapeHeader) -> Self {
342 Self {
343 header,
344 records: Vec::new(),
345 cas: BTreeMap::new(),
346 }
347 }
348
349 pub fn resolve_payload(&self, payload: &TapePayload) -> Result<Vec<u8>, String> {
352 match payload {
353 TapePayload::Inline { text, .. } => Ok(text.as_bytes().to_vec()),
354 TapePayload::Cas { content_hash, .. } => self
355 .cas
356 .get(content_hash)
357 .cloned()
358 .ok_or_else(|| format!("tape CAS missing entry for {content_hash}")),
359 }
360 }
361
362 pub fn cas_len(&self) -> usize {
364 self.cas.len()
365 }
366
367 pub fn persist(&self, path: &Path) -> Result<(), String> {
370 if let Some(parent) = path.parent() {
371 if !parent.as_os_str().is_empty() {
372 std::fs::create_dir_all(parent)
373 .map_err(|err| format!("mkdir {}: {err}", parent.display()))?;
374 }
375 }
376
377 let mut body = String::new();
378 let header_line = serde_json::to_string(&TapeLine::Header(self.header.clone()))
379 .map_err(|err| format!("serialize tape header: {err}"))?;
380 body.push_str(&header_line);
381 body.push('\n');
382 for record in &self.records {
383 let line = serde_json::to_string(&TapeLine::Record(record.clone()))
384 .map_err(|err| format!("serialize tape record: {err}"))?;
385 body.push_str(&line);
386 body.push('\n');
387 }
388 std::fs::write(path, body).map_err(|err| format!("write {}: {err}", path.display()))?;
389
390 if !self.cas.is_empty() {
391 let cas_dir = cas_dir_for(path);
392 std::fs::create_dir_all(&cas_dir)
393 .map_err(|err| format!("mkdir {}: {err}", cas_dir.display()))?;
394 for (hash, bytes) in &self.cas {
395 let entry = cas_dir.join(hash);
396 std::fs::write(&entry, bytes)
397 .map_err(|err| format!("write {}: {err}", entry.display()))?;
398 }
399 }
400 Ok(())
401 }
402
403 pub fn load(path: &Path) -> Result<Self, String> {
406 let body = std::fs::read_to_string(path)
407 .map_err(|err| format!("read {}: {err}", path.display()))?;
408 let mut lines = body.lines();
409 let first_line = lines
410 .next()
411 .ok_or_else(|| format!("empty tape file: {}", path.display()))?;
412 let header_line: TapeLine = serde_json::from_str(first_line)
413 .map_err(|err| format!("parse tape header in {}: {err}", path.display()))?;
414 let header = match header_line {
415 TapeLine::Header(header) => header,
416 TapeLine::Record(_) => {
417 return Err(format!(
418 "tape {} is missing its header (first line is a record)",
419 path.display()
420 ))
421 }
422 };
423 if header.version > TAPE_FORMAT_VERSION {
424 return Err(format!(
425 "tape {} declares version {} but this runtime supports up to {TAPE_FORMAT_VERSION}",
426 path.display(),
427 header.version
428 ));
429 }
430 let mut records = Vec::new();
431 for (idx, line) in lines.enumerate() {
432 let trimmed = line.trim();
433 if trimmed.is_empty() {
434 continue;
435 }
436 let parsed: TapeLine = serde_json::from_str(trimmed).map_err(|err| {
437 format!(
438 "parse tape record at line {} in {}: {err}",
439 idx + 2,
440 path.display()
441 )
442 })?;
443 match parsed {
444 TapeLine::Record(record) => records.push(record),
445 TapeLine::Header(_) => {
446 return Err(format!(
447 "tape {} contains a second header at line {}",
448 path.display(),
449 idx + 2
450 ))
451 }
452 }
453 }
454
455 let mut cas = BTreeMap::new();
456 let cas_dir = cas_dir_for(path);
457 if cas_dir.is_dir() {
458 for record in &records {
459 visit_payloads(&record.kind, |payload| {
460 if let TapePayload::Cas { content_hash, .. } = payload {
461 if cas.contains_key(content_hash) {
462 return;
463 }
464 let entry = cas_dir.join(content_hash);
465 if let Ok(bytes) = std::fs::read(&entry) {
466 cas.insert(content_hash.clone(), bytes);
467 }
468 }
469 });
470 }
471 }
472 Ok(Self {
473 header,
474 records,
475 cas,
476 })
477 }
478}
479
/// Directory that holds the CAS blobs of the tape at `tape_path`.
///
/// It is the tape path with `.cas` appended, so `run.tape` maps to `run.tape.cas`.
fn cas_dir_for(tape_path: &Path) -> PathBuf {
    let mut dir_name = tape_path.as_os_str().to_os_string();
    dir_name.push(".cas");
    dir_name.into()
}
485
486fn visit_payloads(kind: &TapeRecordKind, mut visit: impl FnMut(&TapePayload)) {
487 match kind {
488 TapeRecordKind::LlmCall { response, .. } => visit(response),
489 TapeRecordKind::ProcessSpawn {
490 stdout_payload,
491 stderr_payload,
492 ..
493 } => {
494 visit(stdout_payload);
495 visit(stderr_payload);
496 }
497 TapeRecordKind::ClockRead { .. }
498 | TapeRecordKind::ClockSleep { .. }
499 | TapeRecordKind::FileRead { .. }
500 | TapeRecordKind::FileWrite { .. }
501 | TapeRecordKind::FileDelete { .. }
502 | TapeRecordKind::Unknown => {}
503 }
504}
505
/// Collects [`TapeRecord`]s and CAS blobs during a run.
///
/// Can be shared across threads: the sequence counter and current phase are atomics, and
/// the records and CAS sit behind a `Mutex`.
#[derive(Debug)]
pub struct TapeRecorder {
    /// Next sequence number to hand out.
    next_seq: AtomicU64,
    /// Current [`TapePhase`], encoded with `TapePhase::as_u8`.
    phase: AtomicU8,
    /// Reference point for `TapeRecord::monotonic_ms`.
    started_at: clock_mock::ClockInstant,
    /// Records and CAS blobs accumulated so far.
    inner: Mutex<RecorderInner>,
}
517
/// Mutable state of a [`TapeRecorder`], guarded by its mutex.
#[derive(Debug, Default)]
struct RecorderInner {
    /// Records in the order they were pushed.
    records: Vec<TapeRecord>,
    /// CAS blobs keyed by content hash, filled by `payload_from_bytes`.
    cas: BTreeMap<String, Vec<u8>>,
}
523
524impl Default for TapeRecorder {
525 fn default() -> Self {
526 Self::new()
527 }
528}
529
530impl TapeRecorder {
531 pub fn new() -> Self {
532 Self {
533 next_seq: AtomicU64::new(0),
534 phase: AtomicU8::new(TapePhase::UserScript.as_u8()),
535 started_at: clock_mock::instant_now(),
536 inner: Mutex::new(RecorderInner::default()),
537 }
538 }
539
540 pub fn record(&self, kind: TapeRecordKind) {
543 let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
544 let virtual_time_ms = clock_mock::now_ms();
545 let monotonic_ms = clock_mock::instant_now()
546 .duration_since(self.started_at)
547 .as_millis()
548 .min(i64::MAX as u128) as i64;
549 let record = TapeRecord {
550 seq,
551 phase: TapePhase::from_u8(self.phase.load(Ordering::SeqCst)),
552 virtual_time_ms,
553 monotonic_ms,
554 kind,
555 };
556 self.inner
557 .lock()
558 .expect("tape recorder mutex poisoned")
559 .records
560 .push(record);
561 }
562
563 fn swap_phase(&self, phase: TapePhase) -> TapePhase {
564 TapePhase::from_u8(self.phase.swap(phase.as_u8(), Ordering::SeqCst))
565 }
566
567 pub fn payload_from_bytes(&self, bytes: Vec<u8>) -> TapePayload {
572 let mut inner = self.inner.lock().expect("tape recorder mutex poisoned");
573 build_payload(bytes, &mut inner.cas)
574 }
575
576 pub fn snapshot(&self, header: TapeHeader) -> EventTape {
581 let inner = self.inner.lock().expect("tape recorder mutex poisoned");
582 EventTape {
583 header,
584 records: inner.records.clone(),
585 cas: inner.cas.clone(),
586 }
587 }
588}
589
thread_local! {
    /// Recorder used by `with_active_recorder` and `enter_phase` on the current thread;
    /// `None` means nothing is recorded. Set and restored through `install_recorder`.
    static ACTIVE_RECORDER: RefCell<Option<Arc<TapeRecorder>>> = const { RefCell::new(None) };
}
593
594pub struct TapeRecorderGuard {
597 previous: Option<Arc<TapeRecorder>>,
598}
599
600impl Drop for TapeRecorderGuard {
601 fn drop(&mut self) {
602 let prev = self.previous.take();
603 ACTIVE_RECORDER.with(|slot| {
604 *slot.borrow_mut() = prev;
605 });
606 }
607}
608
609pub fn install_recorder(recorder: Arc<TapeRecorder>) -> TapeRecorderGuard {
610 let previous = ACTIVE_RECORDER.with(|slot| slot.replace(Some(recorder)));
611 TapeRecorderGuard { previous }
612}
613
614pub fn active_recorder() -> Option<Arc<TapeRecorder>> {
617 ACTIVE_RECORDER.with(|slot| slot.borrow().clone())
618}
619
620pub struct TapePhaseGuard {
623 recorder: Arc<TapeRecorder>,
624 previous: TapePhase,
625}
626
627impl Drop for TapePhaseGuard {
628 fn drop(&mut self) {
629 self.recorder.swap_phase(self.previous);
630 }
631}
632
633pub fn enter_phase(phase: TapePhase) -> Option<TapePhaseGuard> {
636 let recorder = active_recorder()?;
637 let previous = recorder.swap_phase(phase);
638 Some(TapePhaseGuard { recorder, previous })
639}
640
641pub fn with_active_recorder<F>(build: F)
644where
645 F: FnOnce(&Arc<TapeRecorder>) -> Option<TapeRecordKind>,
646{
647 let Some(recorder) = active_recorder() else {
648 return;
649 };
650 if let Some(kind) = build(&recorder) {
651 recorder.record(kind);
652 }
653}
654
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a `ClockSleep` record whose timestamps are derived from `seq`.
    fn small_record(seq: u64, dur: u64) -> TapeRecord {
        TapeRecord {
            seq,
            phase: TapePhase::UserScript,
            virtual_time_ms: seq as i64 * 1000,
            monotonic_ms: seq as i64 * 1000,
            kind: TapeRecordKind::ClockSleep { duration_ms: dur },
        }
    }

    /// Header line accepted by the current format version.
    const VALID_HEADER: &str = r#"{"type":"header","version":1,"harn_version":"x","argv":[]}"#;

    #[test]
    fn round_trip_inline_records() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("run.tape");
        let mut tape = EventTape::new(TapeHeader::current(
            Some(1_700_000_000_000),
            Some("script.harn".to_string()),
            vec!["a".into()],
        ));
        tape.records.push(small_record(0, 250));
        tape.records.push(small_record(1, 750));
        tape.persist(&path).unwrap();

        let loaded = EventTape::load(&path).unwrap();
        assert_eq!(loaded.header.version, TAPE_FORMAT_VERSION);
        assert_eq!(loaded.header.argv, vec!["a".to_string()]);
        assert_eq!(loaded.records.len(), 2);
        match &loaded.records[0].kind {
            TapeRecordKind::ClockSleep { duration_ms } => assert_eq!(*duration_ms, 250),
            other => panic!("unexpected: {other:?}"),
        }
    }

    #[test]
    fn recorder_phase_guard_stamps_and_restores() {
        let recorder = Arc::new(TapeRecorder::new());
        let _recorder_guard = install_recorder(Arc::clone(&recorder));

        with_active_recorder(|_| Some(TapeRecordKind::ClockSleep { duration_ms: 1 }));
        {
            let _phase_guard = enter_phase(TapePhase::RuntimeFinalize).unwrap();
            with_active_recorder(|_| Some(TapeRecordKind::ClockSleep { duration_ms: 2 }));
        }
        with_active_recorder(|_| Some(TapeRecordKind::ClockSleep { duration_ms: 3 }));

        let tape = recorder.snapshot(TapeHeader::current(None, None, Vec::new()));
        let phases = tape
            .records
            .iter()
            .map(|record| record.phase)
            .collect::<Vec<_>>();
        assert_eq!(
            phases,
            vec![
                TapePhase::UserScript,
                TapePhase::RuntimeFinalize,
                TapePhase::UserScript
            ]
        );
    }

    #[test]
    fn large_payloads_spill_to_cas_and_round_trip() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("run.tape");
        let mut tape = EventTape::new(TapeHeader::current(None, None, Vec::new()));
        let big = vec![b'x'; MAX_INLINE_BYTES + 32];
        let payload = build_payload(big.clone(), &mut tape.cas);
        let hash = payload.content_hash().to_string();
        let kind = TapeRecordKind::ProcessSpawn {
            program: "/bin/echo".to_string(),
            args: vec!["x".to_string()],
            cwd: None,
            exit_code: 0,
            duration_ms: 1,
            stdout_payload: payload,
            stderr_payload: build_payload(Vec::new(), &mut tape.cas),
        };
        tape.records.push(TapeRecord {
            seq: 0,
            phase: TapePhase::UserScript,
            virtual_time_ms: 0,
            monotonic_ms: 0,
            kind,
        });
        tape.persist(&path).unwrap();

        let cas_dir = cas_dir_for(&path);
        assert_eq!(cas_dir, temp.path().join("run.tape.cas"));
        assert!(cas_dir.join(&hash).exists());

        let loaded = EventTape::load(&path).unwrap();
        let resolved = match &loaded.records[0].kind {
            TapeRecordKind::ProcessSpawn { stdout_payload, .. } => {
                loaded.resolve_payload(stdout_payload).unwrap()
            }
            other => panic!("unexpected: {other:?}"),
        };
        assert_eq!(resolved, big);
    }

    #[test]
    fn non_utf8_payloads_spill_to_cas() {
        let mut cas = BTreeMap::new();
        let bytes = vec![0xff, 0xfe, 0x00];
        let payload = build_payload(bytes.clone(), &mut cas);
        match &payload {
            TapePayload::Cas { len_bytes, .. } => assert_eq!(*len_bytes, 3),
            other => panic!("expected CAS payload, got {other:?}"),
        }
        assert_eq!(cas.get(payload.content_hash()), Some(&bytes));
    }

    #[test]
    fn small_utf8_payloads_stay_inline() {
        let mut cas = BTreeMap::new();
        let payload = build_payload(b"hello".to_vec(), &mut cas);
        assert!(matches!(&payload, TapePayload::Inline { text, .. } if text == "hello"));
        assert_eq!(payload.len_bytes(), 5);
        assert!(cas.is_empty());
    }

    #[test]
    fn rejects_newer_version() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("future.tape");
        std::fs::write(
            &path,
            r#"{"type":"header","version":99,"harn_version":"x","started_at_unix_ms":null,"script_path":null,"argv":[]}
"#,
        )
        .unwrap();
        let err = EventTape::load(&path).unwrap_err();
        assert!(err.contains("version 99"), "{err}");
    }

    #[test]
    fn rejects_record_before_header() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("headless.tape");
        std::fs::write(
            &path,
            r#"{"type":"record","seq":0,"virtual_time_ms":0,"monotonic_ms":0,"kind":{"kind":"clock_sleep","duration_ms":1}}
"#,
        )
        .unwrap();
        let err = EventTape::load(&path).unwrap_err();
        assert!(err.contains("missing its header"), "{err}");
    }

    #[test]
    fn rejects_second_header() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("double.tape");
        std::fs::write(&path, format!("{VALID_HEADER}\n{VALID_HEADER}\n")).unwrap();
        let err = EventTape::load(&path).unwrap_err();
        assert!(err.contains("second header at line 2"), "{err}");
    }

    #[test]
    fn recorder_assigns_monotonic_seq() {
        let recorder = Arc::new(TapeRecorder::new());
        recorder.record(TapeRecordKind::ClockSleep { duration_ms: 1 });
        recorder.record(TapeRecordKind::ClockSleep { duration_ms: 2 });
        let snapshot = recorder.snapshot(TapeHeader::current(None, None, Vec::new()));
        assert_eq!(snapshot.records[0].seq, 0);
        assert_eq!(snapshot.records[1].seq, 1);
    }
}