use crossbeam_channel::{bounded, Receiver, TryRecvError};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use crate::api::errors::{Error, Result};
use crate::engine;
use crate::layout::BlobGuid;
use crate::store::blob_store::BlobStore;
use crate::store::WriteThroughEntry;
use super::io::{CheckpointEpoch, CheckpointEpochReport, IoTask};
use super::Shared;
/// Bounded FIFO of checkpoint epochs that have been submitted to the I/O
/// worker and whose completion reports have not been consumed yet.
pub(super) struct Pipeline {
/// Submitted epochs in submission order: new epochs are pushed at the back
/// and completions are consumed from the front.
in_flight: VecDeque<PendingEpoch>,
/// Upper bound on `in_flight.len()`; `Pipeline::new` raises a value of 0 to 1.
max_in_flight: usize,
}
/// One submitted epoch that is still waiting for its completion report.
struct PendingEpoch {
/// Receiving end of the per-epoch completion channel; yields the
/// `CheckpointEpochReport` for this epoch.
rx: Receiver<CheckpointEpochReport>,
/// Dirty snapshot captured for this epoch. It is handed back through
/// `restore_dirty` when the channel disconnects without delivering a report.
snap: HashMap<BlobGuid, u64>,
/// Pending-delete snapshot captured for this epoch. It is handed back through
/// `restore_pending_deletes` when the channel disconnects without a report.
pending: HashMap<BlobGuid, u64>,
}
impl Pipeline {
    /// Creates an empty pipeline that holds at most `max_in_flight` epochs.
    ///
    /// A requested capacity of 0 is raised to 1 so that one epoch can always
    /// be submitted.
    pub(super) fn new(max_in_flight: usize) -> Self {
        let max_in_flight = max_in_flight.max(1);
        Self {
            in_flight: VecDeque::new(),
            max_in_flight,
        }
    }

    /// Returns `true` while another epoch can be queued without exceeding the
    /// configured capacity.
    pub(super) fn has_room(&self) -> bool {
        self.max_in_flight > self.in_flight.len()
    }

    /// Returns `true` when no epoch is in flight.
    pub(super) fn is_empty(&self) -> bool {
        self.in_flight.is_empty()
    }

    /// Consumes, without blocking, every completion report that is already
    /// available, oldest epoch first, then attempts a journal truncation.
    ///
    /// Stops at the first epoch whose report has not arrived yet.
    ///
    /// # Errors
    /// Returns the error produced by `finish_epoch` for a failed epoch, an
    /// `Error::Internal` when an epoch's channel disconnected without a report
    /// (after handing that epoch's snapshots back to the buffer manager), or
    /// any error from the truncation attempt.
    pub(super) fn reap_ready(&mut self, shared: &Arc<Shared>) -> Result<()> {
        loop {
            let Some(oldest) = self.in_flight.front() else {
                break;
            };
            let polled = oldest.rx.try_recv();
            match polled {
                // The oldest epoch is still running; later ones are not polled.
                Err(TryRecvError::Empty) => break,
                Ok(report) => {
                    self.in_flight.pop_front().expect("front exists");
                    finish_epoch(shared, report)?;
                }
                Err(TryRecvError::Disconnected) => {
                    let orphan = self.in_flight.pop_front().expect("front exists");
                    restore_unreported_epoch(shared, orphan);
                    return Err(Error::Internal(
                        "checkpoint: I/O worker dropped epoch completion",
                    ));
                }
            }
        }
        self.maybe_truncate(shared)
    }

    /// Blocks on the oldest epoch only when the pipeline is at capacity.
    fn wait_for_room(&mut self, shared: &Arc<Shared>) -> Result<()> {
        if self.has_room() {
            Ok(())
        } else {
            self.wait_one(shared)
        }
    }

    /// Blocks until every in-flight epoch has been resolved, then attempts a
    /// journal truncation.
    ///
    /// All epochs are waited for even after a failure; only the first failure
    /// is returned, and in that case no truncation is attempted.
    pub(super) fn drain(&mut self, shared: &Arc<Shared>) -> Result<()> {
        let mut first_failure = None;
        while !self.in_flight.is_empty() {
            let outcome = self.wait_one(shared);
            if let Err(err) = outcome {
                if first_failure.is_none() {
                    first_failure = Some(err);
                }
            }
        }
        match first_failure {
            Some(err) => Err(err),
            None => self.maybe_truncate(shared),
        }
    }

    /// Removes the oldest epoch and blocks until its report arrives.
    ///
    /// Does nothing when the pipeline is empty. If the channel disconnects
    /// without a report, the epoch's snapshots are handed back to the buffer
    /// manager and an `Error::Internal` is returned.
    fn wait_one(&mut self, shared: &Arc<Shared>) -> Result<()> {
        match self.in_flight.pop_front() {
            None => Ok(()),
            Some(oldest) => match oldest.rx.recv() {
                Ok(report) => finish_epoch(shared, report),
                Err(_) => {
                    restore_unreported_epoch(shared, oldest);
                    Err(Error::Internal(
                        "checkpoint: I/O worker dropped epoch completion",
                    ))
                }
            },
        }
    }

