1use std::fs::{File, OpenOptions};
48use std::io::{Read, Seek, SeekFrom, Write};
49use std::path::{Path, PathBuf};
50
51use thiserror::Error;
52
53use crate::canonical::{decode_record, CanonicalRecord};
54
/// Four-byte magic prefix identifying a Mimir canonical log file (ASCII `MIMR`).
pub const LOG_MAGIC: [u8; 4] = [b'M', b'I', b'M', b'R'];

/// On-disk format version, stored right after [`LOG_MAGIC`] as a little-endian `u32`.
pub const LOG_FORMAT_VERSION: u32 = 1;

/// Size in bytes of the file header (magic followed by the `u32` format
/// version) that precedes the logical log contents.
pub const LOG_HEADER_SIZE: u64 = (LOG_MAGIC.len() + std::mem::size_of::<u32>()) as u64;
64
65pub trait LogBackend {
71 fn append(&mut self, bytes: &[u8]) -> Result<(), LogError>;
79
80 fn sync(&mut self) -> Result<(), LogError>;
87
88 fn truncate(&mut self, new_len: u64) -> Result<(), LogError>;
95
96 fn read_all(&mut self) -> Result<Vec<u8>, LogError>;
102
103 fn len(&self) -> u64;
105
106 fn is_empty(&self) -> bool {
108 self.len() == 0
109 }
110
111 fn last_checkpoint_end(&mut self) -> Result<u64, LogError>;
122}
123
/// File-backed [`LogBackend`].
///
/// On disk: an 8-byte header (`LOG_MAGIC`, then `LOG_FORMAT_VERSION` as a
/// little-endian `u32`) followed by the logical log bytes.
#[derive(Debug)]
pub struct CanonicalLog {
    /// Read/write handle to the log file.
    file: File,
    /// Path the log was opened from.
    path: PathBuf,
    /// Logical length in bytes: the physical file length minus `LOG_HEADER_SIZE`.
    len: u64,
}
131
132impl CanonicalLog {
133 pub fn open(path: impl AsRef<Path>) -> Result<Self, LogError> {
152 let path = path.as_ref().to_path_buf();
153 let mut file = OpenOptions::new()
154 .read(true)
155 .write(true)
156 .create(true)
157 .truncate(false)
158 .open(&path)
159 .map_err(LogError::Io)?;
160 let physical_len = file.metadata().map_err(LogError::Io)?.len();
161
162 if physical_len == 0 {
163 file.seek(SeekFrom::Start(0)).map_err(LogError::Io)?;
167 let mut header = [0_u8; 8];
168 header[0..4].copy_from_slice(&LOG_MAGIC);
169 header[4..8].copy_from_slice(&LOG_FORMAT_VERSION.to_le_bytes());
170 file.write_all(&header).map_err(LogError::Io)?;
171 file.sync_all().map_err(LogError::Io)?;
172 return Ok(Self { file, path, len: 0 });
173 }
174
175 if physical_len < LOG_HEADER_SIZE {
176 return Err(LogError::IncompatibleFormat {
177 reason: format!(
178 "file is {physical_len} bytes; expected at least \
179 {LOG_HEADER_SIZE}-byte Mimir header"
180 ),
181 });
182 }
183
184 file.seek(SeekFrom::Start(0)).map_err(LogError::Io)?;
186 let mut header = [0_u8; 8];
187 file.read_exact(&mut header).map_err(LogError::Io)?;
188 if header[0..4] != LOG_MAGIC {
189 return Err(LogError::IncompatibleFormat {
190 reason: format!(
191 "magic mismatch: got {:?}, expected {:?} ({:?})",
192 &header[0..4],
193 LOG_MAGIC,
194 std::str::from_utf8(&LOG_MAGIC).unwrap_or("?"),
195 ),
196 });
197 }
198 let version = u32::from_le_bytes([header[4], header[5], header[6], header[7]]);
199 if version != LOG_FORMAT_VERSION {
200 return Err(LogError::IncompatibleFormat {
201 reason: format!(
202 "format version {version} not supported \
203 (this build supports version {LOG_FORMAT_VERSION})"
204 ),
205 });
206 }
207
208 let len = physical_len - LOG_HEADER_SIZE;
211 Ok(Self { file, path, len })
212 }
213
214 #[must_use]
216 pub fn path(&self) -> &Path {
217 &self.path
218 }
219}
220
221impl LogBackend for CanonicalLog {
222 fn append(&mut self, bytes: &[u8]) -> Result<(), LogError> {
223 self.file.seek(SeekFrom::End(0)).map_err(LogError::Io)?;
224 self.file.write_all(bytes).map_err(LogError::Io)?;
225 self.len = self
226 .len
227 .checked_add(bytes.len() as u64)
228 .ok_or(LogError::LogOverflow)?;
229 Ok(())
230 }
231
232 fn sync(&mut self) -> Result<(), LogError> {
233 self.file.sync_all().map_err(LogError::Io)
234 }
235
236 fn truncate(&mut self, new_len: u64) -> Result<(), LogError> {
237 if new_len > self.len {
238 return Err(LogError::TruncateBeyondEnd {
239 requested: new_len,
240 current: self.len,
241 });
242 }
243 let physical_new_len = LOG_HEADER_SIZE
247 .checked_add(new_len)
248 .ok_or(LogError::LogOverflow)?;
249 self.file.set_len(physical_new_len).map_err(LogError::Io)?;
250 self.file.sync_all().map_err(LogError::Io)?;
251 self.len = new_len;
252 Ok(())
253 }
254
255 fn read_all(&mut self) -> Result<Vec<u8>, LogError> {
256 self.file
258 .seek(SeekFrom::Start(LOG_HEADER_SIZE))
259 .map_err(LogError::Io)?;
260 let capacity = usize::try_from(self.len).unwrap_or(usize::MAX);
261 let mut buf = Vec::with_capacity(capacity);
262 self.file.read_to_end(&mut buf).map_err(LogError::Io)?;
263 Ok(buf)
264 }
265
266 fn len(&self) -> u64 {
267 self.len
268 }
269
270 fn last_checkpoint_end(&mut self) -> Result<u64, LogError> {
271 let bytes = self.read_all()?;
272 let mut pos: usize = 0;
273 let mut last_checkpoint_end: u64 = 0;
274 while pos < bytes.len() {
275 match decode_record(&bytes[pos..]) {
276 Ok((record, consumed)) => {
277 pos += consumed;
278 if matches!(record, CanonicalRecord::Checkpoint(_)) {
279 last_checkpoint_end = pos as u64;
280 }
281 }
282 Err(_) => break,
283 }
284 }
285 Ok(last_checkpoint_end)
286 }
287}
288
/// Errors produced by [`LogBackend`] operations and [`CanonicalLog::open`].
#[derive(Debug, Error)]
pub enum LogError {
    /// An underlying file-system operation failed.
    #[error("log I/O error: {0}")]
    Io(#[source] std::io::Error),

    /// A length computation (appending bytes, or adding the header size)
    /// would exceed `u64::MAX`.
    #[error("log length would overflow u64")]
    LogOverflow,

    /// `truncate` was asked for a length larger than the current one.
    #[error("truncate target {requested} exceeds current length {current}")]
    TruncateBeyondEnd {
        /// Logical length passed to `truncate`.
        requested: u64,
        /// Logical length of the log at the time of the call.
        current: u64,
    },

    /// The file is not a canonical log this build can read: too short for a
    /// header, wrong magic, or unsupported format version.
    #[error("incompatible canonical-log format: {reason}")]
    IncompatibleFormat {
        /// Human-readable description of the mismatch.
        reason: String,
    },
}
323
324impl PartialEq for LogError {
325 fn eq(&self, other: &Self) -> bool {
326 match (self, other) {
327 (Self::Io(a), Self::Io(b)) => a.kind() == b.kind(),
328 (Self::LogOverflow, Self::LogOverflow) => true,
329 (
330 Self::TruncateBeyondEnd {
331 requested: ra,
332 current: ca,
333 },
334 Self::TruncateBeyondEnd {
335 requested: rb,
336 current: cb,
337 },
338 ) => ra == rb && ca == cb,
339 (Self::IncompatibleFormat { reason: ra }, Self::IncompatibleFormat { reason: rb }) => {
340 ra == rb
341 }
342 _ => false,
343 }
344 }
345}
346
#[cfg(test)]
mod tests {
    use super::*;
    use crate::canonical::{encode_record, CheckpointRecord};
    use crate::clock::ClockTime;
    use crate::symbol::SymbolId;
    use std::fs;
    use tempfile::TempDir;

