use super::support::{
from_rhodium_put_result, map_rhodium_error, to_rhodium_put_options, validate_rhodium_blob_id,
};
use super::{BlobGetResult, BlobPrefixDeleteResult, BlobPutOptions, BlobReadOptions, BlobStore};
use crate::features::storage::api::Result;
use rhodium_cache::core::storage::blob::{
ArtifactHydrationMode, BlobId as RhodiumBlobId, BlobStackBuilder,
BlobStore as RhodiumBlobStoreApi, BlobTierSpec, CacheStackBlobStore, CompiledPlanCache,
CompiledPlanCacheDescriptor, IridiumArtifactCacheAdapter, IridiumArtifactRequest,
};
use rhodium_cache::core::storage::StorageError as RhodiumStorageError;
pub struct RhodiumBlobStore {
runtime: tokio::runtime::Runtime,
store: CacheStackBlobStore,
evaluator_cache: IridiumArtifactCacheAdapter,
compiled_plan_cache: CompiledPlanCache,
}
impl RhodiumBlobStore {
pub fn new(root: std::path::PathBuf) -> Result<Self> {
std::fs::create_dir_all(&root)?;
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.map_err(|err| {
crate::features::storage::api::StorageError::InvalidInput(format!(
"failed to build rhodium runtime: {err}"
))
})?;
let disk_root = root.join("rhodium-disk");
let built = runtime
.block_on(async {
BlobStackBuilder::new()
.with_policy_capacity(4096)
.add_tier(BlobTierSpec::Memory {
name: "memory".to_string(),
capacity_bytes: 16 * 1024 * 1024,
block_size: Some(4096),
})
.add_tier(BlobTierSpec::Disk {
name: "disk".to_string(),
root: disk_root,
capacity_bytes: 64 * 1024 * 1024,
use_compression: false,
use_native_io: false,
block_pool_block_size: None,
})
.build()
.await
})
.map_err(|err| {
crate::features::storage::api::StorageError::InvalidInput(format!(
"failed to initialize rhodium blob stack: {err}"
))
})?;
let store = built.store;
let evaluator_cache = IridiumArtifactCacheAdapter::new(std::sync::Arc::new(store.clone()));
let compiled_plan_cache = CompiledPlanCache::new(std::sync::Arc::new(store.clone()));
Ok(Self {
runtime,
store,
evaluator_cache,
compiled_plan_cache,
})
}
}
impl BlobStore for RhodiumBlobStore {
fn put_blob_with_options(
&mut self,
blob_id: &str,
bytes: &[u8],
options: BlobPutOptions,
) -> Result<super::BlobPutResult> {
validate_rhodium_blob_id(blob_id)?;
let id = RhodiumBlobId::from_bytes(blob_id.as_bytes().to_vec());
let out = self
.runtime
.block_on(
self.store
.put_blob(&id, bytes.to_vec(), to_rhodium_put_options(options)),
)
.map_err(map_rhodium_error)?;
Ok(from_rhodium_put_result(out))
}
fn get_blob_with_options(
&self,
blob_id: &str,
options: BlobReadOptions,
) -> Result<Option<BlobGetResult>> {
validate_rhodium_blob_id(blob_id)?;
let mode = if matches!(options.tier_policy, super::BlobReadTierPolicy::RemoteOnly)
&& !options.rehydrate_local
{
ArtifactHydrationMode::BypassCache
} else {
ArtifactHydrationMode::CachePreferred
};
match self.runtime.block_on(
self.evaluator_cache
.hydrate_artifact(IridiumArtifactRequest {
artifact_key: blob_id.to_string(),
mode,
}),
) {
Ok(result) => Ok(Some(BlobGetResult {
data: result.data,
served_tier: result.served_tier,
rehydrated_local: matches!(
result.disposition,
rhodium_cache::core::storage::blob::ArtifactHydrationDisposition::CacheMissRehydrated
),
})),
Err(RhodiumStorageError::NotFound) => Ok(None),
Err(err) => Err(map_rhodium_error(err)),
}
}
fn has_blob(&self, blob_id: &str) -> Result<bool> {
validate_rhodium_blob_id(blob_id)?;
let id = RhodiumBlobId::from_bytes(blob_id.as_bytes().to_vec());
self.runtime
.block_on(self.store.has_blob(&id))
.map_err(map_rhodium_error)
}
fn delete_blob(&mut self, blob_id: &str) -> Result<()> {
validate_rhodium_blob_id(blob_id)?;
let id = RhodiumBlobId::from_bytes(blob_id.as_bytes().to_vec());
match self.runtime.block_on(self.store.delete_blob(&id)) {
Ok(()) | Err(RhodiumStorageError::NotFound) => {}
Err(err) => return Err(map_rhodium_error(err)),
}
Ok(())
}
fn has_blobs(&self, blob_ids: &[String]) -> Result<Vec<bool>> {
let mut ids = Vec::with_capacity(blob_ids.len());
for blob_id in blob_ids {
validate_rhodium_blob_id(blob_id)?;
ids.push(RhodiumBlobId::from_bytes(blob_id.as_bytes().to_vec()));
}
self.runtime
.block_on(self.store.has_blobs(&ids))
.map_err(map_rhodium_error)
}
fn delete_blobs(&mut self, blob_ids: &[String]) -> Result<usize> {
let mut ids = Vec::with_capacity(blob_ids.len());
for blob_id in blob_ids {
validate_rhodium_blob_id(blob_id)?;
ids.push(RhodiumBlobId::from_bytes(blob_id.as_bytes().to_vec()));
}
self.runtime
.block_on(self.store.delete_blobs(&ids))
.map_err(map_rhodium_error)
}
fn list_prefix(&self, namespace: &str, prefix: &str, limit: usize) -> Result<Vec<String>> {
let ids = self
.runtime
.block_on(
self.store
.list_prefix(namespace.as_bytes(), prefix.as_bytes(), limit),
)
.map_err(map_rhodium_error)?;
let mut out = Vec::with_capacity(ids.len());
for id in ids {
let s = String::from_utf8(id.as_bytes().to_vec()).map_err(|_| {
crate::features::storage::api::StorageError::CorruptData(
"rhodium blob id is not valid utf-8".to_string(),
)
})?;
out.push(s);
}
Ok(out)
}
fn delete_prefix(
&mut self,
namespace: &str,
prefix: &str,
batch_limit: usize,
) -> Result<BlobPrefixDeleteResult> {
let out = self
.runtime
.block_on(self.store.delete_prefix(
namespace.as_bytes(),
prefix.as_bytes(),
batch_limit,
))
.map_err(map_rhodium_error)?;
Ok(BlobPrefixDeleteResult {
deleted: out.deleted,
truncated: out.truncated,
})
}
fn put_compiled_plan_with_options(
&mut self,
descriptor: &CompiledPlanCacheDescriptor,
bytes: &[u8],
options: BlobPutOptions,
) -> Result<super::BlobPutResult> {
self.runtime
.block_on(self.compiled_plan_cache.put_compiled_plan(
descriptor,
bytes.to_vec(),
to_rhodium_put_options(options),
))
.map_err(map_rhodium_error)?;
Ok(super::BlobPutResult {
inserted: true,
overwritten: false,
idempotent_noop: false,
})
}
fn get_compiled_plan_with_options(
&self,
descriptor: &CompiledPlanCacheDescriptor,
options: BlobReadOptions,
) -> Result<Option<BlobGetResult>> {
let mode = if matches!(options.tier_policy, super::BlobReadTierPolicy::RemoteOnly)
&& !options.rehydrate_local
{
ArtifactHydrationMode::BypassCache
} else {
ArtifactHydrationMode::CachePreferred
};
match self.runtime.block_on(
self.compiled_plan_cache
.hydrate_compiled_plan(descriptor, mode),
) {
Ok(result) => Ok(Some(BlobGetResult {
data: result.data,
served_tier: result.served_tier,
rehydrated_local: matches!(
result.disposition,
rhodium_cache::core::storage::blob::ArtifactHydrationDisposition::CacheMissRehydrated
),
})),
Err(RhodiumStorageError::NotFound) => Ok(None),
Err(err) => Err(map_rhodium_error(err)),
}
}
}