iridium-db 0.4.0

A high-performance vector-graph hybrid storage and indexing engine
use super::support::validate_local_blob_id;
use super::{
    BlobGetResult, BlobPrefixDeleteResult, BlobPutOptions, BlobPutResult, BlobReadOptions,
    BlobStore,
};
use crate::features::storage::api::{Result, StorageError};

/// Blob store backed by the local filesystem: each blob is a file whose name
/// is the blob id, stored directly inside `root`.
#[derive(Debug)]
pub struct LocalBlobStore {
    /// Directory that contains one file per stored blob.
    root: std::path::PathBuf,
}

impl LocalBlobStore {
    /// Creates a store rooted at `root`, creating the directory (and any
    /// missing parent directories) when it does not exist yet.
    ///
    /// # Errors
    /// Returns an error when the directory cannot be created.
    pub fn new(root: std::path::PathBuf) -> Result<Self> {
        std::fs::create_dir_all(&root)?;
        Ok(Self { root })
    }

    /// Resolves `blob_id` to the path of its backing file inside the root
    /// directory.
    ///
    /// # Errors
    /// Propagates the error from `validate_local_blob_id` when the id is
    /// rejected.
    fn blob_path(&self, blob_id: &str) -> Result<std::path::PathBuf> {
        validate_local_blob_id(blob_id)?;
        Ok(self.root.join(blob_id))
    }

    /// Returns the names of the regular files in the root directory whose
    /// name starts with `namespace` immediately followed by `prefix`, in
    /// ascending order.
    ///
    /// A `limit` of `0` means "no limit"; otherwise only the first `limit`
    /// names of the sorted listing are returned. Entries whose name is not
    /// valid UTF-8 are skipped.
    ///
    /// # Errors
    /// Returns an error when the directory, or one of its entries, cannot be
    /// read.
    fn prefixed_ids(&self, namespace: &str, prefix: &str, limit: usize) -> Result<Vec<String>> {
        let combined = format!("{namespace}{prefix}");
        let mut matches = Vec::new();
        for entry_res in std::fs::read_dir(&self.root)? {
            let entry = entry_res?;
            let Some(name) = entry.file_name().to_str().map(str::to_string) else {
                continue;
            };
            // Compare the name first so that only matching entries pay for
            // the `is_file` metadata lookup.
            if !name.starts_with(&combined) || !entry.path().is_file() {
                continue;
            }
            matches.push(name);
        }
        // `read_dir` yields entries in a platform/filesystem dependent order,
        // so the limit must be applied after sorting; cutting the scan short
        // would return an arbitrary subset instead of the first `limit` ids.
        matches.sort_unstable();
        if limit > 0 {
            matches.truncate(limit);
        }
        Ok(matches)
    }
}

impl BlobStore for LocalBlobStore {
    /// Writes `bytes` to the file backing `blob_id`, honouring `options`.
    ///
    /// The returned flags tell whether a new blob was inserted, an existing
    /// one was overwritten, or nothing was written because `idempotent` was
    /// requested and the stored content already equals `bytes`.
    ///
    /// # Errors
    /// - `StorageError::InvalidInput` when `verify_content_hash` is requested,
    ///   which this backend does not support.
    /// - `StorageError::InvalidInput` when `deny_if_exists` is set and the
    ///   blob is already present.
    /// - Any error raised while validating the id or reading/writing the file.
    fn put_blob_with_options(
        &mut self,
        blob_id: &str,
        bytes: &[u8],
        options: BlobPutOptions,
    ) -> Result<BlobPutResult> {
        if options.verify_content_hash {
            return Err(StorageError::InvalidInput(
                "local blob backend does not support verify_content_hash".to_string(),
            ));
        }

        // The current content is needed both for the existence flags and for
        // the idempotency comparison.
        let existing = self.get_blob(blob_id)?;
        let already_present = existing.is_some();
        if options.deny_if_exists && already_present {
            return Err(StorageError::InvalidInput(
                "blob already exists and deny_if_exists is enabled".to_string(),
            ));
        }
        if options.idempotent && existing.as_deref() == Some(bytes) {
            return Ok(BlobPutResult {
                inserted: false,
                overwritten: false,
                idempotent_noop: true,
            });
        }

        let path = self.blob_path(blob_id)?;
        std::fs::write(path, bytes)?;
        Ok(BlobPutResult {
            inserted: !already_present,
            overwritten: already_present,
            idempotent_noop: false,
        })
    }

