use nodedb_types::DatabaseId;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};
use crate::control::catalog_entry::CatalogEntry;
use crate::control::state::SharedState;
use super::policy::{PurgeDecision, resolve_retention};
/// Owner of the background collection-GC sweeper task.
///
/// Constructed by `spawn_collection_gc`, which wraps the join handle of the
/// task it spawns.
#[derive(Debug)]
pub struct CollectionGcSweeper {
/// Join handle of the spawned tokio task that runs the sweep loop.
pub handle: JoinHandle<()>,
}
/// Spawns the collection-GC sweep loop as a tokio task.
///
/// Takes ownership of the shared state handle, moves it into the task, and
/// returns a [`CollectionGcSweeper`] wrapping the task's join handle.
pub fn spawn_collection_gc(shared: Arc<SharedState>) -> CollectionGcSweeper {
    CollectionGcSweeper {
        handle: tokio::spawn(run_loop(shared)),
    }
}
/// Reads the current retention settings from shared state.
///
/// Returns `(sweep interval, retention window, retention days)`. A poisoned
/// settings lock is tolerated: the guard is recovered from the poison error
/// and read as usual.
fn load_settings(shared: &SharedState) -> (Duration, Duration, u32) {
    let settings = match shared.retention_settings.read() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };

    let sweep_interval = settings.sweep_interval();
    let retention_window = settings.retention_window();
    let retention_days = settings.deactivated_collection_retention_days;

    (sweep_interval, retention_window, retention_days)
}
async fn run_loop(shared: Arc<SharedState>) {
let (initial_interval, _, initial_days) = load_settings(&shared);
info!(
sweep_interval_secs = initial_interval.as_secs(),
retention_days = initial_days,
"collection-gc sweeper started"
);
tokio::time::sleep(initial_interval).await;
loop {
let (interval, retention, _) = load_settings(&shared);
if let Err(e) = sweep_once(&shared, retention) {
warn!(error = %e, "collection-gc sweep failed; will retry next tick");
}
tokio::time::sleep(interval).await;
}
}
/// Resolves the retention window that applies to one tenant.
///
/// If the tenant's quota sets `deactivated_collection_retention_days`, that
/// many days (converted to a `Duration`) is returned; otherwise `fallback`
/// is returned unchanged. A poisoned tenants lock is tolerated by recovering
/// the guard from the poison error.
fn effective_retention_for_tenant(
    shared: &SharedState,
    tenant_id: u64,
    fallback: Duration,
) -> Duration {
    const SECS_PER_DAY: u64 = 24 * 60 * 60;

    let registry = shared
        .tenants
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    let quota = registry.quota(crate::types::TenantId::new(tenant_id));

    quota
        .deactivated_collection_retention_days
        .map_or(fallback, |days| {
            Duration::from_secs(u64::from(days) * SECS_PER_DAY)
        })
}
/// Runs a single garbage-collection pass over the dropped collections.
///
/// Steps, in order:
/// 1. Return `Ok(())` immediately when no catalog is available.
/// 2. Load the dropped collections of the default database.
/// 3. If system metrics are present, publish the per-tenant count of dropped
///    collections. This happens before the empty check, so an empty list
///    publishes an empty map.
/// 4. For each dropped collection, resolve the retention decision using the
///    tenant's effective retention (`retention` is the fallback) and propose
///    a `PurgeCollection` catalog entry for every collection due for purge.
///
/// # Errors
///
/// Returns the error from loading the dropped collections. A failed proposal
/// is only logged at `debug` level and does not fail the sweep.
pub fn sweep_once(shared: &SharedState, retention: Duration) -> crate::Result<()> {
    let catalog = match shared.credentials.catalog() {
        Some(catalog) => catalog,
        None => return Ok(()),
    };

    // A clock reading before the Unix epoch is treated as timestamp 0.
    let now_ns = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    };

    let dropped = catalog.load_dropped_collections(DatabaseId::DEFAULT)?;

    if let Some(metrics) = shared.system_metrics.as_ref() {
        let pending_by_tenant =
            dropped
                .iter()
                .fold(HashMap::<u64, u64>::new(), |mut counts, coll| {
                    *counts.entry(coll.tenant_id).or_default() += 1;
                    counts
                });
        metrics.purge.set_pending_by_tenant(pending_by_tenant);
    }

    if dropped.is_empty() {
        return Ok(());
    }

    let mut purge_proposals = 0usize;
    let mut still_waiting = 0usize;

    for coll in &dropped {
        let effective = effective_retention_for_tenant(shared, coll.tenant_id, retention);

        // Only collections whose retention has elapsed fall through to the
        // proposal step below.
        match resolve_retention(coll, now_ns, effective) {
            PurgeDecision::NotDeactivated => continue,
            PurgeDecision::Wait { .. } => {
                still_waiting += 1;
                continue;
            }
            PurgeDecision::Purge => {}
        }

        let entry = CatalogEntry::PurgeCollection {
            tenant_id: coll.tenant_id,
            name: coll.name.clone(),
        };
        let outcome = crate::control::metadata_proposer::propose_catalog_entry(shared, &entry);

        if let Err(e) = &outcome {
            debug!(
                tenant = coll.tenant_id,
                collection = %coll.name,
                error = %e,
                "collection-gc: propose failed (expected on follower)"
            );
            continue;
        }

        purge_proposals += 1;
        debug!(
            tenant = coll.tenant_id,
            collection = %coll.name,
            "collection-gc: proposed PurgeCollection"
        );
    }

    // Stay quiet when the pass neither proposed nor deferred anything.
    let nothing_to_report = purge_proposals == 0 && still_waiting == 0;
    if !nothing_to_report {
        info!(
            proposed = purge_proposals,
            waiting = still_waiting,
            total = dropped.len(),
            "collection-gc sweep complete"
        );
    }

    Ok(())
}
// Test-only module, compiled under `cfg(test)`; it currently contains no tests.
#[cfg(test)]
mod tests {
}