use std::cell::RefCell;
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use serde::{Deserialize, Serialize};
use crate::clock_mock;
/// Tape format version stamped into headers built by `TapeHeader::current`.
/// `EventTape::load` rejects any tape whose header declares a higher version.
pub const TAPE_FORMAT_VERSION: u32 = 1;
/// Largest payload size, in bytes, that `build_payload` will keep inline in a
/// record; anything longer is stored in the content-addressed (CAS) map.
pub const MAX_INLINE_BYTES: usize = 4 * 1024;
/// Metadata stored on the first line of a persisted tape.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TapeHeader {
/// Tape format version; `EventTape::load` refuses values above
/// `TAPE_FORMAT_VERSION`.
pub version: u32,
/// Version string of the build that produced the tape
/// (`TapeHeader::current` fills in `CARGO_PKG_VERSION`).
pub harn_version: String,
/// Start time of the run, if known. Presumably Unix-epoch milliseconds,
/// going by the field name. Deserializes to `None` when absent.
#[serde(default)]
pub started_at_unix_ms: Option<i64>,
/// Path of the script that was run, if known. Deserializes to `None` when
/// absent.
#[serde(default)]
pub script_path: Option<String>,
/// Argument list associated with the run. Deserializes to an empty vector
/// when absent.
#[serde(default)]
pub argv: Vec<String>,
}
impl TapeHeader {
    /// Builds a header for a tape produced by this build.
    ///
    /// `version` is set to [`TAPE_FORMAT_VERSION`] and `harn_version` to the
    /// crate version captured at compile time; the remaining fields are taken
    /// from the arguments unchanged.
    pub fn current(
        started_at_unix_ms: Option<i64>,
        script_path: Option<String>,
        argv: Vec<String>,
    ) -> Self {
        let harn_version = String::from(env!("CARGO_PKG_VERSION"));
        Self {
            version: TAPE_FORMAT_VERSION,
            harn_version,
            started_at_unix_ms,
            script_path,
            argv,
        }
    }
}
/// One line of the on-disk tape file.
///
/// Serialized as a JSON object carrying a `"type"` field (`"header"` or
/// `"record"`) alongside the fields of the wrapped value.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum TapeLine {
/// The tape header; `EventTape::load` requires it on the first line only.
Header(TapeHeader),
/// A recorded event.
Record(TapeRecord),
}
/// A single recorded event together with its position and timestamps.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TapeRecord {
/// Sequence number; `TapeRecorder::record` draws these from a counter that
/// starts at 0.
pub seq: u64,
/// When produced by `TapeRecorder::record`: the value of
/// `clock_mock::now_ms()` at capture time.
pub virtual_time_ms: i64,
/// When produced by `TapeRecorder::record`: milliseconds elapsed since the
/// recorder was created, saturated at `i64::MAX`.
pub monotonic_ms: i64,
/// What happened.
pub kind: TapeRecordKind,
}
/// The event carried by a `TapeRecord`.
///
/// Serialized as a JSON object whose `"kind"` field holds the snake_case
/// variant name next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum TapeRecordKind {
/// A clock read: which clock was consulted and the value it produced.
ClockRead { source: ClockSource, value_ms: i64 },
/// A sleep of the given duration.
ClockSleep { duration_ms: u64 },
/// An LLM call: a digest identifying the request plus the recorded response.
LlmCall {
request_digest: String,
response: TapePayload,
},
/// A file read, described by path, content hash, and length.
FileRead {
path: String,
content_hash: String,
len_bytes: u64,
},
/// A file write, described by path, content hash, and length.
FileWrite {
path: String,
content_hash: String,
len_bytes: u64,
},
/// A file deletion.
FileDelete { path: String },
/// A spawned process: its invocation, exit code, duration, and captured
/// stdout/stderr payloads.
ProcessSpawn {
program: String,
args: Vec<String>,
cwd: Option<String>,
exit_code: i32,
duration_ms: u64,
stdout_payload: TapePayload,
stderr_payload: TapePayload,
},
/// Fallback for any `"kind"` tag this build does not recognize, so such
/// records deserialize instead of failing. The original fields are not kept.
#[serde(other)]
Unknown,
}
impl TapeRecordKind {
pub fn label(&self) -> &'static str {
match self {
Self::ClockRead { .. } => "clock_read",
Self::ClockSleep { .. } => "clock_sleep",
Self::LlmCall { .. } => "llm_call",
Self::FileRead { .. } => "file_read",
Self::FileWrite { .. } => "file_write",
Self::FileDelete { .. } => "file_delete",
Self::ProcessSpawn { .. } => "process_spawn",
Self::Unknown => "unknown",
}
}
}
/// Which clock a `TapeRecordKind::ClockRead` consulted.
///
/// Serialized as the snake_case variant name (`"wall"` / `"monotonic"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ClockSource {
/// Wall-clock time.
Wall,
/// Monotonic time.
Monotonic,
}
/// Bytes attached to a record, stored either inline or by reference.
///
/// The representation is untagged: when deserializing, serde tries `Inline`
/// first and then `Cas`, so the two are told apart by which fields are present.
/// The variant order is therefore significant.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum TapePayload {
/// UTF-8 content embedded directly in the record.
Inline { content_hash: String, text: String },
/// Content held in the tape's CAS under `content_hash`; `len_bytes` is its
/// length as recorded when the payload was built.
Cas {
content_hash: String,
len_bytes: u64,
},
}
impl TapePayload {
    /// Returns the content hash stored with the payload, whichever variant it is.
    pub fn content_hash(&self) -> &str {
        let (Self::Inline { content_hash, .. } | Self::Cas { content_hash, .. }) = self;
        content_hash
    }

