use super::segment::{skim, Segment, SegmentReader};
use crate::error::Result;
use crate::merkle::{leaf_hash, Hash};
use crate::merkle_log::{ConsistencyProof, InclusionProof, MerkleLog};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
/// File-name prefix shared by segment logs and their metadata sidecars.
const SEGMENT_PREFIX: &str = "seg-";
/// File-name suffix of a segment log file.
const SEGMENT_SUFFIX: &str = ".log";
/// File-name suffix of a segment's metadata sidecar file.
const META_SUFFIX: &str = ".meta";
/// Summary of one segment, persisted in its `.meta` sidecar by `write_meta`
/// and loaded by `read_meta`.
///
/// The sidecar is bincode-encoded, so changing the field order changes the
/// on-disk format.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
struct SegmentMeta {
/// Smallest record timestamp in the segment.
min_timestamp: u64,
/// Largest record timestamp in the segment.
max_timestamp: u64,
/// Number of records stored in the segment.
records: u64,
/// Table-wide index of the segment's first record; `base_index + n` is the
/// merkle-spine leaf index of the segment's n-th record.
#[serde(default)]
base_index: u64,
}
/// A segment that is no longer written to: its file path plus cached metadata.
#[derive(Debug, Clone)]
struct SealedSeg {
/// Location of the segment's `.log` file.
path: PathBuf,
/// Summary loaded from the sidecar, or rebuilt from the segment when the
/// sidecar is missing or unreadable.
meta: SegmentMeta,
}
/// An append-only table of `T` rows stored as a sequence of segment files,
/// with a merkle log (the "spine") holding one leaf hash per record.
pub struct Table<T> {
/// Directory holding the segment files, sidecars and the spine.
dir: PathBuf,
/// Segment currently receiving appends.
active: Segment,
/// Sequence number of the active segment (used in its file name).
active_seq: u64,
/// Sealed segments keyed by sequence number, oldest first.
sealed: BTreeMap<u64, SealedSeg>,
/// Merkle log of record leaf hashes.
spine: MerkleLog,
/// Size threshold that triggers rolling to a new segment on append.
max_segment_bytes: u64,
/// Ties the table to its row type without storing a `T`.
_marker: PhantomData<T>,
}
impl<T: Serialize + DeserializeOwned> Table<T> {
/// Opens the table stored in `dir`, creating the directory if needed.
///
/// Every `seg-<n>.log` file is discovered; the highest sequence number
/// becomes the active segment and all others are treated as sealed. A
/// sealed segment whose sidecar is missing or unreadable has its metadata
/// rebuilt by skimming the segment, and the sidecar is rewritten on a
/// best-effort basis. Finally the merkle spine is reconciled with the
/// records actually present.
///
/// # Errors
/// Propagates directory, segment and spine I/O failures.
pub fn open(dir: &Path, max_segment_bytes: u64) -> Result<Self> {
    std::fs::create_dir_all(dir)?;

    // Collect the sequence number of every segment log in the directory.
    let mut seqs: Vec<u64> = Vec::new();
    for entry in std::fs::read_dir(dir)? {
        let file_name = entry?.file_name();
        let file_name = file_name.to_string_lossy();
        let parsed = file_name
            .strip_prefix(SEGMENT_PREFIX)
            .and_then(|rest| rest.strip_suffix(SEGMENT_SUFFIX))
            .and_then(|digits| digits.parse::<u64>().ok());
        seqs.extend(parsed);
    }
    seqs.sort_unstable();

    // The newest segment stays writable; an empty directory starts at 0.
    let active_seq = seqs.last().copied().unwrap_or(0);

    let mut sealed = BTreeMap::new();
    for seq in seqs.iter().copied().filter(|&seq| seq != active_seq) {
        let path = segment_path(dir, seq);
        let sidecar = meta_path(dir, seq);
        let meta = if let Some(stored) = read_meta(&sidecar) {
            stored
        } else {
            let rebuilt = match skim(&path)? {
                Some(summary) => SegmentMeta {
                    min_timestamp: summary.min_timestamp,
                    max_timestamp: summary.max_timestamp,
                    records: summary.records,
                    base_index: summary.base_index,
                },
                // Nothing to summarise: use an empty time range.
                None => SegmentMeta {
                    min_timestamp: u64::MAX,
                    max_timestamp: 0,
                    records: 0,
                    base_index: 0,
                },
            };
            write_meta(&sidecar, &rebuilt);
            rebuilt
        };
        sealed.insert(seq, SealedSeg { path, meta });
    }

    // The active segment continues right after the last sealed record.
    let active_base = sealed
        .values()
        .map(|seg| seg.meta.base_index + seg.meta.records)
        .fold(0, u64::max);
    let active = Segment::open(&segment_path(dir, active_seq), active_base)?;
    let spine = MerkleLog::open(dir)?;

    let mut table = Self {
        dir: dir.to_path_buf(),
        active,
        active_seq,
        sealed,
        spine,
        max_segment_bytes,
        _marker: PhantomData,
    };
    table.reconcile_spine()?;
    Ok(table)
}
/// Brings the merkle spine back in step with the records on disk.
///
/// * Spine longer than the records: truncate it to the record count.
/// * Spine the same length: nothing to do.
/// * Spine shorter: re-derive the missing leaves by replaying segments in
///   base-index order and appending the leaf hash of every record at or
///   beyond the spine's original size.
fn reconcile_spine(&mut self) -> Result<()> {
    let expected_total = self.active.base_index() + self.active.records();
    let spine_size = self.spine.size();

    match spine_size.cmp(&expected_total) {
        std::cmp::Ordering::Equal => return Ok(()),
        std::cmp::Ordering::Greater => {
            self.spine.truncate(expected_total)?;
            return self.spine.sync();
        }
        std::cmp::Ordering::Less => {}
    }

    let mut pending: Vec<(PathBuf, u64, u64)> = Vec::with_capacity(self.sealed.len() + 1);
    pending.extend(
        self.sealed
            .values()
            .map(|seg| (seg.path.clone(), seg.meta.base_index, seg.meta.records)),
    );
    pending.push((
        self.active.path().to_path_buf(),
        self.active.base_index(),
        self.active.records(),
    ));
    pending.sort_by_key(|entry| entry.1);

    for (path, base, records) in pending {
        // Segments entirely covered by the existing spine need no replay.
        if base + records <= spine_size {
            continue;
        }
        let mut reader = SegmentReader::open(&path)?;
        let mut abs = base;
        while let Some((_, payload)) = reader.next_record()? {
            // Compare against the size captured before any leaf was appended.
            if abs >= spine_size {
                self.spine.append(leaf_hash(&payload))?;
            }
            abs += 1;
        }
    }
    self.spine.sync()
}
/// Appends `row` with the given `timestamp`.
///
/// The row is bincode-encoded; if the active segment is non-empty and the
/// payload would push it past `max_segment_bytes`, the segment is rolled
/// first. The record is written to the segment and its leaf hash is then
/// appended to the merkle spine.
///
/// # Errors
/// Fails on serialization, segment or spine errors.
pub fn append(&mut self, row: &T, timestamp: u64) -> Result<()> {
    let payload = bincode::serialize(row)?;

    // An empty segment always accepts the record, however large it is.
    let needs_roll = !self.active.is_empty()
        && self.active.len() + payload.len() as u64 > self.max_segment_bytes;
    if needs_roll {
        self.roll()?;
    }

    self.active.append(&payload, timestamp)?;
    self.spine.append(leaf_hash(&payload))?;
    Ok(())
}
/// Seals the active segment and starts a new one.
///
/// The active segment is synced, its summary is written to a sidecar
/// (best-effort) and it is recorded in `sealed`; a fresh segment with the
/// next sequence number then becomes active, continuing the record index
/// where the sealed segment ended.
///
/// The next segment is opened *before* any in-memory state is changed, so a
/// failed open leaves the table exactly as it was instead of leaving the old
/// segment registered as both sealed and active.
///
/// # Errors
/// Fails if the active segment cannot be synced or the next segment cannot
/// be opened.
fn roll(&mut self) -> Result<()> {
    self.active.sync()?;
    let meta = SegmentMeta {
        min_timestamp: self.active.min_timestamp,
        max_timestamp: self.active.max_timestamp,
        records: self.active.records(),
        base_index: self.active.base_index(),
    };

    // Fallible step first: nothing below this point can fail.
    let next_seq = self.active_seq + 1;
    let next_active = Segment::open(
        &segment_path(&self.dir, next_seq),
        meta.base_index + meta.records,
    )?;

    // The sidecar is only a cache; `open` rebuilds it when it is missing.
    write_meta(&meta_path(&self.dir, self.active_seq), &meta);
    let sealed_path = self.active.path().to_path_buf();
    self.sealed.insert(
        self.active_seq,
        SealedSeg {
            path: sealed_path,
            meta,
        },
    );
    self.active_seq = next_seq;
    self.active = next_active;
    Ok(())
}
/// Flushes the active segment, then the merkle spine.
pub fn flush(&mut self) -> Result<()> {
self.active.flush()?;
self.spine.flush()
}
/// Syncs the active segment, then the merkle spine.
pub fn sync(&mut self) -> Result<()> {
self.active.sync()?;
self.spine.sync()
}
/// Returns a [`TableScan`] over a snapshot of the current segments
/// (see [`Table::slices`]).
pub fn scan(&mut self) -> Result<TableScan<T>> {
Ok(TableScan::over(self.slices()?))
}
/// Takes a snapshot of the table as a list of segment slices.
///
/// Sealed segments come first in sequence order and are unbounded
/// (`bound == u64::MAX`); the active segment comes last, bounded at its
/// current length after a flush.
pub fn slices(&mut self) -> Result<Vec<SegmentSlice>> {
    self.active.flush()?;

    let mut snapshot = Vec::with_capacity(self.sealed.len() + 1);
    for (&seq, seg) in &self.sealed {
        snapshot.push(SegmentSlice {
            path: seg.path.clone(),
            bound: u64::MAX,
            seq,
            min_ts: seg.meta.min_timestamp,
            max_ts: seg.meta.max_timestamp,
        });
    }
    snapshot.push(SegmentSlice {
        path: self.active.path().to_path_buf(),
        bound: self.active.len(),
        seq: self.active_seq,
        min_ts: self.active.min_timestamp,
        max_ts: self.active.max_timestamp,
    });
    Ok(snapshot)
}
/// Checks every stored record against the merkle spine.
///
/// Segments are replayed in base-index order; each record's leaf hash must
/// equal the spine leaf at its absolute index. On success the result of the
/// spine's own verification is returned.
///
/// # Errors
/// Returns `Error::Corrupt` when a record has no spine leaf or its hash
/// differs from the leaf, and propagates I/O and spine errors.
pub fn verify(&mut self) -> Result<Hash> {
    self.active.flush()?;
    let leaves = self.spine.leaves()?;

    let mut ordered: Vec<(PathBuf, u64)> = Vec::with_capacity(self.sealed.len() + 1);
    ordered.extend(
        self.sealed
            .values()
            .map(|seg| (seg.path.clone(), seg.meta.base_index)),
    );
    ordered.push((self.active.path().to_path_buf(), self.active.base_index()));
    ordered.sort_by_key(|entry| entry.1);

    for (path, base) in ordered {
        let mut reader = SegmentReader::open(&path)?;
        let mut abs = base;
        while let Some((_, payload)) = reader.next_record()? {
            let Some(expected) = leaves.get(abs as usize) else {
                return Err(crate::error::Error::Corrupt {
                    segment: path.display().to_string(),
                    offset: abs,
                    reason: "record has no matching leaf in the merkle spine — the spine \
                             was truncated or a segment was replaced"
                        .into(),
                });
            };
            if leaf_hash(&payload) != *expected {
                return Err(crate::error::Error::Corrupt {
                    segment: path.display().to_string(),
                    offset: abs,
                    reason: "record payload does not match its merkle leaf — the record \
                             was modified after being written"
                        .into(),
                });
            }
            abs += 1;
        }
    }
    self.spine.verify()
}
/// Deletes every sealed segment whose newest record is older than
/// `cutoff_micros` and returns how many segments were selected.
///
/// The segment file must be removable; removal of its sidecar is
/// best-effort. The active segment is never purged.
pub fn purge_older_than(&mut self, cutoff_micros: u64) -> Result<usize> {
    let doomed: Vec<u64> = self
        .sealed
        .iter()
        .filter_map(|(&seq, seg)| (seg.meta.max_timestamp < cutoff_micros).then_some(seq))
        .collect();

    for &seq in &doomed {
        let Some(seg) = self.sealed.remove(&seq) else {
            continue;
        };
        std::fs::remove_file(seg.path)?;
        let _ = std::fs::remove_file(meta_path(&self.dir, seq));
    }
    Ok(doomed.len())
}
/// Returns the table's size in bytes: the flushed length of the active
/// segment plus the on-disk size of every sealed segment file.
pub fn total_bytes(&mut self) -> Result<u64> {
    self.active.flush()?;
    let active_bytes = self.active.len();
    self.sealed
        .values()
        .try_fold(active_bytes, |sum, seg| -> Result<u64> {
            Ok(sum + std::fs::metadata(&seg.path)?.len())
        })
}
/// Returns the newest timestamp held by the lowest-numbered sealed segment,
/// or `None` when there are no sealed segments.
pub fn oldest_sealed_max_ts(&self) -> Option<u64> {
    let (_, oldest) = self.sealed.iter().next()?;
    Some(oldest.meta.max_timestamp)
}
pub fn purge_oldest_sealed(&mut self) -> Result<Option<u64>> {
let Some(&seq) = self.sealed.keys().next() else {
return Ok(None);
};
let path = self.sealed.remove(&seq).expect("key just observed").path;
let bytes = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
std::fs::remove_file(path)?;
Ok(Some(bytes))
}
/// Sequence number of the segment currently receiving appends.
pub fn active_seq(&self) -> u64 {
self.active_seq
}
/// Current root hash of the merkle spine.
pub fn root(&self) -> Hash {
self.spine.root()
}
/// Size of the merkle spine as reported by the spine itself.
pub fn spine_size(&self) -> u64 {
self.spine.size()
}
/// Returns the smallest base index among the sealed segments, or the active
/// segment's base index when nothing is sealed — i.e. the index of the
/// first record still tracked by the table.
pub fn purged_count(&self) -> u64 {
    let lowest_sealed_base = self.sealed.values().map(|seg| seg.meta.base_index).min();
    match lowest_sealed_base {
        Some(base) => base,
        None => self.active.base_index(),
    }
}
/// Number of records currently held: the active segment's records plus the
/// record counts of all sealed segments.
pub fn record_count(&self) -> u64 {
    let live = self.active.records();
    let sealed_total: u64 = self.sealed.values().map(|seg| seg.meta.records).sum();
    live + sealed_total
}
/// Asks the merkle spine for an inclusion proof of the leaf at `leaf_index`.
pub fn prove_inclusion(&mut self, leaf_index: u64) -> Result<InclusionProof> {
self.spine.prove_inclusion(leaf_index)
}
/// Asks the merkle spine for a consistency proof starting from `first_size`.
pub fn prove_consistency(&mut self, first_size: u64) -> Result<ConsistencyProof> {
self.spine.prove_consistency(first_size)
}
}
impl<T: Serialize + DeserializeOwned> Table<T> {
/// Discards every record in the table.
///
/// A fresh active segment (next sequence number, base index 0) replaces the
/// current one, the merkle spine is truncated to zero and synced, and the
/// old active segment, all sealed segments and their sidecars are deleted.
///
/// # Errors
/// Fails if the new segment cannot be opened, the spine cannot be truncated
/// or synced, or a segment file cannot be removed. Sidecar removal is
/// best-effort.
pub fn clear(&mut self) -> Result<()> {
// Capture what must be deleted before the fields are replaced below.
let old_active = self.active.path().to_path_buf();
let old_seqs: Vec<u64> = self.sealed.keys().copied().collect();
// Switch to a new, empty active segment before deleting anything.
self.active_seq += 1;
self.active = Segment::open(&segment_path(&self.dir, self.active_seq), 0)?;
self.spine.truncate(0)?;
self.spine.sync()?;
std::fs::remove_file(old_active)?;
// `take` empties `self.sealed` up front, even if a removal below fails.
for (_, s) in std::mem::take(&mut self.sealed) {
std::fs::remove_file(s.path)?;
}
// Sidecars are removed last; a missing sidecar is not an error.
for seq in old_seqs {
let _ = std::fs::remove_file(meta_path(&self.dir, seq));
}
Ok(())
}
}
/// Builds the path of segment `seq` inside `dir`: the segment prefix, the
/// sequence number zero-padded to ten digits, then the segment suffix.
fn segment_path(dir: &Path, seq: u64) -> PathBuf {
    let file_name = format!("{SEGMENT_PREFIX}{seq:010}{SEGMENT_SUFFIX}");
    dir.join(file_name)
}
/// Outcome of [`rewrite_table`].
#[derive(Debug, Clone)]
pub struct RewriteStats {
/// Merkle root of the table before the rewrite.
pub old_root: Hash,
/// Merkle root of the rewritten table.
pub new_root: Hash,
/// Number of records in the rewritten table.
pub records: u64,
}
/// Rewrites every row of the table in `dir` through `f`, producing a fresh
/// table (and a fresh merkle spine) that replaces the original.
///
/// The new table is built in a sibling `<name>.rewrite` directory. Once it is
/// synced, the original is moved aside to `<name>.pre-rewrite`, the new
/// directory is moved into place, and the backup is deleted.
///
/// Crash safety: if an earlier run died after moving the original aside but
/// before installing the replacement, `dir` is missing and the backup holds
/// the only copy of the old data. That backup is restored before anything is
/// cleaned up, and the rewrite starts over from it. If installing the
/// replacement fails, the original is moved back on a best-effort basis.
///
/// # Errors
/// Fails if `dir` has no final path component, on any table or filesystem
/// error, on a row that cannot be decoded, or when `f` returns an error.
pub fn rewrite_table<T: Serialize + DeserializeOwned>(
    dir: &Path,
    max_segment_bytes: u64,
    mut f: impl FnMut(T) -> Result<T>,
) -> Result<RewriteStats> {
    let name = dir
        .file_name()
        .ok_or_else(|| crate::error::Error::Encode("table dir has no name".into()))?
        .to_string_lossy();
    let tmp = dir.with_file_name(format!("{name}.rewrite"));
    let backup = dir.with_file_name(format!("{name}.pre-rewrite"));

    // Recover from an interrupted swap: never delete the backup while it is
    // the only copy of the table.
    if !dir.exists() && backup.exists() {
        std::fs::rename(&backup, dir)?;
    }
    for stale in [&tmp, &backup] {
        if stale.exists() {
            std::fs::remove_dir_all(stale)?;
        }
    }

    let mut old: Table<T> = Table::open(dir, max_segment_bytes)?;
    let old_root = old.root();
    let slices = old.slices()?;
    let mut fresh: Table<T> = Table::open(&tmp, max_segment_bytes)?;
    for slice in slices {
        let mut reader = SegmentReader::open_bounded(&slice.path, slice.bound)?;
        while let Some((ts, payload)) = reader.next_record()? {
            let row: T = bincode::deserialize(&payload)?;
            fresh.append(&f(row)?, ts)?;
        }
    }
    fresh.sync()?;
    let new_root = fresh.root();
    let records = fresh.record_count();

    // Release both tables before their directories are moved.
    drop(old);
    drop(fresh);

    std::fs::rename(dir, &backup)?;
    if let Err(err) = std::fs::rename(&tmp, dir) {
        // Put the original back so the table is not left missing.
        let _ = std::fs::rename(&backup, dir);
        return Err(err.into());
    }
    std::fs::remove_dir_all(&backup)?;
    Ok(RewriteStats {
        old_root,
        new_root,
        records,
    })
}
/// Builds the path of the metadata sidecar for segment `seq` inside `dir`:
/// the segment prefix, the sequence number zero-padded to ten digits, then
/// the sidecar suffix.
fn meta_path(dir: &Path, seq: u64) -> PathBuf {
    let file_name = format!("{SEGMENT_PREFIX}{seq:010}{META_SUFFIX}");
    dir.join(file_name)
}
/// Loads a segment's metadata sidecar, returning `None` when the file cannot
/// be read or does not decode.
fn read_meta(path: &Path) -> Option<SegmentMeta> {
    std::fs::read(path)
        .ok()
        .and_then(|bytes| bincode::deserialize(&bytes).ok())
}
/// Writes a segment's metadata sidecar on a best-effort basis: encoding and
/// I/O failures are deliberately ignored.
fn write_meta(path: &Path, meta: &SegmentMeta) {
    let Ok(encoded) = bincode::serialize(meta) else {
        return;
    };
    let _ = std::fs::write(path, encoded);
}
/// A snapshot view of one segment, as produced by `Table::slices`.
#[derive(Debug, Clone)]
pub struct SegmentSlice {
/// Path of the segment file.
pub path: PathBuf,
/// Bound handed to `SegmentReader::open_bounded`; `u64::MAX` for sealed
/// segments, the flushed length for the active one.
pub bound: u64,
/// Sequence number of the segment.
pub seq: u64,
/// Smallest record timestamp recorded for the segment.
pub min_ts: u64,
/// Largest record timestamp recorded for the segment.
pub max_ts: u64,
}
/// Sequential scan that decodes rows of type `T` from a list of slices,
/// one segment at a time.
pub struct TableScan<T> {
/// Slices to read, in the order given.
slices: Vec<SegmentSlice>,
/// Reader for the segment currently being consumed, if any.
current: Option<SegmentReader>,
/// Index into `slices` of the next segment to open.
idx: usize,
/// Ties the scan to its row type without storing a `T`.
_marker: PhantomData<T>,
}
impl<T> TableScan<T> {
/// Creates a scan over `slices`; no segment is opened until the first row
/// is requested.
pub fn over(slices: Vec<SegmentSlice>) -> Self {
Self {
slices,
current: None,
idx: 0,
_marker: PhantomData,
}
}
}
impl<T: DeserializeOwned> TableScan<T> {
pub fn next_row(&mut self) -> Result<Option<T>> {
loop {
if self.current.is_none() {
if self.idx >= self.slices.len() {
return Ok(None);
}
let s = &self.slices[self.idx];
self.idx += 1;
self.current = match SegmentReader::open_bounded(&s.path, s.bound) {
Ok(r) => Some(r),
Err(crate::error::Error::Io(e)) if e.kind() == std::io::ErrorKind::NotFound => {
continue;
}
Err(e) => return Err(e),
};
}
if let Some((_, payload)) = self.current.as_mut().unwrap().next_record()? {
return Ok(Some(bincode::deserialize(&payload)?));
}
self.current = None;
}
}
}
/// Iterator adapter over `next_row`: yields `Ok(row)` per row, `Err` on
/// failure, and ends when `next_row` returns `Ok(None)`.
impl<T: DeserializeOwned> Iterator for TableScan<T> {
type Item = Result<T>;
fn next(&mut self) -> Option<Self::Item> {
// Result<Option<T>> -> Option<Result<T>>
self.next_row().transpose()
}
}
/// Location of a record: segment sequence number plus the record's index
/// within that segment.
///
/// The derived ordering compares `seq` first and then `idx`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Position {
/// Sequence number of the segment holding the record.
pub seq: u64,
/// Zero-based index of the record within its segment.
pub idx: u64,
}
/// Scan that yields rows together with their [`Position`], in ascending or
/// descending order, restricted to a timestamp range and optionally resumed
/// after a cursor position.
pub struct PositionedScan<T> {
/// Slices sorted by sequence number (reversed for descending scans).
slices: Vec<SegmentSlice>,
/// Index into `slices` of the next slice to consider.
slice_idx: usize,
/// `true` for a descending scan.
desc: bool,
/// Inclusive lower timestamp bound.
from: u64,
/// Inclusive upper timestamp bound.
to: u64,
/// Cursor: only records strictly past this position are returned.
after: Option<Position>,
// `current`: (segment seq, index of the next record, reader) for ascending scans.
// `buffered`: (position, timestamp, payload) of matching records from one
// segment, popped from the back during descending scans.
current: Option<(u64, u64, SegmentReader)>, buffered: Vec<(Position, u64, Vec<u8>)>,
/// Number of slices handed out for reading (i.e. not pruned).
segments_opened: u64,
/// Ties the scan to its row type without storing a `T`.
_marker: PhantomData<T>,
}
impl<T> PositionedScan<T> {
    /// Creates a scan over `slices`.
    ///
    /// Slices are ordered by sequence number, reversed when `desc` is set.
    /// `from`/`to` are inclusive timestamp bounds defaulting to `0` and
    /// `u64::MAX`; `after` is an exclusive cursor position.
    pub fn new(
        mut slices: Vec<SegmentSlice>,
        desc: bool,
        from: Option<u64>,
        to: Option<u64>,
        after: Option<Position>,
    ) -> Self {
        slices.sort_by_key(|slice| slice.seq);
        if desc {
            slices.reverse();
        }
        let (from, to) = (from.unwrap_or(0), to.unwrap_or(u64::MAX));
        Self {
            slices,
            slice_idx: 0,
            desc,
            from,
            to,
            after,
            current: None,
            buffered: Vec::new(),
            segments_opened: 0,
            _marker: PhantomData,
        }
    }

