use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use thiserror::Error;
use crate::canonical::{decode_record, CanonicalRecord};
/// Four-byte magic that opens every canonical log file (ASCII `MIMR`).
pub const LOG_MAGIC: [u8; 4] = [b'M', b'I', b'M', b'R'];

/// On-disk format version stored right after the magic as a little-endian
/// `u32`. [`CanonicalLog::open`] rejects files carrying any other value.
pub const LOG_FORMAT_VERSION: u32 = 1;

/// Size in bytes of the file header: the magic followed by the `u32` version.
///
/// Lengths reported through [`LogBackend`] by [`CanonicalLog`] exclude these
/// bytes.
pub const LOG_HEADER_SIZE: u64 = (LOG_MAGIC.len() + std::mem::size_of::<u32>()) as u64;
/// Byte-level storage behind the canonical log.
///
/// Lengths and offsets are measured in bytes of log content. An implementation
/// may keep extra framing outside that range; for example, [`CanonicalLog`]
/// stores a file header that these numbers do not count.
pub trait LogBackend {
    /// Current content length in bytes.
    fn len(&self) -> u64;

    /// `true` when [`len`](Self::len) is zero.
    fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }

    /// Appends `bytes` after the existing content.
    ///
    /// # Errors
    ///
    /// Returns a [`LogError`] if the data cannot be written.
    fn append(&mut self, bytes: &[u8]) -> Result<(), LogError>;

    /// Requests that previously appended content be persisted. The file
    /// backend implements this with `File::sync_all`.
    ///
    /// # Errors
    ///
    /// Returns a [`LogError`] if the flush fails.
    fn sync(&mut self) -> Result<(), LogError>;

    /// Cuts the content back to `new_len` bytes.
    ///
    /// # Errors
    ///
    /// Returns a [`LogError`] if `new_len` is invalid or the resize fails.
    fn truncate(&mut self, new_len: u64) -> Result<(), LogError>;

    /// Returns the full content.
    ///
    /// # Errors
    ///
    /// Returns a [`LogError`] if the content cannot be read.
    fn read_all(&mut self) -> Result<Vec<u8>, LogError>;

    /// Offset just past the last checkpoint record, or `0` when there is none.
    ///
    /// # Errors
    ///
    /// Returns a [`LogError`] if the content cannot be read.
    fn last_checkpoint_end(&mut self) -> Result<u64, LogError>;
}
/// File-backed [`LogBackend`]. The file holds an 8-byte header ([`LOG_MAGIC`],
/// then [`LOG_FORMAT_VERSION`] as a little-endian `u32`) followed by raw
/// record bytes.
///
/// All lengths exposed through [`LogBackend`] are logical: they exclude the
/// header.
#[derive(Debug)]
pub struct CanonicalLog {
/// Read/write handle to the log file.
file: File,
/// Path the log was opened from; returned by `CanonicalLog::path`.
path: PathBuf,
/// Logical length in bytes. It is set to the physical file length minus
/// `LOG_HEADER_SIZE` at open time, then maintained by `append` and `truncate`.
len: u64,
}
impl CanonicalLog {
pub fn open(path: impl AsRef<Path>) -> Result<Self, LogError> {
let path = path.as_ref().to_path_buf();
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&path)
.map_err(LogError::Io)?;
let physical_len = file.metadata().map_err(LogError::Io)?.len();
if physical_len == 0 {
file.seek(SeekFrom::Start(0)).map_err(LogError::Io)?;
let mut header = [0_u8; 8];
header[0..4].copy_from_slice(&LOG_MAGIC);
header[4..8].copy_from_slice(&LOG_FORMAT_VERSION.to_le_bytes());
file.write_all(&header).map_err(LogError::Io)?;
file.sync_all().map_err(LogError::Io)?;
return Ok(Self { file, path, len: 0 });
}
if physical_len < LOG_HEADER_SIZE {
return Err(LogError::IncompatibleFormat {
reason: format!(
"file is {physical_len} bytes; expected at least \
{LOG_HEADER_SIZE}-byte Mimir header"
),
});
}
file.seek(SeekFrom::Start(0)).map_err(LogError::Io)?;
let mut header = [0_u8; 8];
file.read_exact(&mut header).map_err(LogError::Io)?;
if header[0..4] != LOG_MAGIC {
return Err(LogError::IncompatibleFormat {
reason: format!(
"magic mismatch: got {:?}, expected {:?} ({:?})",
&header[0..4],
LOG_MAGIC,
std::str::from_utf8(&LOG_MAGIC).unwrap_or("?"),
),
});
}
let version = u32::from_le_bytes([header[4], header[5], header[6], header[7]]);
if version != LOG_FORMAT_VERSION {
return Err(LogError::IncompatibleFormat {
reason: format!(
"format version {version} not supported \
(this build supports version {LOG_FORMAT_VERSION})"
),
});
}
let len = physical_len - LOG_HEADER_SIZE;
Ok(Self { file, path, len })
}
#[must_use]
pub fn path(&self) -> &Path {
&self.path
}
}
impl LogBackend for CanonicalLog {
    /// Appends `bytes` after the current physical end of the file and advances
    /// the logical length by `bytes.len()`.
    ///
    /// The data is not fsynced here; call [`LogBackend::sync`] for durability.
    ///
    /// # Errors
    ///
    /// - [`LogError::LogOverflow`] if the new logical length would exceed
    ///   `u64::MAX`. This is checked before anything is written.
    /// - [`LogError::Io`] if seeking or writing fails. `write_all` may have
    ///   written a prefix of `bytes` before failing. The file is therefore cut
    ///   back (best effort) to its previous length. Otherwise those orphaned
    ///   bytes would sit between the tracked `len` and the next append.
    fn append(&mut self, bytes: &[u8]) -> Result<(), LogError> {
        // Compute the new length up front so an overflow never leaves
        // untracked bytes on disk. (`usize` -> `u64` is lossless.)
        let new_len = self
            .len
            .checked_add(bytes.len() as u64)
            .ok_or(LogError::LogOverflow)?;
        self.file.seek(SeekFrom::End(0)).map_err(LogError::Io)?;
        if let Err(err) = self.file.write_all(bytes) {
            if let Some(previous_physical_len) = LOG_HEADER_SIZE.checked_add(self.len) {
                // Best-effort rollback. The write error is what the caller
                // needs to see, so a failure here is deliberately ignored.
                let _ = self.file.set_len(previous_physical_len);
            }
            return Err(LogError::Io(err));
        }
        self.len = new_len;
        Ok(())
    }