    /// Returns the payload size in bytes: the UTF-8 length of the text for an
    /// inline payload, or the recorded `len_bytes` for a CAS payload.
    pub fn len_bytes(&self) -> u64 {
        match *self {
            Self::Cas { len_bytes, .. } => len_bytes,
            Self::Inline { ref text, .. } => text.len() as u64,
        }
    }
}
/// Returns the BLAKE3 digest of `bytes` rendered as a hexadecimal string.
pub fn content_hash(bytes: &[u8]) -> String {
    let digest = blake3::hash(bytes);
    digest.to_hex().as_str().to_owned()
}
/// Wraps `bytes` in a [`TapePayload`], spilling to `cas` when it cannot be inlined.
///
/// The payload is stored inline only if it is at most [`MAX_INLINE_BYTES`]
/// long *and* valid UTF-8. Otherwise the bytes are inserted into `cas` under
/// their content hash (an existing entry for that hash is left untouched) and
/// a `Cas` reference is returned.
fn build_payload(bytes: Vec<u8>, cas: &mut BTreeMap<String, Vec<u8>>) -> TapePayload {
    let digest = content_hash(&bytes);

    // `Ok` carries text that may be inlined; `Err` hands back the raw bytes
    // that have to go to the CAS (too large, or not valid UTF-8).
    let candidate: Result<String, Vec<u8>> = if bytes.len() > MAX_INLINE_BYTES {
        Err(bytes)
    } else {
        String::from_utf8(bytes).map_err(|invalid| invalid.into_bytes())
    };

    match candidate {
        Ok(text) => TapePayload::Inline {
            content_hash: digest,
            text,
        },
        Err(raw) => {
            let len_bytes = raw.len() as u64;
            cas.entry(digest.clone()).or_insert(raw);
            TapePayload::Cas {
                content_hash: digest,
                len_bytes,
            }
        }
    }
}
/// An in-memory tape: a header, the ordered records, and the blobs that
/// `TapePayload::Cas` entries refer to.
#[derive(Debug, Clone)]
pub struct EventTape {
/// Metadata describing the run.
pub header: TapeHeader,
/// Recorded events, in the order they are written to and read from disk.
pub records: Vec<TapeRecord>,
// Content-addressed blobs keyed by content hash.
cas: BTreeMap<String, Vec<u8>>,
}
impl EventTape {
    /// Creates a tape with the given header, no records, and an empty CAS.
    pub fn new(header: TapeHeader) -> Self {
        Self {
            header,
            records: Vec::new(),
            cas: BTreeMap::new(),
        }
    }

