use redb::{Database, ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize};
use std::path::Path;
use thiserror::Error;
/// Errors returned by the storage layer.
///
/// Most variants wrap an underlying error and can be produced from it through
/// the `From` conversions generated by `#[from]`. The redb transaction, table,
/// commit and storage errors are held in a `Box`.
#[derive(Error, Debug)]
pub enum StorageError {
/// Wraps a `std::io::Error`.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// Wraps a `serde_json::Error` raised while encoding or decoding JSON.
#[error("Serialization error: {0}")]
Serde(#[from] serde_json::Error),
/// Wraps a `redb::DatabaseError`.
#[error("Database error: {0}")]
Db(#[from] redb::DatabaseError),
/// Wraps a boxed `redb::TransactionError`.
#[error("Transaction error: {0}")]
Transaction(#[from] Box<redb::TransactionError>),
/// Wraps a boxed `redb::TableError`.
#[error("Table error: {0}")]
Table(#[from] Box<redb::TableError>),
/// Wraps a boxed `redb::CommitError`.
#[error("Commit error: {0}")]
Commit(#[from] Box<redb::CommitError>),
/// Wraps a boxed `redb::StorageError` (redb's own storage-level error type,
/// not to be confused with this enum).
#[error("Storage error: {0}")]
StorageError(#[from] Box<redb::StorageError>),
/// A format problem described by the contained message.
// NOTE(review): not constructed anywhere in the visible part of this file.
#[error("Invalid format: {0}")]
InvalidFormat(String),
}
/// Shorthand for `std::result::Result` with [`StorageError`] as the error type.
pub type Result<T> = std::result::Result<T, StorageError>;
/// A single stored record.
///
/// Records are serialized with serde; `Storage` writes them to the database as
/// JSON bytes keyed by `id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Record {
/// Identifier of the record; used as the key in the records table.
pub id: String,
/// Vector payload of the record.
pub vector: Vec<f32>,
/// Optional arbitrary JSON value attached to the record.
pub metadata: Option<serde_json::Value>,
/// Optional timestamp.
// NOTE(review): unit and epoch are not established by this file — confirm with callers.
pub timestamp: Option<u64>,
}
/// Definition of the redb table named "records": string keys (record ids)
/// mapped to byte-slice values (JSON-serialized [`Record`]s).
const RECORDS_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("records");
/// A redb database handle paired with an in-memory copy of its records.
pub struct Storage {
/// The underlying redb database.
pub db: Database,
/// In-memory copy of the records: filled by `open` and updated by `append`,
/// `append_batch` and `delete` after their transaction commits.
pub records: Vec<Record>,
}
impl Storage {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
let db = Database::create(path.as_ref())?;
let write_txn = db
.begin_write()
.map_err(|e| StorageError::Transaction(Box::new(e)))?;
write_txn
.open_table(RECORDS_TABLE)
.map_err(|e| StorageError::Table(Box::new(e)))?;
write_txn
.commit()
.map_err(|e| StorageError::Commit(Box::new(e)))?;
let mut records = Vec::new();
let read_txn = db
.begin_read()
.map_err(|e| StorageError::Transaction(Box::new(e)))?;
if let Ok(table) = read_txn.open_table(RECORDS_TABLE) {
for (_, value) in table
.iter()
.map_err(|e| StorageError::StorageError(Box::new(e)))?
.flatten()
{
let record: Record = serde_json::from_slice(value.value())?;
records.push(record);
}
}
Ok(Self { db, records })
}
pub fn append(&mut self, record: Record) -> Result<()> {
let write_txn = self
.db
.begin_write()
.map_err(|e| StorageError::Transaction(Box::new(e)))?;
{
let mut table = write_txn
.open_table(RECORDS_TABLE)
.map_err(|e| StorageError::Table(Box::new(e)))?;
let value = serde_json::to_vec(&record)?;
table
.insert(record.id.as_str(), value.as_slice())
.map_err(|e| StorageError::StorageError(Box::new(e)))?;
}
write_txn
.commit()
.map_err(|e| StorageError::Commit(Box::new(e)))?;
self.records.push(record);
Ok(())
}
pub fn append_batch(&mut self, records_batch: Vec<Record>) -> Result<()> {
let write_txn = self
.db
.begin_write()
.map_err(|e| StorageError::Transaction(Box::new(e)))?;
{
let mut table = write_txn
.open_table(RECORDS_TABLE)
.map_err(|e| StorageError::Table(Box::new(e)))?;
for record in &records_batch {
let value = serde_json::to_vec(record)?;
table
.insert(record.id.as_str(), value.as_slice())
.map_err(|e| StorageError::StorageError(Box::new(e)))?;
}
}
write_txn
.commit()
.map_err(|e| StorageError::Commit(Box::new(e)))?;
self.records.extend(records_batch);
Ok(())
}
pub fn delete(&mut self, id: &str) -> Result<bool> {
let write_txn = self
.db
.begin_write()
.map_err(|e| StorageError::Transaction(Box::new(e)))?;
let removed = {
let mut table = write_txn
.open_table(RECORDS_TABLE)
.map_err(|e| StorageError::Table(Box::new(e)))?;
let x = table
.remove(id)
.map_err(|e| StorageError::StorageError(Box::new(e)))?
.is_some();
x
};
write_txn
.commit()
.map_err(|e| StorageError::Commit(Box::new(e)))?;
if removed {
self.records.retain(|r| r.id != id);
}
Ok(removed)
}
pub fn stats(&self) -> Result<(usize, usize)> {
Ok((self.records.len(), 0)) }
}