use crate::store::StoreError;
use serde::{Deserialize, Serialize};
pub trait ProjectionCache: Send + Sync + 'static {
fn get(&self, key: &[u8]) -> Result<Option<(Vec<u8>, CacheMeta)>, StoreError>;
fn put(&self, key: &[u8], value: &[u8], meta: CacheMeta) -> Result<(), StoreError>;
fn delete_prefix(&self, prefix: &[u8]) -> Result<u64, StoreError>;
fn sync(&self) -> Result<(), StoreError>;
fn prefetch(&self, _key: &[u8], _predicted_meta: CacheMeta) -> Result<(), StoreError> {
Ok(()) }
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CacheMeta {
pub watermark: u64,
pub cached_at_us: i64,
}
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Freshness {
Consistent,
BestEffort { max_stale_ms: u64 },
}
pub struct NoCache;
impl ProjectionCache for NoCache {
fn get(&self, _key: &[u8]) -> Result<Option<(Vec<u8>, CacheMeta)>, StoreError> {
Ok(None) }
fn put(&self, _key: &[u8], _value: &[u8], _meta: CacheMeta) -> Result<(), StoreError> {
Ok(()) }
fn delete_prefix(&self, _prefix: &[u8]) -> Result<u64, StoreError> {
Ok(0) }
fn sync(&self) -> Result<(), StoreError> {
Ok(()) }
}
#[cfg(feature = "redb")]
pub struct RedbCache {
db: redb::Database,
}
#[cfg(feature = "redb")]
const CACHE_TABLE: redb::TableDefinition<&[u8], &[u8]> =
redb::TableDefinition::new("projection_cache");
#[cfg(feature = "redb")]
impl RedbCache {
pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, StoreError> {
let db = redb::Database::create(path.as_ref())
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
Ok(Self { db })
}
}
#[cfg(feature = "redb")]
impl ProjectionCache for RedbCache {
fn get(&self, key: &[u8]) -> Result<Option<(Vec<u8>, CacheMeta)>, StoreError> {
let txn = self
.db
.begin_read()
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
let table = txn
.open_table(CACHE_TABLE)
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
match table.get(key) {
Ok(Some(guard)) => {
let bytes = guard.value().to_vec();
if bytes.len() < 16 {
return Ok(None);
}
let (value, meta_bytes) = bytes.split_at(bytes.len() - 16);
let watermark = u64::from_le_bytes(
meta_bytes[..8]
.try_into()
.map_err(|_| StoreError::CacheFailed("corrupt cache metadata".into()))?,
);
let cached_at_us = i64::from_le_bytes(
meta_bytes[8..16]
.try_into()
.map_err(|_| StoreError::CacheFailed("corrupt cache metadata".into()))?,
);
Ok(Some((
value.to_vec(),
CacheMeta {
watermark,
cached_at_us,
},
)))
}
Ok(None) => Ok(None),
Err(e) => Err(StoreError::CacheFailed(e.to_string())),
}
}
fn put(&self, key: &[u8], value: &[u8], meta: CacheMeta) -> Result<(), StoreError> {
let mut buf = Vec::with_capacity(value.len() + 16);
buf.extend_from_slice(value);
buf.extend_from_slice(&meta.watermark.to_le_bytes());
buf.extend_from_slice(&meta.cached_at_us.to_le_bytes());
let txn = self
.db
.begin_write()
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
{
let mut table = txn
.open_table(CACHE_TABLE)
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
table
.insert(key, buf.as_slice())
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
}
txn.commit()
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
Ok(())
}
fn delete_prefix(&self, prefix: &[u8]) -> Result<u64, StoreError> {
use redb::ReadableTable;
let txn = self
.db
.begin_write()
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
let mut count = 0u64;
{
let mut table = txn
.open_table(CACHE_TABLE)
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
let end = prefix_successor(prefix);
let keys: Vec<Vec<u8>> = table
.range(prefix..end.as_slice())
.map_err(|e| StoreError::CacheFailed(e.to_string()))?
.filter_map(|r| match r {
Ok(v) => Some(v),
Err(e) => {
tracing::warn!("cache iteration error (skipping row): {e}");
None
}
})
.map(|(k, _)| k.value().to_vec())
.collect();
for key in &keys {
table
.remove(key.as_slice())
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
count += 1;
}
}
txn.commit()
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
Ok(count)
}
fn sync(&self) -> Result<(), StoreError> {
Ok(()) }
}
#[cfg(feature = "lmdb")]
pub struct LmdbCache {
env: heed::Env,
db: heed::Database<heed::types::Bytes, heed::types::Bytes>,
}
#[cfg(feature = "lmdb")]
impl LmdbCache {
pub fn open(path: impl AsRef<std::path::Path>, map_size: usize) -> Result<Self, StoreError> {
std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
let env = unsafe {
heed::EnvOpenOptions::new()
.map_size(map_size)
.max_dbs(1)
.open(path.as_ref())
.map_err(|e| StoreError::CacheFailed(e.to_string()))?
};
let mut wtxn = env
.write_txn()
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
let db = env
.create_database(&mut wtxn, Some("projection_cache"))
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
wtxn.commit()
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
Ok(Self { env, db })
}
}
#[cfg(feature = "lmdb")]
impl ProjectionCache for LmdbCache {
fn get(&self, key: &[u8]) -> Result<Option<(Vec<u8>, CacheMeta)>, StoreError> {
let txn = self
.env
.read_txn()
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
match self
.db
.get(&txn, key)
.map_err(|e| StoreError::CacheFailed(e.to_string()))?
{
Some(bytes) if bytes.len() >= 16 => {
let (value, meta_bytes) = bytes.split_at(bytes.len() - 16);
let watermark = u64::from_le_bytes(
meta_bytes[..8]
.try_into()
.map_err(|_| StoreError::CacheFailed("corrupt cache metadata".into()))?,
);
let cached_at_us = i64::from_le_bytes(
meta_bytes[8..16]
.try_into()
.map_err(|_| StoreError::CacheFailed("corrupt cache metadata".into()))?,
);
Ok(Some((
value.to_vec(),
CacheMeta {
watermark,
cached_at_us,
},
)))
}
_ => Ok(None),
}
}
fn put(&self, key: &[u8], value: &[u8], meta: CacheMeta) -> Result<(), StoreError> {
let mut buf = Vec::with_capacity(value.len() + 16);
buf.extend_from_slice(value);
buf.extend_from_slice(&meta.watermark.to_le_bytes());
buf.extend_from_slice(&meta.cached_at_us.to_le_bytes());
let mut txn = self
.env
.write_txn()
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
self.db
.put(&mut txn, key, &buf)
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
txn.commit()
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
Ok(())
}
fn delete_prefix(&self, prefix: &[u8]) -> Result<u64, StoreError> {
let mut txn = self
.env
.write_txn()
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
let mut iter = self
.db
.prefix_iter_mut(&mut txn, prefix)
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
let mut count = 0u64;
while iter
.next()
.transpose()
.map_err(|e| StoreError::CacheFailed(e.to_string()))?
.is_some()
{
unsafe {
iter.del_current()
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
}
count += 1;
}
drop(iter);
txn.commit()
.map_err(|e| StoreError::CacheFailed(e.to_string()))?;
Ok(count)
}
fn sync(&self) -> Result<(), StoreError> {
self.env
.force_sync()
.map_err(|e| StoreError::CacheFailed(e.to_string()))
}
}
#[cfg(feature = "redb")]
fn prefix_successor(prefix: &[u8]) -> Vec<u8> {
let mut end = prefix.to_vec();
for i in (0..end.len()).rev() {
if end[i] < 0xFF {
end[i] += 1;
end.truncate(i + 1);
return end;
}
}
end.push(0x00);
end
}