    /// Returns a copy of the bytes behind `payload`.
    ///
    /// Inline payloads yield their text as UTF-8 bytes; CAS payloads are
    /// looked up by content hash in this tape's in-memory CAS.
    ///
    /// # Errors
    ///
    /// Returns an error message when a CAS payload's hash has no entry in
    /// this tape.
    pub fn resolve_payload(&self, payload: &TapePayload) -> Result<Vec<u8>, String> {
        match payload {
            TapePayload::Inline { text, .. } => Ok(text.as_bytes().to_vec()),
            TapePayload::Cas { content_hash, .. } => self
                .cas
                .get(content_hash)
                .cloned()
                .ok_or_else(|| format!("tape CAS missing entry for {content_hash}")),
        }
    }

    /// Number of distinct blobs held in the in-memory CAS.
    pub fn cas_len(&self) -> usize {
        self.cas.len()
    }

    /// Writes the tape to `path` as JSON lines, plus one file per CAS blob.
    ///
    /// The header is the first line, followed by one line per record. Missing
    /// parent directories are created. When the CAS is non-empty, every blob
    /// is written to `<path>.cas/<content hash>`.
    ///
    /// # Errors
    ///
    /// Returns a message naming the failing step if serialization or any
    /// filesystem operation fails. Nothing already written is rolled back.
    pub fn persist(&self, path: &Path) -> Result<(), String> {
        if let Some(parent) = path.parent() {
            // A bare file name has an empty parent; there is nothing to create.
            if !parent.as_os_str().is_empty() {
                std::fs::create_dir_all(parent)
                    .map_err(|err| format!("mkdir {}: {err}", parent.display()))?;
            }
        }

        let mut body = String::new();
        let header_line = serde_json::to_string(&TapeLine::Header(self.header.clone()))
            .map_err(|err| format!("serialize tape header: {err}"))?;
        body.push_str(&header_line);
        body.push('\n');
        for record in &self.records {
            let line = serde_json::to_string(&TapeLine::Record(record.clone()))
                .map_err(|err| format!("serialize tape record: {err}"))?;
            body.push_str(&line);
            body.push('\n');
        }
        std::fs::write(path, body).map_err(|err| format!("write {}: {err}", path.display()))?;

        if !self.cas.is_empty() {
            let cas_dir = cas_dir_for(path);
            std::fs::create_dir_all(&cas_dir)
                .map_err(|err| format!("mkdir {}: {err}", cas_dir.display()))?;
            for (hash, bytes) in &self.cas {
                let entry = cas_dir.join(hash);
                std::fs::write(&entry, bytes)
                    .map_err(|err| format!("write {}: {err}", entry.display()))?;
            }
        }
        Ok(())
    }

    /// Reads a tape previously written by [`EventTape::persist`].
    ///
    /// The first line must be the header; every following non-blank line must
    /// be a record. CAS blobs referenced by the records are then loaded from
    /// `<path>.cas/` on a best-effort basis: a blob that cannot be read is
    /// skipped here and only surfaces later as an error from
    /// [`EventTape::resolve_payload`].
    ///
    /// Content hashes come from the tape file itself, so they are only used
    /// as file names when they are a single ordinary path component. A hash
    /// such as `../x` or `/etc/passwd` is never read from disk.
    ///
    /// # Errors
    ///
    /// Returns a message when the file cannot be read, is empty, has a
    /// malformed or misplaced header, declares a format version newer than
    /// [`TAPE_FORMAT_VERSION`], or contains a line that does not parse.
    pub fn load(path: &Path) -> Result<Self, String> {
        let body = std::fs::read_to_string(path)
            .map_err(|err| format!("read {}: {err}", path.display()))?;
        let mut lines = body.lines();
        let first_line = lines
            .next()
            .ok_or_else(|| format!("empty tape file: {}", path.display()))?;
        let header_line: TapeLine = serde_json::from_str(first_line)
            .map_err(|err| format!("parse tape header in {}: {err}", path.display()))?;
        let header = match header_line {
            TapeLine::Header(header) => header,
            TapeLine::Record(_) => {
                return Err(format!(
                    "tape {} is missing its header (first line is a record)",
                    path.display()
                ));
            }
        };
        if header.version > TAPE_FORMAT_VERSION {
            return Err(format!(
                "tape {} declares version {} but this runtime supports up to {TAPE_FORMAT_VERSION}",
                path.display(),
                header.version
            ));
        }

        let mut records = Vec::new();
        for (idx, line) in lines.enumerate() {
            let trimmed = line.trim();
            if trimmed.is_empty() {
                continue;
            }
            // `idx` counts from the line after the header, hence `+ 2` to get
            // a 1-based file line number.
            let parsed: TapeLine = serde_json::from_str(trimmed).map_err(|err| {
                format!(
                    "parse tape record at line {} in {}: {err}",
                    idx + 2,
                    path.display()
                )
            })?;
            match parsed {
                TapeLine::Record(record) => records.push(record),
                TapeLine::Header(_) => {
                    return Err(format!(
                        "tape {} contains a second header at line {}",
                        path.display(),
                        idx + 2
                    ));
                }
            }
        }

        let mut cas = BTreeMap::new();
        let cas_dir = cas_dir_for(path);
        if cas_dir.is_dir() {
            for record in &records {
                visit_payloads(&record.kind, |payload| {
                    if let TapePayload::Cas { content_hash, .. } = payload {
                        if cas.contains_key(content_hash) {
                            return;
                        }
                        // Never let a hash taken from the tape file steer the
                        // read outside the CAS directory.
                        if !Self::is_plain_cas_name(content_hash) {
                            return;
                        }
                        let entry = cas_dir.join(content_hash);
                        if let Ok(bytes) = std::fs::read(&entry) {
                            cas.insert(content_hash.clone(), bytes);
                        }
                    }
                });
            }
        }

        Ok(Self {
            header,
            records,
            cas,
        })
    }

