use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use actix::{Actor, AsyncContext, Context};
use calimero_node_primitives::DagCompactionConfig;
use calimero_primitives::context::ContextId;
use dashmap::DashMap;
use tracing::{debug, info};
use crate::delta_store::DeltaStore;
#[derive(Clone)]
pub struct DagCompactor {
delta_stores: Arc<DashMap<ContextId, DeltaStore>>,
config: DagCompactionConfig,
sweep_in_progress: Arc<AtomicBool>,
}
impl DagCompactor {
pub fn new(
delta_stores: Arc<DashMap<ContextId, DeltaStore>>,
config: DagCompactionConfig,
) -> Self {
Self {
delta_stores,
config,
sweep_in_progress: Arc::new(AtomicBool::new(false)),
}
}
fn spawn_sweep(&self) {
if self
.sweep_in_progress
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
debug!("Skipping DAG compaction tick: previous sweep still running");
return;
}
let delta_stores = self.delta_stores.clone();
let config = self.config;
let in_progress = self.sweep_in_progress.clone();
actix::spawn(async move {
DagCompactor::compact_all(delta_stores, config).await;
in_progress.store(false, Ordering::Release);
});
}
async fn compact_all(
delta_stores: Arc<DashMap<ContextId, DeltaStore>>,
config: DagCompactionConfig,
) -> usize {
let stores: Vec<(ContextId, DeltaStore)> = delta_stores
.iter()
.map(|entry| (*entry.key(), entry.value().clone()))
.collect();
let contexts_scanned = stores.len();
let mut contexts_compacted = 0;
let mut total_pruned = 0;
for (context_id, store) in stores {
let pruned = store
.compact(config.min_deltas_before_compact, config.retain_recent_count)
.await;
if pruned > 0 {
debug!(%context_id, pruned, "Compacted context DAG history");
crate::node_metrics::observe_compaction_pruned(pruned);
contexts_compacted += 1;
total_pruned += pruned;
}
}
if total_pruned > 0 {
info!(
contexts_scanned,
contexts_compacted, total_pruned, "DAG compaction sweep completed"
);
}
total_pruned
}
}
impl Actor for DagCompactor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
info!(
interval_secs = self.config.check_interval.as_secs(),
min_deltas_before_compact = self.config.min_deltas_before_compact,
retain_recent_count = self.config.retain_recent_count,
"DAG compaction actor started"
);
self.spawn_sweep();
let interval = self.config.check_interval;
let _handle = ctx.run_interval(interval, |act, _ctx| act.spawn_sweep());
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
info!("DAG compaction actor stopped");
}
}