iridium-db 0.4.0

A high-performance vector-graph hybrid storage and indexing engine
use super::support::{
    from_rhodium_put_result, map_rhodium_error, to_rhodium_put_options, validate_rhodium_blob_id,
};
use super::{BlobGetResult, BlobPrefixDeleteResult, BlobPutOptions, BlobReadOptions, BlobStore};
use crate::features::storage::api::Result;

use rhodium_cache::core::storage::blob::{
    ArtifactHydrationMode, BlobId as RhodiumBlobId, BlobStackBuilder,
    BlobStore as RhodiumBlobStoreApi, BlobTierSpec, CacheStackBlobStore, CompiledPlanCache,
    CompiledPlanCacheDescriptor, IridiumArtifactCacheAdapter, IridiumArtifactRequest,
};
use rhodium_cache::core::storage::StorageError as RhodiumStorageError;

/// [`BlobStore`] implementation backed by a Rhodium tiered cache stack.
///
/// The Rhodium APIs are async. This type exposes them synchronously by owning a
/// private tokio runtime and calling `Runtime::block_on` for each operation.
/// Do not call its methods from inside an async execution context: tokio's
/// `block_on` panics when invoked there.
pub struct RhodiumBlobStore {
    /// Tiered Rhodium blob store used for direct blob operations.
    store: CacheStackBlobStore,
    /// Artifact-cache adapter built over a clone of `store`.
    evaluator_cache: IridiumArtifactCacheAdapter,
    /// Compiled-plan cache built over a clone of `store`.
    compiled_plan_cache: CompiledPlanCache,
    /// Runtime that drives every async Rhodium call.
    ///
    /// Declared last on purpose. Struct fields are dropped in declaration order,
    /// so the runtime stays alive while the components above are torn down,
    /// instead of being shut down before them.
    runtime: tokio::runtime::Runtime,
}

impl RhodiumBlobStore {
    /// Opens a Rhodium-backed blob store whose on-disk state lives under `root`.
    ///
    /// `root` is created if it does not exist yet. A dedicated multi-threaded
    /// tokio runtime (all drivers enabled) is built and used to assemble a
    /// two-tier blob stack with a policy capacity of 4096:
    ///
    /// * `"memory"`: 16 MiB in-memory tier with 4096-byte blocks.
    /// * `"disk"`: 64 MiB tier under `root/rhodium-disk`, with compression and
    ///   native I/O disabled.
    ///
    /// The artifact-cache adapter and the compiled-plan cache each receive their
    /// own `Arc` wrapping a clone of the built store.
    ///
    /// # Errors
    ///
    /// Returns an error if `root` cannot be created. Failures to build the
    /// runtime or to initialize the blob stack are reported as
    /// `StorageError::InvalidInput`.
    pub fn new(root: std::path::PathBuf) -> Result<Self> {
        std::fs::create_dir_all(&root)?;

        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .map_err(|err| {
                crate::features::storage::api::StorageError::InvalidInput(format!(
                    "failed to build rhodium runtime: {err}"
                ))
            })?;

        let memory_tier = BlobTierSpec::Memory {
            name: "memory".to_string(),
            capacity_bytes: 16 * 1024 * 1024,
            block_size: Some(4096),
        };
        let disk_tier = BlobTierSpec::Disk {
            name: "disk".to_string(),
            root: root.join("rhodium-disk"),
            capacity_bytes: 64 * 1024 * 1024,
            use_compression: false,
            use_native_io: false,
            block_pool_block_size: None,
        };

        // Keep the whole builder chain inside the async block so that it
        // executes within the runtime's context.
        let stack = runtime
            .block_on(async {
                BlobStackBuilder::new()
                    .with_policy_capacity(4096)
                    .add_tier(memory_tier)
                    .add_tier(disk_tier)
                    .build()
                    .await
            })
            .map_err(|err| {
                crate::features::storage::api::StorageError::InvalidInput(format!(
                    "failed to initialize rhodium blob stack: {err}"
                ))
            })?;

        let store = stack.store;
        let evaluator_cache =
            IridiumArtifactCacheAdapter::new(std::sync::Arc::new(store.clone()));
        let compiled_plan_cache = CompiledPlanCache::new(std::sync::Arc::new(store.clone()));

        Ok(Self {
            store,
            evaluator_cache,
            compiled_plan_cache,
            runtime,
        })
    }
}

impl BlobStore for RhodiumBlobStore {
    fn put_blob_with_options(
        &mut self,
        blob_id: &str,
        bytes: &[u8],
        options: BlobPutOptions,
    ) -> Result<super::BlobPutResult> {
        validate_rhodium_blob_id(blob_id)?;
        let id = RhodiumBlobId::from_bytes(blob_id.as_bytes().to_vec());
        let out = self
            .runtime
            .block_on(
                self.store
                    .put_blob(&id, bytes.to_vec(), to_rhodium_put_options(options)),
            )
            .map_err(map_rhodium_error)?;
        Ok(from_rhodium_put_result(out))
    }

