use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use nodedb_array::sync::gc::collapse_below;
use nodedb_array::sync::hlc::Hlc;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
use super::ack_registry::ArrayAckRegistry;
use super::op_log::OriginOpLog;
use super::snapshot_store::OriginSnapshotStore;
use crate::control::shutdown::{ShutdownReceiver, ShutdownWatch};
pub const DEFAULT_GC_INTERVAL: Duration = Duration::from_secs(60);
pub fn spawn(
op_log: Arc<OriginOpLog>,
snapshots: Arc<OriginSnapshotStore>,
ack_registry: Arc<ArrayAckRegistry>,
array_snapshot_hlcs: Arc<RwLock<HashMap<String, Hlc>>>,
shutdown: Arc<ShutdownWatch>,
interval: Duration,
) -> Option<JoinHandle<()>> {
let Ok(handle) = tokio::runtime::Handle::try_current() else {
debug!("array_gc_task: no tokio runtime; skipping spawn (test or non-async context)");
return None;
};
Some(handle.spawn(async move {
let mut shutdown_rx: ShutdownReceiver = shutdown.subscribe();
loop {
tokio::select! {
_ = tokio::time::sleep(interval) => {
run_gc(
&op_log,
&snapshots,
&ack_registry,
&array_snapshot_hlcs,
);
}
_ = shutdown_rx.wait_cancelled() => {
debug!("array_gc_task: shutdown received — exiting");
return;
}
}
}
}))
}
fn run_gc(
op_log: &OriginOpLog,
snapshots: &OriginSnapshotStore,
ack_registry: &ArrayAckRegistry,
array_snapshot_hlcs: &RwLock<HashMap<String, Hlc>>,
) {
let arrays = ack_registry.known_arrays();
if arrays.is_empty() {
debug!("array_gc_task: no arrays with acks — skipping");
return;
}
for array in &arrays {
let ack_vector = ack_registry.ack_vector(array);
if ack_vector.min_ack_hlc().is_none() {
debug!(array = %array, "array_gc_task: no min_ack for array — skipping");
continue;
}
let array_name = array.clone();
let snapshots_ref = snapshots;
let report = collapse_below(op_log, &ack_vector, snapshots_ref, |arr, frontier| {
if let Some(mut snap) = snapshots_ref.latest_for_array(arr) {
snap.snapshot_hlc = frontier;
Ok(Some(snap))
} else {
Ok(None)
}
});
match report {
Ok(r) => {
if r.ops_dropped > 0 || r.snapshots_written > 0 {
info!(
array = %array_name,
ops_dropped = r.ops_dropped,
snapshots_written = r.snapshots_written,
frontier = ?r.frontier,
"array_gc_task: GC run complete"
);
}
if let Some(frontier) = ack_vector.min_ack_hlc() {
snapshots_ref.delete_older_than(&array_name, frontier);
match array_snapshot_hlcs.write() {
Ok(mut map) => {
map.insert(array_name.clone(), frontier);
}
Err(e) => {
error!(
array = %array_name,
error = %e,
"array_gc_task: snapshot_hlc map poisoned"
);
}
}
}
}
Err(e) => {
warn!(
array = %array_name,
error = %e,
"array_gc_task: GC error — skipping this array"
);
}
}
}
}