1mod ll;
14pub mod stats;
16
17use self::ll::{LLNodeRef, LLWeight, LL};
18use self::stats::{ARCacheReadStat, ARCacheWriteStat};
19
20#[cfg(feature = "arcache-is-hashmap")]
21use crate::hashmap::{
22 HashMap as DataMap, HashMapReadTxn as DataMapReadTxn, HashMapWriteTxn as DataMapWriteTxn,
23};
24
25#[cfg(all(feature = "arcache-is-hashtrie", not(feature = "arcache-is-hashmap")))]
26use crate::hashtrie::{
27 HashTrie as DataMap, HashTrieReadTxn as DataMapReadTxn, HashTrieWriteTxn as DataMapWriteTxn,
28};
29
30use crossbeam_queue::ArrayQueue;
31use std::collections::HashMap as Map;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::sync::Arc;
34use std::sync::{Mutex, RwLock};
35
36use std::borrow::Borrow;
37use std::cell::UnsafeCell;
38use std::fmt::Debug;
39use std::hash::Hash;
40use std::mem;
41use std::num::NonZeroUsize;
42use std::ops::Deref;
43use std::ops::DerefMut;
44use std::time::Instant;
45
46use tracing::trace;
47
// Fraction of the total cache budget that may be consumed by all read-thread
// local caches combined (total / 8), used by `set_expected_workload`.
const READ_THREAD_CACHE_RATIO: isize = 8;
// Cap on the write-miss allowance as a fraction of total (total / 4).
const WRITE_THREAD_CACHE_RATIO: isize = 4;

// Capacity of the lock-free queue carrying async include events to the writer.
const WRITE_THREAD_CHANNEL_SIZE: usize = 64;
// Capacity of the lock-free queue carrying async hit events to the writer.
const READ_THREAD_CHANNEL_SIZE: usize = 64;

// Default / absolute-minimum bound for `look_back_limit` (clamped in `build`).
const TXN_LOOKBACK_LIMIT_DEFAULT: u8 = 32;
const TXN_LOOKBACK_LIMIT_ABS_MIN: u8 = 4;

// Caches smaller than this get the watermark disabled (set to 0) by default.
const WATERMARK_DISABLE_MIN: usize = 128;

// Default watermark is (max / 20) * 18, i.e. 90% of max.
const WATERMARK_DISABLE_DIVISOR: usize = 20;
const WATERMARK_DISABLE_RATIO: usize = 18;

// Weight assigned to haunted (tombstone) entries in the haunted list.
const HAUNTED_SIZE: usize = 1;
63
/// A pending change recorded in a write transaction's thread-local map
/// before it is drained into the shared cache at commit.
enum ThreadCacheItem<V> {
    /// A value to insert: (value, clean flag, weight). Commit asserts the
    /// clean flag is true before applying.
    Present(V, bool, usize),
    /// A removal marker: (clean flag). Applied as a haunted tombstone.
    Removed(bool),
}
68
/// An asynchronous "key was hit by a reader" event, queued for the next
/// writer commit to fold into the ARC lists.
struct CacheHitEvent {
    /// When the hit occurred; draining stops at events newer than the commit.
    t: Instant,
    /// Hash of the hit key — used to locate the map slot without the key.
    k_hash: u64,
}
73
/// An asynchronous "reader wants this value included" event, queued for the
/// next writer commit to insert into the cache.
struct CacheIncludeEvent<K, V> {
    /// When the inclusion was requested.
    t: Instant,
    /// Key to include.
    k: K,
    /// Value to include.
    v: V,
    /// Transaction id the reader observed; stale events are discarded
    /// against `ArcInner::min_txid` at drain time.
    txid: u64,
    /// Weight of the entry.
    size: usize,
}
81
/// Node payload stored in the intrusive ARC lists: the key plus bookkeeping
/// needed to weigh and age the entry.
#[derive(Hash, Ord, PartialOrd, Eq, PartialEq, Clone, Debug)]
struct CacheItemInner<K>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
{
    k: K,
    // Transaction id of the last operation that touched this node.
    txid: u64,
    // Weight of the entry; reported via `LLWeight::ll_weight`.
    size: usize,
}
91
/// List length is weight-based, not count-based: each node contributes its
/// `size` to the owning `LL`'s length.
impl<K> LLWeight for CacheItemInner<K>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
{
    #[inline]
    fn ll_weight(&self) -> usize {
        self.size
    }
}
101
/// The state of one key in the cache map. Live states (`Freq`, `Rec`) carry
/// the value; ghost states remember only the list node so a re-inclusion can
/// adapt `p`; `Haunted` is a removal tombstone.
#[derive(Clone, Debug)]
enum CacheItem<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
{
    /// Live, on the frequent list.
    Freq(LLNodeRef<CacheItemInner<K>>, V),
    /// Live, on the recent list.
    Rec(LLNodeRef<CacheItemInner<K>>, V),
    /// Evicted from the frequent list; node lives on ghost_freq.
    GhostFreq(LLNodeRef<CacheItemInner<K>>),
    /// Evicted from the recent list; node lives on ghost_rec.
    GhostRec(LLNodeRef<CacheItemInner<K>>),
    /// Removed; node lives on the haunted list until fully aged out.
    Haunted(LLNodeRef<CacheItemInner<K>>),
}
113
// SAFETY (review): `CacheItem` holds `LLNodeRef` pointers into the intrusive
// lists owned by `ArcInner`. These impls assert cross-thread use is sound on
// the premise that all list-node mutation happens under the cache's
// `Mutex`/`RwLock` — confirm against the invariants of the `ll` module.
unsafe impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
    > Send for CacheItem<K, V>
{
}
// SAFETY (review): same premise as the `Send` impl directly above.
unsafe impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
    > Sync for CacheItem<K, V>
{
}
126
/// Test-only mirror of `CacheItem`'s discriminant, used to assert which ARC
/// list a key occupies without exposing the list node.
#[cfg(test)]
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum CacheState {
    Freq,
    Rec,
    GhostFreq,
    GhostRec,
    Haunted,
    /// Key not present in the map at all.
    None,
}
137
/// Test-only snapshot of the cache's internal sizes and the adaptive
/// partition target `p`.
#[cfg(test)]
#[derive(Debug, PartialEq)]
pub(crate) struct CStat {
    max: usize,
    cache: usize,
    tlocal: usize,
    freq: usize,
    rec: usize,
    ghost_freq: usize,
    ghost_rec: usize,
    haunted: usize,
    p: usize,
}
151
/// Writer-only state, guarded by `ARCache::inner` (a `Mutex`): the five ARC
/// lists, the adaptation target, and the consumer ends of the reader queues.
struct ArcInner<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    // Adaptive target for the recent list's share of `max` (classic ARC "p").
    p: usize,
    // Frequently-accessed entries.
    freq: LL<CacheItemInner<K>>,
    // Recently-included entries.
    rec: LL<CacheItemInner<K>>,
    // Ghost entries evicted from freq / rec respectively.
    ghost_freq: LL<CacheItemInner<K>>,
    ghost_rec: LL<CacheItemInner<K>>,
    // Removal tombstones awaiting final expiry by txid.
    haunted: LL<CacheItemInner<K>>,
    // Consumer side of the queues readers push hit/include events onto.
    hit_queue: Arc<ArrayQueue<CacheHitEvent>>,
    inc_queue: Arc<ArrayQueue<CacheIncludeEvent<K, V>>>,
    // Events with a txid below this are stale and must be ignored
    // (advanced to the commit txid when the cache is cleared).
    min_txid: u64,
}
168
/// Read-mostly configuration, guarded by `ARCache::shared` (an `RwLock`);
/// readers take it to open transactions, writers to size evictions.
struct ArcShared<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    // Total weight budget for freq + rec.
    max: usize,
    // Weight budget of each reader's thread-local cache (0 disables it).
    read_max: usize,
    // Producer side of the reader event queues (same queues as `ArcInner`).
    hit_queue: Arc<ArrayQueue<CacheHitEvent>>,
    inc_queue: Arc<ArrayQueue<CacheIncludeEvent<K, V>>>,
    // Occupancy threshold related to `above_watermark` tracking.
    watermark: usize,
    // Whether read transactions should attempt quiesce behaviour.
    reader_quiesce: bool,
}
186
/// A concurrently-readable adaptive replacement cache. Readers work against
/// a snapshot of `cache` plus an optional thread-local cache; a single
/// writer at a time folds changes and queued reader events into the ARC
/// lists on commit.
pub struct ARCache<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    // The transactional key -> state map.
    cache: DataMap<K, CacheItem<K, V>>,
    // Read-mostly configuration and queue handles.
    shared: RwLock<ArcShared<K, V>>,
    // Writer-only ARC list state.
    inner: Mutex<ArcInner<K, V>>,
    // Sampled by new transactions to decide whether to submit hit events.
    above_watermark: AtomicBool,
    // Upper bound on transaction look-back (from `ARCacheBuilder`).
    look_back_limit: u64,
}
206
// SAFETY (review): all interior mutability in `ARCache` is behind `Mutex`,
// `RwLock`, atomics, or the lock-free `ArrayQueue`; these impls assert the
// remaining raw-pointer state (`LLNodeRef`s inside `DataMap`/`ArcInner`) is
// only touched under those locks — confirm against the `ll` module.
unsafe impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
    > Send for ARCache<K, V>
{
}
// SAFETY (review): same premise as the `Send` impl directly above.
unsafe impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
    > Sync for ARCache<K, V>
{
}
219
/// Entry stored in a reader's thread-local LRU: an owned copy of the key and
/// value plus its weight.
#[derive(Debug, Clone)]
struct ReadCacheItem<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    k: K,
    v: V,
    // Weight of the entry; reported via `LLWeight::ll_weight`.
    size: usize,
}
230
/// The thread-local LRU is also weight-based: each node contributes its
/// `size` to the list length.
impl<K, V> LLWeight for ReadCacheItem<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    #[inline]
    fn ll_weight(&self) -> usize {
        self.size
    }
}
241
/// A reader's private LRU cache: a key -> node index over an intrusive LRU
/// list, bounded by `read_size` weight.
struct ReadCache<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    // Lookup from key to its node in `tlru`.
    set: Map<K, LLNodeRef<ReadCacheItem<K, V>>>,
    // Weight budget for this thread-local cache (from `ArcShared::read_max`).
    read_size: usize,
    // The thread-local LRU list itself.
    tlru: LL<ReadCacheItem<K, V>>,
}
253
/// An active read transaction: a snapshot of the shared map, an optional
/// thread-local cache, and producer handles for submitting hit/include
/// events back to the writer.
pub struct ARCacheReadTxn<'a, K, V, S>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
    S: ARCacheReadStat + Clone,
{
    caller: &'a ARCache<K, V>,
    // Point-in-time snapshot of the shared cache map.
    cache: DataMapReadTxn<'a, K, CacheItem<K, V>>,
    // Present only when `read_max > 0` at transaction start.
    tlocal: Option<ReadCache<K, V>>,
    // Producer ends of the writer's event queues.
    hit_queue: Arc<ArrayQueue<CacheHitEvent>>,
    inc_queue: Arc<ArrayQueue<CacheIncludeEvent<K, V>>>,
    // Sampled at open; gates whether events are worth submitting.
    above_watermark: bool,
    reader_quiesce: bool,
    stats: S,
}
273
// SAFETY (review): the read txn holds `Arc<ArrayQueue>` handles (inherently
// thread-safe) and a map snapshot; these impls assert the snapshot and the
// thread-local `LLNodeRef`s are safe to move/share given the extra
// `Sync + Send` bounds on `S` — confirm against the `ll` module.
unsafe impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
        S: ARCacheReadStat + Clone + Sync + Send + 'static,
    > Send for ARCacheReadTxn<'_, K, V, S>
{
}
// SAFETY (review): same premise as the `Send` impl directly above.
unsafe impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
        S: ARCacheReadStat + Clone + Sync + Send + 'static,
    > Sync for ARCacheReadTxn<'_, K, V, S>
{
}
288
/// An active write transaction: pending changes live in `tlocal` and are
/// folded into the shared cache only on commit.
pub struct ARCacheWriteTxn<'a, K, V, S>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
    S: ARCacheWriteStat<K>,
{
    caller: &'a ARCache<K, V>,
    // Exclusive write view of the shared cache map.
    cache: DataMapWriteTxn<'a, K, CacheItem<K, V>>,
    // Pending inserts/removals, applied at commit.
    tlocal: Map<K, ThreadCacheItem<V>>,
    // Hashes of keys hit during this txn (drained by `drain_tlocal_hits`).
    // UnsafeCell allows recording hits through a shared borrow of the txn.
    hit: UnsafeCell<Vec<u64>>,
    // Set when the whole cache should be cleared at commit.
    clear: UnsafeCell<bool>,
    // Sampled at open from `ARCache::above_watermark`.
    above_watermark: bool,
    stats: S,
}
311
312impl<
313 K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
314 V: Clone + Debug + Sync + Send + 'static,
315 > CacheItem<K, V>
316{
317 fn to_vref(&self) -> Option<&V> {
318 match &self {
319 CacheItem::Freq(_, v) | CacheItem::Rec(_, v) => Some(v),
320 _ => None,
321 }
322 }
323
324 fn to_kvsref(&self) -> Option<(&K, &V, usize)> {
325 match &self {
326 CacheItem::Freq(lln, v) | CacheItem::Rec(lln, v) => {
327 let cii = lln.as_ref();
328 Some((&cii.k, v, cii.size))
329 }
330 _ => None,
331 }
332 }
333
334 #[cfg(test)]
335 fn to_state(&self) -> CacheState {
336 match &self {
337 CacheItem::Freq(_, _v) => CacheState::Freq,
338 CacheItem::Rec(_, _v) => CacheState::Rec,
339 CacheItem::GhostFreq(_) => CacheState::GhostFreq,
340 CacheItem::GhostRec(_) => CacheState::GhostRec,
341 CacheItem::Haunted(_) => CacheState::Haunted,
342 }
343 }
344}
345
/// Builder for `ARCache`. `max` and `read_max` must both be set (directly or
/// via `set_expected_workload`) or `build` returns `None`.
pub struct ARCacheBuilder {
    max: Option<usize>,
    read_max: Option<usize>,
    watermark: Option<usize>,
    reader_quiesce: bool,
    look_back_limit: Option<u8>,
}
354
355impl Default for ARCacheBuilder {
356 fn default() -> Self {
357 ARCacheBuilder {
358 max: None,
359 read_max: None,
360 watermark: None,
361 reader_quiesce: true,
362 look_back_limit: None,
363 }
364 }
365}
366
impl ARCacheBuilder {
    /// Create a builder with default settings (see `Default`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Derive `max` and `read_max` from an expected workload profile:
    /// `total` weight budget, reader `threads`, expected read-only and
    /// read-write misses per transaction, and whether readers get a
    /// thread-local cache.
    ///
    /// Read caches are capped at 1/8 of `total` split across threads; the
    /// write-miss allowance at 1/4 of `total`. Both are subtracted from
    /// `total` to form the shared `max`.
    ///
    /// # Panics
    /// Panics if any input exceeds `isize::MAX`, or if `threads == 0` while
    /// `read_cache` is true (division by zero).
    #[must_use]
    pub fn set_expected_workload(
        self,
        total: usize,
        threads: usize,
        ex_ro_miss: usize,
        ex_rw_miss: usize,
        read_cache: bool,
    ) -> Self {
        // Signed arithmetic so the subtraction below can be range-checked.
        let total = isize::try_from(total).unwrap();
        let threads = isize::try_from(threads).unwrap();
        let ro_miss = isize::try_from(ex_ro_miss).unwrap();
        let wr_miss = isize::try_from(ex_rw_miss).unwrap();

        let read_max = if read_cache {
            // Per-thread read cache: the expected miss count, capped so all
            // threads together use at most total / READ_THREAD_CACHE_RATIO.
            let read_max_limit = total / READ_THREAD_CACHE_RATIO;
            let read_max_thread_limit = read_max_limit / threads;
            ro_miss.clamp(0, read_max_thread_limit)
        } else {
            0
        };

        let wr_miss_thread_limit = total / WRITE_THREAD_CACHE_RATIO;
        let wr_miss = wr_miss.clamp(0, wr_miss_thread_limit);

        // Whatever remains after the thread allowances is the shared budget.
        let max = total - (wr_miss + (read_max * threads));

        let max = usize::try_from(max).unwrap();
        let read_max = usize::try_from(read_max).unwrap();

        ARCacheBuilder {
            max: Some(max),
            read_max: Some(read_max),
            watermark: self.watermark,
            reader_quiesce: self.reader_quiesce,
            look_back_limit: self.look_back_limit,
        }
    }

