use crate::db::{DynIngredient, IngredientLookup, Touch};
use crate::error::{PicanteError, PicanteResult};
use crate::frame::{self, ActiveFrameHandle};
use crate::inflight::{self, InFlightKey, InFlightState, SharedCacheRecord, TryLeadResult};
use crate::key::{Dep, DynKey, Key, QueryKindId};
use crate::persist::{PersistableIngredient, SectionType};
use crate::revision::Revision;
use facet::Facet;
use facet_core::Shape;
use facet_reflect::{HeapValue, Partial, Peek};
use futures_util::FutureExt;
use futures_util::future::BoxFuture;
use parking_lot::RwLock;
use std::any::Any;
use std::hash::Hash;
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::sync::{Mutex, Notify};
use tracing::trace;
type ComputeFuture<'db, V> = BoxFuture<'db, PicanteResult<V>>;
type ComputeFn<DB, K, V> = dyn for<'db> Fn(&'db DB, K) -> ComputeFuture<'db, V> + Send + Sync;
type ArcAny = Arc<dyn Any + Send + Sync>;
type ComputeFut<'a> = BoxFuture<'a, PicanteResult<ArcAny>>;
struct ErasedRecordData {
dyn_key: DynKey,
value: ArcAny,
verified_at: Revision,
changed_at: Revision,
deps: Arc<[Dep]>,
}
type EncodeRecordFn = fn(
kind_name: &'static str,
dyn_key: &DynKey,
value: &ArcAny,
verified_at: Revision,
changed_at: Revision,
deps: &[Dep],
) -> PicanteResult<Vec<u8>>;
type DecodeRecordFn = fn(kind: QueryKindId, bytes: Vec<u8>) -> PicanteResult<ErasedRecordData>;
type EncodeIncrementalFn = fn(
kind_name: &'static str,
dyn_key: &DynKey,
value: &ArcAny,
verified_at: Revision,
changed_at: Revision,
deps: &[Dep],
) -> PicanteResult<(Vec<u8>, Vec<u8>)>;
type ApplyWalEntryFn = fn(
kind: QueryKindId,
key_bytes: Vec<u8>,
value_bytes: Option<Vec<u8>>,
) -> PicanteResult<ApplyWalResult>;
struct ApplyWalResult {
dyn_key: DynKey,
cell: Option<Arc<ErasedCell>>, }
type EqErasedFn = fn(&dyn Any, &dyn Any) -> bool;
#[inline(never)]
fn encode_with_peek(peek: Peek<'_, '_>, what: &'static str) -> PicanteResult<Vec<u8>> {
facet_postcard::peek_to_vec(peek).map_err(|e| {
Arc::new(PicanteError::Encode {
what,
message: format!("{e:?}"),
})
})
}
#[inline(never)]
unsafe fn decode_to_heap_value(
bytes: &[u8],
shape: &'static Shape,
what: &'static str,
) -> PicanteResult<HeapValue<'static, false>> {
let partial = unsafe { Partial::alloc_shape_owned(shape) }.map_err(|e| {
Arc::new(PicanteError::Decode {
what,
message: format!("alloc failed: {e:?}"),
})
})?;
let partial = facet_postcard::from_slice_into(bytes, partial).map_err(|e| {
Arc::new(PicanteError::Decode {
what,
message: format!("{e:?}"),
})
})?;
partial.build().map_err(|e| {
Arc::new(PicanteError::Decode {
what,
message: format!("build failed: {e:?}"),
})
})
}
trait ErasedCompute<DB>: Send + Sync {
fn compute<'a>(&'a self, db: &'a DB, key: Key) -> ComputeFut<'a>;
}
struct TypedCompute<DB, K, V> {
f: Arc<ComputeFn<DB, K, V>>,
_phantom: PhantomData<(K, V)>,
}
impl<DB, K, V> ErasedCompute<DB> for TypedCompute<DB, K, V>
where
DB: IngredientLookup + Send + Sync + 'static,
K: Facet<'static> + Send + Sync + 'static,
V: Send + Sync + 'static,
{
fn compute<'a>(&'a self, db: &'a DB, key: Key) -> ComputeFut<'a> {
Box::pin(async move {
let k: K = key.decode_facet()?;
let v: V = (self.f)(db, k).await?;
Ok(Arc::new(v) as ArcAny)
})
}
}
fn eq_erased_for<V>(a: &dyn Any, b: &dyn Any) -> bool
where
V: Facet<'static> + 'static,
{
crate::facet_eq::facet_eq::<V>(a, b)
}
struct DerivedCore {
kind: QueryKindId,
kind_name: &'static str,
cells: RwLock<im::HashMap<DynKey, Arc<ErasedCell>>>,
encode_record: EncodeRecordFn,
decode_record: DecodeRecordFn,
encode_incremental: EncodeIncrementalFn,
apply_wal_entry: ApplyWalEntryFn,
}
impl DerivedCore {
fn new(
kind: QueryKindId,
kind_name: &'static str,
encode_record: EncodeRecordFn,
decode_record: DecodeRecordFn,
encode_incremental: EncodeIncrementalFn,
apply_wal_entry: ApplyWalEntryFn,
) -> Self {
Self {
kind,
kind_name,
cells: RwLock::new(im::HashMap::new()),
encode_record,
decode_record,
encode_incremental,
apply_wal_entry,
}
}
async fn access_scoped_erased<DB>(
&self,
db: &DB,
requested: DynKey,
want_value: bool,
compute: &dyn ErasedCompute<DB>,
eq_erased: EqErasedFn,
) -> PicanteResult<ErasedAccessResult>
where
DB: IngredientLookup + Send + Sync + 'static,
{
let key_hash = requested.key.hash();
if let Some(stack) = frame::find_cycle(&requested) {
return Err(Arc::new(PicanteError::Cycle {
requested: requested.clone(),
stack,
}));
}
if want_value && frame::has_active_frame() {
trace!(
kind = self.kind.0,
key_hash = %format!("{:016x}", key_hash),
"derived dep"
);
frame::record_dep(Dep {
kind: self.kind,
key: requested.key.clone(),
});
}
let cell = {
if let Some(cell) = self.cells.read().get(&requested) {
cell.clone()
} else {
let mut cells = self.cells.write();
if let Some(cell) = cells.get(&requested) {
cell.clone()
} else {
let cell = Arc::new(ErasedCell::new());
cells.insert(requested.clone(), cell.clone());
cell
}
}
};
loop {
let rev = db.runtime().current_revision();
let notified = cell.notify.notified();
enum ErasedObserved {
Ready {
value: Option<Arc<dyn std::any::Any + Send + Sync>>,
changed_at: Revision,
},
Error(Arc<PicanteError>),
Running {
started_at: Revision,
},
StaleReady {
deps: Arc<[Dep]>,
changed_at: Revision,
},
StaleOther,
}
let observed = {
let state = cell.state.lock().await;
match &*state {
ErasedState::Ready {
value,
verified_at,
changed_at,
..
} if *verified_at == rev => ErasedObserved::Ready {
value: want_value.then(|| value.clone()),
changed_at: *changed_at,
},
ErasedState::Poisoned { error, verified_at } if *verified_at == rev => {
ErasedObserved::Error(error.clone())
}
ErasedState::Running { started_at } => ErasedObserved::Running {
started_at: *started_at,
},
ErasedState::Ready {
deps, changed_at, ..
} => ErasedObserved::StaleReady {
deps: deps.clone(),
changed_at: *changed_at,
},
_ => ErasedObserved::StaleOther,
}
};
match observed {
ErasedObserved::Ready { value, changed_at } => {
if db.runtime().current_revision() == rev {
return Ok(ErasedAccessResult { value, changed_at });
}
continue;
}
ErasedObserved::Error(e) => {
if db.runtime().current_revision() == rev {
return Err(e);
}
continue;
}
ErasedObserved::Running { started_at } => {
trace!(
kind = self.kind.0,
key_hash = %format!("{:016x}", key_hash),
started_at = started_at.0,
"wait on running cell"
);
notified.await;
continue;
}
ErasedObserved::StaleReady { deps, changed_at } => {
if self
.try_revalidate(db, &requested, rev, &deps, changed_at)
.await?
{
let mut state = cell.state.lock().await;
match &mut *state {
ErasedState::Ready {
value,
verified_at,
changed_at,
..
} => {
*verified_at = rev;
let out_value = want_value.then(|| value.clone());
let out_changed_at = *changed_at;
drop(state);
if db.runtime().current_revision() == rev {
return Ok(ErasedAccessResult {
value: out_value,
changed_at: out_changed_at,
});
}
continue;
}
ErasedState::Running { .. } => {
continue;
}
_ => continue,
}
}
}
ErasedObserved::StaleOther => {}
}
let (started, prev) = {
let mut prev: Option<(Arc<dyn std::any::Any + Send + Sync>, Revision)> = None;
let mut state = cell.state.lock().await;
match &*state {
ErasedState::Ready { verified_at, .. } if *verified_at == rev => (false, None), ErasedState::Poisoned { verified_at, .. } if *verified_at == rev => {
(false, None)
} ErasedState::Running { .. } => (false, None), _ => {
let old = std::mem::replace(
&mut *state,
ErasedState::Running { started_at: rev },
);
if let ErasedState::Ready {
value, changed_at, ..
} = old
{
prev = Some((value, changed_at));
}
(true, prev)
}
}
};
if !started {
continue;
}
if let Some(record) =
inflight::shared_cache_get(db.runtime().id(), self.kind, &requested.key)
{
let can_adopt = if record.verified_at == rev {
true
} else {
self.try_revalidate(db, &requested, rev, &record.deps, record.changed_at)
.await?
};
if can_adopt {
db.runtime()
.update_query_deps(requested.clone(), record.deps.clone());
let mut state = cell.state.lock().await;
*state = ErasedState::Ready {
value: record.value.clone(),
verified_at: rev,
changed_at: record.changed_at,
deps: record.deps.clone(),
};
drop(state);
cell.notify.notify_waiters();
inflight::shared_cache_put(
db.runtime().id(),
self.kind,
requested.key.clone(),
SharedCacheRecord {
value: record.value.clone(),
deps: record.deps.clone(),
changed_at: record.changed_at,
verified_at: rev,
insert_id: 0,
},
);
if db.runtime().current_revision() == rev {
let out_value = want_value.then(|| record.value.clone());
return Ok(ErasedAccessResult {
value: out_value,
changed_at: record.changed_at,
});
}
continue;
}
}
let inflight_key = InFlightKey {
runtime_id: db.runtime().id(),
revision: rev,
kind: self.kind,
key: requested.key.clone(),
};
match inflight::try_lead(inflight_key.clone()) {
TryLeadResult::Follower(entry) => {
trace!(
kind = self.kind.0,
key_hash = %format!("{:016x}", key_hash),
rev = rev.0,
"inflight: follower, waiting for leader"
);
{
let mut state = cell.state.lock().await;
*state = ErasedState::Vacant;
}
loop {
let notified = entry.notified();
let entry_state = entry.state();
match entry_state {
InFlightState::Running => {
notified.await;
}
InFlightState::Done {
value,
deps,
changed_at,
} => {
trace!(
kind = self.kind.0,
key_hash = %format!("{:016x}", key_hash),
rev = rev.0,
"inflight: follower got result from leader"
);
let out_value = want_value.then(|| value.clone());
let mut state = cell.state.lock().await;
*state = ErasedState::Ready {
value: value.clone(),
verified_at: rev,
changed_at,
deps: deps.clone(),
};
drop(state);
cell.notify.notify_waiters();
db.runtime()
.update_query_deps(requested.clone(), deps.clone());
if changed_at == rev {
db.runtime().notify_query_changed(rev, requested.clone());
}
inflight::shared_cache_put(
db.runtime().id(),
self.kind,
requested.key.clone(),
SharedCacheRecord {
value: value.clone(),
deps: deps.clone(),
changed_at,
verified_at: rev,
insert_id: 0,
},
);
if db.runtime().current_revision() == rev {
return Ok(ErasedAccessResult {
value: out_value,
changed_at,
});
}
break;
}
InFlightState::Failed(err) => {
trace!(
kind = self.kind.0,
key_hash = %format!("{:016x}", key_hash),
rev = rev.0,
"inflight: follower got error from leader"
);
let mut state = cell.state.lock().await;
*state = ErasedState::Poisoned {
error: err.clone(),
verified_at: rev,
};
drop(state);
cell.notify.notify_waiters();
if db.runtime().current_revision() == rev {
return Err(err);
}
break;
}
InFlightState::Cancelled => {
trace!(
kind = self.kind.0,
key_hash = %format!("{:016x}", key_hash),
rev = rev.0,
"inflight: leader cancelled, will retry"
);
break;
}
}
}
continue;
}
TryLeadResult::Leader(guard) => {
trace!(
kind = self.kind.0,
key_hash = %format!("{:016x}", key_hash),
rev = rev.0,
"inflight: leader, computing"
);
let frame = ActiveFrameHandle::new(requested.clone(), rev);
let _frame_guard = frame::push_frame(frame.clone());
trace!(
kind = self.kind.0,
key_hash = %format!("{:016x}", key_hash),
rev = rev.0,
"compute: start"
);
let result =
std::panic::AssertUnwindSafe(compute.compute(db, requested.key.clone()))
.catch_unwind()
.await;
let deps: Arc<[Dep]> = frame.take_deps().into();
match result {
Ok(Ok(out)) => {
let changed_at = match prev {
Some((prev_value, prev_changed_at)) => {
let is_same = Arc::ptr_eq(&prev_value, &out)
|| eq_erased(prev_value.as_ref(), out.as_ref());
if is_same { prev_changed_at } else { rev }
}
None => rev,
};
db.runtime()
.update_query_deps(requested.clone(), deps.clone());
if changed_at == rev {
db.runtime().notify_query_changed(rev, requested.clone());
}
let out_value = want_value.then(|| out.clone());
let mut state = cell.state.lock().await;
*state = ErasedState::Ready {
value: out.clone(),
verified_at: rev,
changed_at,
deps: deps.clone(),
};
drop(state);
cell.notify.notify_waiters();
inflight::shared_cache_put(
db.runtime().id(),
self.kind,
requested.key.clone(),
SharedCacheRecord {
value: out.clone(),
deps: deps.clone(),
changed_at,
verified_at: rev,
insert_id: 0,
},
);
guard.complete(out, deps, changed_at);
trace!(
kind = self.kind.0,
key_hash = %format!("{:016x}", key_hash),
rev = rev.0,
"compute: ok"
);
if db.runtime().current_revision() == rev {
return Ok(ErasedAccessResult {
value: out_value,
changed_at,
});
}
continue;
}
Ok(Err(err)) => {
let mut state = cell.state.lock().await;
*state = ErasedState::Poisoned {
error: err.clone(),
verified_at: rev,
};
drop(state);
cell.notify.notify_waiters();
guard.fail(err.clone());
trace!(
kind = self.kind.0,
key_hash = %format!("{:016x}", key_hash),
rev = rev.0,
"compute: err"
);
if db.runtime().current_revision() == rev {
return Err(err);
}
continue;
}
Err(panic_payload) => {
let err = Arc::new(PicanteError::Panic {
message: panic_message(panic_payload),
});
let mut state = cell.state.lock().await;
*state = ErasedState::Poisoned {
error: err.clone(),
verified_at: rev,
};
drop(state);
cell.notify.notify_waiters();
guard.fail(err.clone());
trace!(
kind = self.kind.0,
key_hash = %format!("{:016x}", key_hash),
rev = rev.0,
"compute: panic"
);
if db.runtime().current_revision() == rev {
return Err(err);
}
continue;
}
}
}
}
}
}
async fn try_revalidate<DB>(
&self,
db: &DB,
requested: &DynKey,
rev: Revision,
deps: &Arc<[Dep]>,
self_changed_at: Revision,
) -> PicanteResult<bool>
where
DB: IngredientLookup + Send + Sync + 'static,
{
trace!(
kind = self.kind.0,
key_hash = %format!("{:016x}", requested.key.hash()),
deps = deps.len(),
"revalidate: start"
);
let frame = ActiveFrameHandle::new(requested.clone(), rev);
let _guard = frame::push_frame(frame);
for dep in deps.iter() {
let Some(ingredient) = db.ingredient(dep.kind) else {
return Ok(false);
};
let touch = ingredient.touch(db, dep.key.clone()).await?;
if touch.changed_at > self_changed_at {
return Ok(false);
}
}
Ok(true)
}
async fn save_records_erased(&self) -> PicanteResult<Vec<Vec<u8>>> {
let snapshot: Vec<(DynKey, Arc<ErasedCell>)> = {
let cells = self.cells.read();
cells.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
};
let mut records = Vec::with_capacity(snapshot.len());
for (dyn_key, cell) in snapshot {
let state = cell.state.lock().await;
let ErasedState::Ready {
value,
verified_at,
changed_at,
deps,
} = &*state
else {
continue;
};
let bytes = (self.encode_record)(
self.kind_name,
&dyn_key,
value,
*verified_at,
*changed_at,
deps,
)?;
records.push(bytes);
}
trace!(
kind = self.kind.0,
records = records.len(),
"save_records (derived, erased)"
);
Ok(records)
}
fn load_records_erased(&self, records: Vec<Vec<u8>>) -> PicanteResult<()> {
for bytes in records {
let data = (self.decode_record)(self.kind, bytes)?;
let cell = Arc::new(ErasedCell::new_ready(
data.value,
data.verified_at,
data.changed_at,
data.deps,
));
let mut cells = self.cells.write();
cells.insert(data.dyn_key, cell);
}
Ok(())
}
async fn save_incremental_records_erased(
&self,
since_revision: u64,
) -> PicanteResult<Vec<(u64, Vec<u8>, Option<Vec<u8>>)>> {
let snapshot: Vec<(DynKey, Arc<ErasedCell>)> = {
let cells = self.cells.read();
cells.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
};
let mut changes = Vec::new();
for (dyn_key, cell) in snapshot {
let state = cell.state.lock().await;
let ErasedState::Ready {
value,
changed_at,
verified_at,
deps,
} = &*state
else {
continue;
};
if changed_at.0 <= since_revision {
continue;
}
let (key_bytes, value_bytes) = (self.encode_incremental)(
self.kind_name,
&dyn_key,
value,
*verified_at,
*changed_at,
deps,
)?;
changes.push((changed_at.0, key_bytes, Some(value_bytes)));
}
trace!(
kind = self.kind.0,
changes = changes.len(),
since_revision,
"save_incremental_records (derived, erased)"
);
Ok(changes)
}
fn apply_wal_entry_erased(
&self,
_revision: u64,
key_bytes: Vec<u8>,
value_bytes: Option<Vec<u8>>,
) -> PicanteResult<()> {
let result = (self.apply_wal_entry)(self.kind, key_bytes, value_bytes)?;
let mut cells = self.cells.write();
if let Some(cell) = result.cell {
cells.insert(result.dyn_key, cell);
} else {
cells.remove(&result.dyn_key);
}
Ok(())
}
async fn restore_runtime_state_inner(
&self,
runtime: &crate::runtime::Runtime,
) -> PicanteResult<()> {
let snapshot: Vec<(DynKey, Arc<ErasedCell>)> = {
let cells = self.cells.read();
cells.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
};
for (dyn_key, cell) in snapshot {
let state = cell.state.lock().await;
let ErasedState::Ready { deps, .. } = &*state else {
continue;
};
runtime.update_query_deps(dyn_key, deps.clone());
}
trace!(kind = self.kind.0, "restore_runtime_state (derived)");
Ok(())
}
fn touch_erased<'a, DB>(
&'a self,
db: &'a DB,
dyn_key: DynKey,
compute: &'a dyn ErasedCompute<DB>,
eq_erased: EqErasedFn,
) -> BoxFuture<'a, PicanteResult<Touch>>
where
DB: IngredientLookup + Send + Sync + 'static,
{
Box::pin(async move {
let result = frame::scope_if_needed_boxed(Box::pin(
self.access_scoped_erased(db, dyn_key, false, compute, eq_erased),
))
.await?;
Ok(Touch {
changed_at: result.changed_at,
})
})
}
fn get_erased<'a, DB>(
&'a self,
db: &'a DB,
dyn_key: DynKey,
compute: &'a dyn ErasedCompute<DB>,
eq_erased: EqErasedFn,
) -> BoxFuture<'a, PicanteResult<ArcAny>>
where
DB: IngredientLookup + Send + Sync + 'static,
{
Box::pin(async move {
let result = frame::scope_if_needed_boxed(Box::pin(self.access_scoped_erased(
db,
dyn_key.clone(),
true,
compute,
eq_erased,
)))
.await?;
result.value.ok_or_else(|| {
Arc::new(PicanteError::Panic {
message: format!("[BUG] expected value but got None for key {:?}", dyn_key),
})
})
})
}
async fn snapshot_cells_deep_inner(&self) -> im::HashMap<DynKey, Arc<ErasedCell>> {
let cells_snapshot: Vec<(DynKey, Arc<ErasedCell>)> = {
let cells = self.cells.read();
cells.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
};
let mut result = im::HashMap::new();
for (dyn_key, cell) in cells_snapshot {
let state = cell.state.lock().await;
if let ErasedState::Ready {
value,
verified_at,
changed_at,
deps,
} = &*state
{
let cloned_value = value.clone();
let new_cell = Arc::new(ErasedCell::new_ready(
cloned_value,
*verified_at,
*changed_at,
deps.clone(),
));
result.insert(dyn_key, new_cell);
}
}
result
}
}
fn make_encode_record<K, V>() -> EncodeRecordFn
where
K: Clone + Eq + Hash + Facet<'static> + Send + Sync + 'static,
V: Clone + Facet<'static> + Send + Sync + 'static,
{
|kind_name, dyn_key, value, verified_at, changed_at, deps| {
let key: K = dyn_key.key.decode_facet().map_err(|e| {
Arc::new(PicanteError::Panic {
message: format!(
"[BUG] failed to decode key for ingredient {} during save: {:?}",
kind_name, e
),
})
})?;
let typed_value: &V = value.downcast_ref::<V>().ok_or_else(|| {
Arc::new(PicanteError::Panic {
message: format!(
"[BUG] type mismatch in save_records for ingredient {}: \
expected {}, got TypeId {:?}",
kind_name,
std::any::type_name::<V>(),
(&**value as &dyn std::any::Any).type_id()
),
})
})?;
let deps = deps
.iter()
.map(|d| DepRecord {
kind_id: d.kind.as_u32(),
key_bytes: d.key.bytes().to_vec(),
})
.collect();
let rec = DerivedRecord::<K, V> {
key,
value: typed_value.clone(),
verified_at: verified_at.0,
changed_at: changed_at.0,
deps,
};
encode_with_peek(Peek::new(&rec), "derived record")
}
}
fn make_decode_record<K, V>() -> DecodeRecordFn
where
K: Clone + Eq + Hash + Facet<'static> + Send + Sync + 'static,
V: Clone + Facet<'static> + Send + Sync + 'static,
{
|kind, bytes| {
let heap_value = unsafe {
decode_to_heap_value(&bytes, <DerivedRecord<K, V>>::SHAPE, "derived record")
}?;
let rec: DerivedRecord<K, V> = heap_value.materialize().map_err(|e| {
Arc::new(PicanteError::Decode {
what: "derived record (materialize)",
message: format!("{e:?}"),
})
})?;
let deps: Arc<[Dep]> = rec
.deps
.into_iter()
.map(|d| Dep {
kind: QueryKindId(d.kind_id),
key: Key::from_bytes(d.key_bytes),
})
.collect::<Vec<_>>()
.into();
let dyn_key = DynKey {
kind,
key: Key::encode_facet(&rec.key)?,
};
let value = Arc::new(rec.value) as ArcAny;
Ok(ErasedRecordData {
dyn_key,
value,
verified_at: Revision(rec.verified_at),
changed_at: Revision(rec.changed_at),
deps,
})
}
}
fn make_encode_incremental<K, V>() -> EncodeIncrementalFn
where
K: Clone + Eq + Hash + Facet<'static> + Send + Sync + 'static,
V: Clone + Facet<'static> + Send + Sync + 'static,
{
|kind_name, dyn_key, value, verified_at, changed_at, deps| {
let key: K = dyn_key.key.decode_facet().map_err(|e| {
Arc::new(PicanteError::Panic {
message: format!(
"[BUG] failed to decode key for ingredient {} during incremental save: {:?}",
kind_name, e
),
})
})?;
let typed_value: &V = value.downcast_ref::<V>().ok_or_else(|| {
Arc::new(PicanteError::Panic {
message: format!(
"[BUG] type mismatch in save_incremental_records for ingredient {}: \
expected {}, got TypeId {:?}",
kind_name,
std::any::type_name::<V>(),
(&**value as &dyn std::any::Any).type_id()
),
})
})?;
let dep_records = deps
.iter()
.map(|d| DepRecord {
kind_id: d.kind.as_u32(),
key_bytes: d.key.bytes().to_vec(),
})
.collect();
let rec = DerivedRecord::<K, V> {
key: key.clone(),
value: typed_value.clone(),
verified_at: verified_at.0,
changed_at: changed_at.0,
deps: dep_records,
};
let key_bytes = encode_with_peek(Peek::new(&key), "derived key")?;
let value_bytes = encode_with_peek(Peek::new(&rec), "derived record")?;
Ok((key_bytes, value_bytes))
}
}
fn make_apply_wal_entry<K, V>() -> ApplyWalEntryFn
where
K: Clone + Eq + Hash + Facet<'static> + Send + Sync + 'static,
V: Clone + Facet<'static> + Send + Sync + 'static,
{
|kind, key_bytes, value_bytes| {
let heap_value =
unsafe { decode_to_heap_value(&key_bytes, K::SHAPE, "derived key from WAL") }?;
let key: K = heap_value.materialize().map_err(|e| {
Arc::new(PicanteError::Decode {
what: "derived key from WAL (materialize)",
message: format!("{e:?}"),
})
})?;
let dyn_key = DynKey {
kind,
key: Key::encode_facet(&key)?,
};
if let Some(value_bytes) = value_bytes {
let heap_value = unsafe {
decode_to_heap_value(
&value_bytes,
<DerivedRecord<K, V>>::SHAPE,
"derived record from WAL",
)
}?;
let rec: DerivedRecord<K, V> = heap_value.materialize().map_err(|e| {
Arc::new(PicanteError::Decode {
what: "derived record from WAL (materialize)",
message: format!("{e:?}"),
})
})?;
let deps: Arc<[Dep]> = rec
.deps
.into_iter()
.map(|d| Dep {
kind: QueryKindId(d.kind_id),
key: Key::from_bytes(d.key_bytes),
})
.collect::<Vec<_>>()
.into();
let erased_value = Arc::new(rec.value) as ArcAny;
let cell = Arc::new(ErasedCell::new_ready(
erased_value,
Revision(rec.verified_at),
Revision(rec.changed_at),
deps,
));
Ok(ApplyWalResult {
dyn_key,
cell: Some(cell),
})
} else {
Ok(ApplyWalResult {
dyn_key,
cell: None,
})
}
}
}
pub struct DerivedIngredient<DB, K, V>
where
K: Clone + Eq + Hash,
{
core: DerivedCore,
_phantom: PhantomData<(K, V)>,
compute: Arc<dyn ErasedCompute<DB>>,
eq_erased: EqErasedFn,
}
impl<DB, K, V> DerivedIngredient<DB, K, V>
where
DB: IngredientLookup + Send + Sync + 'static,
K: Clone + Eq + Hash + Facet<'static> + Send + Sync + 'static,
V: Clone + Facet<'static> + Send + Sync + 'static,
{
pub fn new(
kind: QueryKindId,
kind_name: &'static str,
compute: impl for<'db> Fn(&'db DB, K) -> ComputeFuture<'db, V> + Send + Sync + 'static,
) -> Self {
let typed_compute = TypedCompute {
f: Arc::new(compute),
_phantom: PhantomData,
};
let compute_erased: Arc<dyn ErasedCompute<DB>> = Arc::new(typed_compute);
let encode_record = make_encode_record::<K, V>();
let decode_record = make_decode_record::<K, V>();
let encode_incremental = make_encode_incremental::<K, V>();
let apply_wal_entry = make_apply_wal_entry::<K, V>();
Self {
core: DerivedCore::new(
kind,
kind_name,
encode_record,
decode_record,
encode_incremental,
apply_wal_entry,
),
_phantom: PhantomData,
compute: compute_erased,
eq_erased: eq_erased_for::<V>,
}
}
pub fn kind(&self) -> QueryKindId {
self.core.kind
}
pub fn kind_name(&self) -> &'static str {
self.core.kind_name
}
pub async fn get(&self, db: &DB, key: K) -> PicanteResult<V> {
let dyn_key = DynKey {
kind: self.core.kind,
key: Key::encode_facet(&key)?,
};
let arc_any = self
.core
.get_erased(db, dyn_key, self.compute.as_ref(), self.eq_erased)
.await?;
let arc_v = arc_any.downcast::<V>().map_err(|any| {
Arc::new(PicanteError::Panic {
message: format!(
"[BUG] type mismatch in get() for ingredient {}: expected {}, got TypeId {:?}",
self.core.kind_name,
std::any::type_name::<V>(),
(&*any as &dyn std::any::Any).type_id()
),
})
})?;
let value = Arc::try_unwrap(arc_v).unwrap_or_else(|arc| (*arc).clone());
Ok(value)
}
pub async fn touch(&self, db: &DB, key: K) -> PicanteResult<Revision> {
let dyn_key = DynKey {
kind: self.core.kind,
key: Key::encode_facet(&key)?,
};
let touch = self
.core
.touch_erased(db, dyn_key, self.compute.as_ref(), self.eq_erased)
.await?;
Ok(touch.changed_at)
}
pub fn snapshot(&self) -> im::HashMap<DynKey, Arc<ErasedCell>> {
self.core.cells.read().clone()
}
pub fn load_cells(&self, cells: im::HashMap<DynKey, Arc<ErasedCell>>) {
*self.core.cells.write() = cells;
}
pub fn cell_for_key(&self, key: &K) -> PicanteResult<Option<Arc<ErasedCell>>> {
let dyn_key = DynKey {
kind: self.core.kind,
key: Key::encode_facet(key)?,
};
Ok(self.core.cells.read().get(&dyn_key).cloned())
}
pub fn insert_ready_record(&self, key: &K, record: ErasedReadyRecord) -> PicanteResult<()> {
let dyn_key = DynKey {
kind: self.core.kind,
key: Key::encode_facet(key)?,
};
let cell = Arc::new(ErasedCell::new_ready(
record.value,
record.verified_at,
record.changed_at,
record.deps,
));
let mut cells = self.core.cells.write();
cells.insert(dyn_key, cell);
Ok(())
}
pub async fn record_is_valid_on(
&self,
db: &DB,
record: &ErasedReadyRecord,
) -> PicanteResult<bool> {
let self_changed_at = record.changed_at;
for dep in record.deps.iter() {
let Some(ingredient) = db.ingredient(dep.kind) else {
return Ok(false);
};
let touch = ingredient.touch(db, dep.key.clone()).await?;
if touch.changed_at > self_changed_at {
return Ok(false);
}
}
Ok(true)
}
pub async fn snapshot_cells_deep(&self) -> im::HashMap<DynKey, Arc<ErasedCell>>
where
V: Clone,
{
self.core.snapshot_cells_deep_inner().await
}
}
pub struct ErasedCell {
state: Mutex<ErasedState>,
notify: Notify,
}
enum ErasedState {
Vacant,
Running {
started_at: Revision,
},
Ready {
value: Arc<dyn std::any::Any + Send + Sync>,
verified_at: Revision,
changed_at: Revision,
deps: Arc<[Dep]>,
},
Poisoned {
error: Arc<PicanteError>,
verified_at: Revision,
},
}
impl ErasedCell {
fn new() -> Self {
Self {
state: Mutex::new(ErasedState::Vacant),
notify: Notify::new(),
}
}
fn new_ready(
value: Arc<dyn std::any::Any + Send + Sync>,
verified_at: Revision,
changed_at: Revision,
deps: Arc<[Dep]>,
) -> Self {
Self {
state: Mutex::new(ErasedState::Ready {
value,
verified_at,
changed_at,
deps,
}),
notify: Notify::new(),
}
}
pub async fn ready_record(&self) -> Option<ErasedReadyRecord> {
let state = self.state.lock().await;
match &*state {
ErasedState::Ready {
value,
verified_at,
changed_at,
deps,
} => Some(ErasedReadyRecord {
value: value.clone(),
verified_at: *verified_at,
changed_at: *changed_at,
deps: deps.clone(),
}),
_ => None,
}
}
}
#[derive(Clone)]
pub struct ErasedReadyRecord {
pub value: Arc<dyn std::any::Any + Send + Sync>,
pub verified_at: Revision,
pub changed_at: Revision,
pub deps: Arc<[Dep]>,
}
struct ErasedAccessResult {
value: Option<Arc<dyn std::any::Any + Send + Sync>>,
changed_at: Revision,
}
#[derive(Debug, Clone, Facet)]
struct DepRecord {
kind_id: u32,
key_bytes: Vec<u8>,
}
#[derive(Debug, Clone, Facet)]
struct DerivedRecord<K, V> {
key: K,
value: V,
verified_at: u64,
changed_at: u64,
deps: Vec<DepRecord>,
}
impl<DB, K, V> PersistableIngredient for DerivedIngredient<DB, K, V>
where
DB: IngredientLookup + Send + Sync + 'static,
K: Clone + Eq + Hash + Facet<'static> + Send + Sync + 'static,
V: Clone + Facet<'static> + Send + Sync + 'static,
{
fn kind(&self) -> QueryKindId {
self.core.kind
}
fn kind_name(&self) -> &'static str {
self.core.kind_name
}
fn section_type(&self) -> SectionType {
SectionType::Derived
}
fn clear(&self) {
let mut cells = self.core.cells.write();
*cells = im::HashMap::new();
}
fn save_records(&self) -> BoxFuture<'_, PicanteResult<Vec<Vec<u8>>>> {
Box::pin(self.core.save_records_erased())
}
fn load_records(&self, records: Vec<Vec<u8>>) -> PicanteResult<()> {
self.core.load_records_erased(records)
}
fn restore_runtime_state<'a>(
&'a self,
runtime: &'a crate::runtime::Runtime,
) -> BoxFuture<'a, PicanteResult<()>> {
Box::pin(self.core.restore_runtime_state_inner(runtime))
}
fn save_incremental_records(
&self,
since_revision: u64,
) -> BoxFuture<'_, PicanteResult<Vec<(u64, Vec<u8>, Option<Vec<u8>>)>>> {
Box::pin(self.core.save_incremental_records_erased(since_revision))
}
fn apply_wal_entry(
&self,
revision: u64,
key: Vec<u8>,
value: Option<Vec<u8>>,
) -> PicanteResult<()> {
self.core.apply_wal_entry_erased(revision, key, value)
}
}
impl<DB, K, V> DynIngredient<DB> for DerivedIngredient<DB, K, V>
where
DB: IngredientLookup + Send + Sync + 'static,
K: Clone + Eq + Hash + Facet<'static> + Send + Sync + 'static,
V: Clone + Facet<'static> + Send + Sync + 'static,
{
fn touch<'a>(&'a self, db: &'a DB, key: Key) -> BoxFuture<'a, PicanteResult<Touch>> {
let dyn_key = DynKey {
kind: self.core.kind,
key,
};
self.core
.touch_erased(db, dyn_key, self.compute.as_ref(), self.eq_erased)
}
}
fn panic_message(panic: Box<dyn std::any::Any + Send>) -> String {
if let Some(s) = panic.downcast_ref::<&str>() {
(*s).to_string()
} else if let Some(s) = panic.downcast_ref::<String>() {
s.clone()
} else {
"non-string panic payload".to_string()
}
}