use std::sync::Arc;
use std::time::Duration;
use futures::stream::StreamExt;
use object_store::{ObjectStore, ObjectStoreExt};
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};
use crate::control::state::SharedState;
/// Delay used by the cleanup loop: once before the first drain pass after
/// start-up, and again after every pass.
const TICK_INTERVAL: Duration = Duration::from_secs(30);
/// Handle to the background L2 cleanup task started by `spawn_l2_cleanup`.
#[derive(Debug)]
pub struct L2CleanupWorker {
/// Join handle of the spawned task running the cleanup loop.
pub handle: JoinHandle<()>,
}
/// Starts the L2 cleanup loop on the Tokio runtime and returns its handle.
///
/// The task runs until the runtime shuts down (or exits immediately if cold
/// storage is not configured). Dropping the returned worker detaches the task
/// rather than cancelling it.
///
/// # Panics
/// `tokio::spawn` panics when called outside of a Tokio runtime.
pub fn spawn_l2_cleanup(shared: Arc<SharedState>) -> L2CleanupWorker {
    let task = run_loop(shared);
    L2CleanupWorker {
        handle: tokio::spawn(task),
    }
}
/// Body of the background cleanup task.
///
/// Logs and returns immediately when no cold storage is configured.
/// Otherwise it never returns: each iteration waits `TICK_INTERVAL` and then
/// runs one `drain_once` pass, so the first pass happens one full tick after
/// start-up.
async fn run_loop(shared: Arc<SharedState>) {
    if shared.cold_storage.is_some() {
        info!(
            tick_secs = TICK_INTERVAL.as_secs(),
            "l2 cleanup worker started"
        );
        loop {
            // Sleep before draining so the first pass is delayed by one tick.
            tokio::time::sleep(TICK_INTERVAL).await;
            drain_once(&shared).await;
        }
    } else {
        info!("l2 cleanup worker: cold storage not configured — exiting");
    }
}
/// Runs a single pass over the L2 cleanup queue.
///
/// For every queued entry, deletes all cold-storage objects under the
/// `"{tenant_id}/{name}/"` prefix. On success the entry is removed from the
/// catalog queue. On failure the attempt (with its error message) is recorded
/// in the catalog and the entry stays queued for the next pass.
///
/// Does nothing when cold storage or the catalog is unavailable. If the queue
/// cannot be loaded, it logs a warning and returns. When system metrics are
/// configured, the per-tenant queue-depth gauge is refreshed from the loaded
/// queue, and bytes reclaimed from L2 are reported per tenant.
pub async fn drain_once(shared: &SharedState) {
    let Some(cold) = shared.cold_storage.as_ref() else {
        return;
    };
    let Some(catalog) = shared.credentials.catalog() else {
        return;
    };
    let queue = match catalog.load_l2_cleanup_queue() {
        Ok(q) => q,
        Err(e) => {
            warn!(error = %e, "l2 cleanup: failed to load queue");
            return;
        }
    };
    if let Some(metrics) = shared.system_metrics.as_ref() {
        // Rebuilt from scratch on every pass from the freshly loaded queue;
        // an empty queue publishes an empty map.
        let mut depths: std::collections::HashMap<u64, u64> = std::collections::HashMap::new();
        for e in &queue {
            *depths.entry(e.tenant_id).or_insert(0) += 1;
        }
        metrics.purge.set_l2_cleanup_queue_depth(depths);
    }
    if queue.is_empty() {
        return;
    }
    let store = cold.object_store();
    for entry in queue {
        let prefix = format!("{}/{}/", entry.tenant_id, entry.name);
        match delete_prefix(store.clone(), &prefix).await {
            Ok(bytes_deleted) => {
                // Report the reclaim BEFORE reaping the queue entry. The bytes
                // are gone regardless of whether the catalog update succeeds.
                // If the reap fails, the entry is retried next pass against an
                // already-empty prefix (bytes_deleted == 0), so reporting after
                // the reap would lose these bytes from the metric forever.
                if let Some(metrics) = shared.system_metrics.as_ref()
                    && bytes_deleted > 0
                {
                    metrics.purge.add_bytes_reclaimed(
                        entry.tenant_id,
                        "unknown",
                        "l2",
                        bytes_deleted,
                    );
                }
                if let Err(e) = catalog.remove_l2_cleanup(entry.tenant_id, &entry.name) {
                    warn!(
                        tenant = entry.tenant_id,
                        collection = %entry.name,
                        error = %e,
                        "l2 cleanup: removed L2 bytes but failed to reap queue entry"
                    );
                    continue;
                }
                debug!(
                    tenant = entry.tenant_id,
                    collection = %entry.name,
                    purge_lsn = entry.purge_lsn,
                    bytes_deleted,
                    "l2 cleanup: drained queue entry"
                );
            }
            Err(e) => {
                let msg = e.to_string();
                // Best-effort bookkeeping: a failure to record the attempt is
                // logged but does not stop the rest of the pass.
                if let Err(update_err) =
                    catalog.record_l2_cleanup_attempt(entry.tenant_id, &entry.name, &msg)
                {
                    warn!(
                        tenant = entry.tenant_id,
                        collection = %entry.name,
                        error = %update_err,
                        "l2 cleanup: failed to record attempt"
                    );
                }
                warn!(
                    tenant = entry.tenant_id,
                    collection = %entry.name,
                    attempts = entry.attempts + 1,
                    error = %msg,
                    "l2 cleanup: delete failed; will retry next tick"
                );
            }
        }
    }
}
async fn delete_prefix(
store: Arc<dyn ObjectStore>,
prefix: &str,
) -> Result<u64, object_store::Error> {
let path = object_store::path::Path::from(prefix);
let mut list = store.list(Some(&path));
let mut total_bytes: u64 = 0;
let mut first_err: Option<object_store::Error> = None;
while let Some(meta) = list.next().await {
match meta {
Ok(m) => {
total_bytes += m.size;
if let Err(e) = store.delete(&m.location).await
&& first_err.is_none()
{
first_err = Some(e);
}
}
Err(e) => {
if first_err.is_none() {
first_err = Some(e);
}
}
}
}
match first_err {
Some(e) => Err(e),
None => Ok(total_bytes),
}
}
#[cfg(test)]
mod tests {
// NOTE(review): this test module is empty. `delete_prefix` only needs an
// `Arc<dyn ObjectStore>`, so it could be exercised against an in-memory store
// (e.g. to cover the "{tenant_id}/{name}/" prefix scoping and error paths).
}