use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use crate::blob_store::BlobStore;
use crate::cleanup::BlobCleanup;
use crate::cleanup::CleanupPredicate;
use crate::encrypt::EncryptionProvider;
use crate::encrypt::store::EncryptedBlobStore;
use crate::error::Result;
use crate::prefix::PrefixBlobStore;
use self::maintenance::MaintenanceTask;
pub(crate) mod backend;
pub(crate) mod maintenance;
pub(crate) mod strategies;
pub use strategies::{
BackgroundCancellation, BackgroundContext, BackgroundStrategy, MaintenanceTrigger, Manual,
OnStart, Periodic,
};
/// Produces a fresh, boxed maintenance future every time it is invoked.
///
/// The `'static` bounds are written out explicitly; they are the defaults the
/// compiler would infer for `Arc<dyn ..>` / `Box<dyn ..>` in this position.
pub(crate) type TaskFactory = Arc<
    dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + Sync + 'static,
>;
/// Outermost wrapper handed back by the builder.
///
/// Owns the fully layered store plus a cancellation token; dropping the guard
/// cancels that token.
struct ShutdownGuard {
    /// The fully layered store every operation is forwarded to.
    inner: Arc<dyn BlobStore>,
    /// Token cancelled on drop (presumably observed by the maintenance tasks
    /// it is shared with — confirm in `maintenance::spawn_tasks`).
    cancellation: tokio_util::sync::CancellationToken,
}
impl Drop for ShutdownGuard {
    fn drop(&mut self) {
        let token = &self.cancellation;
        token.cancel();
    }
}
// `ShutdownGuard` adds nothing to storage operations themselves: every trait
// method below forwards its arguments unchanged to the wrapped layered store.
impl ShutdownGuard {
    /// The layered store this guard forwards every call to.
    fn delegate(&self) -> &(dyn BlobStore + 'static) {
        &*self.inner
    }
}
#[async_trait::async_trait]
impl BlobStore for ShutdownGuard {
    async fn put(&self, blobs: Vec<crate::types::BlobInput>) -> Result<crate::types::PutResult> {
        self.delegate().put(blobs).await
    }
    async fn get(&self, key: &str) -> Result<Box<dyn tokio::io::AsyncRead + Send + Unpin>> {
        self.delegate().get(key).await
    }
    async fn delete(&self, keys: &[&str]) -> Result<()> {
        self.delegate().delete(keys).await
    }
    async fn list(&self, filter: &dyn crate::list_filter::ListFilter) -> Result<Vec<String>> {
        self.delegate().list(filter).await
    }
    async fn exists(&self, key: &str) -> Result<bool> {
        self.delegate().exists(key).await
    }
    async fn get_with_metadata(
        &self,
        key: &str,
    ) -> Result<(
        crate::types::BlobMeta,
        Box<dyn tokio::io::AsyncRead + Send + Unpin>,
    )> {
        self.delegate().get_with_metadata(key).await
    }
    async fn list_with_metadata(
        &self,
        filter: &dyn crate::list_filter::ListFilter,
    ) -> Result<Vec<crate::types::BlobMeta>> {
        self.delegate().list_with_metadata(filter).await
    }
    async fn visit(
        &self,
        filter: &dyn crate::list_filter::ListFilter,
        visitor: &mut dyn crate::visitor::BlobVisitor,
    ) -> Result<()> {
        self.delegate().visit(filter, visitor).await
    }
}
/// Typestate marker: no backend has been selected yet, so `build()` is unavailable.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoBackend;
/// Typestate marker: the filesystem backend was selected (via `with_fs`).
#[derive(Debug, Clone, Copy, Default)]
pub struct FsChosen;
/// Typestate marker: the S3 backend was selected (via `with_s3`).
#[cfg(feature = "s3")]
#[derive(Debug, Clone, Copy, Default)]
pub struct S3Chosen;
/// Typestate marker: a caller-supplied backend was selected (via `with_backend`).
#[derive(Debug, Clone, Copy, Default)]
pub struct CustomChosen;
/// Type-erased user layer: receives the store built so far and returns the
/// store that replaces it in the stack.
pub(crate) type CustomLayerFn =
    Arc<dyn Fn(Arc<dyn BlobStore>) -> Arc<dyn BlobStore> + Send + Sync + 'static>;
