mod eviction;
mod io;
mod round;
use crossbeam_channel::{bounded, Sender};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use crate::concurrency::{CommitGate, MaintenanceGate};
use crate::journal::group_commit::Journal;
use crate::store::BufferManager;
use self::io::IoTask;
/// Tuning knobs for the background checkpointer.
///
/// The `Default` value is a *disabled* configuration; `CheckpointConfig::enabled`
/// returns the same values with the master switch turned on.
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    /// Master switch. When `false`, `Checkpointer::spawn` starts no threads and
    /// returns `None`.
    pub enabled: bool,
    /// Longest time the planner thread stays parked before it runs the next
    /// checkpoint round (an explicit unpark ends the wait early).
    pub idle_interval: Duration,
    /// NOTE(review): not read in this file; presumably the dirty-blob count the
    /// round logic uses as a trigger. Confirm in the `round` module.
    pub dirty_blob_threshold: usize,
    /// NOTE(review): not read in this file; presumably allows a round to merge
    /// automatically. Confirm in the `round` module.
    pub auto_merge: bool,
    /// NOTE(review): not read in this file; presumably the pause between
    /// eviction passes. Confirm in the `eviction` module.
    pub eviction_interval: Duration,
    /// NOTE(review): not read in this file; presumably how many ticks an entry
    /// must stay untouched before it may be evicted. Confirm in the `eviction`
    /// module.
    pub eviction_idle_ticks: u64,
    /// Requested capacity of the bounded I/O task queue. `Checkpointer::spawn`
    /// raises a value of `0` to `1`.
    pub io_queue_capacity: usize,
}

impl Default for CheckpointConfig {
    /// Returns the stock configuration, which leaves checkpointing switched off.
    fn default() -> Self {
        let idle_interval = Duration::from_millis(100);
        let eviction_interval = Duration::from_secs(1);
        Self {
            enabled: false,
            idle_interval,
            dirty_blob_threshold: 16,
            auto_merge: true,
            eviction_interval,
            eviction_idle_ticks: 1024,
            io_queue_capacity: 16,
        }
    }
}

