use crate::entry_selector::EntryKeySelector;
use crate::prelude::*;
use crate::traits::Observer;
use crate::update_iterator::UpdateIterator;
use crate::update_state::UpdateState;
use fieldx_plus::child_build;
use fieldx_plus::fx_plus;
use moka::future::Cache as MokaCache;
use moka::ops::compute::CompResult as MokaCompResult;
use moka::policy::EvictionPolicy;
use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Display;
use std::future::Future;
use std::hash::Hash;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tokio::task::JoinHandle;
use tokio::time::interval;
use tracing::debug;
use tracing::instrument;
pub use moka::ops::compute::Op;
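/// Invokes `$method` on every registered observer, awaiting each call and applying the optional
/// trailing tokens (e.g. `?`) to it.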
macro_rules! wbc_event {
($self:ident, $method:ident($($args:tt)*) $( $post:tt )* ) => {
{
let observers = $self.observers().await;
if !observers.is_empty() {
for observer in observers.iter() {
observer.$method($($args)*).await $( $post )*;
}
}
}
};
}
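/// Returns early from the surrounding function with the last recorded background error, if any,
/// clearing it in the process.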
macro_rules! check_error {
($self:expr) => {
if let Some(err) = $self.clear_error() {
return Err(err);
}
};
}
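/// Internal representation of a cache slot: either the value itself, stored under its primary
/// key, or a redirect from a secondary key to the primary key that owns the value.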
#[derive(Clone, Debug)]
pub(crate) enum ValueState<K, V>
where
K: Debug + Hash + Clone + Eq + Sized + Send + Sync + 'static,
V: Debug + Clone + Send + Sync + 'static,
{
Primary(V),
Secondary(K),
}
impl<K, V> ValueState<K, V>
where
K: Debug + Display + Hash + Clone + Eq + Sized + Send + Sync + 'static,
V: Debug + Clone + Send + Sync + 'static,
{
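    /// Unwraps the contained value.
    ///
    /// # Panics
    ///
    /// Panics when called on a [`ValueState::Secondary`] redirect, which carries no value.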
pub(crate) fn into_value(self) -> V {
match self {
Self::Primary(v) => v,
            Self::Secondary(_) => panic!("called `into_value()` on a Secondary entry, which carries no value"),
}
}
}
type ArcCache<DC> =
Arc<MokaCache<<DC as DataController>::Key, ValueState<<DC as DataController>::Key, <DC as DataController>::Value>>>;
type UpdatesHash<DC> = HashMap<<DC as DataController>::Key, Arc<UpdateState<DC>>>;
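/// Asynchronous write-back cache built on top of a moka cache.
///
/// Values are stored under their primary key; secondary keys are kept as redirects to the
/// primary entry. Pending modifications are accumulated as [`UpdateState`] records and are
/// written back to the [`DataController`] by a background monitor task, either on demand,
/// periodically, or when the number of pending updates grows too large.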
#[fx_plus(
parent,
new(off),
// Need explicit `default(off)` because the field defaults are for the builder type only.
default(off),
sync,
builder(
post_build(initial_setup),
doc("Builder object of [`Cache`].", "", "See [`Cache::builder()`] method."),
method_doc("Implement builder pattern for [`Cache`]."),
)
)]
pub struct Cache<DC>
where
DC: DataController,
DC::Key: Send + Sync + 'static,
DC::Error: Send + Sync + 'static,
{
#[fieldx(
lock,
clearer(doc("Clears and returns the last error or `None`.")),
get(clone),
set(private),
builder(off)
)]
error: Arc<DC::Error>,
#[fieldx(vis(pub), builder(vis(pub), required, into), get(clone))]
data_controller: Arc<DC>,
#[fieldx(lock, private, optional, clearer, get(off), builder(vis(pub), doc("Cache name.")))]
name: &'static str,
#[fieldx(get(copy), default(100))]
max_updates: u64,
#[fieldx(get(copy), default(10_000))]
max_capacity: u64,
#[fieldx(get(copy), set(doc("Change the flush interval.")), default(Duration::from_secs(10)))]
flush_interval: Duration,
#[fieldx(vis(pub(crate)), set(private), builder(off))]
cache: Option<ArcCache<DC>>,
#[fieldx(lock, vis(pub(crate)), set(private), get, get_mut, builder(off))]
updates: UpdatesHash<DC>,
#[fieldx(private, clearer, writer, builder(off))]
monitor_task: JoinHandle<()>,
#[fieldx(get(copy), default(10))]
monitor_tick_duration: u64,
#[fieldx(private, get(clone), default(Arc::new(tokio::sync::Notify::new())))]
flush_notifier: Arc<tokio::sync::Notify>,
#[fieldx(private, get(clone), default(Arc::new(tokio::sync::Notify::new())))]
cleanup_notifier: Arc<tokio::sync::Notify>,
#[fieldx(lock, private, get(copy), set, builder(off), default(Instant::now()))]
last_flush: Instant,
#[fieldx(mode(async), private, lock, get, builder("_observers", private), default)]
observers: Vec<Box<dyn Observer<DC>>>,
#[fieldx(mode(async), private, writer, set, get(copy), builder(off), default)]
closed: bool,
#[fieldx(builder(off), default(false.into()))]
shutdown: AtomicBool,
}
impl<DC> Cache<DC>
where
DC: DataController,
{
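    /// Post-build hook: pre-allocates the updates map and builds the underlying moka cache with
    /// the configured capacity, name, and TinyLFU eviction policy.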
fn initial_setup(mut self) -> Self {
self.closed = false.into();
self.set_updates(HashMap::with_capacity(self.max_updates() as usize));
self.set_cache(Some(Arc::new(
MokaCache::builder()
.max_capacity(self.max_capacity())
.name(self.clear_name().unwrap_or_else(|| std::any::type_name::<DC::Value>()))
.eviction_policy(EvictionPolicy::tiny_lfu())
.build(),
)));
self
}
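    /// Returns the underlying moka cache; panics if called before `initial_setup` has run.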
fn cache(&self) -> ArcCache<DC> {
Arc::clone(self.cache.as_ref().expect("Internal error: cache not initialized"))
}
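    /// Resolves `key` to its primary key, consulting the cache first and falling back to the
    /// data controller. Returns `None` if the key is unknown.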
#[instrument(level = "trace", skip(self))]
async fn get_primary_key_from(&self, key: &DC::Key) -> Result<Option<DC::Key>, DC::Error> {
Ok(if let Some(v) = self.cache().get(key).await {
Some(match v {
ValueState::Primary(_) => key.clone(),
ValueState::Secondary(ref k) => k.clone(),
})
}
else {
self.data_controller().get_primary_key_for(key).await?
})
}
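    /// Computes over the entry stored under the primary `key`: the current value is taken from
    /// the cache or fetched from the data controller, `f` decides the [`Op`] to apply, and the
    /// outcome is routed through `on_new`, `on_change`, or `on_delete` so it gets written back.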
#[instrument(level = "trace", skip(self, f))]
pub(crate) async fn get_and_try_compute_with_primary<F, Fut>(
&self,
key: &DC::Key,
f: F,
) -> Result<CompResult<DC>, Arc<DC::Error>>
where
F: FnOnce(Option<Entry<DC>>) -> Fut,
Fut: Future<Output = Result<Op<DC::Value>, DC::Error>>,
{
let myself = self.myself().unwrap();
self.maybe_flush_one(key).await?;
debug!("[{}] get_and_try_compute_with_primary(key: {key:?})", myself.name());
let result = self
.cache()
.entry(key.clone())
.and_try_compute_with(|entry| async {
let (wb_entry, old_v, fetched) = if let Some(entry) = entry {
let old_v = entry.into_value().into_value();
(
Some(Entry::new(&myself, key.clone(), old_v.clone())),
Some(old_v),
false,
)
}
else {
let old_v = myself.data_controller().get_for_key(key).await?;
old_v.map_or((None, None, false), |v| {
(Some(Entry::new(&myself, key.clone(), v.clone())), Some(v), true)
})
};
let op = f(wb_entry).await?;
let op = match op {
Op::Remove => {
if let Some(ref old_v) = old_v {
let secondaries = myself.data_controller().secondary_keys_of(old_v);
for skey in secondaries {
myself.cache().invalidate(&skey).await;
}
}
myself.on_delete(key, old_v.map(Arc::new)).await?
}
Op::Put(v) => {
let v = Arc::new(v);
if let Some(old_v) = old_v {
myself.on_change(key, v, old_v).await?
}
else {
myself.on_new(key, v).await?
}
}
Op::Nop => {
if fetched {
Op::Put(ValueState::Primary(old_v.unwrap()))
}
else {
Op::Nop
}
}
};
Result::<Op<ValueState<DC::Key, DC::Value>>, Arc<DC::Error>>::Ok(op)
})
.await?;
Ok(match result {
MokaCompResult::Inserted(e) => CompResult::Inserted(Entry::from_primary_entry(self, e)),
MokaCompResult::Removed(e) => CompResult::Removed(Entry::from_primary_entry(self, e)),
MokaCompResult::ReplacedWith(e) => CompResult::ReplacedWith(Entry::from_primary_entry(self, e)),
MokaCompResult::Unchanged(e) => CompResult::Unchanged(Entry::from_primary_entry(self, e)),
MokaCompResult::StillNone(k) => CompResult::StillNone(k),
})
}
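    /// Computes over an entry addressed by a secondary `key`: the primary key is resolved first,
    /// the work is delegated to [`Self::get_and_try_compute_with_primary`], and the secondary
    /// slot is kept as a redirect to the primary entry.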
#[instrument(level = "trace", skip(self, f))]
pub(crate) async fn get_and_try_compute_with_secondary<F, Fut>(
&self,
key: &DC::Key,
f: F,
) -> Result<CompResult<DC>, Arc<DC::Error>>
where
F: FnOnce(Option<Entry<DC>>) -> Fut,
Fut: Future<Output = Result<Op<DC::Value>, DC::Error>>,
{
let myself = self.myself().unwrap();
let mut primary_value = None;
let result = self
.cache()
.entry(key.clone())
.and_try_compute_with(|entry| async {
let primary_key = if let Some(entry) = entry {
Some(match entry.into_value() {
ValueState::Secondary(k) => k,
                        _ => panic!(
                            "key '{key}' was used as a secondary key, but the corresponding cache entry is a primary"
                        ),
})
}
else {
self.get_primary_key_from(key).await?
};
let secondary_key = key.clone();
let result = if let Some(ref pkey) = primary_key {
self.get_and_try_compute_with_primary(pkey, |entry| async move {
let secondary_entry = if let Some(entry) = entry {
Some(Entry::new(&myself, secondary_key, entry.value().await.unwrap().clone()))
}
else {
None
};
f(secondary_entry).await
})
.await?
}
else {
match f(None).await? {
Op::Nop | Op::Remove => CompResult::StillNone(Arc::new(secondary_key)),
Op::Put(new_value) => {
let pkey = myself.data_controller().primary_key_of(&new_value);
self.get_and_try_compute_with_primary(&pkey, |_| async { Ok(Op::Put(new_value)) })
.await?
}
}
};
let op = match result {
CompResult::Inserted(v) | CompResult::ReplacedWith(v) | CompResult::Unchanged(v) => {
primary_value = Some(v.into_value());
Op::Put(ValueState::Secondary(primary_key.unwrap()))
}
CompResult::Removed(e) => {
primary_value = Some(e.into_value());
Op::Remove
}
CompResult::StillNone(_) => Op::Nop,
};
Result::<Op<ValueState<DC::Key, DC::Value>>, Arc<DC::Error>>::Ok(op)
})
.await?;
Ok(match result {
MokaCompResult::Inserted(_) => CompResult::Inserted(Entry::new(self, key.clone(), primary_value.unwrap())),
MokaCompResult::ReplacedWith(_) => {
CompResult::ReplacedWith(Entry::new(self, key.clone(), primary_value.unwrap()))
}
MokaCompResult::Unchanged(_) => {
CompResult::Unchanged(Entry::new(self, key.clone(), primary_value.unwrap()))
}
MokaCompResult::Removed(_) => CompResult::Removed(Entry::new(self, key.clone(), primary_value.unwrap())),
MokaCompResult::StillNone(k) => CompResult::StillNone(k),
})
}
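    /// Returns the entry stored under the primary `key`; on a miss the value is loaded from the
    /// data controller or, failing that, produced by `init` and registered as a new record.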
#[instrument(level = "trace", skip(self, init))]
pub(crate) async fn get_or_try_insert_with_primary(
&self,
key: &DC::Key,
init: impl Future<Output = Result<DC::Value, DC::Error>>,
) -> Result<Entry<DC>, Arc<DC::Error>> {
debug!("get_or_try_insert_with_primary(key: {key:?})");
self.maybe_flush_one(key).await?;
let cache_entry = self
.cache()
.entry(key.clone())
.or_try_insert_with(async {
Ok(ValueState::Primary(
if let Some(v) = self.data_controller().get_for_key(key).await? {
v
}
else {
let new_value = init.await?;
self.on_new(key, Arc::new(new_value.clone())).await?;
new_value
},
))
})
.await?;
Ok(Entry::new(self, key.clone(), cache_entry.into_value().into_value()))
}
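    /// Same as [`Self::get_or_try_insert_with_primary`], but addressed by a secondary key that is
    /// resolved (or created) as a redirect to the primary entry.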
#[instrument(level = "trace", skip(self, init))]
pub(crate) async fn get_or_try_insert_with_secondary(
&self,
key: &DC::Key,
init: impl Future<Output = Result<DC::Value, DC::Error>>,
) -> Result<Entry<DC>, Arc<DC::Error>> {
let myself = self.myself().unwrap();
let result = self
.cache()
.entry(key.clone())
.and_try_compute_with(|entry| async {
let primary_key = if let Some(entry) = entry {
let ValueState::Secondary(pkey) = entry.value()
else {
panic!("Not a secondary key: '{key}'")
};
self.get_or_try_insert_with_primary(pkey, init).await?;
pkey.clone()
}
else if let Some(pkey) = myself.data_controller().get_primary_key_for(key).await? {
self.get_or_try_insert_with_primary(&pkey, init).await?;
pkey
}
else {
let new_value = init.await?;
let pkey = self.data_controller().primary_key_of(&new_value);
self.insert(new_value).await?;
pkey
};
Result::<_, Arc<DC::Error>>::Ok(Op::Put(ValueState::Secondary(primary_key)))
})
.await?;
Ok(match result {
MokaCompResult::Inserted(e) | MokaCompResult::Unchanged(e) | MokaCompResult::ReplacedWith(e) => {
Entry::new(self, key.clone(), e.into_value().into_value())
}
_ => panic!("Unexpected outcome of get_or_try_insert_with_secondary: {result:?}"),
})
}
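    /// Fetches or creates the [`UpdateState`] for the record, invokes `f` with it, and maps the
    /// returned [`DataControllerOp`] onto the cache operation to apply. Also makes sure the
    /// background monitor task is running.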
#[instrument(level = "trace", skip(self, f))]
#[allow(clippy::type_complexity)]
pub(crate) async fn get_update_state_and_compute<'a, F, Fut>(
&self,
key: &'a DC::Key,
value: Option<Arc<DC::Value>>,
f: F,
) -> Result<Op<ValueState<DC::Key, DC::Value>>, DC::Error>
where
F: FnOnce(&'a DC::Key, Option<Arc<DC::Value>>, Arc<UpdateState<DC>>) -> Fut,
Fut: Future<Output = Result<DataControllerOp, DC::Error>>,
{
let update_state;
self.check_task().await;
{
let mut updates = self.updates_mut();
let count = updates.len();
let current_capacity = updates.capacity();
            // Reserve more space once the map is within ~10% of its current capacity, so repeated
            // inserts do not trigger frequent incremental re-allocations.
            if count >= current_capacity - (current_capacity / 10) {
                updates.reserve(current_capacity.max(self.max_updates() as usize) / 2);
            }
let update_key = value
.as_ref()
.map(|v| self.data_controller().primary_key_of(v))
                .unwrap_or_else(|| key.clone());
            // Register the new state under `update_key`, the same key checked above, so that
            // repeated operations on the same record share a single update state.
            if !updates.contains_key(&update_key) {
                update_state = child_build!(self, UpdateState<DC>).unwrap();
                updates.insert(update_key.clone(), Arc::clone(&update_state));
}
else {
update_state = updates.get(&update_key).unwrap().clone();
}
};
let op = f(key, value.as_ref().map(Arc::clone), Arc::clone(&update_state)).await?;
self.updates_mut().insert(key.clone(), update_state.clone());
self._on_dc_op(op, key, value).await
}
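    /// Returns the cache name, or `"<anon>"` if none was set.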
#[allow(dead_code)]
#[inline]
pub fn name(&self) -> String {
self.cache().name().unwrap_or("<anon>").to_string()
}
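    /// Returns an [`EntryKeySelector`] for `key`, determining whether it refers to a primary or
    /// a secondary record.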
#[instrument(level = "trace")]
pub async fn entry(&self, key: DC::Key) -> Result<EntryKeySelector<DC>, Arc<DC::Error>> {
check_error!(self);
Ok(if let Some(pkey) = self.get_primary_key_from(&key).await? {
child_build!(
self,
EntryKeySelector<DC> {
primary: pkey == key,
key: key,
primary_key: pkey,
}
)
}
else {
child_build!(
self,
EntryKeySelector<DC> {
primary: self.data_controller().is_primary(&key),
key: key,
}
)
}
.unwrap())
}
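    /// Returns a clone of the value addressed by `key` (primary or secondary), if present in the
    /// cache or the backing store, and records the access.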
#[instrument(level = "trace")]
pub async fn get(&self, key: &DC::Key) -> Result<Option<DC::Value>, Arc<DC::Error>> {
check_error!(self);
let outcome = self
.entry(key.clone())
.await?
.and_try_compute_with(|_| async { Ok(Op::Nop) })
.await?;
Ok(match outcome {
CompResult::Inserted(entry) | CompResult::Unchanged(entry) | CompResult::ReplacedWith(entry) => {
let value = entry.into_value();
self.on_access(key, Arc::new(value.clone())).await?;
Some(value)
}
CompResult::StillNone(_) => None,
_ => None,
})
}
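    /// Inserts `value` under its primary key, registering it as a new record with the data
    /// controller. Returns the cached value, or `None` if the controller elected not to store it.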
#[instrument(level = "trace")]
pub async fn insert(&self, value: DC::Value) -> Result<Option<DC::Value>, Arc<DC::Error>> {
check_error!(self);
let key = self.data_controller().primary_key_of(&value);
let res = self
.cache()
.entry(key.clone())
            .and_try_compute_with(|_| async { self.on_new(&key, Arc::new(value)).await })
.await?;
match res {
MokaCompResult::Inserted(entry)
| MokaCompResult::ReplacedWith(entry)
| MokaCompResult::Unchanged(entry) => {
let value = entry.into_value().into_value();
Ok(Some(value))
}
MokaCompResult::StillNone(_) => Ok(None),
_ => panic!("Impossible result of insert operation: {res:?}"),
}
}
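    /// Deletes the entry addressed by `key` (primary or secondary) and returns the removed value,
    /// if any.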
#[instrument(level = "trace")]
pub async fn delete(&self, key: &DC::Key) -> Result<Option<DC::Value>, Arc<DC::Error>> {
check_error!(self);
let result = self
.entry(key.clone())
.await?
.and_try_compute_with(|entry| async move { Ok(if entry.is_some() { Op::Remove } else { Op::Nop }) })
.await?;
Ok(match result {
CompResult::Removed(entry) => Some(entry.into_value()),
CompResult::StillNone(_) => None,
_ => panic!("Impossible result of delete operation: {result:?}"),
})
}
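    /// Removes `key` from the in-memory cache without notifying the data controller; when the
    /// entry is a primary, all of its secondary redirects are invalidated too.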
#[instrument(level = "trace")]
pub async fn invalidate(&self, key: &DC::Key) -> Result<(), Arc<DC::Error>> {
check_error!(self);
let myself = self.myself().unwrap();
self.cache()
.entry(key.clone())
.and_compute_with(|entry| async {
if let Some(entry) = entry {
if let ValueState::Primary(value) = entry.value() {
for secondary in myself.data_controller().secondary_keys_of(value) {
myself.cache().invalidate(&secondary).await;
}
}
}
Op::Remove
})
.await;
Ok(())
}
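    /// Removes update states that have been fully written back and are no longer referenced
    /// elsewhere, returning the number of states dropped.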
fn _purify_updates(&self, update_iter: Arc<UpdateIterator<DC>>) -> usize {
let mut updates = self.updates_mut();
        let mut count = 0;
        for (key, guard) in update_iter.worked_mut().iter() {
            let update = updates.get_mut(key).unwrap();
            // Skip states that are still referenced by in-flight operations.
            if Arc::strong_count(update) > 1 {
                continue;
            }
            // Drop the state once nothing is left pending for it.
            if guard.is_none() {
                updates.remove(key);
                count += 1;
            }
}
count
}
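    /// Writes the pending updates for `keys` back to the data controller, notifies observers via
    /// `on_flush`, prunes completed update states, and refreshes the last-flush timestamp.
    /// Returns the number of update states dropped.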
pub async fn flush_many_raw(&self, keys: Vec<DC::Key>) -> Result<usize, DC::Error> {
let update_iter = child_build!(
self, UpdateIterator<DC> {
keys: keys
}
)
.expect("Internal error: UpdateIterator builder failure");
wbc_event!(self, on_flush(update_iter.clone())?);
update_iter.reset();
self.data_controller().write_back(update_iter.clone()).await?;
let updates_count = self._purify_updates(update_iter);
self.set_last_flush(Instant::now());
Ok(updates_count)
}
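    /// Flushes every pending update; unlike [`Cache::flush()`], it does not check for a
    /// previously recorded background error.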
#[instrument(level = "trace")]
pub async fn flush_raw(&self) -> Result<usize, DC::Error> {
let update_keys = {
let updates = self.updates();
updates.keys().cloned().collect::<Vec<_>>()
};
self.flush_many_raw(update_keys).await
}
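    /// Flushes every pending update to the data controller, failing early if a background error
    /// has been recorded.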
#[instrument(level = "trace")]
pub async fn flush(&self) -> Result<usize, Arc<DC::Error>> {
check_error!(self);
self.flush_raw().await.map_err(Arc::new)
}
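    /// Requests a flush from the background monitor task instead of flushing inline.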
#[instrument(level = "trace")]
pub async fn soft_flush(&self) -> Result<(), Arc<DC::Error>> {
check_error!(self);
self.flush_notifier().notify_waiters();
Ok(())
}
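    /// Flushes the pending update for a single `key`, if any, and returns the number of update
    /// states dropped.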
#[instrument(level = "trace")]
pub async fn flush_one(&self, key: &DC::Key) -> Result<usize, Arc<DC::Error>> {
check_error!(self);
let update = self.updates().get(key).cloned();
if let Some(update) = update {
let guard = update.data.clone().write_owned().await;
if let Some(update_data) = guard.as_ref() {
wbc_event!(self, on_flush_one(key, update_data)?);
debug!("flush single key: {key:?}");
let update_iter = child_build!(
self, UpdateIterator<DC> {
key_guard: (key.clone(), guard),
}
)
.expect("Internal error: UpdateIterator builder failure");
self.data_controller().write_back(update_iter.clone()).await?;
return Ok(self._purify_updates(update_iter));
}
}
Ok(0)
}
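    /// Background loop of the monitor task: wakes on notification or every
    /// `monitor_tick_duration` milliseconds and flushes pending updates when forced, when the
    /// cache is shutting down, when the update count exceeds `max_updates`, or when
    /// `flush_interval` has elapsed since the last flush.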
#[instrument(level = "trace")]
async fn monitor_updates(&self) {
let flush_interval = self.flush_interval();
let mut ticking_interval = interval(Duration::from_millis(self.monitor_tick_duration()));
ticking_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let max_updates = self.max_updates() as usize;
let mut closed_guard = self.write_closed().await;
loop {
if *closed_guard {
break;
}
let mut forced = false;
let flush_notifier = self.flush_notifier();
let cleanup_notifier = self.cleanup_notifier();
tokio::select! {
_ = flush_notifier.notified() => {
forced = true;
}
_ = cleanup_notifier.notified() => {
forced = true;
}
_ = ticking_interval.tick() => {
}
}
*closed_guard = self.is_shut_down();
if self.updates().is_empty() {
continue;
}
if forced
|| *closed_guard
|| self.updates().len() > max_updates
|| self.last_flush().elapsed() >= flush_interval
{
if let Err(error) = self.flush().await {
self.set_error(error.clone());
wbc_event!(self, on_monitor_error(&error));
}
}
}
}
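    /// Ensures the background monitor task is running, spawning it if it has never been started
    /// or has finished.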
#[instrument(level = "trace", skip(self))]
async fn check_task(&self) {
let mut task_guard = self.write_monitor_task();
if task_guard.as_ref().is_none_or(|t| t.is_finished()) {
let async_self = self.myself().unwrap();
*task_guard = Some(tokio::spawn(async move { async_self.monitor_updates().await }));
}
}
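    /// Translates a [`DataControllerOp`] decision into the corresponding cache operation: keep
    /// the entry, insert the value as a primary, or remove it (also dropping the update state on
    /// `Drop`).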
async fn _on_dc_op(
&self,
op: DataControllerOp,
key: &DC::Key,
value: Option<Arc<DC::Value>>,
) -> Result<Op<ValueState<DC::Key, DC::Value>>, DC::Error> {
Ok(match op {
DataControllerOp::Nop => Op::Nop,
DataControllerOp::Insert => {
if let Some(value) = value {
Op::Put(ValueState::Primary(value.as_ref().clone()))
}
else {
Op::Nop
}
}
DataControllerOp::Revoke => Op::Remove,
DataControllerOp::Drop => {
self.updates_mut().remove(key);
Op::Remove
}
})
}
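    /// Records a newly created value in the update state for `key` and returns the cache
    /// operation to apply.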
#[instrument(level = "trace")]
#[allow(clippy::type_complexity)]
pub(crate) async fn on_new(
&self,
key: &DC::Key,
value: Arc<DC::Value>,
) -> Result<Op<ValueState<DC::Key, DC::Value>>, DC::Error> {
self.get_update_state_and_compute(key, Some(value), |key, value, update_state| async move {
update_state.on_new(key, value.as_ref().unwrap()).await
})
.await
}
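    /// Records a value change in the update state for `key` and returns the cache operation to
    /// apply.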
#[instrument(level = "trace")]
#[allow(clippy::type_complexity)]
pub(crate) async fn on_change(
&self,
key: &DC::Key,
value: Arc<DC::Value>,
old_val: DC::Value,
) -> Result<Op<ValueState<DC::Key, DC::Value>>, DC::Error> {
self.get_update_state_and_compute(key, Some(value), |key, value, update_state| async move {
update_state.on_change(key, value.unwrap(), old_val).await
})
.await
}
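    /// Records a read access in the update state for `key`.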
#[instrument(level = "trace")]
pub(crate) async fn on_access<'a>(&self, key: &DC::Key, value: Arc<DC::Value>) -> Result<(), DC::Error> {
self.get_update_state_and_compute(key, Some(value), |key, value, update_state| async move {
update_state.on_access(key, value.unwrap()).await
})
.await?;
Ok(())
}
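    /// Records a deletion in the update state for `key` and returns the cache operation to apply.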
#[instrument(level = "trace")]
#[allow(clippy::type_complexity)]
pub(crate) async fn on_delete(
&self,
key: &DC::Key,
value: Option<Arc<DC::Value>>,
) -> Result<Op<ValueState<DC::Key, DC::Value>>, DC::Error> {
self.get_update_state_and_compute(key, value, |key, _value, update_state| async move {
update_state.on_delete(key).await
})
.await
}
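    /// Flushes the pending update for `key` only when the key is absent from the cache, so that a
    /// subsequent load from the data controller observes the written-back state.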
#[instrument(level = "trace")]
pub(crate) async fn maybe_flush_one(&self, key: &DC::Key) -> Result<usize, Arc<DC::Error>> {
if self.cache().contains_key(key) {
return Ok(0);
}
debug!("DO flush_one(key: {key:?})");
return self.flush_one(key).await;
}
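    /// Shuts the cache down: sets the shutdown flag, wakes the monitor task so it performs a
    /// final flush, and waits for it to exit by acquiring the `closed` write lock.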
#[instrument(level = "trace")]
pub async fn close(&self) -> Result<(), Arc<DC::Error>> {
check_error!(self);
if self.is_shut_down() {
return Ok(());
}
self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst);
self.cleanup_notifier().notify_waiters();
let _ = self.write_closed().await;
Ok(())
}
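    /// Returns `true` once [`Cache::close()`] has been requested.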
pub fn is_shut_down(&self) -> bool {
self.shutdown.load(std::sync::atomic::Ordering::SeqCst)
}
}
impl<DC> Debug for Cache<DC>
where
DC: DataController,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Cache")
.field("name", &self.name())
.field("updates_count", &self.updates().len())
.field("cache_entries", &self.cache().entry_count())
.finish()
}
}
impl<DC> CacheBuilder<DC>
where
DC: DataController,
{
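    /// Adds an [`Observer`] that will be notified of cache events.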
pub fn observer(mut self, observer: impl Observer<DC>) -> Self {
if let Some(ref mut observers) = self.observers {
observers.push(Box::new(observer));
self
}
else {
self._observers(vec![Box::new(observer)])
}
}
}