wb_cache/cache.rs

use crate::entry_selector::EntryKeySelector;
use crate::prelude::*;
use crate::traits::Observer;
use crate::update_iterator::UpdateIterator;
use crate::update_state::UpdateState;

use fieldx_plus::child_build;
use fieldx_plus::fx_plus;
use moka::future::Cache as MokaCache;
use moka::ops::compute::CompResult as MokaCompResult;
use moka::policy::EvictionPolicy;
use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Display;
use std::future::Future;
use std::hash::Hash;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tokio::task::JoinHandle;
use tokio::time::interval;
use tracing::debug;
use tracing::instrument;

pub use moka::ops::compute::Op;

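// Invokes `$method(...)` on every registered observer; any trailing tokens (such as `?`) are
// appended to each call.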
macro_rules! wbc_event {
    ($self:ident, $method:ident($($args:tt)*) $( $post:tt )* ) => {
        {
        let observers = $self.observers().await;
        if !observers.is_empty() {
            for observer in observers.iter() {
                observer.$method($($args)*).await $( $post )*;
            }
        }
        }
    };
}

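// Returns early from the surrounding method with the last error recorded by the background task,
// if one is set.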
macro_rules! check_error {
    ($self:expr) => {
        if let Some(err) = $self.clear_error() {
            return Err(err);
        }
    };
}

// The key `K` for a secondary entry must always match the primary key,
// as it uniquely identifies the corresponding primary value in the cache.
#[derive(Clone, Debug)]
pub(crate) enum ValueState<K, V>
where
    K: Debug + Hash + Clone + Eq + Sized + Send + Sync + 'static,
    V: Debug + Clone + Send + Sync + 'static,
{
    Primary(V),
    Secondary(K),
}

impl<K, V> ValueState<K, V>
where
    K: Debug + Display + Hash + Clone + Eq + Sized + Send + Sync + 'static,
    V: Debug + Clone + Send + Sync + 'static,
{
    pub(crate) fn into_value(self) -> V {
        match self {
            Self::Primary(v) => v,
            Self::Secondary(_) => panic!("secondary doesn't have a value"),
        }
    }
}

type ArcCache<DC> =
    Arc<MokaCache<<DC as DataController>::Key, ValueState<<DC as DataController>::Key, <DC as DataController>::Value>>>;
type UpdatesHash<DC> = HashMap<<DC as DataController>::Key, Arc<UpdateState<DC>>>;

/// This is where all the magic happens!
///
/// ```ignore
/// let controller = MyDataController::new(host, port);
/// let cache = Cache::builder()
///     .data_controller(controller)
///     .max_updates(1000)
///     .max_capacity(100_000)
///     .flush_interval(Duration::from_secs(10))
///     .build();
///
/// // The key type is defined by the data controller's implementation of DataController.
/// cache.entry(key).await?
///     .and_try_compute_with(|entry| async {
///        let record = if let Some(entry) = entry {
///            modify(entry.into_value()).await?
///        }
///        else {
///            create_new().await?
///        };
///
///        Ok(Op::Put(record))
///     })
///     .await?;
/// ```
#[fx_plus(
    parent,
    new(off),
    // Need explicit `default(off)` because the field defaults are for the builder type only.
    default(off),
    sync,
    builder(
        post_build(initial_setup),
        doc("Builder object of [`Cache`].", "", "See [`Cache::builder()`] method."),
        method_doc("Implement builder pattern for [`Cache`]."),
    )
)]
pub struct Cache<DC>
where
    DC: DataController,
    DC::Key: Send + Sync + 'static,
    DC::Error: Send + Sync + 'static,
{
    /// The last error that occurred within the background task.  If set, any subsequent call to a public method
    /// operating on the cache object will return this error.
    #[fieldx(
        lock,
        clearer(doc("Clears and returns the last error or `None`.")),
        get(clone),
        set(private),
        builder(off)
    )]
    error: Arc<DC::Error>,

    /// The data controller object.
    #[fieldx(vis(pub), builder(vis(pub), required, into), get(clone))]
    data_controller: Arc<DC>,

    // This is a transitive attribute that we use to get a name from the builder and then pass on to the underlying
    // Moka cache builder.
    #[fieldx(lock, private, optional, clearer, get(off), builder(vis(pub), doc("Cache name.")))]
    name: &'static str,

    /// The maximum size of the updates pool. This is not a hard limit but a threshold that triggers automatic flushing
    /// by the background task.
    ///
    /// Defaults to 100.
    #[fieldx(get(copy), default(100))]
    max_updates: u64,

    /// The maximum cache capacity. This is a hard limit on the number of key entries (not records).
    /// See the crate documentation for details.
    ///
    /// Defaults to 10000.
    #[fieldx(get(copy), default(10_000))]
    max_capacity: u64,

    /// The delay between two consecutive flushes. If a flush was manually requested then the timer is reset.
    #[fieldx(get(copy), set(doc("Change the flush interval.")), default(Duration::from_secs(10)))]
    flush_interval: Duration,

    #[fieldx(vis(pub(crate)), set(private), builder(off))]
    cache: Option<ArcCache<DC>>,

    // Here we keep the ensured update records, i.e. those ready to be submitted back to the data controller for processing.
    #[fieldx(lock, vis(pub(crate)), set(private), get, get_mut, builder(off))]
    updates: UpdatesHash<DC>,

    #[fieldx(private, clearer, writer, builder(off))]
    monitor_task: JoinHandle<()>,

    /// The period of time, in milliseconds, between two consecutive checks of the cache state by the background task.
    #[fieldx(get(copy), default(10))]
    monitor_tick_duration: u64,

    #[fieldx(private, get(clone), default(Arc::new(tokio::sync::Notify::new())))]
    flush_notifier: Arc<tokio::sync::Notify>,

    #[fieldx(private, get(clone), default(Arc::new(tokio::sync::Notify::new())))]
    cleanup_notifier: Arc<tokio::sync::Notify>,

    #[fieldx(lock, private, get(copy), set, builder(off), default(Instant::now()))]
    last_flush: Instant,

    #[fieldx(mode(async), private, lock, get, builder("_observers", private), default)]
    observers: Vec<Box<dyn Observer<DC>>>,

    #[fieldx(mode(async), private, writer, set, get(copy), builder(off), default)]
    closed: bool,

    #[fieldx(builder(off), default(false.into()))]
    shutdown: AtomicBool,
}

impl<DC> Cache<DC>
where
    DC: DataController,
{
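    // Post-build hook (see `post_build(initial_setup)` in the `fx_plus` attribute above): sets up
    // the updates pool and the underlying Moka cache.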
    fn initial_setup(mut self) -> Self {
        self.closed = false.into();
        self.set_updates(HashMap::with_capacity(self.max_updates() as usize));
        self.set_cache(Some(Arc::new(
            MokaCache::builder()
                .max_capacity(self.max_capacity())
                .name(self.clear_name().unwrap_or_else(|| std::any::type_name::<DC::Value>()))
                .eviction_policy(EvictionPolicy::tiny_lfu())
                .build(),
        )));

        self
    }

    fn cache(&self) -> ArcCache<DC> {
        Arc::clone(self.cache.as_ref().expect("Internal error: cache not initialized"))
    }

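    // Resolves the primary key for `key`: taken from the cache when the key is already present,
    // otherwise requested from the data controller.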
    #[instrument(level = "trace", skip(self))]
    async fn get_primary_key_from(&self, key: &DC::Key) -> Result<Option<DC::Key>, DC::Error> {
        Ok(if let Some(v) = self.cache().get(key).await {
            Some(match v {
                ValueState::Primary(_) => key.clone(),
                ValueState::Secondary(ref k) => k.clone(),
            })
        }
        else {
            self.data_controller().get_primary_key_for(key).await?
        })
    }

    // This method is for primaries only
    #[instrument(level = "trace", skip(self, f))]
    pub(crate) async fn get_and_try_compute_with_primary<F, Fut>(
        &self,
        key: &DC::Key,
        f: F,
    ) -> Result<CompResult<DC>, Arc<DC::Error>>
    where
        F: FnOnce(Option<Entry<DC>>) -> Fut,
        Fut: Future<Output = Result<Op<DC::Value>, DC::Error>>,
    {
        let myself = self.myself().unwrap();

        self.maybe_flush_one(key).await?;
        debug!("[{}] get_and_try_compute_with_primary(key: {key:?})", myself.name());

        let result = self
            .cache()
            .entry(key.clone())
            .and_try_compute_with(|entry| async {
                let (wb_entry, old_v, fetched) = if let Some(entry) = entry {
                    let old_v = entry.into_value().into_value();
                    (
                        Some(Entry::new(&myself, key.clone(), old_v.clone())),
                        Some(old_v),
                        false,
                    )
                }
                else {
                    let old_v = myself.data_controller().get_for_key(key).await?;
                    old_v.map_or((None, None, false), |v| {
                        (Some(Entry::new(&myself, key.clone(), v.clone())), Some(v), true)
                    })
                };

                let op = f(wb_entry).await?;

                let op = match op {
                    Op::Remove => {
                        if let Some(ref old_v) = old_v {
                            let secondaries = myself.data_controller().secondary_keys_of(old_v);
                            // If primary gets removed all related secondaries are to be dropped out of the cache.
                            for skey in secondaries {
                                myself.cache().invalidate(&skey).await;
                            }
                        }

                        myself.on_delete(key, old_v.map(Arc::new)).await?
                    }
                    Op::Put(v) => {
                        let v = Arc::new(v);
                        if let Some(old_v) = old_v {
                            myself.on_change(key, v, old_v).await?
                        }
                        else {
                            myself.on_new(key, v).await?
                        }
                    }
                    Op::Nop => {
                        // Even if the user wants to do nothing we still need to put the newly fetched value into the cache.
                        if fetched {
                            Op::Put(ValueState::Primary(old_v.unwrap()))
                        }
                        else {
                            Op::Nop
                        }
                    }
                };

                Result::<Op<ValueState<DC::Key, DC::Value>>, Arc<DC::Error>>::Ok(op)
            })
            .await?;

        Ok(match result {
            MokaCompResult::Inserted(e) => CompResult::Inserted(Entry::from_primary_entry(self, e)),
            MokaCompResult::Removed(e) => CompResult::Removed(Entry::from_primary_entry(self, e)),
            MokaCompResult::ReplacedWith(e) => CompResult::ReplacedWith(Entry::from_primary_entry(self, e)),
            MokaCompResult::Unchanged(e) => CompResult::Unchanged(Entry::from_primary_entry(self, e)),
            MokaCompResult::StillNone(k) => CompResult::StillNone(k),
        })
    }

    // This method is for secondaries only. If CompResult wraps an Entry, the entry carries the secondary key together
    // with the primary's value, because this is what the user expects.
    #[instrument(level = "trace", skip(self, f))]
    pub(crate) async fn get_and_try_compute_with_secondary<F, Fut>(
        &self,
        key: &DC::Key,
        f: F,
    ) -> Result<CompResult<DC>, Arc<DC::Error>>
    where
        F: FnOnce(Option<Entry<DC>>) -> Fut,
        Fut: Future<Output = Result<Op<DC::Value>, DC::Error>>,
    {
        let myself = self.myself().unwrap();
        let mut primary_value = None;

        let result = self
            .cache()
            .entry(key.clone())
            .and_try_compute_with(|entry| async {
                let primary_key = if let Some(entry) = entry {
                    Some(match entry.into_value() {
                        ValueState::Secondary(k) => k,
                        _ => panic!(
                            "Key '{key}' is submitted as a secondary but the corresponding cache entry is primary"
                        ),
                    })
                }
                else {
                    self.get_primary_key_from(key).await?
                };

                let secondary_key = key.clone();

                let result = if let Some(ref pkey) = primary_key {
                    self.get_and_try_compute_with_primary(pkey, |entry| async move {
                        let secondary_entry = if let Some(entry) = entry {
                            // Some(Entry::new(&myself, secondary_key, entry.value().await?.clone()))
                            // XXX Temporary!
                            Some(Entry::new(&myself, secondary_key, entry.value().await.unwrap().clone()))
                        }
                        else {
                            None
                        };
                        f(secondary_entry).await
                    })
                    .await?
                }
                else {
                    // We don't know the primary key yet. Get the user's response and see what to do with it.
                    // Since the primary key will be known only if the user has provided a value, it is only possible
                    // to use the get_and_try_compute_with_primary method if Op::Put is returned.
                    match f(None).await? {
                        Op::Nop | Op::Remove => CompResult::StillNone(Arc::new(secondary_key)),
                        Op::Put(new_value) => {
                            let pkey = myself.data_controller().primary_key_of(&new_value);
                            self.get_and_try_compute_with_primary(&pkey, |_| async { Ok(Op::Put(new_value)) })
                                .await?
                        }
                    }
                };

                let op = match result {
                    CompResult::Inserted(v) | CompResult::ReplacedWith(v) | CompResult::Unchanged(v) => {
                        let value = v.into_value();
                        // The primary key may still be unknown here when the value has just been created from a
                        // secondary key the data controller could not resolve; derive it from the value in that case.
                        let pkey = primary_key.unwrap_or_else(|| myself.data_controller().primary_key_of(&value));
                        primary_value = Some(value);
                        Op::Put(ValueState::Secondary(pkey))
                    }
                    CompResult::Removed(e) => {
                        primary_value = Some(e.into_value());
                        Op::Remove
                    }
                    CompResult::StillNone(_) => Op::Nop,
                };

                Result::<Op<ValueState<DC::Key, DC::Value>>, Arc<DC::Error>>::Ok(op)
            })
            .await?;

        Ok(match result {
            MokaCompResult::Inserted(_) => CompResult::Inserted(Entry::new(self, key.clone(), primary_value.unwrap())),
            MokaCompResult::ReplacedWith(_) => {
                CompResult::ReplacedWith(Entry::new(self, key.clone(), primary_value.unwrap()))
            }
            MokaCompResult::Unchanged(_) => {
                CompResult::Unchanged(Entry::new(self, key.clone(), primary_value.unwrap()))
            }
            MokaCompResult::Removed(_) => CompResult::Removed(Entry::new(self, key.clone(), primary_value.unwrap())),
            MokaCompResult::StillNone(k) => CompResult::StillNone(k),
        })
    }

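    // Returns the entry for a primary `key`, inserting the value produced by `init` when neither the cache nor the
    // data controller has one.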
    #[instrument(level = "trace", skip(self, init))]
    pub(crate) async fn get_or_try_insert_with_primary(
        &self,
        key: &DC::Key,
        init: impl Future<Output = Result<DC::Value, DC::Error>>,
    ) -> Result<Entry<DC>, Arc<DC::Error>> {
        debug!("get_or_try_insert_with_primary(key: {key:?})");

        self.maybe_flush_one(key).await?;

        let cache_entry = self
            .cache()
            .entry(key.clone())
            .or_try_insert_with(async {
                Ok(ValueState::Primary(
                    if let Some(v) = self.data_controller().get_for_key(key).await? {
                        v
                    }
                    else {
                        let new_value = init.await?;
                        self.on_new(key, Arc::new(new_value.clone())).await?;
                        new_value
                    },
                ))
            })
            .await?;
        Ok(Entry::new(self, key.clone(), cache_entry.into_value().into_value()))
    }

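    // Same as `get_or_try_insert_with_primary`, but `key` is a secondary key whose primary is resolved from the
    // cache or the data controller, or derived from the `init` value when neither knows it.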
    #[instrument(level = "trace", skip(self, init))]
    pub(crate) async fn get_or_try_insert_with_secondary(
        &self,
        key: &DC::Key,
        init: impl Future<Output = Result<DC::Value, DC::Error>>,
    ) -> Result<Entry<DC>, Arc<DC::Error>> {
        let myself = self.myself().unwrap();
        // The cache entry for a secondary key only stores the primary key, so the primary's value is captured here
        // to build the returned `Entry` without panicking on `ValueState::into_value`.
        let mut primary_value = None;
        let result = self
            .cache()
            .entry(key.clone())
            .and_try_compute_with(|entry| async {
                let primary_key = if let Some(entry) = entry {
                    let ValueState::Secondary(pkey) = entry.value()
                    else {
                        panic!("Not a secondary key: '{key}'")
                    };
                    primary_value = Some(self.get_or_try_insert_with_primary(pkey, init).await?.into_value());
                    pkey.clone()
                }
                else if let Some(pkey) = myself.data_controller().get_primary_key_for(key).await? {
                    primary_value = Some(self.get_or_try_insert_with_primary(&pkey, init).await?.into_value());
                    pkey
                }
                else {
                    let new_value = init.await?;
                    let pkey = self.data_controller().primary_key_of(&new_value);
                    primary_value = self.insert(new_value).await?;
                    pkey
                };

                Result::<_, Arc<DC::Error>>::Ok(Op::Put(ValueState::Secondary(primary_key)))
            })
            .await?;

        Ok(match result {
            MokaCompResult::Inserted(_) | MokaCompResult::Unchanged(_) | MokaCompResult::ReplacedWith(_) => {
                Entry::new(self, key.clone(), primary_value.unwrap())
            }
            _ => panic!("Unexpected outcome of get_or_try_insert_with_secondary: {result:?}"),
        })
    }

    // This method ensures that each thread locks the entire updates pool only for the duration required to create a
    // new hash entry.
    #[instrument(level = "trace", skip(self, f))]
    #[allow(clippy::type_complexity)]
    pub(crate) async fn get_update_state_and_compute<'a, F, Fut>(
        &self,
        key: &'a DC::Key,
        value: Option<Arc<DC::Value>>,
        f: F,
    ) -> Result<Op<ValueState<DC::Key, DC::Value>>, DC::Error>
    where
        F: FnOnce(&'a DC::Key, Option<Arc<DC::Value>>, Arc<UpdateState<DC>>) -> Fut,
        Fut: Future<Output = Result<DataControllerOp, DC::Error>>,
    {
        let update_state;
        self.check_task().await;
        {
            let mut updates = self.updates_mut();

            let count = updates.len();
            let current_capacity = updates.capacity();
            if current_capacity <= (count - (count / 10)) {
                updates.reserve(current_capacity.max(self.max_updates() as usize) / 2);
            }

            let update_key = value
                .as_ref()
                .map(|v| self.data_controller().primary_key_of(v))
                .unwrap_or(key.clone());

            if !updates.contains_key(&update_key) {
                update_state = child_build!(self, UpdateState<DC>).unwrap();
                updates.insert(key.clone(), Arc::clone(&update_state));
            }
            else {
                update_state = updates.get(&update_key).unwrap().clone();
            }
            // At this point, the update state has at least two references, which protects it from being collected by
            // the _purify_updates method.
        };

        let op = f(key, value.as_ref().map(Arc::clone), Arc::clone(&update_state)).await?;

        // There is a chance that if the update was empty when fetched, it was collected by the _purify_updates method.
        // To be on the safe side, we insert the update back into the pool unconditionally.
        self.updates_mut().insert(key.clone(), update_state.clone());

        self._on_dc_op(op, key, value).await
    }

    /// Cache name. Most useful for debugging and logging.
    #[allow(dead_code)]
    #[inline]
    pub fn name(&self) -> String {
        self.cache().name().unwrap_or("<anon>").to_string()
    }

    /// Returns an object that represents a key in the cache.
    #[instrument(level = "trace")]
    pub async fn entry(&self, key: DC::Key) -> Result<EntryKeySelector<DC>, Arc<DC::Error>> {
        check_error!(self);
        Ok(if let Some(pkey) = self.get_primary_key_from(&key).await? {
            child_build!(
                self,
                EntryKeySelector<DC> {
                    primary: pkey == key,
                    key: key,
                    primary_key: pkey,
                }
            )
        }
        else {
            child_build!(
                self,
                EntryKeySelector<DC> {
                    primary: self.data_controller().is_primary(&key),
                    key: key,
                }
            )
        }
        .unwrap())
    }

    /// Try to get a value from the cache by its key. If the value is not cached yet, the cache attempts to fetch it
    /// via the data controller. Returns `None` if such a key does not exist.
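    ///
    /// A minimal sketch (reusing the hypothetical setup from the [`Cache`] example):
    ///
    /// ```ignore
    /// if let Some(record) = cache.get(&key).await? {
    ///     println!("found: {record:?}");
    /// }
    /// ```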
    #[instrument(level = "trace")]
    pub async fn get(&self, key: &DC::Key) -> Result<Option<DC::Value>, Arc<DC::Error>> {
        check_error!(self);
        let outcome = self
            .entry(key.clone())
            .await?
            .and_try_compute_with(|_| async { Ok(Op::Nop) })
            .await?;

        Ok(match outcome {
            CompResult::Inserted(entry) | CompResult::Unchanged(entry) | CompResult::ReplacedWith(entry) => {
                let value = entry.into_value();
                self.on_access(key, Arc::new(value.clone())).await?;
                Some(value)
            }
            CompResult::StillNone(_) => None,
            _ => None,
        })
    }

    /// Insert a value into the cache. This operation triggers the [`on_new`](crate#on_new) data controller
    /// chain of events even if a value is already present for the same key.
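    ///
    /// A minimal sketch (the primary key is derived from the value by the data controller):
    ///
    /// ```ignore
    /// // `new_record` is a hypothetical DC::Value; `stored` is the value now cached under its primary key, if any.
    /// let stored = cache.insert(new_record).await?;
    /// ```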
    #[instrument(level = "trace")]
    pub async fn insert(&self, value: DC::Value) -> Result<Option<DC::Value>, Arc<DC::Error>> {
        check_error!(self);
        let key = self.data_controller().primary_key_of(&value);
        let res = self
            .cache()
            .entry(key.clone())
            .and_try_compute_with(|_| async {
                let op = self.on_new(&key, Arc::new(value)).await;
                op
            })
            .await?;

        match res {
            MokaCompResult::Inserted(entry)
            | MokaCompResult::ReplacedWith(entry)
            | MokaCompResult::Unchanged(entry) => {
                let value = entry.into_value().into_value();
                Ok(Some(value))
            }
            MokaCompResult::StillNone(_) => Ok(None),
            _ => panic!("Impossible result of insert operation: {res:?}"),
        }
    }

    /// Delete a value from the cache by its key. If the value is not cached yet, it will be fetched from the backend
    /// first. This behavior is subject to further optimization. In most cases, however, it should not be a problem:
    /// if one knows what to delete without first inspecting or otherwise using the value, it is more efficient to
    /// delete it directly in the backend.
    #[instrument(level = "trace")]
    pub async fn delete(&self, key: &DC::Key) -> Result<Option<DC::Value>, Arc<DC::Error>> {
        check_error!(self);
        // let value = self.cache().remove(key).await;
        let result = self
            .entry(key.clone())
            .await?
            .and_try_compute_with(|entry| async move { Ok(if entry.is_some() { Op::Remove } else { Op::Nop }) })
            .await?;

        Ok(match result {
            CompResult::Removed(entry) => Some(entry.into_value()),
            CompResult::StillNone(_) => None,
            _ => panic!("Impossible result of delete operation: {result:?}"),
        })
    }

    /// Invalidate a key in the cache. A secondary key is invalidated by removing it from the cache, while a primary key
    /// is invalidated by removing it and all its secondary keys from the cache.
    #[instrument(level = "trace")]
    pub async fn invalidate(&self, key: &DC::Key) -> Result<(), Arc<DC::Error>> {
        check_error!(self);
        let myself = self.myself().unwrap();
        self.cache()
            .entry(key.clone())
            .and_compute_with(|entry| async {
                if let Some(entry) = entry {
                    if let ValueState::Primary(value) = entry.value() {
                        for secondary in myself.data_controller().secondary_keys_of(value) {
                            myself.cache().invalidate(&secondary).await;
                        }
                    }
                }
                Op::Remove
            })
            .await;
        Ok(())
    }

    // Remove successfully written updates from the pool.
    fn _purify_updates(&self, update_iter: Arc<UpdateIterator<DC>>) -> usize {
        // First of all, ensure minimal overhead on lock acquisition. The operation itself is not going to take too long
        // and must be deadlock-free as well.
        let mut updates = self.updates_mut();
        let mut count = 0;
        for (key, guard) in update_iter.worked_mut().iter() {
            let update = updates.get_mut(key).unwrap();
            if Arc::strong_count(update) > 1 {
                // This update is being used somewhere else. We cannot remove it from the pool yet.
                continue;
            }

            // If the update entry has no update in it then it's been written and we can remove it from the pool.
            if guard.is_none() {
                updates.remove(key);
                count += 1;
            }
        }
        // At this point the update iterator would finally be dropped and all the locks on update records that are still
        // present in the pool would be released.

        // Return the number of updates that have actually been flushed.
        count
    }

    /// Low-level immediate flush operation that requires a list of keys to flush.  It does not check for an
    /// [error](#method.error) generated by the background task.
    pub async fn flush_many_raw(&self, keys: Vec<DC::Key>) -> Result<usize, DC::Error> {
        let update_iter = child_build!(
            self, UpdateIterator<DC> {
                keys: keys
            }
        )
        .expect("Internal error: UpdateIterator builder failure");

        wbc_event!(self, on_flush(update_iter.clone())?);

        // Allow the update iterator to be re-iterated.  Since it's a non-deterministic iterator whose outcomes depend
        // on the state of the cache update pool, its implementation supports such uncommon usage.
        update_iter.reset();

        self.data_controller().write_back(update_iter.clone()).await?;
        let updates_count = self._purify_updates(update_iter);
        self.set_last_flush(Instant::now());

        Ok(updates_count)
    }

    /// The same as the [`flush`](#method.flush) method but does not check for an [error](#method.error) that may have
    /// occurred in the background task.
    #[instrument(level = "trace")]
    pub async fn flush_raw(&self) -> Result<usize, DC::Error> {
        let update_keys = {
            let updates = self.updates();
            updates.keys().cloned().collect::<Vec<_>>()
        };
        self.flush_many_raw(update_keys).await
    }

    /// Immediately flushes all updates in the pool but will do nothing and immediately error out if the background task
    /// has encountered an [error](#method.error).
    #[instrument(level = "trace")]
    pub async fn flush(&self) -> Result<usize, Arc<DC::Error>> {
        check_error!(self);
        self.flush_raw().await.map_err(Arc::new)
    }

    /// Initiates a flush via the background task.
    ///
    /// This method does not wait for the flush to complete; it returns immediately.
    /// If the background task has previously encountered an [error](#method.error),
    /// this method will do nothing and return that error.
    #[instrument(level = "trace")]
    pub async fn soft_flush(&self) -> Result<(), Arc<DC::Error>> {
        check_error!(self);
        self.flush_notifier().notify_waiters();
        Ok(())
    }

    /// Flushes a single key in the cache. Although the operation itself is inefficient from the caching perspective, it
    /// is useful in a multi-cache environment where values in one backend refer to values in another.  For example,
    /// when there is a foreign key relationship between two tables in a database, a dependent record cannot be written
    /// into the backend until its dependency is written first.
    ///
    /// This method is most useful when invoked by an [observer](crate#observers).
    ///
    /// Does nothing and returns an error if the background task has encountered an [error](#method.error).
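    ///
    /// A minimal sketch (hypothetical `users` and `orders` caches, where an order row references a user row and
    /// therefore must not be written back before it):
    ///
    /// ```ignore
    /// users.flush_one(&user_id).await?;   // write the referenced user row first
    /// orders.flush_one(&order_id).await?; // now the dependent order row can be written back
    /// ```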
    #[instrument(level = "trace")]
    pub async fn flush_one(&self, key: &DC::Key) -> Result<usize, Arc<DC::Error>> {
        check_error!(self);
        // Use a local variable to avoid holding the lock on the updates pool for too long.
        let update = self.updates().get(key).cloned();
        if let Some(update) = update {
            // At this point we have two options if the update is not empty:
            // 1. There is no lock on the update data. This either means that no processing is currently being done by
            //    the background task, or the update is queued for processing but the data controller hasn't gotten to
            //    it yet. In the latter case, the race could still be won by this call. The update iterator
            //    implementation will simply skip this record when it gets to it.
            // 2. There is a lock on the data. In this case, we must wait until it is released before proceeding
            //    further. When the lock is released, the update is likely to be empty. If it is not empty, then there
            //    was an error in the data controller. We can give it another try, and if it fails again, report the
            //    error back to the caller.

            let guard = update.data.clone().write_owned().await;

            // If the update is empty, we can safely skip it.
            if let Some(update_data) = guard.as_ref() {
                wbc_event!(self, on_flush_one(key, update_data)?);

                debug!("flush single key: {key:?}");

                let update_iter = child_build!(
                    self, UpdateIterator<DC> {
                        key_guard: (key.clone(), guard),
                    }
                )
                .expect("Internal error: UpdateIterator builder failure");

                self.data_controller().write_back(update_iter.clone()).await?;
                return Ok(self._purify_updates(update_iter));
            }
        }
        Ok(0)
    }

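    // The body of the background task: flushes the updates pool when a flush or cleanup is requested, when the pool
    // grows beyond `max_updates`, or when `flush_interval` has elapsed since the last flush.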
    #[instrument(level = "trace")]
    async fn monitor_updates(&self) {
        let flush_interval = self.flush_interval();
        let mut ticking_interval = interval(Duration::from_millis(self.monitor_tick_duration()));
        ticking_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        let max_updates = self.max_updates() as usize;
        // Obtain the exclusive lock on the closed flag. This will be released when the task is finished. This
        // behavior allows the `close` method to wait for the task to finish before returning.
        let mut closed_guard = self.write_closed().await;

        loop {
            if *closed_guard {
                break;
            }

            let mut forced = false;
            let flush_notifier = self.flush_notifier();
            let cleanup_notifier = self.cleanup_notifier();
            tokio::select! {
                _ = flush_notifier.notified() => {
                    // The flush was requested manually. Reset the timer.
                    forced = true;
                }
                _ = cleanup_notifier.notified() => {
                    // The cleanup was requested manually. Reset the timer.
                    forced = true;
                }
                _ = ticking_interval.tick() => {
                    // Do nothing, just wait for the next tick.
                }
            }

            *closed_guard = self.is_shut_down();

            if self.updates().is_empty() {
                // Don't consume resources if no updates have been produced.
                continue;
            }

            if forced
                || *closed_guard
                // ...or when the updates pool size exceeds the limit...
                || self.updates().len() > max_updates
                // ...or when the last flush took place more than the flush interval ago.
                || self.last_flush().elapsed() >= flush_interval
            {
                if let Err(error) = self.flush().await {
                    self.set_error(error.clone());
                    wbc_event!(self, on_monitor_error(&error));
                }
            }
        }
    }

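    // Spawns the background monitor task if it has not been started yet or has already finished.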
    #[instrument(level = "trace", skip(self))]
    async fn check_task(&self) {
        let mut task_guard = self.write_monitor_task();
        if task_guard.as_ref().is_none_or(|t| t.is_finished()) {
            let async_self = self.myself().unwrap();
            *task_guard = Some(tokio::spawn(async move { async_self.monitor_updates().await }));
        }
    }

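    // Translates a `DataControllerOp` returned by the update state into the corresponding cache operation.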
    async fn _on_dc_op(
        &self,
        op: DataControllerOp,
        key: &DC::Key,
        value: Option<Arc<DC::Value>>,
    ) -> Result<Op<ValueState<DC::Key, DC::Value>>, DC::Error> {
        Ok(match op {
            DataControllerOp::Nop => Op::Nop,
            DataControllerOp::Insert => {
                if let Some(value) = value {
                    Op::Put(ValueState::Primary(value.as_ref().clone()))
                }
                else {
                    Op::Nop
                }
            }
            DataControllerOp::Revoke => Op::Remove,
            DataControllerOp::Drop => {
                self.updates_mut().remove(key);
                Op::Remove
            }
        })
    }

    #[instrument(level = "trace")]
    #[allow(clippy::type_complexity)]
    pub(crate) async fn on_new(
        &self,
        key: &DC::Key,
        value: Arc<DC::Value>,
    ) -> Result<Op<ValueState<DC::Key, DC::Value>>, DC::Error> {
        self.get_update_state_and_compute(key, Some(value), |key, value, update_state| async move {
            update_state.on_new(key, value.as_ref().unwrap()).await
        })
        .await
    }

    #[instrument(level = "trace")]
    #[allow(clippy::type_complexity)]
    pub(crate) async fn on_change(
        &self,
        key: &DC::Key,
        value: Arc<DC::Value>,
        old_val: DC::Value,
    ) -> Result<Op<ValueState<DC::Key, DC::Value>>, DC::Error> {
        self.get_update_state_and_compute(key, Some(value), |key, value, update_state| async move {
            update_state.on_change(key, value.unwrap(), old_val).await
        })
        .await
    }

    #[instrument(level = "trace")]
    pub(crate) async fn on_access<'a>(&self, key: &DC::Key, value: Arc<DC::Value>) -> Result<(), DC::Error> {
        self.get_update_state_and_compute(key, Some(value), |key, value, update_state| async move {
            update_state.on_access(key, value.unwrap()).await
        })
        .await?;
        Ok(())
    }

    #[instrument(level = "trace")]
    #[allow(clippy::type_complexity)]
    pub(crate) async fn on_delete(
        &self,
        key: &DC::Key,
        value: Option<Arc<DC::Value>>,
    ) -> Result<Op<ValueState<DC::Key, DC::Value>>, DC::Error> {
        self.get_update_state_and_compute(key, value, |key, _value, update_state| async move {
            update_state.on_delete(key).await
        })
        .await
    }

    // If a key is in the updates but not in the cache, it means that to obtain its valid value, we need to flush it
    // first.
    #[instrument(level = "trace")]
    pub(crate) async fn maybe_flush_one(&self, key: &DC::Key) -> Result<usize, Arc<DC::Error>> {
        if self.cache().contains_key(key) {
            return Ok(0);
        }
        debug!("DO flush_one(key: {key:?})");
        return self.flush_one(key).await;
    }

    /// Prepares the cache for shutdown. This method notifies the background task to stop and flush all updates.
    /// It blocks until the task is finished.
    ///
    /// This method must be called before the cache is dropped to ensure that no data is lost.
    ///
    /// Will do nothing and return an error if the background task has encountered an [error](#method.error).
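    ///
    /// A minimal sketch of a typical shutdown:
    ///
    /// ```ignore
    /// cache.close().await?; // flushes pending updates and stops the background task
    /// drop(cache);
    /// ```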
    #[instrument(level = "trace")]
    pub async fn close(&self) -> Result<(), Arc<DC::Error>> {
        check_error!(self);

        if self.is_shut_down() {
            return Ok(());
        }

        self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst);
        self.cleanup_notifier().notify_waiters();

        // This will block until the monitor task is finished.
        let _ = self.write_closed().await;

        Ok(())
    }

    /// Returns the shutdown status of the cache.
    pub fn is_shut_down(&self) -> bool {
        self.shutdown.load(std::sync::atomic::Ordering::SeqCst)
    }
}

impl<DC> Debug for Cache<DC>
where
    DC: DataController,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Cache")
            .field("name", &self.name())
            .field("updates_count", &self.updates().len())
            .field("cache_entries", &self.cache().entry_count())
            .finish()
    }
}

impl<DC> CacheBuilder<DC>
where
    DC: DataController,
{
    /// Adds an [observer](crate#observers) to the cache.
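    ///
    /// A minimal sketch (`MetricsObserver` stands in for any `Observer<DC>` implementation):
    ///
    /// ```ignore
    /// let cache = Cache::builder()
    ///     .data_controller(controller)
    ///     .observer(MetricsObserver::default())
    ///     .build();
    /// ```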
    pub fn observer(mut self, observer: impl Observer<DC>) -> Self {
        if let Some(ref mut observers) = self.observers {
            observers.push(Box::new(observer));
            self
        }
        else {
            self._observers(vec![Box::new(observer)])
        }
    }
}