#![allow(dead_code)]
use std::fmt::Write as _;
use std::fs;
use std::io::Write;
use std::path::PathBuf;
use crate::error::LogError;
/// One checkpoint row: leader `epoch` and the offset at which that epoch begins.
///
/// Serialized as a single `"<epoch> <start_offset>"` line in the checkpoint file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EpochEntry {
/// Leader epoch number.
pub epoch: i32,
/// First offset belonging to `epoch`; offsets from here up to the next higher epoch's
/// `start_offset` are attributed to this epoch by the lookup queries.
pub start_offset: i64,
}
/// Leader-epoch entries cached in memory and persisted to a text checkpoint file.
///
/// File layout: a version line (`0`), a line with the entry count, then one
/// `"<epoch> <start_offset>"` line per entry.
#[derive(Debug)]
pub struct LeaderEpochCheckpoint {
/// Location of the checkpoint file; writes go through a temporary sibling file that is
/// then renamed over this path.
path: PathBuf,
/// Entries in the order they were appended; the file is written in this same order.
entries: Vec<EpochEntry>,
}
/// Sentinel epoch meaning "no epoch"; accepted and returned by `epoch_and_offset_for`.
pub const UNDEFINED_EPOCH: i32 = -1;
/// Sentinel offset meaning "no offset"; the value `end_offset_for_epoch` returns for an
/// epoch that has no entry.
pub const UNDEFINED_OFFSET: i64 = -1;
impl LeaderEpochCheckpoint {
pub fn open(path: PathBuf) -> Result<Self, LogError> {
let entries = match fs::read_to_string(&path) {
Ok(s) => Self::parse(&s)?,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Vec::new(),
Err(e) => return Err(LogError::Io(e)),
};
Ok(Self { path, entries })
}
fn parse(s: &str) -> Result<Vec<EpochEntry>, LogError> {
let mut lines = s.lines();
let _version = lines.next();
let count: usize = lines
.next()
.and_then(|l| l.trim().parse().ok())
.unwrap_or(0);
let mut out = Vec::new();
for line in lines.take(count) {
let mut parts = line.split_whitespace();
let epoch = parts
.next()
.and_then(|t| t.parse().ok())
.ok_or_else(|| LogError::Corrupt(format!("bad checkpoint row: {line:?}")))?;
let start_offset = parts
.next()
.and_then(|t| t.parse().ok())
.ok_or_else(|| LogError::Corrupt(format!("bad checkpoint row: {line:?}")))?;
out.push(EpochEntry {
epoch,
start_offset,
});
}
Ok(out)
}
pub fn append(&mut self, epoch: i32, start_offset: i64) -> Result<(), LogError> {
if self.entries.iter().any(|e| e.epoch == epoch) {
return Ok(());
}
self.entries.push(EpochEntry {
epoch,
start_offset,
});
self.flush()
}
pub fn truncate_from_end(&mut self, end_offset: i64) -> Result<(), LogError> {
let before = self.entries.len();
self.entries.retain(|e| e.start_offset < end_offset);
if self.entries.len() != before {
self.flush()?;
}
Ok(())
}
fn flush(&self) -> Result<(), LogError> {
let mut s = String::new();
s.push_str("0\n");
let _ = writeln!(s, "{}", self.entries.len());
for e in &self.entries {
let _ = writeln!(s, "{} {}", e.epoch, e.start_offset);
}
let tmp = self.path.with_extension("tmp");
{
let mut f = fs::File::create(&tmp).map_err(LogError::Io)?;
f.write_all(s.as_bytes()).map_err(LogError::Io)?;
f.sync_data().map_err(LogError::Io)?;
}
fs::rename(&tmp, &self.path).map_err(LogError::Io)?;
Ok(())
}
#[must_use]
pub fn end_offset_for_epoch(&self, epoch: i32, log_end_offset: i64) -> i64 {
if !self.entries.iter().any(|e| e.epoch == epoch) {
return -1;
}
self.entries
.iter()
.filter(|e| e.epoch > epoch)
.map(|e| e.start_offset)
.min()
.unwrap_or(log_end_offset)
}
#[must_use]
pub fn epoch_for_offset(&self, offset: i64) -> Option<i32> {
self.entries
.iter()
.filter(|e| e.start_offset <= offset)
.max_by_key(|e| e.start_offset)
.map(|e| e.epoch)
}
#[must_use]
pub fn epoch_and_offset_for(&self, requested_epoch: i32, log_end_offset: i64) -> (i32, i64) {
if requested_epoch == UNDEFINED_EPOCH {
return (UNDEFINED_EPOCH, log_end_offset);
}
if self.latest_epoch() == Some(requested_epoch) {
return (requested_epoch, log_end_offset);
}
let higher = self
.entries
.iter()
.filter(|e| e.epoch > requested_epoch)
.min_by_key(|e| e.epoch);
match higher {
None => (UNDEFINED_EPOCH, log_end_offset),
Some(next) => {
let floor = self
.entries
.iter()
.filter(|e| e.epoch <= requested_epoch)
.map(|e| e.epoch)
.max();
match floor {
Some(f) => (f, next.start_offset),
None => (requested_epoch, next.start_offset),
}
}
}
}
#[must_use]
pub fn latest_epoch(&self) -> Option<i32> {
self.entries.iter().map(|e| e.epoch).max()
}
#[must_use]
pub fn entries(&self) -> &[EpochEntry] {
&self.entries
}
}
#[cfg(test)]
mod tests {
    // Unit tests for `LeaderEpochCheckpoint`: the on-disk format, append/truncate
    // semantics, and the epoch/offset lookup queries.
    use super::*;
    use assert2::assert;
    use tempfile::TempDir;

