use crate::coordinate::Coordinate;
use crate::event::{EventKind, HashChain};
use crate::store::index::{DiskPos, IndexEntry, StoreIndex};
use crate::store::StoreError;
use serde::{Deserialize, Serialize};
use std::io::{BufWriter, Write};
use std::path::Path;
use tempfile::NamedTempFile;
/// Six-byte magic prefix written at the very start of every checkpoint file.
pub(crate) const CHECKPOINT_MAGIC: &[u8; 6] = b"FBATCK";
/// Checkpoint format version, written as a little-endian `u16` right after the
/// magic. Files carrying any other version are ignored on load.
pub(crate) const CHECKPOINT_VERSION: u16 = 2;
/// File name of the checkpoint inside the store's data directory.
pub(crate) const CHECKPOINT_FILENAME: &str = "index.ckpt";
/// Body of a checkpoint file, encoded as MessagePack with field names
/// (`rmp_serde::to_vec_named`) and protected by a CRC-32 in the file header.
///
/// Field order is part of the encoded bytes; do not reorder without bumping
/// `CHECKPOINT_VERSION`.
#[derive(Serialize, Deserialize)]
struct CheckpointData {
    /// `StoreIndex::global_sequence()` at the time the checkpoint was written;
    /// handed back as the last element of `try_load_checkpoint`'s tuple.
    global_sequence: u64,
    /// Watermark segment id. Loading rejects the checkpoint if this segment's
    /// file no longer exists in the data directory.
    watermark_segment_id: u64,
    /// Watermark offset within that segment, stored verbatim.
    watermark_offset: u64,
    /// Persisted string table. Index 0 is an empty placeholder followed by the
    /// interner snapshot; `CheckpointEntry::entity_id` / `scope_id` index into it.
    interner_strings: Vec<String>,
    /// Index entries, sorted by `global_sequence` at write time.
    entries: Vec<CheckpointEntry>,
}
/// Flattened, serialisable copy of one `IndexEntry` as stored in a checkpoint.
///
/// Interned ids are the ids of the writing process; on restore they are
/// resolved through the checkpoint's string table and re-interned.
#[derive(Serialize, Deserialize)]
pub(crate) struct CheckpointEntry {
    // The 128-bit ids are encoded via the `crate::wire` byte helpers rather
    // than serde's native u128 handling.
    #[serde(with = "crate::wire::u128_bytes")]
    pub event_id: u128,
    #[serde(with = "crate::wire::u128_bytes")]
    pub correlation_id: u128,
    #[serde(with = "crate::wire::option_u128_bytes")]
    pub causation_id: Option<u128>,
    /// Index into `CheckpointData::interner_strings` for the entity string.
    pub entity_id: u32,
    /// Index into `CheckpointData::interner_strings` for the scope string.
    pub scope_id: u32,
    pub kind: EventKind,
    pub wall_ms: u64,
    pub clock: u32,
    /// `HashChain::prev_hash` of the source entry.
    pub prev_hash: [u8; 32],
    /// `HashChain::event_hash` of the source entry.
    pub event_hash: [u8; 32],
    /// `DiskPos` of the event on disk: segment id, byte offset, and length.
    pub segment_id: u64,
    pub offset: u64,
    pub length: u32,
    pub global_sequence: u64,
}
/// Watermark recorded in a checkpoint: the segment id and the offset within it,
/// exactly as they were passed to `write_checkpoint`.
///
/// NOTE(review): presumably the position after which segments must still be
/// replayed on top of the restored index — confirm with the caller.
///
/// This is plain data, so it derives `Copy`, `Debug`, and `Eq` for logging and
/// comparisons.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct WatermarkInfo {
    pub watermark_segment_id: u64,
    pub watermark_offset: u64,
}
/// Serialises `index` to `<data_dir>/index.ckpt`, replacing any existing checkpoint.
///
/// On-disk layout: `CHECKPOINT_MAGIC` (6 bytes), `CHECKPOINT_VERSION` (u16 LE),
/// the CRC-32 of the body (u32 LE), then the MessagePack body produced by
/// `rmp_serde::to_vec_named`.
///
/// The bytes are written to a temporary file in `data_dir`, flushed and
/// `sync_all`ed, and then renamed over the final path, so readers never observe
/// a half-written checkpoint. On Unix the data directory is also synced
/// afterwards, because the rename is only durable once the directory entry
/// reaches disk.
///
/// `watermark_segment_id` / `watermark_offset` are stored verbatim and returned
/// by `try_load_checkpoint`.
///
/// # Errors
/// Returns `StoreError::Serialization` if the body cannot be encoded. Returns
/// `StoreError::Io` if the final path is a symlink or if any file-system
/// operation (create, write, sync, rename) fails.
pub(crate) fn write_checkpoint(
    index: &StoreIndex,
    data_dir: &Path,
    watermark_segment_id: u64,
    watermark_offset: u64,
) -> Result<(), StoreError> {
    let mut entries: Vec<CheckpointEntry> = index
        .all_entries()
        .into_iter()
        .map(|e| CheckpointEntry {
            event_id: e.event_id,
            correlation_id: e.correlation_id,
            causation_id: e.causation_id,
            entity_id: e.entity_id.as_u32(),
            scope_id: e.scope_id.as_u32(),
            kind: e.kind,
            wall_ms: e.wall_ms,
            clock: e.clock,
            prev_hash: e.hash_chain.prev_hash,
            event_hash: e.hash_chain.event_hash,
            segment_id: e.disk_pos.segment_id,
            offset: e.disk_pos.offset,
            length: e.disk_pos.length,
            global_sequence: e.global_sequence,
        })
        .collect();
    // Store entries in sequence order so restore replays them in that order.
    entries.sort_by_key(|e| e.global_sequence);

    // Slot 0 of the persisted string table is an empty placeholder, followed by
    // the interner snapshot; restore indexes this table directly with the
    // stored ids. NOTE(review): this assumes the interner reserves id 0 and
    // `to_snapshot()` starts at id 1 — confirm against the interner.
    let mut interner_strings = vec![String::new()];
    interner_strings.extend(index.interner.to_snapshot());

    tracing::debug!(
        target: "batpak::checkpoint",
        "checkpoint: {} entries, {} interned strings",
        entries.len(),
        index.interner.len()
    );

    let data = CheckpointData {
        global_sequence: index.global_sequence(),
        watermark_segment_id,
        watermark_offset,
        interner_strings,
        entries,
    };
    let body =
        rmp_serde::to_vec_named(&data).map_err(|e| StoreError::Serialization(Box::new(e)))?;
    let crc: u32 = crc32fast::hash(&body);

    let final_path = data_dir.join(CHECKPOINT_FILENAME);
    reject_symlink_leaf(&final_path)?;

    let tmp = NamedTempFile::new_in(data_dir)?;
    {
        let mut w = BufWriter::new(tmp.as_file());
        w.write_all(CHECKPOINT_MAGIC)?;
        w.write_all(&CHECKPOINT_VERSION.to_le_bytes())?;
        w.write_all(&crc.to_le_bytes())?;
        w.write_all(&body)?;
        // Flush explicitly: BufWriter's Drop would silently swallow errors.
        w.flush()?;
    }
    tmp.as_file().sync_all()?;
    tmp.persist(&final_path)
        .map_err(|e| StoreError::Io(e.error))?;

    // fsync on the file does not make the rename itself durable on POSIX
    // systems; the containing directory must be synced too. Directories cannot
    // be opened this way on Windows, so only do it on Unix.
    if cfg!(unix) {
        std::fs::File::open(data_dir)?.sync_all()?;
    }

    tracing::debug!(
        target: "batpak::checkpoint",
        entries = data.entries.len(),
        global_sequence = data.global_sequence,
        watermark_segment_id,
        watermark_offset,
        body_bytes = body.len(),
        "checkpoint written"
    );
    Ok(())
}
fn reject_symlink_leaf(path: &Path) -> Result<(), StoreError> {
match std::fs::symlink_metadata(path) {
Ok(meta) if meta.file_type().is_symlink() => Err(StoreError::Io(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!(
"refusing to write checkpoint through symlink {}",
path.display()
),
))),
Ok(_) | Err(_) => Ok(()),
}
}
pub(crate) fn try_load_checkpoint(
data_dir: &Path,
) -> Option<(Vec<CheckpointEntry>, Vec<String>, WatermarkInfo, u64)> {
let path = data_dir.join(CHECKPOINT_FILENAME);
let raw = match std::fs::read(&path) {
Ok(b) => b,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
return None;
}
Err(e) => {
tracing::warn!(
target: "batpak::checkpoint",
path = %path.display(),
error = %e,
"failed to read checkpoint file"
);
return None;
}
};
const HEADER_LEN: usize = 6 + 2 + 4;
if raw.len() < HEADER_LEN {
tracing::warn!(
target: "batpak::checkpoint",
path = %path.display(),
len = raw.len(),
"checkpoint file too short to contain a valid header"
);
return None;
}
if &raw[..6] != CHECKPOINT_MAGIC.as_ref() {
tracing::warn!(
target: "batpak::checkpoint",
path = %path.display(),
"checkpoint file has wrong magic bytes — ignoring"
);
return None;
}
let version = u16::from_le_bytes([raw[6], raw[7]]);
if version != CHECKPOINT_VERSION {
tracing::warn!(
target: "batpak::checkpoint",
path = %path.display(),
version,
expected = CHECKPOINT_VERSION,
"unsupported checkpoint version — ignoring"
);
return None;
}
let stored_crc = u32::from_le_bytes([raw[8], raw[9], raw[10], raw[11]]);
let body = &raw[HEADER_LEN..];
let computed_crc = crc32fast::hash(body);
if stored_crc != computed_crc {
tracing::warn!(
target: "batpak::checkpoint",
path = %path.display(),
stored = stored_crc,
computed = computed_crc,
"checkpoint CRC mismatch — file is corrupt, ignoring"
);
return None;
}
let data: CheckpointData = match rmp_serde::from_slice(body) {
Ok(d) => d,
Err(e) => {
tracing::warn!(
target: "batpak::checkpoint",
path = %path.display(),
error = %e,
"checkpoint deserialisation failed — ignoring"
);
return None;
}
};
let seg_filename = format!(
"{:06}.{}",
data.watermark_segment_id,
crate::store::segment::SEGMENT_EXTENSION
);
let seg_path = data_dir.join(&seg_filename);
if !seg_path.exists() {
tracing::warn!(
target: "batpak::checkpoint",
path = %path.display(),
missing_segment = %seg_path.display(),
"watermark segment referenced by checkpoint is missing — ignoring checkpoint"
);
return None;
}
let watermark = WatermarkInfo {
watermark_segment_id: data.watermark_segment_id,
watermark_offset: data.watermark_offset,
};
tracing::debug!(
target: "batpak::checkpoint",
entries = data.entries.len(),
global_sequence = data.global_sequence,
watermark_segment_id = data.watermark_segment_id,
watermark_offset = data.watermark_offset,
"checkpoint loaded successfully"
);
Some((
data.entries,
data.interner_strings,
watermark,
data.global_sequence,
))
}
pub(crate) fn restore_from_checkpoint(
index: &StoreIndex,
entries: Vec<CheckpointEntry>,
interner_strings: &[String],
stored_allocator: u64,
) -> Result<(), StoreError> {
let mut cursor = index.begin_replay();
for ce in entries {
let entity_str = interner_strings
.get(ce.entity_id as usize)
.ok_or_else(|| StoreError::ser_msg("checkpoint entity_id out of interner range"))?;
let scope_str = interner_strings
.get(ce.scope_id as usize)
.ok_or_else(|| StoreError::ser_msg("checkpoint scope_id out of interner range"))?;
let coord = Coordinate::new(entity_str, scope_str)?;
let entity_id = index.interner.intern(entity_str);
let scope_id = index.interner.intern(scope_str);
let entry = IndexEntry {
event_id: ce.event_id,
correlation_id: ce.correlation_id,
causation_id: ce.causation_id,
coord,
entity_id,
scope_id,
kind: ce.kind,
wall_ms: ce.wall_ms,
clock: ce.clock,
hash_chain: HashChain {
prev_hash: ce.prev_hash,
event_hash: ce.event_hash,
},
disk_pos: DiskPos {
segment_id: ce.segment_id,
offset: ce.offset,
length: ce.length,
},
global_sequence: ce.global_sequence,
};
cursor.insert(entry);
}
cursor.commit(stored_allocator);
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::index::StoreIndex;
    use tempfile::TempDir;

    /// Builds a published index with `n` entries. Entry `i` has entity
    /// `entity:{i}`, scope `test-scope`, `event_id == i + 1`, and
    /// `global_sequence == i`.
    fn make_index(n: u64) -> StoreIndex {
        let idx = StoreIndex::new();
        for i in 0..n {
            let coord =
                Coordinate::new(format!("entity:{i}"), "test-scope").expect("valid coordinate");
            let entity_id = idx.interner.intern(coord.entity());
            let scope_id = idx.interner.intern(coord.scope());
            let entry = IndexEntry {
                event_id: (i + 1) as u128,
                correlation_id: (i + 1) as u128,
                causation_id: if i == 0 { None } else { Some(i as u128) },
                coord,
                entity_id,
                scope_id,
                kind: EventKind::custom(0x1, (i & 0x0FFF) as u16),
                wall_ms: 1_700_000_000_000 + i * 1000,
                clock: u32::try_from(i).expect("i fits u32"),
                hash_chain: HashChain::default(),
                disk_pos: DiskPos {
                    segment_id: 0,
                    offset: i * 256,
                    length: 256,
                },
                global_sequence: i,
            };
            idx.insert(entry);
        }
        idx.publish(idx.global_sequence());
        idx
    }

    /// Creates a placeholder segment file so the loader's watermark-segment
    /// check passes. It uses the same extension constant as the loader so the
    /// test helper cannot drift from production naming.
    fn touch_segment(dir: &Path, segment_id: u64) {
        let name = format!(
            "{segment_id:06}.{}",
            crate::store::segment::SEGMENT_EXTENSION
        );
        std::fs::write(dir.join(name), b"dummy").expect("write dummy segment");
    }

    #[test]
    fn round_trip_empty_index() {
        let tmp = TempDir::new().expect("tempdir");
        let dir = tmp.path();
        touch_segment(dir, 0);
        let idx = StoreIndex::new();
        write_checkpoint(&idx, dir, 0, 0).expect("write");
        let result = try_load_checkpoint(dir);
        assert!(result.is_some(), "checkpoint should load");
        let (entries, _strings, wm, _alloc) = result.expect("some");
        assert_eq!(entries.len(), 0);
        assert_eq!(wm.watermark_segment_id, 0);
        assert_eq!(wm.watermark_offset, 0);
    }

    #[test]
    fn round_trip_with_entries() {
        let tmp = TempDir::new().expect("tempdir");
        let dir = tmp.path();
        touch_segment(dir, 0);
        let idx = make_index(16);
        write_checkpoint(&idx, dir, 0, 4096).expect("write");
        let (entries, _strings, wm, _alloc) = try_load_checkpoint(dir).expect("should load");
        assert_eq!(entries.len(), 16);
        assert_eq!(wm.watermark_offset, 4096);
        let seqs: Vec<u64> = entries.iter().map(|e| e.global_sequence).collect();
        let mut sorted = seqs.clone();
        sorted.sort_unstable();
        assert_eq!(seqs, sorted, "entries must be sorted by global_sequence");
    }

    #[test]
    fn restore_rebuilds_index() {
        let tmp = TempDir::new().expect("tempdir");
        let dir = tmp.path();
        touch_segment(dir, 0);
        let src = make_index(8);
        write_checkpoint(&src, dir, 0, 0).expect("write");
        let (entries, interner_strings, _wm, stored_alloc) =
            try_load_checkpoint(dir).expect("should load");
        let dst = StoreIndex::new();
        restore_from_checkpoint(&dst, entries, &interner_strings, stored_alloc).expect("restore");
        assert_eq!(dst.len(), 8);

        // Interned ids are remapped on restore, so compare the resolved strings.
        // An off-by-one between the persisted string table and the stored ids
        // would show up here as shifted coordinates.
        let mut restored: Vec<(u128, String, String)> = dst
            .all_entries()
            .into_iter()
            .map(|e| {
                (
                    e.event_id,
                    e.coord.entity().to_string(),
                    e.coord.scope().to_string(),
                )
            })
            .collect();
        restored.sort_unstable();
        let expected: Vec<(u128, String, String)> = (0..8u64)
            .map(|i| {
                (
                    u128::from(i + 1),
                    format!("entity:{i}"),
                    "test-scope".to_string(),
                )
            })
            .collect();
        assert_eq!(
            restored, expected,
            "restored coordinates must match the source index"
        );
    }

    #[test]
    fn missing_file_returns_none() {
        let tmp = TempDir::new().expect("tempdir");
        assert!(
            try_load_checkpoint(tmp.path()).is_none(),
            "missing file should return None"
        );
    }

    #[test]
    fn truncated_header_returns_none() {
        let tmp = TempDir::new().expect("tempdir");
        // Only the magic, with no version or CRC: shorter than the 12-byte header.
        std::fs::write(tmp.path().join(CHECKPOINT_FILENAME), CHECKPOINT_MAGIC).expect("write");
        assert!(
            try_load_checkpoint(tmp.path()).is_none(),
            "truncated header should return None"
        );
    }

    #[test]
    fn bad_magic_returns_none() {
        let tmp = TempDir::new().expect("tempdir");
        let path = tmp.path().join(CHECKPOINT_FILENAME);
        std::fs::write(&path, b"BADMAGIC\x00\x00\x00\x00").expect("write");
        assert!(
            try_load_checkpoint(tmp.path()).is_none(),
            "bad magic should return None"
        );
    }

    #[test]
    fn crc_mismatch_returns_none() {
        let tmp = TempDir::new().expect("tempdir");
        let dir = tmp.path();
        touch_segment(dir, 0);
        let idx = make_index(4);
        write_checkpoint(&idx, dir, 0, 0).expect("write");
        let path = dir.join(CHECKPOINT_FILENAME);
        let mut raw = std::fs::read(&path).expect("read");
        let last = raw.len() - 1;
        raw[last] ^= 0xFF;
        std::fs::write(&path, &raw).expect("rewrite");
        assert!(
            try_load_checkpoint(dir).is_none(),
            "CRC mismatch should return None"
        );
    }

    #[test]
    fn missing_watermark_segment_returns_none() {
        let tmp = TempDir::new().expect("tempdir");
        let dir = tmp.path();
        touch_segment(dir, 0);
        let idx = make_index(2);
        write_checkpoint(&idx, dir, 99, 0).expect("write");
        assert!(
            try_load_checkpoint(dir).is_none(),
            "missing watermark segment should return None"
        );
    }

    #[test]
    fn wrong_version_returns_none() {
        let tmp = TempDir::new().expect("tempdir");
        let dir = tmp.path();
        touch_segment(dir, 0);
        let idx = StoreIndex::new();
        write_checkpoint(&idx, dir, 0, 0).expect("write");
        let path = dir.join(CHECKPOINT_FILENAME);
        let mut raw = std::fs::read(&path).expect("read");
        raw[6] = 99;
        raw[7] = 0;
        // Recompute the CRC so the version check, not the CRC check, rejects the file.
        let body_crc = crc32fast::hash(&raw[12..]);
        let crc_bytes = body_crc.to_le_bytes();
        raw[8] = crc_bytes[0];
        raw[9] = crc_bytes[1];
        raw[10] = crc_bytes[2];
        raw[11] = crc_bytes[3];
        std::fs::write(&path, &raw).expect("rewrite");
        assert!(
            try_load_checkpoint(dir).is_none(),
            "wrong version should return None"
        );
    }

    #[cfg(unix)]
    #[test]
    fn write_refuses_symlinked_checkpoint_path() {
        let tmp = TempDir::new().expect("tempdir");
        let dir = tmp.path();
        let target = dir.join("elsewhere.bin");
        std::fs::write(&target, b"original").expect("write target");
        std::os::unix::fs::symlink(&target, dir.join(CHECKPOINT_FILENAME)).expect("symlink");
        let idx = StoreIndex::new();
        assert!(
            write_checkpoint(&idx, dir, 0, 0).is_err(),
            "writing through a symlinked checkpoint path must be refused"
        );
        assert_eq!(
            std::fs::read(&target).expect("read target"),
            b"original",
            "symlink target must be left untouched"
        );
    }

    #[test]
    fn restore_advances_global_sequence() {
        let tmp = TempDir::new().expect("tempdir");
        let dir = tmp.path();
        touch_segment(dir, 0);
        let src = make_index(16);
        write_checkpoint(&src, dir, 0, 0).expect("write");
        let (entries, interner_strings, _wm, stored_alloc) =
            try_load_checkpoint(dir).expect("should load");
        assert_eq!(entries.len(), 16);
        let dst = StoreIndex::new();
        restore_from_checkpoint(&dst, entries, &interner_strings, stored_alloc).expect("restore");
        assert_eq!(
            dst.global_sequence(),
            16,
            "PROPERTY: global_sequence after restore must equal the number of restored entries."
        );
        assert_eq!(
            dst.visible_sequence(),
            16,
            "PROPERTY: visible_sequence after restore must equal global_sequence."
        );
    }
}