    /// Set the shared cache weight budget and the per-reader-thread cache
    /// weight budget directly.
    #[must_use]
    pub fn set_size(mut self, max: usize, read_max: usize) -> Self {
        self.max = Some(max);
        self.read_max = Some(read_max);
        self
    }

    /// Set the watermark explicitly (clamped to `max` at build time;
    /// 0 disables watermark tracking).
    #[must_use]
    pub fn set_watermark(mut self, watermark: usize) -> Self {
        self.watermark = Some(watermark);
        self
    }

    /// Set the transaction look-back limit (raised to the absolute minimum
    /// at build time).
    #[must_use]
    pub fn set_look_back_limit(mut self, look_back_limit: u8) -> Self {
        self.look_back_limit = Some(look_back_limit);
        self
    }

    /// Enable or disable reader quiesce behaviour.
    #[must_use]
    pub fn set_reader_quiesce(mut self, reader_quiesce: bool) -> Self {
        self.reader_quiesce = reader_quiesce;
        self
    }

    /// Consume the builder and construct the cache. Returns `None` when
    /// `max`/`read_max` were never provided.
    pub fn build<K, V>(self) -> Option<ARCache<K, V>>
    where
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
    {
        let ARCacheBuilder {
            max,
            read_max,
            watermark,
            reader_quiesce,
            look_back_limit,
        } = self;

        // Both sizes are required; `zip` yields None if either is missing.
        let (max, read_max) = max.zip(read_max)?;

        // Default watermark: disabled (0) for tiny caches, else 90% of max.
        let watermark = watermark.unwrap_or(if max < WATERMARK_DISABLE_MIN {
            0
        } else {
            (max / WATERMARK_DISABLE_DIVISOR) * WATERMARK_DISABLE_RATIO
        });
        let watermark = watermark.clamp(0, max);
        // With the watermark disabled we start "above" it, so reader events
        // are presumably always submitted — confirm against reader paths.
        let init_watermark = watermark == 0;

        let look_back_limit = look_back_limit
            .unwrap_or(TXN_LOOKBACK_LIMIT_DEFAULT)
            .clamp(TXN_LOOKBACK_LIMIT_ABS_MIN, u8::MAX) as u64;

        // Fixed-capacity lock-free queues for reader hit/include events.
        let chan_size = WRITE_THREAD_CHANNEL_SIZE;
        let hit_queue = Arc::new(ArrayQueue::new(chan_size));

        let chan_size = READ_THREAD_CHANNEL_SIZE;
        let inc_queue = Arc::new(ArrayQueue::new(chan_size));

        let shared = RwLock::new(ArcShared {
            max,
            read_max,
            hit_queue: hit_queue.clone(),
            inc_queue: inc_queue.clone(),
            watermark,
            reader_quiesce,
        });
        let inner = Mutex::new(ArcInner {
            p: 0,
            freq: LL::new(),
            rec: LL::new(),
            ghost_freq: LL::new(),
            ghost_rec: LL::new(),
            haunted: LL::new(),
            hit_queue,
            inc_queue,
            min_txid: 0,
        });

        Some(ARCache {
            cache: DataMap::new(),
            shared,
            inner,
            above_watermark: AtomicBool::new(init_watermark),
            look_back_limit,
        })
    }
}
558
559impl<
560 K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
561 V: Clone + Debug + Sync + Send + 'static,
562 > ARCache<K, V>
563{
564 #[deprecated(since = "0.2.20", note = "please use`ARCacheBuilder` instead")]
566 pub fn new(
567 total: usize,
568 threads: usize,
569 ex_ro_miss: usize,
570 ex_rw_miss: usize,
571 read_cache: bool,
572 ) -> Self {
573 ARCacheBuilder::default()
574 .set_expected_workload(total, threads, ex_ro_miss, ex_rw_miss, read_cache)
575 .build()
576 .expect("Invalid cache parameters!")
577 }
578
579 #[deprecated(since = "0.2.20", note = "please use`ARCacheBuilder` instead")]
581 pub fn new_size(max: usize, read_max: usize) -> Self {
582 ARCacheBuilder::default()
583 .set_size(max, read_max)
584 .build()
585 .expect("Invalid cache parameters!")
586 }
587
588 #[deprecated(since = "0.2.20", note = "please use`ARCacheBuilder` instead")]
590 pub fn new_size_watermark(max: usize, read_max: usize, watermark: usize) -> Self {
591 ARCacheBuilder::default()
592 .set_size(max, read_max)
593 .set_watermark(watermark)
594 .build()
595 .expect("Invalid cache parameters!")
596 }
597
    /// Begin a read transaction, collecting statistics into `stats`.
    ///
    /// Takes the shared config read lock, builds a thread-local cache when
    /// `read_max > 0`, and snapshots the cache map and watermark state.
    pub fn read_stats<S>(&self, stats: S) -> ARCacheReadTxn<'_, K, V, S>
    where
        S: ARCacheReadStat + Clone,
    {
        let rshared = self.shared.read().unwrap();
        let tlocal = if rshared.read_max > 0 {
            Some(ReadCache {
                set: Map::new(),
                read_size: rshared.read_max,
                tlru: LL::new(),
            })
        } else {
            None
        };
        // Sampled once at open; the txn uses this snapshot throughout.
        let above_watermark = self.above_watermark.load(Ordering::Relaxed);
        ARCacheReadTxn {
            caller: self,
            cache: self.cache.read(),
            tlocal,
            hit_queue: rshared.hit_queue.clone(),
            inc_queue: rshared.inc_queue.clone(),
            above_watermark,
            reader_quiesce: rshared.reader_quiesce,
            stats,
        }
    }
628
    /// Begin a read transaction without statistics collection.
    pub fn read(&self) -> ARCacheReadTxn<'_, K, V, ()> {
        self.read_stats(())
    }
635
    /// Begin a write transaction without statistics collection.
    pub fn write(&self) -> ARCacheWriteTxn<'_, K, V, ()> {
        self.write_stats(())
    }