    /// Returns a fresh temp dir plus the checkpoint path inside it.
    ///
    /// The directory lives only as long as the returned `TempDir`, so tests keep it bound
    /// (as `_d`) for their whole body.
    fn fresh() -> (TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("leader-epoch-checkpoint");
        (dir, path)
    }

    /// Opens a checkpoint in a fresh temp dir and appends each `(epoch, start_offset)`
    /// row in order.
    fn checkpoint_with(rows: &[(i32, i64)]) -> (TempDir, LeaderEpochCheckpoint) {
        let (dir, path) = fresh();
        let mut c = LeaderEpochCheckpoint::open(path).unwrap();
        for &(epoch, start_offset) in rows {
            c.append(epoch, start_offset).unwrap();
        }
        (dir, c)
    }

    #[test]
    fn round_trip_byte_compat_format() {
        let (_d, path) = fresh();
        let mut c = LeaderEpochCheckpoint::open(path.clone()).unwrap();
        c.append(0, 0).unwrap();
        c.append(1, 50).unwrap();
        c.append(2, 100).unwrap();
        let s = std::fs::read_to_string(&path).unwrap();
        assert!(s == "0\n3\n0 0\n1 50\n2 100\n");
        // Reading the file back must reproduce exactly what was written.
        let reopened = LeaderEpochCheckpoint::open(path).unwrap();
        assert!(reopened.entries() == c.entries());
    }

    #[test]
    fn append_preserves_existing_rows() {
        let (_d, path) = fresh();
        {
            let mut c = LeaderEpochCheckpoint::open(path.clone()).unwrap();
            c.append(0, 0).unwrap();
        }
        let mut c2 = LeaderEpochCheckpoint::open(path).unwrap();
        c2.append(1, 50).unwrap();
        assert!(c2.entries().len() == 2);
    }

    #[test]
    fn append_idempotent_for_same_epoch() {
        let (_d, mut c) = checkpoint_with(&[(0, 0)]);
        c.append(0, 999).unwrap();
        assert!(
            c.entries()
                == &[EpochEntry {
                    epoch: 0,
                    start_offset: 0
                }]
        );
    }

    #[test]
    fn end_offset_for_current_epoch_returns_log_end_offset() {
        let (_d, c) = checkpoint_with(&[(0, 0), (1, 50)]);
        assert!(c.end_offset_for_epoch(1, 100) == 100);
    }

    #[test]
    fn end_offset_for_older_epoch_returns_next_start() {
        let (_d, c) = checkpoint_with(&[(0, 0), (1, 50), (2, 100)]);
        assert!(c.end_offset_for_epoch(0, 200) == 50);
        assert!(c.end_offset_for_epoch(1, 200) == 100);
    }

    #[test]
    fn end_offset_for_unknown_epoch_returns_undefined() {
        let (_d, c) = checkpoint_with(&[(0, 0)]);
        assert!(c.end_offset_for_epoch(7, 200) == UNDEFINED_OFFSET);
    }

    #[test]
    fn truncate_from_end_removes_entries_at_or_after_end_offset() {
        let (_d, mut c) = checkpoint_with(&[(1, 0), (7, 4)]);
        c.truncate_from_end(4).unwrap();
        assert!(c.latest_epoch() == Some(1));
        assert!(c.end_offset_for_epoch(7, 4) == UNDEFINED_OFFSET);
        assert!(c.end_offset_for_epoch(1, 4) == 4);
    }

