use super::types::{
BlobAckMode, BlobDurabilityTarget, BlobGetResult, BlobPrefixDeleteResult, BlobPutOptions,
BlobPutResult, BlobReadOptions, BlobReadTierPolicy, BlobStore,
};
use crate::features::storage::api::{Result, StorageError};
use alloy_storage::{
AckPolicy, BlobId as AlloyBlobId, BlobStore as AlloyBlobStore, DurabilityTier, OverwritePolicy,
PutOptions as AlloyPutOptions, ReadOptions as AlloyReadOptions, ReadTierPolicy,
StorageError as AlloyStorageError,
};
use std::sync::Arc;
pub struct InjectedBlobStoreAdapter {
runtime: tokio::runtime::Runtime,
store: Arc<dyn AlloyBlobStore>,
}
impl InjectedBlobStoreAdapter {
pub fn new(store: Arc<dyn AlloyBlobStore>) -> Result<Self> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.map_err(|err| {
StorageError::InvalidInput(format!(
"failed to build tokio runtime for injected blob store: {err}"
))
})?;
Ok(Self { runtime, store })
}
}
fn to_alloy_id(blob_id: &str) -> AlloyBlobId {
AlloyBlobId::from_bytes(blob_id.as_bytes().to_vec())
}
fn to_alloy_put_options(opts: BlobPutOptions) -> AlloyPutOptions {
AlloyPutOptions {
overwrite_policy: if opts.deny_if_exists {
OverwritePolicy::DenyIfExists
} else {
OverwritePolicy::AllowOverwrite
},
ack: match opts.ack_mode {
BlobAckMode::FireAndForget => AckPolicy::FireAndForget,
BlobAckMode::Flush => AckPolicy::Flush {
target: match opts.durability_target {
BlobDurabilityTarget::Memory => DurabilityTier::Memory,
BlobDurabilityTarget::Disk => DurabilityTier::Disk,
BlobDurabilityTarget::Remote => DurabilityTier::Remote,
},
timeout_ms: opts.timeout_ms,
},
},
idempotent: opts.idempotent,
verify_content_hash: opts.verify_content_hash,
ttl_secs: None,
}
}
fn to_alloy_read_options(opts: BlobReadOptions) -> AlloyReadOptions {
AlloyReadOptions {
tier_policy: match opts.tier_policy {
BlobReadTierPolicy::Default => ReadTierPolicy::Default,
BlobReadTierPolicy::LocalFirst => ReadTierPolicy::LocalFirst,
BlobReadTierPolicy::RemoteOnly => ReadTierPolicy::RemoteOnly,
},
rehydrate_local: opts.rehydrate_local,
}
}
fn map_alloy_error(err: AlloyStorageError) -> StorageError {
match err {
AlloyStorageError::NotFound => StorageError::InvalidInput("blob not found".to_string()),
other => StorageError::InvalidInput(other.to_string()),
}
}
impl BlobStore for InjectedBlobStoreAdapter {
fn put_blob_with_options(
&mut self,
blob_id: &str,
bytes: &[u8],
options: BlobPutOptions,
) -> Result<BlobPutResult> {
let id = to_alloy_id(blob_id);
let result = self
.runtime
.block_on(
self.store
.put_blob(&id, bytes.to_vec(), to_alloy_put_options(options)),
)
.map_err(map_alloy_error)?;
Ok(BlobPutResult {
inserted: result.inserted,
overwritten: result.overwritten,
idempotent_noop: result.idempotent_noop,
})
}
fn get_blob_with_options(
&self,
blob_id: &str,
options: BlobReadOptions,
) -> Result<Option<BlobGetResult>> {
let id = to_alloy_id(blob_id);
match self
.runtime
.block_on(self.store.get_blob(&id, to_alloy_read_options(options)))
{
Ok(result) => Ok(Some(BlobGetResult {
data: result.data,
served_tier: result.served_tier,
rehydrated_local: result.rehydrated_local,
})),
Err(AlloyStorageError::NotFound) => Ok(None),
Err(err) => Err(map_alloy_error(err)),
}
}
fn has_blob(&self, blob_id: &str) -> Result<bool> {
let id = to_alloy_id(blob_id);
self.runtime
.block_on(self.store.has_blob(&id))
.map_err(map_alloy_error)
}
fn delete_blob(&mut self, blob_id: &str) -> Result<()> {
let id = to_alloy_id(blob_id);
self.runtime
.block_on(self.store.delete_blob(&id))
.map_err(map_alloy_error)
}
fn has_blobs(&self, blob_ids: &[String]) -> Result<Vec<bool>> {
let ids: Vec<AlloyBlobId> = blob_ids.iter().map(|s| to_alloy_id(s)).collect();
self.runtime
.block_on(self.store.has_blobs(&ids))
.map_err(map_alloy_error)
}
fn delete_blobs(&mut self, blob_ids: &[String]) -> Result<usize> {
let ids: Vec<AlloyBlobId> = blob_ids.iter().map(|s| to_alloy_id(s)).collect();
self.runtime
.block_on(self.store.delete_blobs(&ids))
.map_err(map_alloy_error)
}
fn list_prefix(&self, namespace: &str, prefix: &str, limit: usize) -> Result<Vec<String>> {
let ids = self
.runtime
.block_on(
self.store
.list_prefix(namespace.as_bytes(), prefix.as_bytes(), limit),
)
.map_err(map_alloy_error)?;
Ok(ids
.into_iter()
.map(|id| String::from_utf8_lossy(id.as_bytes()).into_owned())
.collect())
}
fn delete_prefix(
&mut self,
namespace: &str,
prefix: &str,
batch_limit: usize,
) -> Result<BlobPrefixDeleteResult> {
let result = self
.runtime
.block_on(self.store.delete_prefix(
namespace.as_bytes(),
prefix.as_bytes(),
batch_limit,
))
.map_err(map_alloy_error)?;
Ok(BlobPrefixDeleteResult {
deleted: result.deleted,
truncated: result.truncated,
})
}
}