use crate::store::StoreError;
use serde::{Deserialize, Serialize};
/// Feature flags advertised by a [`ProjectionCache`] backend.
///
/// `Default` yields the all-disabled set, identical to [`Self::none`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct CacheCapabilities {
    /// Whether the backend does anything with [`ProjectionCache::prefetch`]
    /// hints (the trait's default implementation ignores them).
    pub supports_prefetch: bool,
}

impl CacheCapabilities {
    /// Capability set with every optional feature disabled.
    pub const fn none() -> Self {
        CacheCapabilities {
            supports_prefetch: false,
        }
    }

    /// Capability set advertising support for prefetch hints.
    pub const fn prefetch_hints() -> Self {
        CacheCapabilities {
            supports_prefetch: true,
        }
    }
}
/// Storage backend for projection read-through caching.
///
/// Implementations persist opaque `(value, CacheMeta)` pairs keyed by raw
/// byte strings. All methods take `&self`, and the `Send + Sync + 'static`
/// bound means implementations handle their own interior synchronization.
pub trait ProjectionCache: Send + Sync + 'static {
    /// Reports which optional features this backend supports.
    fn capabilities(&self) -> CacheCapabilities;

    /// Looks up `key`, returning the stored value and its metadata, or
    /// `None` on a miss.
    fn get(&self, key: &[u8]) -> Result<Option<(Vec<u8>, CacheMeta)>, StoreError>;

    /// Stores `value` under `key` together with `meta`, replacing any
    /// previous entry for that key.
    fn put(&self, key: &[u8], value: &[u8], meta: CacheMeta) -> Result<(), StoreError>;

    /// Removes every entry whose key starts with `prefix`; returns the
    /// number of entries removed.
    fn delete_prefix(&self, prefix: &[u8]) -> Result<u64, StoreError>;

    /// Flushes pending writes to durable storage.
    fn sync(&self) -> Result<(), StoreError>;

    /// Optional hint that `key` is likely to be read soon. The default
    /// implementation is a no-op; only backends advertising
    /// [`CacheCapabilities::supports_prefetch`] are expected to act on it.
    fn prefetch(&self, _key: &[u8], _predicted_meta: CacheMeta) -> Result<(), StoreError> {
        Ok(())
    }
}
/// Metadata stored alongside each cached value.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CacheMeta {
    /// Watermark recorded for the cached value.
    pub watermark: u64,
    /// Time the entry was written, in microseconds — presumably since the
    /// Unix epoch (TODO confirm against the code that populates it).
    pub cached_at_us: i64,
}

/// Size in bytes of the encoded metadata trailer appended to each value:
/// one little-endian `u64` watermark plus one little-endian `i64` timestamp.
const CACHE_META_SIZE: usize = 16;
impl CacheMeta {
pub(crate) fn encode_with_value(&self, value: &[u8]) -> Vec<u8> {
let mut buf = Vec::with_capacity(value.len() + CACHE_META_SIZE);
buf.extend_from_slice(value);
buf.extend_from_slice(&self.watermark.to_le_bytes());
buf.extend_from_slice(&self.cached_at_us.to_le_bytes());
buf
}
pub(crate) fn decode_from_bytes(bytes: &[u8]) -> Result<(Vec<u8>, Self), StoreError> {
if bytes.len() < CACHE_META_SIZE {
return Err(StoreError::cache_msg("corrupt cache metadata: too short"));
}
let (value, meta_bytes) = bytes.split_at(bytes.len() - CACHE_META_SIZE);
let watermark = u64::from_le_bytes(
meta_bytes[..8]
.try_into()
.map_err(|_| StoreError::cache_msg("corrupt cache metadata"))?,
);
let cached_at_us = i64::from_le_bytes(
meta_bytes[8..16]
.try_into()
.map_err(|_| StoreError::cache_msg("corrupt cache metadata"))?,
);
Ok((
value.to_vec(),
Self {
watermark,
cached_at_us,
},
))
}
}
/// Desired read-consistency level for cache-served queries.
///
/// NOTE(review): this enum is not consumed anywhere in this file, so the
/// variant semantics below are inferred from the names — verify at call sites.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Freshness {
    /// The read must reflect the latest committed state.
    Consistent,
    /// Stale results are acceptable up to a bounded age.
    BestEffort {
        /// Maximum acceptable staleness, in milliseconds.
        max_stale_ms: u64,
    },
}
/// A cache backend that stores nothing: every read misses and every write
/// is silently discarded. Useful as a default when caching is disabled.
pub struct NoCache;