    /// Appends a freshly submitted epoch; callers must have ensured there is
    /// room (checked in debug builds only).
    fn push(&mut self, pending: PendingEpoch) {
        debug_assert!(self.has_room());
        self.in_flight.push_back(pending);
    }

    /// Truncates the journal when nothing is in flight, a journal is
    /// configured and reports `needs_checkpoint()`, and the buffer manager
    /// has neither dirty entries nor pending deletes.
    ///
    /// The dirty/pending-delete check and the truncation both run while the
    /// commit gate's checkpoint guard is held.
    ///
    /// NOTE(review): this does not consult `shared.bm.needs_flush()`, which
    /// `run_round` treats as outstanding work — confirm truncation is safe
    /// when the store still reports that it needs a flush.
    fn maybe_truncate(&self, shared: &Arc<Shared>) -> Result<()> {
        use std::sync::atomic::Ordering;

        if !self.in_flight.is_empty() {
            return Ok(());
        }
        let journal = match &shared.journal {
            Some(journal) if journal.needs_checkpoint() => journal,
            _ => return Ok(()),
        };
        let _commit = shared.commit_gate.enter_checkpoint();
        if shared.bm.dirty_count() != 0 || shared.bm.pending_delete_count() != 0 {
            return Ok(());
        }
        journal.truncate()?;
        shared.truncates.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }
}
/// Runs a single checkpoint round on a private one-slot pipeline and blocks
/// until the epoch it submitted (if any) has completed.
///
/// # Errors
/// Returns the error from `run_round` without draining, or otherwise the
/// result of draining the pipeline.
pub(super) fn run_round_sync(shared: &Arc<Shared>) -> Result<()> {
    let mut solo = Pipeline::new(1);
    match run_round(shared, &mut solo) {
        Ok(()) => solo.drain(shared),
        Err(err) => Err(err),
    }
}
/// Runs one checkpoint round: reaps finished epochs, optionally merges,
/// snapshots the buffer manager and submits at most one new epoch to the I/O
/// worker.
///
/// Steps, in order:
/// 1. Consume completion reports that are already available and, if the
///    pipeline is full, block on the oldest epoch.
/// 2. When `cfg.auto_merge` is set, run a merge pass; a failing pass is logged
///    to stderr and counted as zero merges.
/// 3. Snapshot the dirty set, the pending deletes and the bytes of every dirty
///    entry. With a journal configured this happens under the commit gate's
///    checkpoint guard and the journal's `wal_work()` value is captured too.
/// 4. Flush the journal up to the captured position.
/// 5. If there is nothing to write, attempt a journal truncation and return.
/// 6. Otherwise send an `IoTask::CommitEpoch` and queue the epoch in `pipeline`.
///
/// # Errors
/// Propagates errors from reaping/waiting and from the truncation attempt.
/// When a dirty entry has no cache image, the journal flush fails, or the I/O
/// channel is closed, the captured snapshots are handed back to the buffer
/// manager before the error is returned.
#[allow(clippy::too_many_lines)]
pub(super) fn run_round(shared: &Arc<Shared>, pipeline: &mut Pipeline) -> Result<()> {
    use std::sync::atomic::Ordering;

    pipeline.reap_ready(shared)?;
    pipeline.wait_for_room(shared)?;
    shared.rounds_attempted.fetch_add(1, Ordering::Relaxed);

    // A failed merge pass is not fatal for the round: log it and carry on.
    let merged = if shared.cfg.auto_merge {
        run_merge_pass(shared).unwrap_or_else(|e| {
            eprintln!("holt: checkpoint merge pass failed: {e}");
            0
        })
    } else {
        0
    };
    shared.merges_total.fetch_add(merged, Ordering::Relaxed);

    #[cfg(feature = "tracing")]
    let round_start = std::time::Instant::now();

    let (snap, pending, snap_bytes, wal_up_to) = {
        // The checkpoint guard is only taken when a journal exists; it is
        // held until the end of this block, including the error path below.
        let _commit = shared
            .journal
            .as_ref()
            .map(|_| shared.commit_gate.enter_checkpoint());
        let snap = shared.bm.snapshot_dirty();
        let pending = shared.bm.snapshot_pending_deletes();
        let wal_up_to = shared.journal.as_ref().map(|journal| journal.wal_work());

        let mut snap_bytes = Vec::with_capacity(snap.len());
        for (guid, seq) in &snap {
            let image = shared.bm.snapshot_bytes(*guid);
            match image {
                Some(bytes) => snap_bytes.push((*guid, *seq, bytes)),
                None => {
                    shared.bm.restore_pending_deletes(pending);
                    shared.bm.restore_dirty(snap.clone());
                    return Err(Error::Internal(
                        "checkpoint: dirty entry lost cache image — invariant I1 violated",
                    ));
                }
            }
        }
        (snap, pending, snap_bytes, wal_up_to)
    };

    // `wal_up_to` is `Some` exactly when a journal is configured.
    if let Some((journal, up_to)) = shared.journal.as_ref().zip(wal_up_to) {
        if let Err(e) = journal.flush_up_to(up_to) {
            shared.bm.restore_pending_deletes(pending);
            shared.bm.restore_dirty(snap);
            return Err(e);
        }
    }

    let snap_count = snap.len();
    shared.last_dirty_count.store(snap_count, Ordering::Relaxed);

    // The store's own flush request only counts while nothing is in flight.
    let needs_store_flush = pipeline.in_flight.is_empty() && shared.bm.needs_flush();
    let nothing_to_write =
        snap.is_empty() && merged == 0 && pending.is_empty() && !needs_store_flush;
    if nothing_to_write {
        pipeline.maybe_truncate(shared)?;
        shared.rounds_succeeded.fetch_add(1, Ordering::Relaxed);
        #[cfg(feature = "tracing")]
        tracing::trace!(target: "holt::checkpoint", "round skipped — nothing dirty");
        return Ok(());
    }

    let mut entries = Vec::with_capacity(snap_bytes.len());
    for (guid, expected_seq, bytes) in snap_bytes {
        entries.push(WriteThroughEntry {
            guid,
            bytes,
            expected_seq,
        });
    }

    // Keep a copy of the pending deletes: the original moves into the task,
    // and the copy is what gets restored if the epoch is never reported.
    let pending_for_recovery = pending.clone();
    let (tx, rx) = bounded(1);
    let task = IoTask::CommitEpoch {
        epoch: CheckpointEpoch { entries, pending },
        on_done: tx,
    };
    if shared.io_tx.send(task).is_err() {
        shared.bm.restore_pending_deletes(pending_for_recovery);
        shared.bm.restore_dirty(snap);
        return Err(Error::Internal(
            "checkpoint: I/O worker channel closed mid-round",
        ));
    }

    pipeline.push(PendingEpoch {
        rx,
        snap,
        pending: pending_for_recovery,
    });

    #[cfg(feature = "tracing")]
    {
        let elapsed = round_start.elapsed();
        tracing::info!(
            target: "holt::checkpoint",
            dirty_snapshot = snap_count,
            merged = merged,
            in_flight = pipeline.in_flight.len(),
            elapsed_us = elapsed.as_micros() as u64,
            "round submitted",
        );
    }
    Ok(())
}
fn run_merge_pass(shared: &Arc<Shared>) -> Result<u64> {
use crate::store::STRUCTURAL_SEQ;
let parents = shared.bm.pop_merge_candidates(256);
let mut merged_total = 0u64;
for guid in parents {
let _maintenance = shared.maintenance_gate.enter_exclusive();
if !shared.bm.has_blob(guid)? {
continue;
}
let _commit = shared
.journal
.as_ref()
.map(|_| shared.commit_gate.enter_writer());
let pin = shared.bm.pin(guid)?;
let (stats, has_children) = {
let mut guard = pin.write();
let mut frame = guard.frame();
let stats = engine::try_merge_children(shared.bm.as_ref(), &mut frame, STRUCTURAL_SEQ)?;
(stats, frame.header().num_ext_blobs != 0)
};
if stats.merged > 0 {
shared.bm.mark_dirty(guid, STRUCTURAL_SEQ);
merged_total += u64::from(stats.merged);
if has_children {
shared.bm.note_merge_candidate(guid);
}
}
drop(pin);
}
shared.bm.note_merges(merged_total);
Ok(merged_total)
}
fn finish_epoch(shared: &Arc<Shared>, report: CheckpointEpochReport) -> Result<()> {
use std::sync::atomic::Ordering;
shared
.blobs_flushed
.fetch_add(report.dirty_flushed as u64, Ordering::Relaxed);
let dirty_total = report.dirty_total;
let dirty_flushed = report.dirty_flushed;
let pending_total = report.pending_total;
let applied_deletes = report.applied_deletes;
if let Err(e) = report.result {
eprintln!(
"holt: checkpoint epoch failed (dirty={dirty_flushed}/{dirty_total}, pending deleted={applied_deletes}/{pending_total}): {e}",
);
return Err(e);
}
shared.rounds_succeeded.fetch_add(1, Ordering::Relaxed);
Ok(())
}
fn restore_unreported_epoch(shared: &Arc<Shared>, pending: PendingEpoch) {
shared.bm.restore_pending_deletes(pending.pending);
shared.bm.restore_dirty(pending.snap);
}