//! veclite-storage 1.1.1
//!
//! Durable record storage for the VecLite embedded vector database,
//! backed by `redb` with an in-memory cache of all records.
use redb::{Database, ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize};
use std::path::Path;
use thiserror::Error;

/// All failure modes surfaced by the storage layer.
///
/// Most `redb` error types are boxed because they are comparatively large;
/// boxing keeps `Result<T, StorageError>` small so `?` propagation stays cheap.
/// NOTE(review): `Db` is the one unboxed redb variant, and the `StorageError`
/// variant shadows the enum's own name — both are kept as-is because renaming
/// or boxing would break callers that match on these variants.
#[derive(Error, Debug)]
pub enum StorageError {
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    // Records are stored as JSON, so (de)serialization failures are serde_json's.
    #[error("Serialization error: {0}")]
    Serde(#[from] serde_json::Error),
    #[error("Database error: {0}")]
    Db(#[from] redb::DatabaseError),
    #[error("Transaction error: {0}")]
    Transaction(#[from] Box<redb::TransactionError>),
    #[error("Table error: {0}")]
    Table(#[from] Box<redb::TableError>),
    #[error("Commit error: {0}")]
    Commit(#[from] Box<redb::CommitError>),
    #[error("Storage error: {0}")]
    StorageError(#[from] Box<redb::StorageError>),
    // For malformed on-disk data that is not a serde_json error.
    #[error("Invalid format: {0}")]
    InvalidFormat(String),
}

/// Convenience alias used throughout this module: `Result` with [`StorageError`].
pub type Result<T> = std::result::Result<T, StorageError>;

/// A single stored vector record, serialized to JSON on disk.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Record {
    // Unique key; also the redb table key, so re-inserting an id overwrites.
    pub id: String,
    // The embedding itself. Dimensionality is not enforced here —
    // presumably validated by a higher layer; TODO confirm.
    pub vector: Vec<f32>,
    // Optional free-form metadata attached by the caller.
    pub metadata: Option<serde_json::Value>,
    // Optional creation/update time. Units not established by this file
    // (NOTE(review): likely epoch seconds or millis — confirm with callers).
    pub timestamp: Option<u64>,
}

/// The single redb table: record id -> JSON-serialized [`Record`] bytes.
const RECORDS_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("records");

/// Durable record store: a redb database plus a full in-memory cache.
///
/// Writes go to redb first (committed), then the cache is updated, so the
/// cache never reflects an uncommitted write.
pub struct Storage {
    pub db: Database,
    // In a real ACID database, we wouldn't cache all records in memory like this.
    // But since the current HNSW index relies on array indices, we need an in-memory
    // vector cache to maintain API compatibility until the index is updated.
    pub records: Vec<Record>,
}

impl Storage {
    /// Opens (or creates) the database at `path`, ensures the records table
    /// exists, and loads every stored record into the in-memory cache.
    ///
    /// # Errors
    /// Returns an error if the database cannot be created/opened, any
    /// transaction step fails, or a stored record cannot be deserialized.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let db = Database::create(path.as_ref())?;

        // Create the table up front so the read pass below cannot fail with
        // "table does not exist" on a fresh database.
        let write_txn = db
            .begin_write()
            .map_err(|e| StorageError::Transaction(Box::new(e)))?;
        write_txn
            .open_table(RECORDS_TABLE)
            .map_err(|e| StorageError::Table(Box::new(e)))?;
        write_txn
            .commit()
            .map_err(|e| StorageError::Commit(Box::new(e)))?;

        let mut records = Vec::new();

        let read_txn = db
            .begin_read()
            .map_err(|e| StorageError::Transaction(Box::new(e)))?;
        // The table was just created above, so failing to open it here is a
        // real error. Previously this was `if let Ok(table) = ...`, which
        // silently returned an empty cache on failure; propagate instead.
        let table = read_txn
            .open_table(RECORDS_TABLE)
            .map_err(|e| StorageError::Table(Box::new(e)))?;
        for entry in table
            .iter()
            .map_err(|e| StorageError::StorageError(Box::new(e)))?
        {
            // Propagate per-entry iteration errors rather than `.flatten()`ing
            // them away — a corrupt entry must not be silently dropped.
            let (_, value) = entry.map_err(|e| StorageError::StorageError(Box::new(e)))?;
            let record: Record = serde_json::from_slice(value.value())?;
            records.push(record);
        }

        Ok(Self { db, records })
    }

    /// Durably inserts (or replaces) one record, then mirrors the change
    /// into the in-memory cache.
    ///
    /// redb overwrites on duplicate keys; the cache now does the same
    /// (replace in place) instead of pushing a duplicate entry, keeping
    /// cache positions stable for the index layer.
    ///
    /// # Errors
    /// Returns an error if serialization or any transaction step fails.
    pub fn append(&mut self, record: Record) -> Result<()> {
        let write_txn = self
            .db
            .begin_write()
            .map_err(|e| StorageError::Transaction(Box::new(e)))?;
        let replaced = {
            let mut table = write_txn
                .open_table(RECORDS_TABLE)
                .map_err(|e| StorageError::Table(Box::new(e)))?;
            let value = serde_json::to_vec(&record)?;
            // `insert` returns the previous value when the key existed.
            table
                .insert(record.id.as_str(), value.as_slice())
                .map_err(|e| StorageError::StorageError(Box::new(e)))?
                .is_some()
        };
        write_txn
            .commit()
            .map_err(|e| StorageError::Commit(Box::new(e)))?;
        // Only update the cache after the commit succeeded.
        self.cache_upsert(record, replaced);
        Ok(())
    }

    /// Durably inserts (or replaces) a batch of records in one transaction,
    /// then mirrors each change into the in-memory cache.
    ///
    /// Duplicate ids — either against existing records or within the batch —
    /// overwrite rather than duplicate, matching redb's key semantics.
    ///
    /// # Errors
    /// Returns an error if serialization or any transaction step fails; on
    /// error, nothing is committed and the cache is untouched.
    pub fn append_batch(&mut self, records_batch: Vec<Record>) -> Result<()> {
        let write_txn = self
            .db
            .begin_write()
            .map_err(|e| StorageError::Transaction(Box::new(e)))?;
        // Remember, per record, whether the insert replaced an existing key,
        // so the cache update below mirrors the database exactly.
        let mut replaced_flags = Vec::with_capacity(records_batch.len());
        {
            let mut table = write_txn
                .open_table(RECORDS_TABLE)
                .map_err(|e| StorageError::Table(Box::new(e)))?;
            for record in &records_batch {
                let value = serde_json::to_vec(record)?;
                let replaced = table
                    .insert(record.id.as_str(), value.as_slice())
                    .map_err(|e| StorageError::StorageError(Box::new(e)))?
                    .is_some();
                replaced_flags.push(replaced);
            }
        }
        write_txn
            .commit()
            .map_err(|e| StorageError::Commit(Box::new(e)))?;
        for (record, replaced) in records_batch.into_iter().zip(replaced_flags) {
            self.cache_upsert(record, replaced);
        }
        Ok(())
    }

    /// Durably removes the record with `id`, then removes it from the cache.
    ///
    /// Returns `Ok(true)` if a record was removed, `Ok(false)` if no record
    /// with that id existed.
    ///
    /// # Errors
    /// Returns an error if any transaction step fails.
    pub fn delete(&mut self, id: &str) -> Result<bool> {
        let write_txn = self
            .db
            .begin_write()
            .map_err(|e| StorageError::Transaction(Box::new(e)))?;
        let removed = {
            let mut table = write_txn
                .open_table(RECORDS_TABLE)
                .map_err(|e| StorageError::Table(Box::new(e)))?;
            // `remove` returns the old value; `Some` means the key existed.
            table
                .remove(id)
                .map_err(|e| StorageError::StorageError(Box::new(e)))?
                .is_some()
        };
        write_txn
            .commit()
            .map_err(|e| StorageError::Commit(Box::new(e)))?;

        if removed {
            self.records.retain(|r| r.id != id);
        }

        Ok(removed)
    }

    /// Returns `(record_count, size_in_bytes)`. The size is currently always
    /// 0 — computing it is deferred until it is actually needed.
    pub fn stats(&self) -> Result<(usize, usize)> {
        Ok((self.records.len(), 0)) // Size computation deferred
    }

    /// Mirrors a committed insert into the cache.
    ///
    /// When the database reported a replacement (`replaced`), the cached
    /// record with the same id is overwritten in place so existing cache
    /// positions (relied on by the HNSW index) stay stable; otherwise the
    /// record is appended. Linear scan is acceptable at current cache sizes.
    fn cache_upsert(&mut self, record: Record, replaced: bool) {
        if replaced {
            if let Some(existing) = self.records.iter_mut().find(|r| r.id == record.id) {
                *existing = record;
            } else {
                // Cache was out of sync with the database; fall back to push.
                self.records.push(record);
            }
        } else {
            self.records.push(record);
        }
    }
}