impl ProjectionCache for NoCache {
    fn capabilities(&self) -> CacheCapabilities {
        CacheCapabilities::none()
    }

    fn get(&self, _key: &[u8]) -> Result<Option<(Vec<u8>, CacheMeta)>, StoreError> {
        // Nothing is ever stored, so every lookup is a miss.
        Ok(None)
    }

    fn put(&self, _key: &[u8], _value: &[u8], _meta: CacheMeta) -> Result<(), StoreError> {
        Ok(())
    }

    fn delete_prefix(&self, _prefix: &[u8]) -> Result<u64, StoreError> {
        // No entries exist, so zero are removed.
        Ok(0)
    }

    fn sync(&self) -> Result<(), StoreError> {
        Ok(())
    }
}
/// [`ProjectionCache`] backend persisted in a single redb database file.
#[cfg(feature = "redb")]
pub struct RedbCache {
    db: redb::Database,
}

/// The single redb table holding all cache entries. Keys and values are raw
/// bytes; each stored value carries the `CacheMeta` trailer produced by
/// `CacheMeta::encode_with_value`.
#[cfg(feature = "redb")]
const CACHE_TABLE: redb::TableDefinition<&[u8], &[u8]> =
    redb::TableDefinition::new("projection_cache");
#[cfg(feature = "redb")]
impl RedbCache {
    /// Opens the redb database file at `path`, creating it when missing.
    ///
    /// # Errors
    /// Returns [`StoreError::CacheFailed`] when the database cannot be
    /// created or opened.
    pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, StoreError> {
        redb::Database::create(path.as_ref())
            .map(|db| Self { db })
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))
    }
}
#[cfg(feature = "redb")]
impl ProjectionCache for RedbCache {
    fn capabilities(&self) -> CacheCapabilities {
        CacheCapabilities::none()
    }

    /// Reads `key` under a fresh read transaction.
    fn get(&self, key: &[u8]) -> Result<Option<(Vec<u8>, CacheMeta)>, StoreError> {
        let txn = self
            .db
            .begin_read()
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
        let table = match txn.open_table(CACHE_TABLE) {
            Ok(table) => table,
            // Before the first `put`, the table has never been created;
            // that is an ordinary cache miss, not an error.
            Err(redb::TableError::TableDoesNotExist(_)) => return Ok(None),
            Err(e) => return Err(StoreError::CacheFailed(Box::new(e))),
        };
        match table.get(key) {
            // Decode directly from the guard's borrowed bytes; copying them
            // into an intermediate `Vec` first was an extra allocation
            // (`decode_from_bytes` already copies the value portion).
            Ok(Some(guard)) => CacheMeta::decode_from_bytes(guard.value()).map(Some),
            Ok(None) => Ok(None),
            Err(e) => Err(StoreError::CacheFailed(Box::new(e))),
        }
    }

    /// Writes `value` + trailer under `key` in a single committed
    /// write transaction, replacing any previous entry.
    fn put(&self, key: &[u8], value: &[u8], meta: CacheMeta) -> Result<(), StoreError> {
        let buf = meta.encode_with_value(value);
        let txn = self
            .db
            .begin_write()
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
        {
            let mut table = txn
                .open_table(CACHE_TABLE)
                .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
            table
                .insert(key, buf.as_slice())
                .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
        }
        txn.commit()
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
        Ok(())
    }