642
643 pub fn write_stats<S>(&self, stats: S) -> ARCacheWriteTxn<'_, K, V, S>
645 where
646 S: ARCacheWriteStat<K>,
647 {
648 let above_watermark = self.above_watermark.load(Ordering::Relaxed);
649 ARCacheWriteTxn {
650 caller: self,
651 cache: self.cache.write(),
652 tlocal: Map::new(),
653 hit: UnsafeCell::new(Vec::new()),
654 clear: UnsafeCell::new(false),
655 above_watermark,
656 stats,
658 }
659 }
660
661 fn try_write_stats<S>(&self, stats: S) -> Result<ARCacheWriteTxn<'_, K, V, S>, S>
662 where
663 S: ARCacheWriteStat<K>,
664 {
665 match self.cache.try_write() {
666 Some(cache) => {
667 let above_watermark = self.above_watermark.load(Ordering::Relaxed);
668 Ok(ARCacheWriteTxn {
669 caller: self,
670 cache,
671 tlocal: Map::new(),
672 hit: UnsafeCell::new(Vec::new()),
673 clear: UnsafeCell::new(false),
674 above_watermark,
675 stats,
677 })
678 }
679 None => Err(stats),
680 }
681 }
682
683 pub fn try_quiesce_stats<S>(&self, stats: S) -> S
686 where
687 S: ARCacheWriteStat<K>,
688 {
689 match self.try_write_stats(stats) {
693 Ok(wr_txn) => wr_txn.commit(),
694 Err(stats) => stats,
695 }
696 }
697
698 pub fn try_quiesce(&self) {
701 self.try_quiesce_stats(())
702 }
703
704 fn calc_p_freq(ghost_rec_len: usize, ghost_freq_len: usize, p: &mut usize, size: usize) {
705 let delta = if ghost_rec_len > ghost_freq_len {
706 ghost_rec_len / ghost_freq_len
707 } else {
708 1
709 } * size;
710 let p_was = *p;
711 if delta < *p {
712 *p -= delta
713 } else {
714 *p = 0
715 }
716 tracing::trace!("f {} >>> {}", p_was, *p);
717 }
718
719 fn calc_p_rec(
720 cap: usize,
721 ghost_rec_len: usize,
722 ghost_freq_len: usize,
723 p: &mut usize,
724 size: usize,
725 ) {
726 let delta = if ghost_freq_len > ghost_rec_len {
727 ghost_freq_len / ghost_rec_len
728 } else {
729 1
730 } * size;
731 let p_was = *p;
732 if delta <= cap - *p {
733 *p += delta
734 } else {
735 *p = cap
736 }
737 tracing::trace!("r {} >>> {}", p_was, *p);
738 }
739
    /// Fold a write transaction's pending inserts/removals (`tlocal`) into
    /// the cache map and the ARC lists at commit time.
    ///
    /// Inserts land on the recent list (or revive ghosts, adapting `p`);
    /// removals become haunted tombstones stamped with `commit_txid`.
    fn drain_tlocal_inc<S>(
        &self,
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        inner: &mut ArcInner<K, V>,
        shared: &ArcShared<K, V>,
        tlocal: Map<K, ThreadCacheItem<V>>,
        commit_txid: u64,
        stats: &mut S,
    ) where
        S: ARCacheWriteStat<K>,
    {
        tlocal.into_iter().for_each(|(k, tcio)| {
            // Check list invariants before and after each key in test builds.
            #[cfg(test)]
            {
                inner.rec.verify();
                inner.freq.verify();
                inner.ghost_rec.verify();
                inner.ghost_freq.verify();
                inner.haunted.verify();
            }

            let r = cache.get_mut(&k);
            match (r, tcio) {
                // New key inserted: enters via the recent list.
                (None, ThreadCacheItem::Present(tci, clean, size)) => {
                    assert!(clean);
                    let llp = inner.rec.append_k(CacheItemInner {
                        k: k.clone(),
                        txid: commit_txid,
                        size,
                    });
                    stats.include(&k);
                    let existing = cache.insert(k, CacheItem::Rec(llp, tci));
                    assert!(
                        existing.is_none(),
                        "Impossible state! Key must not already exist in cache!"
                    );
                }
                // Removal of a key the cache never saw: record a tombstone so
                // stale async includes of it are rejected.
                (None, ThreadCacheItem::Removed(clean)) => {
                    assert!(clean);
                    let llp = inner.haunted.append_k(CacheItemInner {
                        k: k.clone(),
                        txid: commit_txid,
                        size: HAUNTED_SIZE,
                    });
                    let existing = cache.insert(k, CacheItem::Haunted(llp));
                    assert!(
                        existing.is_none(),
                        "Impossible state! Key must not already exist in cache!"
                    );
                    debug_assert!(inner.haunted.len() > 0);
                }
                // Removal of an existing key: move its node, whatever list it
                // is on, to the haunted list with the commit txid.
                (Some(ref mut ci), ThreadCacheItem::Removed(clean)) => {
                    assert!(clean);
                    let mut next_state = match ci {
                        CacheItem::Freq(llp, _v) => {
                            let mut owned = inner.freq.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = HAUNTED_SIZE;
                            let pointer = inner.haunted.append_n(owned);
                            debug_assert!(inner.haunted.len() > 0);
                            CacheItem::Haunted(pointer)
                        }
                        CacheItem::Rec(llp, _v) => {
                            let mut owned = inner.rec.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = HAUNTED_SIZE;
                            let pointer = inner.haunted.append_n(owned);
                            debug_assert!(inner.haunted.len() > 0);
                            CacheItem::Haunted(pointer)
                        }
                        CacheItem::GhostFreq(llp) => {
                            let mut owned = inner.ghost_freq.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = HAUNTED_SIZE;
                            let pointer = inner.haunted.append_n(owned);
                            debug_assert!(inner.haunted.len() > 0);
                            CacheItem::Haunted(pointer)
                        }
                        CacheItem::GhostRec(llp) => {
                            let mut owned = inner.ghost_rec.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = HAUNTED_SIZE;
                            let pointer = inner.haunted.append_n(owned);
                            debug_assert!(inner.haunted.len() > 0);
                            CacheItem::Haunted(pointer)
                        }
                        CacheItem::Haunted(llp) => {
                            // Already haunted: just refresh the txid in place.
                            unsafe { llp.make_mut().txid = commit_txid };
                            debug_assert!(inner.haunted.len() > 0);
                            CacheItem::Haunted(llp.clone())
                        }
                    };
                    mem::swap(*ci, &mut next_state);
                }
                // Update of an existing key: live entries are promoted/kept on
                // freq; ghost revivals adapt `p` first; haunted keys re-enter
                // via the recent list.
                (Some(ref mut ci), ThreadCacheItem::Present(tci, clean, size)) => {
                    assert!(clean);
                    let mut next_state = match ci {
                        CacheItem::Freq(llp, _v) => {
                            let mut owned = inner.freq.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = size;
                            stats.modify(&owned.as_ref().k);
                            let pointer = inner.freq.append_n(owned);
                            CacheItem::Freq(pointer, tci)
                        }
                        CacheItem::Rec(llp, _v) => {
                            // A second touch moves recent -> frequent.
                            let mut owned = inner.rec.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = size;
                            stats.modify(&owned.as_ref().k);
                            let pointer = inner.freq.append_n(owned);
                            CacheItem::Freq(pointer, tci)
                        }
                        CacheItem::GhostFreq(llp) => {
                            // Ghost-frequent hit: shrink p toward frequency.
                            Self::calc_p_freq(
                                inner.ghost_rec.len(),
                                inner.ghost_freq.len(),
                                &mut inner.p,
                                size,
                            );
                            let mut owned = inner.ghost_freq.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = size;
                            stats.ghost_frequent_revive(&owned.as_ref().k);
                            let pointer = inner.freq.append_n(owned);
                            CacheItem::Freq(pointer, tci)
                        }
                        CacheItem::GhostRec(llp) => {
                            // Ghost-recent hit: grow p toward recency.
                            Self::calc_p_rec(
                                shared.max,
                                inner.ghost_rec.len(),
                                inner.ghost_freq.len(),
                                &mut inner.p,
                                size,
                            );
                            let mut owned = inner.ghost_rec.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = size;
                            stats.ghost_recent_revive(&owned.as_ref().k);
                            let pointer = inner.rec.append_n(owned);
                            CacheItem::Rec(pointer, tci)
                        }
                        CacheItem::Haunted(llp) => {
                            let before_len = inner.haunted.len();
                            debug_assert!(inner.haunted.len() > 0);
                            let mut owned = inner.haunted.extract(llp.clone());
                            debug_assert!(before_len - HAUNTED_SIZE == inner.haunted.len());

                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = size;
                            stats.include_haunted(&owned.as_ref().k);
                            let pointer = inner.rec.append_n(owned);
                            CacheItem::Rec(pointer, tci)
                        }
                    };
                    mem::swap(*ci, &mut next_state);
                }
            }

            #[cfg(test)]
            {
                inner.rec.verify();
                inner.freq.verify();
                inner.ghost_rec.verify();
                inner.ghost_freq.verify();
                inner.haunted.verify();
            }
        });
    }
932
    /// Drain queued reader hit events into the ARC lists: a hit on a recent
    /// entry promotes it to the frequent list; hits elsewhere just refresh
    /// the entry's position in its current list.
    ///
    /// Events are located by key hash, so every entry sharing the hash slot
    /// is touched — a collision promotes its neighbours too.
    fn drain_hit_rx(
        &self,
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        inner: &mut ArcInner<K, V>,
        commit_ts: Instant,
    ) {
        while let Some(ce) = inner.hit_queue.pop() {
            let CacheHitEvent { t, k_hash } = ce;
            if let Some(ref mut ci_slots) = unsafe { cache.get_slot_mut(k_hash) } {
                for ref mut ci in ci_slots.iter_mut() {
                    let mut next_state = match &ci.v {
                        CacheItem::Freq(llp, v) => {
                            inner.freq.touch(llp.to_owned());
                            CacheItem::Freq(llp.to_owned(), v.to_owned())
                        }
                        CacheItem::Rec(llp, v) => {
                            // Second touch: promote recent -> frequent.
                            let owned = inner.rec.extract(llp.to_owned());
                            let pointer = inner.freq.append_n(owned);
                            CacheItem::Freq(pointer, v.to_owned())
                        }
                        CacheItem::GhostFreq(llp) => {
                            inner.ghost_freq.touch(llp.to_owned());
                            CacheItem::GhostFreq(llp.to_owned())
                        }
                        CacheItem::GhostRec(llp) => {
                            inner.ghost_rec.touch(llp.to_owned());
                            CacheItem::GhostRec(llp.to_owned())
                        }
                        CacheItem::Haunted(llp) => {
                            // Tombstones are left untouched by hits.
                            debug_assert!(inner.haunted.len() > 0);
                            CacheItem::Haunted(llp.to_owned())
                        }
                    };
                    mem::swap(&mut ci.v, &mut next_state);
                }
            }
            // Stop once we reach events newer than this commit; they belong
            // to the next quiesce.
            if t >= commit_ts {
                break;
            }
        }
    }
