use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Tunables that decide when the background checkpointer runs a checkpoint.
#[derive(Clone, Copy, Debug)]
pub struct CheckpointConfig {
    /// Time since the previous checkpoint after which a new one is triggered,
    /// regardless of WAL growth.
    pub timeout: Duration,
    /// WAL growth (in bytes, as reported by the driver) since the previous
    /// checkpoint at or above which a new checkpoint is triggered.
    pub max_wal_bytes: u64,
    /// NOTE(review): not read anywhere in the visible code; presumably the
    /// fraction of the checkpoint interval over which page flushing should be
    /// spread — confirm against the driver implementation.
    pub min_completion_target_ratio: f64,
}

impl Default for CheckpointConfig {
    /// Returns the stock settings: a 300-second timeout, a 1 GiB WAL growth
    /// limit and a completion target ratio of 0.9.
    fn default() -> Self {
        let five_minutes = Duration::from_secs(5 * 60);
        let one_gib: u64 = 1 << 30;
        CheckpointConfig {
            timeout: five_minutes,
            max_wal_bytes: one_gib,
            min_completion_target_ratio: 0.9,
        }
    }
}
/// Lock-free counters describing the checkpoints performed so far.
///
/// Every field is an independent atomic, so the struct can be shared behind an
/// `Arc` and updated without locking.
#[derive(Default, Debug)]
pub struct CheckpointStats {
    /// Number of checkpoints that have finished.
    pub checkpoints_completed: AtomicU64,
    /// Running sum of the pages flushed by all checkpoints.
    pub pages_flushed_total: AtomicU64,
    /// Running sum of the WAL bytes truncated by all checkpoints.
    pub wal_truncated_bytes: AtomicU64,
    /// Redo LSN reported by the most recent checkpoint.
    pub last_checkpoint_lsn: AtomicU64,
    /// Wall-clock duration of the most recent checkpoint, in milliseconds.
    pub last_checkpoint_duration_ms: AtomicU64,
}

impl CheckpointStats {
    /// Creates a zeroed set of counters, already wrapped in an `Arc` for sharing.
    pub fn new() -> Arc<Self> {
        Arc::new(Default::default())
    }

    /// Copies the current counter values into a plain-data snapshot.
    ///
    /// Each counter is read separately with relaxed ordering, so the snapshot
    /// is not atomic across fields: an update racing with this call may be
    /// visible in some fields and not in others.
    pub fn snapshot(&self) -> CheckpointStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);

        let checkpoints_completed = read(&self.checkpoints_completed);
        let pages_flushed_total = read(&self.pages_flushed_total);
        let wal_truncated_bytes = read(&self.wal_truncated_bytes);
        let last_checkpoint_lsn = read(&self.last_checkpoint_lsn);
        let last_checkpoint_duration_ms = read(&self.last_checkpoint_duration_ms);

        CheckpointStatsSnapshot {
            checkpoints_completed,
            pages_flushed_total,
            wal_truncated_bytes,
            last_checkpoint_lsn,
            last_checkpoint_duration_ms,
        }
    }
}

/// Plain-value copy of [`CheckpointStats`] taken at one point in time.
#[derive(Clone, Copy, Debug)]
pub struct CheckpointStatsSnapshot {
    /// Number of checkpoints that had finished when the snapshot was taken.
    pub checkpoints_completed: u64,
    /// Total pages flushed by all checkpoints up to the snapshot.
    pub pages_flushed_total: u64,
    /// Total WAL bytes truncated by all checkpoints up to the snapshot.
    pub wal_truncated_bytes: u64,
    /// Redo LSN reported by the latest checkpoint at snapshot time.
    pub last_checkpoint_lsn: u64,
    /// Duration of the latest checkpoint at snapshot time, in milliseconds.
    pub last_checkpoint_duration_ms: u64,
}
/// Storage-engine hooks the background checkpointer calls into.
///
/// Implementations must be shareable across threads because the checkpointer
/// invokes them from its own thread.
pub trait CheckpointDriver: Sync + Send {
    /// Returns the current WAL byte counter.
    ///
    /// The checkpointer subtracts [`last_checkpoint_wal_bytes`] from this
    /// value to measure how much WAL has accumulated since the last checkpoint.
    ///
    /// [`last_checkpoint_wal_bytes`]: CheckpointDriver::last_checkpoint_wal_bytes
    fn current_wal_bytes(&self) -> u64;

    /// Returns the WAL byte counter as of the last checkpoint; used as the
    /// baseline for the growth computation.
    // NOTE(review): presumably expressed in the same units/origin as
    // `current_wal_bytes` — confirm with the implementation.
    fn last_checkpoint_wal_bytes(&self) -> u64;