impl CheckpointConfig {
    /// Returns the stock configuration with `enabled` set to `true`; every other
    /// field keeps its `Default` value.
    #[must_use]
    pub fn enabled() -> Self {
        let base = Self::default();
        Self {
            enabled: true,
            ..base
        }
    }
}
/// State shared between a `Checkpointer` handle and the worker threads it owns.
///
/// `Checkpointer::spawn` builds exactly one instance, wraps it in an `Arc`, and
/// moves a clone of that `Arc` into each worker thread.
pub(super) struct Shared {
/// Buffer manager supplied by the caller of `Checkpointer::spawn`.
pub(super) bm: Arc<BufferManager>,
/// Optional journal supplied by the caller; every test in this file passes `None`.
pub(super) journal: Option<Arc<Journal>>,
/// Commit gate supplied by the caller.
/// NOTE(review): not used in this file; presumably taken by the round logic. Confirm.
pub(super) commit_gate: Arc<CommitGate>,
/// Maintenance gate supplied by the caller.
/// NOTE(review): not used in this file; presumably taken by the round logic. Confirm.
pub(super) maintenance_gate: Arc<MaintenanceGate>,
/// Configuration the checkpointer was spawned with.
pub(super) cfg: CheckpointConfig,
/// Sending half of the bounded I/O task channel; the receiving half is moved
/// into the `holt-ckpt-io` thread. `Drop` sends `IoTask::Stop` through it.
pub(super) io_tx: Sender<IoTask>,
/// Set to `true` by `Drop`; polled by `checkpoint_main` to leave its loop.
pub(super) checkpoint_stop: AtomicBool,
/// Set to `true` by `Drop` just before the eviction thread is joined.
/// NOTE(review): presumably polled by `eviction::run`. Confirm.
pub(super) eviction_stop: AtomicBool,
// The counters below start at zero in `Checkpointer::spawn` and are only read
// in this file; they are updated elsewhere (presumably the `round`, `io` and
// `eviction` submodules. Confirm).
/// Exposed through `Checkpointer::rounds_attempted`.
pub(super) rounds_attempted: AtomicU64,
/// Exposed through `Checkpointer::rounds_succeeded`.
pub(super) rounds_succeeded: AtomicU64,
/// Exposed through `Checkpointer::blobs_flushed`.
pub(super) blobs_flushed: AtomicU64,
/// Exposed through `Checkpointer::merges_total`.
pub(super) merges_total: AtomicU64,
/// Exposed through `Checkpointer::truncates`.
pub(super) truncates: AtomicU64,
/// Exposed through `Checkpointer::evictions`.
pub(super) evictions: AtomicU64,
/// Starts at zero; neither read nor written in this file.
/// NOTE(review): presumably the dirty-blob count seen by the latest round. Confirm.
pub(super) last_dirty_count: AtomicUsize,
}
/// Owning handle for the checkpoint worker threads.
///
/// Created by `Checkpointer::spawn`; dropping the handle stops and joins every
/// worker thread (see the `Drop` impl).
pub(crate) struct Checkpointer {
/// State shared with the worker threads.
shared: Arc<Shared>,
/// Join handle of the `holt-ckpt-planner` thread. Wrapped in `Option` so `Drop`
/// can `take()` it and join the thread.
checkpoint_handle: Option<JoinHandle<()>>,
/// Join handle of the `holt-ckpt-io` thread; taken and joined in `Drop`.
io_handle: Option<JoinHandle<()>>,
/// Join handle of the `holt-ckpt-eviction` thread; taken and joined in `Drop`.
eviction_handle: Option<JoinHandle<()>>,
}
impl Checkpointer {
    /// Starts the checkpoint worker threads, or returns `None` when
    /// `cfg.enabled` is `false`.
    ///
    /// Three named threads are started, in this order:
    ///
    /// 1. `holt-ckpt-io` runs `io::run` and owns the receiving end of the I/O
    ///    task queue.
    /// 2. `holt-ckpt-planner` runs `checkpoint_main`.
    /// 3. `holt-ckpt-eviction` runs `eviction::run`.
    ///
    /// All of them share one `Arc<Shared>` built from the arguments.
    ///
    /// # Panics
    ///
    /// Panics if the OS refuses to spawn any of the three threads.
    #[must_use]
    pub(crate) fn spawn(
        bm: Arc<BufferManager>,
        journal: Option<Arc<Journal>>,
        maintenance_gate: Arc<MaintenanceGate>,
        commit_gate: Arc<CommitGate>,
        cfg: CheckpointConfig,
    ) -> Option<Self> {
        if !cfg.enabled {
            return None;
        }

        // A configured capacity of zero is raised to one (presumably to avoid
        // crossbeam's zero-capacity rendezvous channel. Confirm).
        let queue_slots = cfg.io_queue_capacity.max(1);
        let (io_tx, io_rx) = bounded::<IoTask>(queue_slots);

        let shared = Arc::new(Shared {
            bm,
            journal,
            commit_gate,
            maintenance_gate,
            cfg,
            io_tx,
            checkpoint_stop: AtomicBool::new(false),
            eviction_stop: AtomicBool::new(false),
            rounds_attempted: AtomicU64::new(0),
            rounds_succeeded: AtomicU64::new(0),
            blobs_flushed: AtomicU64::new(0),
            merges_total: AtomicU64::new(0),
            truncates: AtomicU64::new(0),
            evictions: AtomicU64::new(0),
            last_dirty_count: AtomicUsize::new(0),
        });

        // The I/O worker is started first so the queue has a consumer before
        // the planner thread exists.
        let io_shared = Arc::clone(&shared);
        let io_handle = thread::Builder::new()
            .name(String::from("holt-ckpt-io"))
            .spawn(move || io::run(&io_shared, io_rx))
            .expect("OS rejected thread spawn for holt-ckpt-io");

        let planner_shared = Arc::clone(&shared);
        let checkpoint_handle = thread::Builder::new()
            .name(String::from("holt-ckpt-planner"))
            .spawn(move || checkpoint_main(&planner_shared))
            .expect("OS rejected thread spawn for holt-ckpt-planner");

        let eviction_shared = Arc::clone(&shared);
        let eviction_handle = thread::Builder::new()
            .name(String::from("holt-ckpt-eviction"))
            .spawn(move || eviction::run(&eviction_shared))
            .expect("OS rejected thread spawn for holt-ckpt-eviction");

        Some(Self {
            shared,
            checkpoint_handle: Some(checkpoint_handle),
            io_handle: Some(io_handle),
            eviction_handle: Some(eviction_handle),
        })
    }

