1use std::cell::RefCell;
44use std::collections::BTreeMap;
45use std::path::{Path, PathBuf};
46use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
47use std::sync::{Arc, Mutex};
48
49use serde::{Deserialize, Serialize};
50
51use crate::clock_mock;
52
/// Tape format version stamped into headers built by [`TapeHeader::current`].
/// [`EventTape::load`] refuses tapes whose header declares a higher version.
pub const TAPE_FORMAT_VERSION: u32 = 1;

/// Largest payload size, in bytes, that `build_payload` keeps inline in a
/// record. Anything longer is stored in the content-addressed side store.
pub const MAX_INLINE_BYTES: usize = 4 * 1024;
62
/// Metadata describing a tape; serialized as the first line of a tape file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TapeHeader {
    /// Tape format version; compared against [`TAPE_FORMAT_VERSION`] on load.
    pub version: u32,
    /// Version string of the crate that produced the tape
    /// ([`TapeHeader::current`] fills it from `CARGO_PKG_VERSION`).
    pub harn_version: String,
    /// Caller-supplied start time (presumably Unix epoch milliseconds, per the
    /// field name). Deserializes to `None` when the field is absent.
    #[serde(default)]
    pub started_at_unix_ms: Option<i64>,
    /// Caller-supplied script path, if any. Deserializes to `None` when absent.
    #[serde(default)]
    pub script_path: Option<String>,
    /// Caller-supplied argument list. Deserializes to an empty vector when absent.
    #[serde(default)]
    pub argv: Vec<String>,
}
86
87impl TapeHeader {
88 pub fn current(
89 started_at_unix_ms: Option<i64>,
90 script_path: Option<String>,
91 argv: Vec<String>,
92 ) -> Self {
93 Self {
94 version: TAPE_FORMAT_VERSION,
95 harn_version: env!("CARGO_PKG_VERSION").to_string(),
96 started_at_unix_ms,
97 script_path,
98 argv,
99 }
100 }
101}
102
/// One line of a persisted tape file.
///
/// Serialized as a JSON object whose `type` field (`"header"` or `"record"`)
/// selects the variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum TapeLine {
    /// The tape's header; `EventTape::load` requires it on the first line only.
    Header(TapeHeader),
    /// A recorded event.
    Record(TapeRecord),
}
112
/// A single recorded event together with its ordering and timing metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TapeRecord {
    /// Sequence number; `TapeRecorder::record` assigns these from a counter
    /// that starts at 0.
    pub seq: u64,
    /// Phase that was active when the event was recorded. Falls back to
    /// `TapePhase::UserScript` when the field is absent in the serialized form.
    #[serde(default)]
    pub phase: TapePhase,
    /// Value of `clock_mock::now_ms()` at record time (when produced by
    /// `TapeRecorder::record`).
    pub virtual_time_ms: i64,
    /// Milliseconds between recorder creation and the event, measured with
    /// `clock_mock::instant_now()` (when produced by `TapeRecorder::record`).
    pub monotonic_ms: i64,
    /// What happened.
    pub kind: TapeRecordKind,
}
135
/// Phase of a run that a record is attributed to. Serialized in snake_case.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TapePhase {
    /// Default phase; a new `TapeRecorder` starts here.
    #[default]
    UserScript,
    /// Phase entered explicitly via `enter_phase`.
    // NOTE(review): presumably covers runtime work after the user script finishes — confirm.
    RuntimeFinalize,
}
147
148impl TapePhase {
149 fn as_u8(self) -> u8 {
150 match self {
151 Self::UserScript => 0,
152 Self::RuntimeFinalize => 1,
153 }
154 }
155
156 fn from_u8(value: u8) -> Self {
157 match value {
158 1 => Self::RuntimeFinalize,
159 _ => Self::UserScript,
160 }
161 }
162
163 pub fn label(self) -> &'static str {
164 match self {
165 Self::UserScript => "user_script",
166 Self::RuntimeFinalize => "runtime_finalize",
167 }
168 }
169}
170
/// The event captured by a [`TapeRecord`].
///
/// Serialized as a JSON object whose `kind` field holds the snake_case variant
/// name.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum TapeRecordKind {
    /// A clock was read: which clock, and the value observed (presumably
    /// milliseconds, per the field name).
    ClockRead { source: ClockSource, value_ms: i64 },
    /// A sleep of `duration_ms` (presumably milliseconds, per the field name).
    ClockSleep { duration_ms: u64 },
    /// An LLM call: a digest of the request plus the response payload.
    LlmCall {
        request_digest: String,
        response: TapePayload,
    },
    /// A file read: path, content hash, and length in bytes.
    FileRead {
        path: String,
        content_hash: String,
        len_bytes: u64,
    },
    /// A file write: path, content hash, and length in bytes.
    FileWrite {
        path: String,
        content_hash: String,
        len_bytes: u64,
    },
    /// A file deletion.
    FileDelete { path: String },
    /// A spawned process: command line, working directory, exit code,
    /// duration, and captured stdout/stderr payloads.
    ProcessSpawn {
        program: String,
        args: Vec<String>,
        cwd: Option<String>,
        exit_code: i32,
        duration_ms: u64,
        stdout_payload: TapePayload,
        stderr_payload: TapePayload,
    },
    /// An MCP JSON-RPC exchange: server, method, digests of both messages,
    /// latency, and the request/response payloads.
    McpJsonRpc {
        server: String,
        method: String,
        request_digest: String,
        response_digest: String,
        latency_ms: u64,
        request_payload: TapePayload,
        response_payload: TapePayload,
    },
    /// Catch-all: any unrecognized `kind` tag deserializes to this variant
    /// (`#[serde(other)]`) instead of failing.
    #[serde(other)]
    Unknown,
}
240
241impl TapeRecordKind {
242 pub fn label(&self) -> &'static str {
247 match self {
248 Self::ClockRead { .. } => "clock_read",
249 Self::ClockSleep { .. } => "clock_sleep",
250 Self::LlmCall { .. } => "llm_call",
251 Self::FileRead { .. } => "file_read",
252 Self::FileWrite { .. } => "file_write",
253 Self::FileDelete { .. } => "file_delete",
254 Self::ProcessSpawn { .. } => "process_spawn",
255 Self::McpJsonRpc { .. } => "mcp_json_rpc",
256 Self::Unknown => "unknown",
257 }
258 }
259}
260
/// Which clock a `TapeRecordKind::ClockRead` event refers to. Serialized in
/// snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ClockSource {
    /// Wall-clock time.
    Wall,
    /// Monotonic time.
    Monotonic,
}
269
/// Bytes attached to a record, either embedded or referenced by hash.
///
/// The enum is `untagged`: there is no discriminator field, so on
/// deserialization serde tries the variants in declaration order and picks
/// the first whose fields are all present.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum TapePayload {
    /// Payload stored directly in the record as UTF-8 text, with its hash.
    Inline { content_hash: String, text: String },
    /// Payload stored out of line and looked up by `content_hash`; only the
    /// hash and the byte length are kept in the record.
    Cas {
        content_hash: String,
        len_bytes: u64,
    },
}
284
285impl TapePayload {
286 pub fn content_hash(&self) -> &str {
287 match self {
288 Self::Inline { content_hash, .. } | Self::Cas { content_hash, .. } => content_hash,
289 }
290 }
291
292 pub fn len_bytes(&self) -> u64 {
293 match self {
294 Self::Inline { text, .. } => text.len() as u64,
295 Self::Cas { len_bytes, .. } => *len_bytes,
296 }
297 }
298}
299
300pub fn content_hash(bytes: &[u8]) -> String {
303 blake3::hash(bytes).to_hex().to_string()
304}
305
306fn build_payload(bytes: Vec<u8>, cas: &mut BTreeMap<String, Vec<u8>>) -> TapePayload {
309 let hash = content_hash(&bytes);
310 if bytes.len() > MAX_INLINE_BYTES {
311 let len_bytes = bytes.len() as u64;
312 cas.entry(hash.clone()).or_insert(bytes);
313 TapePayload::Cas {
314 content_hash: hash,
315 len_bytes,
316 }
317 } else {
318 let text = match String::from_utf8(bytes) {
319 Ok(text) => text,
320 Err(error) => {
321 let bytes = error.into_bytes();
325 let len_bytes = bytes.len() as u64;
326 cas.entry(hash.clone()).or_insert(bytes);
327 return TapePayload::Cas {
328 content_hash: hash,
329 len_bytes,
330 };
331 }
332 };
333 TapePayload::Inline {
334 content_hash: hash,
335 text,
336 }
337 }
338}
339
/// An in-memory tape: a header, the recorded events, and the out-of-line
/// payload bytes they reference.
#[derive(Debug, Clone)]
pub struct EventTape {
    /// Metadata written as the first line of the tape file.
    pub header: TapeHeader,
    /// Recorded events, in the order they are written to the tape file.
    pub records: Vec<TapeRecord>,
    /// Out-of-line payload bytes keyed by content hash; read through
    /// `resolve_payload`.
    cas: BTreeMap<String, Vec<u8>>,
}
352
353impl EventTape {
354 pub fn new(header: TapeHeader) -> Self {
355 Self {
356 header,
357 records: Vec::new(),
358 cas: BTreeMap::new(),
359 }
360 }
361
362 pub fn resolve_payload(&self, payload: &TapePayload) -> Result<Vec<u8>, String> {
365 match payload {
366 TapePayload::Inline { text, .. } => Ok(text.as_bytes().to_vec()),
367 TapePayload::Cas { content_hash, .. } => self
368 .cas
369 .get(content_hash)
370 .cloned()
371 .ok_or_else(|| format!("tape CAS missing entry for {content_hash}")),
372 }
373 }
374
375 pub fn cas_len(&self) -> usize {
377 self.cas.len()
378 }
379
380 pub fn persist(&self, path: &Path) -> Result<(), String> {
383 if let Some(parent) = path.parent() {
384 if !parent.as_os_str().is_empty() {
385 std::fs::create_dir_all(parent)
386 .map_err(|err| format!("mkdir {}: {err}", parent.display()))?;
387 }
388 }
389
390 let mut body = String::new();
391 let header_line = serde_json::to_string(&TapeLine::Header(self.header.clone()))
392 .map_err(|err| format!("serialize tape header: {err}"))?;
393 body.push_str(&header_line);
394 body.push('\n');
395 for record in &self.records {
396 let line = serde_json::to_string(&TapeLine::Record(record.clone()))
397 .map_err(|err| format!("serialize tape record: {err}"))?;
398 body.push_str(&line);
399 body.push('\n');
400 }
401 std::fs::write(path, body).map_err(|err| format!("write {}: {err}", path.display()))?;
402
403 if !self.cas.is_empty() {
404 let cas_dir = cas_dir_for(path);
405 std::fs::create_dir_all(&cas_dir)
406 .map_err(|err| format!("mkdir {}: {err}", cas_dir.display()))?;
407 for (hash, bytes) in &self.cas {
408 let entry = cas_dir.join(hash);
409 std::fs::write(&entry, bytes)
410 .map_err(|err| format!("write {}: {err}", entry.display()))?;
411 }
412 }
413 Ok(())
414 }
415
416 pub fn load(path: &Path) -> Result<Self, String> {
419 let body = std::fs::read_to_string(path)
420 .map_err(|err| format!("read {}: {err}", path.display()))?;
421 let mut lines = body.lines();
422 let first_line = lines
423 .next()
424 .ok_or_else(|| format!("empty tape file: {}", path.display()))?;
425 let header_line: TapeLine = serde_json::from_str(first_line)
426 .map_err(|err| format!("parse tape header in {}: {err}", path.display()))?;
427 let header = match header_line {
428 TapeLine::Header(header) => header,
429 TapeLine::Record(_) => {
430 return Err(format!(
431 "tape {} is missing its header (first line is a record)",
432 path.display()
433 ))
434 }
435 };
436 if header.version > TAPE_FORMAT_VERSION {
437 return Err(format!(
438 "tape {} declares version {} but this runtime supports up to {TAPE_FORMAT_VERSION}",
439 path.display(),
440 header.version
441 ));
442 }
443 let mut records = Vec::new();
444 for (idx, line) in lines.enumerate() {
445 let trimmed = line.trim();
446 if trimmed.is_empty() {
447 continue;
448 }
449 let parsed: TapeLine = serde_json::from_str(trimmed).map_err(|err| {
450 format!(
451 "parse tape record at line {} in {}: {err}",
452 idx + 2,
453 path.display()
454 )
455 })?;
456 match parsed {
457 TapeLine::Record(record) => records.push(record),
458 TapeLine::Header(_) => {
459 return Err(format!(
460 "tape {} contains a second header at line {}",
461 path.display(),
462 idx + 2
463 ))
464 }
465 }
466 }
467
468 let mut cas = BTreeMap::new();
469 let cas_dir = cas_dir_for(path);
470 if cas_dir.is_dir() {
471 for record in &records {
472 visit_payloads(&record.kind, |payload| {
473 if let TapePayload::Cas { content_hash, .. } = payload {
474 if cas.contains_key(content_hash) {
475 return;
476 }
477 let entry = cas_dir.join(content_hash);
478 if let Ok(bytes) = std::fs::read(&entry) {
479 cas.insert(content_hash.clone(), bytes);
480 }
481 }
482 });
483 }
484 }
485 Ok(Self {
486 header,
487 records,
488 cas,
489 })
490 }
491}
492
/// Returns the side-store directory for a tape file: the tape path with
/// `.cas` appended to its full name (`run.tape` becomes `run.tape.cas`).
fn cas_dir_for(tape_path: &Path) -> PathBuf {
    // Append to the raw OS string rather than using `with_extension`, which
    // would replace the existing extension instead of adding to it.
    let mut sidecar = tape_path.to_path_buf().into_os_string();
    sidecar.push(".cas");
    sidecar.into()
}
498
499fn visit_payloads(kind: &TapeRecordKind, mut visit: impl FnMut(&TapePayload)) {
500 match kind {
501 TapeRecordKind::LlmCall { response, .. } => visit(response),
502 TapeRecordKind::ProcessSpawn {
503 stdout_payload,
504 stderr_payload,
505 ..
506 } => {
507 visit(stdout_payload);
508 visit(stderr_payload);
509 }
510 TapeRecordKind::McpJsonRpc {
511 request_payload,
512 response_payload,
513 ..
514 } => {
515 visit(request_payload);
516 visit(response_payload);
517 }
518 TapeRecordKind::ClockRead { .. }
519 | TapeRecordKind::ClockSleep { .. }
520 | TapeRecordKind::FileRead { .. }
521 | TapeRecordKind::FileWrite { .. }
522 | TapeRecordKind::FileDelete { .. }
523 | TapeRecordKind::Unknown => {}
524 }
525}
526
/// Collects [`TapeRecord`]s and out-of-line payloads during a run; turned
/// into an [`EventTape`] with `snapshot`.
#[derive(Debug)]
pub struct TapeRecorder {
    /// Next sequence number to hand out; starts at 0.
    next_seq: AtomicU64,
    /// Current [`TapePhase`], stored as its `u8` encoding.
    phase: AtomicU8,
    /// Instant captured at construction; `monotonic_ms` is measured from it.
    started_at: clock_mock::ClockInstant,
    /// Recorded events and payload bytes, guarded by a mutex.
    inner: Mutex<RecorderInner>,
}
538
/// Mutable state of a [`TapeRecorder`], kept behind its mutex.
#[derive(Debug, Default)]
struct RecorderInner {
    /// Events recorded so far, in push order.
    records: Vec<TapeRecord>,
    /// Out-of-line payload bytes keyed by content hash.
    cas: BTreeMap<String, Vec<u8>>,
}
544
545impl Default for TapeRecorder {
546 fn default() -> Self {
547 Self::new()
548 }
549}
550
551impl TapeRecorder {
552 pub fn new() -> Self {
553 Self {
554 next_seq: AtomicU64::new(0),
555 phase: AtomicU8::new(TapePhase::UserScript.as_u8()),
556 started_at: clock_mock::instant_now(),
557 inner: Mutex::new(RecorderInner::default()),
558 }
559 }
560
561 pub fn record(&self, kind: TapeRecordKind) {
564 let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
565 let virtual_time_ms = clock_mock::now_ms();
566 let monotonic_ms = clock_mock::instant_now()
567 .duration_since(self.started_at)
568 .as_millis()
569 .min(i64::MAX as u128) as i64;
570 let record = TapeRecord {
571 seq,
572 phase: TapePhase::from_u8(self.phase.load(Ordering::SeqCst)),
573 virtual_time_ms,
574 monotonic_ms,
575 kind,
576 };
577 self.inner
578 .lock()
579 .expect("tape recorder mutex poisoned")
580 .records
581 .push(record);
582 }
583
584 fn swap_phase(&self, phase: TapePhase) -> TapePhase {
585 TapePhase::from_u8(self.phase.swap(phase.as_u8(), Ordering::SeqCst))
586 }
587
588 pub fn payload_from_bytes(&self, bytes: Vec<u8>) -> TapePayload {
593 let mut inner = self.inner.lock().expect("tape recorder mutex poisoned");
594 build_payload(bytes, &mut inner.cas)
595 }
596
597 pub fn snapshot(&self, header: TapeHeader) -> EventTape {
602 let inner = self.inner.lock().expect("tape recorder mutex poisoned");
603 EventTape {
604 header,
605 records: inner.records.clone(),
606 cas: inner.cas.clone(),
607 }
608 }
609}
610
// Recorder currently installed on this thread, if any. `install_recorder`
// replaces it and the returned guard restores the previous value on drop.
thread_local! {
    static ACTIVE_RECORDER: RefCell<Option<Arc<TapeRecorder>>> = const { RefCell::new(None) };
}
614
/// Guard returned by [`install_recorder`]. Dropping it puts back whatever
/// recorder (or none) was active on this thread before the install.
pub struct TapeRecorderGuard {
    /// The recorder that was active before this guard's install, if any.
    previous: Option<Arc<TapeRecorder>>,
}
620
621impl Drop for TapeRecorderGuard {
622 fn drop(&mut self) {
623 let prev = self.previous.take();
624 ACTIVE_RECORDER.with(|slot| {
625 *slot.borrow_mut() = prev;
626 });
627 }
628}
629
630pub fn install_recorder(recorder: Arc<TapeRecorder>) -> TapeRecorderGuard {
631 let previous = ACTIVE_RECORDER.with(|slot| slot.replace(Some(recorder)));
632 TapeRecorderGuard { previous }
633}
634
635pub fn active_recorder() -> Option<Arc<TapeRecorder>> {
638 ACTIVE_RECORDER.with(|slot| slot.borrow().clone())
639}
640
641pub fn record_mcp_json_rpc(
645 server: &str,
646 method: &str,
647 request: &serde_json::Value,
648 response: &serde_json::Value,
649 latency_ms: u64,
650) {
651 let Some(recorder) = active_recorder() else {
652 return;
653 };
654 let policy = crate::redact::current_policy();
655 let request = policy.redact_json(request);
656 let response = policy.redact_json(response);
657 let request_bytes = serde_json::to_vec(&request).unwrap_or_default();
658 let response_bytes = serde_json::to_vec(&response).unwrap_or_default();
659 let request_digest = content_hash(&request_bytes);
660 let response_digest = content_hash(&response_bytes);
661 let request_payload = recorder.payload_from_bytes(request_bytes);
662 let response_payload = recorder.payload_from_bytes(response_bytes);
663 recorder.record(TapeRecordKind::McpJsonRpc {
664 server: server.to_string(),
665 method: method.to_string(),
666 request_digest,
667 response_digest,
668 latency_ms,
669 request_payload,
670 response_payload,
671 });
672}
673
/// Guard returned by [`enter_phase`]. Dropping it switches the recorder back
/// to the phase that was active before.
pub struct TapePhaseGuard {
    /// Recorder whose phase was changed.
    recorder: Arc<TapeRecorder>,
    /// Phase to restore on drop.
    previous: TapePhase,
}
680
681impl Drop for TapePhaseGuard {
682 fn drop(&mut self) {
683 self.recorder.swap_phase(self.previous);
684 }
685}
686
687pub fn enter_phase(phase: TapePhase) -> Option<TapePhaseGuard> {
690 let recorder = active_recorder()?;
691 let previous = recorder.swap_phase(phase);
692 Some(TapePhaseGuard { recorder, previous })
693}
694
695pub fn with_active_recorder<F>(build: F)
698where
699 F: FnOnce(&Arc<TapeRecorder>) -> Option<TapeRecordKind>,
700{
701 let Some(recorder) = active_recorder() else {
702 return;
703 };
704 if let Some(kind) = build(&recorder) {
705 recorder.record(kind);
706 }
707}
708
// Unit tests for tape persistence, the recorder, and phase guards.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a `ClockSleep` record whose timestamps are `seq` seconds.
    fn small_record(seq: u64, dur: u64) -> TapeRecord {
        TapeRecord {
            seq,
            phase: TapePhase::UserScript,
            virtual_time_ms: seq as i64 * 1000,
            monotonic_ms: seq as i64 * 1000,
            kind: TapeRecordKind::ClockSleep { duration_ms: dur },
        }
    }

    /// A tape with only inline records survives persist + load.
    #[test]
    fn round_trip_inline_records() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("run.tape");
        let mut tape = EventTape::new(TapeHeader::current(
            Some(1_700_000_000_000),
            Some("script.harn".to_string()),
            vec!["a".into()],
        ));
        tape.records.push(small_record(0, 250));
        tape.records.push(small_record(1, 750));
        tape.persist(&path).unwrap();

        let loaded = EventTape::load(&path).unwrap();
        assert_eq!(loaded.header.version, TAPE_FORMAT_VERSION);
        assert_eq!(loaded.header.argv, vec!["a".to_string()]);
        assert_eq!(loaded.records.len(), 2);
        match &loaded.records[0].kind {
            TapeRecordKind::ClockSleep { duration_ms } => assert_eq!(*duration_ms, 250),
            other => panic!("unexpected: {other:?}"),
        }
    }

    /// Records made while a phase guard is alive carry that phase, and the
    /// previous phase is back in effect once the guard is dropped.
    #[test]
    fn recorder_phase_guard_stamps_and_restores() {
        let recorder = Arc::new(TapeRecorder::new());
        let _recorder_guard = install_recorder(Arc::clone(&recorder));

        with_active_recorder(|_| Some(TapeRecordKind::ClockSleep { duration_ms: 1 }));
        {
            let _phase_guard = enter_phase(TapePhase::RuntimeFinalize).unwrap();
            with_active_recorder(|_| Some(TapeRecordKind::ClockSleep { duration_ms: 2 }));
        }
        with_active_recorder(|_| Some(TapeRecordKind::ClockSleep { duration_ms: 3 }));

        let tape = recorder.snapshot(TapeHeader::current(None, None, Vec::new()));
        let phases = tape
            .records
            .iter()
            .map(|record| record.phase)
            .collect::<Vec<_>>();
        assert_eq!(
            phases,
            vec![
                TapePhase::UserScript,
                TapePhase::RuntimeFinalize,
                TapePhase::UserScript
            ]
        );
    }

    /// A payload larger than `MAX_INLINE_BYTES` is written to the `.cas`
    /// directory and can be resolved again after loading.
    #[test]
    fn large_payloads_spill_to_cas_and_round_trip() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("run.tape");
        let mut tape = EventTape::new(TapeHeader::current(None, None, Vec::new()));
        let big = vec![b'x'; MAX_INLINE_BYTES + 32];
        let payload = build_payload(big.clone(), &mut tape.cas);
        let hash = payload.content_hash().to_string();
        let kind = TapeRecordKind::ProcessSpawn {
            program: "/bin/echo".to_string(),
            args: vec!["x".to_string()],
            cwd: None,
            exit_code: 0,
            duration_ms: 1,
            stdout_payload: payload,
            stderr_payload: build_payload(Vec::new(), &mut tape.cas),
        };
        tape.records.push(TapeRecord {
            seq: 0,
            phase: TapePhase::UserScript,
            virtual_time_ms: 0,
            monotonic_ms: 0,
            kind,
        });
        tape.persist(&path).unwrap();

        assert!(path.with_extension("tape.cas").exists() || cas_dir_for(&path).exists());
        let cas_dir = cas_dir_for(&path);
        assert!(cas_dir.join(&hash).exists());

        let loaded = EventTape::load(&path).unwrap();
        let resolved = match &loaded.records[0].kind {
            TapeRecordKind::ProcessSpawn { stdout_payload, .. } => {
                loaded.resolve_payload(stdout_payload).unwrap()
            }
            other => panic!("unexpected: {other:?}"),
        };
        assert_eq!(resolved.len(), big.len());
    }

    /// Loading a tape whose header declares a newer format version fails with
    /// a message naming that version.
    #[test]
    fn rejects_newer_version() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("future.tape");
        std::fs::write(
            &path,
            r#"{"type":"header","version":99,"harn_version":"x","started_at_unix_ms":null,"script_path":null,"argv":[]}
"#,
        )
        .unwrap();
        let err = EventTape::load(&path).unwrap_err();
        assert!(err.contains("version 99"), "{err}");
    }

    /// Sequence numbers start at 0 and increase by one per recorded event.
    #[test]
    fn recorder_assigns_monotonic_seq() {
        let recorder = Arc::new(TapeRecorder::new());
        recorder.record(TapeRecordKind::ClockSleep { duration_ms: 1 });
        recorder.record(TapeRecordKind::ClockSleep { duration_ms: 2 });
        let snapshot = recorder.snapshot(TapeHeader::current(None, None, Vec::new()));
        assert_eq!(snapshot.records[0].seq, 0);
        assert_eq!(snapshot.records[1].seq, 1);
    }
}