1mod ll;
14pub mod stats;
16
17use self::ll::{LLNodeRef, LLWeight, LL};
18use self::stats::{ARCacheReadStat, ARCacheWriteStat};
19
20#[cfg(feature = "arcache-is-hashmap")]
21use crate::hashmap::{
22 HashMap as DataMap, HashMapReadTxn as DataMapReadTxn, HashMapWriteTxn as DataMapWriteTxn,
23};
24
25#[cfg(feature = "arcache-is-hashtrie")]
26use crate::hashtrie::{
27 HashTrie as DataMap, HashTrieReadTxn as DataMapReadTxn, HashTrieWriteTxn as DataMapWriteTxn,
28};
29
30use crossbeam_queue::ArrayQueue;
31use std::collections::HashMap as Map;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::sync::Arc;
34use std::sync::{Mutex, RwLock};
35
36use std::borrow::Borrow;
37use std::cell::UnsafeCell;
38use std::fmt::Debug;
39use std::hash::Hash;
40use std::mem;
41use std::num::NonZeroUsize;
42use std::ops::Deref;
43use std::ops::DerefMut;
44use std::time::Instant;
45
46use tracing::trace;
47
// Fraction of the total item budget that reader thread-local caches may
// consume in aggregate (total / ratio); see `set_expected_workload`.
const READ_THREAD_CACHE_RATIO: isize = 8;
// Fraction of the total item budget reserved for writer-side miss allowance.
const WRITE_THREAD_CACHE_RATIO: isize = 4;

// Capacity of the lock-free queue carrying reader hit events to the writer.
const WRITE_THREAD_CHANNEL_SIZE: usize = 64;
// Capacity of the lock-free queue carrying reader include events to the writer.
const READ_THREAD_CHANNEL_SIZE: usize = 64;

// Default / absolute minimum for the transaction look-back window: reader
// events older than (commit txid - limit) are rejected at commit time.
const TXN_LOOKBACK_LIMIT_DEFAULT: u8 = 32;
const TXN_LOOKBACK_LIMIT_ABS_MIN: u8 = 4;

// Caches smaller than this get the watermark disabled (set to 0) by default.
const WATERMARK_DISABLE_MIN: usize = 128;