    /// Reports whether `name` is exactly one ordinary path component, i.e.
    /// joining it onto the CAS directory cannot leave that directory.
    ///
    /// Empty strings, absolute paths, `.`/`..` prefixes, and anything with
    /// more than one component are rejected.
    fn is_plain_cas_name(name: &str) -> bool {
        let mut components = Path::new(name).components();
        matches!(
            (components.next(), components.next()),
            (Some(std::path::Component::Normal(_)), None)
        )
    }
}
/// Returns the directory that holds a tape's CAS blobs: the tape path with
/// `.cas` appended to it (so `run.tape` maps to `run.tape.cas`).
fn cas_dir_for(tape_path: &Path) -> PathBuf {
    let mut dir_name = tape_path.to_path_buf().into_os_string();
    dir_name.push(".cas");
    dir_name.into()
}
/// Calls `visit` once for every [`TapePayload`] embedded in `kind`.
///
/// `LlmCall` exposes its response; `ProcessSpawn` exposes stdout first and
/// then stderr. All other kinds carry no payloads.
fn visit_payloads(kind: &TapeRecordKind, mut visit: impl FnMut(&TapePayload)) {
    // Exhaustive on purpose: a new variant must decide here whether it
    // carries payloads.
    let payloads: [Option<&TapePayload>; 2] = match kind {
        TapeRecordKind::ProcessSpawn {
            stdout_payload,
            stderr_payload,
            ..
        } => [Some(stdout_payload), Some(stderr_payload)],
        TapeRecordKind::LlmCall { response, .. } => [Some(response), None],
        TapeRecordKind::Unknown
        | TapeRecordKind::FileDelete { .. }
        | TapeRecordKind::FileWrite { .. }
        | TapeRecordKind::FileRead { .. }
        | TapeRecordKind::ClockSleep { .. }
        | TapeRecordKind::ClockRead { .. } => [None, None],
    };
    for payload in payloads.iter().copied().flatten() {
        visit(payload);
    }
}
/// Collects tape records and spilled payload bytes while a run is in progress.
#[derive(Debug)]
pub struct TapeRecorder {
// Source of the next `TapeRecord::seq`; `new` starts it at 0.
next_seq: AtomicU64,
// `clock_mock::instant_now()` at construction; `record` reports elapsed
// time relative to this instant.
started_at: clock_mock::ClockInstant,
// Recorded events and CAS blobs, guarded by a single mutex.
inner: Mutex<RecorderInner>,
}
/// Mutable state of a `TapeRecorder`, kept behind its mutex.
#[derive(Debug, Default)]
struct RecorderInner {
// Events in the order they were pushed.
records: Vec<TapeRecord>,
// Payload bytes that `build_payload` did not inline, keyed by content hash.
cas: BTreeMap<String, Vec<u8>>,
}
impl Default for TapeRecorder {
fn default() -> Self {
Self::new()
}
}
impl TapeRecorder {
    /// Creates an empty recorder whose sequence counter starts at 0 and whose
    /// monotonic timestamps are measured from this moment.
    pub fn new() -> Self {
        Self {
            next_seq: AtomicU64::new(0),
            started_at: clock_mock::instant_now(),
            inner: Mutex::new(RecorderInner::default()),
        }
    }