/// One wrapping step recorded by the builder and applied on top of the backend.
pub(crate) enum LayerKind {
    /// Wrap the store in a `PrefixBlobStore` using this prefix.
    Prefix(String),
    /// Wrap the store in an `EncryptedBlobStore` backed by `provider`.
    Encryption {
        /// When `Some`, a background rekey task is registered with this strategy.
        rekey_strategy: Option<Arc<dyn BackgroundStrategy>>,
        provider: Arc<dyn EncryptionProvider>,
    },
    /// Wrap the store in a `BlobCleanup` driven by `predicate`.
    Cleanup {
        /// When `Some`, a background cleanup task is registered with this strategy.
        strategy: Option<Arc<dyn BackgroundStrategy>>,
        /// Forwarded to `BlobCleanup::with_batch_size`.
        batch_size: usize,
        predicate: CleanupPredicate,
    },
    /// Arbitrary caller-supplied wrapper.
    Custom(CustomLayerFn),
}
impl LayerKind {
fn wrap(self, inner: Arc<dyn BlobStore>) -> (Arc<dyn BlobStore>, Vec<MaintenanceTask>) {
match self {
LayerKind::Prefix(prefix) => (Arc::new(PrefixBlobStore::new(inner, prefix)), vec![]),
LayerKind::Encryption {
provider,
rekey_strategy,
} => {
let enc = Arc::new(EncryptedBlobStore::new(inner, provider));
let mut tasks = vec![];
if let Some(strategy) = rekey_strategy {
let enc_weak: Weak<EncryptedBlobStore> = Arc::downgrade(&enc);
let factory: TaskFactory = Arc::new(move || {
let enc_weak = enc_weak.clone();
Box::pin(async move {
let Some(enc) = enc_weak.upgrade() else {
tracing::debug!("Rekey task skipped: store already dropped");
return;
};
match enc.rekey().await {
Ok(result) => {
tracing::info!(
"Rekey completed: {} headers rekeyed",
result.rekeyed_count
);
}
Err(e) => {
tracing::error!("Rekey failed: {e}");
}
}
})
});
tasks.push(MaintenanceTask { factory, strategy });
}
(enc as Arc<dyn BlobStore>, tasks)
}
LayerKind::Cleanup {
predicate,
batch_size,
strategy,
} => {
let mut c = BlobCleanup::new(inner, predicate);
c = c.with_batch_size(batch_size);
let c = Arc::new(c);
let mut tasks = vec![];
if let Some(strategy) = strategy {
let c_weak: Weak<BlobCleanup> = Arc::downgrade(&c);
let factory: TaskFactory = Arc::new(move || {
let c_weak = c_weak.clone();
Box::pin(async move {
let Some(c) = c_weak.upgrade() else {
tracing::debug!("Cleanup task skipped: store already dropped");
return;
};
match c.cleanup().await {
Ok(result) => {
tracing::info!(
"Cleanup completed: {} blobs deleted",
result.deleted_count
);
}
Err(e) => {
tracing::error!("Cleanup failed: {e}");
}
}
})
});
tasks.push(MaintenanceTask { factory, strategy });
}
(c as Arc<dyn BlobStore>, tasks)
}
LayerKind::Custom(f) => (f(inner), vec![]),
}
}
}
/// Typestate builder for a layered [`BlobStore`].
///
/// `B` is a zero-sized marker recording which backend has been selected. It is
/// `NoBackend` until `with_fs` / `with_s3` / `with_backend` is called, and
/// `build()` is only implemented for the "chosen" markers. The builder does
/// nothing by itself, so dropping it unbuilt is almost certainly a mistake.
#[must_use = "a BlobStoreBuilder does nothing until `build()` is awaited"]
pub struct BlobStoreBuilder<B = NoBackend> {
    /// Selected backend; `None` only while `B = NoBackend`.
    backend: Option<backend::BackendKind>,
    /// Layers in registration order; the first entry wraps the raw backend directly.
    layers: Vec<LayerKind>,
    _phantom: PhantomData<B>,
}
impl BlobStoreBuilder<NoBackend> {
    /// Creates a builder with no backend selected and no layers registered.
    pub fn new() -> Self {
        BlobStoreBuilder {
            layers: Vec::new(),
            backend: None,
            _phantom: PhantomData,
        }
    }
    /// Selects the filesystem backend rooted at `root`, keeping any layers added so far.
    #[cfg(feature = "fs")]
    pub fn with_fs(self, root: impl Into<std::path::PathBuf>) -> BlobStoreBuilder<FsChosen> {
        let root: std::path::PathBuf = root.into();
        self.select_backend(backend::BackendKind::Fs(root))
    }
    /// Selects the S3 backend for `bucket`, starting from the default multipart part size.
    #[cfg(feature = "s3")]
    pub fn with_s3(
        self,
        client: aws_sdk_s3::Client,
        bucket: impl Into<String>,
    ) -> BlobStoreBuilder<S3Chosen> {
        let kind = backend::BackendKind::S3 {
            client,
            bucket: bucket.into(),
            part_size: crate::s3::DEFAULT_MULTIPART_PART_SIZE,
        };
        self.select_backend(kind)
    }
    /// Uses a caller-provided store as the innermost backend.
    pub fn with_backend(self, store: Arc<dyn BlobStore>) -> BlobStoreBuilder<CustomChosen> {
        self.select_backend(backend::BackendKind::Custom(store))
    }
    /// Moves the accumulated layers into a builder typed by the chosen backend marker `C`.
    fn select_backend<C>(self, kind: backend::BackendKind) -> BlobStoreBuilder<C> {
        BlobStoreBuilder {
            backend: Some(kind),
            layers: self.layers,
            _phantom: PhantomData,
        }
    }
}
impl Default for BlobStoreBuilder<NoBackend> {
fn default() -> Self {
Self::new()
}
}
impl<B> BlobStoreBuilder<B> {
    /// Batch size a cleanup layer starts with until `with_clean_batch_size` overrides it.
    const DEFAULT_CLEAN_BATCH_SIZE: usize = 1000;