// Default watermark is max * RATIO / DIVISOR, i.e. 90% of max.
const WATERMARK_DISABLE_DIVISOR: usize = 20;
const WATERMARK_DISABLE_RATIO: usize = 18;
61
/// A pending change for one key inside a write transaction's thread-local
/// map, folded into the shared cache at commit.
enum ThreadCacheItem<V> {
    /// A value to insert/update: (value, "clean" marker, item weight).
    /// The clean marker is asserted true during commit.
    Present(V, bool, usize),
    /// The key was removed in this transaction; carries the "clean" marker.
    Removed(bool),
}
66
/// Reader-side record of a cache hit, queued for the next write commit.
struct CacheHitEvent {
    /// When the hit occurred; commit drains events up to its own timestamp.
    t: Instant,
    /// Precomputed hash of the hit key (the key itself is not carried).
    k_hash: u64,
}
71
/// Reader-side record of a miss that was loaded and should be included in
/// the shared cache, queued for the next write commit.
struct CacheIncludeEvent<K, V> {
    /// When the include occurred; commit drains up to its own timestamp.
    t: Instant,
    /// The key to include.
    k: K,
    /// The loaded value.
    v: V,
    /// The reader's snapshot txid; stale includes are rejected at commit.
    txid: u64,
    /// Item weight for eviction accounting.
    size: usize,
}
79
/// Per-key metadata stored in the ARC linked lists (the value itself lives
/// in the `CacheItem` map entry).
#[derive(Hash, Ord, PartialOrd, Eq, PartialEq, Clone, Debug)]
struct CacheItemInner<K>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
{
    /// The cached key.
    k: K,
    /// Transaction id of the last update to this entry.
    txid: u64,
    /// Item weight used for eviction accounting.
    size: usize,
}
89
impl<K> LLWeight for CacheItemInner<K>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
{
    /// List "length" is weighted by item size, not node count.
    #[inline]
    fn ll_weight(&self) -> usize {
        self.size
    }
}
99
/// The ARC state of a key. Live states (`Freq`, `Rec`) hold the value;
/// ghost states hold only list metadata for keys recently evicted; `Haunted`
/// marks removed keys retained to invalidate stale reader events.
#[derive(Clone, Debug)]
enum CacheItem<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
{
    /// Frequently used: on the `freq` list, value present.
    Freq(LLNodeRef<CacheItemInner<K>>, V),
    /// Recently included: on the `rec` list, value present.
    Rec(LLNodeRef<CacheItemInner<K>>, V),
    /// Evicted from `freq`; metadata only.
    GhostFreq(LLNodeRef<CacheItemInner<K>>),
    /// Evicted from `rec`; metadata only.
    GhostRec(LLNodeRef<CacheItemInner<K>>),
    /// Removed; retained so old reader events cannot resurrect it.
    Haunted(LLNodeRef<CacheItemInner<K>>),
}
111
// SAFETY: `LLNodeRef` is presumably not auto-Send/Sync (raw pointer into a
// linked list). List nodes are only mutated by the single writer while it
// holds the inner mutex — NOTE(review): this relies on invariants of the
// `ll` module; confirm there.
unsafe impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
    > Send for CacheItem<K, V>
{
}
// SAFETY: see the `Send` impl above — same single-writer argument.
unsafe impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
    > Sync for CacheItem<K, V>
{
}
124
/// Test-only, value-free mirror of `CacheItem`'s state for assertions.
#[cfg(test)]
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum CacheState {
    Freq,
    Rec,
    GhostFreq,
    GhostRec,
    Haunted,
    /// Key not present in the cache map at all.
    None,
}
135
/// Test-only snapshot of the cache's internal sizes and the adaptive target.
#[cfg(test)]
#[derive(Debug, PartialEq)]
pub(crate) struct CStat {
    // Configured weight budget.
    max: usize,
    // Total entries tracked in the map.
    cache: usize,
    // Thread-local entries.
    tlocal: usize,
    // Weights of each ARC list.
    freq: usize,
    rec: usize,
    ghost_freq: usize,
    ghost_rec: usize,
    haunted: usize,
    // Adaptive target weight of the recent list.
    p: usize,
}
149
/// Mutable ARC bookkeeping, guarded by `ARCache::inner` and only touched
/// during write commits.
struct ArcInner<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    /// Adaptive target weight of the recent list (the ARC "p" parameter).
    p: usize,
    /// Frequently used items.
    freq: LL<CacheItemInner<K>>,
    /// Recently included items.
    rec: LL<CacheItemInner<K>>,
    /// Metadata of items recently evicted from `freq`.
    ghost_freq: LL<CacheItemInner<K>>,
    /// Metadata of items recently evicted from `rec`.
    ghost_rec: LL<CacheItemInner<K>>,
    /// Removed keys retained to reject stale reader events.
    haunted: LL<CacheItemInner<K>>,
    /// Reader → writer hit events (shared with `ArcShared`).
    hit_queue: Arc<ArrayQueue<CacheHitEvent>>,
    /// Reader → writer include events (shared with `ArcShared`).
    inc_queue: Arc<ArrayQueue<CacheIncludeEvent<K, V>>>,
    /// Events carrying a txid older than this are rejected at commit.
    min_txid: u64,
}
166
/// Read-mostly configuration shared by all transactions (behind an RwLock
/// on `ARCache`).
struct ArcShared<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    /// Weight budget for the main (rec + freq) cache.
    max: usize,
    /// Weight budget for each reader's thread-local cache (0 disables it).
    read_max: usize,
    /// Handle for readers to push hit events.
    hit_queue: Arc<ArrayQueue<CacheHitEvent>>,
    /// Handle for readers to push include events.
    inc_queue: Arc<ArrayQueue<CacheIncludeEvent<K, V>>>,
    /// Occupancy threshold driving `ARCache::above_watermark`; hit tracking
    /// is only performed once the cache has grown past it.
    watermark: usize,
    /// Copied into each read txn; presumably controls whether readers
    /// quiesce — NOTE(review): consumer not visible in this chunk, confirm.
    reader_quiesce: bool,
}
184
/// A concurrently readable adaptive replacement cache (ARC). Readers operate
/// on a point-in-time snapshot of `cache` and report their activity to the
/// single writer through lock-free queues; the writer folds everything in
/// at commit time.
pub struct ARCache<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    /// Authoritative key → ARC-state map (snapshot-capable).
    cache: DataMap<K, CacheItem<K, V>>,
    /// Read-mostly configuration.
    shared: RwLock<ArcShared<K, V>>,
    /// Writer-only ARC list bookkeeping.
    inner: Mutex<ArcInner<K, V>>,
    /// Cached "occupancy has crossed the watermark" flag (Relaxed heuristic).
    above_watermark: AtomicBool,
    /// Width of the txid window inside which reader events stay acceptable.
    look_back_limit: u64,
}
204
// SAFETY: the interior pointers (`CacheItem`'s list nodes) are only mutated
// by the single writer under the inner mutex — NOTE(review): relies on `ll`
// invariants, same argument as the `CacheItem` impls above; confirm.
unsafe impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
    > Send for ARCache<K, V>
{
}
// SAFETY: see the `Send` impl above.
unsafe impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
    > Sync for ARCache<K, V>
{
}
217
/// An entry in a reader's thread-local LRU cache.
#[derive(Debug, Clone)]
struct ReadCacheItem<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    /// The cached key.
    k: K,
    /// The cached value.
    v: V,
    /// Item weight for the thread-local LRU accounting.
    size: usize,
}
228
impl<K, V> LLWeight for ReadCacheItem<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    /// Thread-local LRU length is weighted by item size, not node count.
    #[inline]
    fn ll_weight(&self) -> usize {
        self.size
    }
}
239
/// A reader's private cache: a keyed index over a size-bounded LRU list.
struct ReadCache<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    /// Key → LRU node index for O(1) lookup.
    set: Map<K, LLNodeRef<ReadCacheItem<K, V>>>,
    /// Weight budget for this thread-local cache.
    read_size: usize,
    /// The LRU list itself.
    tlru: LL<ReadCacheItem<K, V>>,
}
251
/// A read transaction over a point-in-time snapshot of the cache. Hits and
/// includes observed by this reader are pushed to the writer's queues.
pub struct ARCacheReadTxn<'a, K, V, S>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
    S: ARCacheReadStat + Clone,
{
    /// The owning cache.
    caller: &'a ARCache<K, V>,
    /// Snapshot of the shared map.
    cache: DataMapReadTxn<'a, K, CacheItem<K, V>>,
    /// Thread-local LRU, present only when a read budget is configured.
    tlocal: Option<ReadCache<K, V>>,
    /// Queue for reporting hits to the writer.
    hit_queue: Arc<ArrayQueue<CacheHitEvent>>,
    /// Queue for reporting includes to the writer.
    inc_queue: Arc<ArrayQueue<CacheIncludeEvent<K, V>>>,
    /// Snapshot of the watermark flag at txn start.
    above_watermark: bool,
    /// Copied from `ArcShared::reader_quiesce`.
    reader_quiesce: bool,
    /// User-supplied stats sink.
    stats: S,
}
271
// SAFETY: the txn's interior pointers (snapshot + thread-local LRU nodes)
// are only accessed through this txn — NOTE(review): relies on `ll` and the
// map snapshot being safe to move between threads; confirm.
unsafe impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
        S: ARCacheReadStat + Clone + Sync + Send + 'static,
    > Send for ARCacheReadTxn<'_, K, V, S>
{
}
// SAFETY: see the `Send` impl above.
unsafe impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
        S: ARCacheReadStat + Clone + Sync + Send + 'static,
    > Sync for ARCacheReadTxn<'_, K, V, S>
{
}
286
/// The single write transaction. Changes accumulate thread-locally and are
/// folded into the shared state on `commit`.
pub struct ARCacheWriteTxn<'a, K, V, S>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
    S: ARCacheWriteStat<K>,
{
    /// The owning cache.
    caller: &'a ARCache<K, V>,
    /// The underlying map's write transaction.
    cache: DataMapWriteTxn<'a, K, CacheItem<K, V>>,
    /// Pending per-key changes made by this txn.
    tlocal: Map<K, ThreadCacheItem<V>>,
    /// Key hashes hit during this txn (UnsafeCell: mutated from `&self` in
    /// read paths such as `get`).
    hit: UnsafeCell<Vec<u64>>,
    /// Whether `clear()` was called in this txn.
    clear: UnsafeCell<bool>,
    /// Snapshot of the watermark flag at txn start.
    above_watermark: bool,
    /// User-supplied stats sink, returned from `commit`.
    stats: S,
}
309
310impl<
311 K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
312 V: Clone + Debug + Sync + Send + 'static,
313 > CacheItem<K, V>
314{
315 fn to_vref(&self) -> Option<&V> {
316 match &self {
317 CacheItem::Freq(_, v) | CacheItem::Rec(_, v) => Some(v),
318 _ => None,
319 }
320 }
321
322 fn to_kvsref(&self) -> Option<(&K, &V, usize)> {
323 match &self {
324 CacheItem::Freq(lln, v) | CacheItem::Rec(lln, v) => {
325 let cii = lln.as_ref();
326 Some((&cii.k, v, cii.size))
327 }
328 _ => None,
329 }
330 }
331
332 #[cfg(test)]
333 fn to_state(&self) -> CacheState {
334 match &self {
335 CacheItem::Freq(_, _v) => CacheState::Freq,
336 CacheItem::Rec(_, _v) => CacheState::Rec,
337 CacheItem::GhostFreq(_) => CacheState::GhostFreq,
338 CacheItem::GhostRec(_) => CacheState::GhostRec,
339 CacheItem::Haunted(_) => CacheState::Haunted,
340 }
341 }
342}
343
/// Builder for configuring an `ARCache` before construction.
pub struct ARCacheBuilder {
    // Main cache weight budget; must be set (directly or via workload).
    max: Option<usize>,
    // Per-reader thread-local budget; must be set (directly or via workload).
    read_max: Option<usize>,
    // Optional occupancy watermark; defaulted in `build` when unset.
    watermark: Option<usize>,
    // Whether readers quiesce (default true).
    reader_quiesce: bool,
    // Optional txid look-back window; defaulted/clamped in `build`.
    look_back_limit: Option<u8>,
}
352
impl Default for ARCacheBuilder {
    /// A builder with no sizes chosen and reader quiescing enabled.
    fn default() -> Self {
        ARCacheBuilder {
            max: None,
            read_max: None,
            watermark: None,
            reader_quiesce: true,
            look_back_limit: None,
        }
    }
}
364
365impl ARCacheBuilder {
366 pub fn new() -> Self {
368 Self::default()
369 }
370
371 #[must_use]
387 pub fn set_expected_workload(
388 self,
389 total: usize,
390 threads: usize,
391 ex_ro_miss: usize,
392 ex_rw_miss: usize,
393 read_cache: bool,
394 ) -> Self {
395 let total = isize::try_from(total).unwrap();
396 let threads = isize::try_from(threads).unwrap();
397 let ro_miss = isize::try_from(ex_ro_miss).unwrap();
398 let wr_miss = isize::try_from(ex_rw_miss).unwrap();
399
400 let read_max = if read_cache {
406 let read_max_limit = total / READ_THREAD_CACHE_RATIO;
407 let read_max_thread_limit = read_max_limit / threads;
408 ro_miss.clamp(0, read_max_thread_limit)
409 } else {
410 0
412 };
413
414 let wr_miss_thread_limit = total / WRITE_THREAD_CACHE_RATIO;
417 let wr_miss = wr_miss.clamp(0, wr_miss_thread_limit);
418
419 let max = total - (wr_miss + (read_max * threads));
420
421 let max = usize::try_from(max).unwrap();
423 let read_max = usize::try_from(read_max).unwrap();
424
425 ARCacheBuilder {
426 max: Some(max),
428 read_max: Some(read_max),
429 watermark: self.watermark,
430 reader_quiesce: self.reader_quiesce,
431 look_back_limit: self.look_back_limit,
432 }
433 }
434
435 #[must_use]
441 pub fn set_size(mut self, max: usize, read_max: usize) -> Self {
442 self.max = Some(max);
443 self.read_max = Some(read_max);
444 self
445 }
446
447 #[must_use]
451 pub fn set_watermark(mut self, watermark: usize) -> Self {
452 self.watermark = Some(watermark);
453 self
454 }
455
456 #[must_use]
463 pub fn set_look_back_limit(mut self, look_back_limit: u8) -> Self {
464 self.look_back_limit = Some(look_back_limit);
465 self
466 }
467
468 #[must_use]
473 pub fn set_reader_quiesce(mut self, reader_quiesce: bool) -> Self {
474 self.reader_quiesce = reader_quiesce;
475 self
476 }
477
478 pub fn build<K, V>(self) -> Option<ARCache<K, V>>
481 where
482 K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
483 V: Clone + Debug + Sync + Send + 'static,
484 {
485 let ARCacheBuilder {
486 max,
488 read_max,
489 watermark,
490 reader_quiesce,
491 look_back_limit,
492 } = self;
493
494 let (max, read_max) = max.zip(read_max)?;
495
496 let watermark = watermark.unwrap_or(if max < WATERMARK_DISABLE_MIN {
497 0
498 } else {
499 (max / WATERMARK_DISABLE_DIVISOR) * WATERMARK_DISABLE_RATIO
500 });
501 let watermark = watermark.clamp(0, max);
502 let init_watermark = watermark == 0;
504
505 let look_back_limit = look_back_limit
506 .unwrap_or(TXN_LOOKBACK_LIMIT_DEFAULT)
507 .clamp(TXN_LOOKBACK_LIMIT_ABS_MIN, u8::MAX) as u64;
508
509 let chan_size = WRITE_THREAD_CHANNEL_SIZE;
516 let hit_queue = Arc::new(ArrayQueue::new(chan_size));
517
518 let chan_size = READ_THREAD_CHANNEL_SIZE;
521 let inc_queue = Arc::new(ArrayQueue::new(chan_size));
522
523 let shared = RwLock::new(ArcShared {
524 max,
525 read_max,
526 hit_queue: hit_queue.clone(),
528 inc_queue: inc_queue.clone(),
529 watermark,
530 reader_quiesce,
531 });
532 let inner = Mutex::new(ArcInner {
533 p: 0,
535 freq: LL::new(),
536 rec: LL::new(),
537 ghost_freq: LL::new(),
538 ghost_rec: LL::new(),
539 haunted: LL::new(),
540 hit_queue,
542 inc_queue,
543 min_txid: 0,
544 });
545
546 Some(ARCache {
547 cache: DataMap::new(),
548 shared,
549 inner,
550 above_watermark: AtomicBool::new(init_watermark),
552 look_back_limit,
553 })
554 }
555}
556
557impl<
558 K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
559 V: Clone + Debug + Sync + Send + 'static,
560 > ARCache<K, V>
561{
562 #[deprecated(since = "0.2.20", note = "please use`ARCacheBuilder` instead")]
564 pub fn new(
565 total: usize,
566 threads: usize,
567 ex_ro_miss: usize,
568 ex_rw_miss: usize,
569 read_cache: bool,
570 ) -> Self {
571 ARCacheBuilder::default()
572 .set_expected_workload(total, threads, ex_ro_miss, ex_rw_miss, read_cache)
573 .build()
574 .expect("Invalid cache parameters!")
575 }
576
577 #[deprecated(since = "0.2.20", note = "please use`ARCacheBuilder` instead")]
579 pub fn new_size(max: usize, read_max: usize) -> Self {
580 ARCacheBuilder::default()
581 .set_size(max, read_max)
582 .build()
583 .expect("Invalid cache parameters!")
584 }
585
586 #[deprecated(since = "0.2.20", note = "please use`ARCacheBuilder` instead")]
588 pub fn new_size_watermark(max: usize, read_max: usize, watermark: usize) -> Self {
589 ARCacheBuilder::default()
590 .set_size(max, read_max)
591 .set_watermark(watermark)
592 .build()
593 .expect("Invalid cache parameters!")
594 }
595
596 pub fn read_stats<S>(&self, stats: S) -> ARCacheReadTxn<'_, K, V, S>
600 where
601 S: ARCacheReadStat + Clone,
602 {
603 let rshared = self.shared.read().unwrap();
604 let tlocal = if rshared.read_max > 0 {
605 Some(ReadCache {
606 set: Map::new(),
607 read_size: rshared.read_max,
608 tlru: LL::new(),
609 })
610 } else {
611 None
612 };
613 let above_watermark = self.above_watermark.load(Ordering::Relaxed);
614 ARCacheReadTxn {
615 caller: self,
616 cache: self.cache.read(),
617 tlocal,
618 hit_queue: rshared.hit_queue.clone(),
620 inc_queue: rshared.inc_queue.clone(),
621 above_watermark,
622 reader_quiesce: rshared.reader_quiesce,
623 stats,
624 }
625 }
626
    /// Begin a read transaction with no stats collection.
    pub fn read(&self) -> ARCacheReadTxn<'_, K, V, ()> {
        self.read_stats(())
    }
633
    /// Begin a write transaction with no stats collection.
    pub fn write(&self) -> ARCacheWriteTxn<'_, K, V, ()> {
        self.write_stats(())
    }
