use crate::coordinate::Coordinate;
use crate::event::{EventKind, HashChain};
use crate::store::index::{
recommended_restore_chunk_count, DiskPos, IndexEntry, RoutingSummary, StoreIndex,
};
use crate::store::StoreError;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::io::{BufWriter, Write};
use std::path::Path;
use tempfile::NamedTempFile;
/// Magic bytes written at the very start of every checkpoint file; the loader
/// rejects any file whose first six bytes differ.
pub(crate) const CHECKPOINT_MAGIC: &[u8; 6] = b"FBATCK";
/// Format version that `write_checkpoint` stores in the header (little-endian
/// `u16`). The loader in this file additionally accepts version 2.
pub(crate) const CHECKPOINT_VERSION: u16 = 3;
/// File name of the checkpoint, joined onto the data directory path.
pub(crate) const CHECKPOINT_FILENAME: &str = "index.ckpt";
/// Body layout decoded when the checkpoint header carries version 2.
///
/// Identical to [`CheckpointDataV3`] except that it has no `routing` field;
/// the loader rebuilds the routing summary from `entries` for such files.
#[derive(Serialize, Deserialize)]
struct CheckpointDataV2 {
/// Sequence allocator value of the index at the time the checkpoint was written.
global_sequence: u64,
/// Segment id of the watermark; the loader requires this segment file to exist.
watermark_segment_id: u64,
/// Offset component of the watermark.
watermark_offset: u64,
/// String table that `entity_id` / `scope_id` of each entry index into.
interner_strings: Vec<String>,
/// Index entries in their flat, serialisable form.
entries: Vec<CheckpointEntry>,
}
/// Body layout written by `write_checkpoint` and decoded when the header
/// carries version 3.
///
/// Extends the version 2 layout with a persisted [`RoutingSummary`].
#[derive(Serialize, Deserialize)]
struct CheckpointDataV3 {
/// Sequence allocator value of the index at the time the checkpoint was written.
global_sequence: u64,
/// Segment id of the watermark; the loader requires this segment file to exist.
watermark_segment_id: u64,
/// Offset component of the watermark.
watermark_offset: u64,
/// String table that `entity_id` / `scope_id` of each entry index into.
interner_strings: Vec<String>,
/// Routing summary computed from the sorted entries at write time.
routing: RoutingSummary,
/// Index entries in their flat, serialisable form.
entries: Vec<CheckpointEntry>,
}
/// Flat, serialisable form of one `IndexEntry`.
///
/// The coordinate is not stored; it is rebuilt on load from the interned
/// entity and scope strings. The hash chain and disk position are flattened
/// into individual fields.
#[derive(Serialize, Deserialize)]
pub(crate) struct CheckpointEntry {
/// Event identifier (serialised through `crate::wire::u128_bytes`).
#[serde(with = "crate::wire::u128_bytes")]
pub event_id: u128,
/// Correlation identifier (serialised through `crate::wire::u128_bytes`).
#[serde(with = "crate::wire::u128_bytes")]
pub correlation_id: u128,
/// Optional causation identifier (serialised through `crate::wire::option_u128_bytes`).
#[serde(with = "crate::wire::option_u128_bytes")]
pub causation_id: Option<u128>,
/// Index of the entity string in the checkpoint's interner string table.
pub entity_id: u32,
/// Index of the scope string in the checkpoint's interner string table.
pub scope_id: u32,
/// Event kind, copied verbatim from the index entry.
pub kind: EventKind,
/// Copied from `IndexEntry::wall_ms`.
pub wall_ms: u64,
/// Copied from `IndexEntry::clock`.
pub clock: u32,
/// `HashChain::prev_hash` of the entry.
pub prev_hash: [u8; 32],
/// `HashChain::event_hash` of the entry.
pub event_hash: [u8; 32],
/// `DiskPos::segment_id` of the entry.
pub segment_id: u64,
/// `DiskPos::offset` of the entry.
pub offset: u64,
/// `DiskPos::length` of the entry.
pub length: u32,
/// Global sequence number; checkpoints are written sorted by this field.
pub global_sequence: u64,
}
/// Watermark position recorded in a checkpoint: a segment id plus an offset.
// NOTE(review): presumably marks how far into the segment files the
// checkpoint is valid, so replay can resume from there — confirm with callers.
pub(crate) struct WatermarkInfo {
/// Segment id passed to `write_checkpoint`; the loader requires its file to exist.
pub watermark_segment_id: u64,
/// Offset passed to `write_checkpoint`.
pub watermark_offset: u64,
}
/// Result of `try_load_checkpoint`: the decoded checkpoint with entries still
/// in their flat [`CheckpointEntry`] form.
pub(crate) struct LoadedCheckpointData {
/// Entries exactly as stored in the checkpoint body.
pub(crate) entries: Vec<CheckpointEntry>,
/// String table the entries' `entity_id` / `scope_id` index into.
pub(crate) interner_strings: Vec<String>,
/// Watermark recorded in the checkpoint.
pub(crate) watermark: WatermarkInfo,
/// The `global_sequence` value stored in the checkpoint body.
pub(crate) stored_allocator: u64,
/// Routing summary: persisted for version 3, rebuilt on load for version 2.
pub(crate) routing: RoutingSummary,
}
/// Result of `try_load_checkpoint_snapshot`: like [`LoadedCheckpointData`] but
/// with the entries already converted back into `IndexEntry` values.
pub(crate) struct LoadedCheckpointSnapshot {
/// Entries rebuilt from the checkpoint, including their coordinates.
pub(crate) entries: Vec<IndexEntry>,
/// String table the entries' interned ids index into.
pub(crate) interner_strings: Vec<String>,
/// Watermark recorded in the checkpoint.
pub(crate) watermark: WatermarkInfo,
/// The `global_sequence` value stored in the checkpoint body.
pub(crate) stored_allocator: u64,
/// Routing summary carried over from the loaded checkpoint.
pub(crate) routing: RoutingSummary,
}
/// Converts flat checkpoint entries back into full [`IndexEntry`] values.
///
/// For every entry the entity and scope strings are looked up in
/// `interner_strings` by id and combined into a [`Coordinate`]. Output order
/// matches input order.
///
/// # Errors
/// Stops at the first entry whose `entity_id` or `scope_id` lies outside
/// `interner_strings`, or whose strings are rejected by `Coordinate::new`.
fn checkpoint_entries_to_index_entries(
    entries: &[CheckpointEntry],
    interner_strings: &[String],
) -> Result<Vec<IndexEntry>, StoreError> {
    /// Resolves an interned id to its string, or fails with `out_of_range_msg`.
    fn lookup<'a>(
        table: &'a [String],
        id: u32,
        out_of_range_msg: &'static str,
    ) -> Result<&'a String, StoreError> {
        match table.get(id as usize) {
            Some(text) => Ok(text),
            None => Err(StoreError::ser_msg(out_of_range_msg)),
        }
    }

    let mut rebuilt = Vec::with_capacity(entries.len());
    for flat in entries {
        let entity_name = lookup(
            interner_strings,
            flat.entity_id,
            "checkpoint entity_id out of interner range",
        )?;
        let scope_name = lookup(
            interner_strings,
            flat.scope_id,
            "checkpoint scope_id out of interner range",
        )?;
        let coord = Coordinate::new(entity_name, scope_name)?;

        let hash_chain = HashChain {
            prev_hash: flat.prev_hash,
            event_hash: flat.event_hash,
        };
        let disk_pos = DiskPos {
            segment_id: flat.segment_id,
            offset: flat.offset,
            length: flat.length,
        };

        rebuilt.push(IndexEntry {
            event_id: flat.event_id,
            correlation_id: flat.correlation_id,
            causation_id: flat.causation_id,
            coord,
            entity_id: crate::store::interner::InternId(flat.entity_id),
            scope_id: crate::store::interner::InternId(flat.scope_id),
            kind: flat.kind,
            wall_ms: flat.wall_ms,
            clock: flat.clock,
            hash_chain,
            disk_pos,
            global_sequence: flat.global_sequence,
        });
    }
    Ok(rebuilt)
}
/// Serialises the in-memory index to `<data_dir>/index.ckpt`.
///
/// File layout: the 6-byte magic, the format version as little-endian `u16`,
/// the CRC32 of the body as little-endian `u32`, then the body itself
/// (MessagePack with named fields, see `CheckpointDataV3`). Entries are
/// stored sorted by `global_sequence`.
///
/// The data is first written to a temporary file inside `data_dir`, synced,
/// and only then persisted under the final name, so a reader does not see a
/// partially written checkpoint.
///
/// # Errors
/// Returns a [`StoreError`] when
/// * the final path is currently a symlink,
/// * an entry cannot be converted for the routing summary,
/// * serialisation fails, or
/// * any file-system operation fails.
pub(crate) fn write_checkpoint(
    index: &StoreIndex,
    data_dir: &Path,
    watermark_segment_id: u64,
    watermark_offset: u64,
) -> Result<(), StoreError> {
    let mut entries: Vec<CheckpointEntry> = index
        .all_entries()
        .into_iter()
        .map(|e| CheckpointEntry {
            event_id: e.event_id,
            correlation_id: e.correlation_id,
            causation_id: e.causation_id,
            entity_id: e.entity_id.as_u32(),
            scope_id: e.scope_id.as_u32(),
            kind: e.kind,
            wall_ms: e.wall_ms,
            clock: e.clock,
            prev_hash: e.hash_chain.prev_hash,
            event_hash: e.hash_chain.event_hash,
            segment_id: e.disk_pos.segment_id,
            offset: e.disk_pos.offset,
            length: e.disk_pos.length,
            global_sequence: e.global_sequence,
        })
        .collect();
    entries.sort_by_key(|e| e.global_sequence);
    let entry_count = entries.len();

    // Slot 0 of the stored string table is an empty placeholder; the interner
    // snapshot follows it.
    // NOTE(review): presumably intern ids start at 1 — confirm in the interner.
    let mut interner_strings = vec![String::new()];
    interner_strings.extend(index.interner.to_snapshot());

    tracing::debug!(
        target: "batpak::checkpoint",
        "checkpoint: {} entries, {} interned strings",
        entry_count,
        index.interner.len()
    );

    // The routing summary is computed over full index entries, so the flat
    // entries are converted back once here.
    let routing = RoutingSummary::from_sorted_entries(
        &checkpoint_entries_to_index_entries(&entries, &interner_strings)?,
        recommended_restore_chunk_count(entry_count),
    );

    let data = CheckpointDataV3 {
        global_sequence: index.global_sequence(),
        watermark_segment_id,
        watermark_offset,
        interner_strings,
        routing,
        entries,
    };
    let body =
        rmp_serde::to_vec_named(&data).map_err(|e| StoreError::Serialization(Box::new(e)))?;
    let crc: u32 = crc32fast::hash(&body);

    let final_path = data_dir.join(CHECKPOINT_FILENAME);
    reject_symlink_leaf(&final_path)?;

    // Write to a temp file in the same directory, then move it into place.
    let tmp = NamedTempFile::new_in(data_dir)?;
    {
        let mut w = BufWriter::new(tmp.as_file());
        w.write_all(CHECKPOINT_MAGIC)?;
        w.write_all(&CHECKPOINT_VERSION.to_le_bytes())?;
        w.write_all(&crc.to_le_bytes())?;
        w.write_all(&body)?;
        // Flush explicitly: errors during an implicit flush on drop would be lost.
        w.flush()?;
    }
    tmp.as_file().sync_all()?;
    tmp.persist(&final_path)
        .map_err(|e| StoreError::Io(e.error))?;

    tracing::debug!(
        target: "batpak::checkpoint",
        entries = entry_count,
        global_sequence = data.global_sequence,
        watermark_segment_id,
        watermark_offset,
        body_bytes = body.len(),
        "checkpoint written"
    );
    Ok(())
}
fn reject_symlink_leaf(path: &Path) -> Result<(), StoreError> {
match std::fs::symlink_metadata(path) {
Ok(meta) if meta.file_type().is_symlink() => Err(StoreError::Io(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!(
"refusing to write checkpoint through symlink {}",
path.display()
),
))),
Ok(_) | Err(_) => Ok(()),
}
}
/// Emits the warning used whenever a checkpoint body fails to decode.
fn warn_checkpoint_decode_failure(path: &Path, error: &dyn std::fmt::Display) {
    tracing::warn!(
        target: "batpak::checkpoint",
        path = %path.display(),
        error = %error,
        "checkpoint deserialisation failed — ignoring"
    );
}