    /// Adds a layer that wraps the store in a `PrefixBlobStore` using `prefix`.
    ///
    /// Layers are applied in registration order: a layer added later wraps
    /// every layer added before it.
    pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
        self.layers.push(LayerKind::Prefix(prefix.into()));
        self
    }
    /// Adds an `EncryptedBlobStore` layer using `provider`, with no rekey task.
    ///
    /// Call `with_rekey` directly afterwards to schedule background rekeying.
    pub fn with_encryption(mut self, provider: Arc<dyn EncryptionProvider>) -> Self {
        self.layers.push(LayerKind::Encryption {
            provider,
            rekey_strategy: None,
        });
        self
    }
    /// Attaches a background rekey strategy to the most recently added layer.
    ///
    /// This only takes effect when that layer is an encryption layer; otherwise
    /// a warning is logged and the strategy is discarded. A second call
    /// replaces the earlier strategy.
    pub fn with_rekey(mut self, strategy: Arc<dyn BackgroundStrategy>) -> Self {
        if let Some(LayerKind::Encryption { rekey_strategy, .. }) = self.layers.last_mut() {
            *rekey_strategy = Some(strategy);
        } else {
            tracing::warn!(
                "with_rekey() called but the most recent layer is not an encryption layer \
                 — ignoring rekey strategy. Call with_rekey() directly after with_encryption()."
            );
        }
        self
    }
    /// Adds a `BlobCleanup` layer driven by `predicate` and scheduled by `strategy`.
    ///
    /// The layer starts with a batch size of 1000; use `with_clean_batch_size`
    /// directly afterwards to change it.
    pub fn with_clean(
        mut self,
        predicate: CleanupPredicate,
        strategy: Arc<dyn BackgroundStrategy>,
    ) -> Self {
        self.layers.push(LayerKind::Cleanup {
            predicate,
            batch_size: Self::DEFAULT_CLEAN_BATCH_SIZE,
            strategy: Some(strategy),
        });
        self
    }
    /// Overrides the batch size of the most recently added cleanup layer.
    ///
    /// When the most recent layer is not a cleanup layer, a warning is logged
    /// and the value is ignored (the same policy as `with_rekey`).
    pub fn with_clean_batch_size(mut self, batch_size: usize) -> Self {
        // NOTE(review): the value is forwarded unchanged to
        // `BlobCleanup::with_batch_size`; confirm how that treats 0.
        if let Some(LayerKind::Cleanup {
            batch_size: current,
            ..
        }) = self.layers.last_mut()
        {
            *current = batch_size;
        } else {
            tracing::warn!(
                "with_clean_batch_size() called but the most recent layer is not a cleanup layer \
                 — ignoring batch size. Call with_clean_batch_size() directly after with_clean()."
            );
        }
        self
    }
    /// Adds a caller-supplied layer; `f` receives the store built so far and
    /// returns its replacement.
    pub fn with_layer<F>(mut self, f: F) -> Self
    where
        F: Fn(Arc<dyn BlobStore>) -> Arc<dyn BlobStore> + Send + Sync + 'static,
    {
        self.layers.push(LayerKind::Custom(Arc::new(f)));
        self
    }
    /// Validates every configured background strategy, in layer order.
    ///
    /// # Errors
    /// Returns the first error produced by a strategy's `validate()`.
    fn validate_strategies(&self) -> Result<()> {
        for layer in &self.layers {
            // Exhaustive on purpose: a new layer kind carrying a strategy must
            // be added here, not silently skipped by a catch-all.
            let strategy = match layer {
                LayerKind::Encryption { rekey_strategy, .. } => rekey_strategy.as_ref(),
                LayerKind::Cleanup { strategy, .. } => strategy.as_ref(),
                LayerKind::Prefix(_) | LayerKind::Custom(_) => None,
            };
            if let Some(strategy) = strategy {
                strategy.validate()?;
            }
        }
        Ok(())
    }
}
#[cfg(feature = "s3")]
impl BlobStoreBuilder<S3Chosen> {
    /// Sets the S3 multipart part size.
    ///
    /// Values below `crate::s3::MIN_MULTIPART_PART_SIZE` are raised to that minimum.
    pub fn with_multipart_part_size(mut self, size: u64) -> Self {
        let floor = crate::s3::MIN_MULTIPART_PART_SIZE;
        if let Some(backend::BackendKind::S3 { part_size, .. }) = self.backend.as_mut() {
            *part_size = std::cmp::max(size, floor);
        }
        self
    }
}
impl BlobStoreBuilder<FsChosen> {
    /// Builds the filesystem-backed store with all registered layers applied.
    ///
    /// # Errors
    /// Fails if a strategy is invalid, the backend cannot be built, or
    /// maintenance tasks cannot be spawned.
    pub async fn build(self) -> Result<Arc<dyn BlobStore>> {
        Self::do_build(self).await
    }
}
#[cfg(feature = "s3")]
impl BlobStoreBuilder<S3Chosen> {
    /// Builds the S3-backed store with all registered layers applied.
    ///
    /// # Errors
    /// Fails if a strategy is invalid, the backend cannot be built, or
    /// maintenance tasks cannot be spawned.
    pub async fn build(self) -> Result<Arc<dyn BlobStore>> {
        Self::do_build(self).await
    }
}
impl BlobStoreBuilder<CustomChosen> {
    /// Builds the store on top of the caller-supplied backend with all registered layers applied.
    ///
    /// # Errors
    /// Fails if a strategy is invalid, the backend cannot be built, or
    /// maintenance tasks cannot be spawned.
    pub async fn build(self) -> Result<Arc<dyn BlobStore>> {
        Self::do_build(self).await
    }
}
impl<B> BlobStoreBuilder<B> {
    /// Shared implementation behind every typed `build()`.
    ///
    /// It proceeds in four steps:
    /// 1. Validates strategies.
    /// 2. Builds the raw backend.
    /// 3. Wraps it with each layer in registration order, collecting the
    ///    layers' maintenance tasks.
    /// 4. Returns the stack behind a `ShutdownGuard`, spawning tasks only
    ///    when there are any.
    async fn do_build(self) -> Result<Arc<dyn BlobStore>> {
        self.validate_strategies()?;
        let Self {
            backend, layers, ..
        } = self;
        let raw = backend
            .expect("backend always set at this point")
            .build_raw_arc()
            .await?;
        let (store, tasks) = layers.into_iter().fold(
            (raw, Vec::<MaintenanceTask>::new()),
            |(store, mut tasks), layer| {
                let (wrapped, layer_tasks) = layer.wrap(store);
                tasks.extend(layer_tasks);
                (wrapped, tasks)
            },
        );
        let token = tokio_util::sync::CancellationToken::new();
        // The guard exists before spawning: if `spawn_tasks` fails, `?` drops
        // the guard, whose `Drop` cancels the token.
        let guard = Arc::new(ShutdownGuard {
            inner: store,
            cancellation: token.clone(),
        });
        if !tasks.is_empty() {
            maintenance::spawn_tasks(tasks, token)?;
        }
        Ok(guard)
    }
}