    /// Appends an event of the given `kind`, stamping it with the next
    /// sequence number and the current virtual and monotonic times.
    ///
    /// The sequence number is allocated while the records lock is held, so
    /// the stored records are always in ascending `seq` order even when
    /// several threads record at once.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn record(&self, kind: TapeRecordKind) {
        // Read the clocks before taking the lock so no clock code runs while
        // the recorder is locked.
        let virtual_time_ms = clock_mock::now_ms();
        let elapsed_ms = clock_mock::instant_now()
            .duration_since(self.started_at)
            .as_millis();
        // Saturate instead of wrapping if the elapsed time exceeds `i64`.
        let monotonic_ms = i64::try_from(elapsed_ms).unwrap_or(i64::MAX);

        let mut inner = self.inner.lock().expect("tape recorder mutex poisoned");
        let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
        inner.records.push(TapeRecord {
            seq,
            virtual_time_ms,
            monotonic_ms,
            kind,
        });
    }

    /// Wraps `bytes` in a [`TapePayload`], storing them in this recorder's
    /// CAS when they are not inlined (see `build_payload`).
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn payload_from_bytes(&self, bytes: Vec<u8>) -> TapePayload {
        let mut inner = self.inner.lock().expect("tape recorder mutex poisoned");
        build_payload(bytes, &mut inner.cas)
    }

    /// Returns an [`EventTape`] holding `header` plus copies of everything
    /// recorded so far. The recorder itself is left unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn snapshot(&self, header: TapeHeader) -> EventTape {
        let inner = self.inner.lock().expect("tape recorder mutex poisoned");
        EventTape {
            header,
            records: inner.records.clone(),
            cas: inner.cas.clone(),
        }
    }
}
// Per-thread slot holding the recorder installed by `install_recorder`, or
// `None` when no recorder is active on this thread.
thread_local! {
static ACTIVE_RECORDER: RefCell<Option<Arc<TapeRecorder>>> = const { RefCell::new(None) };
}
/// Scope guard returned by `install_recorder`.
///
/// When dropped, it puts back whatever recorder (if any) was active on this
/// thread before the matching `install_recorder` call. Bind it to a named
/// variable for as long as the recorder should stay installed: discarding it
/// right away undoes the installation immediately.
#[must_use = "dropping the guard immediately restores the previously active recorder"]
pub struct TapeRecorderGuard {
    // Recorder that was active before this guard's installation.
    previous: Option<Arc<TapeRecorder>>,
}
impl Drop for TapeRecorderGuard {
    /// Reinstates the recorder that was active before this guard was created
    /// (or clears the slot if there was none).
    fn drop(&mut self) {
        ACTIVE_RECORDER.with(|slot| {
            let mut active = slot.borrow_mut();
            *active = self.previous.take();
        });
    }
}
/// Makes `recorder` the active recorder for the current thread.
///
/// The returned guard remembers the recorder that was active before and
/// restores it when dropped.
pub fn install_recorder(recorder: Arc<TapeRecorder>) -> TapeRecorderGuard {
    TapeRecorderGuard {
        previous: ACTIVE_RECORDER.with(|slot| slot.borrow_mut().replace(recorder)),
    }
}
/// Returns a handle to the recorder active on the current thread, if any.
pub fn active_recorder() -> Option<Arc<TapeRecorder>> {
    ACTIVE_RECORDER.with(|slot| {
        let current = slot.borrow();
        current.as_ref().map(Arc::clone)
    })
}
/// Records an event on the current thread's active recorder, if there is one.
///
/// `build` is only invoked when a recorder is active; it receives that
/// recorder and may return `None` to record nothing.
pub fn with_active_recorder<F>(build: F)
where
    F: FnOnce(&Arc<TapeRecorder>) -> Option<TapeRecordKind>,
{
    if let Some(recorder) = active_recorder() {
        if let Some(kind) = build(&recorder) {
            recorder.record(kind);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a `ClockSleep` record whose timestamps are `seq` seconds.
    fn small_record(seq: u64, dur: u64) -> TapeRecord {
        TapeRecord {
            seq,
            virtual_time_ms: seq as i64 * 1000,
            monotonic_ms: seq as i64 * 1000,
            kind: TapeRecordKind::ClockSleep { duration_ms: dur },
        }
    }

    #[test]
    fn round_trip_inline_records() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("run.tape");
        let mut tape = EventTape::new(TapeHeader::current(
            Some(1_700_000_000_000),
            Some("script.harn".to_string()),
            vec!["a".into()],
        ));
        tape.records.push(small_record(0, 250));
        tape.records.push(small_record(1, 750));
        tape.persist(&path).unwrap();

        let loaded = EventTape::load(&path).unwrap();
        // The whole header must survive, not just selected fields.
        assert_eq!(loaded.header, tape.header);
        assert_eq!(loaded.header.version, TAPE_FORMAT_VERSION);
        assert_eq!(loaded.header.argv, vec!["a".to_string()]);
        assert_eq!(loaded.records.len(), 2);

        // Check every record, in order, rather than only the first one.
        let seqs: Vec<u64> = loaded.records.iter().map(|record| record.seq).collect();
        assert_eq!(seqs, vec![0, 1]);
        let durations: Vec<u64> = loaded
            .records
            .iter()
            .map(|record| match &record.kind {
                TapeRecordKind::ClockSleep { duration_ms } => *duration_ms,
                other => panic!("unexpected: {other:?}"),
            })
            .collect();
        assert_eq!(durations, vec![250, 750]);
    }

    #[test]
    fn large_payloads_spill_to_cas_and_round_trip() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("run.tape");
        let mut tape = EventTape::new(TapeHeader::current(None, None, Vec::new()));
        let big = vec![b'x'; MAX_INLINE_BYTES + 32];
        let payload = build_payload(big.clone(), &mut tape.cas);
        let hash = payload.content_hash().to_string();
        let kind = TapeRecordKind::ProcessSpawn {
            program: "/bin/echo".to_string(),
            args: vec!["x".to_string()],
            cwd: None,
            exit_code: 0,
            duration_ms: 1,
            stdout_payload: payload,
            stderr_payload: build_payload(Vec::new(), &mut tape.cas),
        };
        tape.records.push(TapeRecord {
            seq: 0,
            virtual_time_ms: 0,
            monotonic_ms: 0,
            kind,
        });
        tape.persist(&path).unwrap();

        let cas_dir = cas_dir_for(&path);
        assert!(cas_dir.is_dir());
        assert!(cas_dir.join(&hash).exists());

        let loaded = EventTape::load(&path).unwrap();
        // Only the oversized stdout spills; the empty stderr stays inline.
        assert_eq!(loaded.cas_len(), 1);
        let resolved = match &loaded.records[0].kind {
            TapeRecordKind::ProcessSpawn { stdout_payload, .. } => {
                loaded.resolve_payload(stdout_payload).unwrap()
            }
            other => panic!("unexpected: {other:?}"),
        };
        assert_eq!(resolved.len(), big.len());
        // Compare the content too: a length check alone would accept
        // corrupted bytes.
        assert!(resolved == big, "CAS payload content changed in round trip");
    }

    #[test]
    fn rejects_newer_version() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("future.tape");
        std::fs::write(
            &path,
            r#"{"type":"header","version":99,"harn_version":"x","started_at_unix_ms":null,"script_path":null,"argv":[]}
"#,
        )
        .unwrap();
        let err = EventTape::load(&path).unwrap_err();
        assert!(err.contains("version 99"), "{err}");
    }

    #[test]
    fn recorder_assigns_monotonic_seq() {
        let recorder = Arc::new(TapeRecorder::new());
        recorder.record(TapeRecordKind::ClockSleep { duration_ms: 1 });
        recorder.record(TapeRecordKind::ClockSleep { duration_ms: 2 });
        let snapshot = recorder.snapshot(TapeHeader::current(None, None, Vec::new()));
        assert_eq!(snapshot.records.len(), 2);
        assert_eq!(snapshot.records[0].seq, 0);
        assert_eq!(snapshot.records[1].seq, 1);
    }
}