1use redb::{Database, ReadableTable, TableDefinition};
2use serde::{Deserialize, Serialize};
3use std::path::Path;
4use thiserror::Error;
5
6#[derive(Error, Debug)]
7pub enum StorageError {
8 #[error("IO error: {0}")]
9 Io(#[from] std::io::Error),
10 #[error("Serialization error: {0}")]
11 Serde(#[from] serde_json::Error),
12 #[error("Database error: {0}")]
13 Db(#[from] redb::DatabaseError),
14 #[error("Transaction error: {0}")]
15 Transaction(#[from] Box<redb::TransactionError>),
16 #[error("Table error: {0}")]
17 Table(#[from] Box<redb::TableError>),
18 #[error("Commit error: {0}")]
19 Commit(#[from] Box<redb::CommitError>),
20 #[error("Storage error: {0}")]
21 StorageError(#[from] Box<redb::StorageError>),
22 #[error("Invalid format: {0}")]
23 InvalidFormat(String),
24}
25
26pub type Result<T> = std::result::Result<T, StorageError>;
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct Record {
30 pub id: String,
31 pub vector: Vec<f32>,
32 pub metadata: Option<serde_json::Value>,
33 pub timestamp: Option<u64>,
34}
35
36const RECORDS_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("records");
37
38pub struct Storage {
39 pub db: Database,
40 pub records: Vec<Record>,
44}
45
46impl Storage {
47 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
48 let db = Database::create(path.as_ref())?;
49
50 let write_txn = db
51 .begin_write()
52 .map_err(|e| StorageError::Transaction(Box::new(e)))?;
53 write_txn
54 .open_table(RECORDS_TABLE)
55 .map_err(|e| StorageError::Table(Box::new(e)))?;
56 write_txn
57 .commit()
58 .map_err(|e| StorageError::Commit(Box::new(e)))?;
59
60 let mut records = Vec::new();
61
62 let read_txn = db
63 .begin_read()
64 .map_err(|e| StorageError::Transaction(Box::new(e)))?;
65 if let Ok(table) = read_txn.open_table(RECORDS_TABLE) {
66 for (_, value) in table
67 .iter()
68 .map_err(|e| StorageError::StorageError(Box::new(e)))?
69 .flatten()
70 {
71 let record: Record = serde_json::from_slice(value.value())?;
72 records.push(record);
73 }
74 }
75
76 Ok(Self { db, records })
77 }
78
79 pub fn append(&mut self, record: Record) -> Result<()> {
80 let write_txn = self
81 .db
82 .begin_write()
83 .map_err(|e| StorageError::Transaction(Box::new(e)))?;
84 {
85 let mut table = write_txn
86 .open_table(RECORDS_TABLE)
87 .map_err(|e| StorageError::Table(Box::new(e)))?;
88 let value = serde_json::to_vec(&record)?;
89 table
90 .insert(record.id.as_str(), value.as_slice())
91 .map_err(|e| StorageError::StorageError(Box::new(e)))?;
92 }
93 write_txn
94 .commit()
95 .map_err(|e| StorageError::Commit(Box::new(e)))?;
96 self.records.push(record);
97 Ok(())
98 }
99
100 pub fn append_batch(&mut self, records_batch: Vec<Record>) -> Result<()> {
101 let write_txn = self
102 .db
103 .begin_write()
104 .map_err(|e| StorageError::Transaction(Box::new(e)))?;
105 {
106 let mut table = write_txn
107 .open_table(RECORDS_TABLE)
108 .map_err(|e| StorageError::Table(Box::new(e)))?;
109 for record in &records_batch {
110 let value = serde_json::to_vec(record)?;
111 table
112 .insert(record.id.as_str(), value.as_slice())
113 .map_err(|e| StorageError::StorageError(Box::new(e)))?;
114 }
115 }
116 write_txn
117 .commit()
118 .map_err(|e| StorageError::Commit(Box::new(e)))?;
119 self.records.extend(records_batch);
120 Ok(())
121 }
122
123 pub fn delete(&mut self, id: &str) -> Result<bool> {
124 let write_txn = self
125 .db
126 .begin_write()
127 .map_err(|e| StorageError::Transaction(Box::new(e)))?;
128 let removed = {
129 let mut table = write_txn
130 .open_table(RECORDS_TABLE)
131 .map_err(|e| StorageError::Table(Box::new(e)))?;
132 let x = table
133 .remove(id)
134 .map_err(|e| StorageError::StorageError(Box::new(e)))?
135 .is_some();
136 x
137 };
138 write_txn
139 .commit()
140 .map_err(|e| StorageError::Commit(Box::new(e)))?;
141
142 if removed {
143 self.records.retain(|r| r.id != id);
144 }
145
146 Ok(removed)
147 }
148
149 pub fn stats(&self) -> Result<(usize, usize)> {
150 Ok((self.records.len(), 0)) }
152}