    /// Performs one checkpoint and reports what it did.
    ///
    /// The checkpointer times this call and folds the returned values into its
    /// statistics.
    fn run_checkpoint(&self) -> CheckpointResult;
}
/// Outcome of a single checkpoint, as reported by the driver.
#[derive(Clone, Copy, Debug)]
pub struct CheckpointResult {
    /// Pages written out by this checkpoint; added to the running
    /// `pages_flushed_total` statistic.
    pub pages_flushed: u64,
    /// Redo LSN established by this checkpoint; recorded as the
    /// `last_checkpoint_lsn` statistic.
    pub new_redo_lsn: u64,
    /// WAL bytes reclaimed by this checkpoint; added to the running
    /// `wal_truncated_bytes` statistic.
    pub wal_truncated_bytes: u64,
}
/// Owner-side handle for a running background checkpointer.
///
/// Dropping the handle requests the background thread to stop.
pub struct CheckpointerHandle {
    /// Shutdown flag shared with the background thread.
    stop: Arc<AtomicBool>,
    /// Live statistics updated by the background thread after each checkpoint.
    pub stats: Arc<CheckpointStats>,
}

impl CheckpointerHandle {
    /// Asks the background thread to exit.
    ///
    /// This only raises the shared flag and returns immediately; it does not
    /// wait for the thread. The thread checks the flag once per loop
    /// iteration, so it exits after finishing any checkpoint in progress and
    /// its current one-second sleep.
    pub fn stop(&self) {
        // Release pairs with the Acquire load performed by the thread.
        AtomicBool::store(&self.stop, true, Ordering::Release);
    }
}

impl Drop for CheckpointerHandle {
    /// Requests shutdown so the thread does not outlive its handle indefinitely.
    fn drop(&mut self) {
        CheckpointerHandle::stop(self);
    }
}
/// Starts the background checkpointer thread and returns a handle to it.
///
/// The thread wakes up roughly once per second and runs a checkpoint through
/// `driver` when either
/// * at least `config.timeout` has elapsed since the previous checkpoint
///   (or since the thread started), or
/// * the WAL has grown by at least `config.max_wal_bytes` since the previous
///   checkpoint, as reported by the driver.
///
/// After every checkpoint the shared statistics exposed through
/// [`CheckpointerHandle::stats`] are updated.
///
/// The thread keeps running until [`CheckpointerHandle::stop`] is called or the
/// handle is dropped. Its `JoinHandle` is discarded, so the thread is detached
/// and cannot be joined.
pub fn spawn(driver: Arc<dyn CheckpointDriver>, config: CheckpointConfig) -> CheckpointerHandle {
    let stop = Arc::new(AtomicBool::new(false));
    let stats = CheckpointStats::new();
    let stop_clone = Arc::clone(&stop);
    let stats_clone = Arc::clone(&stats);

    std::thread::spawn(move || {
        let mut last_checkpoint_at = Instant::now();

        // Acquire pairs with the Release store in `CheckpointerHandle::stop`.
        while !stop_clone.load(Ordering::Acquire) {
            let elapsed = last_checkpoint_at.elapsed();

            // The two counters are read separately and the WAL may have been
            // truncated or recycled in between, so the current value can be
            // below the baseline. A plain `-` would panic in debug builds
            // (silently killing this detached thread) and wrap to a huge value
            // in release builds (forcing a checkpoint every second); treat that
            // case as "no growth" instead.
            let wal_grown = driver
                .current_wal_bytes()
                .saturating_sub(driver.last_checkpoint_wal_bytes());

            let trigger = elapsed >= config.timeout || wal_grown >= config.max_wal_bytes;
            if trigger {
                let start = Instant::now();
                let result = driver.run_checkpoint();
                let duration = start.elapsed();

                // `as_millis` yields a u128; clamp before narrowing so an
                // out-of-range value saturates rather than silently truncating.
                let duration_ms = duration.as_millis().min(u128::from(u64::MAX)) as u64;

                stats_clone
                    .checkpoints_completed
                    .fetch_add(1, Ordering::Relaxed);
                stats_clone
                    .pages_flushed_total
                    .fetch_add(result.pages_flushed, Ordering::Relaxed);
                stats_clone
                    .wal_truncated_bytes
                    .fetch_add(result.wal_truncated_bytes, Ordering::Relaxed);
                stats_clone
                    .last_checkpoint_lsn
                    .store(result.new_redo_lsn, Ordering::Relaxed);
                stats_clone
                    .last_checkpoint_duration_ms
                    .store(duration_ms, Ordering::Relaxed);

                // Restart the timeout clock from the end of the checkpoint.
                last_checkpoint_at = Instant::now();
            }

            // Polling interval; also bounds how long a stop request can go
            // unnoticed while idle.
            std::thread::sleep(Duration::from_secs(1));
        }
    });

    CheckpointerHandle { stop, stats }
}