1use crate::entry_selector::EntryKeySelector;
2use crate::prelude::*;
3use crate::traits::Observer;
4use crate::update_iterator::UpdateIterator;
5use crate::update_state::UpdateState;
6
7use fieldx_plus::child_build;
8use fieldx_plus::fx_plus;
9use moka::future::Cache as MokaCache;
10use moka::ops::compute::CompResult as MokaCompResult;
11use moka::policy::EvictionPolicy;
12use std::collections::HashMap;
13use std::fmt::Debug;
14use std::fmt::Display;
15use std::future::Future;
16use std::hash::Hash;
17use std::sync::atomic::AtomicBool;
18use std::sync::Arc;
19use std::time::Duration;
20use std::time::Instant;
21use tokio::task::JoinHandle;
22use tokio::time::interval;
23use tracing::debug;
24use tracing::instrument;
25
26pub use moka::ops::compute::Op;
27
/// Calls an async observer callback on every registered observer, in registration order.
///
/// `$self.observers()` is awaited once to obtain the observer list, then
/// `$method($args)` is awaited on each observer in turn. Any tokens following the call
/// (for example `?`) are appended right after `.await`, which lets the caller decide how a
/// callback's result is handled.
macro_rules! wbc_event {
    ($self:ident, $method:ident($($args:tt)*) $( $post:tt )* ) => {
        {
            let registered = $self.observers().await;
            // Iterating an empty list is a no-op, so no separate emptiness check is needed.
            for listener in registered.iter() {
                listener.$method($($args)*).await $( $post )*;
            }
        }
    };
}
40
/// Returns early from the enclosing function with the stored error, if there is one.
///
/// The error is taken out with `clear_error()`, so it is reported only once; when no error is
/// stored the macro does nothing.
macro_rules! check_error {
    ($self:expr) => {
        if let Some(pending) = $self.clear_error() {
            return Err(pending);
        }
    };
}
48
/// What the underlying cache stores for a key.
#[derive(Clone, Debug)]
pub(crate) enum ValueState<K, V>
where
    K: Debug + Hash + Clone + Eq + Sized + Send + Sync + 'static,
    V: Debug + Clone + Send + Sync + 'static,
{
    /// The entry owns the cached value itself.
    Primary(V),
    /// The entry only refers to another key, the one under which the value is stored.
    Secondary(K),
}

impl<K, V> ValueState<K, V>
where
    K: Debug + Display + Hash + Clone + Eq + Sized + Send + Sync + 'static,
    V: Debug + Clone + Send + Sync + 'static,
{
    /// Consumes the state and returns the value it carries.
    ///
    /// # Panics
    ///
    /// Panics when called on [`ValueState::Secondary`], which carries a key rather than a value.
    pub(crate) fn into_value(self) -> V {
        let Self::Primary(value) = self
        else {
            panic!("secondary doesn't have a value")
        };
        value
    }
}
73
/// Shared handle to the underlying moka cache: maps a data controller key to the
/// [`ValueState`] stored for it.
type ArcCache<DC> =
    Arc<MokaCache<<DC as DataController>::Key, ValueState<<DC as DataController>::Key, <DC as DataController>::Value>>>;
