//! veclite-storage 1.1.0
//!
//! VecLite embedded vector database component.
use redb::{Database, ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize};
use std::path::Path;
use thiserror::Error;

/// Errors returned by the storage layer.
///
/// The redb transaction, table, commit and storage errors are held boxed, so
/// the generated `From` conversions apply only to already-boxed values; code
/// holding the unboxed redb error wraps it explicitly with `map_err`.
#[derive(Error, Debug)]
pub enum StorageError {
    /// An I/O failure, converted from `std::io::Error`.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// A record could not be serialized to or deserialized from JSON.
    #[error("Serialization error: {0}")]
    Serde(#[from] serde_json::Error),
    /// The redb database could not be created or opened.
    #[error("Database error: {0}")]
    Db(#[from] redb::DatabaseError),
    /// A redb read or write transaction could not be started.
    #[error("Transaction error: {0}")]
    Transaction(#[from] Box<redb::TransactionError>),
    /// A redb table could not be opened.
    #[error("Table error: {0}")]
    Table(#[from] Box<redb::TableError>),
    /// A redb write transaction failed to commit.
    #[error("Commit error: {0}")]
    Commit(#[from] Box<redb::CommitError>),
    /// A redb storage-level failure while reading, inserting or removing entries.
    #[error("Storage error: {0}")]
    StorageError(#[from] Box<redb::StorageError>),
    /// Free-form "invalid format" error carrying a message.
    /// NOTE(review): not constructed anywhere in the code visible here.
    #[error("Invalid format: {0}")]
    InvalidFormat(String),
}

/// Result alias used throughout this module; the error type is [`StorageError`].
pub type Result<T> = std::result::Result<T, StorageError>;

/// A single stored vector together with its identifier and optional extras.
///
/// Records are persisted as JSON, keyed by [`Record::id`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Record {
    /// Identifier of the record; used as the key in the records table.
    pub id: String,
    /// The vector payload.
    pub vector: Vec<f32>,
    /// Optional arbitrary JSON metadata attached to the record.
    pub metadata: Option<serde_json::Value>,
    /// Optional timestamp.
    /// NOTE(review): unit and epoch are not established by this file — confirm with callers.
    pub timestamp: Option<u64>,
}

/// redb table named `"records"`: the key is the record id, the value is the
/// JSON-serialized [`Record`] bytes.
const RECORDS_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("records");

/// Record store backed by a redb database, with a full in-memory copy of the
/// records kept alongside it.
pub struct Storage {
    /// Handle to the underlying redb database.
    pub db: Database,
    // In a real ACID database, we wouldn't cache all records in memory like this.
    // But since the current HNSW index relies on array indices, we need an in-memory
    // vector cache to maintain API compatibility until the index is updated.
    /// In-memory copy of the stored records; filled when the store is opened
    /// and updated by the mutating methods after each successful commit.
    pub records: Vec<Record>,
}

impl Storage {
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let db = Database::create(path.as_ref())?;

        let write_txn = db.begin_write().map_err(|e| StorageError::Transaction(Box::new(e)))?;
        write_txn.open_table(RECORDS_TABLE).map_err(|e| StorageError::Table(Box::new(e)))?;
        write_txn.commit().map_err(|e| StorageError::Commit(Box::new(e)))?;

        let mut records = Vec::new();

        let read_txn = db.begin_read().map_err(|e| StorageError::Transaction(Box::new(e)))?;
        if let Ok(table) = read_txn.open_table(RECORDS_TABLE) {
            for (_, value) in table.iter().map_err(|e| StorageError::StorageError(Box::new(e)))?.flatten() {
                let record: Record = serde_json::from_slice(value.value())?;
                records.push(record);
            }
        }

        Ok(Self { db, records })
    }

    pub fn append(&mut self, record: Record) -> Result<()> {
        let write_txn = self.db.begin_write().map_err(|e| StorageError::Transaction(Box::new(e)))?;
        {
            let mut table = write_txn.open_table(RECORDS_TABLE).map_err(|e| StorageError::Table(Box::new(e)))?;
            let value = serde_json::to_vec(&record)?;
            table.insert(record.id.as_str(), value.as_slice()).map_err(|e| StorageError::StorageError(Box::new(e)))?;
        }
        write_txn.commit().map_err(|e| StorageError::Commit(Box::new(e)))?;
        self.records.push(record);
        Ok(())
    }

    pub fn append_batch(&mut self, records_batch: Vec<Record>) -> Result<()> {
        let write_txn = self.db.begin_write().map_err(|e| StorageError::Transaction(Box::new(e)))?;
        {
            let mut table = write_txn.open_table(RECORDS_TABLE).map_err(|e| StorageError::Table(Box::new(e)))?;
            for record in &records_batch {
                let value = serde_json::to_vec(record)?;
                table.insert(record.id.as_str(), value.as_slice()).map_err(|e| StorageError::StorageError(Box::new(e)))?;
            }
        }
        write_txn.commit().map_err(|e| StorageError::Commit(Box::new(e)))?;
        self.records.extend(records_batch);
        Ok(())
    }

    pub fn delete(&mut self, id: &str) -> Result<bool> {
        let write_txn = self.db.begin_write().map_err(|e| StorageError::Transaction(Box::new(e)))?;
        let removed = {
            let mut table = write_txn.open_table(RECORDS_TABLE).map_err(|e| StorageError::Table(Box::new(e)))?;
            let x = table.remove(id).map_err(|e| StorageError::StorageError(Box::new(e)))?.is_some();
            x
        };
        write_txn.commit().map_err(|e| StorageError::Commit(Box::new(e)))?;

        if removed {
            self.records.retain(|r| r.id != id);
        }

        Ok(removed)
    }

    pub fn stats(&self) -> Result<(usize, usize)> {
        Ok((self.records.len(), 0)) // Size computation deferred
    }
}