use std::time::{Duration, Instant};
use tracing::{debug, info};
/// Tuning knobs for [`CheckpointCoordinator`]'s incremental flushing.
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    /// Fraction of an engine's dirty pages targeted per tick while it is below
    /// `force_flush_threshold`; the target is rounded up and clamped to at least
    /// one page and at most all dirty pages.
    pub flush_fraction: f64,
    /// Minimum time between two non-throttled `tick` calls; calls made sooner
    /// return an empty plan.
    pub tick_interval: Duration,
    /// Dirty-page count at or above which an engine's whole dirty set is
    /// targeted instead of a fraction (still subject to the I/O budget).
    pub force_flush_threshold: usize,
    /// Bytes of flush I/O allowed per tick, shared by all engines; the
    /// coordinator converts it to pages assuming 4096-byte pages.
    pub io_budget_bytes_per_tick: usize,
}
impl Default for CheckpointConfig {
fn default() -> Self {
Self {
flush_fraction: 0.10,
tick_interval: Duration::from_secs(30),
io_budget_bytes_per_tick: 64 * 1024 * 1024, force_flush_threshold: 100_000,
}
}
}
/// Per-engine flush bookkeeping tracked by the checkpoint coordinator.
#[derive(Debug, Clone)]
pub struct EngineCheckpointState {
    /// Name the engine was registered under; used as its lookup key.
    pub engine_name: String,
    /// Pages marked dirty and not yet reported as flushed.
    pub dirty_pages: usize,
    /// Running total of pages reported as flushed via `record_flush`.
    pub total_flushed: u64,
    /// When `record_flush` was last called, or `None` if it never was.
    pub last_flush: Option<Instant>,
}
impl EngineCheckpointState {
    /// Creates a clean state for `engine_name`: no dirty pages, nothing flushed
    /// yet, and no recorded flush time.
    pub fn new(engine_name: &str) -> Self {
        Self {
            engine_name: engine_name.to_string(),
            dirty_pages: 0,
            total_flushed: 0,
            last_flush: None,
        }
    }

    /// Adds `count` pages to the dirty-page counter.
    ///
    /// Saturates at `usize::MAX` instead of overflowing, so a bogus count from a
    /// caller can neither panic (debug builds) nor wrap to a small value (release).
    pub fn mark_dirty(&mut self, count: usize) {
        self.dirty_pages = self.dirty_pages.saturating_add(count);
    }

    /// Returns how many pages this engine wants flushed on the next tick, before
    /// any I/O budget is applied.
    ///
    /// - If `dirty_pages >= config.force_flush_threshold`, all dirty pages.
    /// - Otherwise `ceil(dirty_pages * config.flush_fraction)`, clamped to
    ///   `1..=dirty_pages`.
    ///
    /// The result is 0 only when there are no dirty pages.
    pub fn pages_to_flush(&self, config: &CheckpointConfig) -> usize {
        if self.dirty_pages >= config.force_flush_threshold {
            return self.dirty_pages;
        }
        // Float-to-int `as` casts saturate (NaN and negatives become 0), and the
        // clamp below keeps the result in range for any `flush_fraction`.
        let target = (self.dirty_pages as f64 * config.flush_fraction).ceil() as usize;
        target.max(1).min(self.dirty_pages)
    }