    fn get_blob_with_options(
        &self,
        blob_id: &str,
        options: BlobReadOptions,
    ) -> Result<Option<BlobGetResult>> {
        validate_rhodium_blob_id(blob_id)?;
        let mode = if matches!(options.tier_policy, super::BlobReadTierPolicy::RemoteOnly)
            && !options.rehydrate_local
        {
            ArtifactHydrationMode::BypassCache
        } else {
            ArtifactHydrationMode::CachePreferred
        };

        match self.runtime.block_on(
            self.evaluator_cache
                .hydrate_artifact(IridiumArtifactRequest {
                    artifact_key: blob_id.to_string(),
                    mode,
                }),
        ) {
            Ok(result) => Ok(Some(BlobGetResult {
                data: result.data,
                served_tier: result.served_tier,
                rehydrated_local: matches!(
                    result.disposition,
                    rhodium_cache::core::storage::blob::ArtifactHydrationDisposition::CacheMissRehydrated
                ),
            })),
            Err(RhodiumStorageError::NotFound) => Ok(None),
            Err(err) => Err(map_rhodium_error(err)),
        }
    }

    fn has_blob(&self, blob_id: &str) -> Result<bool> {
        validate_rhodium_blob_id(blob_id)?;
        let id = RhodiumBlobId::from_bytes(blob_id.as_bytes().to_vec());
        self.runtime
            .block_on(self.store.has_blob(&id))
            .map_err(map_rhodium_error)
    }

    fn delete_blob(&mut self, blob_id: &str) -> Result<()> {
        validate_rhodium_blob_id(blob_id)?;
        let id = RhodiumBlobId::from_bytes(blob_id.as_bytes().to_vec());
        match self.runtime.block_on(self.store.delete_blob(&id)) {
            Ok(()) | Err(RhodiumStorageError::NotFound) => {}
            Err(err) => return Err(map_rhodium_error(err)),
        }
        Ok(())
    }

    fn has_blobs(&self, blob_ids: &[String]) -> Result<Vec<bool>> {
        let mut ids = Vec::with_capacity(blob_ids.len());
        for blob_id in blob_ids {
            validate_rhodium_blob_id(blob_id)?;
            ids.push(RhodiumBlobId::from_bytes(blob_id.as_bytes().to_vec()));
        }
        self.runtime
            .block_on(self.store.has_blobs(&ids))
            .map_err(map_rhodium_error)
    }

    fn delete_blobs(&mut self, blob_ids: &[String]) -> Result<usize> {
        let mut ids = Vec::with_capacity(blob_ids.len());
        for blob_id in blob_ids {
            validate_rhodium_blob_id(blob_id)?;
            ids.push(RhodiumBlobId::from_bytes(blob_id.as_bytes().to_vec()));
        }
        self.runtime
            .block_on(self.store.delete_blobs(&ids))
            .map_err(map_rhodium_error)
    }

    fn list_prefix(&self, namespace: &str, prefix: &str, limit: usize) -> Result<Vec<String>> {
        let ids = self
            .runtime
            .block_on(
                self.store
                    .list_prefix(namespace.as_bytes(), prefix.as_bytes(), limit),
            )
            .map_err(map_rhodium_error)?;
        let mut out = Vec::with_capacity(ids.len());
        for id in ids {
            let s = String::from_utf8(id.as_bytes().to_vec()).map_err(|_| {
                crate::features::storage::api::StorageError::CorruptData(
                    "rhodium blob id is not valid utf-8".to_string(),
                )
            })?;
            out.push(s);
        }
        Ok(out)
    }

    fn delete_prefix(
        &mut self,
        namespace: &str,
        prefix: &str,
        batch_limit: usize,
    ) -> Result<BlobPrefixDeleteResult> {
        let out = self
            .runtime
            .block_on(self.store.delete_prefix(
                namespace.as_bytes(),
                prefix.as_bytes(),
                batch_limit,
            ))
            .map_err(map_rhodium_error)?;
        Ok(BlobPrefixDeleteResult {
            deleted: out.deleted,
            truncated: out.truncated,
        })
    }

    fn put_compiled_plan_with_options(
        &mut self,
        descriptor: &CompiledPlanCacheDescriptor,
        bytes: &[u8],
        options: BlobPutOptions,
    ) -> Result<super::BlobPutResult> {
        self.runtime
            .block_on(self.compiled_plan_cache.put_compiled_plan(
                descriptor,
                bytes.to_vec(),
                to_rhodium_put_options(options),
            ))
            .map_err(map_rhodium_error)?;
        Ok(super::BlobPutResult {
            inserted: true,
            overwritten: false,
            idempotent_noop: false,
        })
    }

    fn get_compiled_plan_with_options(
        &self,
        descriptor: &CompiledPlanCacheDescriptor,
        options: BlobReadOptions,
    ) -> Result<Option<BlobGetResult>> {
        let mode = if matches!(options.tier_policy, super::BlobReadTierPolicy::RemoteOnly)
            && !options.rehydrate_local
        {
            ArtifactHydrationMode::BypassCache
        } else {
            ArtifactHydrationMode::CachePreferred
        };

        match self.runtime.block_on(
            self.compiled_plan_cache
                .hydrate_compiled_plan(descriptor, mode),
        ) {
            Ok(result) => Ok(Some(BlobGetResult {
                data: result.data,
                served_tier: result.served_tier,
                rehydrated_local: matches!(
                    result.disposition,
                    rhodium_cache::core::storage::blob::ArtifactHydrationDisposition::CacheMissRehydrated
                ),
            })),
            Err(RhodiumStorageError::NotFound) => Ok(None),
            Err(err) => Err(map_rhodium_error(err)),
        }
    }
}