985
    /// Drain queued reader include events into the cache. Each event carries
    /// the txid the reader observed; events older than the entry's own txid
    /// or below `inner.min_txid` are treated as stale and only refresh list
    /// position, never overwrite data.
    fn drain_inc_rx<S>(
        &self,
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        inner: &mut ArcInner<K, V>,
        shared: &ArcShared<K, V>,
        commit_ts: Instant,
        stats: &mut S,
    ) where
        S: ARCacheWriteStat<K>,
    {
        while let Some(ce) = inner.inc_queue.pop() {
            let CacheIncludeEvent {
                t,
                k,
                v: iv,
                txid,
                size,
            } = ce;
            let mut r = cache.get_mut(&k);
            match r {
                Some(ref mut ci) => {
                    // `None` below means "keep the current state unchanged".
                    let mut next_state = match &ci {
                        CacheItem::Freq(llp, _v) => {
                            if llp.as_ref().txid >= txid || inner.min_txid > txid {
                                // Stale data: treat as a hit only.
                                inner.freq.touch(llp.to_owned());
                                None
                            } else {
                                let mut owned = inner.freq.extract(llp.to_owned());
                                owned.as_mut().txid = txid;
                                owned.as_mut().size = size;
                                stats.modify(&owned.as_mut().k);
                                let pointer = inner.freq.append_n(owned);
                                Some(CacheItem::Freq(pointer, iv))
                            }
                        }
                        CacheItem::Rec(llp, v) => {
                            // Either way the entry is promoted to freq; the
                            // txid decides whose value survives.
                            let mut owned = inner.rec.extract(llp.to_owned());
                            if llp.as_ref().txid >= txid || inner.min_txid > txid {
                                let pointer = inner.freq.append_n(owned);
                                Some(CacheItem::Freq(pointer, v.to_owned()))
                            } else {
                                owned.as_mut().txid = txid;
                                owned.as_mut().size = size;
                                stats.modify(&owned.as_mut().k);
                                let pointer = inner.freq.append_n(owned);
                                Some(CacheItem::Freq(pointer, iv))
                            }
                        }
                        CacheItem::GhostFreq(llp) => {
                            // Ghost-frequent hit: adapt p toward frequency in
                            // both branches; revive only if the event is fresh.
                            if llp.as_ref().txid > txid || inner.min_txid > txid {
                                let size = llp.as_ref().size;
                                Self::calc_p_freq(
                                    inner.ghost_rec.len(),
                                    inner.ghost_freq.len(),
                                    &mut inner.p,
                                    size,
                                );
                                inner.ghost_freq.touch(llp.to_owned());
                                None
                            } else {
                                Self::calc_p_freq(
                                    inner.ghost_rec.len(),
                                    inner.ghost_freq.len(),
                                    &mut inner.p,
                                    size,
                                );
                                let mut owned = inner.ghost_freq.extract(llp.to_owned());
                                owned.as_mut().txid = txid;
                                owned.as_mut().size = size;
                                stats.ghost_frequent_revive(&owned.as_mut().k);
                                let pointer = inner.freq.append_n(owned);
                                Some(CacheItem::Freq(pointer, iv))
                            }
                        }
                        CacheItem::GhostRec(llp) => {
                            // Ghost-recent hit: adapt p toward recency in both
                            // branches; revive onto rec only if fresh.
                            if llp.as_ref().txid > txid || inner.min_txid > txid {
                                let size = llp.as_ref().size;
                                Self::calc_p_rec(
                                    shared.max,
                                    inner.ghost_rec.len(),
                                    inner.ghost_freq.len(),
                                    &mut inner.p,
                                    size,
                                );
                                inner.ghost_rec.touch(llp.clone());
                                None
                            } else {
                                Self::calc_p_rec(
                                    shared.max,
                                    inner.ghost_rec.len(),
                                    inner.ghost_freq.len(),
                                    &mut inner.p,
                                    size,
                                );
                                let mut owned = inner.ghost_rec.extract(llp.to_owned());
                                owned.as_mut().txid = txid;
                                owned.as_mut().size = size;
                                stats.ghost_recent_revive(&owned.as_ref().k);
                                let pointer = inner.rec.append_n(owned);
                                Some(CacheItem::Rec(pointer, iv))
                            }
                        }
                        CacheItem::Haunted(llp) => {
                            if llp.as_ref().txid > txid || inner.min_txid > txid {
                                // Removed after this reader saw it: reject.
                                None
                            } else {
                                debug_assert!(inner.haunted.len() > 0);
                                let before_len = inner.haunted.len();

                                let mut owned = inner.haunted.extract(llp.to_owned());

                                debug_assert!(before_len - HAUNTED_SIZE == inner.haunted.len());

                                owned.as_mut().txid = txid;
                                debug_assert!(owned.as_mut().size == HAUNTED_SIZE);
                                stats.include_haunted(&owned.as_mut().k);
                                let pointer = inner.rec.append_n(owned);
                                Some(CacheItem::Rec(pointer, iv))
                            }
                        }
                    };
                    if let Some(ref mut next_state) = next_state {
                        mem::swap(*ci, next_state);
                    }
                }
                None => {
                    // Unknown key: include it fresh on the recent list unless
                    // the event predates the stale cutoff.
                    if txid >= inner.min_txid {
                        let llp = inner.rec.append_k(CacheItemInner {
                            k: k.clone(),
                            txid,
                            size,
                        });
                        stats.include(&k);
                        let existing = cache.insert(k, CacheItem::Rec(llp, iv));
                        assert!(
                            existing.is_none(),
                            "Impossible state! Key must not already exist in cache!"
                        );
                    }
                }
            };

            // Stop once we reach events newer than this commit.
            if t >= commit_ts {
                break;
            }
        }
    }
1151
    /// Apply the hits recorded by this write transaction (as key hashes).
    /// Entries already stamped with `commit_txid` were just (re)written by
    /// `drain_tlocal_inc` and are skipped; others are touched/promoted.
    ///
    /// Hits are located by hash slot, so colliding neighbours are touched
    /// too, like in `drain_hit_rx`.
    fn drain_tlocal_hits(
        &self,
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        inner: &mut ArcInner<K, V>,
        commit_txid: u64,
        hit: Vec<u64>,
    ) {
        hit.into_iter().for_each(|k_hash| {
            let mut r = unsafe { cache.get_slot_mut(k_hash) };
            match r {
                Some(ref mut ci_slots) => {
                    for ref mut ci in ci_slots.iter_mut() {
                        let mut next_state = match &ci.v {
                            CacheItem::Freq(llp, v) => {
                                if llp.as_ref().txid != commit_txid {
                                    inner.freq.touch(llp.to_owned());
                                    Some(CacheItem::Freq(llp.to_owned(), v.to_owned()))
                                } else {
                                    None
                                }
                            }
                            CacheItem::Rec(llp, v) => {
                                if llp.as_ref().txid != commit_txid {
                                    // Second touch: promote recent -> frequent.
                                    let owned = inner.rec.extract(llp.clone());
                                    let pointer = inner.freq.append_n(owned);
                                    Some(CacheItem::Freq(pointer, v.clone()))
                                } else {
                                    None
                                }
                            }
                            _ => {
                                // Ghost/haunted entries are unaffected by
                                // hits from the writer itself.
                                None
                            }
                        };
                        if let Some(ref mut next_state) = next_state {
                            mem::swap(&mut ci.v, next_state);
                        }
                    }
                }
                None => {
                    // A recorded hit implies the slot must still exist.
                    unreachable!();
                }
            }
        });
    }
1219
    /// Pop nodes from the head of a ghost list `ll` until its weight is at
    /// most `size`, converting each to a haunted tombstone (weight
    /// `HAUNTED_SIZE`, txid `txid`) on `to_ll` and updating the map state.
    fn evict_to_haunted_len(
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        ll: &mut LL<CacheItemInner<K>>,
        to_ll: &mut LL<CacheItemInner<K>>,
        size: usize,
        txid: u64,
    ) {
        // Track weight moved so list lengths can be cross-checked each step.
        let to_ll_before = to_ll.len();
        let ll_before = ll.len();
        let mut added = 0;
        let mut removed = 0;

        while ll.len() > size {
            #[cfg(test)]
            {
                ll.verify();
                to_ll.verify();
            }

            if let Some(mut owned) = ll.pop() {
                debug_assert!(!owned.is_null());

                removed += owned.as_mut().size;

                assert_eq!(ll.len(), ll_before - removed);

                // Tombstones carry the eviction txid and unit weight.
                owned.as_mut().txid = txid;
                owned.as_mut().size = HAUNTED_SIZE;
                added += HAUNTED_SIZE;

                let pointer = to_ll.append_n(owned);

                assert_eq!(
                    to_ll.len(),
                    to_ll_before + added,
                    "Impossible State! List lengths are no longer consistent!"
                );

                let mut r = cache.get_mut(&pointer.as_ref().k);

                match r {
                    Some(ref mut ci) => {
                        let mut next_state = CacheItem::Haunted(pointer);
                        mem::swap(*ci, &mut next_state);
                    }
                    None => {
                        // Every list node must have a map entry.
                        unreachable!();
                    }
                };
            } else {
                // len > size implies the list is non-empty.
                unreachable!();
            }

            #[cfg(test)]
            {
                ll.verify();
                to_ll.verify();
            }
        }
    }
1286
    /// Pop nodes from the head of a live list `ll` (freq or rec) until its
    /// weight is at most `size`, demoting each to the matching ghost state
    /// on `to_ll` and updating the map.
    fn evict_to_len<S>(
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        ll: &mut LL<CacheItemInner<K>>,
        to_ll: &mut LL<CacheItemInner<K>>,
        size: usize,
        txid: u64,
        stats: &mut S,
    ) where
        S: ARCacheWriteStat<K>,
    {
        debug_assert!(ll.len() >= size);

        while ll.len() > size {
            #[cfg(test)]
            {
                ll.verify();
                to_ll.verify();
            }

            if let Some(mut owned) = ll.pop() {
                debug_assert!(!owned.is_null());
                let mut r = cache.get_mut(&owned.as_ref().k);
                // Stamp the eviction txid on the node being demoted.
                owned.as_mut().txid = txid;
                match r {
                    Some(ref mut ci) => {
                        // The map state tells us which ghost list is correct;
                        // it must agree with the list we popped from.
                        let mut next_state = match &ci {
                            CacheItem::Freq(llp, _v) => {
                                assert!(llp == &owned, "Impossible State! Pointer in map does not match the pointer from the list!");
                                stats.evict_from_frequent(&owned.as_ref().k);
                                let pointer = to_ll.append_n(owned);
                                CacheItem::GhostFreq(pointer)
                            }
                            CacheItem::Rec(llp, _v) => {
                                assert!(llp == &owned, "Impossible State! Pointer in map does not match the pointer from the list!");
                                stats.evict_from_recent(&owned.as_mut().k);
                                let pointer = to_ll.append_n(owned);
                                CacheItem::GhostRec(pointer)
                            }
                            _ => {
                                // Ghost/haunted entries never live on ll.
                                unreachable!();
                            }
                        };
                        mem::swap(*ci, &mut next_state);
                    }
                    None => {
                        // Every list node must have a map entry.
                        unreachable!();
                    }
                };
            } else {
                // len > size implies the list is non-empty.
                unreachable!();
            }

            #[cfg(test)]
            {
                ll.verify();
                to_ll.verify();
            }
        }
    }
1361
    /// Rebalance the cache at commit: split the `max` budget between the
    /// recent and frequent lists according to the adaptive target `p`, demote
    /// overflow to the ghost lists, and age oversized ghost lists into
    /// haunted tombstones.
    #[allow(clippy::cognitive_complexity)]
    fn evict<S>(
        &self,
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        inner: &mut ArcInner<K, V>,
        shared: &ArcShared<K, V>,
        commit_txid: u64,
        stats: &mut S,
    ) where
        S: ARCacheWriteStat<K>,
    {
        debug_assert!(inner.p <= shared.max);
        let p = inner.p;

        if inner.rec.len() + inner.freq.len() > shared.max {
            trace!(
                "from -> rec {:?}, freq {:?}",
                inner.rec.len(),
                inner.freq.len()
            );
            // Total weight that must be evicted to fit the budget.
            let delta = (inner.rec.len() + inner.freq.len()) - shared.max;
            // Choose rec's post-eviction target: with p == 0 everything comes
            // out of rec; otherwise rec gives up at most its excess over p.
            let rec_to_len = if inner.p == 0 {
                trace!("p == 0 => {:?} - {}", inner.rec.len(), delta);
                if delta < inner.rec.len() {
                    inner.rec.len() - delta
                } else {
                    0
                }
            } else if inner.rec.len() > inner.p {
                let rec_delta = inner.rec.len() - inner.p;
                if rec_delta > delta {
                    inner.rec.len() - delta
                } else {
                    inner.rec.len() - rec_delta
                }
            } else {
                // rec is within its allowance; freq absorbs the whole delta.
                inner.rec.len()
            };

            // freq gets the remainder of the budget.
            let freq_to_len = shared.max - rec_to_len;
            debug_assert!(rec_to_len <= inner.rec.len());
            debug_assert!(freq_to_len <= inner.freq.len());

            Self::evict_to_len(
                cache,
                &mut inner.rec,
                &mut inner.ghost_rec,
                rec_to_len,
                commit_txid,
                stats,
            );
            Self::evict_to_len(
                cache,
                &mut inner.freq,
                &mut inner.ghost_freq,
                freq_to_len,
                commit_txid,
                stats,
            );

            // Age oversized ghost lists into haunted tombstones.
            // NOTE(review): ghost_rec is trimmed to `freq_to_len` and
            // ghost_freq to `rec_to_len` while the triggering thresholds are
            // `max - p` and `p` — the crossed targets look deliberate
            // (complementary shares) but confirm against upstream ARC logic.
            if inner.ghost_rec.len() > (shared.max - p) {
                Self::evict_to_haunted_len(
                    cache,
                    &mut inner.ghost_rec,
                    &mut inner.haunted,
                    freq_to_len,
                    commit_txid,
                );
            }

            if inner.ghost_freq.len() > p {
                Self::evict_to_haunted_len(
                    cache,
                    &mut inner.ghost_freq,
                    &mut inner.haunted,
                    rec_to_len,
                    commit_txid,
                );
            }
        }
    }