    #[test]
    fn missing_file_yields_empty() {
        let (_d, path) = fresh();
        let c = LeaderEpochCheckpoint::open(path).unwrap();
        assert!(c.entries().is_empty());
        assert!(c.latest_epoch() == None);
    }

    #[test]
    fn absurd_declared_count_does_not_over_allocate() {
        let s = "0\n9999999999999\n3 42\n";
        let entries = LeaderEpochCheckpoint::parse(s).unwrap();
        assert!(
            entries
                == [EpochEntry {
                    epoch: 3,
                    start_offset: 42,
                }],
            "only the one real row is parsed despite the absurd declared count"
        );
        assert!(entries.capacity() < 9_999_999_999_999);
    }

    #[test]
    fn epoch_for_offset_empty_returns_none() {
        let (_d, c) = checkpoint_with(&[]);
        assert!(c.epoch_for_offset(0) == None, "empty checkpoint → None");
        assert!(c.epoch_for_offset(100) == None, "empty checkpoint → None");
    }

    #[test]
    fn epoch_for_offset_before_first_entry_returns_none() {
        let (_d, c) = checkpoint_with(&[(0, 10), (1, 50)]);
        assert!(
            c.epoch_for_offset(9) == None,
            "offset before first entry's start_offset → None"
        );
    }

    #[test]
    fn epoch_for_offset_within_epoch_range() {
        let (_d, c) = checkpoint_with(&[(0, 0), (1, 50), (2, 100)]);
        assert!(c.epoch_for_offset(0) == Some(0), "start of epoch 0");
        assert!(c.epoch_for_offset(25) == Some(0), "middle of epoch 0");
        assert!(
            c.epoch_for_offset(49) == Some(0),
            "last offset before epoch 1"
        );
        assert!(c.epoch_for_offset(50) == Some(1), "start of epoch 1");
        assert!(c.epoch_for_offset(75) == Some(1), "middle of epoch 1");
        assert!(
            c.epoch_for_offset(99) == Some(1),
            "last offset before epoch 2"
        );
    }

    #[test]
    fn epoch_for_offset_at_epoch_boundary() {
        let (_d, c) = checkpoint_with(&[(0, 0), (1, 50)]);
        assert!(
            c.epoch_for_offset(50) == Some(1),
            "boundary offset belongs to the epoch that starts there"
        );
    }

    #[test]
    fn epoch_for_offset_past_last_entry() {
        let (_d, c) = checkpoint_with(&[(0, 0), (1, 50)]);
        assert!(
            c.epoch_for_offset(100) == Some(1),
            "offset past last entry → last epoch"
        );
        assert!(
            c.epoch_for_offset(999) == Some(1),
            "far past last entry → last epoch"
        );
    }

    #[test]
    fn epoch_for_offset_single_entry_at_zero() {
        let (_d, c) = checkpoint_with(&[(0, 0)]);
        assert!(c.epoch_for_offset(0) == Some(0));
        assert!(c.epoch_for_offset(1000) == Some(0));
    }

    #[test]
    fn epoch_and_offset_latest_returns_pair_at_log_end() {
        let (_d, c) = checkpoint_with(&[(0, 0), (1, 50)]);
        assert!(c.epoch_and_offset_for(1, 100) == (1, 100));
    }

    #[test]
    fn epoch_and_offset_older_returns_floor_epoch_and_next_start() {
        let (_d, c) = checkpoint_with(&[(0, 0), (1, 50), (2, 100)]);
        assert!(c.epoch_and_offset_for(0, 200) == (0, 50));
        assert!(c.epoch_and_offset_for(1, 200) == (1, 100));
    }

    #[test]
    fn epoch_and_offset_gap_uses_floor_epoch() {
        let (_d, c) = checkpoint_with(&[(0, 0), (5, 100)]);
        assert!(c.epoch_and_offset_for(3, 200) == (0, 100));
    }

    #[test]
    fn epoch_and_offset_future_epoch_is_undefined_at_log_end() {
        let (_d, c) = checkpoint_with(&[(0, 0), (1, 50)]);
        assert!(c.epoch_and_offset_for(7, 100) == (UNDEFINED_EPOCH, 100));
    }

    #[test]
    fn epoch_and_offset_below_all_returns_requested_and_first_start() {
        let (_d, c) = checkpoint_with(&[(3, 30), (4, 40)]);
        assert!(c.epoch_and_offset_for(1, 100) == (1, 30));
    }

    #[test]
    fn epoch_and_offset_empty_cache_is_undefined_at_log_end() {
        let (_d, c) = checkpoint_with(&[]);
        assert!(c.epoch_and_offset_for(0, 9) == (UNDEFINED_EPOCH, 9));
    }
}