use crate::DbContext;
use anyhow::{Context, Result};
use rocksdb::IteratorMode;
use serde::{de::DeserializeOwned, Serialize};
/// Encodes `value` as JSON bytes via `serde_json`.
///
/// # Errors
///
/// Returns the underlying `serde_json` error, wrapped with the context
/// message "Failed to serialize value".
fn serialize_to_bytes<T>(value: &T) -> Result<Vec<u8>>
where
    T: Serialize,
{
    let encoded = serde_json::to_vec(value);
    encoded.context("Failed to serialize value")
}
/// Decodes a `T` from JSON `bytes` via `serde_json`.
///
/// # Errors
///
/// Returns the underlying `serde_json` error, wrapped with the context
/// message "Failed to deserialize value".
fn deserialize_from_bytes<T>(bytes: &[u8]) -> Result<T>
where
    T: DeserializeOwned,
{
    let decoded = serde_json::from_slice::<T>(bytes);
    decoded.context("Failed to deserialize value")
}
pub trait Keyable: serde::Serialize + serde::de::DeserializeOwned {
fn key(&self) -> String;
fn column_family() -> &'static str;
}
/// Typed accessor for the column family named by `T::column_family()`.
///
/// The struct stores no data of its own; `T` only selects which column
/// family and which value type the methods operate on.
pub struct ColumnFamily<T>
where
    T: Keyable,
{
    // Marker only: ties the accessor to `T` without holding a value.
    _phantom: std::marker::PhantomData<T>,
}
impl<T: Keyable> Default for ColumnFamily<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: Keyable> ColumnFamily<T> {
    /// Creates an accessor for the column family named by `T::column_family()`.
    pub fn new() -> Self {
        Self {
            _phantom: std::marker::PhantomData,
        }
    }

    /// Builds the error message used when the column family handle is missing.
    ///
    /// Kept as a function so callers can pass it to `with_context` and only
    /// pay for the formatting on the failure path.
    fn missing_handle_message() -> String {
        format!("Failed to get {} column family handle", T::column_family())
    }

    /// Reads and deserializes every entry of the column family, iterating
    /// from the first key.
    ///
    /// # Errors
    ///
    /// Fails if the column family handle is missing, if an entry cannot be
    /// read, or if a value cannot be deserialized into `T`. The first error
    /// stops the scan.
    pub fn get_all(&self) -> Result<Vec<T>> {
        let db = &DbContext::get_instance().db;
        let cf_handle = db
            .cf_handle(T::column_family())
            .with_context(Self::missing_handle_message)?;
        db.iterator_cf(&cf_handle, IteratorMode::Start)
            .map(|entry| -> Result<T> {
                let (_key, value) = entry.context("Failed to read database entry")?;
                deserialize_from_bytes::<T>(&value)
            })
            .collect()
    }

    /// Looks up the value stored under `key`.
    ///
    /// Returns `Ok(None)` when the key is absent.
    ///
    /// # Errors
    ///
    /// Fails if the column family handle is missing, if the read fails, or
    /// if the stored value cannot be deserialized into `T`.
    pub fn get(&self, key: &str) -> Result<Option<T>> {
        let db = &DbContext::get_instance().db;
        let cf_handle = db
            .cf_handle(T::column_family())
            .with_context(Self::missing_handle_message)?;
        db.get_cf(&cf_handle, key)
            .context("Failed to read database entry")?
            .map(|value| deserialize_from_bytes::<T>(&value))
            .transpose()
    }

    /// Deletes the entry stored under `key`.
    ///
    /// # Errors
    ///
    /// Fails if the column family handle is missing or the delete fails.
    pub fn del(&self, key: &str) -> Result<()> {
        let db = &DbContext::get_instance().db;
        let cf_handle = db
            .cf_handle(T::column_family())
            .with_context(Self::missing_handle_message)?;
        db.delete_cf(&cf_handle, key)
            .context("Failed to delete item")
    }