1496
    /// Empty a live list (freq or rec) entirely, demoting every node to its
    /// matching ghost list. Used when the cache is cleared: values are
    /// dropped but ghost history is retained.
    fn drain_ll_to_ghost<S>(
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        ll: &mut LL<CacheItemInner<K>>,
        gf: &mut LL<CacheItemInner<K>>,
        gr: &mut LL<CacheItemInner<K>>,
        txid: u64,
        stats: &mut S,
    ) where
        S: ARCacheWriteStat<K>,
    {
        while let Some(mut owned) = ll.pop() {
            #[cfg(test)]
            {
                ll.verify();
                gf.verify();
                gr.verify();
            }

            debug_assert!(!owned.is_null());

            // Stamp with the clearing txid so stale includes are rejected.
            owned.as_mut().txid = txid;

            let mut r = cache.get_mut(&owned.as_ref().k);
            match r {
                Some(ref mut ci) => {
                    // The map state picks the destination ghost list.
                    let mut next_state = match &ci {
                        CacheItem::Freq(n, _) => {
                            debug_assert!(n == &owned);
                            stats.evict_from_frequent(&owned.as_ref().k);
                            let pointer = gf.append_n(owned);
                            CacheItem::GhostFreq(pointer)
                        }
                        CacheItem::Rec(n, _) => {
                            debug_assert!(n == &owned);
                            stats.evict_from_recent(&owned.as_ref().k);
                            let pointer = gr.append_n(owned);
                            CacheItem::GhostRec(pointer)
                        }
                        _ => {
                            // Ghost/haunted entries never live on ll.
                            unreachable!();
                        }
                    };
                    mem::swap(*ci, &mut next_state);
                }
                None => {
                    // Every list node must have a map entry.
                    unreachable!();
                }
            }

            #[cfg(test)]
            {
                ll.verify();
                gf.verify();
                gr.verify();
            }
        }
    }
1561
    /// Expire entries from the head of `ll` (the haunted list) whose txid is
    /// older than `min_txid`, removing them from the map as well. The loop
    /// assumes the list is ordered oldest-first by txid, so it stops at the
    /// first entry that is new enough.
    fn drain_ll_min_txid(
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        ll: &mut LL<CacheItemInner<K>>,
        min_txid: u64,
    ) {
        while let Some(node) = ll.peek_head() {
            #[cfg(test)]
            {
                ll.verify();
            }

            if node.txid < min_txid {
                let before_len = ll.len();
                debug_assert!(ll.len() > 0);

                // Remove from the map first, while `node` still borrows the
                // head element; then release the head itself.
                cache.remove(&node.k);

                ll.drop_head();

                // Exactly one (HAUNTED_SIZE) element must have gone.
                debug_assert!(before_len - HAUNTED_SIZE == ll.len());
            } else {
                // First new-enough entry - nothing further can be older.
                break;
            }

            #[cfg(test)]
            {
                ll.verify();
            }
        }
    }
1597
    #[allow(clippy::unnecessary_mut_passed)]
    /// Apply a writer's accumulated changes to the shared cache state and
    /// commit the underlying map transaction. The drain order matters:
    /// clear (if any), then the writer's thread-local changes, then queued
    /// reader includes, then queued reader hits, then the writer's own hits,
    /// and only then eviction back to the configured size.
    ///
    /// * `cache` - the open map write transaction, committed at the end.
    /// * `tlocal` - the writer's pending inserts/removes.
    /// * `hit` - key hashes the writer observed as hits.
    /// * `clear` - whether the writer requested a full cache clear.
    /// * `init_above_watermark` - watermark state sampled when the txn began.
    /// * `stats` - write-stat sink; returned to the caller when done.
    fn commit<S>(
        &self,
        mut cache: DataMapWriteTxn<K, CacheItem<K, V>>,
        tlocal: Map<K, ThreadCacheItem<V>>,
        hit: Vec<u64>,
        clear: bool,
        init_above_watermark: bool,
        mut stats: S,
    ) -> S
    where
        S: ARCacheWriteStat<K>,
    {
        let commit_ts = Instant::now();
        let commit_txid = cache.get_txid();
        // Lock ordering: inner mutex first, then shared read lock.
        let mut inner = self.inner.lock().unwrap();
        let shared = self.shared.read().unwrap();

        if clear {
            // Anything from before this commit must not resurrect: raise the
            // minimum txid, then demote every live entry to a ghost.
            inner.min_txid = commit_txid;

            let m_inner = inner.deref_mut();

            // Split borrows of the individual lists so both drains can run.
            let i_f = &mut m_inner.freq;
            let g_f = &mut m_inner.ghost_freq;
            let i_r = &mut m_inner.rec;
            let g_r = &mut m_inner.ghost_rec;

            Self::drain_ll_to_ghost(&mut cache, i_f, g_f, g_r, commit_txid, &mut stats);
            Self::drain_ll_to_ghost(&mut cache, i_r, g_f, g_r, commit_txid, &mut stats);
        } else {
            // Advance the look-back horizon; min_txid never moves backwards.
            let possible_new_limit = commit_txid.saturating_sub(self.look_back_limit);
            inner.min_txid = inner.min_txid.max(possible_new_limit);
        }

        // 1. The writer's own inserts/removes.
        self.drain_tlocal_inc(
            &mut cache,
            inner.deref_mut(),
            shared.deref(),
            tlocal,
            commit_txid,
            &mut stats,
        );

        // 2. Includes queued by concurrent readers.
        self.drain_inc_rx(
            &mut cache,
            inner.deref_mut(),
            shared.deref(),
            commit_ts,
            &mut stats,
        );

        // 3. Hits queued by concurrent readers, then the writer's hits.
        self.drain_hit_rx(&mut cache, inner.deref_mut(), commit_ts);

        self.drain_tlocal_hits(&mut cache, inner.deref_mut(), commit_txid, hit);

        // 4. Shrink back to the configured maximum.
        self.evict(
            &mut cache,
            inner.deref_mut(),
            shared.deref(),
            commit_txid,
            &mut stats,
        );

        // Expire stale haunted entries that no live txn can reference.
        {
            let min_txid = inner.min_txid;
            Self::drain_ll_min_txid(&mut cache, &mut inner.haunted, min_txid);
        }

        // Publish post-commit metrics.
        stats.p_weight(inner.p as u64);
        stats.shared_max(shared.max as u64);
        stats.freq(inner.freq.len() as u64);
        stats.recent(inner.rec.len() as u64);
        stats.all_seen_keys(cache.len() as u64);

        // Maintain the above-watermark flag; only flip it in the direction
        // consistent with the state observed at txn start to avoid races.
        if init_above_watermark {
            if (inner.freq.len() + inner.rec.len()) < shared.watermark {
                self.above_watermark.store(false, Ordering::Relaxed);
            }
        } else if (inner.freq.len() + inner.rec.len()) >= shared.watermark {
            self.above_watermark.store(true, Ordering::Relaxed);
        }

        cache.commit();
        stats
    }