/// Attempts to read and validate `<data_dir>/index.ckpt`.
///
/// Validation steps, in order: the file is readable, is at least as long as
/// the header, starts with the magic bytes, its stored CRC32 matches the
/// body, the version is 2 or 3 and the body decodes, and the segment file
/// named by the watermark exists in `data_dir`.
///
/// Version 2 bodies carry no routing summary, so one is rebuilt from the
/// entries; version 3 bodies provide it directly.
///
/// Returns `None` when any step fails. Every rejection except a missing
/// checkpoint file is logged as a warning.
pub(crate) fn try_load_checkpoint(data_dir: &Path) -> Option<LoadedCheckpointData> {
    // Header layout: 6-byte magic + u16 version (LE) + u32 body CRC32 (LE).
    const HEADER_LEN: usize = 6 + 2 + 4;

    let path = data_dir.join(CHECKPOINT_FILENAME);
    let raw = match std::fs::read(&path) {
        Ok(bytes) => bytes,
        // No checkpoint file present: not an anomaly, so no warning.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return None,
        Err(e) => {
            tracing::warn!(
                target: "batpak::checkpoint",
                path = %path.display(),
                error = %e,
                "failed to read checkpoint file"
            );
            return None;
        }
    };

    if raw.len() < HEADER_LEN {
        tracing::warn!(
            target: "batpak::checkpoint",
            path = %path.display(),
            len = raw.len(),
            "checkpoint file too short to contain a valid header"
        );
        return None;
    }
    if !raw.starts_with(CHECKPOINT_MAGIC) {
        tracing::warn!(
            target: "batpak::checkpoint",
            path = %path.display(),
            "checkpoint file has wrong magic bytes — ignoring"
        );
        return None;
    }

    let version = u16::from_le_bytes([raw[6], raw[7]]);
    let stored_crc = u32::from_le_bytes([raw[8], raw[9], raw[10], raw[11]]);
    let body = &raw[HEADER_LEN..];
    let computed_crc = crc32fast::hash(body);
    if stored_crc != computed_crc {
        tracing::warn!(
            target: "batpak::checkpoint",
            path = %path.display(),
            stored = stored_crc,
            computed = computed_crc,
            "checkpoint CRC mismatch — file is corrupt, ignoring"
        );
        return None;
    }

    let (
        entries,
        interner_strings,
        watermark_segment_id,
        watermark_offset,
        global_sequence,
        routing,
    ) = match version {
        2 => {
            let data: CheckpointDataV2 = match rmp_serde::from_slice(body) {
                Ok(decoded) => decoded,
                Err(e) => {
                    warn_checkpoint_decode_failure(&path, &e);
                    return None;
                }
            };
            // Version 2 has no persisted routing summary; rebuild it. A
            // failure here means the entries reference invalid interner ids
            // or coordinates, so the checkpoint is unusable — say so instead
            // of dropping it silently.
            let index_entries = match checkpoint_entries_to_index_entries(
                &data.entries,
                &data.interner_strings,
            ) {
                Ok(rebuilt) => rebuilt,
                Err(e) => {
                    tracing::warn!(
                        target: "batpak::checkpoint",
                        path = %path.display(),
                        error = ?e,
                        "v2 checkpoint entries could not be converted for routing — ignoring"
                    );
                    return None;
                }
            };
            let routing = RoutingSummary::from_sorted_entries(
                &index_entries,
                recommended_restore_chunk_count(data.entries.len()),
            );
            (
                data.entries,
                data.interner_strings,
                data.watermark_segment_id,
                data.watermark_offset,
                data.global_sequence,
                routing,
            )
        }
        3 => {
            let data: CheckpointDataV3 = match rmp_serde::from_slice(body) {
                Ok(decoded) => decoded,
                Err(e) => {
                    warn_checkpoint_decode_failure(&path, &e);
                    return None;
                }
            };
            (
                data.entries,
                data.interner_strings,
                data.watermark_segment_id,
                data.watermark_offset,
                data.global_sequence,
                data.routing,
            )
        }
        _ => {
            tracing::warn!(
                target: "batpak::checkpoint",
                path = %path.display(),
                version,
                expected = CHECKPOINT_VERSION,
                "unsupported checkpoint version — ignoring"
            );
            return None;
        }
    };

    // The checkpoint is only usable if the segment its watermark points at
    // is still present in the data directory.
    let seg_filename = format!(
        "{:06}.{}",
        watermark_segment_id,
        crate::store::segment::SEGMENT_EXTENSION
    );
    let seg_path = data_dir.join(&seg_filename);
    if !seg_path.exists() {
        tracing::warn!(
            target: "batpak::checkpoint",
            path = %path.display(),
            missing_segment = %seg_path.display(),
            "watermark segment referenced by checkpoint is missing — ignoring checkpoint"
        );
        return None;
    }

    let watermark = WatermarkInfo {
        watermark_segment_id,
        watermark_offset,
    };
    tracing::debug!(
        target: "batpak::checkpoint",
        entries = entries.len(),
        global_sequence,
        watermark_segment_id,
        watermark_offset,
        "checkpoint loaded successfully"
    );
    Some(LoadedCheckpointData {
        entries,
        interner_strings,
        watermark,
        stored_allocator: global_sequence,
        routing,
    })
}
/// Test-only helper: replaces the contents of `index` with the given
/// checkpoint entries.
///
/// The entries are converted with the same helper the production loader
/// uses, so both paths stay in agreement. Conversion happens *before* the
/// index is touched: if any entry is invalid the index is left unchanged.
/// On success the index is cleared, its interner is replaced from
/// `interner_strings`, and the rebuilt entries are installed together with
/// `stored_allocator`.
///
/// # Errors
/// Returns a [`StoreError`] when an entry references an interner id outside
/// `interner_strings` or its strings do not form a valid coordinate.
#[cfg(test)]
pub(crate) fn restore_from_checkpoint(
    index: &StoreIndex,
    entries: Vec<CheckpointEntry>,
    interner_strings: &[String],
    stored_allocator: u64,
) -> Result<(), StoreError> {
    let rebuilt_entries = checkpoint_entries_to_index_entries(&entries, interner_strings)?;
    index.clear();
    index.interner.replace_from_full_snapshot(interner_strings);
    index.restore_sorted_entries(rebuilt_entries, stored_allocator);
    Ok(())
}
pub(crate) fn try_load_checkpoint_snapshot(data_dir: &Path) -> Option<LoadedCheckpointSnapshot> {
let loaded = try_load_checkpoint(data_dir)?;
let chunk_ranges = if loaded.routing.chunks.is_empty() {
vec![(0usize, loaded.entries.len())]
} else {
loaded
.routing
.chunks
.iter()
.map(|chunk| {
let start = usize::try_from(chunk.start).ok()?;
let len = usize::try_from(chunk.len).ok()?;
Some((start, len))
})
.collect::<Option<Vec<_>>>()?
};
let mut per_chunk = chunk_ranges
.into_par_iter()
.enumerate()
.map(|(chunk_idx, (start, len))| {
let end = start + len;
let rebuilt = checkpoint_entries_to_index_entries(
&loaded.entries[start..end],
&loaded.interner_strings,
)?;
Ok::<_, StoreError>((chunk_idx, rebuilt))
})
.collect::<Result<Vec<_>, _>>()
.ok()?;
per_chunk.sort_by_key(|(chunk_idx, _)| *chunk_idx);
let mut rebuilt_entries = Vec::with_capacity(loaded.entries.len());
for (_, chunk_entries) in per_chunk {
rebuilt_entries.extend(chunk_entries);
}
Some(LoadedCheckpointSnapshot {
entries: rebuilt_entries,
interner_strings: loaded.interner_strings,
watermark: loaded.watermark,
stored_allocator: loaded.stored_allocator,
routing: loaded.routing,
})
}
// Unit tests for checkpoint writing, loading and restoring.
#[cfg(test)]
mod tests {
use super::*;
use crate::store::index::StoreIndex;
use tempfile::TempDir;
/// Builds a published index holding `n` entries.
///
/// Entry `i` uses entity `entity:{i}`, scope `test-scope`, event and
/// correlation id `i + 1`, `global_sequence` `i`, and a 256-byte record at
/// offset `i * 256` in segment 0. The first entry has no causation id.
fn make_index(n: u64) -> StoreIndex {
let idx = StoreIndex::new();
for i in 0..n {
let coord =
Coordinate::new(format!("entity:{i}"), "test-scope").expect("valid coordinate");
let entity_id = idx.interner.intern(coord.entity());
let scope_id = idx.interner.intern(coord.scope());
let entry = IndexEntry {
event_id: (i + 1) as u128,
correlation_id: (i + 1) as u128,
causation_id: if i == 0 { None } else { Some(i as u128) },
coord,
entity_id,
scope_id,
kind: EventKind::custom(0x1, (i & 0x0FFF) as u16),
wall_ms: 1_700_000_000_000 + i * 1000,
clock: u32::try_from(i).expect("i fits u32"),
hash_chain: HashChain::default(),
disk_pos: DiskPos {
segment_id: 0,
offset: i * 256,
length: 256,
},
global_sequence: i,
};
idx.insert(entry);
}
idx.publish(idx.global_sequence());
idx
}
/// Creates a dummy segment file so the loader's watermark-segment existence
/// check can pass.
// NOTE(review): hard-codes the ".fbat" extension; assumes it matches
// `crate::store::segment::SEGMENT_EXTENSION` — confirm.
fn touch_segment(dir: &Path, segment_id: u64) {
let name = format!("{segment_id:06}.fbat");
std::fs::write(dir.join(name), b"dummy").expect("write dummy segment");
}
/// A checkpoint of an empty index loads back with no entries and a zero watermark.
#[test]
fn round_trip_empty_index() {
let tmp = TempDir::new().expect("tempdir");
let dir = tmp.path();
touch_segment(dir, 0);
let idx = StoreIndex::new();
write_checkpoint(&idx, dir, 0, 0).expect("write");
let result = try_load_checkpoint(dir);
assert!(result.is_some(), "checkpoint should load");
let loaded = result.expect("some");
let entries = loaded.entries;
let wm = loaded.watermark;
assert_eq!(entries.len(), 0);
assert_eq!(wm.watermark_segment_id, 0);
assert_eq!(wm.watermark_offset, 0);
}
/// A checkpoint with entries round-trips: entry count, watermark offset,
/// ordering by `global_sequence`, and a populated routing summary.
#[test]
fn round_trip_with_entries() {
let tmp = TempDir::new().expect("tempdir");
let dir = tmp.path();
touch_segment(dir, 0);
let idx = make_index(16);
write_checkpoint(&idx, dir, 0, 4096).expect("write");
let loaded = try_load_checkpoint(dir).expect("should load");
let routing = loaded.routing.clone();
let entries = loaded.entries;
let wm = loaded.watermark;
assert_eq!(entries.len(), 16);
assert_eq!(wm.watermark_offset, 4096);
// The stored order must already be ascending by global_sequence.
let seqs: Vec<u64> = entries.iter().map(|e| e.global_sequence).collect();
let mut sorted = seqs.clone();
sorted.sort_unstable();
assert_eq!(seqs, sorted, "entries must be sorted by global_sequence");
assert_eq!(routing.entry_count, 16);
assert!(
!routing.entity_runs.is_empty(),
"v3 checkpoints must persist entity-run summaries"
);
assert!(
!routing.chunks.is_empty(),
"v3 checkpoints must persist chunk summaries"
);
}
/// Restoring a loaded checkpoint into a fresh index yields the same entry count.
#[test]
fn restore_rebuilds_index() {
let tmp = TempDir::new().expect("tempdir");
let dir = tmp.path();
touch_segment(dir, 0);
let src = make_index(8);
write_checkpoint(&src, dir, 0, 0).expect("write");
let loaded = try_load_checkpoint(dir).expect("should load");
let entries = loaded.entries;
let interner_strings = loaded.interner_strings;
let stored_alloc = loaded.stored_allocator;
let dst = StoreIndex::new();
restore_from_checkpoint(&dst, entries, &interner_strings, stored_alloc).expect("restore");
assert_eq!(dst.len(), 8);
}
/// With no checkpoint file in the directory the loader returns `None`.
#[test]
fn missing_file_returns_none() {
let tmp = TempDir::new().expect("tempdir");
assert!(
try_load_checkpoint(tmp.path()).is_none(),
"missing file should return None"
);
}
/// A file that is long enough for a header but starts with the wrong bytes is rejected.
#[test]
fn bad_magic_returns_none() {
let tmp = TempDir::new().expect("tempdir");
let path = tmp.path().join(CHECKPOINT_FILENAME);
std::fs::write(&path, b"BADMAGIC\x00\x00\x00\x00").expect("write");
assert!(
try_load_checkpoint(tmp.path()).is_none(),
"bad magic should return None"
);
}
/// Corrupting the body after writing makes the stored CRC mismatch, so the
/// checkpoint is rejected.
#[test]
fn crc_mismatch_returns_none() {
let tmp = TempDir::new().expect("tempdir");
let dir = tmp.path();
touch_segment(dir, 0);
let idx = make_index(4);
write_checkpoint(&idx, dir, 0, 0).expect("write");
let path = dir.join(CHECKPOINT_FILENAME);
let mut raw = std::fs::read(&path).expect("read");
// Flip every bit of the final body byte.
let last = raw.len() - 1;
raw[last] ^= 0xFF;
std::fs::write(&path, &raw).expect("rewrite");
assert!(
try_load_checkpoint(dir).is_none(),
"CRC mismatch should return None"
);
}
/// A checkpoint whose watermark names segment 99 is rejected when only
/// segment 0 exists on disk.
#[test]
fn missing_watermark_segment_returns_none() {
let tmp = TempDir::new().expect("tempdir");
let dir = tmp.path();
touch_segment(dir, 0);
let idx = make_index(2);
write_checkpoint(&idx, dir, 99, 0).expect("write");
assert!(
try_load_checkpoint(dir).is_none(),
"missing watermark segment should return None"
);
}
/// A header version the loader does not know (99) is rejected even though
/// magic and CRC are valid.
#[test]
fn wrong_version_returns_none() {
let tmp = TempDir::new().expect("tempdir");
let dir = tmp.path();
touch_segment(dir, 0);
let idx = StoreIndex::new();
write_checkpoint(&idx, dir, 0, 0).expect("write");
let path = dir.join(CHECKPOINT_FILENAME);
let mut raw = std::fs::read(&path).expect("read");
// Overwrite the little-endian u16 version field with 99.
raw[6] = 99;
raw[7] = 0;
// Re-store the CRC of the (unchanged) body so that only the version
// check can be responsible for the rejection.
let body_crc = crc32fast::hash(&raw[12..]);
let crc_bytes = body_crc.to_le_bytes();
raw[8] = crc_bytes[0];
raw[9] = crc_bytes[1];
raw[10] = crc_bytes[2];
raw[11] = crc_bytes[3];
std::fs::write(&path, &raw).expect("rewrite");
assert!(
try_load_checkpoint(dir).is_none(),
"wrong version should return None"
);
}
/// A hand-built version 2 file (no routing field) still loads, and the
/// loader synthesises the routing summary from its entries.
#[test]
fn v2_checkpoint_fallback_is_still_readable() {
let tmp = TempDir::new().expect("tempdir");
let dir = tmp.path();
touch_segment(dir, 0);
let idx = make_index(6);
// Flatten the index entries the same way `write_checkpoint` does.
let mut entries: Vec<CheckpointEntry> = idx
.all_entries()
.into_iter()
.map(|e| CheckpointEntry {
event_id: e.event_id,
correlation_id: e.correlation_id,
causation_id: e.causation_id,
entity_id: e.entity_id.as_u32(),
scope_id: e.scope_id.as_u32(),
kind: e.kind,
wall_ms: e.wall_ms,
clock: e.clock,
prev_hash: e.hash_chain.prev_hash,
event_hash: e.hash_chain.event_hash,
segment_id: e.disk_pos.segment_id,
offset: e.disk_pos.offset,
length: e.disk_pos.length,
global_sequence: e.global_sequence,
})
.collect();
entries.sort_by_key(|entry| entry.global_sequence);
let mut interner_strings = vec![String::new()];
interner_strings.extend(idx.interner.to_snapshot());
let body = rmp_serde::to_vec_named(&CheckpointDataV2 {
global_sequence: idx.global_sequence(),
watermark_segment_id: 0,
watermark_offset: 0,
interner_strings,
entries,
})
.expect("serialize v2 checkpoint");
// Assemble the file by hand: magic, version 2, body CRC, body.
let crc = crc32fast::hash(&body);
let path = dir.join(CHECKPOINT_FILENAME);
let mut bytes = Vec::new();
bytes.extend_from_slice(CHECKPOINT_MAGIC);
bytes.extend_from_slice(&2u16.to_le_bytes());
bytes.extend_from_slice(&crc.to_le_bytes());
bytes.extend_from_slice(&body);
std::fs::write(&path, bytes).expect("write v2 checkpoint");
let loaded = try_load_checkpoint(dir).expect("load v2 checkpoint");
assert_eq!(loaded.entries.len(), 6);
assert_eq!(loaded.routing.entry_count, 6);
assert!(
!loaded.routing.chunks.is_empty(),
"v2 fallback should synthesize chunk summaries on load"
);
}
/// After a restore, both the global and the visible sequence of the
/// destination index equal the number of restored entries.
#[test]
fn restore_advances_global_sequence() {
let tmp = TempDir::new().expect("tempdir");
let dir = tmp.path();
touch_segment(dir, 0);
let src = make_index(16);
write_checkpoint(&src, dir, 0, 0).expect("write");
let loaded = try_load_checkpoint(dir).expect("should load");
let entries = loaded.entries;
let interner_strings = loaded.interner_strings;
let stored_alloc = loaded.stored_allocator;
assert_eq!(entries.len(), 16);
let dst = StoreIndex::new();
restore_from_checkpoint(&dst, entries, &interner_strings, stored_alloc).expect("restore");
assert_eq!(
dst.global_sequence(),
16,
"PROPERTY: global_sequence after restore must equal the number of restored entries."
);
assert_eq!(
dst.visible_sequence(),
16,
"PROPERTY: visible_sequence after restore must equal global_sequence."
);
}
}