    /// Test-only: unparks the planner thread so it stops waiting out
    /// `idle_interval` and proceeds straight away.
    #[cfg(test)]
    pub(crate) fn wake(&self) {
        if let Some(planner) = self.checkpoint_handle.as_ref() {
            planner.thread().unpark();
        }
    }

    /// Current value of the shared `rounds_attempted` counter (relaxed load).
    #[must_use]
    pub(crate) fn rounds_attempted(&self) -> u64 {
        let counter = &self.shared.rounds_attempted;
        counter.load(Ordering::Relaxed)
    }

    /// Current value of the shared `rounds_succeeded` counter (relaxed load).
    #[must_use]
    pub(crate) fn rounds_succeeded(&self) -> u64 {
        let counter = &self.shared.rounds_succeeded;
        counter.load(Ordering::Relaxed)
    }

    /// Current value of the shared `blobs_flushed` counter (relaxed load).
    #[must_use]
    pub(crate) fn blobs_flushed(&self) -> u64 {
        let counter = &self.shared.blobs_flushed;
        counter.load(Ordering::Relaxed)
    }

    /// Current value of the shared `truncates` counter (relaxed load).
    #[must_use]
    pub(crate) fn truncates(&self) -> u64 {
        let counter = &self.shared.truncates;
        counter.load(Ordering::Relaxed)
    }

    /// Current value of the shared `merges_total` counter (relaxed load).
    #[must_use]
    pub(crate) fn merges_total(&self) -> u64 {
        let counter = &self.shared.merges_total;
        counter.load(Ordering::Relaxed)
    }

