use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock};
use std::time::Duration;
use dashmap::DashMap;
use rusqlite::{ffi::ErrorCode, Connection, OpenFlags};
use crate::world;
/// Entry limit intended as the default `max_entries` for a `ReadCache`; the
/// tests in this file pass it to `ReadCache::new`.
pub(crate) const DEFAULT_READ_CACHE_MAX_ENTRIES: usize = 5000;
/// Busy timeout, in milliseconds, applied to every connection the cache opens.
const READ_CONN_BUSY_TIMEOUT_MS: u64 = 1000;
/// Newtype around a `rusqlite::Connection` handed to read closures.
///
/// The inner connection is private; the only way to build the wrapper is
/// `from_raw` (or the test-only helper in this file), and the only way to reach
/// the connection is `as_mut_conn`.
pub(crate) struct TrackedReadConnection(Connection);

impl TrackedReadConnection {
    /// Wraps an already-open connection.
    fn from_raw(conn: Connection) -> Self {
        TrackedReadConnection(conn)
    }

    /// Gives mutable access to the wrapped connection.
    pub(crate) fn as_mut_conn(&mut self) -> &mut Connection {
        let Self(inner) = self;
        inner
    }
}
/// Lifecycle state of a single cache slot.
enum SlotState {
/// Newly created slot that has no connection installed yet. A reader that
/// observes this state gets `Ok(None)`.
Opening,
/// Slot holding an open connection; the mutex serializes use of it.
Ready(StdMutex<TrackedReadConnection>),
/// Slot without a usable connection. A reader that observes this state gets
/// `Ok(None)`.
Tombstone,
}
/// One entry of the read cache, shared between threads through an `Arc`.
struct ReadSlot {
// Readers hold the read lock while they use the connection; state
// transitions (open, tombstone, drain) take the write lock.
inner: StdRwLock<SlotState>,
}
/// Guard over a slot state that is being moved out of `Opening`.
///
/// Unless `promote` or `fail` has run, dropping the guard overwrites the state
/// with `SlotState::Tombstone` (see the `Drop` impl).
struct OpeningTransition<'a> {
// The slot state being transitioned (borrowed from the slot's write guard).
state: &'a mut SlotState,
// Set once `promote` or `fail` has written the final state.
finalized: bool,
}
impl<'a> OpeningTransition<'a> {
fn new(state: &'a mut SlotState) -> Self {
Self {
state,
finalized: false,
}
}
fn promote(mut self, conn: Connection) {
let tracked = TrackedReadConnection::from_raw(conn);
*self.state = SlotState::Ready(StdMutex::new(tracked));
self.finalized = true;
}
fn fail(mut self) {
*self.state = SlotState::Tombstone;
self.finalized = true;
}
}
impl<'a> Drop for OpeningTransition<'a> {
    /// Tombstones the slot if the transition was abandoned (neither `promote`
    /// nor `fail` ran), e.g. when the opening code unwinds.
    fn drop(&mut self) {
        if self.finalized {
            return;
        }
        *self.state = SlotState::Tombstone;
    }
}
/// Test-only constructor: wraps an arbitrary `Connection` in a
/// `TrackedReadConnection` so tests elsewhere in the crate can build one
/// without going through the cache.
#[cfg(test)]
pub(crate) fn test_only_wrap_raw_connection(conn: Connection) -> TrackedReadConnection {
TrackedReadConnection::from_raw(conn)
}
/// Counters maintained by `ReadCache`. All start at zero and are only ever
/// incremented.
#[derive(Default)]
pub(crate) struct ReadCacheMetrics {
/// Lookups that found an existing slot for the requested world.
pub(crate) read_cache_hits: AtomicUsize,
/// Lookups that found no slot for the requested world.
pub(crate) read_cache_misses: AtomicUsize,
/// Misses that happened while the map already held `max_entries` or more
/// slots and were therefore served through a transient slot.
pub(crate) read_cache_capped: AtomicUsize,
/// Attempts to open a connection for a new slot that failed.
pub(crate) read_cache_open_fails: AtomicUsize,
}
/// Cache of per-world SQLite read connections, keyed by world name.
pub(crate) struct ReadCache {
// World name -> shared slot. Entries may be in any `SlotState`.
read_conns: DashMap<String, Arc<ReadSlot>>,
/// Once the map holds this many entries, a miss no longer creates a
/// persistent slot; it is served through a transient slot instead.
pub(crate) max_entries: usize,
/// Hit/miss/cap/open-failure counters.
pub(crate) metrics: ReadCacheMetrics,
}
impl ReadCache {
pub(crate) fn new(max_entries: usize) -> Self {
Self {
read_conns: DashMap::new(),
max_entries,
metrics: ReadCacheMetrics::default(),
}
}
pub(crate) fn cached_read_with_hmac(
&self,
data: &std::path::Path,
world: &str,
) -> rusqlite::Result<Option<(world::Stage, Option<String>)>> {
self.with_tracked_conn(data, world, world::read_with_hmac_via_conn)
}
pub(crate) fn cached_verify_chain(
&self,
data: &std::path::Path,
world: &str,
key: &[u8],
) -> rusqlite::Result<Option<crate::audit::VerifyReport>> {
let key = key.to_vec();
self.with_tracked_conn(data, world, move |conn| {
crate::audit::verify_chain_via_conn(conn, &key)
})
}
fn with_tracked_conn<F, R>(
&self,
data: &std::path::Path,
world: &str,
f: F,
) -> rusqlite::Result<Option<R>>
where
F: FnOnce(&mut TrackedReadConnection) -> rusqlite::Result<R>,
{
let path = world::world_db(data, world);
if let Some(arc) = self.read_conns.get(world).map(|e| e.value().clone()) {
self.metrics.read_cache_hits.fetch_add(1, Ordering::Relaxed);
return self.invoke_via_slot(arc, f);
}
self.metrics
.read_cache_misses
.fetch_add(1, Ordering::Relaxed);
if self.read_conns.len() >= self.max_entries {
self.metrics
.read_cache_capped
.fetch_add(1, Ordering::Relaxed);
return self.invoke_transient(&path, world, f);
}
match path.try_exists() {
Ok(false) => return Ok(None),
Ok(true) => {}
Err(_) => {
}
}
let new_slot = Arc::new(ReadSlot {
inner: StdRwLock::new(SlotState::Opening),
});
let arc = self
.read_conns
.entry(world.to_string())
.or_insert_with(|| new_slot.clone())
.value()
.clone();
let we_own_slot = Arc::ptr_eq(&arc, &new_slot);
if we_own_slot {
let mut g = arc.inner.write().unwrap_or_else(|p| p.into_inner());
if matches!(&*g, SlotState::Opening) {
let transition = OpeningTransition::new(&mut g);
let init: rusqlite::Result<Connection> = (|| {
let c = Connection::open_with_flags(&path, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
c.busy_timeout(Duration::from_millis(READ_CONN_BUSY_TIMEOUT_MS))?;
c.pragma_update(None, "cache_size", -200)?;
Ok(c)
})();
match init {
Ok(c) => transition.promote(c),
Err(e) => {
transition.fail();
drop(g);
self.read_conns
.remove_if(world, |_k, v| Arc::ptr_eq(v, &arc));
self.metrics
.read_cache_open_fails
.fetch_add(1, Ordering::Relaxed);
if matches!(e.sqlite_error_code(), Some(ErrorCode::CannotOpen))
&& matches!(path.try_exists(), Ok(false))
{
return Ok(None);
}
return Err(e);
}
}
}
}
self.invoke_via_slot(arc, f)
}
fn invoke_transient<F, R>(
&self,
path: &std::path::Path,
world: &str,
f: F,
) -> rusqlite::Result<Option<R>>
where
F: FnOnce(&mut TrackedReadConnection) -> rusqlite::Result<R>,
{
if let Some(arc) = self.read_conns.get(world).map(|e| e.value().clone()) {
return self.invoke_via_slot(arc, f);
}
let transient_slot = Arc::new(ReadSlot {
inner: StdRwLock::new(SlotState::Opening),
});
let arc = self
.read_conns
.entry(world.to_string())
.or_insert_with(|| transient_slot.clone())
.value()
.clone();
let we_own_slot = Arc::ptr_eq(&arc, &transient_slot);
if we_own_slot {
let mut g = arc.inner.write().unwrap_or_else(|p| p.into_inner());
if matches!(&*g, SlotState::Opening) {
let transition = OpeningTransition::new(&mut g);
let init: rusqlite::Result<Connection> = (|| {
let c = Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
c.busy_timeout(Duration::from_millis(READ_CONN_BUSY_TIMEOUT_MS))?;
c.pragma_update(None, "cache_size", -200)?;
Ok(c)
})();
match init {
Ok(c) => transition.promote(c),
Err(e) => {
transition.fail();
drop(g);
self.read_conns
.remove_if(world, |_k, v| Arc::ptr_eq(v, &arc));
self.metrics
.read_cache_open_fails
.fetch_add(1, Ordering::Relaxed);
if matches!(e.sqlite_error_code(), Some(ErrorCode::CannotOpen))
&& matches!(path.try_exists(), Ok(false))
{
return Ok(None);
}
return Err(e);
}
}
}
}
let result = self.invoke_via_slot(arc.clone(), f);
if we_own_slot {
{
let mut g = arc.inner.write().unwrap_or_else(|p| p.into_inner());
let old = std::mem::replace(&mut *g, SlotState::Tombstone);
drop(old);
drop(g);
}
self.read_conns
.remove_if(world, |_k, v| Arc::ptr_eq(v, &arc));
}
result
}
fn invoke_via_slot<F, R>(&self, arc: Arc<ReadSlot>, f: F) -> rusqlite::Result<Option<R>>
where
F: FnOnce(&mut TrackedReadConnection) -> rusqlite::Result<R>,
{
let read_guard = arc.inner.read().unwrap_or_else(|p| p.into_inner());
match &*read_guard {
SlotState::Ready(tracked_mutex) => {
let mut tracked = tracked_mutex.lock().unwrap_or_else(|p| p.into_inner());
f(&mut tracked).map(Some)
}
SlotState::Tombstone => Ok(None),
SlotState::Opening => Ok(None),
}
}
pub(crate) fn install_tombstone_blocking(&self, world: &str) {
let new_tombstone = Arc::new(ReadSlot {
inner: StdRwLock::new(SlotState::Tombstone),
});
let prev = self.read_conns.insert(world.to_string(), new_tombstone);
if let Some(prev_slot) = prev {
let mut g = prev_slot.inner.write().unwrap_or_else(|p| p.into_inner());
let old = std::mem::replace(&mut *g, SlotState::Tombstone);
drop(old);
drop(g);
}
}
pub(crate) fn clear_tombstone(&self, world: &str) {
self.read_conns.remove(world);
}
#[allow(dead_code)]
pub(crate) fn snapshot_entries(&self) -> usize {
self.read_conns.len()
}
#[allow(dead_code)]
pub(crate) fn snapshot_tombstones(&self) -> usize {
self.read_conns
.iter()
.filter(|e| {
e.value()
.inner
.try_read()
.map(|g| matches!(*g, SlotState::Tombstone))
.unwrap_or(false)
})
.count()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Creates a fresh directory under the system temp dir whose name combines
    /// `label`, the process id and the current time in nanoseconds.
    fn scratch_dir(label: &str) -> PathBuf {
        let mut d = std::env::temp_dir();
        d.push(format!(
            "elastik-readcache-test-{label}-{}-{}",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        std::fs::create_dir_all(&d).unwrap();
        d
    }

    /// First read is a miss that populates the cache; the second is a hit.
    #[test]
    fn cache_hit_on_second_read() {
        let dir = scratch_dir("hit");
        let world = "home/cache-hit";
        let _c = world::open(&dir, world).unwrap();
        world::write(&dir, world, b"hello", "text/plain", &[]).unwrap();
        let cache = ReadCache::new(DEFAULT_READ_CACHE_MAX_ENTRIES);

        let r1 = cache.cached_read_with_hmac(&dir, world).unwrap();
        assert!(r1.is_some());
        assert_eq!(cache.metrics.read_cache_hits.load(Ordering::Relaxed), 0);
        assert_eq!(cache.metrics.read_cache_misses.load(Ordering::Relaxed), 1);

        let r2 = cache.cached_read_with_hmac(&dir, world).unwrap();
        assert!(r2.is_some());
        assert_eq!(cache.metrics.read_cache_hits.load(Ordering::Relaxed), 1);
        assert_eq!(cache.metrics.read_cache_misses.load(Ordering::Relaxed), 1);

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Reading a world with no database returns `Ok(None)` and leaves no slot
    /// behind in the map.
    #[test]
    fn missing_world_returns_none_via_phase3() {
        let dir = scratch_dir("missing");
        let cache = ReadCache::new(DEFAULT_READ_CACHE_MAX_ENTRIES);

        let r = cache.cached_read_with_hmac(&dir, "home/none").unwrap();
        assert!(r.is_none());
        assert!(cache.read_conns.get("home/none").is_none());

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// A tombstone makes reads return `Ok(None)`; clearing it restores reads.
    #[test]
    fn install_tombstone_short_circuits_read_to_404() {
        let dir = scratch_dir("tombstone");
        let world = "home/tomb";
        let _c = world::open(&dir, world).unwrap();
        world::write(&dir, world, b"x", "text/plain", &[]).unwrap();
        let cache = ReadCache::new(DEFAULT_READ_CACHE_MAX_ENTRIES);
        let _ = cache.cached_read_with_hmac(&dir, world).unwrap();

        cache.install_tombstone_blocking(world);
        let r = cache.cached_read_with_hmac(&dir, world).unwrap();
        assert!(r.is_none());

        cache.clear_tombstone(world);
        let r2 = cache.cached_read_with_hmac(&dir, world).unwrap();
        assert!(r2.is_some());

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// With the cache full, a read of a third world succeeds but its slot is
    /// removed afterwards, while the two cached worlds stay in place.
    #[test]
    fn cap_uses_transient_slot_then_drains_and_removes() {
        let dir = scratch_dir("cap-transient");
        for w in ["home/a", "home/b", "home/c"] {
            let _c = world::open(&dir, w).unwrap();
            world::write(&dir, w, b"x", "text/plain", &[]).unwrap();
        }
        let cache = ReadCache::new(2);
        let _ = cache.cached_read_with_hmac(&dir, "home/a").unwrap();
        let _ = cache.cached_read_with_hmac(&dir, "home/b").unwrap();
        assert_eq!(cache.read_conns.len(), 2);
        assert_eq!(cache.metrics.read_cache_capped.load(Ordering::Relaxed), 0);

        let r = cache.cached_read_with_hmac(&dir, "home/c").unwrap();
        assert!(r.is_some());
        assert!(cache.read_conns.get("home/c").is_none());
        assert!(cache.read_conns.get("home/a").is_some());
        assert!(cache.read_conns.get("home/b").is_some());
        assert_eq!(cache.metrics.read_cache_capped.load(Ordering::Relaxed), 1);

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// A read of an already-cached world counts as a hit, not as capped, even
    /// when the cache is at its entry limit.
    #[test]
    fn cache_hit_serves_from_cache_even_at_cap() {
        let dir = scratch_dir("cap-hit");
        for w in ["home/a", "home/b"] {
            let _c = world::open(&dir, w).unwrap();
            world::write(&dir, w, b"x", "text/plain", &[]).unwrap();
        }
        let cache = ReadCache::new(2);
        let _ = cache.cached_read_with_hmac(&dir, "home/a").unwrap();
        let _ = cache.cached_read_with_hmac(&dir, "home/b").unwrap();

        let hits_before = cache.metrics.read_cache_hits.load(Ordering::Relaxed);
        let _ = cache.cached_read_with_hmac(&dir, "home/a").unwrap();
        let hits_after = cache.metrics.read_cache_hits.load(Ordering::Relaxed);
        assert_eq!(hits_after, hits_before + 1);
        assert_eq!(cache.metrics.read_cache_capped.load(Ordering::Relaxed), 0);

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Dropping an `OpeningTransition` without `promote`/`fail` leaves the
    /// slot in `Tombstone`.
    #[test]
    fn opening_transition_drop_sets_tombstone_on_panic_path() {
        let slot = Arc::new(ReadSlot {
            inner: StdRwLock::new(SlotState::Opening),
        });
        {
            let mut g = slot.inner.write().unwrap();
            let _t = OpeningTransition::new(&mut g);
        }
        let g = slot.inner.read().unwrap();
        assert!(matches!(*g, SlotState::Tombstone));
    }

    /// `cached_verify_chain` goes through the same slot machinery as reads:
    /// it populates the map, honours tombstones, and recovers after
    /// `clear_tombstone`.
    #[test]
    fn cached_verify_chain_uses_slot_protocol() {
        let dir = scratch_dir("verify-slot");
        let world = "home/audited";
        match world::write_with_audit_checked(
            &dir,
            world,
            b"hello",
            "text/plain",
            &[],
            b"key",
            None,
        ) {
            Ok(_) => {}
            Err(_) => panic!("seed write_with_audit_checked failed"),
        }
        let cache = ReadCache::new(DEFAULT_READ_CACHE_MAX_ENTRIES);

        let r1 = cache.cached_verify_chain(&dir, world, b"key").unwrap();
        assert!(matches!(r1, Some(crate::audit::VerifyReport::Valid(_))));
        assert!(
            cache.read_conns.get(world).is_some(),
            "expected the verify to populate the SlotState cache; \
             a regression to the bare audit::verify_chain path would \
             leave the map empty"
        );

        let r2 = cache.cached_verify_chain(&dir, world, b"key").unwrap();
        assert!(matches!(r2, Some(crate::audit::VerifyReport::Valid(_))));

        cache.install_tombstone_blocking(world);
        let r3 = cache.cached_verify_chain(&dir, world, b"key").unwrap();
        assert!(r3.is_none());

        cache.clear_tombstone(world);
        let r4 = cache.cached_verify_chain(&dir, world, b"key").unwrap();
        assert!(matches!(r4, Some(crate::audit::VerifyReport::Valid(_))));

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Several threads read an uncached world while the cache is full. At
    /// least one must get the intact body; the others may get `Ok(None)`.
    /// Afterwards the transient slot is gone and the cached worlds remain.
    #[test]
    fn cap_transient_slot_concurrent_readers_safe_under_cleanup() {
        use std::sync::Arc as StdArc;
        use std::thread;

        let dir = StdArc::new(scratch_dir("cap-transient-concurrent"));
        for w in ["home/a", "home/b", "home/c"] {
            let _c = world::open(&dir, w).unwrap();
            world::write(&dir, w, b"hello world", "text/plain", &[]).unwrap();
        }
        let cache = StdArc::new(ReadCache::new(2));
        let _ = cache.cached_read_with_hmac(&dir, "home/a").unwrap();
        let _ = cache.cached_read_with_hmac(&dir, "home/b").unwrap();

        let mut handles = Vec::new();
        for _ in 0..4 {
            let cache = cache.clone();
            let dir = dir.clone();
            handles.push(thread::spawn(move || {
                cache.cached_read_with_hmac(&dir, "home/c").expect("read")
            }));
        }

        let mut bodies = 0usize;
        let mut spurious_404s = 0usize;
        for h in handles {
            match h.join().expect("thread") {
                Some((stage, _hmac)) => {
                    assert_eq!(stage.body, b"hello world", "body must not be torn");
                    assert_eq!(stage.content_type, "text/plain");
                    bodies += 1;
                }
                None => spurious_404s += 1,
            }
        }
        assert!(
            bodies >= 1,
            "transient slot owner must read body before drain; \
             got {bodies} body / {spurious_404s} Ok(None)"
        );
        assert!(cache.read_conns.get("home/c").is_none());
        assert!(cache.read_conns.get("home/a").is_some());
        assert!(cache.read_conns.get("home/b").is_some());

        let _ = std::fs::remove_dir_all(&*dir);
    }

    /// A database file that exists but cannot be opened must surface as
    /// `Err(_)`, not as `Ok(None)`.
    #[cfg(unix)]
    #[test]
    fn cantopen_with_existing_unreadable_file_propagates_500_not_404() {
        use std::os::unix::fs::PermissionsExt;

        /// Sets the permission bits of `path` to `mode`.
        fn chmod(path: &std::path::Path, mode: u32) {
            let mut perms = std::fs::metadata(path).unwrap().permissions();
            perms.set_mode(mode);
            std::fs::set_permissions(path, perms).unwrap();
        }

        let dir = scratch_dir("cantopen-existing");
        let world = "home/locked";
        let _c = world::open(&dir, world).unwrap();
        world::write(&dir, world, b"hello", "text/plain", &[]).unwrap();
        let db_path = world::world_db(&dir, world);
        chmod(&db_path, 0o000);

        // Privileged users (e.g. root in a CI container) bypass permission
        // bits, so the file would still open and the scenario under test
        // cannot be produced. Skip instead of failing spuriously.
        if std::fs::File::open(&db_path).is_ok() {
            eprintln!("skipping: file permission bits are not enforced for this user");
            chmod(&db_path, 0o644);
            let _ = std::fs::remove_dir_all(&dir);
            return;
        }

        let cache = ReadCache::new(DEFAULT_READ_CACHE_MAX_ENTRIES);
        let result = cache.cached_read_with_hmac(&dir, world);

        // Restore permissions before asserting so a failing assertion does not
        // leave a mode-000 file behind.
        chmod(&db_path, 0o644);

        let outcome = match &result {
            Ok(Some(_)) => "Ok(Some(_)) <- spurious read",
            Ok(None) => "Ok(None) <- spurious 404",
            Err(_) => "Err(_)",
        };
        assert!(
            result.is_err(),
            "CANTOPEN on existing-but-unreadable file must propagate as Err(_); \
             got {outcome} (a future regression to bare path.exists() would land here)"
        );

        let _ = std::fs::remove_dir_all(&dir);
    }
}