    /// Encodes one checkpoint record whose fields are derived from `seed`.
    fn checkpoint_bytes(seed: u64) -> Vec<u8> {
        let mut buf = Vec::new();
        encode_record(
            &CanonicalRecord::Checkpoint(CheckpointRecord {
                episode_id: SymbolId::new(seed),
                at: ClockTime::try_from_millis(seed * 1000).expect("non-sentinel"),
                memory_count: 1,
            }),
            &mut buf,
        );
        buf
    }

    #[test]
    fn open_creates_empty_log_and_writes_header() {
        let tmp = TempDir::new().expect("tmp");
        let path = tmp.path().join("canonical.log");
        let log = CanonicalLog::open(&path).expect("open");
        assert!(
            log.is_empty(),
            "logical length is 0 (header is transparent)"
        );
        assert_eq!(log.len(), 0);
        // Physically, the new file holds exactly the header.
        let physical = fs::metadata(&path).expect("stat").len();
        assert_eq!(physical, LOG_HEADER_SIZE);
        let raw = fs::read(&path).expect("read raw");
        assert_eq!(&raw[0..4], &LOG_MAGIC, "magic prefix written");
        assert_eq!(
            u32::from_le_bytes([raw[4], raw[5], raw[6], raw[7]]),
            LOG_FORMAT_VERSION,
            "format version written LE"
        );
    }

