xtax_blob_storage/cleanup.rs
1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::io::AsyncRead;
5use tracing::instrument;
6
7use crate::blob_store::BlobStore;
8use crate::cleanup::visitor::CleanupVisitor;
9use crate::error::Result;
10use crate::list_filter::{ListFilter, PrefixFilter};
11use crate::types::{BlobInput, BlobMeta, CleanupResult, PutResult};
12
13pub(crate) mod visitor;
14
15/// Predicate for deciding which blobs to delete during cleanup.
16///
17/// Return `true` to delete the blob.
18pub type CleanupPredicate = Box<dyn Fn(&str, &BlobMeta) -> bool + Send + Sync>;
19
20/// Layer that provides cleanup functionality over a [`BlobStore`].
21///
22/// `BlobCleanup` is itself a [`BlobStore`] — it delegates all operations
23/// to the inner store transparently, but also exposes a `cleanup()` method
24/// that calls a predicate to decide which blobs to delete.
25///
26/// For full documentation see the
27/// [Cleanup guide](https://github.com/cz-jcode/xtax/blob/main/crates/xtax-blob-storage/docs/cleanup.md).
28///
29/// # Example
30///
31/// ```rust,no_run
32/// use std::sync::Arc;
33/// use xtax_blob_storage::{BlobInput, BlobStore, BlobStoreBuilder, BlobCleanup, CleanupPredicate};
34///
35/// # #[cfg(feature = "fs")]
36/// # #[tokio::main]
37/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
38/// # #[cfg(feature = "fs")]
39/// # {
40/// let inner = BlobStoreBuilder::new()
41/// .with_fs("/tmp/data")
42/// .build()
43/// .await?;
44///
45/// // Delete blobs whose key starts with "tmp-"
46/// let predicate: xtax_blob_storage::CleanupPredicate =
47/// Box::new(|key, _meta| key.starts_with("tmp-"));
48///
49/// let store = BlobCleanup::new(inner, predicate);
50///
51/// // Regular blob operations work transparently:
52/// store.put(vec![BlobInput::new("hello.txt", b"data".as_slice())]).await?;
53///
54/// // Cleanup deletes blobs matching the predicate:
55/// let result = store.cleanup().await?;
56/// println!("deleted {} blobs", result.deleted_count);
57/// # Ok(())
58/// # }
59/// # }
60/// # #[cfg(not(feature = "fs"))]
61/// # fn main() {}
62/// ```
63pub struct BlobCleanup {
64 inner: Arc<dyn BlobStore>,
65 predicate: CleanupPredicate,
66 /// Maximum number of keys to accumulate before issuing a batch delete.
67 batch_size: usize,
68}
69
70impl BlobCleanup {
71 /// Default batch size for delete operations.
72 const DEFAULT_BATCH_SIZE: usize = 1000;
73
74 /// Create a new cleanup wrapper around an inner blob store.
75 ///
76 /// The `predicate` is called for each blob during `cleanup()`.
77 /// Return `true` to delete the blob.
78 ///
79 /// Uses the default batch size of 1000 for delete operations.
80 pub fn new(inner: Arc<dyn BlobStore>, predicate: CleanupPredicate) -> Self {
81 Self {
82 inner,
83 predicate,
84 batch_size: Self::DEFAULT_BATCH_SIZE,
85 }
86 }
87
88 /// Set the batch size for delete operations.
89 ///
90 /// Keys are accumulated until `batch_size` is reached, then deleted
91 /// in a single batch. This reduces the number of round-trips to the
92 /// backend while keeping memory usage bounded.
93 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
94 self.batch_size = batch_size;
95 self
96 }
97
98 /// Run cleanup: visit all blobs and delete those matching the predicate.
99 ///
100 /// Uses streaming `visit()` to process blobs as they become available,
101 /// deleting in batches of [`batch_size`](Self::with_batch_size).
102 ///
103 /// Uses an empty prefix filter to list everything. For scoped cleanup,
104 /// wrap the store with a `PrefixBlobStore` (via the builder) first.
105 #[instrument(skip(self))]
106 pub async fn cleanup(&self) -> Result<CleanupResult> {
107 let mut visitor = CleanupVisitor {
108 store: &*self.inner,
109 predicate: &self.predicate,
110 batch: Vec::with_capacity(self.batch_size),
111 batch_size: self.batch_size,
112 deleted_count: 0u64,
113 };
114 tracing::debug!(batch_size = %self.batch_size, "Starting cleanup");
115 self.inner
116 .visit(&PrefixFilter::new(""), &mut visitor)
117 .await?;
118 // Flush remaining keys
119 visitor.flush().await?;
120 tracing::debug!(deleted_count = %visitor.deleted_count, "Cleanup completed");
121 Ok(CleanupResult {
122 deleted_count: visitor.deleted_count,
123 })
124 }
125}
126
127#[async_trait]
128impl BlobStore for BlobCleanup {
129 #[instrument(skip(self, blobs))]
130 async fn put(&self, blobs: Vec<BlobInput>) -> Result<PutResult> {
131 tracing::debug!(count = %blobs.len(), "Put via cleanup layer");
132 self.inner.put(blobs).await
133 }
134
135 #[instrument(skip(self))]
136 async fn get(&self, key: &str) -> Result<Box<dyn AsyncRead + Send + Unpin>> {
137 tracing::debug!(key, "Get via cleanup layer");
138 self.inner.get(key).await
139 }
140
141 #[instrument(skip(self))]
142 async fn delete(&self, keys: &[&str]) -> Result<()> {
143 tracing::debug!(count = %keys.len(), "Delete via cleanup layer");
144 self.inner.delete(keys).await
145 }
146
147 #[instrument(skip(self, filter))]
148 async fn list(&self, filter: &dyn ListFilter) -> Result<Vec<String>> {
149 self.inner.list(filter).await
150 }
151
152 #[instrument(skip(self))]
153 async fn exists(&self, key: &str) -> Result<bool> {
154 self.inner.exists(key).await
155 }
156
157 #[instrument(skip(self))]
158 async fn get_with_metadata(
159 &self,
160 key: &str,
161 ) -> Result<(BlobMeta, Box<dyn AsyncRead + Send + Unpin>)> {
162 self.inner.get_with_metadata(key).await
163 }
164
165 #[instrument(skip(self, filter))]
166 async fn list_with_metadata(&self, filter: &dyn ListFilter) -> Result<Vec<BlobMeta>> {
167 self.inner.list_with_metadata(filter).await
168 }
169}