640
641 pub fn write_stats<S>(&self, stats: S) -> ARCacheWriteTxn<'_, K, V, S>
643 where
644 S: ARCacheWriteStat<K>,
645 {
646 let above_watermark = self.above_watermark.load(Ordering::Relaxed);
647 ARCacheWriteTxn {
648 caller: self,
649 cache: self.cache.write(),
650 tlocal: Map::new(),
651 hit: UnsafeCell::new(Vec::new()),
652 clear: UnsafeCell::new(false),
653 above_watermark,
654 stats,
656 }
657 }
658
659 fn try_write_stats<S>(&self, stats: S) -> Result<ARCacheWriteTxn<'_, K, V, S>, S>
660 where
661 S: ARCacheWriteStat<K>,
662 {
663 match self.cache.try_write() {
664 Some(cache) => {
665 let above_watermark = self.above_watermark.load(Ordering::Relaxed);
666 Ok(ARCacheWriteTxn {
667 caller: self,
668 cache,
669 tlocal: Map::new(),
670 hit: UnsafeCell::new(Vec::new()),
671 clear: UnsafeCell::new(false),
672 above_watermark,
673 stats,
675 })
676 }
677 None => Err(stats),
678 }
679 }
680
681 pub fn try_quiesce_stats<S>(&self, stats: S) -> S
684 where
685 S: ARCacheWriteStat<K>,
686 {
687 match self.try_write_stats(stats) {
691 Ok(wr_txn) => wr_txn.commit(),
692 Err(stats) => stats,
693 }
694 }
695
    /// Run a maintenance commit if the write lock is free; no-op otherwise.
    pub fn try_quiesce(&self) {
        self.try_quiesce_stats(())
    }
701
702 fn calc_p_freq(ghost_rec_len: usize, ghost_freq_len: usize, p: &mut usize, size: usize) {
703 let delta = if ghost_rec_len > ghost_freq_len {
704 ghost_rec_len / ghost_freq_len
705 } else {
706 1
707 } * size;
708 let p_was = *p;
709 if delta < *p {
710 *p -= delta
711 } else {
712 *p = 0
713 }
714 tracing::trace!("f {} >>> {}", p_was, *p);
715 }
716
717 fn calc_p_rec(
718 cap: usize,
719 ghost_rec_len: usize,
720 ghost_freq_len: usize,
721 p: &mut usize,
722 size: usize,
723 ) {
724 let delta = if ghost_freq_len > ghost_rec_len {
725 ghost_freq_len / ghost_rec_len
726 } else {
727 1
728 } * size;
729 let p_was = *p;
730 if delta <= cap - *p {
731 *p += delta
732 } else {
733 *p = cap
734 }
735 tracing::trace!("r {} >>> {}", p_was, *p);
736 }
737
    /// Fold a write transaction's thread-local changes into the shared cache
    /// at commit. Each (key, change) pair inserts, updates, or removes the
    /// map entry while keeping the ARC lists consistent.
    fn drain_tlocal_inc<S>(
        &self,
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        inner: &mut ArcInner<K, V>,
        shared: &ArcShared<K, V>,
        tlocal: Map<K, ThreadCacheItem<V>>,
        commit_txid: u64,
        stats: &mut S,
    ) where
        S: ARCacheWriteStat<K>,
    {
        tlocal.into_iter().for_each(|(k, tcio)| {
            let r = cache.get_mut(&k);
            match (r, tcio) {
                // New key with a value: admitted via the recent list.
                (None, ThreadCacheItem::Present(tci, clean, size)) => {
                    assert!(clean);
                    let llp = inner.rec.append_k(CacheItemInner {
                        k: k.clone(),
                        txid: commit_txid,
                        size,
                    });
                    stats.include(&k);
                    cache.insert(k, CacheItem::Rec(llp, tci));
                }
                // Removal of an untracked key: record it as haunted so
                // stale reader events cannot resurrect it later.
                (None, ThreadCacheItem::Removed(clean)) => {
                    assert!(clean);
                    let llp = inner.haunted.append_k(CacheItemInner {
                        k: k.clone(),
                        txid: commit_txid,
                        size: 1,
                    });
                    cache.insert(k, CacheItem::Haunted(llp));
                }
                // Removal of a tracked key: move its node from whichever
                // list it is on to the haunted list, restamped.
                (Some(ref mut ci), ThreadCacheItem::Removed(clean)) => {
                    assert!(clean);
                    let mut next_state = match ci {
                        CacheItem::Freq(llp, _v) => {
                            let mut owned = inner.freq.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            let pointer = inner.haunted.append_n(owned);
                            CacheItem::Haunted(pointer)
                        }
                        CacheItem::Rec(llp, _v) => {
                            let mut owned = inner.rec.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            let pointer = inner.haunted.append_n(owned);
                            CacheItem::Haunted(pointer)
                        }
                        CacheItem::GhostFreq(llp) => {
                            let mut owned = inner.ghost_freq.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            let pointer = inner.haunted.append_n(owned);
                            CacheItem::Haunted(pointer)
                        }
                        CacheItem::GhostRec(llp) => {
                            let mut owned = inner.ghost_rec.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            let pointer = inner.haunted.append_n(owned);
                            CacheItem::Haunted(pointer)
                        }
                        CacheItem::Haunted(llp) => {
                            // Already haunted: bump the txid in place.
                            // SAFETY: single writer holds the inner mutex;
                            // NOTE(review): relies on `ll` node aliasing
                            // invariants — confirm in the ll module.
                            unsafe { llp.make_mut().txid = commit_txid };
                            CacheItem::Haunted(llp.clone())
                        }
                    };
                    mem::swap(*ci, &mut next_state);
                }
                // Update of a tracked key: live entries are refreshed (a
                // write to Rec promotes to Freq); ghost/haunted entries are
                // revived, tuning `p` as ARC requires.
                (Some(ref mut ci), ThreadCacheItem::Present(tci, clean, size)) => {
                    assert!(clean);
                    let mut next_state = match ci {
                        CacheItem::Freq(llp, _v) => {
                            let mut owned = inner.freq.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = size;
                            stats.modify(&owned.as_ref().k);
                            let pointer = inner.freq.append_n(owned);
                            CacheItem::Freq(pointer, tci)
                        }
                        CacheItem::Rec(llp, _v) => {
                            // Re-use of a recent item promotes it to freq.
                            let mut owned = inner.rec.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = size;
                            stats.modify(&owned.as_ref().k);
                            let pointer = inner.freq.append_n(owned);
                            CacheItem::Freq(pointer, tci)
                        }
                        CacheItem::GhostFreq(llp) => {
                            // Ghost-frequent hit: shift p toward frequent.
                            Self::calc_p_freq(
                                inner.ghost_rec.len(),
                                inner.ghost_freq.len(),
                                &mut inner.p,
                                size,
                            );
                            let mut owned = inner.ghost_freq.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = size;
                            stats.ghost_frequent_revive(&owned.as_ref().k);
                            let pointer = inner.freq.append_n(owned);
                            CacheItem::Freq(pointer, tci)
                        }
                        CacheItem::GhostRec(llp) => {
                            // Ghost-recent hit: shift p toward recent.
                            Self::calc_p_rec(
                                shared.max,
                                inner.ghost_rec.len(),
                                inner.ghost_freq.len(),
                                &mut inner.p,
                                size,
                            );
                            let mut owned = inner.ghost_rec.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = size;
                            stats.ghost_recent_revive(&owned.as_ref().k);
                            let pointer = inner.rec.append_n(owned);
                            CacheItem::Rec(pointer, tci)
                        }
                        CacheItem::Haunted(llp) => {
                            // A previously removed key returns via recent.
                            let mut owned = inner.haunted.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = size;
                            stats.include_haunted(&owned.as_ref().k);
                            let pointer = inner.rec.append_n(owned);
                            CacheItem::Rec(pointer, tci)
                        }
                    };
                    mem::swap(*ci, &mut next_state);
                }
            }
        });
    }
883
    /// Apply queued reader hit events: touch or promote the matching items
    /// so the ARC lists reflect read traffic. Processing stops once an event
    /// stamped at or after `commit_ts` has been handled, leaving the rest
    /// for the next commit.
    fn drain_hit_rx(
        &self,
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        inner: &mut ArcInner<K, V>,
        commit_ts: Instant,
    ) {
        while let Some(ce) = inner.hit_queue.pop() {
            let CacheHitEvent { t, k_hash } = ce;
            // Hits arrive by key hash only: update every entry in the slot.
            if let Some(ref mut ci_slots) = unsafe { cache.get_slot_mut(k_hash) } {
                for ref mut ci in ci_slots.iter_mut() {
                    let mut next_state = match &ci.v {
                        // Already frequent: refresh its list position.
                        CacheItem::Freq(llp, v) => {
                            inner.freq.touch(llp.to_owned());
                            CacheItem::Freq(llp.to_owned(), v.to_owned())
                        }
                        // A hit on recent promotes it to frequent.
                        CacheItem::Rec(llp, v) => {
                            let owned = inner.rec.extract(llp.to_owned());
                            let pointer = inner.freq.append_n(owned);
                            CacheItem::Freq(pointer, v.to_owned())
                        }
                        // Ghost entries only refresh recency here; revival
                        // happens via the include path.
                        CacheItem::GhostFreq(llp) => {
                            inner.ghost_freq.touch(llp.to_owned());
                            CacheItem::GhostFreq(llp.to_owned())
                        }
                        CacheItem::GhostRec(llp) => {
                            inner.ghost_rec.touch(llp.to_owned());
                            CacheItem::GhostRec(llp.to_owned())
                        }
                        // Removed keys ignore hits.
                        CacheItem::Haunted(llp) => {
                            CacheItem::Haunted(llp.to_owned())
                        }
                    };
                    mem::swap(&mut ci.v, &mut next_state);
                }
            }
            // Stop once we reach events from after this commit began.
            if t >= commit_ts {
                break;
            }
        }
    }