    #[test]
    fn open_reopens_existing_log_preserving_length() {
        let tmp = TempDir::new().expect("tmp");
        let path = tmp.path().join("canonical.log");
        let payload = checkpoint_bytes(1);
        {
            let mut log = CanonicalLog::open(&path).expect("open");
            log.append(&payload).expect("append");
            log.sync().expect("sync");
        }
        let log = CanonicalLog::open(&path).expect("reopen");
        assert_eq!(log.len(), payload.len() as u64);
    }

    /// A pre-existing file that is not a Mimir log must be rejected and left
    /// untouched rather than overwritten with a header.
    #[test]
    fn open_refuses_to_initialize_non_mimir_file() {
        let tmp = TempDir::new().expect("tmp");
        let path = tmp.path().join("not-a-mimir-log.cfg");
        let original: &[u8] = b"some_other_format=hello\nimportant_data=42\n";
        fs::write(&path, original).expect("write fixture");

        let err = CanonicalLog::open(&path).expect_err("must reject non-Mimir file");
        assert!(
            matches!(err, LogError::IncompatibleFormat { .. }),
            "expected IncompatibleFormat, got {err:?}"
        );

        let after = fs::read(&path).expect("read post-open");
        assert_eq!(
            after, original,
            "non-Mimir file must be preserved byte-for-byte on rejected open"
        );
    }

    #[test]
    fn open_refuses_truncated_header() {
        let tmp = TempDir::new().expect("tmp");
        let path = tmp.path().join("canonical.log");
        // Five bytes: valid magic, but the version field is cut short.
        fs::write(&path, b"MIMR\x01").expect("write fixture");
        let err = CanonicalLog::open(&path).expect_err("must reject truncated header");
        assert!(
            matches!(err, LogError::IncompatibleFormat { .. }),
            "expected IncompatibleFormat, got {err:?}"
        );
        assert_eq!(fs::read(&path).expect("read"), b"MIMR\x01");
    }

