use crossbeam_channel::bounded;
use std::collections::HashMap;
use std::sync::Arc;
use crate::api::errors::{Error, Result};
use crate::engine;
use crate::layout::BlobGuid;
use crate::store::blob_store::BlobStore;
use crate::store::buffer_manager::WriteThroughEntry;
use super::io::IoTask;
use super::Shared;
#[allow(clippy::too_many_lines)]
pub(super) fn run_round(shared: &Arc<Shared>) -> Result<()> {
use std::sync::atomic::Ordering;
shared.rounds_attempted.fetch_add(1, Ordering::Relaxed);
let merged = if shared.cfg.auto_merge {
match run_merge_pass(shared) {
Ok(n) => n,
Err(e) => {
eprintln!("holt: checkpoint merge pass failed: {e}");
0
}
}
} else {
0
};
shared.merges_total.fetch_add(merged, Ordering::Relaxed);
#[cfg(feature = "tracing")]
let round_start = std::time::Instant::now();
let (snap, pending, snap_bytes) = if let Some(journal) = &shared.journal {
let _commit = shared.commit_gate.enter_checkpoint();
let snap = shared.bm.snapshot_dirty();
let pending = shared.bm.snapshot_pending_deletes();
if let Err(e) = journal.flush() {
shared.bm.restore_pending_deletes(pending);
shared.bm.restore_dirty(snap);
return Err(e);
}
let mut snap_bytes = Vec::with_capacity(snap.len());
for (guid, seq) in &snap {
let Some(bytes) = shared.bm.snapshot_bytes(*guid) else {
let mut failed = HashMap::new();
for (g, t) in &snap {
failed.entry(*g).or_insert(*t);
}
shared.bm.restore_pending_deletes(pending);
shared.bm.restore_dirty(failed);
return Err(Error::Internal(
"checkpoint: dirty entry lost cache image — invariant I1 violated",
));
};
snap_bytes.push((*guid, *seq, bytes));
}
(snap, pending, snap_bytes)
} else {
let snap = shared.bm.snapshot_dirty();
let pending = shared.bm.snapshot_pending_deletes();
let mut snap_bytes = Vec::with_capacity(snap.len());
for (guid, seq) in &snap {
let Some(bytes) = shared.bm.snapshot_bytes(*guid) else {
let mut failed = HashMap::new();
for (g, t) in &snap {
failed.entry(*g).or_insert(*t);
}
shared.bm.restore_pending_deletes(pending);
shared.bm.restore_dirty(failed);
return Err(Error::Internal(
"checkpoint: dirty entry lost cache image — invariant I1 violated",
));
};
snap_bytes.push((*guid, *seq, bytes));
}
(snap, pending, snap_bytes)
};
let snap_count = snap.len();
shared.last_dirty_count.store(snap_count, Ordering::Relaxed);
if snap.is_empty() && merged == 0 && pending.is_empty() && !shared.bm.needs_flush() {
if let Some(journal) = &shared.journal {
if journal.needs_checkpoint() {
let _commit = shared.commit_gate.enter_checkpoint();
if shared.bm.dirty_count() == 0 && shared.bm.pending_delete_count() == 0 {
journal.truncate()?;
shared.truncates.fetch_add(1, Ordering::Relaxed);
}
}
}
shared.rounds_succeeded.fetch_add(1, Ordering::Relaxed);
#[cfg(feature = "tracing")]
tracing::trace!(target: "holt::checkpoint", "round skipped — nothing dirty");
return Ok(());
}
let mut failed: HashMap<BlobGuid, u64> = HashMap::new();
let mut pre_delete_sync_result: Option<Result<()>> = None;
if !snap_bytes.is_empty() {
let mut entries = Vec::with_capacity(snap_bytes.len());
let mut expected = Vec::with_capacity(snap_bytes.len());
for (guid, seq, bytes) in snap_bytes {
expected.push((guid, seq));
entries.push(WriteThroughEntry {
guid,
bytes,
expected_seq: seq,
});
}
let (tx, rx) = bounded(1);
let task = IoTask::FlushBatchAndSync {
entries,
on_done: tx,
};
if shared.io_tx.send(task).is_err() {
for (g, t) in &snap {
failed.entry(*g).or_insert(*t);
}
shared.bm.restore_pending_deletes(pending);
shared.bm.restore_dirty(failed);
return Err(Error::Internal(
"checkpoint: I/O worker channel closed mid-round",
));
}
match rx.recv() {
Ok(report) => {
match report.write_result {
Ok(()) => {
shared
.blobs_flushed
.fetch_add(expected.len() as u64, Ordering::Relaxed);
}
Err(e) => {
eprintln!(
"holt: checkpoint flush batch failed ({} blobs): {e}",
expected.len()
);
for (guid, seq) in expected {
failed.insert(guid, seq);
}
}
}
pre_delete_sync_result = Some(report.sync_result);
}
Err(_) => {
for (guid, seq) in expected {
failed.insert(guid, seq);
}
}
}
}
let had_dirty_failure = !failed.is_empty();
if had_dirty_failure {
shared.bm.restore_dirty(failed.clone());
}
if let Some(sync_result) = pre_delete_sync_result {
if let Err(e) = sync_result {
eprintln!("holt: checkpoint store Sync failed: {e}");
shared.bm.restore_pending_deletes(pending);
return Err(e);
}
} else {
let (sync_tx, sync_rx) = bounded(1);
if shared
.io_tx
.send(IoTask::Sync { on_done: sync_tx })
.is_err()
{
shared.bm.restore_pending_deletes(pending);
return Err(Error::Internal(
"checkpoint: I/O worker channel closed before Sync",
));
}
match sync_rx.recv() {
Ok(Ok(())) => {}
Ok(Err(e)) => {
eprintln!("holt: checkpoint store Sync failed: {e}");
shared.bm.restore_pending_deletes(pending);
return Err(e);
}
Err(_) => {
shared.bm.restore_pending_deletes(pending);
return Err(Error::Internal(
"checkpoint: I/O worker dropped Sync completion",
));
}
}
}
if had_dirty_failure {
shared.bm.restore_pending_deletes(pending);
return Err(Error::Internal(
"checkpoint: dirty write failed — pending deletes deferred to next round",
));
}
let pending_count = pending.len();
let mut pending_failed: HashMap<BlobGuid, u64> = HashMap::new();
for (guid, seq) in &pending {
if let Err(e) = shared.bm.execute_pending_delete(*guid) {
eprintln!(
"holt: checkpoint deferred delete failed for blob {:02x?} (seq={seq}): {e}",
&guid[..4]
);
pending_failed.insert(*guid, *seq);
}
}
if !pending_failed.is_empty() {
shared.bm.restore_pending_deletes(pending_failed.clone());
}
let applied_deletes = pending_count - pending_failed.len();
let restore_applied = || -> HashMap<BlobGuid, u64> {
pending
.iter()
.filter(|(g, _)| !pending_failed.contains_key(*g))
.map(|(g, s)| (*g, *s))
.collect()
};
if applied_deletes > 0 {
let (sync_tx2, sync_rx2) = bounded(1);
if shared
.io_tx
.send(IoTask::Sync { on_done: sync_tx2 })
.is_err()
{
shared.bm.restore_pending_deletes(restore_applied());
return Err(Error::Internal(
"checkpoint: I/O worker channel closed before Sync (deletes)",
));
}
match sync_rx2.recv() {
Ok(Ok(())) => {}
Ok(Err(e)) => {
eprintln!("holt: checkpoint store Sync (deletes) failed: {e}");
shared.bm.restore_pending_deletes(restore_applied());
return Err(e);
}
Err(_) => {
shared.bm.restore_pending_deletes(restore_applied());
return Err(Error::Internal(
"checkpoint: I/O worker dropped Sync (deletes) completion",
));
}
}
}
if failed.is_empty() && pending_failed.is_empty() {
if let Some(journal) = &shared.journal {
let _commit = shared.commit_gate.enter_checkpoint();
if shared.bm.dirty_count() == 0 && shared.bm.pending_delete_count() == 0 {
journal.truncate()?;
shared.truncates.fetch_add(1, Ordering::Relaxed);
}
}
}
shared.rounds_succeeded.fetch_add(1, Ordering::Relaxed);
#[cfg(feature = "tracing")]
{
let elapsed = round_start.elapsed();
let truncated = failed.is_empty()
&& pending_failed.is_empty()
&& shared.journal.is_some()
&& shared.bm.dirty_count() == 0
&& shared.bm.pending_delete_count() == 0;
tracing::info!(
target: "holt::checkpoint",
dirty_snapshot = snap_count,
blobs_flushed = snap_count - failed.len(),
blobs_failed = failed.len(),
blobs_deleted = applied_deletes,
merged = merged,
truncated_wal = truncated,
elapsed_us = elapsed.as_micros() as u64,
"round complete",
);
}
Ok(())
}
/// Attempts to merge children into up to 256 candidate parent blobs.
///
/// Each candidate is processed under the maintenance gate's exclusive guard
/// and, when a journal is configured, the commit gate's writer guard.
/// Candidates whose blob no longer exists are skipped. A parent that merged
/// at least one child is marked dirty with `STRUCTURAL_SEQ` and, if it still
/// references external blobs, re-queued as a merge candidate.
///
/// Returns the total number of merges performed, which is also reported to
/// the buffer manager through `note_merges`.
///
/// # Errors
///
/// Propagates the first error from `has_blob`, `pin` or
/// `engine::try_merge_children`; the loop stops at that point and
/// `note_merges` is not called.
fn run_merge_pass(shared: &Arc<Shared>) -> Result<u64> {
    use crate::store::buffer_manager::STRUCTURAL_SEQ;

    let mut total: u64 = 0;
    for parent in shared.bm.pop_merge_candidates(256) {
        let _exclusive = shared.maintenance_gate.enter_exclusive();
        let exists = shared.bm.has_blob(parent)?;
        if !exists {
            continue;
        }
        let _writer = match &shared.journal {
            Some(_) => Some(shared.commit_gate.enter_writer()),
            None => None,
        };
        let pinned = shared.bm.pin(parent)?;
        let (outcome, still_has_children) = {
            let mut guard = pinned.write();
            let mut frame = guard.frame();
            let outcome =
                engine::try_merge_children(shared.bm.as_ref(), &mut frame, STRUCTURAL_SEQ)?;
            let still_has_children = frame.header().num_ext_blobs != 0;
            (outcome, still_has_children)
        };
        let gained = u64::from(outcome.merged);
        if gained > 0 {
            shared.bm.mark_dirty(parent, STRUCTURAL_SEQ);
            total += gained;
            if still_has_children {
                // More children remain, so give this parent another pass later.
                shared.bm.note_merge_candidate(parent);
            }
        }
        // Release the pin before the gate guards go out of scope.
        drop(pinned);
    }
    shared.bm.note_merges(total);
    Ok(total)
}