    /// Removes every entry whose key starts with `prefix` inside one write
    /// transaction; returns the number of entries removed.
    fn delete_prefix(&self, prefix: &[u8]) -> Result<u64, StoreError> {
        use redb::ReadableTable;
        let txn = self
            .db
            .begin_write()
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
        let mut count = 0u64;
        {
            let mut table = txn
                .open_table(CACHE_TABLE)
                .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
            // `prefix_successor` yields the exclusive upper bound of the
            // prefix range; `None` means the scan is unbounded above.
            let end = prefix_successor(prefix);
            let range = match end.as_deref() {
                Some(end) => table.range(prefix..end),
                None => table.range(prefix..),
            }
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
            // Materialize matching keys first: removal needs `&mut table`
            // while the range iterator holds a shared borrow of it.
            let keys: Vec<Vec<u8>> = range
                .filter_map(|row| match row {
                    Ok((key, _)) => Some(key.value().to_vec()),
                    Err(e) => {
                        // Best-effort: skip unreadable rows rather than
                        // aborting the whole prefix deletion.
                        tracing::warn!("cache iteration error (skipping row): {e}");
                        None
                    }
                })
                .collect();
            for key in &keys {
                table
                    .remove(key.as_slice())
                    .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
                count += 1;
            }
        }
        txn.commit()
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
        Ok(count)
    }

    /// No-op. NOTE(review): relies on redb committing write transactions
    /// durably by default in `put`/`delete_prefix`, leaving nothing further
    /// to flush here — confirm the configured durability level.
    fn sync(&self) -> Result<(), StoreError> {
        Ok(())
    }
}
/// [`ProjectionCache`] backend persisted in an LMDB environment via `heed`.
#[cfg(feature = "lmdb")]
pub struct LmdbCache {
    // Owning handle to the LMDB environment.
    env: heed::Env,
    // Named database ("projection_cache") holding raw key/value bytes.
    db: heed::Database<heed::types::Bytes, heed::types::Bytes>,
}
#[cfg(feature = "lmdb")]
impl LmdbCache {
    /// Opens (or creates) an LMDB environment at `path` with the given
    /// maximum map size in bytes, and ensures the "projection_cache"
    /// database exists.
    ///
    /// # Errors
    /// Returns `StoreError::Io` if the directory cannot be created, and
    /// `StoreError::CacheFailed` for any LMDB-level failure.
    pub fn open(path: impl AsRef<std::path::Path>, map_size: usize) -> Result<Self, StoreError> {
        // LMDB requires the environment directory to exist before opening.
        std::fs::create_dir_all(path.as_ref()).map_err(StoreError::Io)?;
        let env = open_lmdb_env(path.as_ref(), map_size)?;
        // Create the named database up front in a write transaction so the
        // cache methods below can assume it exists.
        let mut wtxn = env
            .write_txn()
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
        let db = env
            .create_database(&mut wtxn, Some("projection_cache"))
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
        wtxn.commit()
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
        Ok(Self { env, db })
    }
}
#[cfg(feature = "lmdb")]
impl ProjectionCache for LmdbCache {
    fn capabilities(&self) -> CacheCapabilities {
        CacheCapabilities::none()
    }