    /// Number of slices that were not pruned and were handed out for reading.
    pub fn segments_opened(&self) -> u64 {
        self.segments_opened
    }

    /// Returns `true` when slice `s` cannot contain a matching record: its
    /// timestamp range misses the window, or it lies entirely on the
    /// already-consumed side of the cursor.
    fn prune(&self, s: &SegmentSlice) -> bool {
        let outside_window = s.max_ts < self.from || s.min_ts > self.to;
        let behind_cursor = self.after.map_or(false, |cursor| {
            if self.desc {
                s.seq > cursor.seq
            } else {
                s.seq < cursor.seq
            }
        });
        outside_window || behind_cursor
    }

    /// Whether `ts` lies within the inclusive `[from, to]` window.
    fn in_range(&self, ts: u64) -> bool {
        (self.from..=self.to).contains(&ts)
    }

    /// Whether `pos` lies strictly past the cursor in scan direction; always
    /// `true` when no cursor is set.
    fn past_cursor(&self, pos: Position) -> bool {
        match self.after {
            None => true,
            Some(cursor) if self.desc => pos < cursor,
            Some(cursor) => pos > cursor,
        }
    }
}
impl<T: DeserializeOwned> PositionedScan<T> {
pub fn next_row(&mut self) -> Result<Option<(Position, T)>> {
if self.desc {
self.next_desc()
} else {
self.next_asc()
}
}
fn next_asc(&mut self) -> Result<Option<(Position, T)>> {
loop {
if self.current.is_none() {
let Some(slice) = self.next_slice()? else {
return Ok(None);
};
let reader = SegmentReader::open_bounded(&slice.path, slice.bound)?;
self.current = Some((slice.seq, 0, reader));
}
let (seq, idx, reader) = self.current.as_mut().unwrap();
match reader.next_record()? {
Some((ts, payload)) => {
let pos = Position {
seq: *seq,
idx: *idx,
};
*idx += 1;
if self.in_range(ts) && self.past_cursor(pos) {
return Ok(Some((pos, bincode::deserialize(&payload)?)));
}
}
None => self.current = None,
}
}
}
fn next_desc(&mut self) -> Result<Option<(Position, T)>> {
loop {
if let Some((pos, _, payload)) = self.buffered.pop() {
return Ok(Some((pos, bincode::deserialize(&payload)?)));
}
let Some(slice) = self.next_slice()? else {
return Ok(None);
};
let mut reader = SegmentReader::open_bounded(&slice.path, slice.bound)?;
let mut idx = 0u64;
while let Some((ts, payload)) = reader.next_record()? {
let pos = Position {
seq: slice.seq,
idx,
};
idx += 1;
if self.in_range(ts) && self.past_cursor(pos) {
self.buffered.push((pos, ts, payload));
}
}
}
}
fn next_slice(&mut self) -> Result<Option<SegmentSlice>> {
while self.slice_idx < self.slices.len() {
let s = self.slices[self.slice_idx].clone();
self.slice_idx += 1;
if self.prune(&s) {
continue;
}
self.segments_opened += 1;
return Ok(Some(s));
}
Ok(None)
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde::Deserialize;
// Minimal row type used by the tests below.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Row {
ts: u64,
msg: String,
}
// Appends rows `row-0` .. `row-(n-1)`, using the row number as both the `ts`
// field and the record timestamp, then syncs the table.
fn fill(t: &mut Table<Row>, n: u64) {
for i in 0..n {
t.append(
&Row {
ts: i,
msg: format!("row-{i}"),
},
i,
)
.unwrap();
}
t.sync().unwrap();
}
// A small segment limit forces rolls; all rows stay scannable across a reopen,
// purging drops old segments, and no sidecar outlives its segment.
#[test]
fn rolls_segments_scans_and_purges() {
let dir = tempfile::tempdir().unwrap();
let mut t: Table<Row> = Table::open(dir.path(), 256).unwrap();
fill(&mut t, 50);
let rows: Vec<Row> = t.scan().unwrap().map(|r| r.unwrap()).collect();
assert_eq!(rows.len(), 50);
// Count the `.log` files to confirm that more than one segment exists.
let files = std::fs::read_dir(dir.path())
.unwrap()
.filter(|e| {
e.as_ref()
.unwrap()
.file_name()
.to_string_lossy()
.ends_with(SEGMENT_SUFFIX)
})
.count();
assert!(files > 1, "expected rolled segments, got {files}");
drop(t);
// Reopen and check that nothing was lost.
let mut t2: Table<Row> = Table::open(dir.path(), 256).unwrap();
assert_eq!(t2.scan().unwrap().count(), 50);
let purged = t2.purge_older_than(40).unwrap();
assert!(purged > 0);
let remaining: Vec<Row> = t2.scan().unwrap().map(|r| r.unwrap()).collect();
assert!(remaining.len() < 50);
assert_eq!(remaining.last().unwrap().msg, "row-49");
// Every remaining `.meta` sidecar must still have its `.log` segment.
for entry in std::fs::read_dir(dir.path()).unwrap() {
let name = entry.unwrap().file_name();
let name = name.to_string_lossy().to_string();
if let Some(seq) = name
.strip_prefix(SEGMENT_PREFIX)
.and_then(|s| s.strip_suffix(META_SUFFIX))
{
let seg = dir.path().join(format!("seg-{seq}.log"));
assert!(seg.exists(), "orphaned sidecar {name}");
}
}
}
// Deleting a sidecar must not change the slice metadata seen after a reopen,
// and the reopen must write the sidecar again.
#[test]
fn sidecar_meta_survives_reopen_and_rebuilds_when_missing() {
let dir = tempfile::tempdir().unwrap();
let mut t: Table<Row> = Table::open(dir.path(), 256).unwrap();
fill(&mut t, 50);
let slices_before = t.slices().unwrap();
assert!(slices_before.len() > 1);
drop(t);
// Remove the sidecar of the first (sealed) segment.
let victim = meta_path(dir.path(), slices_before[0].seq);
assert!(victim.exists());
std::fs::remove_file(&victim).unwrap();
let mut t2: Table<Row> = Table::open(dir.path(), 256).unwrap();
let slices_after = t2.slices().unwrap();
for (b, a) in slices_before.iter().zip(&slices_after) {
assert_eq!((b.seq, b.min_ts, b.max_ts), (a.seq, a.min_ts, a.max_ts));
}
assert!(victim.exists(), "sidecar was not rebuilt");
}
// PositionedScan returns rows in both directions and skips segments whose
// timestamp range cannot match the requested window.
#[test]
fn positioned_scan_prunes_by_time_and_orders_both_ways() {
let dir = tempfile::tempdir().unwrap();
let mut t: Table<Row> = Table::open(dir.path(), 256).unwrap();
fill(&mut t, 50);
let slices = t.slices().unwrap();
let total_segments = slices.len() as u64;
// Ascending, unbounded: every row, every segment opened.
let mut scan: PositionedScan<Row> =
PositionedScan::new(slices.clone(), false, None, None, None);
let mut asc = Vec::new();
while let Some((_, row)) = scan.next_row().unwrap() {
asc.push(row.ts);
}
assert_eq!(asc, (0..50).collect::<Vec<_>>());
assert_eq!(scan.segments_opened(), total_segments);
// Descending, unbounded: same rows in reverse.
let mut scan: PositionedScan<Row> =
PositionedScan::new(slices.clone(), true, None, None, None);
let mut desc = Vec::new();
while let Some((_, row)) = scan.next_row().unwrap() {
desc.push(row.ts);
}
assert_eq!(desc, (0..50).rev().collect::<Vec<_>>());
// Descending with a narrow time window: only the newest rows, fewer segments.
let mut scan: PositionedScan<Row> =
PositionedScan::new(slices, true, Some(45), Some(49), None);
let mut hits = Vec::new();
while let Some((_, row)) = scan.next_row().unwrap() {
hits.push(row.ts);
}
assert_eq!(hits, vec![49, 48, 47, 46, 45]);
assert!(
scan.segments_opened() < total_segments,
"pruning opened all {total_segments} segments"
);
}
// A freshly written table verifies, and its spine size and root are unchanged
// after the table is dropped and reopened.
#[test]
fn verify_passes_and_root_survives_reopen() {
let dir = tempfile::tempdir().unwrap();
let root;
{
let mut t: Table<Row> = Table::open(dir.path(), 256).unwrap();
fill(&mut t, 40);
root = t.verify().unwrap();
assert_eq!(t.spine_size(), 40);
}
let mut t2: Table<Row> = Table::open(dir.path(), 256).unwrap();
assert_eq!(t2.spine_size(), 40);
assert_eq!(t2.root(), root);
assert_eq!(t2.verify().unwrap(), root);
}
// A spine file that is shorter than the segments is re-derived on open and
// ends up with the original root.
#[test]
fn reconcile_repairs_a_spine_left_behind() {
let dir = tempfile::tempdir().unwrap();
let root;
{
let mut t: Table<Row> = Table::open(dir.path(), 4096).unwrap();
fill(&mut t, 25);
root = t.root();
}
// Chop 64 bytes off the end of the spine file.
let spine = dir.path().join("merkle.spine");
let len = std::fs::metadata(&spine).unwrap().len();
let f = std::fs::OpenOptions::new()
.write(true)
.open(&spine)
.unwrap();
f.set_len(len - 64).unwrap();
drop(f);
let mut t: Table<Row> = Table::open(dir.path(), 4096).unwrap();
assert_eq!(t.spine_size(), 25, "spine re-derived from the segments");
assert_eq!(t.root(), root);
assert_eq!(t.verify().unwrap(), root);
}
// When the segment loses its last record, open truncates the spine to the
// records that are still present.
#[test]
fn reconcile_truncates_a_spine_ahead() {
let dir = tempfile::tempdir().unwrap();
{
let mut t: Table<Row> = Table::open(dir.path(), 4096).unwrap();
fill(&mut t, 25);
}
let seg = dir.path().join("seg-0000000000.log");
let len = std::fs::metadata(&seg).unwrap().len();
// Size of the final record on disk: frame header plus the encoded row
// (only the length of `msg` matters, so `ts: 0` is fine).
let row_bytes = (crate::storage::segment::FRAME_HEADER as u64)
+ bincode::serialize(&Row {
ts: 0,
msg: "row-24".into(),
})
.unwrap()
.len() as u64;
// Cut exactly that record off the end of the segment.
let f = std::fs::OpenOptions::new().write(true).open(&seg).unwrap();
f.set_len(len - row_bytes).unwrap();
drop(f);
let mut t: Table<Row> = Table::open(dir.path(), 4096).unwrap();
assert_eq!(
t.spine_size(),
24,
"spine truncated to the surviving records"
);
assert_eq!(t.record_count(), 24);
t.verify().unwrap();
}
// A scan built from an earlier snapshot must tolerate a segment file that was
// deleted afterwards: the missing segment is skipped, the rest is returned.
#[test]
fn scan_skips_segments_purged_after_snapshot() {
    let dir = tempfile::tempdir().unwrap();
    let mut t: Table<Row> = Table::open(dir.path(), 256).unwrap();
    fill(&mut t, 50);

    let slices = t.slices().unwrap();
    assert!(slices.len() > 2, "need several segments for this test");

    // Simulate a purge that happens after the snapshot was taken.
    std::fs::remove_file(&slices[0].path).unwrap();

    let rows: Vec<Row> = TableScan::<Row>::over(slices)
        .collect::<Result<Vec<_>>>()
        .expect("scan must not fail on a purged segment");
    assert!(!rows.is_empty() && rows.len() < 50);
    assert_eq!(rows.last().unwrap().msg, "row-49");
}
}