iridium-db 0.4.0

A high-performance vector-graph hybrid storage and indexing engine
use super::types::{
    BlobAckMode, BlobDurabilityTarget, BlobGetResult, BlobPrefixDeleteResult, BlobPutOptions,
    BlobPutResult, BlobReadOptions, BlobReadTierPolicy, BlobStore,
};
use crate::features::storage::api::{Result, StorageError};
use alloy_storage::{
    AckPolicy, BlobId as AlloyBlobId, BlobStore as AlloyBlobStore, DurabilityTier, OverwritePolicy,
    PutOptions as AlloyPutOptions, ReadOptions as AlloyReadOptions, ReadTierPolicy,
    StorageError as AlloyStorageError,
};
use std::sync::Arc;

/// Adapter that exposes an injected `alloy_storage` blob store through this crate's
/// synchronous [`BlobStore`] trait.
///
/// The adapter owns a Tokio runtime; the trait implementation uses it to `block_on`
/// the futures returned by the wrapped store.
pub struct InjectedBlobStoreAdapter {
    /// Runtime owned by the adapter, used to drive the store's futures to completion.
    runtime: tokio::runtime::Runtime,
    /// The injected store that every operation is forwarded to.
    store: Arc<dyn AlloyBlobStore>,
}

impl InjectedBlobStoreAdapter {
    /// Wraps `store` in an adapter backed by a freshly built multi-threaded Tokio runtime
    /// with all runtime drivers enabled.
    ///
    /// # Errors
    ///
    /// Returns [`StorageError::InvalidInput`] carrying the builder's error message when the
    /// runtime cannot be constructed.
    pub fn new(store: Arc<dyn AlloyBlobStore>) -> Result<Self> {
        let mut builder = tokio::runtime::Builder::new_multi_thread();
        builder.enable_all();

        match builder.build() {
            Ok(runtime) => Ok(Self { runtime, store }),
            Err(err) => Err(StorageError::InvalidInput(format!(
                "failed to build tokio runtime for injected blob store: {err}"
            ))),
        }
    }
}

// -- type conversions --------------------------------------------------------

/// Builds an Alloy blob id from an owned copy of the UTF-8 bytes of `blob_id`.
fn to_alloy_id(blob_id: &str) -> AlloyBlobId {
    let raw_id: Vec<u8> = blob_id.as_bytes().to_owned();
    AlloyBlobId::from_bytes(raw_id)
}

/// Translates this crate's put options into their Alloy equivalent.
///
/// The durability target and timeout are only consulted for [`BlobAckMode::Flush`];
/// `ttl_secs` is always left as `None`.
fn to_alloy_put_options(opts: BlobPutOptions) -> AlloyPutOptions {
    let overwrite_policy = if opts.deny_if_exists {
        OverwritePolicy::DenyIfExists
    } else {
        OverwritePolicy::AllowOverwrite
    };

    let ack = match opts.ack_mode {
        BlobAckMode::FireAndForget => AckPolicy::FireAndForget,
        BlobAckMode::Flush => {
            let target = match opts.durability_target {
                BlobDurabilityTarget::Memory => DurabilityTier::Memory,
                BlobDurabilityTarget::Disk => DurabilityTier::Disk,
                BlobDurabilityTarget::Remote => DurabilityTier::Remote,
            };
            AckPolicy::Flush {
                target,
                timeout_ms: opts.timeout_ms,
            }
        }
    };

    AlloyPutOptions {
        overwrite_policy,
        ack,
        idempotent: opts.idempotent,
        verify_content_hash: opts.verify_content_hash,
        ttl_secs: None,
    }
}

/// Translates this crate's read options into their Alloy equivalent, mapping each tier
/// policy variant to the same-named Alloy variant and copying `rehydrate_local` as-is.
fn to_alloy_read_options(opts: BlobReadOptions) -> AlloyReadOptions {
    let tier_policy = match opts.tier_policy {
        BlobReadTierPolicy::Default => ReadTierPolicy::Default,
        BlobReadTierPolicy::LocalFirst => ReadTierPolicy::LocalFirst,
        BlobReadTierPolicy::RemoteOnly => ReadTierPolicy::RemoteOnly,
    };
    let rehydrate_local = opts.rehydrate_local;

    AlloyReadOptions {
        tier_policy,
        rehydrate_local,
    }
}