    #[test]
    fn open_refuses_wrong_magic() {
        let tmp = TempDir::new().expect("tmp");
        let path = tmp.path().join("canonical.log");
        let original: &[u8] = b"WICK\x01\x00\x00\x00";
        fs::write(&path, original).expect("write fixture");
        let err = CanonicalLog::open(&path).expect_err("must reject wrong magic");
        assert!(
            matches!(err, LogError::IncompatibleFormat { .. }),
            "expected IncompatibleFormat, got {err:?}"
        );
        // Rejection must not touch the foreign file.
        assert_eq!(fs::read(&path).expect("read"), original);
    }

    #[test]
    fn open_refuses_unsupported_format_version() {
        let tmp = TempDir::new().expect("tmp");
        let path = tmp.path().join("canonical.log");
        let mut header = Vec::with_capacity(8);
        header.extend_from_slice(&LOG_MAGIC);
        header.extend_from_slice(&999_u32.to_le_bytes());
        fs::write(&path, &header).expect("write fixture");
        let err = CanonicalLog::open(&path).expect_err("must reject unsupported version");
        match err {
            LogError::IncompatibleFormat { reason } => {
                assert!(
                    reason.contains("999"),
                    "diagnostic should name the bad version, got: {reason}"
                );
            }
            other => panic!("expected IncompatibleFormat, got {other:?}"),
        }
    }

    #[test]
    fn open_idempotent_against_reopen() {
        let tmp = TempDir::new().expect("tmp");
        let path = tmp.path().join("canonical.log");
        let _first = CanonicalLog::open(&path).expect("first open");
        let raw1 = fs::read(&path).expect("read 1");
        let _second = CanonicalLog::open(&path).expect("reopen");
        let raw2 = fs::read(&path).expect("read 2");
        assert_eq!(raw1, raw2, "reopen does not mutate the header");
        assert_eq!(
            raw1.len(),
            usize::try_from(LOG_HEADER_SIZE).expect("header fits")
        );
    }

    #[test]
    fn append_sync_roundtrip_preserves_bytes() {
        let tmp = TempDir::new().expect("tmp");
        let mut log = CanonicalLog::open(tmp.path().join("canonical.log")).expect("open");
        let payload = checkpoint_bytes(42);
        log.append(&payload).expect("append");
        log.sync().expect("sync");
        let read = log.read_all().expect("read");
        assert_eq!(read, payload);
    }

    #[test]
    fn truncate_shrinks_log() {
        let tmp = TempDir::new().expect("tmp");
        let mut log = CanonicalLog::open(tmp.path().join("canonical.log")).expect("open");
        let first = checkpoint_bytes(1);
        let second = checkpoint_bytes(2);
        log.append(&first).expect("append 1");
        log.append(&second).expect("append 2");
        log.sync().expect("sync");
        log.truncate(first.len() as u64).expect("truncate");
        assert_eq!(log.len(), first.len() as u64);
        let read = log.read_all().expect("read");
        assert_eq!(read, first);
    }

    #[test]
    fn truncate_persists_across_reopen() {
        let tmp = TempDir::new().expect("tmp");
        let path = tmp.path().join("canonical.log");
        let first = checkpoint_bytes(1);
        let second = checkpoint_bytes(2);
        {
            let mut log = CanonicalLog::open(&path).expect("open");
            log.append(&first).expect("append 1");
            log.append(&second).expect("append 2");
            log.sync().expect("sync");
            log.truncate(first.len() as u64).expect("truncate");
        }
        let mut log = CanonicalLog::open(&path).expect("reopen");
        assert_eq!(log.len(), first.len() as u64);
        assert_eq!(log.read_all().expect("read"), first);
    }

    #[test]
    fn append_after_truncate_writes_at_new_end() {
        let tmp = TempDir::new().expect("tmp");
        let mut log = CanonicalLog::open(tmp.path().join("canonical.log")).expect("open");
        let first = checkpoint_bytes(1);
        let second = checkpoint_bytes(2);
        let third = checkpoint_bytes(3);
        log.append(&first).expect("append 1");
        log.append(&second).expect("append 2");
        log.truncate(first.len() as u64).expect("truncate");
        log.append(&third).expect("append 3");
        let expected = [first, third].concat();
        assert_eq!(log.len(), expected.len() as u64);
        assert_eq!(log.read_all().expect("read"), expected);
    }