1740}
1741
1742impl<
1743 K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
1744 V: Clone + Debug + Sync + Send + 'static,
1745 S: ARCacheWriteStat<K>,
1746 > ARCacheWriteTxn<'_, K, V, S>
1747{
1748 pub fn commit(self) -> S {
1755 self.caller.commit(
1756 self.cache,
1757 self.tlocal,
1758 self.hit.into_inner(),
1759 self.clear.into_inner(),
1760 self.above_watermark,
1761 self.stats,
1763 )
1764 }
1765
1766 pub fn clear(&mut self) {
1770 unsafe {
1772 let clear_ptr = self.clear.get();
1773 *clear_ptr = true;
1774 }
1775 unsafe {
1777 let hit_ptr = self.hit.get();
1778 (*hit_ptr).clear();
1779 }
1780
1781 self.stats.cache_clear();
1784 self.tlocal.clear();
1793 }
1796
1797 pub fn get<Q>(&mut self, k: &Q) -> Option<&V>
1802 where
1803 K: Borrow<Q>,
1804 Q: Hash + Eq + Ord + ?Sized,
1805 {
1806 let k_hash: u64 = self.cache.prehash(k);
1807
1808 self.stats.cache_read();
1816
1817 let r: Option<&V> = if let Some(tci) = self.tlocal.get(k) {
1818 match tci {
1819 ThreadCacheItem::Present(v, _clean, _size) => {
1820 let v = v as *const _;
1821 unsafe { Some(&(*v)) }
1822 }
1823 ThreadCacheItem::Removed(_clean) => {
1824 return None;
1825 }
1826 }
1827 } else {
1828 let is_cleared = unsafe {
1831 let clear_ptr = self.clear.get();
1832 *clear_ptr
1833 };
1834 if !is_cleared {
1835 if let Some(v) = self.cache.get_prehashed(k, k_hash) {
1836 (*v).to_vref()
1837 } else {
1838 None
1839 }
1840 } else {
1841 None
1842 }
1843 };
1844
1845 if r.is_some() {
1846 self.stats.cache_hit();
1847 }
1848
1849 if self.above_watermark && r.is_some() {
1854 unsafe {
1855 let hit_ptr = self.hit.get();
1856 (*hit_ptr).push(k_hash);
1857 }
1858 }
1859 r
1860 }
1861
1862 pub fn get_mut<Q>(&mut self, k: &Q, make_dirty: bool) -> Option<&mut V>
1871 where
1872 K: Borrow<Q>,
1873 Q: Hash + Eq + Ord + ?Sized,
1874 {
1875 let is_cleared = unsafe {
1877 let clear_ptr = self.clear.get();
1878 *clear_ptr
1879 };
1880
1881 if !is_cleared && !self.tlocal.contains_key(k) {
1884 let k_hash: u64 = self.cache.prehash(k);
1886 if let Some(v) = self.cache.get_prehashed(k, k_hash) {
1887 if let Some((dk, dv, ds)) = v.to_kvsref() {
1888 self.tlocal.insert(
1889 dk.clone(),
1890 ThreadCacheItem::Present(dv.clone(), !make_dirty, ds),
1891 );
1892 }
1893 }
1894 };
1895
1896 match self.tlocal.get_mut(k) {
1899 Some(ThreadCacheItem::Present(v, clean, _size)) => {
1900 if make_dirty && *clean {
1901 *clean = false;
1902 }
1903 let v = v as *mut _;
1904 unsafe { Some(&mut (*v)) }
1905 }
1906 _ => None,
1907 }
1908 }
1909
1910 pub fn contains_key<Q>(&mut self, k: &Q) -> bool
1912 where
1913 K: Borrow<Q>,
1914 Q: Hash + Eq + Ord + ?Sized,
1915 {
1916 self.get(k).is_some()
1917 }
1918
1919 pub fn insert(&mut self, k: K, v: V) {
1924 self.tlocal.insert(k, ThreadCacheItem::Present(v, true, 1));
1925 }
1926
1927 pub fn insert_sized(&mut self, k: K, v: V, size: NonZeroUsize) {
1929 self.tlocal
1930 .insert(k, ThreadCacheItem::Present(v, true, size.get()));
1931 }
1932
1933 pub fn remove(&mut self, k: K) {
1937 self.tlocal.insert(k, ThreadCacheItem::Removed(true));
1938 }
1939
1940 pub fn insert_dirty(&mut self, k: K, v: V) {
1946 self.tlocal.insert(k, ThreadCacheItem::Present(v, false, 1));
1947 }
1948
1949 pub fn insert_dirty_sized(&mut self, k: K, v: V, size: NonZeroUsize) {
1951 self.tlocal
1952 .insert(k, ThreadCacheItem::Present(v, false, size.get()));
1953 }
1954
1955 pub fn remove_dirty(&mut self, k: K) {
1960 self.tlocal.insert(k, ThreadCacheItem::Removed(false));
1961 }
1962
1963 pub fn is_dirty(&self) -> bool {
1965 self.iter_dirty().take(1).next().is_some()
1966 }
1967
1968 pub fn iter_dirty(&self) -> impl Iterator<Item = (&K, Option<&V>)> {
1972 self.tlocal
1973 .iter()
1974 .filter(|(_k, v)| match v {
1975 ThreadCacheItem::Present(_v, c, _size) => !c,
1976 ThreadCacheItem::Removed(c) => !c,
1977 })
1978 .map(|(k, v)| {
1979 let data = match v {
1981 ThreadCacheItem::Present(v, _c, _size) => Some(v),
1982 ThreadCacheItem::Removed(_c) => None,
1983 };
1984 (k, data)
1985 })
1986 }
1987
1988 pub fn iter_mut_dirty(&mut self) -> impl Iterator<Item = (&K, Option<&mut V>)> {
1992 self.tlocal
1993 .iter_mut()
1994 .filter(|(_k, v)| match v {
1995 ThreadCacheItem::Present(_v, c, _size) => !c,
1996 ThreadCacheItem::Removed(c) => !c,
1997 })
1998 .map(|(k, v)| {
1999 let data = match v {
2001 ThreadCacheItem::Present(v, _c, _size) => Some(v),
2002 ThreadCacheItem::Removed(_c) => None,
2003 };
2004 (k, data)
2005 })
2006 }
2007
2008 pub fn iter_mut_mark_clean(&mut self) -> impl Iterator<Item = (&K, Option<&mut V>)> {
2013 self.tlocal
2014 .iter_mut()
2015 .filter(|(_k, v)| match v {
2016 ThreadCacheItem::Present(_v, c, _size) => !c,
2017 ThreadCacheItem::Removed(c) => !c,
2018 })
2019 .map(|(k, v)| {
2020 match v {
2022 ThreadCacheItem::Present(_v, c, _size) => *c = true,
2023 ThreadCacheItem::Removed(c) => *c = true,
2024 }
2025 let data = match v {
2027 ThreadCacheItem::Present(v, _c, _size) => Some(v),
2028 ThreadCacheItem::Removed(_c) => None,
2029 };
2030 (k, data)
2031 })
2032 }
2033
2034 pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
2036 self.cache.values().filter_map(|ci| match &ci {
2037 CacheItem::Rec(lln, v) => {
2038 let cii = lln.as_ref();
2039 Some((&cii.k, v))
2040 }
2041 CacheItem::Freq(lln, v) => {
2042 let cii = lln.as_ref();
2043 Some((&cii.k, v))
2044 }
2045 _ => None,
2046 })
2047 }
2048
2049 pub fn iter_rec(&self) -> impl Iterator<Item = &K> {
2052 self.cache.values().filter_map(|ci| match &ci {
2053 CacheItem::Rec(lln, _) => {
2054 let cii = lln.as_ref();
2055 Some(&cii.k)
2056 }
2057 _ => None,
2058 })
2059 }
2060
2061 pub fn iter_freq(&self) -> impl Iterator<Item = &K> {
2064 self.cache.values().filter_map(|ci| match &ci {
2065 CacheItem::Rec(lln, _) => {
2066 let cii = lln.as_ref();
2067 Some(&cii.k)
2068 }
2069 _ => None,
2070 })
2071 }
2072
2073 #[cfg(test)]
2074 pub(crate) fn iter_ghost_rec(&self) -> impl Iterator<Item = &K> {
2075 self.cache.values().filter_map(|ci| match &ci {
2076 CacheItem::GhostRec(lln) => {
2077 let cii = lln.as_ref();
2078 Some(&cii.k)
2079 }
2080 _ => None,
2081 })
2082 }
2083
2084 #[cfg(test)]
2085 pub(crate) fn iter_ghost_freq(&self) -> impl Iterator<Item = &K> {
2086 self.cache.values().filter_map(|ci| match &ci {
2087 CacheItem::GhostFreq(lln) => {
2088 let cii = lln.as_ref();
2089 Some(&cii.k)
2090 }
2091 _ => None,
2092 })
2093 }
2094
2095 #[cfg(test)]
2096 pub(crate) fn peek_hit(&self) -> &[u64] {
2097 let hit_ptr = self.hit.get();
2098 unsafe { &(*hit_ptr) }
2099 }
2100
2101 #[cfg(test)]
2102 pub(crate) fn peek_cache<Q: ?Sized + Hash + Eq + Ord>(&self, k: &Q) -> CacheState
2103 where
2104 K: Borrow<Q>,
2105 {
2106 if let Some(v) = self.cache.get(k) {
2107 (*v).to_state()
2108 } else {
2109 CacheState::None
2110 }
2111 }
2112
2113 #[cfg(test)]
2114 pub(crate) fn peek_stat(&self) -> CStat {
2115 let inner = self.caller.inner.lock().unwrap();
2116 let shared = self.caller.shared.read().unwrap();
2117 CStat {
2118 max: shared.max,
2119 cache: self.cache.len(),
2120 tlocal: self.tlocal.len(),
2121 freq: inner.freq.len(),
2122 rec: inner.rec.len(),
2123 ghost_freq: inner.ghost_freq.len(),
2124 ghost_rec: inner.ghost_rec.len(),
2125 haunted: inner.haunted.len(),
2126 p: inner.p,
2127 }
2128 }
2129
2130 }
2132
2133impl<
2134 K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
2135 V: Clone + Debug + Sync + Send + 'static,
2136 S: ARCacheReadStat + Clone,
2137 > ARCacheReadTxn<'_, K, V, S>
2138{
2139 pub fn get<Q>(&mut self, k: &Q) -> Option<&V>
2144 where
2145 K: Borrow<Q>,
2146 Q: Hash + Eq + Ord + ?Sized,
2147 {
2148 let k_hash: u64 = self.cache.prehash(k);
2149
2150 self.stats.cache_read();
2151 let mut hits = false;
2154 let mut tlocal_hits = false;
2155
2156 let r: Option<&V> = self
2157 .tlocal
2158 .as_ref()
2159 .and_then(|cache| {
2160 cache.set.get(k).map(|v| {
2161 tlocal_hits = true;
2163
2164 if self.above_watermark {
2165 let _ = self.hit_queue.push(CacheHitEvent {
2166 t: Instant::now(),
2167 k_hash,
2168 });
2169 }
2170 unsafe {
2171 let v = &v.as_ref().v as *const _;
2172 &(*v)
2174 }
2175 })
2176 })
2177 .or_else(|| {
2178 self.cache.get_prehashed(k, k_hash).and_then(|v| {
2179 (*v).to_vref().map(|vin| {
2180 hits = true;
2182
2183 if self.above_watermark {
2184 let _ = self.hit_queue.push(CacheHitEvent {
2185 t: Instant::now(),
2186 k_hash,
2187 });
2188 }
2189
2190 unsafe {
2191 let vin = vin as *const _;
2192 &(*vin)
2193 }
2194 })
2195 })
2196 });
2197
2198 if tlocal_hits {
2199 self.stats.cache_local_hit()
2200 } else if hits {
2201 self.stats.cache_main_hit()
2202 };
2203
2204 r
2205 }
2206
2207 pub fn contains_key<Q>(&mut self, k: &Q) -> bool
2209 where
2210 K: Borrow<Q>,
2211 Q: Hash + Eq + Ord + ?Sized,
2212 {
2213 self.get(k).is_some()
2214 }
2215
2216 pub fn insert_sized(&mut self, k: K, v: V, size: NonZeroUsize) {
2218 let size = size.get();
2219 if self
2222 .inc_queue
2223 .push(CacheIncludeEvent {
2224 t: Instant::now(),
2225 k: k.clone(),
2226 v: v.clone(),
2227 txid: self.cache.get_txid(),
2228 size,
2229 })
2230 .is_ok()
2231 {
2232 self.stats.include();
2233 } else {
2234 self.stats.failed_include();
2235 }
2236
2237 if let Some(ref mut cache) = self.tlocal {
2239 self.stats.local_include();
2240 while cache.tlru.len() >= cache.read_size {
2241 if let Some(owned_inner) = cache.tlru.pop_n_free() {
2242 let existing = cache.set.remove(&owned_inner.k);
2243 assert!(
2245 existing.is_some(),
2246 "Impossible state! Key was NOT present in cache!"
2247 );
2248 } else {
2249 debug_assert!(false);
2251 break;
2252 }
2253 }
2254
2255 let n = cache.tlru.append_k(ReadCacheItem {
2257 k: k.clone(),
2258 v,
2259 size,
2260 });
2261 let r = cache.set.insert(k, n);
2262 assert!(r.is_none());
2264 }
2265 }
2266
2267 pub fn insert(&mut self, k: K, v: V) {
2278 self.insert_sized(k, v, unsafe { NonZeroUsize::new_unchecked(1) })
2279 }
2280
2281 pub fn finish(self) -> S {
2283 let stats = self.stats.clone();
2284 drop(self);
2285
2286 stats
2287 }
2288}
2289
2290impl<
2291 K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
2292 V: Clone + Debug + Sync + Send + 'static,
2293 S: ARCacheReadStat + Clone,
2294 > Drop for ARCacheReadTxn<'_, K, V, S>
2295{
2296 fn drop(&mut self) {
2297 if self.reader_quiesce {
2299 self.caller.try_quiesce();
2300 }
2301 }
2302}
2303
2304#[cfg(test)]
2305mod tests {
2306 use super::stats::{TraceStat, WriteCountStat};
2307 use super::ARCache;
2308 use super::ARCacheBuilder;
2309 use super::CStat;
2310 use super::CacheState;
2311 use std::num::NonZeroUsize;
2312 use std::sync::Arc;
2313 use std::thread;
2314
2315 use std::sync::atomic::{AtomicBool, Ordering};
2316
    #[test]
    fn test_cache_arc_basic() {
        // Single-writer flow: insert, hit tracking, and promotion of an item
        // from the recent (Rec) set to the frequent (Freq) set after it is
        // hit in consecutive write transactions.
        let arc: ARCache<usize, usize> = ARCacheBuilder::new()
            .set_size(4, 4)
            .build()
            .expect("Invalid cache parameters!");
        let mut wr_txn = arc.write();

        assert!(wr_txn.get(&1).is_none());
        assert!(wr_txn.peek_hit().is_empty());
        wr_txn.insert(1, 1);
        assert!(wr_txn.get(&1) == Some(&1));
        // A hit on our own pending insert is still recorded.
        assert!(wr_txn.peek_hit().len() == 1);

        wr_txn.commit();

        // After commit, the item lands in the recent set.
        let mut wr_txn = arc.write();
        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
        assert!(wr_txn.get(&1) == Some(&1));
        assert!(wr_txn.peek_hit().len() == 1);
        wr_txn.commit();
        // The hit from the previous txn promotes the key to frequent.
        let wr_txn = arc.write();
        assert!(wr_txn.peek_cache(&1) == CacheState::Freq);
        println!("{:?}", wr_txn.peek_stat());
    }
2344
    #[test]
    fn test_cache_evict() {
        // Walk the ARC state machine through inserts, hits, evictions to the
        // ghost lists, ghost revivals, and movement of the p weight. Each
        // "== N" phase commits and then asserts the exact expected CStat.
        let _ = tracing_subscriber::fmt::try_init();
        println!("== 1");
        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 4)
            .build()
            .expect("Invalid cache parameters!");
        let stats = TraceStat {};

        // Fresh cache: everything empty.
        let mut wr_txn = arc.write_stats(stats);
        assert!(
            CStat {
                max: 4,
                cache: 0,
                tlocal: 0,
                freq: 0,
                rec: 0,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );

        // Fill to capacity - pending inserts live in tlocal until commit.
        wr_txn.insert(1, 1);
        wr_txn.insert(2, 2);
        wr_txn.insert(3, 3);
        wr_txn.insert(4, 4);

        assert!(
            CStat {
                max: 4,
                cache: 0,
                tlocal: 4,
                freq: 0,
                rec: 0,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        let stats = wr_txn.commit();

        println!("== 2");
        // All four land in the recent set.
        let mut wr_txn = arc.write_stats(stats);
        assert!(
            CStat {
                max: 4,
                cache: 4,
                tlocal: 0,
                freq: 0,
                rec: 4,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );

        // Hit 1 (twice) and 2 so they will be promoted to frequent.
        assert!(wr_txn.get(&1) == Some(&1));
        assert!(wr_txn.get(&1) == Some(&1));
        assert!(wr_txn.get(&2) == Some(&2));

        let stats = wr_txn.commit();

        println!("== 3");
        // Two promoted to freq, two remain recent.
        let mut wr_txn = arc.write_stats(stats);
        assert!(
            CStat {
                max: 4,
                cache: 4,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        // Inserting a fifth item forces an eviction from recent.
        wr_txn.insert(5, 5);
        let stats = wr_txn.commit();

        println!("== 4");
        // One recent item was pushed to ghost_rec.
        let mut wr_txn = arc.write_stats(stats);
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 5,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 1,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        // Hit both survivors of the recent set to promote them.
        let rec_set: Vec<usize> = wr_txn.iter_rec().take(2).copied().collect();
        assert!(wr_txn.get(&rec_set[0]) == Some(&rec_set[0]));
        assert!(wr_txn.get(&rec_set[1]) == Some(&rec_set[1]));

        let stats = wr_txn.commit();

        println!("== 5");
        // All live items are now frequent.
        let mut wr_txn = arc.write_stats(stats);
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 5,
                tlocal: 0,
                freq: 4,
                rec: 0,
                ghost_freq: 0,
                ghost_rec: 1,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        // Revive the ghost-recent key: this raises p by 1.
        let grec: usize = wr_txn.iter_ghost_rec().take(1).copied().next().unwrap();
        wr_txn.insert(grec, grec);
        assert!(wr_txn.get(&grec) == Some(&grec));
        let stats = wr_txn.commit();

        println!("== 6");
        let mut wr_txn = arc.write_stats(stats);
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 5,
                tlocal: 0,
                freq: 3,
                rec: 1,
                ghost_freq: 1,
                ghost_rec: 0,
                haunted: 0,
                p: 1
            } == wr_txn.peek_stat()
        );
        assert!(wr_txn.peek_cache(&grec) == CacheState::Rec);

        // Push three fresh keys through to grow ghost_rec.
        wr_txn.insert(10, 10);
        wr_txn.insert(11, 11);
        wr_txn.insert(12, 12);
        let stats = wr_txn.commit();

        println!("== 7");
        let mut wr_txn = arc.write_stats(stats);
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 3,
                rec: 1,
                ghost_freq: 1,
                ghost_rec: 3,
                haunted: 0,
                p: 1
            } == wr_txn.peek_stat()
        );
        // Revive all ghost-recent keys: p should swing fully toward recent.
        let grec_set: Vec<usize> = wr_txn.iter_ghost_rec().take(3).copied().collect();
        println!("{:?}", grec_set);

        grec_set
            .iter()
            .for_each(|i| println!("{:?}", wr_txn.peek_cache(i)));

        grec_set.iter().for_each(|i| wr_txn.insert(*i, *i));

        grec_set
            .iter()
            .for_each(|i| println!("{:?}", wr_txn.peek_cache(i)));
        wr_txn.commit();

        println!("== 8");
        let mut wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 0,
                rec: 4,
                ghost_freq: 4,
                ghost_rec: 0,
                haunted: 0,
                p: 4
            } == wr_txn.peek_stat()
        );

        grec_set
            .iter()
            .for_each(|i| println!("{:?}", wr_txn.peek_cache(i)));
        grec_set
            .iter()
            .for_each(|i| assert!(wr_txn.peek_cache(i) == CacheState::Rec));

        // Now revive the ghost-frequent keys: p swings back to 0.
        let gfreq_set: Vec<usize> = wr_txn.iter_ghost_freq().take(4).copied().collect();

        gfreq_set.iter().for_each(|i| wr_txn.insert(*i, *i));
        wr_txn.commit();

        println!("== 9");
        let wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 4,
                rec: 0,
                ghost_freq: 0,
                ghost_rec: 4,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        gfreq_set
            .iter()
            .for_each(|i| assert!(wr_txn.peek_cache(i) == CacheState::Freq));

        let () = wr_txn.commit();
    }