935
    /// Apply queued reader include events (misses that readers loaded).
    /// Each event carries the reader's snapshot txid: events older than the
    /// entry's current txid, or older than `inner.min_txid`, must not
    /// overwrite newer state and only refresh list positions / tune `p`.
    fn drain_inc_rx<S>(
        &self,
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        inner: &mut ArcInner<K, V>,
        shared: &ArcShared<K, V>,
        commit_ts: Instant,
        stats: &mut S,
    ) where
        S: ARCacheWriteStat<K>,
    {
        while let Some(ce) = inner.inc_queue.pop() {
            let CacheIncludeEvent {
                t,
                k,
                v: iv,
                txid,
                size,
            } = ce;
            let mut r = cache.get_mut(&k);
            match r {
                Some(ref mut ci) => {
                    // `None` means "keep current state" (already touched).
                    let mut next_state = match &ci {
                        CacheItem::Freq(llp, _v) => {
                            if llp.as_ref().txid >= txid || inner.min_txid > txid {
                                // Stale value: only refresh list position.
                                inner.freq.touch(llp.to_owned());
                                None
                            } else {
                                let mut owned = inner.freq.extract(llp.to_owned());
                                owned.as_mut().txid = txid;
                                owned.as_mut().size = size;
                                stats.modify(&owned.as_mut().k);
                                let pointer = inner.freq.append_n(owned);
                                Some(CacheItem::Freq(pointer, iv))
                            }
                        }
                        CacheItem::Rec(llp, v) => {
                            let mut owned = inner.rec.extract(llp.to_owned());
                            if llp.as_ref().txid >= txid || inner.min_txid > txid {
                                // Stale value, but the access still promotes
                                // the existing value to frequent.
                                let pointer = inner.freq.append_n(owned);
                                Some(CacheItem::Freq(pointer, v.to_owned()))
                            } else {
                                owned.as_mut().txid = txid;
                                owned.as_mut().size = size;
                                stats.modify(&owned.as_mut().k);
                                let pointer = inner.freq.append_n(owned);
                                Some(CacheItem::Freq(pointer, iv))
                            }
                        }
                        CacheItem::GhostFreq(llp) => {
                            if llp.as_ref().txid > txid || inner.min_txid > txid {
                                // Stale: tune p with the stored size, touch.
                                let size = llp.as_ref().size;
                                Self::calc_p_freq(
                                    inner.ghost_rec.len(),
                                    inner.ghost_freq.len(),
                                    &mut inner.p,
                                    size,
                                );
                                inner.ghost_freq.touch(llp.to_owned());
                                None
                            } else {
                                // Fresh: tune p and revive into frequent.
                                Self::calc_p_freq(
                                    inner.ghost_rec.len(),
                                    inner.ghost_freq.len(),
                                    &mut inner.p,
                                    size,
                                );
                                let mut owned = inner.ghost_freq.extract(llp.to_owned());
                                owned.as_mut().txid = txid;
                                owned.as_mut().size = size;
                                stats.ghost_frequent_revive(&owned.as_mut().k);
                                let pointer = inner.freq.append_n(owned);
                                Some(CacheItem::Freq(pointer, iv))
                            }
                        }
                        CacheItem::GhostRec(llp) => {
                            if llp.as_ref().txid > txid || inner.min_txid > txid {
                                // Stale: tune p with the stored size, touch.
                                let size = llp.as_ref().size;
                                Self::calc_p_rec(
                                    shared.max,
                                    inner.ghost_rec.len(),
                                    inner.ghost_freq.len(),
                                    &mut inner.p,
                                    size,
                                );
                                inner.ghost_rec.touch(llp.clone());
                                None
                            } else {
                                // Fresh: tune p and revive into recent.
                                Self::calc_p_rec(
                                    shared.max,
                                    inner.ghost_rec.len(),
                                    inner.ghost_freq.len(),
                                    &mut inner.p,
                                    size,
                                );
                                let mut owned = inner.ghost_rec.extract(llp.to_owned());
                                owned.as_mut().txid = txid;
                                owned.as_mut().size = size;
                                stats.ghost_recent_revive(&owned.as_ref().k);
                                let pointer = inner.rec.append_n(owned);
                                Some(CacheItem::Rec(pointer, iv))
                            }
                        }
                        CacheItem::Haunted(llp) => {
                            if llp.as_ref().txid > txid || inner.min_txid > txid {
                                // A newer remove wins over a stale include.
                                None
                            } else {
                                let mut owned = inner.haunted.extract(llp.to_owned());
                                owned.as_mut().txid = txid;
                                owned.as_mut().size = size;
                                stats.include_haunted(&owned.as_mut().k);
                                let pointer = inner.rec.append_n(owned);
                                Some(CacheItem::Rec(pointer, iv))
                            }
                        }
                    };
                    if let Some(ref mut next_state) = next_state {
                        mem::swap(*ci, next_state);
                    }
                }
                None => {
                    // Unknown key: admit via recent, unless the include is
                    // older than the accepted look-back window.
                    if txid >= inner.min_txid {
                        let llp = inner.rec.append_k(CacheItemInner {
                            k: k.clone(),
                            txid,
                            size,
                        });
                        stats.include(&k);
                        cache.insert(k, CacheItem::Rec(llp, iv));
                    }
                }
            };

            // Stop once we reach events from after this commit began.
            if t >= commit_ts {
                break;
            }
        }
    }
1083
    /// Apply this write transaction's own recorded hits (key hashes).
    /// Entries stamped by this very commit (txid == commit_txid) are already
    /// in their final list position and are skipped.
    fn drain_tlocal_hits(
        &self,
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        inner: &mut ArcInner<K, V>,
        commit_txid: u64,
        hit: Vec<u64>,
    ) {
        hit.into_iter().for_each(|k_hash| {
            // Hits are stored by hash: update every entry in the slot.
            let mut r = unsafe { cache.get_slot_mut(k_hash) };
            match r {
                Some(ref mut ci_slots) => {
                    for ref mut ci in ci_slots.iter_mut() {
                        let mut next_state = match &ci.v {
                            CacheItem::Freq(llp, v) => {
                                if llp.as_ref().txid != commit_txid {
                                    // Refresh position in the frequent list.
                                    inner.freq.touch(llp.to_owned());
                                    Some(CacheItem::Freq(llp.to_owned(), v.to_owned()))
                                } else {
                                    None
                                }
                            }
                            CacheItem::Rec(llp, v) => {
                                if llp.as_ref().txid != commit_txid {
                                    // Promote recent -> frequent on re-use.
                                    let owned = inner.rec.extract(llp.clone());
                                    let pointer = inner.freq.append_n(owned);
                                    Some(CacheItem::Freq(pointer, v.clone()))
                                } else {
                                    None
                                }
                            }
                            _ => {
                                // Ghost/haunted entries are left unchanged.
                                None
                            }
                        };
                        if let Some(ref mut next_state) = next_state {
                            mem::swap(&mut ci.v, next_state);
                        }
                    }
                }
                None => {
                    // A recorded hit implies the slot must exist.
                    unreachable!();
                }
            }
        });
    }
1151
    /// Shrink a ghost list `ll` to at most `size` weight by moving nodes
    /// (oldest first) to the haunted list `to_ll`, restamping each with
    /// `txid` and updating its map entry to `Haunted`.
    fn evict_to_haunted_len(
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        ll: &mut LL<CacheItemInner<K>>,
        to_ll: &mut LL<CacheItemInner<K>>,
        size: usize,
        txid: u64,
    ) {
        while ll.len() > size {
            let mut owned = ll.pop();
            debug_assert!(!owned.is_null());

            owned.as_mut().txid = txid;

            let pointer = to_ll.append_n(owned);
            let mut r = cache.get_mut(&pointer.as_ref().k);

            match r {
                Some(ref mut ci) => {
                    let mut next_state = CacheItem::Haunted(pointer);
                    mem::swap(*ci, &mut next_state);
                }
                None => {
                    // Every list node must have a matching map entry.
                    unreachable!();
                }
            };
        }
    }
1182
1183 fn evict_to_len<S>(
1184 cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
1185 ll: &mut LL<CacheItemInner<K>>,
1186 to_ll: &mut LL<CacheItemInner<K>>,
1187 size: usize,
1188 txid: u64,
1189 stats: &mut S,
1190 ) where
1191 S: ARCacheWriteStat<K>,
1192 {
1193 debug_assert!(ll.len() >= size);
1194
1195 while ll.len() > size {
1196 let mut owned = ll.pop();
1197 debug_assert!(!owned.is_null());
1198 let mut r = cache.get_mut(&owned.as_ref().k);
1199 owned.as_mut().txid = txid;
1201 match r {
1202 Some(ref mut ci) => {
1203 let mut next_state = match &ci {
1204 CacheItem::Freq(llp, _v) => {
1205 debug_assert!(llp == &owned);
1206 stats.evict_from_frequent(&owned.as_ref().k);
1209 let pointer = to_ll.append_n(owned);
1210 CacheItem::GhostFreq(pointer)
1211 }
1212 CacheItem::Rec(llp, _v) => {
1213 debug_assert!(llp == &owned);
1214 stats.evict_from_recent(&owned.as_mut().k);
1217 let pointer = to_ll.append_n(owned);
1218 CacheItem::GhostRec(pointer)
1219 }
1220 _ => {
1221 unreachable!();
1223 }
1224 };
1225 mem::swap(*ci, &mut next_state);
1227 }
1228 None => {
1229 unreachable!();
1231 }
1232 }
1233 }
1234 }
1235
    /// Enforce the cache budget: trim `rec` and `freq` so their combined
    /// weight fits `shared.max`, splitting the budget per the adaptive
    /// target `p`, then bound the ghost lists into the haunted list.
    #[allow(clippy::cognitive_complexity)]
    fn evict<S>(
        &self,
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        inner: &mut ArcInner<K, V>,
        shared: &ArcShared<K, V>,
        commit_txid: u64,
        stats: &mut S,
    ) where
        S: ARCacheWriteStat<K>,
    {
        debug_assert!(inner.p <= shared.max);
        let p = inner.p;

        if inner.rec.len() + inner.freq.len() > shared.max {
            trace!(
                "from -> rec {:?}, freq {:?}",
                inner.rec.len(),
                inner.freq.len()
            );
            // Total excess weight that must be evicted this commit.
            let delta = (inner.rec.len() + inner.freq.len()) - shared.max;
            // Decide how much of the recent list survives; frequent gets
            // the remainder of the budget.
            let rec_to_len = if inner.p == 0 {
                // p == 0: recent gets no protection, evict the excess
                // from it (floored at empty).
                trace!("p == 0 => {:?} - {}", inner.rec.len(), delta);
                if delta < inner.rec.len() {
                    inner.rec.len() - delta
                } else {
                    0
                }
            } else if inner.rec.len() > inner.p {
                // Recent exceeds its target: evict from it, but no more
                // than its overshoot beyond p.
                let rec_delta = inner.rec.len() - inner.p;
                if rec_delta > delta {
                    inner.rec.len() - delta
                } else {
                    inner.rec.len() - rec_delta
                }
            } else {
                // Recent is within target: all eviction falls on frequent.
                inner.rec.len()
            };

            let freq_to_len = shared.max - rec_to_len;
            debug_assert!(rec_to_len <= inner.rec.len());
            debug_assert!(freq_to_len <= inner.freq.len());

            Self::evict_to_len(
                cache,
                &mut inner.rec,
                &mut inner.ghost_rec,
                rec_to_len,
                commit_txid,
                stats,
            );
            Self::evict_to_len(
                cache,
                &mut inner.freq,
                &mut inner.ghost_freq,
                freq_to_len,
                commit_txid,
                stats,
            );

            // Bound the ghost lists relative to p.
            // NOTE(review): ghost_rec is compared against (max - p) but
            // trimmed to freq_to_len, while ghost_freq is compared against
            // p but trimmed to rec_to_len — the crossed arguments may be
            // intentional (ghost sizes mirroring the opposite live list),
            // but this is worth confirming against the ARC algorithm.
            if inner.ghost_rec.len() > (shared.max - p) {
                Self::evict_to_haunted_len(
                    cache,
                    &mut inner.ghost_rec,
                    &mut inner.haunted,
                    freq_to_len,
                    commit_txid,
                );
            }

            if inner.ghost_freq.len() > p {
                Self::evict_to_haunted_len(
                    cache,
                    &mut inner.ghost_freq,
                    &mut inner.haunted,
                    rec_to_len,
                    commit_txid,
                );
            }
        }
    }
