use crate::error::PicanteError;
use crate::key::{Dep, Key, QueryKindId};
use crate::revision::Revision;
use crate::runtime::RuntimeId;
use dashmap::DashMap;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use tokio::sync::Notify;
use tracing::trace;
pub(crate) type ArcAny = Arc<dyn std::any::Any + Send + Sync>;
static IN_FLIGHT_REGISTRY: std::sync::LazyLock<DashMap<InFlightKey, Arc<InFlightEntry>>> =
std::sync::LazyLock::new(DashMap::new);
#[derive(Clone)]
pub(crate) struct SharedCacheRecord {
pub(crate) value: ArcAny,
pub(crate) deps: Arc<[Dep]>,
pub(crate) changed_at: Revision,
pub(crate) verified_at: Revision,
pub(crate) insert_id: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SharedCacheKey {
runtime_id: RuntimeId,
kind: QueryKindId,
key: Key,
}
static SHARED_CACHE: std::sync::LazyLock<DashMap<SharedCacheKey, SharedCacheRecord>> =
std::sync::LazyLock::new(DashMap::new);
static SHARED_CACHE_ORDER: std::sync::LazyLock<
parking_lot::Mutex<VecDeque<(SharedCacheKey, u64)>>,
> = std::sync::LazyLock::new(|| parking_lot::Mutex::new(VecDeque::new()));
static SHARED_CACHE_MAX_ENTRIES: AtomicUsize = AtomicUsize::new(20_000);
static SHARED_CACHE_MAX_ENTRIES_OVERRIDDEN: AtomicBool = AtomicBool::new(false);
static SHARED_CACHE_INSERT_ID: AtomicU64 = AtomicU64::new(1);
fn shared_cache_max_entries() -> usize {
if SHARED_CACHE_MAX_ENTRIES_OVERRIDDEN.load(Ordering::Relaxed) {
return SHARED_CACHE_MAX_ENTRIES.load(Ordering::Relaxed);
}
std::env::var("PICANTE_SHARED_CACHE_MAX_ENTRIES")
.ok()
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or_else(|| SHARED_CACHE_MAX_ENTRIES.load(Ordering::Relaxed))
}
pub(crate) fn shared_cache_get(
runtime_id: RuntimeId,
kind: QueryKindId,
key: &Key,
) -> Option<SharedCacheRecord> {
let k = SharedCacheKey {
runtime_id,
kind,
key: key.clone(),
};
SHARED_CACHE.get(&k).map(|v| v.clone())
}
pub(crate) fn shared_cache_put(
runtime_id: RuntimeId,
kind: QueryKindId,
key: Key,
mut record: SharedCacheRecord,
) {
let k = SharedCacheKey {
runtime_id,
kind,
key,
};
let max = shared_cache_max_entries();
let insert_id = SHARED_CACHE_INSERT_ID.fetch_add(1, Ordering::Relaxed);
record.insert_id = insert_id;
{
let mut order = SHARED_CACHE_ORDER.lock();
order.push_back((k.clone(), insert_id));
while SHARED_CACHE.len() > max {
let Some((old_key, old_id)) = order.pop_front() else {
break;
};
let should_remove = SHARED_CACHE
.get(&old_key)
.map(|v| v.insert_id == old_id)
.unwrap_or(false);
if should_remove {
SHARED_CACHE.remove(&old_key);
}
}
}
SHARED_CACHE.insert(k, record);
}
#[doc(hidden)]
pub fn __test_shared_cache_clear() {
SHARED_CACHE.clear();
SHARED_CACHE_ORDER.lock().clear();
}
#[doc(hidden)]
pub fn __test_shared_cache_set_max_entries(max_entries: usize) {
SHARED_CACHE_MAX_ENTRIES.store(max_entries.max(1), Ordering::Relaxed);
SHARED_CACHE_MAX_ENTRIES_OVERRIDDEN.store(true, Ordering::Relaxed);
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct InFlightKey {
pub runtime_id: RuntimeId,
pub revision: Revision,
pub kind: QueryKindId,
pub key: Key,
}
#[derive(Debug, Clone)]
pub(crate) enum InFlightState {
Running,
Done {
value: ArcAny,
deps: Arc<[Dep]>,
changed_at: Revision,
},
Failed(Arc<PicanteError>),
Cancelled,
}
pub(crate) struct InFlightEntry {
state: parking_lot::Mutex<InFlightState>,
notify: Notify,
}
impl InFlightEntry {
fn new() -> Self {
Self {
state: parking_lot::Mutex::new(InFlightState::Running),
notify: Notify::new(),
}
}
pub(crate) fn state(&self) -> InFlightState {
self.state.lock().clone()
}
fn complete(&self, value: ArcAny, deps: Arc<[Dep]>, changed_at: Revision) {
trace!("inflight entry: completing with success");
*self.state.lock() = InFlightState::Done {
value,
deps,
changed_at,
};
self.notify.notify_waiters();
}
fn fail(&self, error: Arc<PicanteError>) {
trace!("inflight entry: completing with error");
*self.state.lock() = InFlightState::Failed(error);
self.notify.notify_waiters();
}
fn cancel(&self) {
trace!("inflight entry: cancelled (leader dropped)");
*self.state.lock() = InFlightState::Cancelled;
self.notify.notify_waiters();
}
pub(crate) fn notified(&self) -> impl std::future::Future<Output = ()> + '_ {
self.notify.notified()
}
}
pub(crate) enum TryLeadResult {
Leader(InFlightGuard),
Follower(Arc<InFlightEntry>),
}
pub(crate) struct InFlightGuard {
key: InFlightKey,
entry: Arc<InFlightEntry>,
completed: bool,
}
impl InFlightGuard {
pub(crate) fn complete(mut self, value: ArcAny, deps: Arc<[Dep]>, changed_at: Revision) {
self.entry.complete(value, deps, changed_at);
self.completed = true;
IN_FLIGHT_REGISTRY.remove(&self.key);
}
pub(crate) fn fail(mut self, error: Arc<PicanteError>) {
self.entry.fail(error);
self.completed = true;
IN_FLIGHT_REGISTRY.remove(&self.key);
}
}
impl Drop for InFlightGuard {
fn drop(&mut self) {
if !self.completed {
self.entry.cancel();
IN_FLIGHT_REGISTRY.remove(&self.key);
}
}
}
pub(crate) fn try_lead(key: InFlightKey) -> TryLeadResult {
use dashmap::mapref::entry::Entry;
match IN_FLIGHT_REGISTRY.entry(key.clone()) {
Entry::Occupied(occupied) => {
TryLeadResult::Follower(occupied.get().clone())
}
Entry::Vacant(vacant) => {
let entry = Arc::new(InFlightEntry::new());
vacant.insert(entry.clone());
TryLeadResult::Leader(InFlightGuard {
key,
entry,
completed: false,
})
}
}
}