2618
    #[test]
    fn test_cache_concurrent_basic() {
        // Reader inserts are queued and applied on quiesce/commit; reader
        // hits influence promotion on the next writer commit.
        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 4)
            .build()
            .expect("Invalid cache parameters!");
        {
            // Reader inserts go through the include queue.
            let mut rd_txn = arc.read();
            rd_txn.insert(1, 1);
            rd_txn.insert(2, 2);
            rd_txn.insert(3, 3);
            rd_txn.insert(4, 4);
        }
        // Apply the queued includes.
        arc.try_quiesce();
        println!("== 2");
        let wr_txn = arc.write();
        println!("{:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 4,
                tlocal: 0,
                freq: 0,
                rec: 4,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&2) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&3) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&4) == CacheState::Rec);
        {
            // Reader hits 3 and 4 (promoting them) and queues 5 and 6.
            let mut rd_txn = arc.read();
            assert!(rd_txn.get(&3) == Some(&3));
            assert!(rd_txn.get(&4) == Some(&4));
            rd_txn.insert(5, 5);
            rd_txn.insert(6, 6);
        }
        wr_txn.commit();
        println!("== 3");
        // 3/4 promoted to freq; 5/6 included, pushing 1/2 to ghost_rec.
        let wr_txn = arc.write();
        assert!(
            CStat {
                max: 4,
                cache: 6,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 2,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        assert!(wr_txn.peek_cache(&1) == CacheState::GhostRec);
        assert!(wr_txn.peek_cache(&2) == CacheState::GhostRec);
        assert!(wr_txn.peek_cache(&3) == CacheState::Freq);
        assert!(wr_txn.peek_cache(&4) == CacheState::Freq);
        assert!(wr_txn.peek_cache(&5) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&6) == CacheState::Rec);

        {
            // Reader revives the ghost-recent keys 1 and 2.
            let mut rd_txn = arc.read();
            rd_txn.insert(1, 1);
            rd_txn.insert(2, 2);
        }

        wr_txn.commit();
        println!("== 4");
        // Ghost revival raises p to 2; 5/6 get pushed out to ghost_rec.
        let wr_txn = arc.write();
        assert!(
            CStat {
                max: 4,
                cache: 6,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 2,
                haunted: 0,
                p: 2
            } == wr_txn.peek_stat()
        );
        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&2) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&3) == CacheState::Freq);
        assert!(wr_txn.peek_cache(&4) == CacheState::Freq);
        assert!(wr_txn.peek_cache(&5) == CacheState::GhostRec);
        assert!(wr_txn.peek_cache(&6) == CacheState::GhostRec);
    }
2739
    #[test]
    fn test_cache_concurrent_cursed_1() {
        // A long-lived reader opened against an old snapshot must not see
        // newer writes, even after the key has been churned out to the
        // haunted state - and its own late insert of a haunted key is only
        // visible to itself.
        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 4)
            .build()
            .expect("Invalid cache parameters!");

        let mut wr_txn = arc.write();
        let mut rd_txn = arc.read();
        wr_txn.insert(1, 1);

        // Reader's snapshot predates the insert.
        assert!(rd_txn.get(&1).is_none());

        wr_txn.commit();
        // Still the old snapshot.
        assert!(rd_txn.get(&1).is_none());
        // Churn enough keys to drive key 1 through the ghosts to haunted.
        let mut wr_txn = arc.write();
        wr_txn.insert(10, 1);
        wr_txn.insert(11, 1);
        wr_txn.insert(12, 1);
        wr_txn.insert(13, 1);
        wr_txn.insert(14, 1);
        wr_txn.insert(15, 1);
        wr_txn.insert(16, 1);
        wr_txn.insert(17, 1);
        wr_txn.commit();

        let wr_txn = arc.write();
        assert!(wr_txn.peek_cache(&1) == CacheState::Haunted);
        assert!(rd_txn.get(&1).is_none());
        // The reader inserts the haunted key into its local cache.
        rd_txn.insert(1, 100);
        wr_txn.commit();

        let wr_txn = arc.write();
        // The stale include must not resurrect the haunted key globally,
        // but the reader sees its own local copy.
        assert!(wr_txn.peek_cache(&1) == CacheState::Haunted);
        assert!(rd_txn.get(&1) == Some(&100));
    }
2802
    #[test]
    fn test_cache_clear() {
        // clear() demotes every live entry to the matching ghost list on
        // commit, without touching the total tracked key count.
        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 4)
            .build()
            .expect("Invalid cache parameters!");

        // Overfill so the cache has live + ghost entries.
        let mut wr_txn = arc.write();
        wr_txn.insert(10, 10);
        wr_txn.insert(11, 11);
        wr_txn.insert(12, 12);
        wr_txn.insert(13, 13);
        wr_txn.insert(14, 14);
        wr_txn.insert(15, 15);
        wr_txn.insert(16, 16);
        wr_txn.insert(17, 17);
        wr_txn.commit();
        let mut wr_txn = arc.write();

        // Promote two recent keys to frequent.
        let rec_set: Vec<usize> = wr_txn.iter_rec().take(2).copied().collect();
        println!("{:?}", rec_set);
        assert!(wr_txn.get(&rec_set[0]) == Some(&rec_set[0]));
        assert!(wr_txn.get(&rec_set[1]) == Some(&rec_set[1]));

        wr_txn.commit();
        let mut wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 4,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );

        wr_txn.clear();
        wr_txn.commit();
        // After the clear commits: freq -> ghost_freq, rec -> ghost_rec.
        let wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 0,
                rec: 0,
                ghost_freq: 2,
                ghost_rec: 6,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
    }
2875
    #[test]
    fn test_cache_clear_rollback() {
        // Dropping a write txn without committing discards its clear flag -
        // the cache state must be unchanged.
        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 4)
            .build()
            .expect("Invalid cache parameters!");

        // Same setup as test_cache_clear.
        let mut wr_txn = arc.write();
        wr_txn.insert(10, 10);
        wr_txn.insert(11, 11);
        wr_txn.insert(12, 12);
        wr_txn.insert(13, 13);
        wr_txn.insert(14, 14);
        wr_txn.insert(15, 15);
        wr_txn.insert(16, 16);
        wr_txn.insert(17, 17);
        wr_txn.commit();
        let mut wr_txn = arc.write();
        let rec_set: Vec<usize> = wr_txn.iter_rec().take(2).copied().collect();
        println!("{:?}", rec_set);
        let r = wr_txn.get(&rec_set[0]);
        println!("{:?}", r);
        assert!(r == Some(&rec_set[0]));
        assert!(wr_txn.get(&rec_set[1]) == Some(&rec_set[1]));

        wr_txn.commit();
        let mut wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 4,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );

        wr_txn.clear();
        // Roll back: drop instead of commit.
        drop(wr_txn);
        let wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        // State is identical to before the aborted clear.
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 4,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
    }
2944
    #[test]
    fn test_cache_clear_cursed() {
        // A reader whose snapshot predates a committed clear still sees the
        // old value, but its queued include from that stale snapshot must
        // not resurrect the key after the clear.
        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 4)
            .build()
            .expect("Invalid cache parameters!");
        let mut wr_txn = arc.write();
        wr_txn.insert(10, 1);
        wr_txn.commit();
        let wr_txn = arc.write();
        assert!(wr_txn.peek_cache(&10) == CacheState::Rec);
        wr_txn.commit();
        // Open the reader before the clear commits.
        let mut rd_txn = arc.read();
        let mut wr_txn = arc.write();
        wr_txn.clear();
        wr_txn.commit();

        // Stale snapshot: the reader still sees key 10.
        assert!(rd_txn.get(&10) == Some(&1));
        // Queue a stale include, then let the reader quiesce.
        rd_txn.insert(11, 1);
        std::mem::drop(rd_txn);
        arc.try_quiesce();
        let wr_txn = arc.write();
        // The clear demoted 10 to ghost; the stale include of 11 is dropped.
        assert!(wr_txn.peek_cache(&10) == CacheState::GhostRec);
        println!("--> {:?}", wr_txn.peek_cache(&11));
        assert!(wr_txn.peek_cache(&11) == CacheState::None);
    }
