Skip to main content

xtax_blob_storage/
cleanup.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::io::AsyncRead;
5use tracing::instrument;
6
7use crate::blob_store::BlobStore;
8use crate::cleanup::visitor::CleanupVisitor;
9use crate::error::Result;
10use crate::list_filter::{ListFilter, PrefixFilter};
11use crate::types::{BlobInput, BlobMeta, CleanupResult, PutResult};
12
13pub(crate) mod visitor;
14
15/// Predicate for deciding which blobs to delete during cleanup.
16///
17/// Return `true` to delete the blob.
18pub type CleanupPredicate = Box<dyn Fn(&str, &BlobMeta) -> bool + Send + Sync>;
19
20/// Layer that provides cleanup functionality over a [`BlobStore`].
21///
22/// `BlobCleanup` is itself a [`BlobStore`] — it delegates all operations
23/// to the inner store transparently, but also exposes a `cleanup()` method
24/// that calls a predicate to decide which blobs to delete.
25///
26/// For full documentation see the
27/// [Cleanup guide](https://github.com/cz-jcode/xtax/blob/main/crates/xtax-blob-storage/docs/cleanup.md).
28///
29/// # Example
30///
31/// ```rust,no_run
32/// use std::sync::Arc;
33/// use xtax_blob_storage::{BlobInput, BlobStore, BlobStoreBuilder, BlobCleanup, CleanupPredicate};
34///
35/// # #[cfg(feature = "fs")]
36/// # #[tokio::main]
37/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
38/// # #[cfg(feature = "fs")]
39/// # {
40/// let inner = BlobStoreBuilder::new()
41///     .with_fs("/tmp/data")
42///     .build()
43///     .await?;
44///
45/// // Delete blobs whose key starts with "tmp-"
46/// let predicate: xtax_blob_storage::CleanupPredicate =
47///     Box::new(|key, _meta| key.starts_with("tmp-"));
48///
49/// let store = BlobCleanup::new(inner, predicate);
50///
51/// // Regular blob operations work transparently:
52/// store.put(vec![BlobInput::new("hello.txt", b"data".as_slice())]).await?;
53///
54/// // Cleanup deletes blobs matching the predicate:
55/// let result = store.cleanup().await?;
56/// println!("deleted {} blobs", result.deleted_count);
57/// # Ok(())
58/// # }
59/// # }
60/// # #[cfg(not(feature = "fs"))]
61/// # fn main() {}
62/// ```
63pub struct BlobCleanup {
64    inner: Arc<dyn BlobStore>,
65    predicate: CleanupPredicate,
66    /// Maximum number of keys to accumulate before issuing a batch delete.
67    batch_size: usize,
68}
69
70impl BlobCleanup {
71    /// Default batch size for delete operations.
72    const DEFAULT_BATCH_SIZE: usize = 1000;
73
74    /// Create a new cleanup wrapper around an inner blob store.
75    ///
76    /// The `predicate` is called for each blob during `cleanup()`.
77    /// Return `true` to delete the blob.
78    ///
79    /// Uses the default batch size of 1000 for delete operations.
80    pub fn new(inner: Arc<dyn BlobStore>, predicate: CleanupPredicate) -> Self {
81        Self {
82            inner,
83            predicate,
84            batch_size: Self::DEFAULT_BATCH_SIZE,
85        }
86    }
87
88    /// Set the batch size for delete operations.
89    ///
90    /// Keys are accumulated until `batch_size` is reached, then deleted
91    /// in a single batch. This reduces the number of round-trips to the
92    /// backend while keeping memory usage bounded.
93    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
94        self.batch_size = batch_size;
95        self
96    }
97
98    /// Run cleanup: visit all blobs and delete those matching the predicate.
99    ///
100    /// Uses streaming `visit()` to process blobs as they become available,
101    /// deleting in batches of [`batch_size`](Self::with_batch_size).
102    ///
103    /// Uses an empty prefix filter to list everything. For scoped cleanup,
104    /// wrap the store with a `PrefixBlobStore` (via the builder) first.
105    #[instrument(skip(self))]
106    pub async fn cleanup(&self) -> Result<CleanupResult> {
107        let mut visitor = CleanupVisitor {
108            store: &*self.inner,
109            predicate: &self.predicate,
110            batch: Vec::with_capacity(self.batch_size),
111            batch_size: self.batch_size,
112            deleted_count: 0u64,
113        };
114        tracing::debug!(batch_size = %self.batch_size, "Starting cleanup");
115        self.inner
116            .visit(&PrefixFilter::new(""), &mut visitor)
117            .await?;
118        // Flush remaining keys
119        visitor.flush().await?;
120        tracing::debug!(deleted_count = %visitor.deleted_count, "Cleanup completed");
121        Ok(CleanupResult {
122            deleted_count: visitor.deleted_count,
123        })
124    }
125}
126
127#[async_trait]
128impl BlobStore for BlobCleanup {
129    #[instrument(skip(self, blobs))]
130    async fn put(&self, blobs: Vec<BlobInput>) -> Result<PutResult> {
131        tracing::debug!(count = %blobs.len(), "Put via cleanup layer");
132        self.inner.put(blobs).await
133    }
134
135    #[instrument(skip(self))]
136    async fn get(&self, key: &str) -> Result<Box<dyn AsyncRead + Send + Unpin>> {
137        tracing::debug!(key, "Get via cleanup layer");
138        self.inner.get(key).await
139    }
140
141    #[instrument(skip(self))]
142    async fn delete(&self, keys: &[&str]) -> Result<()> {
143        tracing::debug!(count = %keys.len(), "Delete via cleanup layer");
144        self.inner.delete(keys).await
145    }
146
147    #[instrument(skip(self, filter))]
148    async fn list(&self, filter: &dyn ListFilter) -> Result<Vec<String>> {
149        self.inner.list(filter).await
150    }
151
152    #[instrument(skip(self))]
153    async fn exists(&self, key: &str) -> Result<bool> {
154        self.inner.exists(key).await
155    }
156
157    #[instrument(skip(self))]
158    async fn get_with_metadata(
159        &self,
160        key: &str,
161    ) -> Result<(BlobMeta, Box<dyn AsyncRead + Send + Unpin>)> {
162        self.inner.get_with_metadata(key).await
163    }
164
165    #[instrument(skip(self, filter))]
166    async fn list_with_metadata(&self, filter: &dyn ListFilter) -> Result<Vec<BlobMeta>> {
167        self.inner.list_with_metadata(filter).await
168    }
169}