    /// Current value of the shared `evictions` counter (relaxed load).
    #[must_use]
    pub(crate) fn evictions(&self) -> u64 {
        let counter = &self.shared.evictions;
        counter.load(Ordering::Relaxed)
    }
}
impl Drop for Checkpointer {
fn drop(&mut self) {
self.shared.checkpoint_stop.store(true, Ordering::SeqCst);
if let Some(h) = self.checkpoint_handle.take() {
h.thread().unpark();
let _ = h.join();
}
if let Err(e) = round::run_round(&self.shared) {
eprintln!("holt: final checkpoint round during shutdown failed: {e}");
}
let _ = self.shared.io_tx.send(IoTask::Stop);
if let Some(h) = self.io_handle.take() {
let _ = h.join();
}
self.shared.eviction_stop.store(true, Ordering::SeqCst);
if let Some(h) = self.eviction_handle.take() {
h.thread().unpark();
let _ = h.join();
}
}
}
/// Body of the `holt-ckpt-planner` thread.
///
/// Until `checkpoint_stop` is raised, the thread parks for at most
/// `cfg.idle_interval` (an unpark ends the wait early) and then runs one
/// checkpoint round. The flag is checked both before parking and right after
/// waking, so a stop request issued during the wait skips the round. A failed
/// round is logged to stderr and the loop carries on.
fn checkpoint_main(shared: &Arc<Shared>) {
    let stop_requested = || shared.checkpoint_stop.load(Ordering::Acquire);

    while !stop_requested() {
        thread::park_timeout(shared.cfg.idle_interval);
        if stop_requested() {
            return;
        }
        if let Err(e) = round::run_round(shared) {
            eprintln!("holt: checkpoint round failed: {e}");
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::blob_store::{AlignedBlobBuf, BlobStore, MemoryBlobStore};
    use std::time::Instant;

    /// Buffer manager over an in-memory blob store; `8` is forwarded to
    /// `BufferManager::new` unchanged.
    fn make_bm() -> Arc<BufferManager> {
        let store = Arc::new(MemoryBlobStore::new());
        Arc::new(BufferManager::new(store, 8))
    }

    /// Fresh maintenance gate for a single test.
    fn maintenance_gate() -> Arc<MaintenanceGate> {
        Arc::new(MaintenanceGate::new())
    }

    /// Fresh commit gate for a single test.
    fn commit_gate() -> Arc<CommitGate> {
        Arc::new(CommitGate::new())
    }

    /// Enabled configuration with `auto_merge` switched off.
    fn no_merge_cfg() -> CheckpointConfig {
        let mut cfg = CheckpointConfig::enabled();
        cfg.auto_merge = false;
        cfg
    }

    /// Spawns a journal-less checkpointer that shares `bm` with the test.
    fn spawn_with(bm: &Arc<BufferManager>, cfg: CheckpointConfig) -> Checkpointer {
        Checkpointer::spawn(
            Arc::clone(bm),
            None,
            maintenance_gate(),
            commit_gate(),
            cfg,
        )
        .expect("spawn")
    }

    /// Polls `done` every `poll_every` until it returns `true`; fails the test
    /// with `failure` once `timeout` has elapsed without success.
    fn wait_until(
        timeout: Duration,
        poll_every: Duration,
        failure: &str,
        done: impl Fn() -> bool,
    ) {
        let deadline = Instant::now() + timeout;
        while !done() {
            assert!(Instant::now() < deadline, "{failure}");
            thread::sleep(poll_every);
        }
    }

    /// The default (disabled) configuration must not start any thread.
    #[test]
    fn disabled_config_spawns_nothing() {
        let bm = make_bm();
        let cfg = CheckpointConfig::default();
        assert!(!cfg.enabled);
        let spawned = Checkpointer::spawn(bm, None, maintenance_gate(), commit_gate(), cfg);
        assert!(spawned.is_none());
    }

    /// Spawning and dropping right away must join every worker without hanging.
    #[test]
    fn spawn_and_drop_is_leak_free() {
        let bm = make_bm();
        let ck = Checkpointer::spawn(bm, None, maintenance_gate(), commit_gate(), no_merge_cfg())
            .expect("spawn");
        thread::sleep(Duration::from_millis(50));
        drop(ck);
    }

    /// A blob marked dirty before spawn is flushed by a background round.
    #[test]
    fn round_drains_dirty_set_via_io_queue() {
        let bm = make_bm();
        let mut scratch = AlignedBlobBuf::zeroed();
        scratch.as_mut_slice()[100] = 0xAB;
        bm.write_blob([0x42; 16], &scratch).unwrap();
        // The pin is held until the end of the test.
        let _pin = bm.pin([0x42; 16]).unwrap();
        bm.mark_dirty([0x42; 16], 10);
        assert_eq!(bm.dirty_count(), 1);

        let ck = spawn_with(&bm, no_merge_cfg());
        wait_until(
            Duration::from_secs(2),
            Duration::from_millis(10),
            "checkpoint round didn't drain dirty in time",
            || bm.dirty_count() == 0 && ck.blobs_flushed() >= 1,
        );
        drop(ck);
    }

    /// With a 10 s idle interval, only an explicit `wake` can get a round to
    /// run inside the 2 s deadline.
    #[test]
    fn wake_short_circuits_idle_wait() {
        let bm = make_bm();
        let mut cfg = no_merge_cfg();
        cfg.idle_interval = Duration::from_secs(10);
        let ck = spawn_with(&bm, cfg);

        let scratch = AlignedBlobBuf::zeroed();
        bm.write_blob([0x01; 16], &scratch).unwrap();
        let _pin = bm.pin([0x01; 16]).unwrap();
        bm.mark_dirty([0x01; 16], 1);
        ck.wake();

        wait_until(
            Duration::from_secs(2),
            Duration::from_millis(5),
            "checkpointer never drained dirty set after wake",
            || ck.rounds_succeeded() >= 1 && bm.dirty_count() == 0,
        );
    }

    /// An unpinned, idle cache entry is dropped by the eviction thread.
    #[test]
    fn eviction_thread_drops_cold_entries() {
        let bm = make_bm();
        let scratch = AlignedBlobBuf::zeroed();
        bm.write_blob([0xEE; 16], &scratch).unwrap();
        // Pin and release immediately: the entry ends up cached but unpinned.
        let _ = bm.pin([0xEE; 16]).unwrap();
        assert_eq!(bm.cached_count(), 1);
        // NOTE(review): presumably this extra traffic ages the 0xEE entry; confirm.
        for _ in 0..5 {
            let _ = bm.pin([0xFF; 16]);
            let _ = bm.cached_count();
        }

        let cfg = CheckpointConfig {
            eviction_interval: Duration::from_millis(20),
            eviction_idle_ticks: 1,
            ..no_merge_cfg()
        };
        let ck = spawn_with(&bm, cfg);
        wait_until(
            Duration::from_secs(2),
            Duration::from_millis(20),
            "eviction thread didn't drop a cold entry in time",
            || ck.evictions() >= 1,
        );
        drop(ck);
    }
}