    /// Flushes file data and metadata to disk via [`File::sync_all`].
    ///
    /// # Errors
    ///
    /// [`LogError::Io`] if the sync fails.
    fn sync(&mut self) -> Result<(), LogError> {
        self.file.sync_all().map_err(LogError::Io)
    }

    /// Shrinks the logical log to `new_len` bytes and fsyncs the result. The
    /// header is kept.
    ///
    /// `new_len == len()` is accepted and leaves the contents unchanged.
    ///
    /// # Errors
    ///
    /// - [`LogError::TruncateBeyondEnd`] if `new_len` exceeds the current length.
    /// - [`LogError::LogOverflow`] if the physical length would exceed `u64::MAX`.
    /// - [`LogError::Io`] if resizing or syncing the file fails.
    fn truncate(&mut self, new_len: u64) -> Result<(), LogError> {
        if new_len > self.len {
            return Err(LogError::TruncateBeyondEnd {
                requested: new_len,
                current: self.len,
            });
        }
        let physical_new_len = LOG_HEADER_SIZE
            .checked_add(new_len)
            .ok_or(LogError::LogOverflow)?;
        self.file.set_len(physical_new_len).map_err(LogError::Io)?;
        self.file.sync_all().map_err(LogError::Io)?;
        self.len = new_len;
        Ok(())
    }

    /// Reads every byte after the header, up to the physical end of the file.
    ///
    /// # Errors
    ///
    /// [`LogError::Io`] if seeking or reading fails.
    fn read_all(&mut self) -> Result<Vec<u8>, LogError> {
        self.file
            .seek(SeekFrom::Start(LOG_HEADER_SIZE))
            .map_err(LogError::Io)?;
        // Pre-size the buffer for the expected length. If that length does not
        // fit in `usize` (32-bit targets), skip the hint instead:
        // `Vec::with_capacity(usize::MAX)` would panic with "capacity overflow".
        let capacity = usize::try_from(self.len).unwrap_or(0);
        let mut buf = Vec::with_capacity(capacity);
        self.file.read_to_end(&mut buf).map_err(LogError::Io)?;
        Ok(buf)
    }

