use crate::residual::{ResidualSample, ResidualStream};
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
/// Sidecar manifest describing a finalized tape file.
///
/// Produced by `Tape::finalize`, stored as pretty-printed JSON at the path
/// returned by `manifest_path_for`, and checked by `load_and_verify`.
/// The field names below are the JSON keys of the manifest on disk.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TapeManifest {
/// Lowercase hex SHA-256 digest of the tape file's bytes.
pub sha256: String,
/// Number of samples appended to the tape; `load_and_verify` compares it
/// with the number of samples it parses.
pub sample_count: u64,
/// Smallest sample `t` appended, or `None` if no sample was appended.
pub first_t: Option<f64>,
/// Largest sample `t` appended, or `None` if no sample was appended.
pub last_t: Option<f64>,
/// Value of `crate::CRATE_VERSION` at the time the tape was finalized.
pub crate_version: String,
/// Source label given to `Tape::create`; `load_and_verify` passes it to
/// `ResidualStream::new` when rebuilding the stream.
pub source: String,
}
/// Writer for a JSON-lines tape of `ResidualSample`s.
///
/// Built by `Tape::create`, fed through `Tape::append`, and consumed by
/// `Tape::finalize`, which also writes the sidecar manifest.
pub struct Tape {
/// Location of the tape file; re-read for hashing in `finalize` and used to
/// derive the manifest path.
path: PathBuf,
/// Buffered writer over the open tape file.
writer: BufWriter<File>,
/// Running count of samples written via `append`.
sample_count: u64,
/// Minimum sample `t` seen so far (`None` until the first sample).
first_t: Option<f64>,
/// Maximum sample `t` seen so far (`None` until the first sample).
last_t: Option<f64>,
/// Source label copied into the manifest by `finalize`.
source: String,
}
impl Tape {
pub fn create(path: &Path, source: impl Into<String>) -> Result<Self> {
let f = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(path)
.with_context(|| format!("opening tape file {}", path.display()))?;
Ok(Self {
path: path.to_path_buf(),
writer: BufWriter::new(f),
sample_count: 0,
first_t: None,
last_t: None,
source: source.into(),
})
}
pub fn append(&mut self, samples: &[ResidualSample]) -> Result<()> {
for s in samples {
let line = serde_json::to_string(s)
.with_context(|| "serializing ResidualSample to tape")?;
self.writer
.write_all(line.as_bytes())
.with_context(|| "writing tape line")?;
self.writer
.write_all(b"\n")
.with_context(|| "writing tape newline")?;
self.sample_count += 1;
self.first_t = Some(self.first_t.map(|t| t.min(s.t)).unwrap_or(s.t));
self.last_t = Some(self.last_t.map(|t| t.max(s.t)).unwrap_or(s.t));
}
Ok(())
}
pub fn finalize(mut self) -> Result<TapeManifest> {
self.writer
.flush()
.with_context(|| "flushing tape writer")?;
let f = self
.writer
.into_inner()
.with_context(|| "unwrapping BufWriter to fsync")?;
f.sync_all().with_context(|| "fsyncing tape file")?;
drop(f);
let tape_bytes = std::fs::read(&self.path)
.with_context(|| format!("re-reading tape for hashing: {}", self.path.display()))?;
let mut h = Sha256::new();
h.update(&tape_bytes);
let sha = h.finalize();
let sha_hex: String = sha.iter().map(|b| format!("{:02x}", b)).collect();
let manifest = TapeManifest {
sha256: sha_hex,
sample_count: self.sample_count,
first_t: self.first_t,
last_t: self.last_t,
crate_version: crate::CRATE_VERSION.to_string(),
source: self.source.clone(),
};
let manifest_path = manifest_path_for(&self.path);
let json = serde_json::to_string_pretty(&manifest)
.with_context(|| "serializing tape manifest")?;
std::fs::write(&manifest_path, json)
.with_context(|| format!("writing tape manifest {}", manifest_path.display()))?;
Ok(manifest)
}
}
/// Returns the manifest path that belongs to `tape`.
///
/// The result is the tape path with `.hash` appended to the whole path
/// string, so any existing extension is kept (`b.tape` -> `b.tape.hash`).
pub fn manifest_path_for(tape: &Path) -> PathBuf {
    let mut with_suffix = tape.to_path_buf().into_os_string();
    with_suffix.push(".hash");
    with_suffix.into()
}
pub fn load_and_verify(path: &Path) -> Result<(ResidualStream, TapeManifest)> {
let manifest_path = manifest_path_for(path);
let manifest_json = std::fs::read_to_string(&manifest_path)
.with_context(|| format!("reading tape manifest {}", manifest_path.display()))?;
let manifest: TapeManifest = serde_json::from_str(&manifest_json)
.with_context(|| "parsing tape manifest")?;
let tape_bytes = std::fs::read(path)
.with_context(|| format!("reading tape {}", path.display()))?;
let mut h = Sha256::new();
h.update(&tape_bytes);
let sha = h.finalize();
let got: String = sha.iter().map(|b| format!("{:02x}", b)).collect();
if got != manifest.sha256 {
anyhow::bail!(
"tape hash mismatch at {}: manifest={} actual={}",
path.display(),
manifest.sha256,
got
);
}
let mut stream = ResidualStream::new(manifest.source.clone());
for (ix, line) in std::str::from_utf8(&tape_bytes)
.with_context(|| "tape contains non-UTF-8 bytes")?
.lines()
.enumerate()
{
if line.is_empty() {
continue;
}
let s: ResidualSample = serde_json::from_str(line)
.with_context(|| format!("parsing tape line {}", ix + 1))?;
stream.push(s);
}
stream.sort();
if stream.len() as u64 != manifest.sample_count {
anyhow::bail!(
"tape sample count mismatch: manifest={} actual={}",
manifest.sample_count,
stream.len()
);
}
Ok((stream, manifest))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::residual::ResidualClass;
    use tempfile::tempdir;

    /// Three samples written and finalized must load back with the same
    /// count and digest, and the manifest must record the `t` range.
    #[test]
    fn roundtrip_tape_preserves_samples() {
        let tmp = tempdir().unwrap();
        let tape_path = tmp.path().join("t.jsonl");
        let mut writer = Tape::create(&tape_path, "live-postgres:test").unwrap();

        let batch = [
            ResidualSample::new(0.0, ResidualClass::PlanRegression, 0.1).with_channel("q1"),
            ResidualSample::new(1.0, ResidualClass::WorkloadPhase, 0.2).with_channel("bucket"),
            ResidualSample::new(2.0, ResidualClass::PlanRegression, 0.3).with_channel("q2"),
        ];
        writer.append(&batch).unwrap();

        let written = writer.finalize().unwrap();
        assert_eq!(written.sample_count, 3);
        assert_eq!(written.first_t, Some(0.0));
        assert_eq!(written.last_t, Some(2.0));

        let (loaded, reread) = load_and_verify(&tape_path).unwrap();
        assert_eq!(loaded.len(), 3);
        assert_eq!(reread.sha256, written.sha256);
    }

    /// Overwriting the first byte of a finalized tape must make loading fail.
    #[test]
    fn tampered_tape_fails_verification() {
        let tmp = tempdir().unwrap();
        let tape_path = tmp.path().join("t.jsonl");
        let mut writer = Tape::create(&tape_path, "live-postgres:test").unwrap();
        writer
            .append(&[
                ResidualSample::new(0.0, ResidualClass::PlanRegression, 0.1),
                ResidualSample::new(1.0, ResidualClass::PlanRegression, 0.2),
            ])
            .unwrap();
        writer.finalize().unwrap();

        // Corrupt the tape in place without touching the manifest.
        let mut corrupted = std::fs::read(&tape_path).unwrap();
        corrupted[0] = b'X';
        std::fs::write(&tape_path, &corrupted).unwrap();

        let failure = load_and_verify(&tape_path).unwrap_err();
        let rendered = format!("{:#}", failure);
        let recognised = ["tape hash mismatch", "parsing tape line"]
            .iter()
            .any(|needle| rendered.contains(*needle));
        assert!(
            recognised,
            "expected hash-mismatch or parse error, got: {}",
            rendered
        );
    }

    /// The manifest path is the tape path with `.hash` appended.
    #[test]
    fn manifest_path_suffix() {
        let tape = Path::new("/tmp/a/b.tape");
        let expected = PathBuf::from("/tmp/a/b.tape.hash");
        assert_eq!(manifest_path_for(tape), expected);
    }
}