1370
    /// Used by `clear`: empty a live list `ll`, moving every node into the
    /// matching ghost list (`gf` for frequent, `gr` for recent), restamped
    /// with `txid`, and switch the map entries to the ghost states.
    fn drain_ll_to_ghost<S>(
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        ll: &mut LL<CacheItemInner<K>>,
        gf: &mut LL<CacheItemInner<K>>,
        gr: &mut LL<CacheItemInner<K>>,
        txid: u64,
        stats: &mut S,
    ) where
        S: ARCacheWriteStat<K>,
    {
        while ll.len() > 0 {
            let mut owned = ll.pop();
            debug_assert!(!owned.is_null());

            owned.as_mut().txid = txid;

            let mut r = cache.get_mut(&owned.as_ref().k);
            match r {
                Some(ref mut ci) => {
                    let mut next_state = match &ci {
                        CacheItem::Freq(n, _) => {
                            debug_assert!(n == &owned);
                            stats.evict_from_frequent(&owned.as_ref().k);
                            let pointer = gf.append_n(owned);
                            CacheItem::GhostFreq(pointer)
                        }
                        CacheItem::Rec(n, _) => {
                            debug_assert!(n == &owned);
                            stats.evict_from_recent(&owned.as_ref().k);
                            let pointer = gr.append_n(owned);
                            CacheItem::GhostRec(pointer)
                        }
                        _ => {
                            // Only live lists are drained here.
                            unreachable!();
                        }
                    };
                    mem::swap(*ci, &mut next_state);
                }
                None => {
                    // Every list node must have a matching map entry.
                    unreachable!();
                }
            }
        }
    }
1419
1420 fn drain_ll_min_txid(
1421 cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
1422 ll: &mut LL<CacheItemInner<K>>,
1423 min_txid: u64,
1424 ) {
1425 while let Some(node) = ll.peek_head() {
1426 if min_txid > node.txid {
1427 cache.remove(&node.k);
1429
1430 ll.drop_head();
1432 } else {
1433 break;
1436 }
1437 }
1438 }
1439
    /// Apply a write transaction to the shared state: thread-local changes,
    /// queued reader events, hit promotion and eviction — then commit the
    /// underlying map, update stats and the watermark flag, and return the
    /// stats sink.
    #[allow(clippy::unnecessary_mut_passed)]
    fn commit<S>(
        &self,
        mut cache: DataMapWriteTxn<K, CacheItem<K, V>>,
        tlocal: Map<K, ThreadCacheItem<V>>,
        hit: Vec<u64>,
        clear: bool,
        init_above_watermark: bool,
        mut stats: S,
    ) -> S
    where
        S: ARCacheWriteStat<K>,
    {
        let commit_ts = Instant::now();
        let commit_txid = cache.get_txid();
        // Lock order: inner mutex first, then the shared read lock.
        let mut inner = self.inner.lock().unwrap();
        let shared = self.shared.read().unwrap();

        if clear {
            // Reject any reader event older than this commit so cleared
            // data cannot be resurrected.
            inner.min_txid = commit_txid;

            // Split the borrows so multiple lists can be drained at once.
            let m_inner = inner.deref_mut();

            let i_f = &mut m_inner.freq;
            let g_f = &mut m_inner.ghost_freq;
            let i_r = &mut m_inner.rec;
            let g_r = &mut m_inner.ghost_rec;

            Self::drain_ll_to_ghost(&mut cache, i_f, g_f, g_r, commit_txid, &mut stats);
            Self::drain_ll_to_ghost(&mut cache, i_r, g_f, g_r, commit_txid, &mut stats);
        } else {
            // Advance min_txid, bounded by the look-back window, so very
            // old reader events are rejected. min_txid never goes backward.
            let possible_new_limit = commit_txid.saturating_sub(self.look_back_limit);
            inner.min_txid = inner.min_txid.max(possible_new_limit);
        }

        // 1. This transaction's own writes.
        self.drain_tlocal_inc(
            &mut cache,
            inner.deref_mut(),
            shared.deref(),
            tlocal,
            commit_txid,
            &mut stats,
        );

        // 2. Includes queued by readers.
        self.drain_inc_rx(
            &mut cache,
            inner.deref_mut(),
            shared.deref(),
            commit_ts,
            &mut stats,
        );

        // 3. Hits queued by readers.
        self.drain_hit_rx(&mut cache, inner.deref_mut(), commit_ts);

        // 4. This transaction's own hits.
        self.drain_tlocal_hits(&mut cache, inner.deref_mut(), commit_txid, hit);

        // 5. Trim everything back to budget.
        self.evict(
            &mut cache,
            inner.deref_mut(),
            shared.deref(),
            commit_txid,
            &mut stats,
        );

        {
            // Finally, forget haunted entries too old to matter.
            let min_txid = inner.min_txid;
            Self::drain_ll_min_txid(&mut cache, &mut inner.haunted, min_txid);
        }

        stats.p_weight(inner.p as u64);
        stats.shared_max(shared.max as u64);
        stats.freq(inner.freq.len() as u64);
        stats.recent(inner.rec.len() as u64);
        stats.all_seen_keys(cache.len() as u64);

        // Maintain the watermark flag. Relaxed ordering suffices: this is a
        // heuristic gate for hit tracking, not a synchronization point.
        if init_above_watermark {
            if (inner.freq.len() + inner.rec.len()) < shared.watermark {
                self.above_watermark.store(false, Ordering::Relaxed);
            }
        } else if (inner.freq.len() + inner.rec.len()) >= shared.watermark {
            self.above_watermark.store(true, Ordering::Relaxed);
        }

        cache.commit();
        stats
    }