/// Map from a key to the shared [`UpdateState`] recorded for it.
type UpdatesHash<DC> = HashMap<<DC as DataController>::Key, Arc<UpdateState<DC>>>;
77
/// Cache front-end that pairs a moka cache with a map of pending updates which are written
/// back through the data controller.
///
/// Instances are created through the generated builder; `initial_setup` runs as the builder's
/// post-build step.
#[fx_plus(
    parent,
    new(off),
    default(off),
    sync,
    builder(
        post_build(initial_setup),
        doc("Builder object of [`Cache`].", "", "See [`Cache::builder()`] method."),
        method_doc("Implement builder pattern for [`Cache`]."),
    )
)]
pub struct Cache<DC>
where
    DC: DataController,
    DC::Key: Send + Sync + 'static,
    DC::Error: Send + Sync + 'static,
{
    /// Last error stored by the background monitor; public operations take and return it via
    /// `clear_error()` before doing any work.
    #[fieldx(
        lock,
        clearer(doc("Clears and returns the last error or `None`.")),
        get(clone),
        set(private),
        builder(off)
    )]
    error: Arc<DC::Error>,

    /// The data controller this cache reads from and writes back to. Required by the builder.
    #[fieldx(vis(pub), builder(vis(pub), required, into), get(clone))]
    data_controller: Arc<DC>,

    /// Optional cache name supplied via the builder; it is taken out in `initial_setup` and
    /// handed to the moka cache.
    #[fieldx(lock, private, optional, clearer, get(off), builder(vis(pub), doc("Cache name.")))]
    name: &'static str,

    /// Number of pending update records above which the monitor flushes; also the initial
    /// capacity of the pending-updates map. Defaults to 100.
    #[fieldx(get(copy), default(100))]
    max_updates: u64,

    /// `max_capacity` passed to the moka cache builder. Defaults to 10 000.
    #[fieldx(get(copy), default(10_000))]
    max_capacity: u64,

    /// Time since the last flush after which the monitor flushes. Defaults to 10 seconds.
    #[fieldx(get(copy), set(doc("Change the flush interval.")), default(Duration::from_secs(10)))]
    flush_interval: Duration,

    /// The underlying moka cache; populated by `initial_setup`.
    #[fieldx(vis(pub(crate)), set(private), builder(off))]
    cache: Option<ArcCache<DC>>,

    /// Pending update records, one shared `UpdateState` per key.
    #[fieldx(lock, vis(pub(crate)), set(private), get, get_mut, builder(off))]
    updates: UpdatesHash<DC>,

    /// Handle of the background monitor task spawned by `check_task`.
    #[fieldx(private, clearer, writer, builder(off))]
    monitor_task: JoinHandle<()>,

    /// Period of the monitor's tick, in milliseconds. Defaults to 10.
    #[fieldx(get(copy), default(10))]
    monitor_tick_duration: u64,

    /// Notified by `soft_flush` to make the monitor flush on its next wake-up.
    #[fieldx(private, get(clone), default(Arc::new(tokio::sync::Notify::new())))]
    flush_notifier: Arc<tokio::sync::Notify>,

    /// Notified by `close` to wake the monitor for shutdown.
    #[fieldx(private, get(clone), default(Arc::new(tokio::sync::Notify::new())))]
    cleanup_notifier: Arc<tokio::sync::Notify>,

    /// Moment of the most recent bulk flush (`flush_many_raw` updates it).
    #[fieldx(lock, private, get(copy), set, builder(off), default(Instant::now()))]
    last_flush: Instant,

    /// Observers invoked by the `wbc_event!` macro.
    #[fieldx(mode(async), private, lock, get, builder("_observers", private), default)]
    observers: Vec<Box<dyn Observer<DC>>>,

    /// Closed flag. The monitor task holds its write lock while it runs, and `close` waits
    /// for that lock to become available.
    #[fieldx(mode(async), private, writer, set, get(copy), builder(off), default)]
    closed: bool,

    /// Set by `close`; read through `is_shut_down`.
    #[fieldx(builder(off), default(false.into()))]
    shutdown: AtomicBool,
}
191
192impl<DC> Cache<DC>
193where
194 DC: DataController,
195{
196 fn initial_setup(mut self) -> Self {
197 self.closed = false.into();
198 self.set_updates(HashMap::with_capacity(self.max_updates() as usize));
199 self.set_cache(Some(Arc::new(
200 MokaCache::builder()
201 .max_capacity(self.max_capacity())
202 .name(self.clear_name().unwrap_or_else(|| std::any::type_name::<DC::Value>()))
203 .eviction_policy(EvictionPolicy::tiny_lfu())
204 .build(),
205 )));
206
207 self
208 }
209
210 fn cache(&self) -> ArcCache<DC> {
211 Arc::clone(self.cache.as_ref().expect("Internal error: cache not initialized"))
212 }
213
214 #[instrument(level = "trace", skip(self))]
215 async fn get_primary_key_from(&self, key: &DC::Key) -> Result<Option<DC::Key>, DC::Error> {
216 Ok(if let Some(v) = self.cache().get(key).await {
217 Some(match v {
218 ValueState::Primary(_) => key.clone(),
219 ValueState::Secondary(ref k) => k.clone(),
220 })
221 }
222 else {
223 self.data_controller().get_primary_key_for(key).await?
224 })
225 }
226
    /// Runs the user callback `f` against the entry stored under the primary key `key` and
    /// applies the operation it returns.
    ///
    /// Before touching the cache, any pending update for a key that is not currently cached is
    /// flushed (`maybe_flush_one`). Inside the moka compute closure the current value is taken
    /// from the cache or, if absent, fetched from the data controller. The callback's
    /// `Op::Remove` / `Op::Put` are routed through `on_delete` / `on_change` / `on_new`, whose
    /// returned operation is what is actually applied to the cache.
    ///
    /// # Panics
    ///
    /// Panics if the entry stored under `key` is a secondary entry (via
    /// `ValueState::into_value`), or if `self.myself()` returns `None`.
    #[instrument(level = "trace", skip(self, f))]
    pub(crate) async fn get_and_try_compute_with_primary<F, Fut>(
        &self,
        key: &DC::Key,
        f: F,
    ) -> Result<CompResult<DC>, Arc<DC::Error>>
    where
        F: FnOnce(Option<Entry<DC>>) -> Fut,
        Fut: Future<Output = Result<Op<DC::Value>, DC::Error>>,
    {
        let myself = self.myself().unwrap();

        self.maybe_flush_one(key).await?;
        debug!("[{}] get_and_try_compute_with_primary(key: {key:?})", myself.name());

        let result = self
            .cache()
            .entry(key.clone())
            .and_try_compute_with(|entry| async {
                // `fetched` records that the value came from the data controller rather than
                // from the cache.
                let (wb_entry, old_v, fetched) = if let Some(entry) = entry {
                    let old_v = entry.into_value().into_value();
                    (
                        Some(Entry::new(&myself, key.clone(), old_v.clone())),
                        Some(old_v),
                        false,
                    )
                }
                else {
                    let old_v = myself.data_controller().get_for_key(key).await?;
                    old_v.map_or((None, None, false), |v| {
                        (Some(Entry::new(&myself, key.clone(), v.clone())), Some(v), true)
                    })
                };

                let op = f(wb_entry).await?;

                let op = match op {
                    Op::Remove => {
                        // Drop the secondary-key entries that refer to the value being removed.
                        if let Some(ref old_v) = old_v {
                            let secondaries = myself.data_controller().secondary_keys_of(old_v);
                            for skey in secondaries {
                                myself.cache().invalidate(&skey).await;
                            }
                        }

                        myself.on_delete(key, old_v.map(Arc::new)).await?
                    }
                    Op::Put(v) => {
                        let v = Arc::new(v);
                        if let Some(old_v) = old_v {
                            myself.on_change(key, v, old_v).await?
                        }
                        else {
                            myself.on_new(key, v).await?
                        }
                    }
                    Op::Nop => {
                        if fetched {
                            // The value was just loaded from the data controller, so store it
                            // in the cache even though the callback asked for no change.
                            Op::Put(ValueState::Primary(old_v.unwrap()))
                        }
                        else {
                            Op::Nop
                        }
                    }
                };

                Result::<Op<ValueState<DC::Key, DC::Value>>, Arc<DC::Error>>::Ok(op)
            })
            .await?;

        Ok(match result {
            MokaCompResult::Inserted(e) => CompResult::Inserted(Entry::from_primary_entry(self, e)),
            MokaCompResult::Removed(e) => CompResult::Removed(Entry::from_primary_entry(self, e)),
            MokaCompResult::ReplacedWith(e) => CompResult::ReplacedWith(Entry::from_primary_entry(self, e)),
            MokaCompResult::Unchanged(e) => CompResult::Unchanged(Entry::from_primary_entry(self, e)),
            MokaCompResult::StillNone(k) => CompResult::StillNone(k),
        })
    }
308
309 #[instrument(level = "trace", skip(self, f))]
312 pub(crate) async fn get_and_try_compute_with_secondary<F, Fut>(
313 &self,
314 key: &DC::Key,
315 f: F,
316 ) -> Result<CompResult<DC>, Arc<DC::Error>>
317 where
318 F: FnOnce(Option<Entry<DC>>) -> Fut,
319 Fut: Future<Output = Result<Op<DC::Value>, DC::Error>>,
320 {
321 let myself = self.myself().unwrap();
322 let mut primary_value = None;
323
324 let result = self
325 .cache()
326 .entry(key.clone())
327 .and_try_compute_with(|entry| async {
328 let primary_key = if let Some(entry) = entry {
329 Some(match entry.into_value() {
330 ValueState::Secondary(k) => k,
331 _ => panic!(
332 "Key '{key}' is submitted as a secondary but the corresponding cache entry is primary"
333 ),
334 })
335 }
336 else {
337 self.get_primary_key_from(key).await?
338 };
339
340 let secondary_key = key.clone();
341
342 let result = if let Some(ref pkey) = primary_key {
343 self.get_and_try_compute_with_primary(pkey, |entry| async move {
344 let secondary_entry = if let Some(entry) = entry {
345 Some(Entry::new(&myself, secondary_key, entry.value().await.unwrap().clone()))
348 }
349 else {
350 None
351 };
352 f(secondary_entry).await
353 })
354 .await?
355 }
356 else {
357 match f(None).await? {
361 Op::Nop | Op::Remove => CompResult::StillNone(Arc::new(secondary_key)),
362 Op::Put(new_value) => {
363 let pkey = myself.data_controller().primary_key_of(&new_value);
364 self.get_and_try_compute_with_primary(&pkey, |_| async { Ok(Op::Put(new_value)) })
365 .await?
366 }
367 }
368 };
369
370 let op = match result {
371 CompResult::Inserted(v) | CompResult::ReplacedWith(v) | CompResult::Unchanged(v) => {
372 primary_value = Some(v.into_value());
373 Op::Put(ValueState::Secondary(primary_key.unwrap()))
374 }
375 CompResult::Removed(e) => {
376 primary_value = Some(e.into_value());
377 Op::Remove
378 }
379 CompResult::StillNone(_) => Op::Nop,
380 };
381
382 Result::<Op<ValueState<DC::Key, DC::Value>>, Arc<DC::Error>>::Ok(op)
383 })
384 .await?;
385
386 Ok(match result {
387 MokaCompResult::Inserted(_) => CompResult::Inserted(Entry::new(self, key.clone(), primary_value.unwrap())),
388 MokaCompResult::ReplacedWith(_) => {
389 CompResult::ReplacedWith(Entry::new(self, key.clone(), primary_value.unwrap()))
390 }
391 MokaCompResult::Unchanged(_) => {
392 CompResult::Unchanged(Entry::new(self, key.clone(), primary_value.unwrap()))
393 }
394 MokaCompResult::Removed(_) => CompResult::Removed(Entry::new(self, key.clone(), primary_value.unwrap())),
395 MokaCompResult::StillNone(k) => CompResult::StillNone(k),
396 })
397 }
398
399 #[instrument(level = "trace", skip(self, init))]
400 pub(crate) async fn get_or_try_insert_with_primary(
401 &self,
402 key: &DC::Key,
403 init: impl Future<Output = Result<DC::Value, DC::Error>>,
404 ) -> Result<Entry<DC>, Arc<DC::Error>> {
405 debug!("get_or_try_insert_with_primary(key: {key:?})");
406
407 self.maybe_flush_one(key).await?;
408
409 let cache_entry = self
410 .cache()
411 .entry(key.clone())
412 .or_try_insert_with(async {
413 Ok(ValueState::Primary(
414 if let Some(v) = self.data_controller().get_for_key(key).await? {
415 v
416 }
417 else {
418 let new_value = init.await?;
419 self.on_new(key, Arc::new(new_value.clone())).await?;
420 new_value
421 },
422 ))
423 })
424 .await?;
425 Ok(Entry::new(self, key.clone(), cache_entry.into_value().into_value()))
426 }
427
428 #[instrument(level = "trace", skip(self, init))]
429 pub(crate) async fn get_or_try_insert_with_secondary(
430 &self,
431 key: &DC::Key,
432 init: impl Future<Output = Result<DC::Value, DC::Error>>,
433 ) -> Result<Entry<DC>, Arc<DC::Error>> {
434 let myself = self.myself().unwrap();
435 let result = self
436 .cache()
437 .entry(key.clone())
438 .and_try_compute_with(|entry| async {
439 let primary_key = if let Some(entry) = entry {
440 let ValueState::Secondary(pkey) = entry.value()
441 else {
442 panic!("Not a secondary key: '{key}'")
443 };
444 self.get_or_try_insert_with_primary(pkey, init).await?;
445 pkey.clone()
446 }
447 else if let Some(pkey) = myself.data_controller().get_primary_key_for(key).await? {
448 self.get_or_try_insert_with_primary(&pkey, init).await?;
449 pkey
450 }
451 else {
452 let new_value = init.await?;
453 let pkey = self.data_controller().primary_key_of(&new_value);
454 self.insert(new_value).await?;
455 pkey
456 };
457
458 Result::<_, Arc<DC::Error>>::Ok(Op::Put(ValueState::Secondary(primary_key)))
459 })
460 .await?;
461
462 Ok(match result {
463 MokaCompResult::Inserted(e) | MokaCompResult::Unchanged(e) | MokaCompResult::ReplacedWith(e) => {
464 Entry::new(self, key.clone(), e.into_value().into_value())
465 }
466 _ => panic!("Unexpected outcome of get_or_try_insert_with_secondary: {result:?}"),
467 })
468 }
469
470 #[instrument(level = "trace", skip(self, f))]
473 #[allow(clippy::type_complexity)]
474 pub(crate) async fn get_update_state_and_compute<'a, F, Fut>(
475 &self,
476 key: &'a DC::Key,
477 value: Option<Arc<DC::Value>>,
478 f: F,
479 ) -> Result<Op<ValueState<DC::Key, DC::Value>>, DC::Error>
480 where
481 F: FnOnce(&'a DC::Key, Option<Arc<DC::Value>>, Arc<UpdateState<DC>>) -> Fut,
482 Fut: Future<Output = Result<DataControllerOp, DC::Error>>,
483 {
484 let update_state;
485 self.check_task().await;
486 {
487 let mut updates = self.updates_mut();
488
489 let count = updates.len();
490 let current_capacity = updates.capacity();
491 if current_capacity <= (count - (count / 10)) {
492 updates.reserve(current_capacity.max(self.max_updates() as usize) / 2);
493 }
494
495 let update_key = value
496 .as_ref()
497 .map(|v| self.data_controller().primary_key_of(v))
498 .unwrap_or(key.clone());
499
500 if !updates.contains_key(&update_key) {
501 update_state = child_build!(self, UpdateState<DC>).unwrap();
502 updates.insert(key.clone(), Arc::clone(&update_state));
503 }
504 else {
505 update_state = updates.get(&update_key).unwrap().clone();
506 }
507 };
510
511 let op = f(key, value.as_ref().map(Arc::clone), Arc::clone(&update_state)).await?;
512
513 self.updates_mut().insert(key.clone(), update_state.clone());
516
517 self._on_dc_op(op, key, value).await
518 }
519
520 #[allow(dead_code)]
522 #[inline]
523 pub fn name(&self) -> String {
524 self.cache().name().unwrap_or("<anon>").to_string()
525 }
526
527 #[instrument(level = "trace")]
529 pub async fn entry(&self, key: DC::Key) -> Result<EntryKeySelector<DC>, Arc<DC::Error>> {
530 check_error!(self);
531 Ok(if let Some(pkey) = self.get_primary_key_from(&key).await? {
532 child_build!(
533 self,
534 EntryKeySelector<DC> {
535 primary: pkey == key,
536 key: key,
537 primary_key: pkey,
538 }
539 )
540 }
541 else {
542 child_build!(
543 self,
544 EntryKeySelector<DC> {
545 primary: self.data_controller().is_primary(&key),
546 key: key,
547 }
548 )
549 }
550 .unwrap())
551 }
552
553 #[instrument(level = "trace")]
556 pub async fn get(&self, key: &DC::Key) -> Result<Option<DC::Value>, Arc<DC::Error>> {
557 check_error!(self);
558 let outcome = self
559 .entry(key.clone())
560 .await?
561 .and_try_compute_with(|_| async { Ok(Op::Nop) })
562 .await?;
563
564 Ok(match outcome {
565 CompResult::Inserted(entry) | CompResult::Unchanged(entry) | CompResult::ReplacedWith(entry) => {
566 let value = entry.into_value();
567 self.on_access(key, Arc::new(value.clone())).await?;
568 Some(value)
569 }
570 CompResult::StillNone(_) => None,
571 _ => None,
572 })
573 }
574
575 #[instrument(level = "trace")]
578 pub async fn insert(&self, value: DC::Value) -> Result<Option<DC::Value>, Arc<DC::Error>> {
579 check_error!(self);
580 let key = self.data_controller().primary_key_of(&value);
581 let res = self
582 .cache()
583 .entry(key.clone())
584 .and_try_compute_with(|_| async {
585 let op = self.on_new(&key, Arc::new(value)).await;
586 op
587 })
588 .await?;
589
590 match res {
591 MokaCompResult::Inserted(entry)
592 | MokaCompResult::ReplacedWith(entry)
593 | MokaCompResult::Unchanged(entry) => {
594 let value = entry.into_value().into_value();
595 Ok(Some(value))
596 }
597 MokaCompResult::StillNone(_) => Ok(None),
598 _ => panic!("Impossible result of insert operation: {res:?}"),
599 }
600 }
601
602 #[instrument(level = "trace")]
607 pub async fn delete(&self, key: &DC::Key) -> Result<Option<DC::Value>, Arc<DC::Error>> {
608 check_error!(self);
609 let result = self
611 .entry(key.clone())
612 .await?
613 .and_try_compute_with(|entry| async move { Ok(if entry.is_some() { Op::Remove } else { Op::Nop }) })
614 .await?;
615
616 Ok(match result {
617 CompResult::Removed(entry) => Some(entry.into_value()),
618 CompResult::StillNone(_) => None,
619 _ => panic!("Impossible result of delete operation: {result:?}"),
620 })
621 }
622
623 #[instrument(level = "trace")]
626 pub async fn invalidate(&self, key: &DC::Key) -> Result<(), Arc<DC::Error>> {
627 check_error!(self);
628 let myself = self.myself().unwrap();
629 self.cache()
630 .entry(key.clone())
631 .and_compute_with(|entry| async {
632 if let Some(entry) = entry {
633 if let ValueState::Primary(value) = entry.value() {
634 for secondary in myself.data_controller().secondary_keys_of(value) {
635 myself.cache().invalidate(&secondary).await;
636 }
637 }
638 }
639 Op::Remove
640 })
641 .await;
642 Ok(())
643 }
644
645 fn _purify_updates(&self, update_iter: Arc<UpdateIterator<DC>>) -> usize {
647 let mut updates = self.updates_mut();
650 let count = 0;
651 for (key, guard) in update_iter.worked_mut().iter() {
652 let update = updates.get_mut(key).unwrap();
653 if Arc::strong_count(update) > 1 {
654 continue;
656 }
657
658 if guard.is_none() {
660 updates.remove(key);
661 }
662 }
663 count
668 }
669
670 pub async fn flush_many_raw(&self, keys: Vec<DC::Key>) -> Result<usize, DC::Error> {
673 let update_iter = child_build!(
674 self, UpdateIterator<DC> {
675 keys: keys
676 }
677 )
678 .expect("Internal error: UpdateIterator builder failure");
679
680 wbc_event!(self, on_flush(update_iter.clone())?);
681
682 update_iter.reset();
685
686 self.data_controller().write_back(update_iter.clone()).await?;
687 let updates_count = self._purify_updates(update_iter);
688 self.set_last_flush(Instant::now());
689
690 Ok(updates_count)
691 }
692
693 #[instrument(level = "trace")]
696 pub async fn flush_raw(&self) -> Result<usize, DC::Error> {
697 let update_keys = {
698 let updates = self.updates();
699 updates.keys().cloned().collect::<Vec<_>>()
700 };
701 self.flush_many_raw(update_keys).await
702 }
703
704 #[instrument(level = "trace")]
707 pub async fn flush(&self) -> Result<usize, Arc<DC::Error>> {
708 check_error!(self);
709 self.flush_raw().await.map_err(Arc::new)
710 }
711
712 #[instrument(level = "trace")]
718 pub async fn soft_flush(&self) -> Result<(), Arc<DC::Error>> {
719 check_error!(self);
720 self.flush_notifier().notify_waiters();
721 Ok(())
722 }
723
    /// Writes back the pending update of a single key, if there is one.
    ///
    /// The record's data lock is acquired for writing and, when it holds update data,
    /// observers are notified through `on_flush_one` and the record is handed to the data
    /// controller's `write_back`. Returns the count reported by the purge that follows, or `0`
    /// when the key has no pending update or its record holds no data.
    ///
    /// # Errors
    ///
    /// Returns the stored error, if any, before doing anything else; also fails when an
    /// observer or the write-back fails.
    ///
    /// # Panics
    ///
    /// Panics if the update iterator cannot be built.
    #[instrument(level = "trace")]
    pub async fn flush_one(&self, key: &DC::Key) -> Result<usize, Arc<DC::Error>> {
        check_error!(self);
        // The updates lock is released at the end of this statement; only the cloned record
        // is kept.
        let update = self.updates().get(key).cloned();
        if let Some(update) = update {
            let guard = update.data.clone().write_owned().await;

            if let Some(update_data) = guard.as_ref() {
                wbc_event!(self, on_flush_one(key, update_data)?);

                debug!("flush single key: {key:?}");

                // The owned write guard travels with the iterator.
                let update_iter = child_build!(
                    self, UpdateIterator<DC> {
                        key_guard: (key.clone(), guard),
                    }
                )
                .expect("Internal error: UpdateIterator builder failure");

                self.data_controller().write_back(update_iter.clone()).await?;
                return Ok(self._purify_updates(update_iter));
            }
        }
        Ok(0)
    }
769
    /// Body of the background monitor task: periodically flushes pending updates.
    ///
    /// The loop wakes up on a flush request, a cleanup request, or a timer tick. It flushes
    /// when the wake-up was a request, when the cache has been shut down, when more than
    /// `max_updates` records are pending, or when `flush_interval` has passed since the last
    /// flush. A flush failure is stored as the cache's error and reported to observers through
    /// `on_monitor_error`.
    ///
    /// The write lock on the `closed` flag is held for as long as the loop runs; the loop ends
    /// once the flag has been set from `is_shut_down()`.
    ///
    /// Note that `flush_interval` and `max_updates` are read once, before the loop starts.
    #[instrument(level = "trace")]
    async fn monitor_updates(&self) {
        let flush_interval = self.flush_interval();
        let mut ticking_interval = interval(Duration::from_millis(self.monitor_tick_duration()));
        ticking_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        let max_updates = self.max_updates() as usize;
        let mut closed_guard = self.write_closed().await;

        loop {
            if *closed_guard {
                break;
            }

            // `forced` is set when the wake-up came from a notifier rather than from the timer.
            let mut forced = false;
            let flush_notifier = self.flush_notifier();
            let cleanup_notifier = self.cleanup_notifier();
            tokio::select! {
                _ = flush_notifier.notified() => {
                    forced = true;
                }
                _ = cleanup_notifier.notified() => {
                    forced = true;
                }
                _ = ticking_interval.tick() => {
                }
            }

            *closed_guard = self.is_shut_down();

            // Nothing to write back; the shutdown check at the top of the loop still applies.
            if self.updates().is_empty() {
                continue;
            }

            if forced
                || *closed_guard
                || self.updates().len() > max_updates
                || self.last_flush().elapsed() >= flush_interval
            {
                if let Err(error) = self.flush().await {
                    self.set_error(error.clone());
                    wbc_event!(self, on_monitor_error(&error));
                }
            }
        }
    }
823
824 #[instrument(level = "trace", skip(self))]
825 async fn check_task(&self) {
826 let mut task_guard = self.write_monitor_task();
827 if task_guard.as_ref().is_none_or(|t| t.is_finished()) {
828 let async_self = self.myself().unwrap();
829 *task_guard = Some(tokio::spawn(async move { async_self.monitor_updates().await }));
830 }
831 }
832
833 async fn _on_dc_op(
834 &self,
835 op: DataControllerOp,
836 key: &DC::Key,
837 value: Option<Arc<DC::Value>>,
838 ) -> Result<Op<ValueState<DC::Key, DC::Value>>, DC::Error> {
839 Ok(match op {
840 DataControllerOp::Nop => Op::Nop,
841 DataControllerOp::Insert => {
842 if let Some(value) = value {
843 Op::Put(ValueState::Primary(value.as_ref().clone()))
844 }
845 else {
846 Op::Nop
847 }
848 }
849 DataControllerOp::Revoke => Op::Remove,
850 DataControllerOp::Drop => {
851 self.updates_mut().remove(key);
852 Op::Remove
853 }
854 })
855 }
856
857 #[instrument(level = "trace")]
858 #[allow(clippy::type_complexity)]
859 pub(crate) async fn on_new(
860 &self,
861 key: &DC::Key,
862 value: Arc<DC::Value>,
863 ) -> Result<Op<ValueState<DC::Key, DC::Value>>, DC::Error> {
864 self.get_update_state_and_compute(key, Some(value), |key, value, update_state| async move {
865 update_state.on_new(key, value.as_ref().unwrap()).await
866 })
867 .await
868 }
869
870 #[instrument(level = "trace")]
871 #[allow(clippy::type_complexity)]
872 pub(crate) async fn on_change(
873 &self,
874 key: &DC::Key,
875 value: Arc<DC::Value>,
876 old_val: DC::Value,
877 ) -> Result<Op<ValueState<DC::Key, DC::Value>>, DC::Error> {
878 self.get_update_state_and_compute(key, Some(value), |key, value, update_state| async move {
879 update_state.on_change(key, value.unwrap(), old_val).await
880 })
881 .await
882 }
883
884 #[instrument(level = "trace")]
885 pub(crate) async fn on_access<'a>(&self, key: &DC::Key, value: Arc<DC::Value>) -> Result<(), DC::Error> {
886 self.get_update_state_and_compute(key, Some(value), |key, value, update_state| async move {
887 update_state.on_access(key, value.unwrap()).await
888 })
889 .await?;
890 Ok(())
891 }
892
893 #[instrument(level = "trace")]
894 #[allow(clippy::type_complexity)]
895 pub(crate) async fn on_delete(
896 &self,
897 key: &DC::Key,
898 value: Option<Arc<DC::Value>>,
899 ) -> Result<Op<ValueState<DC::Key, DC::Value>>, DC::Error> {
900 self.get_update_state_and_compute(key, value, |key, _value, update_state| async move {
901 update_state.on_delete(key).await
902 })
903 .await
904 }
905
906 #[instrument(level = "trace")]
909 pub(crate) async fn maybe_flush_one(&self, key: &DC::Key) -> Result<usize, Arc<DC::Error>> {
910 if self.cache().contains_key(key) {
911 return Ok(0);
912 }
913 debug!("DO flush_one(key: {key:?})");
914 return self.flush_one(key).await;
915 }
916
917 #[instrument(level = "trace")]
924 pub async fn close(&self) -> Result<(), Arc<DC::Error>> {
925 check_error!(self);
926
927 if self.is_shut_down() {
928 return Ok(());
929 }
930
931 self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst);
932 self.cleanup_notifier().notify_waiters();
933
934 let _ = self.write_closed().await;
936
937 Ok(())
938 }
939
940 pub fn is_shut_down(&self) -> bool {
942 self.shutdown.load(std::sync::atomic::Ordering::SeqCst)
943 }
944}
945
946impl<DC> Debug for Cache<DC>
947where
948 DC: DataController,
949{
950 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
951 f.debug_struct("Cache")
952 .field("name", &self.name())
953 .field("updates_count", &self.updates().len())
954 .field("cache_entries", &self.cache().entry_count())
955 .finish()
956 }
957}
958
959impl<DC> CacheBuilder<DC>
960where
961 DC: DataController,
962{
963 pub fn observer(mut self, observer: impl Observer<DC>) -> Self {
965 if let Some(ref mut observers) = self.observers {
966 observers.push(Box::new(observer));
967 self
968 }
969 else {
970 self._observers(vec![Box::new(observer)])
971 }
972 }
973}