    /// Serializes `item` and writes it under `item.key()`.
    ///
    /// # Errors
    ///
    /// Fails if the column family handle is missing, if serialization fails,
    /// or if the write fails.
    pub fn set(&self, item: &T) -> Result<()> {
        let db = &DbContext::get_instance().db;
        let cf_handle = db
            .cf_handle(T::column_family())
            .with_context(Self::missing_handle_message)?;
        let key = item.key();
        let value = serialize_to_bytes(item)?;
        db.put_cf(&cf_handle, key, value)
            .context("Failed to write item to database")
    }

    /// Counts the entries of the column family by iterating over all of them.
    ///
    /// # Errors
    ///
    /// Fails if the column family handle is missing or an entry cannot be read.
    pub fn count_all(&self) -> Result<usize> {
        let db = &DbContext::get_instance().db;
        let cf_handle = db
            .cf_handle(T::column_family())
            .with_context(Self::missing_handle_message)?;
        let mut count = 0;
        for entry in db.iterator_cf(&cf_handle, IteratorMode::Start) {
            entry.context("Failed to read database entry")?;
            count += 1;
        }
        Ok(count)
    }

    /// Trims the column family down to at most `size` entries.
    ///
    /// When the current count exceeds `size`, the first `count - size`
    /// entries in iteration order are deleted in a single write batch.
    /// Nothing is written when the count is already within the limit.
    ///
    /// # Errors
    ///
    /// Fails if counting fails, if the column family handle is missing, if
    /// an entry cannot be read, or if the batch write fails. No deletion is
    /// applied when an entry read fails, because the batch is only written
    /// after all keys have been collected.
    pub fn keep_size(&self, size: usize) -> Result<()> {
        let current_count = self.count_all()?;
        if current_count <= size {
            return Ok(());
        }
        let db = &DbContext::get_instance().db;
        let cf_handle = db
            .cf_handle(T::column_family())
            .with_context(Self::missing_handle_message)?;
        // Cannot underflow: the guard above ensures current_count > size.
        let excess = current_count - size;
        let mut batch = rocksdb::WriteBatch::default();
        // `take` stops after exactly `excess` entries, so no extra entry is read.
        for entry in db.iterator_cf(&cf_handle, IteratorMode::Start).take(excess) {
            let (key, _value) = entry.context("Failed to read database entry")?;
            batch.delete_cf(&cf_handle, key);
        }
        db.write(batch)
            .context("Failed to execute batch delete operation")
    }

    /// Returns every item whose key lies between the keys derived from
    /// `end_time` and `start_time` (both inclusive).
    ///
    /// The bounds are the decimal strings of `i64::MAX - end_time` and
    /// `i64::MAX - start_time`; each key is compared against them as a string.
    /// A bound larger than `i64::MAX` is clamped so its derived key is `"0"`.
    ///
    /// NOTE(review): presumably keys are reversed timestamps written in the
    /// same decimal form — confirm against the writer. String comparison
    /// only matches numeric order while keys and bounds have the same number
    /// of digits.
    ///
    /// # Errors
    ///
    /// Fails if the column family handle is missing, if an entry cannot be
    /// read, if any scanned key is not valid UTF-8, or if a matching value
    /// cannot be deserialized into `T`.
    pub fn filter_by_time_index(&self, start_time: u64, end_time: u64) -> Result<Vec<T>> {
        let db = &DbContext::get_instance().db;
        let cf_handle = db
            .cf_handle(T::column_family())
            .with_context(Self::missing_handle_message)?;
        // i64::MAX is non-negative, so this cast is lossless.
        let max_timestamp = i64::MAX as u64;
        // Saturate instead of underflowing when a bound exceeds max_timestamp.
        let start_key = max_timestamp.saturating_sub(end_time).to_string();
        let end_key = max_timestamp.saturating_sub(start_time).to_string();
        let mut items = Vec::new();
        for entry in db.iterator_cf(&cf_handle, IteratorMode::Start) {
            let (key, value) = entry.context("Failed to read database entry")?;
            // Borrow the key bytes as &str; no per-entry allocation needed.
            let key_str = std::str::from_utf8(&key).context("Invalid UTF-8 in key")?;
            if key_str >= start_key.as_str() && key_str <= end_key.as_str() {
                items.push(deserialize_from_bytes::<T>(&value)?);
            }
        }
        Ok(items)
    }
}