1579}
1580
1581impl<
1582 K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
1583 V: Clone + Debug + Sync + Send + 'static,
1584 S: ARCacheWriteStat<K>,
1585 > ARCacheWriteTxn<'_, K, V, S>
1586{
1587 pub fn commit(self) -> S {
1594 self.caller.commit(
1595 self.cache,
1596 self.tlocal,
1597 self.hit.into_inner(),
1598 self.clear.into_inner(),
1599 self.above_watermark,
1600 self.stats,
1602 )
1603 }
1604
1605 pub fn clear(&mut self) {
1609 unsafe {
1611 let clear_ptr = self.clear.get();
1612 *clear_ptr = true;
1613 }
1614 unsafe {
1616 let hit_ptr = self.hit.get();
1617 (*hit_ptr).clear();
1618 }
1619
1620 self.stats.cache_clear();
1623 self.tlocal.clear();
1632 }
1635
1636 pub fn get<Q>(&mut self, k: &Q) -> Option<&V>
1641 where
1642 K: Borrow<Q>,
1643 Q: Hash + Eq + Ord + ?Sized,
1644 {
1645 let k_hash: u64 = self.cache.prehash(k);
1646
1647 self.stats.cache_read();
1655
1656 let r: Option<&V> = if let Some(tci) = self.tlocal.get(k) {
1657 match tci {
1658 ThreadCacheItem::Present(v, _clean, _size) => {
1659 let v = v as *const _;
1660 unsafe { Some(&(*v)) }
1661 }
1662 ThreadCacheItem::Removed(_clean) => {
1663 return None;
1664 }
1665 }
1666 } else {
1667 let is_cleared = unsafe {
1670 let clear_ptr = self.clear.get();
1671 *clear_ptr
1672 };
1673 if !is_cleared {
1674 if let Some(v) = self.cache.get_prehashed(k, k_hash) {
1675 (*v).to_vref()
1676 } else {
1677 None
1678 }
1679 } else {
1680 None
1681 }
1682 };
1683
1684 if r.is_some() {
1685 self.stats.cache_hit();
1686 }
1687
1688 if self.above_watermark && r.is_some() {
1693 unsafe {
1694 let hit_ptr = self.hit.get();
1695 (*hit_ptr).push(k_hash);
1696 }
1697 }
1698 r
1699 }
1700
1701 pub fn get_mut<Q>(&mut self, k: &Q, make_dirty: bool) -> Option<&mut V>
1710 where
1711 K: Borrow<Q>,
1712 Q: Hash + Eq + Ord + ?Sized,
1713 {
1714 let is_cleared = unsafe {
1716 let clear_ptr = self.clear.get();
1717 *clear_ptr
1718 };
1719
1720 if !is_cleared && !self.tlocal.contains_key(k) {
1723 let k_hash: u64 = self.cache.prehash(k);
1725 if let Some(v) = self.cache.get_prehashed(k, k_hash) {
1726 if let Some((dk, dv, ds)) = v.to_kvsref() {
1727 self.tlocal.insert(
1728 dk.clone(),
1729 ThreadCacheItem::Present(dv.clone(), !make_dirty, ds),
1730 );
1731 }
1732 }
1733 };
1734
1735 match self.tlocal.get_mut(k) {
1738 Some(ThreadCacheItem::Present(v, clean, _size)) => {
1739 if make_dirty && *clean {
1740 *clean = false;
1741 }
1742 let v = v as *mut _;
1743 unsafe { Some(&mut (*v)) }
1744 }
1745 _ => None,
1746 }
1747 }
1748
1749 pub fn contains_key<Q>(&mut self, k: &Q) -> bool
1751 where
1752 K: Borrow<Q>,
1753 Q: Hash + Eq + Ord + ?Sized,
1754 {
1755 self.get(k).is_some()
1756 }
1757
1758 pub fn insert(&mut self, k: K, v: V) {
1763 self.tlocal.insert(k, ThreadCacheItem::Present(v, true, 1));
1764 }
1765
1766 pub fn insert_sized(&mut self, k: K, v: V, size: NonZeroUsize) {
1768 self.tlocal
1769 .insert(k, ThreadCacheItem::Present(v, true, size.get()));
1770 }
1771
1772 pub fn remove(&mut self, k: K) {
1776 self.tlocal.insert(k, ThreadCacheItem::Removed(true));
1777 }
1778
1779 pub fn insert_dirty(&mut self, k: K, v: V) {
1785 self.tlocal.insert(k, ThreadCacheItem::Present(v, false, 1));
1786 }
1787
1788 pub fn insert_dirty_sized(&mut self, k: K, v: V, size: NonZeroUsize) {
1790 self.tlocal
1791 .insert(k, ThreadCacheItem::Present(v, false, size.get()));
1792 }
1793
1794 pub fn remove_dirty(&mut self, k: K) {
1799 self.tlocal.insert(k, ThreadCacheItem::Removed(false));
1800 }
1801
1802 pub fn is_dirty(&self) -> bool {
1804 self.iter_dirty().take(1).next().is_some()
1805 }
1806
1807 pub fn iter_dirty(&self) -> impl Iterator<Item = (&K, Option<&V>)> {
1811 self.tlocal
1812 .iter()
1813 .filter(|(_k, v)| match v {
1814 ThreadCacheItem::Present(_v, c, _size) => !c,
1815 ThreadCacheItem::Removed(c) => !c,
1816 })
1817 .map(|(k, v)| {
1818 let data = match v {
1820 ThreadCacheItem::Present(v, _c, _size) => Some(v),
1821 ThreadCacheItem::Removed(_c) => None,
1822 };
1823 (k, data)
1824 })
1825 }
1826
1827 pub fn iter_mut_dirty(&mut self) -> impl Iterator<Item = (&K, Option<&mut V>)> {
1831 self.tlocal
1832 .iter_mut()
1833 .filter(|(_k, v)| match v {
1834 ThreadCacheItem::Present(_v, c, _size) => !c,
1835 ThreadCacheItem::Removed(c) => !c,
1836 })
1837 .map(|(k, v)| {
1838 let data = match v {
1840 ThreadCacheItem::Present(v, _c, _size) => Some(v),
1841 ThreadCacheItem::Removed(_c) => None,
1842 };
1843 (k, data)
1844 })
1845 }
1846
1847 pub fn iter_mut_mark_clean(&mut self) -> impl Iterator<Item = (&K, Option<&mut V>)> {
1852 self.tlocal
1853 .iter_mut()
1854 .filter(|(_k, v)| match v {
1855 ThreadCacheItem::Present(_v, c, _size) => !c,
1856 ThreadCacheItem::Removed(c) => !c,
1857 })
1858 .map(|(k, v)| {
1859 match v {
1861 ThreadCacheItem::Present(_v, c, _size) => *c = true,
1862 ThreadCacheItem::Removed(c) => *c = true,
1863 }
1864 let data = match v {
1866 ThreadCacheItem::Present(v, _c, _size) => Some(v),
1867 ThreadCacheItem::Removed(_c) => None,
1868 };
1869 (k, data)
1870 })
1871 }
1872
1873 pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
1875 self.cache.values().filter_map(|ci| match &ci {
1876 CacheItem::Rec(lln, v) => {
1877 let cii = lln.as_ref();
1878 Some((&cii.k, v))
1879 }
1880 CacheItem::Freq(lln, v) => {
1881 let cii = lln.as_ref();
1882 Some((&cii.k, v))
1883 }
1884 _ => None,
1885 })
1886 }
1887
1888 pub fn iter_rec(&self) -> impl Iterator<Item = &K> {
1891 self.cache.values().filter_map(|ci| match &ci {
1892 CacheItem::Rec(lln, _) => {
1893 let cii = lln.as_ref();
1894 Some(&cii.k)
1895 }
1896 _ => None,
1897 })
1898 }
1899
1900 pub fn iter_freq(&self) -> impl Iterator<Item = &K> {
1903 self.cache.values().filter_map(|ci| match &ci {
1904 CacheItem::Rec(lln, _) => {
1905 let cii = lln.as_ref();
1906 Some(&cii.k)
1907 }
1908 _ => None,
1909 })
1910 }
1911
1912 #[cfg(test)]
1913 pub(crate) fn iter_ghost_rec(&self) -> impl Iterator<Item = &K> {
1914 self.cache.values().filter_map(|ci| match &ci {
1915 CacheItem::GhostRec(lln) => {
1916 let cii = lln.as_ref();
1917 Some(&cii.k)
1918 }
1919 _ => None,
1920 })
1921 }
1922
1923 #[cfg(test)]
1924 pub(crate) fn iter_ghost_freq(&self) -> impl Iterator<Item = &K> {
1925 self.cache.values().filter_map(|ci| match &ci {
1926 CacheItem::GhostFreq(lln) => {
1927 let cii = lln.as_ref();
1928 Some(&cii.k)
1929 }
1930 _ => None,
1931 })
1932 }
1933
1934 #[cfg(test)]
1935 pub(crate) fn peek_hit(&self) -> &[u64] {
1936 let hit_ptr = self.hit.get();
1937 unsafe { &(*hit_ptr) }
1938 }
1939
1940 #[cfg(test)]
1941 pub(crate) fn peek_cache<Q: ?Sized>(&self, k: &Q) -> CacheState
1942 where
1943 K: Borrow<Q>,
1944 Q: Hash + Eq + Ord,
1945 {
1946 if let Some(v) = self.cache.get(k) {
1947 (*v).to_state()
1948 } else {
1949 CacheState::None
1950 }
1951 }
1952
1953 #[cfg(test)]
1954 pub(crate) fn peek_stat(&self) -> CStat {
1955 let inner = self.caller.inner.lock().unwrap();
1956 let shared = self.caller.shared.read().unwrap();
1957 CStat {
1958 max: shared.max,
1959 cache: self.cache.len(),
1960 tlocal: self.tlocal.len(),
1961 freq: inner.freq.len(),
1962 rec: inner.rec.len(),
1963 ghost_freq: inner.ghost_freq.len(),
1964 ghost_rec: inner.ghost_rec.len(),
1965 haunted: inner.haunted.len(),
1966 p: inner.p,
1967 }
1968 }
1969
1970 }
1972
impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
        S: ARCacheReadStat + Clone,
    > ARCacheReadTxn<'_, K, V, S>
{
    /// Fetch a value, consulting the reader's thread-local cache first and
    /// the main cache second. While above the watermark, hits are pushed onto
    /// the lock-free hit queue for the next writer to drain; a full queue
    /// silently drops the event (best-effort).
    pub fn get<Q>(&mut self, k: &Q) -> Option<&V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq + Ord + ?Sized,
    {
        let k_hash: u64 = self.cache.prehash(k);

        self.stats.cache_read();
        // Track where the hit came from so the right stat is bumped below;
        // flags are needed because the hit happens inside the closures.
        let mut hits = false;
        let mut tlocal_hits = false;

        let r: Option<&V> = self
            .tlocal
            .as_ref()
            .and_then(|cache| {
                cache.set.get(k).map(|v| {
                    tlocal_hits = true;

                    if self.above_watermark {
                        // Best-effort: a full ArrayQueue drops the hit event.
                        let _ = self.hit_queue.push(CacheHitEvent {
                            t: Instant::now(),
                            k_hash,
                        });
                    }
                    // SAFETY: launders the borrow of the tlocal entry into a
                    // borrow of `self`. Presumably sound because the returned
                    // reference keeps `self` borrowed, and tlocal entries are
                    // only replaced via `&mut self` — NOTE(review): verify
                    // against insert_sized's node recycling.
                    unsafe {
                        let v = &v.as_ref().v as *const _;
                        &(*v)
                    }
                })
            })
            .or_else(|| {
                self.cache.get_prehashed(k, k_hash).and_then(|v| {
                    (*v).to_vref().map(|vin| {
                        hits = true;

                        if self.above_watermark {
                            // Best-effort hit event, as above.
                            let _ = self.hit_queue.push(CacheHitEvent {
                                t: Instant::now(),
                                k_hash,
                            });
                        }

                        // SAFETY: same borrow-laundering pattern as above, for
                        // the snapshot-backed main cache value.
                        unsafe {
                            let vin = vin as *const _;
                            &(*vin)
                        }
                    })
                })
            });

        if tlocal_hits {
            self.stats.cache_local_hit()
        } else if hits {
            self.stats.cache_main_hit()
        };

        r
    }

    /// Returns true if the key is present (records a read/hit like `get`).
    pub fn contains_key<Q>(&mut self, k: &Q) -> bool
    where
        K: Borrow<Q>,
        Q: Hash + Eq + Ord + ?Sized,
    {
        self.get(k).is_some()
    }

    /// Insert a value with an explicit weight. The inclusion is queued for
    /// the next writer (best-effort; a full queue drops it), and the value is
    /// also placed in this reader's thread-local LRU if one is enabled.
    pub fn insert_sized(&mut self, k: K, v: V, size: NonZeroUsize) {
        let mut v = v;
        let size = size.get();
        if self
            .inc_queue
            .push(CacheIncludeEvent {
                t: Instant::now(),
                k: k.clone(),
                v: v.clone(),
                txid: self.cache.get_txid(),
                size,
            })
            .is_ok()
        {
            self.stats.include();
        } else {
            self.stats.failed_include();
        }

        if let Some(ref mut cache) = self.tlocal {
            self.stats.local_include();
            let n = if cache.tlru.len() >= cache.read_size {
                // At capacity: recycle the LRU node in place by swapping the
                // new key/value into it, then fix up the lookup set.
                let mut owned = cache.tlru.pop();
                let mut k_clone = k.clone();
                mem::swap(&mut k_clone, &mut owned.as_mut().k);
                mem::swap(&mut v, &mut owned.as_mut().v);
                // k_clone now holds the evicted key; drop its set entry.
                cache.set.remove(&k_clone);
                // NOTE(review): the recycled node's `size` field is not
                // updated here, so it keeps the evicted entry's weight —
                // confirm whether tlru weighting relies on it.
                cache.tlru.append_n(owned)
            } else {
                cache.tlru.append_k(ReadCacheItem {
                    k: k.clone(),
                    v,
                    size,
                })
            };
            let r = cache.set.insert(k, n);
            // The key was either absent or just removed above.
            assert!(r.is_none());
        }
    }

    /// Insert a value with the default weight of 1.
    pub fn insert(&mut self, k: K, v: V) {
        // SAFETY: 1 is trivially non-zero.
        self.insert_sized(k, v, unsafe { NonZeroUsize::new_unchecked(1) })
    }

    /// Finish the read transaction, returning the reader statistics.
    pub fn finish(self) -> S {
        let stats = self.stats.clone();
        drop(self);

        stats
    }
}
2127
2128impl<
2129 K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
2130 V: Clone + Debug + Sync + Send + 'static,
2131 S: ARCacheReadStat + Clone,
2132 > Drop for ARCacheReadTxn<'_, K, V, S>
2133{
2134 fn drop(&mut self) {
2135 if self.reader_quiesce {
2137 self.caller.try_quiesce();
2138 }
2139 }
2140}
2141
// Unit tests. Many of these assert exact CStat snapshots after specific
// commit sequences, so they are tightly coupled to the ARC bookkeeping order.
#[cfg(test)]
mod tests {
    use super::stats::{TraceStat, WriteCountStat};
    use super::ARCache;
    use super::ARCacheBuilder;
    use super::CStat;
    use super::CacheState;
    use std::num::NonZeroUsize;
    use std::sync::Arc;
    use std::thread;

    use std::sync::atomic::{AtomicBool, Ordering};

    // Basic write-txn lifecycle: insert, hit tracking, Rec -> Freq promotion.
    #[test]
    fn test_cache_arc_basic() {
        let arc: ARCache<usize, usize> = ARCacheBuilder::new()
            .set_size(4, 4)
            .build()
            .expect("Invalid cache parameters!");
        let mut wr_txn = arc.write();

        assert!(wr_txn.get(&1).is_none());
        assert!(wr_txn.peek_hit().is_empty());
        wr_txn.insert(1, 1);
        assert!(wr_txn.get(&1) == Some(&1));
        assert!(wr_txn.peek_hit().len() == 1);

        wr_txn.commit();

        let mut wr_txn = arc.write();
        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
        assert!(wr_txn.get(&1) == Some(&1));
        assert!(wr_txn.peek_hit().len() == 1);
        wr_txn.commit();
        let wr_txn = arc.write();
        // A second committed hit promotes the entry from Rec to Freq.
        assert!(wr_txn.peek_cache(&1) == CacheState::Freq);
        println!("{:?}", wr_txn.peek_stat());
    }

    // Walks the full ARC eviction life cycle: fill, promote, evict to ghost
    // lists, resurrect from ghosts, and check the `p` weight adapts.
    #[test]
    fn test_cache_evict() {
        let _ = tracing_subscriber::fmt::try_init();
        println!("== 1");
        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 4)
            .build()
            .expect("Invalid cache parameters!");
        let stats = TraceStat {};