    /// Logical length in bytes (header excluded).
    fn len(&self) -> u64 {
        self.len
    }

    /// Decodes records from the start of the log. Returns the logical offset
    /// just past the last `Checkpoint` record seen before the first decode
    /// failure, or `0` if no checkpoint was decoded.
    ///
    /// A decode failure ends the scan rather than producing an error. The
    /// bytes after the returned offset are the unverified tail.
    ///
    /// # Errors
    ///
    /// [`LogError::Io`] if the log cannot be read.
    fn last_checkpoint_end(&mut self) -> Result<u64, LogError> {
        let bytes = self.read_all()?;
        let mut pos: usize = 0;
        let mut last_checkpoint_end: u64 = 0;
        while pos < bytes.len() {
            match decode_record(&bytes[pos..]) {
                // A decode that consumes nothing would never advance `pos`.
                // Treat it like a decode failure instead of spinning forever.
                Ok((_, 0)) | Err(_) => break,
                Ok((record, consumed)) => {
                    pos += consumed;
                    if matches!(record, CanonicalRecord::Checkpoint(_)) {
                        // `usize` -> `u64` is lossless on every Rust target.
                        last_checkpoint_end = pos as u64;
                    }
                }
            }
        }
        Ok(last_checkpoint_end)
    }
}
/// Errors produced by [`CanonicalLog::open`] and the [`LogBackend`] operations.
#[derive(Debug, Error)]
pub enum LogError {
    /// An underlying filesystem operation failed.
    #[error("log I/O error: {0}")]
    Io(#[source] std::io::Error),

    /// A length computation would exceed `u64::MAX`.
    #[error("log length would overflow u64")]
    LogOverflow,

    /// `truncate` was asked for a length (`requested`) greater than the
    /// current logical length (`current`).
    #[error("truncate target {requested} exceeds current length {current}")]
    TruncateBeyondEnd { requested: u64, current: u64 },

    /// The file at the log path does not carry a supported Mimir header;
    /// `reason` describes which check failed.
    #[error("incompatible canonical-log format: {reason}")]
    IncompatibleFormat { reason: String },
}
impl PartialEq for LogError {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::Io(a), Self::Io(b)) => a.kind() == b.kind(),
(Self::LogOverflow, Self::LogOverflow) => true,
(
Self::TruncateBeyondEnd {
requested: ra,
current: ca,
},
Self::TruncateBeyondEnd {
requested: rb,
current: cb,
},
) => ra == rb && ca == cb,
(Self::IncompatibleFormat { reason: ra }, Self::IncompatibleFormat { reason: rb }) => {
ra == rb
}
_ => false,
}
}
}
// Unit tests for `CanonicalLog`: header creation and validation on open,
// append/read/truncate round-trips, and checkpoint scanning.
#[cfg(test)]
mod tests {
use super::*;
use crate::canonical::{encode_record, CheckpointRecord};
use crate::clock::ClockTime;
use crate::symbol::SymbolId;
use std::fs;
use tempfile::TempDir;
/// Encodes one `Checkpoint` record derived from `seed` (episode id `seed`,
/// timestamp `seed * 1000` ms, `memory_count` 1) and returns its bytes.
fn checkpoint_bytes(seed: u64) -> Vec<u8> {
let mut buf = Vec::new();
encode_record(
&CanonicalRecord::Checkpoint(CheckpointRecord {
episode_id: SymbolId::new(seed),
at: ClockTime::try_from_millis(seed * 1000).expect("non-sentinel"),
memory_count: 1,
}),
&mut buf,
);
buf
}
// A brand-new file receives the header; the logical length stays 0.
#[test]
fn open_creates_empty_log_and_writes_header() {
let tmp = TempDir::new().expect("tmp");
let path = tmp.path().join("canonical.log");
let log = CanonicalLog::open(&path).expect("open");
assert!(
log.is_empty(),
"logical length is 0 (header is transparent)"
);
assert_eq!(log.len(), 0);
let physical = fs::metadata(&path).expect("stat").len();
assert_eq!(physical, LOG_HEADER_SIZE);
let raw = fs::read(&path).expect("read raw");
assert_eq!(&raw[0..4], &LOG_MAGIC, "magic prefix written");
assert_eq!(
u32::from_le_bytes([raw[4], raw[5], raw[6], raw[7]]),
LOG_FORMAT_VERSION,
"format version written LE"
);
}
// Reopening recovers the logical length of previously appended data.
#[test]
fn open_reopens_existing_log_preserving_length() {
let tmp = TempDir::new().expect("tmp");
let path = tmp.path().join("canonical.log");
let payload = checkpoint_bytes(1);
{
let mut log = CanonicalLog::open(&path).expect("open");
log.append(&payload).expect("append");
log.sync().expect("sync");
}
let log = CanonicalLog::open(&path).expect("reopen");
assert_eq!(log.len(), payload.len() as u64);
}
// A non-empty foreign file is rejected and left byte-for-byte intact.
#[test]
fn open_refuses_to_initialize_non_mimir_file() {
let tmp = TempDir::new().expect("tmp");
let path = tmp.path().join("not-a-mimir-log.cfg");
let original: &[u8] = b"some_other_format=hello\nimportant_data=42\n";
fs::write(&path, original).expect("write fixture");
let err = CanonicalLog::open(&path).expect_err("must reject non-Mimir file");
assert!(
matches!(err, LogError::IncompatibleFormat { .. }),
"expected IncompatibleFormat, got {err:?}"
);
let after = fs::read(&path).expect("read post-open");
assert_eq!(
after, original,
"non-Mimir file must be preserved byte-for-byte on rejected open"
);
}
// A file shorter than the header is rejected without being modified.
#[test]
fn open_refuses_truncated_header() {
let tmp = TempDir::new().expect("tmp");
let path = tmp.path().join("canonical.log");
fs::write(&path, b"MIMR\x01").expect("write fixture");
let err = CanonicalLog::open(&path).expect_err("must reject truncated header");
assert!(
matches!(err, LogError::IncompatibleFormat { .. }),
"expected IncompatibleFormat, got {err:?}"
);
assert_eq!(fs::read(&path).expect("read"), b"MIMR\x01");
}
// A full-length header with the wrong magic is rejected.
#[test]
fn open_refuses_wrong_magic() {
let tmp = TempDir::new().expect("tmp");
let path = tmp.path().join("canonical.log");
fs::write(&path, b"WICK\x01\x00\x00\x00").expect("write fixture");
let err = CanonicalLog::open(&path).expect_err("must reject wrong magic");
assert!(
matches!(err, LogError::IncompatibleFormat { .. }),
"expected IncompatibleFormat, got {err:?}"
);
}
// Correct magic with an unknown version is rejected, and the diagnostic
// names the offending version.
#[test]
fn open_refuses_unsupported_format_version() {
let tmp = TempDir::new().expect("tmp");
let path = tmp.path().join("canonical.log");
let mut header = Vec::with_capacity(8);
header.extend_from_slice(&LOG_MAGIC);
header.extend_from_slice(&999_u32.to_le_bytes());
fs::write(&path, &header).expect("write fixture");
let err = CanonicalLog::open(&path).expect_err("must reject unsupported version");
match err {
LogError::IncompatibleFormat { reason } => {
assert!(
reason.contains("999"),
"diagnostic should name the bad version, got: {reason}"
);
}
other => panic!("expected IncompatibleFormat, got {other:?}"),
}
}
// Opening an already-initialised log does not rewrite its header.
#[test]
fn open_idempotent_against_reopen() {
let tmp = TempDir::new().expect("tmp");
let path = tmp.path().join("canonical.log");
let _first = CanonicalLog::open(&path).expect("first open");
let raw1 = fs::read(&path).expect("read 1");
let _second = CanonicalLog::open(&path).expect("reopen");
let raw2 = fs::read(&path).expect("read 2");
assert_eq!(raw1, raw2, "reopen does not mutate the header");
assert_eq!(
raw1.len(),
usize::try_from(LOG_HEADER_SIZE).expect("header fits")
);
}
// `read_all` returns exactly what was appended, without the header.
#[test]
fn append_sync_roundtrip_preserves_bytes() {
let tmp = TempDir::new().expect("tmp");
let mut log = CanonicalLog::open(tmp.path().join("canonical.log")).expect("open");
let payload = checkpoint_bytes(42);
log.append(&payload).expect("append");
log.sync().expect("sync");
let read = log.read_all().expect("read");
assert_eq!(read, payload);
}
// Truncating to the first record's length drops the second record.
#[test]
fn truncate_shrinks_log() {
let tmp = TempDir::new().expect("tmp");
let mut log = CanonicalLog::open(tmp.path().join("canonical.log")).expect("open");
let first = checkpoint_bytes(1);
let second = checkpoint_bytes(2);
log.append(&first).expect("append 1");
log.append(&second).expect("append 2");
log.sync().expect("sync");
log.truncate(first.len() as u64).expect("truncate");
assert_eq!(log.len(), first.len() as u64);
let read = log.read_all().expect("read");
assert_eq!(read, first);
}
// Growing via `truncate` is refused, and both lengths are reported.
#[test]
fn truncate_beyond_end_errors() {
let tmp = TempDir::new().expect("tmp");
let mut log = CanonicalLog::open(tmp.path().join("canonical.log")).expect("open");
let err = log.truncate(100).expect_err("beyond");
assert!(matches!(
err,
LogError::TruncateBeyondEnd {
requested: 100,
current: 0
}
));
}
// `truncate(0)` empties the logical log.
#[test]
fn truncate_to_zero_clears_the_log() {
let tmp = TempDir::new().expect("tmp");
let mut log = CanonicalLog::open(tmp.path().join("canonical.log")).expect("open");
let payload = checkpoint_bytes(1);
log.append(&payload).expect("append");
log.sync().expect("sync");
assert!(log.len() > 0);
log.truncate(0).expect("truncate to zero");
assert_eq!(log.len(), 0);
assert!(log.is_empty());
assert!(log.read_all().expect("read").is_empty());
}
// Truncating to the current length changes neither length nor contents.
#[test]
fn truncate_to_current_length_is_a_noop() {
let tmp = TempDir::new().expect("tmp");
let mut log = CanonicalLog::open(tmp.path().join("canonical.log")).expect("open");
let payload = checkpoint_bytes(1);
log.append(&payload).expect("append");
log.sync().expect("sync");
let before = log.len();
log.truncate(before).expect("truncate to current len");
assert_eq!(log.len(), before);
assert_eq!(log.read_all().expect("read"), payload);
}
// With no records, the scan reports offset 0.
#[test]
fn last_checkpoint_end_returns_zero_for_empty_log() {
let tmp = TempDir::new().expect("tmp");
let mut log = CanonicalLog::open(tmp.path().join("canonical.log")).expect("open");
assert_eq!(log.last_checkpoint_end().expect("scan"), 0);
}
// With two checkpoints, the scan ends just past the second one.
#[test]
fn last_checkpoint_end_finds_the_final_checkpoint() {
let tmp = TempDir::new().expect("tmp");
let mut log = CanonicalLog::open(tmp.path().join("canonical.log")).expect("open");
let cp_a = checkpoint_bytes(1);
let cp_b = checkpoint_bytes(2);
log.append(&cp_a).expect("append a");
log.append(&cp_b).expect("append b");
log.sync().expect("sync");
let end = log.last_checkpoint_end().expect("scan");
assert_eq!(end, (cp_a.len() + cp_b.len()) as u64);
}
// A trailing garbage byte does not push the result past the last valid
// checkpoint.
#[test]
fn last_checkpoint_end_stops_at_corruption() {
let tmp = TempDir::new().expect("tmp");
let mut log = CanonicalLog::open(tmp.path().join("canonical.log")).expect("open");
let cp = checkpoint_bytes(1);
log.append(&cp).expect("append");
log.append(&[0x01_u8]).expect("append garbage");
log.sync().expect("sync");
let end = log.last_checkpoint_end().expect("scan");
assert_eq!(end, cp.len() as u64);
}
}