use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
use crabka_metadata::NodeId;
use crate::partition::Partition;
use crate::partition_registry::PartitionRegistry;
#[derive(Debug, Clone)]
pub(crate) struct CleanerConfig {
pub interval: Duration,
}
impl Default for CleanerConfig {
fn default() -> Self {
Self {
interval: Duration::from_secs(30),
}
}
}
pub(crate) async fn run(
partitions: Arc<PartitionRegistry>,
node_id: NodeId,
cfg: CleanerConfig,
shutdown: CancellationToken,
) {
let mut ticker = tokio::time::interval(cfg.interval);
loop {
tokio::select! {
_ = ticker.tick() => {}
() = shutdown.cancelled() => {
debug!("cleaner task shutting down");
return;
}
}
tick_all(&partitions, node_id).await;
}
}
pub(crate) async fn tick_all(partitions: &PartitionRegistry, node_id: NodeId) {
let snapshot: Vec<Arc<Partition>> = partitions.arcs();
for partition in snapshot {
let leader = partition.current_leader.load(Ordering::Relaxed);
if leader != node_id {
continue;
}
let policy = {
let log = partition
.log
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
log.config_snapshot().cleanup_policy
};
if policy != crabka_log::CleanupPolicy::Compact {
continue;
}
if let Err(e) = partition.compact_log().await {
warn!(
topic = %partition.topic,
partition_id = partition.partition_id,
error = %e,
"compaction failed for partition",
);
}
}
}