        let mut wr_txn = arc.write_stats(stats);
        assert!(
            CStat {
                max: 4,
                cache: 0,
                tlocal: 0,
                freq: 0,
                rec: 0,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );

        wr_txn.insert(1, 1);
        wr_txn.insert(2, 2);
        wr_txn.insert(3, 3);
        wr_txn.insert(4, 4);

        assert!(
            CStat {
                max: 4,
                cache: 0,
                tlocal: 4,
                freq: 0,
                rec: 0,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        let stats = wr_txn.commit();

        println!("== 2");
        let mut wr_txn = arc.write_stats(stats);
        assert!(
            CStat {
                max: 4,
                cache: 4,
                tlocal: 0,
                freq: 0,
                rec: 4,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );

        // Hit 1 and 2 so they will promote to freq on the next commit.
        assert!(wr_txn.get(&1) == Some(&1));
        assert!(wr_txn.get(&1) == Some(&1));
        assert!(wr_txn.get(&2) == Some(&2));

        let stats = wr_txn.commit();

        println!("== 3");
        let mut wr_txn = arc.write_stats(stats);
        assert!(
            CStat {
                max: 4,
                cache: 4,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        // Over capacity: one rec entry is pushed to ghost_rec.
        wr_txn.insert(5, 5);
        let stats = wr_txn.commit();

        println!("== 4");
        let mut wr_txn = arc.write_stats(stats);
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 5,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 1,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        let rec_set: Vec<usize> = wr_txn.iter_rec().take(2).copied().collect();
        assert!(wr_txn.get(&rec_set[0]) == Some(&rec_set[0]));
        assert!(wr_txn.get(&rec_set[1]) == Some(&rec_set[1]));

        let stats = wr_txn.commit();

        println!("== 5");
        let mut wr_txn = arc.write_stats(stats);
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 5,
                tlocal: 0,
                freq: 4,
                rec: 0,
                ghost_freq: 0,
                ghost_rec: 1,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        // Re-inserting a ghost_rec key should revive it and grow `p`.
        let grec: usize = wr_txn.iter_ghost_rec().take(1).copied().next().unwrap();
        wr_txn.insert(grec, grec);
        assert!(wr_txn.get(&grec) == Some(&grec));
        let stats = wr_txn.commit();

        println!("== 6");
        let mut wr_txn = arc.write_stats(stats);
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 5,
                tlocal: 0,
                freq: 3,
                rec: 1,
                ghost_freq: 1,
                ghost_rec: 0,
                haunted: 0,
                p: 1
            } == wr_txn.peek_stat()
        );
        assert!(wr_txn.peek_cache(&grec) == CacheState::Rec);

        wr_txn.insert(10, 10);
        wr_txn.insert(11, 11);
        wr_txn.insert(12, 12);
        let stats = wr_txn.commit();

        println!("== 7");
        let mut wr_txn = arc.write_stats(stats);
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 3,
                rec: 1,
                ghost_freq: 1,
                ghost_rec: 3,
                haunted: 0,
                p: 1
            } == wr_txn.peek_stat()
        );
        let grec_set: Vec<usize> = wr_txn.iter_ghost_rec().take(3).copied().collect();
        println!("{:?}", grec_set);

        grec_set
            .iter()
            .for_each(|i| println!("{:?}", wr_txn.peek_cache(i)));

        grec_set.iter().for_each(|i| wr_txn.insert(*i, *i));

        grec_set
            .iter()
            .for_each(|i| println!("{:?}", wr_txn.peek_cache(i)));
        wr_txn.commit();

        println!("== 8");
        let mut wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 0,
                rec: 4,
                ghost_freq: 4,
                ghost_rec: 0,
                haunted: 0,
                p: 4
            } == wr_txn.peek_stat()
        );

        grec_set
            .iter()
            .for_each(|i| println!("{:?}", wr_txn.peek_cache(i)));
        grec_set
            .iter()
            .for_each(|i| assert!(wr_txn.peek_cache(i) == CacheState::Rec));

        // Reviving ghost_freq keys should pull them straight back to freq.
        let gfreq_set: Vec<usize> = wr_txn.iter_ghost_freq().take(4).copied().collect();

        gfreq_set.iter().for_each(|i| wr_txn.insert(*i, *i));
        wr_txn.commit();

        println!("== 9");
        let wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 4,
                rec: 0,
                ghost_freq: 0,
                ghost_rec: 4,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        gfreq_set
            .iter()
            .for_each(|i| assert!(wr_txn.peek_cache(i) == CacheState::Freq));

