use crossbeam_channel::bounded;
use std::collections::HashMap;
use std::sync::Arc;
use crate::api::errors::{Error, Result};
use crate::engine;
use crate::layout::BlobGuid;
use crate::store::backend::Backend;
use crate::store::BlobFrame;
use super::io::IoTask;
use super::Shared;
#[allow(clippy::too_many_lines)]
pub(super) fn run_round(shared: &Arc<Shared>) -> Result<()> {
use std::sync::atomic::Ordering;
shared.rounds_attempted.fetch_add(1, Ordering::Relaxed);
let merged = if shared.cfg.auto_merge {
match run_merge_pass(shared) {
Ok(n) => n,
Err(e) => {
eprintln!("holt: checkpoint merge pass failed: {e}");
0
}
}
} else {
0
};
shared.merges_total.fetch_add(merged, Ordering::Relaxed);
#[cfg(feature = "tracing")]
let round_start = std::time::Instant::now();
let (snap, pending) = if let Some(wal) = &shared.wal {
let mut w = wal.lock().unwrap();
let snap = shared.bm.snapshot_dirty();
let pending = shared.bm.snapshot_pending_deletes();
if let Err(e) = w.flush() {
shared.bm.restore_dirty(snap);
shared.bm.restore_pending_deletes(pending);
return Err(e);
}
(snap, pending)
} else {
(
shared.bm.snapshot_dirty(),
shared.bm.snapshot_pending_deletes(),
)
};
let snap_count = snap.len();
shared.last_dirty_count.store(snap_count, Ordering::Relaxed);
if snap.is_empty() && merged == 0 && pending.is_empty() {
shared.rounds_succeeded.fetch_add(1, Ordering::Relaxed);
#[cfg(feature = "tracing")]
tracing::trace!(target: "holt::checkpoint", "round skipped — nothing dirty");
return Ok(());
}
let mut completions: Vec<(BlobGuid, u64, crossbeam_channel::Receiver<Result<()>>)> =
Vec::with_capacity(snap.len());
let mut failed: HashMap<BlobGuid, u64> = HashMap::new();
for (guid, txn_id) in &snap {
let Some(bytes) = shared.bm.snapshot_bytes(*guid) else {
continue;
};
let (tx, rx) = bounded(1);
let task = IoTask::Flush {
guid: *guid,
bytes,
expected_seq: *txn_id,
on_done: tx,
};
if shared.io_tx.send(task).is_err() {
for (g, t) in &snap {
failed.entry(*g).or_insert(*t);
}
shared.bm.restore_dirty(failed);
shared.bm.restore_pending_deletes(pending);
return Err(Error::Internal("checkpoint: I/O worker channel closed mid-round",
));
}
completions.push((*guid, *txn_id, rx));
}
for (guid, txn_id, rx) in completions {
match rx.recv() {
Ok(Ok(())) => {
shared.blobs_flushed.fetch_add(1, Ordering::Relaxed);
}
Ok(Err(e)) => {
eprintln!(
"holt: checkpoint flush failed for blob {:02x?} (min_txn={txn_id}): {e}",
&guid[..4]
);
failed.insert(guid, txn_id);
}
Err(_) => {
failed.insert(guid, txn_id);
}
}
}
let had_dirty_failure = !failed.is_empty();
if had_dirty_failure {
shared.bm.restore_dirty(failed.clone());
}
let (sync_tx, sync_rx) = bounded(1);
if shared
.io_tx
.send(IoTask::Sync { on_done: sync_tx })
.is_err()
{
shared.bm.restore_pending_deletes(pending);
return Err(Error::Internal("checkpoint: I/O worker channel closed before Sync",
));
}
match sync_rx.recv() {
Ok(Ok(())) => {}
Ok(Err(e)) => {
eprintln!("holt: checkpoint backend Sync failed: {e}");
shared.bm.restore_pending_deletes(pending);
return Err(e);
}
Err(_) => {
shared.bm.restore_pending_deletes(pending);
return Err(Error::Internal("checkpoint: I/O worker dropped Sync completion",
));
}
}
if had_dirty_failure {
shared.bm.restore_pending_deletes(pending);
return Err(Error::Internal("checkpoint: dirty write failed — pending deletes deferred to next round"));
}
let pending_count = pending.len();
let mut pending_failed: HashMap<BlobGuid, u64> = HashMap::new();
for (guid, seq) in &pending {
if let Err(e) = shared.bm.execute_pending_delete(*guid) {
eprintln!(
"holt: checkpoint deferred delete failed for blob {:02x?} (seq={seq}): {e}",
&guid[..4]
);
pending_failed.insert(*guid, *seq);
}
}
if !pending_failed.is_empty() {
shared.bm.restore_pending_deletes(pending_failed.clone());
}
let applied_deletes = pending_count - pending_failed.len();
let restore_applied = || -> HashMap<BlobGuid, u64> {
pending
.iter()
.filter(|(g, _)| !pending_failed.contains_key(*g))
.map(|(g, s)| (*g, *s))
.collect()
};
if applied_deletes > 0 {
let (sync_tx2, sync_rx2) = bounded(1);
if shared
.io_tx
.send(IoTask::Sync { on_done: sync_tx2 })
.is_err()
{
shared.bm.restore_pending_deletes(restore_applied());
return Err(Error::Internal("checkpoint: I/O worker channel closed before Sync (deletes)",
));
}
match sync_rx2.recv() {
Ok(Ok(())) => {}
Ok(Err(e)) => {
eprintln!("holt: checkpoint backend Sync (deletes) failed: {e}");
shared.bm.restore_pending_deletes(restore_applied());
return Err(e);
}
Err(_) => {
shared.bm.restore_pending_deletes(restore_applied());
return Err(Error::Internal("checkpoint: I/O worker dropped Sync (deletes) completion",
));
}
}
}
if failed.is_empty() && pending_failed.is_empty() {
if let Some(wal) = &shared.wal {
let mut w = wal.lock().unwrap();
if shared.bm.dirty_count() == 0 && shared.bm.pending_delete_count() == 0 {
w.truncate()?;
shared.truncates.fetch_add(1, Ordering::Relaxed);
}
}
}
shared.rounds_succeeded.fetch_add(1, Ordering::Relaxed);
#[cfg(feature = "tracing")]
{
let elapsed = round_start.elapsed();
let truncated = failed.is_empty()
&& pending_failed.is_empty()
&& shared.wal.is_some()
&& shared.bm.dirty_count() == 0
&& shared.bm.pending_delete_count() == 0;
tracing::info!(
target: "holt::checkpoint",
dirty_snapshot = snap_count,
blobs_flushed = snap_count - failed.len(),
blobs_failed = failed.len(),
blobs_deleted = applied_deletes,
merged = merged,
truncated_wal = truncated,
elapsed_us = elapsed.as_micros() as u64,
"round complete",
);
}
Ok(())
}
/// Runs the engine's child-merge step on every blob guid collected from
/// `shared.root_guid`, modifying each parent frame in place.
///
/// Guids for which `has_blob` reports `false` are skipped. Every parent for
/// which at least one merge happened is marked dirty at `STRUCTURAL_SEQ`, so
/// the following dirty snapshot in the checkpoint round picks it up.
///
/// Returns the total number of merges performed across all parents.
///
/// # Errors
///
/// Propagates the first error from `collect_blob_guids`, `has_blob`, `pin`
/// or `try_merge_children`. Parents processed before that error keep their
/// modifications and dirty marks.
fn run_merge_pass(shared: &Arc<Shared>) -> Result<u64> {
    use crate::store::buffer_manager::STRUCTURAL_SEQ;
    let candidates = engine::collect_blob_guids(shared.bm.as_ref(), shared.root_guid)?;
    let mut total_merged: u64 = 0;
    for parent in candidates {
        let present = shared.bm.has_blob(parent)?;
        if !present {
            continue;
        }
        let handle = shared.bm.pin(parent)?;
        // Scope the write guard and frame so they are released before the
        // pin is dropped.
        let outcome = {
            let mut bytes = handle.write();
            let mut view = BlobFrame::wrap(bytes.as_mut_slice());
            engine::try_merge_children(shared.bm.as_ref(), &mut view, STRUCTURAL_SEQ)?
        };
        // NOTE(review): the pin is released before `mark_dirty`; confirm the
        // buffer manager cannot evict the modified frame as clean in between.
        drop(handle);
        if outcome.merged == 0 {
            continue;
        }
        shared.bm.mark_dirty(parent, STRUCTURAL_SEQ);
        total_merged += u64::from(outcome.merged);
    }
    Ok(total_merged)
}