/// Converts an Alloy storage error into this crate's [`StorageError`].
///
/// Every Alloy error becomes [`StorageError::InvalidInput`]; `NotFound` gets the fixed
/// message `"blob not found"`, all other errors use their `to_string()` text.
fn map_alloy_error(err: AlloyStorageError) -> StorageError {
    let message = if matches!(err, AlloyStorageError::NotFound) {
        String::from("blob not found")
    } else {
        err.to_string()
    };
    StorageError::InvalidInput(message)
}

// -- BlobStore impl ----------------------------------------------------------

impl BlobStore for InjectedBlobStoreAdapter {
    fn put_blob_with_options(
        &mut self,
        blob_id: &str,
        bytes: &[u8],
        options: BlobPutOptions,
    ) -> Result<BlobPutResult> {
        let id = to_alloy_id(blob_id);
        let result = self
            .runtime
            .block_on(
                self.store
                    .put_blob(&id, bytes.to_vec(), to_alloy_put_options(options)),
            )
            .map_err(map_alloy_error)?;
        Ok(BlobPutResult {
            inserted: result.inserted,
            overwritten: result.overwritten,
            idempotent_noop: result.idempotent_noop,
        })
    }

    fn get_blob_with_options(
        &self,
        blob_id: &str,
        options: BlobReadOptions,
    ) -> Result<Option<BlobGetResult>> {
        let id = to_alloy_id(blob_id);
        match self
            .runtime
            .block_on(self.store.get_blob(&id, to_alloy_read_options(options)))
        {
            Ok(result) => Ok(Some(BlobGetResult {
                data: result.data,
                served_tier: result.served_tier,
                rehydrated_local: result.rehydrated_local,
            })),
            Err(AlloyStorageError::NotFound) => Ok(None),
            Err(err) => Err(map_alloy_error(err)),
        }
    }

    fn has_blob(&self, blob_id: &str) -> Result<bool> {
        let id = to_alloy_id(blob_id);
        self.runtime
            .block_on(self.store.has_blob(&id))
            .map_err(map_alloy_error)
    }

    fn delete_blob(&mut self, blob_id: &str) -> Result<()> {
        let id = to_alloy_id(blob_id);
        self.runtime
            .block_on(self.store.delete_blob(&id))
            .map_err(map_alloy_error)
    }

    fn has_blobs(&self, blob_ids: &[String]) -> Result<Vec<bool>> {
        let ids: Vec<AlloyBlobId> = blob_ids.iter().map(|s| to_alloy_id(s)).collect();
        self.runtime
            .block_on(self.store.has_blobs(&ids))
            .map_err(map_alloy_error)
    }

    fn delete_blobs(&mut self, blob_ids: &[String]) -> Result<usize> {
        let ids: Vec<AlloyBlobId> = blob_ids.iter().map(|s| to_alloy_id(s)).collect();
        self.runtime
            .block_on(self.store.delete_blobs(&ids))
            .map_err(map_alloy_error)
    }

    fn list_prefix(&self, namespace: &str, prefix: &str, limit: usize) -> Result<Vec<String>> {
        let ids = self
            .runtime
            .block_on(
                self.store
                    .list_prefix(namespace.as_bytes(), prefix.as_bytes(), limit),
            )
            .map_err(map_alloy_error)?;
        Ok(ids
            .into_iter()
            .map(|id| String::from_utf8_lossy(id.as_bytes()).into_owned())
            .collect())
    }

    fn delete_prefix(
        &mut self,
        namespace: &str,
        prefix: &str,
        batch_limit: usize,
    ) -> Result<BlobPrefixDeleteResult> {
        let result = self
            .runtime
            .block_on(self.store.delete_prefix(
                namespace.as_bytes(),
                prefix.as_bytes(),
                batch_limit,
            ))
            .map_err(map_alloy_error)?;
        Ok(BlobPrefixDeleteResult {
            deleted: result.deleted,
            truncated: result.truncated,
        })
    }
}