        let () = wr_txn.commit();
    }

    // Reader inserts flow through the include queue into the main cache
    // after quiesce, and ghost/freq transitions reflect reader hits.
    #[test]
    fn test_cache_concurrent_basic() {
        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 4)
            .build()
            .expect("Invalid cache parameters!");
        {
            let mut rd_txn = arc.read();
            rd_txn.insert(1, 1);
            rd_txn.insert(2, 2);
            rd_txn.insert(3, 3);
            rd_txn.insert(4, 4);
        }
        arc.try_quiesce();
        println!("== 2");
        let wr_txn = arc.write();
        println!("{:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 4,
                tlocal: 0,
                freq: 0,
                rec: 4,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&2) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&3) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&4) == CacheState::Rec);
        {
            let mut rd_txn = arc.read();
            assert!(rd_txn.get(&3) == Some(&3));
            assert!(rd_txn.get(&4) == Some(&4));
            rd_txn.insert(5, 5);
            rd_txn.insert(6, 6);
        }
        wr_txn.commit();
        println!("== 3");
        let wr_txn = arc.write();
        assert!(
            CStat {
                max: 4,
                cache: 6,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 2,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        assert!(wr_txn.peek_cache(&1) == CacheState::GhostRec);
        assert!(wr_txn.peek_cache(&2) == CacheState::GhostRec);
        assert!(wr_txn.peek_cache(&3) == CacheState::Freq);
        assert!(wr_txn.peek_cache(&4) == CacheState::Freq);
        assert!(wr_txn.peek_cache(&5) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&6) == CacheState::Rec);

        {
            let mut rd_txn = arc.read();
            rd_txn.insert(1, 1);
            rd_txn.insert(2, 2);
        }

        wr_txn.commit();
        println!("== 4");
        let wr_txn = arc.write();
        assert!(
            CStat {
                max: 4,
                cache: 6,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 2,
                haunted: 0,
                p: 2
            } == wr_txn.peek_stat()
        );
        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&2) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&3) == CacheState::Freq);
        assert!(wr_txn.peek_cache(&4) == CacheState::Freq);
        assert!(wr_txn.peek_cache(&5) == CacheState::GhostRec);
        assert!(wr_txn.peek_cache(&6) == CacheState::GhostRec);
    }

    // A long-lived reader holds its snapshot: it never sees writer commits
    // made after it opened, even across haunted-state transitions.
    #[test]
    fn test_cache_concurrent_cursed_1() {
        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 4)
            .build()
            .expect("Invalid cache parameters!");

        let mut wr_txn = arc.write();
        let mut rd_txn = arc.read();
        wr_txn.insert(1, 1);

        assert!(rd_txn.get(&1).is_none());

        wr_txn.commit();
        assert!(rd_txn.get(&1).is_none());
        let mut wr_txn = arc.write();
        wr_txn.insert(10, 1);
        wr_txn.insert(11, 1);
        wr_txn.insert(12, 1);
        wr_txn.insert(13, 1);
        wr_txn.insert(14, 1);
        wr_txn.insert(15, 1);
        wr_txn.insert(16, 1);
        wr_txn.insert(17, 1);
        wr_txn.commit();

        let wr_txn = arc.write();
        assert!(wr_txn.peek_cache(&1) == CacheState::Haunted);
        assert!(rd_txn.get(&1).is_none());
        // The reader's own local insert is visible to it even though the key
        // is haunted in the main cache.
        rd_txn.insert(1, 100);
        wr_txn.commit();

        let wr_txn = arc.write();
        assert!(wr_txn.peek_cache(&1) == CacheState::Haunted);
        assert!(rd_txn.get(&1) == Some(&100));
    }

    // clear() demotes all live entries to ghost lists on commit.
    #[test]
    fn test_cache_clear() {
        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 4)
            .build()
            .expect("Invalid cache parameters!");

        let mut wr_txn = arc.write();
        wr_txn.insert(10, 10);
        wr_txn.insert(11, 11);
        wr_txn.insert(12, 12);
        wr_txn.insert(13, 13);
        wr_txn.insert(14, 14);
        wr_txn.insert(15, 15);
        wr_txn.insert(16, 16);
        wr_txn.insert(17, 17);
        wr_txn.commit();
        let mut wr_txn = arc.write();

        let rec_set: Vec<usize> = wr_txn.iter_rec().take(2).copied().collect();
        println!("{:?}", rec_set);
        assert!(wr_txn.get(&rec_set[0]) == Some(&rec_set[0]));
        assert!(wr_txn.get(&rec_set[1]) == Some(&rec_set[1]));

        wr_txn.commit();
        let mut wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 4,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );

        wr_txn.clear();
        wr_txn.commit();
        let wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 0,
                rec: 0,
                ghost_freq: 2,
                ghost_rec: 6,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
    }

    // Dropping a txn after clear() (without commit) must roll it back.
    #[test]
    fn test_cache_clear_rollback() {
        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 4)
            .build()
            .expect("Invalid cache parameters!");

        let mut wr_txn = arc.write();
        wr_txn.insert(10, 10);
        wr_txn.insert(11, 11);
        wr_txn.insert(12, 12);
        wr_txn.insert(13, 13);
        wr_txn.insert(14, 14);
        wr_txn.insert(15, 15);
        wr_txn.insert(16, 16);
        wr_txn.insert(17, 17);
        wr_txn.commit();
        let mut wr_txn = arc.write();
        let rec_set: Vec<usize> = wr_txn.iter_rec().take(2).copied().collect();
        println!("{:?}", rec_set);
        let r = wr_txn.get(&rec_set[0]);
        println!("{:?}", r);
        assert!(r == Some(&rec_set[0]));
        assert!(wr_txn.get(&rec_set[1]) == Some(&rec_set[1]));

        wr_txn.commit();
        let mut wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 4,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );

        wr_txn.clear();
        // Drop without commit — the clear must not take effect.
        drop(wr_txn);
        let wr_txn = arc.write();
        println!("stat -> {:?}", wr_txn.peek_stat());
        assert!(
            CStat {
                max: 4,
                cache: 8,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 4,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
    }

    // A reader opened before clear() still sees its snapshot, and stale
    // reader includes from before the clear are discarded on quiesce.
    #[test]
    fn test_cache_clear_cursed() {
        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 4)
            .build()
            .expect("Invalid cache parameters!");
        let mut wr_txn = arc.write();
        wr_txn.insert(10, 1);
        wr_txn.commit();
        let wr_txn = arc.write();
        assert!(wr_txn.peek_cache(&10) == CacheState::Rec);
        wr_txn.commit();
        let mut rd_txn = arc.read();
        let mut wr_txn = arc.write();
        wr_txn.clear();
        wr_txn.commit();

        assert!(rd_txn.get(&10) == Some(&1));
        rd_txn.insert(11, 1);
        std::mem::drop(rd_txn);
        arc.try_quiesce();
        let wr_txn = arc.write();
        assert!(wr_txn.peek_cache(&10) == CacheState::GhostRec);
        println!("--> {:?}", wr_txn.peek_cache(&11));
        // The reader's include predates the clear, so it must be dropped.
        assert!(wr_txn.peek_cache(&11) == CacheState::None);
    }

    // The ARC `p` weight must return to zero under pure-churn workloads.
    #[test]
    fn test_cache_p_weight_zero_churn() {
        let arc: ARCache<usize, usize> = ARCacheBuilder::new()
            .set_size(4, 4)
            .set_watermark(0)
            .build()
            .expect("Invalid cache parameters!");

        let mut wr_txn = arc.write();

        wr_txn.insert(1, 1);
        wr_txn.insert(2, 2);
        wr_txn.insert(3, 3);
        wr_txn.insert(4, 4);
        assert_eq!(wr_txn.get(&1), Some(&1));
        assert_eq!(wr_txn.get(&2), Some(&2));
        assert_eq!(wr_txn.get(&3), Some(&3));
        assert_eq!(wr_txn.get(&4), Some(&4));

        assert_eq!(wr_txn.peek_stat().p, 0);
        wr_txn.commit();

        let mut wr_txn = arc.write();
        assert_eq!(wr_txn.get(&1), Some(&1));
        assert_eq!(wr_txn.get(&2), Some(&2));
        assert_eq!(wr_txn.get(&3), Some(&3));
        assert_eq!(wr_txn.get(&4), Some(&4));

        assert_eq!(wr_txn.peek_stat().p, 0);
        wr_txn.commit();

        let mut wr_txn = arc.write();
        wr_txn.insert(100, 100);
        println!("b {:?}", wr_txn.peek_stat());
        assert_eq!(wr_txn.peek_stat().p, 0);
        wr_txn.commit();

        let mut wr_txn = arc.write();
        assert_eq!(wr_txn.peek_stat().p, 0);
        assert_eq!(wr_txn.peek_stat().ghost_rec, 1);

        // Ghost-rec revival nudges p up by one.
        wr_txn.insert(100, 100);
        wr_txn.commit();

        let wr_txn = arc.write();
        println!("c {:?}", wr_txn.peek_stat());
        assert_eq!(wr_txn.peek_stat().p, 1);
        assert_eq!(wr_txn.peek_stat().ghost_rec, 0);
        assert_eq!(wr_txn.peek_stat().ghost_freq, 1);
        wr_txn.commit();

        let mut wr_txn = arc.write();
        assert_eq!(wr_txn.get(&1), None);
        wr_txn.insert(1, 1);
        wr_txn.commit();

        let wr_txn = arc.write();
        println!("d {:?}", wr_txn.peek_stat());
        // Ghost-freq revival pulls p back down to zero.
        assert_eq!(wr_txn.peek_stat().p, 0);
        assert_eq!(wr_txn.peek_stat().ghost_rec, 1);
        assert_eq!(wr_txn.peek_stat().ghost_freq, 0);
        wr_txn.commit();
    }

    // Haunted entries age out once they fall behind the look-back window.
    #[test]
    fn test_cache_haunted_bounds() {
        let arc: ARCache<usize, usize> = ARCacheBuilder::new()
            .set_size(4, 4)
            .set_watermark(0)
            .set_look_back_limit(4)
            .build()
            .expect("Invalid cache parameters!");

        let mut wr_txn = arc.write();

        // Removing a never-present key creates a haunted entry.
        wr_txn.remove(1);
        wr_txn.commit();

        for _i in 0..5 {
            let wr_txn = arc.write();
            println!("l {:?}", wr_txn.peek_stat());
            assert_eq!(wr_txn.peek_stat().haunted, 1);
            assert!(wr_txn.peek_cache(&1) == CacheState::Haunted);
            wr_txn.commit();
        }

        let wr_txn = arc.write();
        println!("d {:?}", wr_txn.peek_stat());
        assert_eq!(wr_txn.peek_stat().haunted, 0);
        assert_eq!(wr_txn.peek_cache(&1), CacheState::None);
        wr_txn.commit();
    }

    // Smoke test: a dirty insert can be marked clean and then committed.
    #[test]
    fn test_cache_dirty_write() {
        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 4)
            .build()
            .expect("Invalid cache parameters!");
        let mut wr_txn = arc.write();
        wr_txn.insert_dirty(10, 1);
        wr_txn.iter_mut_mark_clean().for_each(|(_k, _v)| {});
        wr_txn.commit();
    }

    // With read_size 0, readers have no tlocal cache: their own inserts are
    // invisible to them until a writer commit, but still reach the cache.
    #[test]
    fn test_cache_read_no_tlocal() {
        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 0)
            .build()
            .expect("Invalid cache parameters!");
        {
            let mut rd_txn = arc.read();
            rd_txn.insert(1, 1);
            rd_txn.insert(2, 2);
            rd_txn.insert(3, 3);
            rd_txn.insert(4, 4);
            assert!(rd_txn.get(&1).is_none());
            assert!(rd_txn.get(&2).is_none());
            assert!(rd_txn.get(&3).is_none());
            assert!(rd_txn.get(&4).is_none());
        }
        arc.try_quiesce();
        println!("== 2");
        let wr_txn = arc.write();
        assert!(
            CStat {
                max: 4,
                cache: 4,
                tlocal: 0,
                freq: 0,
                rec: 4,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&2) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&3) == CacheState::Rec);
        assert!(wr_txn.peek_cache(&4) == CacheState::Rec);
    }

    // Marker value type for the weighted-insert test below.
    #[derive(Clone, Debug)]
    struct Weighted {
        _i: u64,
    }

    // Entries with weight > 1 consume multiple units of cache capacity.
    #[test]
    fn test_cache_weighted() {
        let arc: ARCache<usize, Weighted> = ARCacheBuilder::default()
            .set_size(4, 0)
            .build()
            .expect("Invalid cache parameters!");
        let mut wr_txn = arc.write();

        assert!(
            CStat {
                max: 4,
                cache: 0,
                tlocal: 0,
                freq: 0,
                rec: 0,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );

        wr_txn.insert_sized(1, Weighted { _i: 1 }, NonZeroUsize::new(2).unwrap());
        wr_txn.insert_sized(2, Weighted { _i: 2 }, NonZeroUsize::new(2).unwrap());

        assert!(
            CStat {
                max: 4,
                cache: 0,
                tlocal: 2,
                freq: 0,
                rec: 0,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        wr_txn.commit();

        let wr_txn = arc.write();
        // Two entries of weight 2 fill the rec list (weight 4).
        assert!(
            CStat {
                max: 4,
                cache: 2,
                tlocal: 0,
                freq: 0,
                rec: 4,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        wr_txn.commit();

        let mut wr_txn = arc.write();
        wr_txn.get(&1);
        wr_txn.commit();

        let mut wr_txn = arc.write();
        assert!(
            CStat {
                max: 4,
                cache: 2,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 0,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );

        wr_txn.insert_sized(3, Weighted { _i: 3 }, NonZeroUsize::new(2).unwrap());
        wr_txn.insert_sized(4, Weighted { _i: 4 }, NonZeroUsize::new(2).unwrap());
        wr_txn.commit();

        let wr_txn = arc.write();
        assert!(
            CStat {
                max: 4,
                cache: 4,
                tlocal: 0,
                freq: 2,
                rec: 2,
                ghost_freq: 0,
                ghost_rec: 4,
                haunted: 0,
                p: 0
            } == wr_txn.peek_stat()
        );
        wr_txn.commit();
    }

    // Stats sinks can be swapped/reloaded between write transactions.
    #[test]
    fn test_cache_stats_reload() {
        let _ = tracing_subscriber::fmt::try_init();

        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 0)
            .build()
            .expect("Invalid cache parameters!");

        let stats = WriteCountStat::default();

        let mut wr_txn = arc.write_stats(stats);
        wr_txn.insert(1, 1);
        let stats = wr_txn.commit();

        tracing::trace!("stats 1: {:?}", stats);
    }

    // get_mut mutates in place via the tlocal copy, across commits.
    #[test]
    fn test_cache_mut_inplace() {
        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 0)
            .build()
            .expect("Invalid cache parameters!");
        let mut wr_txn = arc.write();

        assert!(wr_txn.get_mut(&1, false).is_none());
        wr_txn.insert(1, 1);
        {
            let mref = wr_txn.get_mut(&1, false).unwrap();
            *mref = 2;
        }
        assert!(wr_txn.get_mut(&1, false) == Some(&mut 2));
        wr_txn.commit();

        let mut wr_txn = arc.write();
        {
            let mref = wr_txn.get_mut(&1, false).unwrap();
            *mref = 3;
        }
        assert!(wr_txn.get_mut(&1, false) == Some(&mut 3));
        wr_txn.commit();

        let mut wr_txn = arc.write();
        wr_txn.remove(1);
        assert!(wr_txn.get_mut(&1, false).is_none());
        wr_txn.commit();
    }

    #[allow(dead_code)]

    // Shared run flag for the stress-test worker threads below.
    pub static RUNNING: AtomicBool = AtomicBool::new(false);

    // Worker loop for test_cache_stress_1: random reads/inserts until the
    // RUNNING flag is lowered.
    #[cfg(test)]
    fn multi_thread_worker(arc: Arc<ARCache<Box<u32>, Box<u32>>>) {
        while RUNNING.load(Ordering::Relaxed) {
            let mut rd_txn = arc.read();

            for _i in 0..128 {
                let x = rand::random::<u32>();

                if rd_txn.get(&x).is_none() {
                    rd_txn.insert(Box::new(x), Box::new(x))
                }
            }
        }
    }

    // Stress test (only built as a test under the dhat-heap feature):
    // concurrent readers plus a writer thread hammering the cache.
    #[allow(dead_code)]
    #[cfg_attr(miri, ignore)]
    #[cfg_attr(feature = "dhat-heap", test)]
    #[cfg(test)]
    fn test_cache_stress_1() {
        #[cfg(feature = "dhat-heap")]
        let _profiler = dhat::Profiler::builder().trim_backtraces(None).build();

        let arc: Arc<ARCache<Box<u32>, Box<u32>>> = Arc::new(
            ARCacheBuilder::default()
                .set_size(64, 4)
                .build()
                .expect("Invalid cache parameters!"),
        );

        let thread_count = 4;

        RUNNING.store(true, Ordering::Relaxed);

        let handles: Vec<_> = (0..thread_count)
            .into_iter()
            .map(|_| {
                let cache = arc.clone();
                thread::spawn(move || multi_thread_worker(cache))
            })
            .collect();

        for x in 0..1024 {
            let mut wr_txn = arc.write();

            if wr_txn.get(&x).is_none() {
                wr_txn.insert(Box::new(x), Box::new(x))
            }

            wr_txn.commit();
        }

        RUNNING.store(false, Ordering::Relaxed);

        for handle in handles {
            handle.join().unwrap();
        }

        drop(arc);
    }

    // Unusual-but-valid workload parameters must still build successfully.
    #[test]
    fn test_set_expected_workload_negative() {
        let _arc: Arc<ARCache<Box<u32>, Box<u32>>> = Arc::new(
            ARCacheBuilder::default()
                .set_expected_workload(256, 31, 8, 16, true)
                .build()
                .expect("Invalid cache parameters!"),
        );
    }
}