    /// Records that `count` pages were flushed.
    ///
    /// The dirty counter never goes below zero, even if `count` exceeds it.
    /// `total_flushed` grows by the full reported `count` and saturates at
    /// `u64::MAX`. The flush time is set to now.
    pub fn record_flush(&mut self, count: usize) {
        self.dirty_pages = self.dirty_pages.saturating_sub(count);
        self.total_flushed = self.total_flushed.saturating_add(count as u64);
        self.last_flush = Some(Instant::now());
    }
}
/// Coordinates incremental checkpoint flushing across registered storage engines.
///
/// Each `tick` produces a flush plan bounded by the per-tick I/O budget; callers
/// execute it, report progress with `record_flush`, and mark a checkpoint as
/// finished with `complete_checkpoint`.
#[derive(Debug)]
pub struct CheckpointCoordinator {
    /// Flush policy and I/O budget.
    config: CheckpointConfig,
    /// Registered engines, in registration order (the order `tick` visits them).
    engines: Vec<EngineCheckpointState>,
    /// Time of the last non-throttled `tick`, or `None` before the first one.
    last_tick: Option<Instant>,
    /// LSN passed to the most recent `complete_checkpoint` (0 initially).
    checkpoint_lsn: u64,
    /// Number of `complete_checkpoint` calls so far.
    checkpoint_count: u64,
}
impl CheckpointCoordinator {
pub fn new(config: CheckpointConfig) -> Self {
Self {
config,
engines: Vec::new(),
last_tick: None,
checkpoint_lsn: 0,
checkpoint_count: 0,
}
}
pub fn register_engine(&mut self, name: &str) {
if !self.engines.iter().any(|e| e.engine_name == name) {
self.engines.push(EngineCheckpointState::new(name));
}
}
pub fn mark_dirty(&mut self, engine: &str, count: usize) {
if let Some(state) = self.engines.iter_mut().find(|e| e.engine_name == engine) {
state.mark_dirty(count);
}
}
pub fn tick(&mut self) -> Vec<(String, usize)> {
let now = Instant::now();
if let Some(last) = self.last_tick
&& now.duration_since(last) < self.config.tick_interval
{
return Vec::new();
}
self.last_tick = Some(now);
let mut flush_plan = Vec::new();
let mut budget_remaining = self.config.io_budget_bytes_per_tick;
let page_size = 4096;
for engine in &self.engines {
if engine.dirty_pages == 0 {
continue;
}
let target = engine.pages_to_flush(&self.config);
let budget_pages = budget_remaining / page_size;
let actual = target.min(budget_pages);
if actual > 0 {
flush_plan.push((engine.engine_name.clone(), actual));
budget_remaining = budget_remaining.saturating_sub(actual * page_size);
}
}
if !flush_plan.is_empty() {
debug!(
engines = flush_plan.len(),
total_pages = flush_plan.iter().map(|(_, p)| p).sum::<usize>(),
"checkpoint tick: flushing"
);
}
flush_plan
}
pub fn record_flush(&mut self, engine: &str, count: usize) {
if let Some(state) = self.engines.iter_mut().find(|e| e.engine_name == engine) {
state.record_flush(count);
}
}
pub fn complete_checkpoint(&mut self, lsn: u64) {
self.checkpoint_lsn = lsn;
self.checkpoint_count += 1;
info!(lsn, count = self.checkpoint_count, "checkpoint completed");
}
pub fn checkpoint_lsn(&self) -> u64 {
self.checkpoint_lsn
}
pub fn is_clean(&self) -> bool {
self.engines.iter().all(|e| e.dirty_pages == 0)
}
pub fn total_dirty_pages(&self) -> usize {
self.engines.iter().map(|e| e.dirty_pages).sum()
}
pub fn checkpoint_count(&self) -> u64 {
self.checkpoint_count
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Default config with a zero tick interval, so every `tick()` runs.
    fn eager_config() -> CheckpointConfig {
        CheckpointConfig {
            tick_interval: Duration::from_millis(0),
            ..Default::default()
        }
    }

    /// Dirty-page count of a registered engine (panics if it is unknown).
    fn dirty_pages_of(coord: &CheckpointCoordinator, engine: &str) -> usize {
        coord
            .engines
            .iter()
            .find(|e| e.engine_name == engine)
            .map(|e| e.dirty_pages)
            .expect("engine should have been registered by the test")
    }

    /// Page count planned for `engine` (panics if it is not in the plan).
    fn planned(plan: &[(String, usize)], engine: &str) -> usize {
        plan.iter()
            .find(|(e, _)| e == engine)
            .map(|(_, pages)| *pages)
            .expect("engine should be in the flush plan")
    }

    #[test]
    fn incremental_flush() {
        let config = CheckpointConfig {
            flush_fraction: 0.10,
            ..eager_config()
        };
        let mut coord = CheckpointCoordinator::new(config);
        coord.register_engine("sparse");
        coord.register_engine("vector");
        coord.mark_dirty("sparse", 100);
        coord.mark_dirty("vector", 50);

        let plan = coord.tick();
        assert!(!plan.is_empty());
        let sparse_flush = planned(&plan, "sparse");
        assert_eq!(sparse_flush, 10);

        coord.record_flush("sparse", sparse_flush);
        assert_eq!(dirty_pages_of(&coord, "sparse"), 90);
    }

    #[test]
    fn force_flush_over_threshold() {
        let config = CheckpointConfig {
            force_flush_threshold: 50,
            ..eager_config()
        };
        let mut coord = CheckpointCoordinator::new(config);
        coord.register_engine("sparse");
        coord.mark_dirty("sparse", 100);

        let plan = coord.tick();
        assert_eq!(planned(&plan, "sparse"), 100);
    }

    #[test]
    fn clean_after_all_flushed() {
        let config = CheckpointConfig {
            flush_fraction: 1.0,
            ..eager_config()
        };
        let mut coord = CheckpointCoordinator::new(config);
        coord.register_engine("sparse");
        coord.mark_dirty("sparse", 50);

        let plan = coord.tick();
        for (engine, count) in &plan {
            coord.record_flush(engine, *count);
        }
        assert!(coord.is_clean());
        assert_eq!(coord.total_dirty_pages(), 0);
    }

    #[test]
    fn checkpoint_lsn_tracking() {
        let mut coord = CheckpointCoordinator::new(CheckpointConfig::default());
        assert_eq!(coord.checkpoint_lsn(), 0);
        coord.complete_checkpoint(42);
        assert_eq!(coord.checkpoint_lsn(), 42);
        assert_eq!(coord.checkpoint_count(), 1);
    }

    #[test]
    fn io_budget_caps_plan_in_registration_order() {
        // Budget of exactly five 4096-byte pages per tick.
        let config = CheckpointConfig {
            force_flush_threshold: 50,
            io_budget_bytes_per_tick: 5 * 4096,
            ..eager_config()
        };
        let mut coord = CheckpointCoordinator::new(config);
        coord.register_engine("sparse");
        coord.register_engine("vector");
        coord.mark_dirty("sparse", 100);
        coord.mark_dirty("vector", 100);

        // The first-registered engine consumes the whole budget.
        let plan = coord.tick();
        assert_eq!(plan, vec![("sparse".to_string(), 5)]);
    }

    #[test]
    fn tick_is_throttled_within_interval() {
        let config = CheckpointConfig {
            tick_interval: Duration::from_secs(3600),
            ..Default::default()
        };
        let mut coord = CheckpointCoordinator::new(config);
        coord.register_engine("sparse");
        coord.mark_dirty("sparse", 10);

        assert!(!coord.tick().is_empty(), "first tick always runs");
        assert!(
            coord.tick().is_empty(),
            "a second tick inside the interval must be skipped"
        );
    }

    #[test]
    fn register_is_idempotent_and_unknown_engines_are_ignored() {
        let mut coord = CheckpointCoordinator::new(eager_config());
        coord.register_engine("sparse");
        coord.mark_dirty("sparse", 7);
        coord.register_engine("sparse");
        assert_eq!(coord.engines.len(), 1);
        assert_eq!(dirty_pages_of(&coord, "sparse"), 7);

        coord.mark_dirty("missing", 3);
        coord.record_flush("missing", 3);
        assert_eq!(coord.total_dirty_pages(), 7);
    }
}