pub(crate) mod flow;
pub(crate) mod registry;
pub(crate) mod watch;
pub use watch::{CursorWatcherError, ProjectionWatcher, WatcherError};
use crate::store::platform::fs as platform_fs;
use crate::store::StoreError;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct CacheCapabilities {
pub supports_prefetch: bool,
pub is_noop: bool,
}
impl CacheCapabilities {
pub const fn none() -> Self {
Self {
supports_prefetch: false,
is_noop: false,
}
}
pub const fn prefetch_hints() -> Self {
Self {
supports_prefetch: true,
is_noop: false,
}
}
}
pub trait ProjectionCache: Send + Sync + 'static {
fn capabilities(&self) -> CacheCapabilities;
fn get(&self, key: &[u8]) -> Result<Option<(Vec<u8>, CacheMeta)>, StoreError>;
fn put(&self, key: &[u8], value: &[u8], meta: CacheMeta) -> Result<(), StoreError>;
fn delete_prefix(&self, prefix: &[u8]) -> Result<u64, StoreError>;
fn sync(&self) -> Result<(), StoreError>;
fn prefetch(&self, _key: &[u8], _predicted_meta: CacheMeta) -> Result<(), StoreError> {
Ok(()) }
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CacheMeta {
pub watermark: u64,
pub cached_at_us: i64,
#[serde(default)]
pub cached_at_mono_ns: Option<i64>,
#[serde(default)]
pub process_boot_ns: Option<u64>,
}
const CACHE_META_LEGACY_SIZE: usize = 16;
const CACHE_META_CURRENT_SIZE: usize = 40;
const CACHE_META_MAGIC: u64 = 0xCA_CB_CC_CD_CE_CF_D0_D1;
impl CacheMeta {
pub(crate) fn encode_with_value(&self, value: &[u8]) -> Vec<u8> {
let mut buf = Vec::with_capacity(value.len() + CACHE_META_CURRENT_SIZE);
buf.extend_from_slice(value);
buf.extend_from_slice(&self.watermark.to_le_bytes());
buf.extend_from_slice(&self.cached_at_us.to_le_bytes());
buf.extend_from_slice(&self.cached_at_mono_ns.unwrap_or(0).to_le_bytes());
buf.extend_from_slice(&self.process_boot_ns.unwrap_or(0).to_le_bytes());
buf.extend_from_slice(&CACHE_META_MAGIC.to_le_bytes());
buf
}
pub(crate) fn decode_from_bytes(bytes: &[u8]) -> Result<(Vec<u8>, Self), StoreError> {
if bytes.len() >= CACHE_META_CURRENT_SIZE {
let magic_bytes: [u8; 8] = bytes[bytes.len() - 8..]
.try_into()
.map_err(|_| StoreError::cache_msg("corrupt cache metadata"))?;
if u64::from_le_bytes(magic_bytes) == CACHE_META_MAGIC {
let (value, meta_bytes) = bytes.split_at(bytes.len() - CACHE_META_CURRENT_SIZE);
let watermark = u64::from_le_bytes(
meta_bytes[0..8]
.try_into()
.map_err(|_| StoreError::cache_msg("corrupt cache metadata"))?,
);
let cached_at_us = i64::from_le_bytes(
meta_bytes[8..16]
.try_into()
.map_err(|_| StoreError::cache_msg("corrupt cache metadata"))?,
);
let cached_at_mono_ns = i64::from_le_bytes(
meta_bytes[16..24]
.try_into()
.map_err(|_| StoreError::cache_msg("corrupt cache metadata"))?,
);
let process_boot_ns = u64::from_le_bytes(
meta_bytes[24..32]
.try_into()
.map_err(|_| StoreError::cache_msg("corrupt cache metadata"))?,
);
return Ok((
value.to_vec(),
Self {
watermark,
cached_at_us,
cached_at_mono_ns: Some(cached_at_mono_ns),
process_boot_ns: Some(process_boot_ns),
},
));
}
}
if bytes.len() < CACHE_META_LEGACY_SIZE {
return Err(StoreError::cache_msg("corrupt cache metadata: too short"));
}
let (value, meta_bytes) = bytes.split_at(bytes.len() - CACHE_META_LEGACY_SIZE);
let watermark = u64::from_le_bytes(
meta_bytes[..8]
.try_into()
.map_err(|_| StoreError::cache_msg("corrupt cache metadata"))?,
);
let cached_at_us = i64::from_le_bytes(
meta_bytes[8..16]
.try_into()
.map_err(|_| StoreError::cache_msg("corrupt cache metadata"))?,
);
Ok((
value.to_vec(),
Self {
watermark,
cached_at_us,
cached_at_mono_ns: None,
process_boot_ns: None,
},
))
}
}
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum Freshness {
Consistent,
MaybeStale {
max_stale_ms: u64,
},
}
pub struct NoCache;
impl ProjectionCache for NoCache {
fn capabilities(&self) -> CacheCapabilities {
CacheCapabilities {
is_noop: true,
..CacheCapabilities::none()
}
}
fn get(&self, _key: &[u8]) -> Result<Option<(Vec<u8>, CacheMeta)>, StoreError> {
Ok(None) }
fn put(&self, _key: &[u8], _value: &[u8], _meta: CacheMeta) -> Result<(), StoreError> {
Ok(()) }
fn delete_prefix(&self, _prefix: &[u8]) -> Result<u64, StoreError> {
Ok(0) }
fn sync(&self) -> Result<(), StoreError> {
Ok(()) }
}
pub struct NativeCache {
root: PathBuf,
}
impl NativeCache {
pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self, StoreError> {
let root = path.as_ref().to_path_buf();
platform_fs::reject_cache_symlink_leaf(&root)?;
platform_fs::create_dir_all(&root).map_err(StoreError::cache_error)?;
Ok(Self { root })
}
fn key_path(&self, key: &[u8]) -> (PathBuf, PathBuf) {
let hex = to_hex(key);
let shard = if hex.len() >= 2 { &hex[..2] } else { "00" };
let shard_dir = self.root.join(shard);
let file_path = shard_dir.join(format!("{hex}.bin"));
(shard_dir, file_path)
}
}
impl ProjectionCache for NativeCache {
fn capabilities(&self) -> CacheCapabilities {
CacheCapabilities::none()
}
fn get(&self, key: &[u8]) -> Result<Option<(Vec<u8>, CacheMeta)>, StoreError> {
let (shard, path) = self.key_path(key);
match platform_fs::metadata(&shard) {
Ok(meta) if meta.is_dir() => {}
Ok(_) => {
return Err(StoreError::CacheFailed(Box::new(std::io::Error::other(
format!("cache shard path is not a directory: {}", shard.display()),
))));
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(e) => return Err(StoreError::cache_error(e)),
}
match platform_fs::read(&path) {
Ok(bytes) => match CacheMeta::decode_from_bytes(&bytes) {
Ok((value, meta)) => Ok(Some((value, meta))),
Err(_) => {
tracing::warn!("corrupt cache file, deleting: {}", path.display());
platform_fs::remove_file_if_present(&path).map_err(StoreError::cache_error)?;
Ok(None)
}
},
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(e) => Err(StoreError::cache_error(e)),
}
}
fn put(&self, key: &[u8], value: &[u8], meta: CacheMeta) -> Result<(), StoreError> {
let (shard_dir, final_path) = self.key_path(key);
platform_fs::reject_cache_symlink_leaf(&shard_dir)?;
platform_fs::create_dir_all(&shard_dir).map_err(StoreError::cache_error)?;
platform_fs::reject_cache_symlink_leaf(&final_path)?;
let buf = meta.encode_with_value(value);
platform_fs::write_derivative_file_atomically(
&shard_dir,
&final_path,
"projection cache file",
&buf,
)
.map_err(StoreError::cache_error)
}
fn delete_prefix(&self, prefix: &[u8]) -> Result<u64, StoreError> {
let hex_prefix = to_hex(prefix);
let mut count = 0u64;
let entries = match platform_fs::read_dir(&self.root) {
Ok(e) => e,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
Err(e) => return Err(StoreError::cache_error(e)),
};
for dir_entry in entries {
let dir_entry = dir_entry.map_err(StoreError::cache_error)?;
let shard_path = dir_entry.path();
if !shard_path.is_dir() {
continue;
}
if hex_prefix.len() >= 2 {
if let Some(shard_name) = shard_path.file_name().and_then(|n| n.to_str()) {
if !hex_prefix.starts_with(shard_name)
&& !shard_name.starts_with(&hex_prefix[..2])
{
continue;
}
}
}
let shard_entries =
platform_fs::read_dir(&shard_path).map_err(StoreError::cache_error)?;
for file_entry in shard_entries {
let file_entry = file_entry.map_err(StoreError::cache_error)?;
let file_name = file_entry.file_name();
let name = match file_name.to_str() {
Some(n) if n.ends_with(".bin") => &n[..n.len() - 4],
_ => continue,
};
if name.starts_with(&hex_prefix)
&& platform_fs::remove_file_if_present(&file_entry.path())
.map_err(StoreError::cache_error)?
{
count += 1;
}
}
}
Ok(count)
}
fn sync(&self) -> Result<(), StoreError> {
Ok(())
}
}
fn to_hex(bytes: &[u8]) -> String {
bytes.iter().map(|b| format!("{b:02x}")).collect()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn cache_meta_encode_decode_roundtrip() {
let meta = CacheMeta {
watermark: 42,
cached_at_us: 1_700_000_000_000,
cached_at_mono_ns: Some(123_456_789),
process_boot_ns: Some(987_654_321),
};
let value = b"hello world";
let encoded = meta.encode_with_value(value);
let (decoded_value, decoded_meta) =
CacheMeta::decode_from_bytes(&encoded).expect("decode should succeed");
assert_eq!(decoded_value, value);
assert_eq!(decoded_meta.watermark, 42);
assert_eq!(decoded_meta.cached_at_us, 1_700_000_000_000);
assert_eq!(decoded_meta.cached_at_mono_ns, Some(123_456_789));
assert_eq!(decoded_meta.process_boot_ns, Some(987_654_321));
}
#[test]
fn cache_meta_decode_rejects_short_buffer() {
let short = [0u8; 8];
let result = CacheMeta::decode_from_bytes(&short);
assert!(result.is_err());
}
#[test]
fn cache_meta_roundtrip_empty_value() {
let meta = CacheMeta {
watermark: 0,
cached_at_us: 0,
cached_at_mono_ns: Some(0),
process_boot_ns: Some(0),
};
let encoded = meta.encode_with_value(b"");
let (decoded_value, decoded_meta) =
CacheMeta::decode_from_bytes(&encoded).expect("decode should succeed");
assert!(decoded_value.is_empty());
assert_eq!(decoded_meta.watermark, 0);
assert_eq!(decoded_meta.cached_at_us, 0);
assert_eq!(decoded_meta.cached_at_mono_ns, Some(0));
assert_eq!(decoded_meta.process_boot_ns, Some(0));
}
#[test]
fn cache_meta_legacy_trailer_decodes_as_none_mono() {
let mut buf = Vec::new();
buf.extend_from_slice(b"legacy payload");
buf.extend_from_slice(&99u64.to_le_bytes());
buf.extend_from_slice(&1_234_567i64.to_le_bytes());
let (value, meta) =
CacheMeta::decode_from_bytes(&buf).expect("legacy decode should succeed");
assert_eq!(value, b"legacy payload");
assert_eq!(meta.watermark, 99);
assert_eq!(meta.cached_at_us, 1_234_567);
assert!(meta.cached_at_mono_ns.is_none());
assert!(meta.process_boot_ns.is_none());
}
#[test]
fn native_cache_delete_prefix_reports_removed_entries() {
fn meta() -> CacheMeta {
CacheMeta {
watermark: 7,
cached_at_us: 11,
cached_at_mono_ns: Some(13),
process_boot_ns: Some(17),
}
}
let dir = tempfile::tempdir().expect("temp dir");
let cache = NativeCache::open(dir.path()).expect("open cache");
cache
.put(&[0xab, 0x01], b"first", meta())
.expect("put first");
cache
.put(&[0xab, 0x02], b"second", meta())
.expect("put second");
cache
.put(&[0xcd, 0x03], b"third", meta())
.expect("put third");
let removed = cache.delete_prefix(&[0xab]).expect("delete prefix");
assert_eq!(removed, 2);
assert!(cache.get(&[0xab, 0x01]).expect("get first").is_none());
assert!(cache.get(&[0xab, 0x02]).expect("get second").is_none());
assert!(cache.get(&[0xcd, 0x03]).expect("get third").is_some());
}
}