    #[test]
    fn truncate_beyond_end_errors() {
        let tmp = TempDir::new().expect("tmp");
        let mut log = CanonicalLog::open(tmp.path().join("canonical.log")).expect("open");
        let err = log.truncate(100).expect_err("beyond");
        assert!(
            matches!(
                err,
                LogError::TruncateBeyondEnd {
                    requested: 100,
                    current: 0
                }
            ),
            "expected TruncateBeyondEnd, got {err:?}"
        );
    }

    #[test]
    fn truncate_to_zero_clears_the_log() {
        let tmp = TempDir::new().expect("tmp");
        let mut log = CanonicalLog::open(tmp.path().join("canonical.log")).expect("open");
        let payload = checkpoint_bytes(1);
        log.append(&payload).expect("append");
        log.sync().expect("sync");
        assert!(!log.is_empty());
        log.truncate(0).expect("truncate to zero");
        assert_eq!(log.len(), 0);
        assert!(log.is_empty());
        assert!(log.read_all().expect("read").is_empty());
    }

    #[test]
    fn truncate_to_current_length_is_a_noop() {
        let tmp = TempDir::new().expect("tmp");
        let mut log = CanonicalLog::open(tmp.path().join("canonical.log")).expect("open");
        let payload = checkpoint_bytes(1);
        log.append(&payload).expect("append");
        log.sync().expect("sync");
        let before = log.len();
        log.truncate(before).expect("truncate to current len");
        assert_eq!(log.len(), before);
        assert_eq!(log.read_all().expect("read"), payload);
    }

    #[test]
    fn last_checkpoint_end_returns_zero_for_empty_log() {
        let tmp = TempDir::new().expect("tmp");
        let mut log = CanonicalLog::open(tmp.path().join("canonical.log")).expect("open");
        assert_eq!(log.last_checkpoint_end().expect("scan"), 0);
    }

    #[test]
    fn last_checkpoint_end_finds_the_final_checkpoint() {
        let tmp = TempDir::new().expect("tmp");
        let mut log = CanonicalLog::open(tmp.path().join("canonical.log")).expect("open");
        let cp_a = checkpoint_bytes(1);
        let cp_b = checkpoint_bytes(2);
        log.append(&cp_a).expect("append a");
        log.append(&cp_b).expect("append b");
        log.sync().expect("sync");
        let end = log.last_checkpoint_end().expect("scan");
        assert_eq!(end, (cp_a.len() + cp_b.len()) as u64);
    }

    #[test]
    fn last_checkpoint_end_stops_at_corruption() {
        let tmp = TempDir::new().expect("tmp");
        let mut log = CanonicalLog::open(tmp.path().join("canonical.log")).expect("open");
        let cp = checkpoint_bytes(1);
        log.append(&cp).expect("append");
        // A single stray byte after the checkpoint that does not decode.
        log.append(&[0x01_u8]).expect("append garbage");
        log.sync().expect("sync");
        let end = log.last_checkpoint_end().expect("scan");
        assert_eq!(end, cp.len() as u64);
    }

    #[test]
    fn log_error_equality_compares_io_by_kind() {
        use std::io::{Error, ErrorKind};
        assert_eq!(
            LogError::Io(Error::new(ErrorKind::NotFound, "a")),
            LogError::Io(Error::new(ErrorKind::NotFound, "b")),
        );
        assert_ne!(
            LogError::Io(Error::new(ErrorKind::NotFound, "a")),
            LogError::Io(Error::new(ErrorKind::PermissionDenied, "a")),
        );
        assert_ne!(
            LogError::LogOverflow,
            LogError::TruncateBeyondEnd {
                requested: 1,
                current: 0
            }
        );
    }
}