use std::time::Duration;
use actix::{Actor, AsyncContext, Context, Handler, Message};
use calimero_primitives::context::ContextId;
use calimero_storage::constants::TOMBSTONE_RETENTION_NANOS;
use calimero_storage::index::EntityIndex;
use calimero_store::key::{self, ContextState};
use calimero_store::layer::{ReadLayer, WriteLayer};
use calimero_store::Store;
use eyre::Result as EyreResult;
use tracing::{debug, error, info, warn};
/// Message asking the [`GarbageCollector`] actor to run one collection cycle.
///
/// The message carries no payload and its handler produces no result (`()`);
/// the outcome of a cycle is only reported through log events.
#[derive(Copy, Clone, Debug, Message)]
#[rtype(result = "()")]
pub struct RunGC;
/// Actor that periodically removes expired tombstones from the store.
///
/// Once started it sends itself a [`RunGC`] message every `interval`; each
/// message triggers one pass over all contexts found in the store.
#[derive(Clone, Debug)]
pub struct GarbageCollector {
/// Store that is scanned for context metadata and context state entries, and
/// from which expired tombstone entries are deleted.
store: Store,
/// Period between two scheduled collection cycles.
interval: Duration,
}
impl GarbageCollector {
pub fn new(store: Store, interval: Duration) -> Self {
Self { store, interval }
}
fn collect_all(&self) -> EyreResult<GCStats> {
let start = std::time::Instant::now();
let contexts = self.list_contexts()?;
let context_count = contexts.len();
debug!(count = context_count, "Found contexts for GC");
let mut total_collected = 0;
for context_id in contexts {
match self.collect_for_context(&context_id) {
Ok(count) => {
if count > 0 {
debug!(
context_id = %context_id,
tombstones = count,
"Collected tombstones for context"
);
}
total_collected += count;
}
Err(e) => {
warn!(
context_id = %context_id,
error = ?e,
"Failed to collect garbage for context"
);
}
}
}
let duration_ms = start.elapsed().as_millis() as u64;
Ok(GCStats {
tombstones_collected: total_collected,
contexts_scanned: context_count,
duration_ms,
})
}
fn list_contexts(&self) -> EyreResult<Vec<ContextId>> {
let mut contexts = Vec::new();
let mut iter = self.store.iter::<key::ContextMeta>()?;
while let Some(meta) = iter.next()? {
contexts.push(meta.context_id());
}
Ok(contexts)
}
fn collect_for_context(&self, context_id: &ContextId) -> EyreResult<usize> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_nanos() as u64;
let retention = TOMBSTONE_RETENTION_NANOS;
let mut iter = self.store.iter::<ContextState>()?;
let mut keys_to_delete = Vec::new();
while let Some(state_entry) = iter.next()? {
if state_entry.context_id() != *context_id {
continue;
}
if let Some(value) = self.store.get(&state_entry)? {
if let Ok(index) = borsh::from_slice::<EntityIndex>(value.as_ref()) {
if let Some(deleted_at) = index.deleted_at {
let age = now.saturating_sub(deleted_at);
if age > retention {
keys_to_delete.push(state_entry);
}
}
}
}
}
let collected = keys_to_delete.len();
let mut store = self.store.clone();
for key in keys_to_delete {
store.delete(&key)?;
}
Ok(collected)
}
}
impl Actor for GarbageCollector {
    type Context = Context<Self>;

    /// Logs the start-up and arms the periodic timer that drives collection.
    ///
    /// Each tick only enqueues a [`RunGC`] message on the actor's own context;
    /// the actual work happens in the `RunGC` handler.
    ///
    /// NOTE(review): behaviour for a zero `interval` depends on the underlying
    /// timer — confirm that such a value is rejected before reaching here.
    fn started(&mut self, ctx: &mut Self::Context) {
        let period = self.interval;

        info!(
            interval_secs = period.as_secs(),
            "Garbage collection actor started"
        );

        // The returned spawn handle is deliberately discarded: the interval is
        // never cancelled individually.
        let _ = ctx.run_interval(period, |_, actor_ctx| actor_ctx.notify(RunGC));
    }

    /// Logs that the actor has stopped.
    fn stopped(&mut self, _: &mut Self::Context) {
        info!("Garbage collection actor stopped");
    }
}
impl Handler<RunGC> for GarbageCollector {
type Result = ();
fn handle(&mut self, _msg: RunGC, _ctx: &mut Self::Context) -> Self::Result {
debug!("Starting garbage collection cycle");
match self.collect_all() {
Ok(stats) => {
info!(
tombstones_collected = stats.tombstones_collected,
contexts_scanned = stats.contexts_scanned,
duration_ms = stats.duration_ms,
"Garbage collection completed"
);
}
Err(e) => {
error!(error = ?e, "Garbage collection failed");
}
}
}
}
/// Summary of a single garbage collection cycle.
#[derive(Debug)]
struct GCStats {
/// Total number of tombstone entries collected across all contexts whose
/// collection succeeded during the cycle.
tombstones_collected: usize,
/// Number of contexts found in the store for this cycle, including any whose
/// collection failed.
contexts_scanned: usize,
/// Elapsed time of the whole cycle, in milliseconds.
duration_ms: u64,
}