use std::sync::Arc;
use async_trait::async_trait;
use tokio::io::AsyncRead;
use tracing::instrument;
use crate::blob_store::BlobStore;
use crate::cleanup::visitor::CleanupVisitor;
use crate::error::Result;
use crate::list_filter::{ListFilter, PrefixFilter};
use crate::types::{BlobInput, BlobMeta, CleanupResult, PutResult};
pub(crate) mod visitor;
// NOTE(review): presumably the `&str` is the blob key and a `true` result marks the
// blob for deletion — confirm against `CleanupVisitor`, which receives the predicate.
/// Boxed predicate callback used by [`BlobCleanup`].
///
/// The callback is given a `&str` and a `&BlobMeta` and answers with a `bool`.
/// It must be `Send + Sync`.
pub type CleanupPredicate = Box<dyn Fn(&str, &BlobMeta) -> bool + Send + Sync>;
/// A [`BlobStore`] wrapper that forwards store operations to an inner store and
/// additionally offers a predicate-driven `cleanup` pass over that store.
pub struct BlobCleanup {
/// The wrapped store; shared via `Arc`.
inner: Arc<dyn BlobStore>,
/// Callback lent to the cleanup visitor when `cleanup` runs.
predicate: CleanupPredicate,
/// Batch size handed to the cleanup visitor (also used to pre-size its batch buffer).
batch_size: usize,
}
impl BlobCleanup {
    /// Batch size a freshly constructed [`BlobCleanup`] starts with.
    const DEFAULT_BATCH_SIZE: usize = 1000;

    /// Creates a cleanup layer on top of `inner`.
    ///
    /// `predicate` is stored and later lent to the visitor used by
    /// [`BlobCleanup::cleanup`]. The batch size starts at the built-in default
    /// and can be changed with [`BlobCleanup::with_batch_size`].
    pub fn new(inner: Arc<dyn BlobStore>, predicate: CleanupPredicate) -> Self {
        let batch_size = Self::DEFAULT_BATCH_SIZE;
        Self {
            inner,
            predicate,
            batch_size,
        }
    }

    /// Builder-style setter: consumes the value and returns it with the batch
    /// size replaced by `batch_size`.
    ///
    /// The value is stored as given; no validation is applied here.
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        let mut configured = self;
        configured.batch_size = batch_size;
        configured
    }

    /// Runs one cleanup pass over the inner store.
    ///
    /// A `CleanupVisitor` is built from the inner store, the stored predicate
    /// and the configured batch size, then driven through the store's `visit`
    /// with a `PrefixFilter` constructed from the empty string. Afterwards the
    /// visitor is flushed and its `deleted_count` is reported.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the store's `visit` or by the
    /// visitor's `flush`.
    #[instrument(skip(self))]
    pub async fn cleanup(&self) -> Result<CleanupResult> {
        let batch_size = self.batch_size;

        let mut visitor = CleanupVisitor {
            store: &*self.inner,
            predicate: &self.predicate,
            batch: Vec::with_capacity(batch_size),
            batch_size,
            deleted_count: 0u64,
        };

        tracing::debug!(%batch_size, "Starting cleanup");

        self.inner.visit(&PrefixFilter::new(""), &mut visitor).await?;
        // Anything still buffered in the visitor after the walk is handled here.
        visitor.flush().await?;

        let deleted_count = visitor.deleted_count;
        tracing::debug!(%deleted_count, "Cleanup completed");

        Ok(CleanupResult { deleted_count })
    }
}
/// Pass-through [`BlobStore`] implementation: every operation is delegated to
/// the wrapped store, with a debug event emitted first for `put`, `get` and
/// `delete`.
#[async_trait]
impl BlobStore for BlobCleanup {
    /// Logs how many blobs were submitted, then delegates to the inner store.
    #[instrument(skip(self, blobs))]
    async fn put(&self, blobs: Vec<BlobInput>) -> Result<PutResult> {
        let count = blobs.len();
        tracing::debug!(%count, "Put via cleanup layer");

        self.inner.put(blobs).await
    }

    /// Logs the requested key, then delegates to the inner store.
    #[instrument(skip(self))]
    async fn get(&self, key: &str) -> Result<Box<dyn AsyncRead + Send + Unpin>> {
        tracing::debug!(key = key, "Get via cleanup layer");

        self.inner.get(key).await
    }

    /// Logs how many keys were passed, then delegates to the inner store.
    #[instrument(skip(self))]
    async fn delete(&self, keys: &[&str]) -> Result<()> {
        let count = keys.len();
        tracing::debug!(%count, "Delete via cleanup layer");

        self.inner.delete(keys).await
    }

    /// Delegates listing to the inner store unchanged.
    #[instrument(skip(self, filter))]
    async fn list(&self, filter: &dyn ListFilter) -> Result<Vec<String>> {
        self.inner.list(filter).await
    }

    /// Delegates the existence check to the inner store unchanged.
    #[instrument(skip(self))]
    async fn exists(&self, key: &str) -> Result<bool> {
        self.inner.exists(key).await
    }

    /// Delegates the combined metadata-and-content fetch to the inner store unchanged.
    #[instrument(skip(self))]
    async fn get_with_metadata(
        &self,
        key: &str,
    ) -> Result<(BlobMeta, Box<dyn AsyncRead + Send + Unpin>)> {
        self.inner.get_with_metadata(key).await
    }

    /// Delegates the metadata listing to the inner store unchanged.
    #[instrument(skip(self, filter))]
    async fn list_with_metadata(&self, filter: &dyn ListFilter) -> Result<Vec<BlobMeta>> {
        self.inner.list_with_metadata(filter).await
    }
}