    /// Reads the blob stored under `blob_id`.
    ///
    /// Returns `Ok(None)` when no file exists for the id. A found blob is
    /// reported with `served_tier` set to `"local"` and `rehydrated_local`
    /// set to `false`. The read options are not consulted by this backend.
    ///
    /// # Errors
    /// Returns an error when the id is rejected or the file cannot be read for
    /// a reason other than "not found".
    fn get_blob_with_options(
        &self,
        blob_id: &str,
        _options: BlobReadOptions,
    ) -> Result<Option<BlobGetResult>> {
        let path = self.blob_path(blob_id)?;
        match std::fs::read(path) {
            Ok(data) => Ok(Some(BlobGetResult {
                data,
                served_tier: Some("local".to_string()),
                rehydrated_local: false,
            })),
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(err) => Err(StorageError::Io(err)),
        }
    }

    /// Reports whether a filesystem entry exists for `blob_id`.
    ///
    /// # Errors
    /// Returns an error when the id is rejected or when existence cannot be
    /// determined (for example because of a permission error).
    fn has_blob(&self, blob_id: &str) -> Result<bool> {
        let path = self.blob_path(blob_id)?;
        // `try_exists` surfaces I/O failures instead of folding them into
        // `false`, matching how `get_blob_with_options` treats read errors.
        Ok(path.try_exists()?)
    }

    /// Removes the blob stored under `blob_id`; a missing blob is not an
    /// error.
    ///
    /// # Errors
    /// Returns an error when the id is rejected or the file cannot be removed
    /// for a reason other than "not found".
    fn delete_blob(&mut self, blob_id: &str) -> Result<()> {
        let path = self.blob_path(blob_id)?;
        match std::fs::remove_file(path) {
            Ok(()) => Ok(()),
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(err) => Err(StorageError::Io(err)),
        }
    }

    /// Removes every blob in `blob_ids` and returns how many files were
    /// actually removed; ids without a backing file are skipped.
    ///
    /// # Errors
    /// Stops at, and returns, the first id-validation or removal error other
    /// than "not found". Blobs removed before the failure stay removed.
    fn delete_blobs(&mut self, blob_ids: &[String]) -> Result<usize> {
        let mut deleted = 0usize;
        for blob_id in blob_ids {
            let path = self.blob_path(blob_id)?;
            // Remove directly instead of probing first: one syscall per id,
            // and the count reflects what `remove_file` really did even if
            // the file disappears concurrently.
            match std::fs::remove_file(path) {
                Ok(()) => deleted += 1,
                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
                Err(err) => return Err(StorageError::Io(err)),
            }
        }
        Ok(deleted)
    }

    /// Lists blob ids that start with `namespace` followed by `prefix`.
    /// A `limit` of `0` means "no limit".
    ///
    /// # Errors
    /// Returns an error when the root directory cannot be read.
    fn list_prefix(&self, namespace: &str, prefix: &str, limit: usize) -> Result<Vec<String>> {
        self.prefixed_ids(namespace, prefix, limit)
    }

    /// Deletes blobs whose id starts with `namespace` followed by `prefix`.
    ///
    /// With `batch_limit == 0` every match is deleted. Otherwise at most
    /// `batch_limit` blobs are deleted and `truncated` is `true` when at least
    /// one further match was left behind.
    ///
    /// # Errors
    /// Returns an error when listing the directory or removing a blob fails.
    fn delete_prefix(
        &mut self,
        namespace: &str,
        prefix: &str,
        batch_limit: usize,
    ) -> Result<BlobPrefixDeleteResult> {
        // Ask for one id more than the batch size so that "more matches
        // remain" can be detected without a second directory scan.
        let probe_limit = match batch_limit {
            0 => 0,
            limit => limit.saturating_add(1),
        };
        let mut ids = self.prefixed_ids(namespace, prefix, probe_limit)?;
        let truncated = batch_limit > 0 && ids.len() > batch_limit;
        if truncated {
            ids.truncate(batch_limit);
        }
        let deleted = self.delete_blobs(&ids)?;
        Ok(BlobPrefixDeleteResult { deleted, truncated })
    }
}