2987
    #[test]
    fn test_cache_p_weight_zero_churn() {
        // The p weight must stay at 0 under pure churn (misses on fresh
        // keys), only move up on a ghost-recent revival, and move back down
        // on a ghost-frequent revival.
        let arc: ARCache<usize, usize> = ARCacheBuilder::new()
            .set_size(4, 4)
            .set_watermark(0)
            .build()
            .expect("Invalid cache parameters!");

        let mut wr_txn = arc.write();

        wr_txn.insert(1, 1);
        wr_txn.insert(2, 2);
        wr_txn.insert(3, 3);
        wr_txn.insert(4, 4);
        assert_eq!(wr_txn.get(&1), Some(&1));
        assert_eq!(wr_txn.get(&2), Some(&2));
        assert_eq!(wr_txn.get(&3), Some(&3));
        assert_eq!(wr_txn.get(&4), Some(&4));

        assert_eq!(wr_txn.peek_stat().p, 0);
        wr_txn.commit();

        // Hits on live items never move p.
        let mut wr_txn = arc.write();
        assert_eq!(wr_txn.get(&1), Some(&1));
        assert_eq!(wr_txn.get(&2), Some(&2));
        assert_eq!(wr_txn.get(&3), Some(&3));
        assert_eq!(wr_txn.get(&4), Some(&4));

        assert_eq!(wr_txn.peek_stat().p, 0);
        wr_txn.commit();

        // A miss-insert of a fresh key evicts but does not move p.
        let mut wr_txn = arc.write();
        wr_txn.insert(100, 100);
        println!("b {:?}", wr_txn.peek_stat());
        assert_eq!(wr_txn.peek_stat().p, 0);
        wr_txn.commit();

        let mut wr_txn = arc.write();
        assert_eq!(wr_txn.peek_stat().p, 0);
        assert_eq!(wr_txn.peek_stat().ghost_rec, 1);

        // Re-inserting 100 revives it from ghost_rec -> p rises to 1.
        wr_txn.insert(100, 100);
        wr_txn.commit();

        let wr_txn = arc.write();
        println!("c {:?}", wr_txn.peek_stat());
        assert_eq!(wr_txn.peek_stat().p, 1);
        assert_eq!(wr_txn.peek_stat().ghost_rec, 0);
        assert_eq!(wr_txn.peek_stat().ghost_freq, 1);
        wr_txn.commit();

        // Reviving the ghost_freq victim pushes p back down to 0.
        let mut wr_txn = arc.write();
        assert_eq!(wr_txn.get(&1), None);
        wr_txn.insert(1, 1);
        wr_txn.commit();

        let wr_txn = arc.write();
        println!("d {:?}", wr_txn.peek_stat());
        assert_eq!(wr_txn.peek_stat().p, 0);
        assert_eq!(wr_txn.peek_stat().ghost_rec, 1);
        assert_eq!(wr_txn.peek_stat().ghost_freq, 0);
        wr_txn.commit();
    }
3060
    #[test]
    fn test_cache_haunted_bounds() {
        // Haunted entries must be expired once the look-back limit (4 txns
        // here) has passed, so the haunted set cannot grow without bound.
        let arc: ARCache<usize, usize> = ARCacheBuilder::new()
            .set_size(4, 4)
            .set_watermark(0)
            .set_look_back_limit(4)
            .build()
            .expect("Invalid cache parameters!");

        let mut wr_txn = arc.write();

        // Removing a never-present key creates a haunted entry.
        wr_txn.remove(1);
        wr_txn.commit();

        // Within the look-back window the haunted entry persists.
        for _i in 0..5 {
            let wr_txn = arc.write();
            println!("l {:?}", wr_txn.peek_stat());
            assert_eq!(wr_txn.peek_stat().haunted, 1);
            assert!(wr_txn.peek_cache(&1) == CacheState::Haunted);
            wr_txn.commit();
        }

        // Past the window it is fully expired.
        let wr_txn = arc.write();
        println!("d {:?}", wr_txn.peek_stat());
        assert_eq!(wr_txn.peek_stat().haunted, 0);
        assert_eq!(wr_txn.peek_cache(&1), CacheState::None);
        wr_txn.commit();
    }
3094
    #[test]
    fn test_cache_dirty_write() {
        // A dirty insert can be walked with iter_mut_mark_clean and then
        // committed without issue.
        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 4)
            .build()
            .expect("Invalid cache parameters!");
        let mut wr_txn = arc.write();
        wr_txn.insert_dirty(10, 1);
        wr_txn.iter_mut_mark_clean().for_each(|(_k, _v)| {});
        wr_txn.commit();
    }
3106
    #[test]
    fn test_cache_read_no_tlocal() {
        // With a read thread-local size of 0, reader inserts are queued for
        // the writer but are NOT visible to the reader itself.
        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 0)
            .build()
            .expect("Invalid cache parameters!");
        {
            let mut rd_txn = arc.read();
            rd_txn.insert(1, 1);
            rd_txn.insert(2, 2);
            rd_txn.insert(3, 3);
            rd_txn.insert(4, 4);
            // No reader-local cache -> our own inserts are invisible here.
            assert!(rd_txn.get(&1).is_none());
            assert!(rd_txn.get(&2).is_none());
            assert!(rd_txn.get(&3).is_none());
            assert!(rd_txn.get(&4).is_none());
        }
        // Quiesce applies the queued includes to the main cache.
        arc.try_quiesce();
        println!("== 2");
        let wr_txn = arc.write();
        assert!(
            CStat {
                max: 4,
                cache: 4,
                tlocal: 0,
                freq: 0,
                rec: 4,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&2) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&3) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&4) == CacheState::Rec);
    }
3157
    // Minimal payload type for the weighted-insert test; the field is only
    // there to give the struct some content.
    #[derive(Clone, Debug)]
    struct Weighted {
        _i: u64,
    }
3162
    #[test]
    fn test_cache_weighted() {
        // With insert_sized, list sizes count weights, not item counts: two
        // items of weight 2 fill a max of 4.
        let arc: ARCache<usize, Weighted> = ARCacheBuilder::default()
            .set_size(4, 0)
            .build()
            .expect("Invalid cache parameters!");
        let mut wr_txn = arc.write();

        assert!(
            CStat {
                max: 4,
                cache: 0,
                tlocal: 0,
                freq: 0,
                rec: 0,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );

        wr_txn.insert_sized(1, Weighted { _i: 1 }, NonZeroUsize::new(2).unwrap());
        wr_txn.insert_sized(2, Weighted { _i: 2 }, NonZeroUsize::new(2).unwrap());

        // tlocal counts items (2), not weight.
        assert!(
            CStat {
                max: 4,
                cache: 0,
                tlocal: 2,
                freq: 0,
                rec: 0,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        wr_txn.commit();

        // The rec list reports weight 4 for the two weight-2 items.
        let wr_txn = arc.write();
        assert!(
            CStat {
                max: 4,
                cache: 2,
                tlocal: 0,
                freq: 0,
                rec: 4,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        wr_txn.commit();

        // Hit key 1 so it is promoted to freq on the next commit.
        let mut wr_txn = arc.write();
        wr_txn.get(&1);
        wr_txn.commit();

        let mut wr_txn = arc.write();
        assert!(
            CStat {
                max: 4,
                cache: 2,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );

        // Two more weight-2 inserts must evict by weight.
        wr_txn.insert_sized(3, Weighted { _i: 3 }, NonZeroUsize::new(2).unwrap());
        wr_txn.insert_sized(4, Weighted { _i: 4 }, NonZeroUsize::new(2).unwrap());
        wr_txn.commit();

        let wr_txn = arc.write();
        assert!(
            CStat {
                max: 4,
                cache: 4,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 4,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        wr_txn.commit();
    }
3263
3264 #[test]
3265 fn test_cache_stats_reload() {
3266 let _ = tracing_subscriber::fmt::try_init();
3267
3268 let arc: ARCache<usize, usize> = ARCacheBuilder::default()
3270 .set_size(4, 0)
3271 .build()
3272 .expect("Invalid cache parameters!");
3273
3274 let stats = WriteCountStat::default();
3275
3276 let mut wr_txn = arc.write_stats(stats);
3277 wr_txn.insert(1, 1);
3278 let stats = wr_txn.commit();
3279
3280 tracing::trace!("stats 1: {:?}", stats);
3281 }
3282
3283 #[test]
3284 fn test_cache_mut_inplace() {
3285 let arc: ARCache<usize, usize> = ARCacheBuilder::default()
3287 .set_size(4, 0)
3288 .build()
3289 .expect("Invalid cache parameters!");
3290 let mut wr_txn = arc.write();
3291
3292 assert!(wr_txn.get_mut(&1, false).is_none());
3293 wr_txn.insert(1, 1);
3295 {
3296 let mref = wr_txn.get_mut(&1, false).unwrap();
3297 *mref = 2;
3298 }
3299 assert!(wr_txn.get_mut(&1, false) == Some(&mut 2));
3300 wr_txn.commit();
3301
3302 let mut wr_txn = arc.write();
3304 {
3305 let mref = wr_txn.get_mut(&1, false).unwrap();
3306 *mref = 3;
3307 }
3308 assert!(wr_txn.get_mut(&1, false) == Some(&mut 3));
3309 wr_txn.commit();
3310
3311 let mut wr_txn = arc.write();
3313 wr_txn.remove(1);
3314 assert!(wr_txn.get_mut(&1, false).is_none());
3315 wr_txn.commit();
3316 }
3317
3318 #[allow(dead_code)]
3319 pub static RUNNING: AtomicBool = AtomicBool::new(false);
3320
3321 #[allow(dead_code)]
3322 pub static READ_OPERATIONS: u32 = 1024;
3323 #[allow(dead_code)]
3324 pub static WRITE_OPERATIONS: u32 = 10240;
3325
3326 #[allow(dead_code)]
3327 pub static CACHE_SIZE: u32 = 64;
3328 pub static VALUE_MAX_RANGE: u32 = CACHE_SIZE * 8;
3329
3330 #[cfg(test)]
3331 fn multi_thread_worker(arc: Arc<ARCache<Box<u32>, Box<u32>>>) {
3332 while RUNNING.load(Ordering::Relaxed) {
3333 let mut rd_txn = arc.read();
3334
3335 use rand::Rng;
3336 let mut rng = rand::rng();
3337
3338 for _i in 0..VALUE_MAX_RANGE {
3339 let x = rng.random_range(0..VALUE_MAX_RANGE);
3340
3341 if rd_txn.get(&x).is_none() {
3342 rd_txn.insert(Box::new(x), Box::new(x))
3343 }
3344 }
3345 }
3346 }
3347
3348 #[allow(dead_code)]
3349 #[cfg_attr(miri, ignore)]
3350 #[cfg_attr(feature = "dhat-heap", test)]
3351 fn test_cache_stress_1() {
3352 #[cfg(feature = "dhat-heap")]
3353 let _profiler = dhat::Profiler::builder().trim_backtraces(None).build();
3354
3355 use rand::Rng;
3356 let mut rng = rand::rng();
3357
3358 let arc: Arc<ARCache<Box<u32>, Box<u32>>> = Arc::new(
3359 ARCacheBuilder::default()
3360 .set_size(CACHE_SIZE as usize, 4)
3361 .build()
3362 .expect("Invalid cache parameters!"),
3363 );
3364
3365 for _i in 0..WRITE_OPERATIONS {
3367 let mut wr_txn = arc.write();
3368
3369 let x = rng.random_range(0..VALUE_MAX_RANGE);
3370
3371 if wr_txn.get(&x).is_none() {
3372 wr_txn.insert(Box::new(x), Box::new(x))
3373 }
3374
3375 wr_txn.commit();
3377 }
3378
3379 eprintln!("writes pass");
3380
3381 let thread_count = 8;
3382
3383 RUNNING.store(true, Ordering::Relaxed);
3384
3385 let handles: Vec<_> = (0..thread_count)
3387 .map(|_| {
3388 let cache = arc.clone();
3390 thread::spawn(move || multi_thread_worker(cache))
3391 })
3392 .collect();
3393
3394 std::thread::sleep(std::time::Duration::from_secs(5));
3395
3396 for _i in 0..WRITE_OPERATIONS {
3399 let mut wr_txn = arc.write();
3400
3401 let x = rng.random_range(0..VALUE_MAX_RANGE);
3402
3403 if wr_txn.get(&x).is_none() {
3404 wr_txn.insert(Box::new(x), Box::new(x))
3405 }
3406
3407 wr_txn.commit();
3408 }
3409
3410 RUNNING.store(false, Ordering::Relaxed);
3411
3412 for handle in handles {
3413 if let Err(err) = handle.join() {
3414 std::panic::resume_unwind(err)
3415 }
3416 }
3417 }
3418
3419 #[test]
3420 fn test_set_expected_workload_negative() {
3421 let _arc: Arc<ARCache<Box<u32>, Box<u32>>> = Arc::new(
3422 ARCacheBuilder::default()
3423 .set_expected_workload(256, 31, 8, 16, true)
3424 .build()
3425 .expect("Invalid cache parameters!"),
3426 );
3427 }
3428}