// veclite_storage/lib.rs

use redb::{Database, ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::Path;
use thiserror::Error;

6#[derive(Error, Debug)]
7pub enum StorageError {
8    #[error("IO error: {0}")]
9    Io(#[from] std::io::Error),
10    #[error("Serialization error: {0}")]
11    Serde(#[from] serde_json::Error),
12    #[error("Database error: {0}")]
13    Db(#[from] redb::DatabaseError),
14    #[error("Transaction error: {0}")]
15    Transaction(#[from] Box<redb::TransactionError>),
16    #[error("Table error: {0}")]
17    Table(#[from] Box<redb::TableError>),
18    #[error("Commit error: {0}")]
19    Commit(#[from] Box<redb::CommitError>),
20    #[error("Storage error: {0}")]
21    StorageError(#[from] Box<redb::StorageError>),
22    #[error("Invalid format: {0}")]
23    InvalidFormat(String),
24}
25
26pub type Result<T> = std::result::Result<T, StorageError>;
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct Record {
30    pub id: String,
31    pub vector: Vec<f32>,
32    pub metadata: Option<serde_json::Value>,
33    pub timestamp: Option<u64>,
34}
35
36const RECORDS_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("records");
37
38pub struct Storage {
39    pub db: Database,
40    // In a real ACID database, we wouldn't cache all records in memory like this.
41    // But since the current HNSW index relies on array indices, we need an in-memory
42    // vector cache to maintain API compatibility until the index is updated.
43    pub records: Vec<Record>,
44}
45
46impl Storage {
47    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
48        let db = Database::create(path.as_ref())?;
49
50        let write_txn = db
51            .begin_write()
52            .map_err(|e| StorageError::Transaction(Box::new(e)))?;
53        write_txn
54            .open_table(RECORDS_TABLE)
55            .map_err(|e| StorageError::Table(Box::new(e)))?;
56        write_txn
57            .commit()
58            .map_err(|e| StorageError::Commit(Box::new(e)))?;
59
60        let mut records = Vec::new();
61
62        let read_txn = db
63            .begin_read()
64            .map_err(|e| StorageError::Transaction(Box::new(e)))?;
65        if let Ok(table) = read_txn.open_table(RECORDS_TABLE) {
66            for (_, value) in table
67                .iter()
68                .map_err(|e| StorageError::StorageError(Box::new(e)))?
69                .flatten()
70            {
71                let record: Record = serde_json::from_slice(value.value())?;
72                records.push(record);
73            }
74        }
75
76        Ok(Self { db, records })
77    }
78
79    pub fn append(&mut self, record: Record) -> Result<()> {
80        let write_txn = self
81            .db
82            .begin_write()
83            .map_err(|e| StorageError::Transaction(Box::new(e)))?;
84        {
85            let mut table = write_txn
86                .open_table(RECORDS_TABLE)
87                .map_err(|e| StorageError::Table(Box::new(e)))?;
88            let value = serde_json::to_vec(&record)?;
89            table
90                .insert(record.id.as_str(), value.as_slice())
91                .map_err(|e| StorageError::StorageError(Box::new(e)))?;
92        }
93        write_txn
94            .commit()
95            .map_err(|e| StorageError::Commit(Box::new(e)))?;
96        self.records.push(record);
97        Ok(())
98    }
99
100    pub fn append_batch(&mut self, records_batch: Vec<Record>) -> Result<()> {
101        let write_txn = self
102            .db
103            .begin_write()
104            .map_err(|e| StorageError::Transaction(Box::new(e)))?;
105        {
106            let mut table = write_txn
107                .open_table(RECORDS_TABLE)
108                .map_err(|e| StorageError::Table(Box::new(e)))?;
109            for record in &records_batch {
110                let value = serde_json::to_vec(record)?;
111                table
112                    .insert(record.id.as_str(), value.as_slice())
113                    .map_err(|e| StorageError::StorageError(Box::new(e)))?;
114            }
115        }
116        write_txn
117            .commit()
118            .map_err(|e| StorageError::Commit(Box::new(e)))?;
119        self.records.extend(records_batch);
120        Ok(())
121    }
122
123    pub fn delete(&mut self, id: &str) -> Result<bool> {
124        let write_txn = self
125            .db
126            .begin_write()
127            .map_err(|e| StorageError::Transaction(Box::new(e)))?;
128        let removed = {
129            let mut table = write_txn
130                .open_table(RECORDS_TABLE)
131                .map_err(|e| StorageError::Table(Box::new(e)))?;
132            let x = table
133                .remove(id)
134                .map_err(|e| StorageError::StorageError(Box::new(e)))?
135                .is_some();
136            x
137        };
138        write_txn
139            .commit()
140            .map_err(|e| StorageError::Commit(Box::new(e)))?;
141
142        if removed {
143            self.records.retain(|r| r.id != id);
144        }
145
146        Ok(removed)
147    }
148
149    pub fn stats(&self) -> Result<(usize, usize)> {
150        Ok((self.records.len(), 0)) // Size computation deferred
151    }
152}