    /// Reads `key` under a fresh read transaction.
    fn get(&self, key: &[u8]) -> Result<Option<(Vec<u8>, CacheMeta)>, StoreError> {
        let txn = self
            .env
            .read_txn()
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
        match self
            .db
            .get(&txn, key)
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))?
        {
            Some(bytes) if bytes.len() >= CACHE_META_SIZE => {
                let (value, meta) = CacheMeta::decode_from_bytes(bytes)?;
                Ok(Some((value, meta)))
            }
            // NOTE(review): an entry shorter than the metadata trailer is
            // treated as a plain miss here, whereas RedbCache::get surfaces
            // it as a corruption error — confirm which behavior is intended.
            _ => Ok(None),
        }
    }

    /// Writes `value` + trailer under `key` in one committed transaction.
    fn put(&self, key: &[u8], value: &[u8], meta: CacheMeta) -> Result<(), StoreError> {
        let buf = meta.encode_with_value(value);
        let mut txn = self
            .env
            .write_txn()
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
        self.db
            .put(&mut txn, key, &buf)
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
        txn.commit()
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
        Ok(())
    }

    /// Deletes all keys starting with `prefix` by walking a mutable prefix
    /// cursor and removing entries in place as it advances.
    fn delete_prefix(&self, prefix: &[u8]) -> Result<u64, StoreError> {
        let mut txn = self
            .env
            .write_txn()
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
        let mut iter = self
            .db
            .prefix_iter_mut(&mut txn, prefix)
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
        let mut count = 0u64;
        // Advance one entry at a time; `transpose()?` propagates iteration
        // errors and `is_some()` stops at the end of the prefix range.
        while iter
            .next()
            .transpose()
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))?
            .is_some()
        {
            // SAFETY: `del_current` is unsafe because it invalidates data
            // borrowed from the deleted entry; the pair yielded by `next()`
            // above has already been dropped, so no such borrow is held.
            unsafe {
                iter.del_current()
                    .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
            }
            count += 1;
        }
        // End the iterator's borrow of `txn` before committing.
        drop(iter);
        txn.commit()
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))?;
        Ok(count)
    }

    /// Forces the LMDB environment to fsync buffered writes to disk.
    fn sync(&self) -> Result<(), StoreError> {
        self.env
            .force_sync()
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))
    }
}
#[cfg(feature = "redb")]
/// Returns the smallest byte string strictly greater than every key that
/// starts with `prefix`, i.e. the exclusive upper bound for a prefix scan.
/// Returns `None` when no finite bound exists: an empty prefix, or a prefix
/// consisting entirely of `0xFF` bytes.
fn prefix_successor(prefix: &[u8]) -> Option<Vec<u8>> {
    // Bump the rightmost byte that can still be incremented and drop
    // everything after it; if no byte is below 0xFF (or the prefix is
    // empty) the range is unbounded above.
    let bump_at = prefix.iter().rposition(|&b| b < 0xFF)?;
    let mut end = prefix[..=bump_at].to_vec();
    end[bump_at] += 1;
    Some(end)
}
#[cfg(feature = "lmdb")]
/// Opens a raw `heed` LMDB environment at `path` with the given map size
/// (maximum total size of the memory map, in bytes) and room for exactly
/// one named database.
fn open_lmdb_env(path: &std::path::Path, map_size: usize) -> Result<heed::Env, StoreError> {
    // SAFETY: `EnvOpenOptions::open` is unsafe because the same LMDB
    // environment must not be opened twice within one process. Presumably
    // callers create a single `LmdbCache` per path — TODO confirm that
    // invariant is enforced upstream.
    unsafe {
        heed::EnvOpenOptions::new()
            .map_size(map_size)
            .max_dbs(1)
            .open(path)
            .map_err(|e| StoreError::CacheFailed(Box::new(e)))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Encoding then decoding must return the original value and metadata.
    #[test]
    fn cache_meta_encode_decode_roundtrip() {
        let original = CacheMeta {
            watermark: 42,
            cached_at_us: 1_700_000_000_000,
        };
        let payload = b"hello world";
        let encoded = original.encode_with_value(payload);
        let (value, meta) =
            CacheMeta::decode_from_bytes(&encoded).expect("decode should succeed");
        assert_eq!(value, payload);
        assert_eq!(meta.watermark, 42);
        assert_eq!(meta.cached_at_us, 1_700_000_000_000);
    }

    // Buffers smaller than the 16-byte trailer must be rejected.
    #[test]
    fn cache_meta_decode_rejects_short_buffer() {
        assert!(CacheMeta::decode_from_bytes(&[0u8; 8]).is_err());
    }

    // An empty value round-trips: the buffer is exactly the trailer.
    #[test]
    fn cache_meta_roundtrip_empty_value() {
        let meta = CacheMeta {
            watermark: 0,
            cached_at_us: 0,
        };
        let (value, decoded) = CacheMeta::decode_from_bytes(&meta.encode_with_value(b""))
            .expect("decode should succeed");
        assert!(value.is_empty());
        assert_eq!(decoded.watermark, 0);
        assert_eq!(decoded.cached_at_us, 0);
    }
}