concread/arcache/
mod.rs

1//! ARCache - A concurrently readable adaptive replacement cache.
2//!
3//! An ARCache is used in place of a `RwLock<LruCache>` or `Mutex<LruCache>`.
4//! This structure is transactional, meaning that readers have guaranteed
5//! point-in-time views of the cache and their items, while allowing writers
6//! to proceed with inclusions and cache state management in parallel.
7//!
8//! This means that unlike a `RwLock` which can have many readers OR one writer
9//! this cache is capable of many readers, over multiple data generations AND
10//! writers that are serialised. This formally means that this is an ACID
11//! compliant Cache.
12
13mod ll;
14/// Stats collection for [ARCache]
15pub mod stats;
16
17use self::ll::{LLNodeRef, LLWeight, LL};
18use self::stats::{ARCacheReadStat, ARCacheWriteStat};
19
20#[cfg(feature = "arcache-is-hashmap")]
21use crate::hashmap::{
22    HashMap as DataMap, HashMapReadTxn as DataMapReadTxn, HashMapWriteTxn as DataMapWriteTxn,
23};
24
25#[cfg(feature = "arcache-is-hashtrie")]
26use crate::hashtrie::{
27    HashTrie as DataMap, HashTrieReadTxn as DataMapReadTxn, HashTrieWriteTxn as DataMapWriteTxn,
28};
29
30use crossbeam_queue::ArrayQueue;
31use std::collections::HashMap as Map;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::sync::Arc;
34use std::sync::{Mutex, RwLock};
35
36use std::borrow::Borrow;
37use std::cell::UnsafeCell;
38use std::fmt::Debug;
39use std::hash::Hash;
40use std::mem;
41use std::num::NonZeroUsize;
42use std::ops::Deref;
43use std::ops::DerefMut;
44use std::time::Instant;
45
46use tracing::trace;
47
/// Divisor applied to the requested total in `set_expected_workload` to bound the
/// combined size of all reader thread-local caches (total / 8).
const READ_THREAD_CACHE_RATIO: isize = 8;
/// Divisor applied to the requested total in `set_expected_workload` to bound the
/// expected write-transaction misses (total / 4).
const WRITE_THREAD_CACHE_RATIO: isize = 4;

/// Capacity of the hit-event queue created by `ARCacheBuilder::build`.
const WRITE_THREAD_CHANNEL_SIZE: usize = 64;
/// Capacity of the include-event queue created by `ARCacheBuilder::build`.
const READ_THREAD_CHANNEL_SIZE: usize = 64;

/// Look-back limit used by `ARCacheBuilder::build` when none was configured.
const TXN_LOOKBACK_LIMIT_DEFAULT: u8 = 32;
/// Smallest look-back limit `ARCacheBuilder::build` accepts; lower requests are raised to it.
const TXN_LOOKBACK_LIMIT_ABS_MIN: u8 = 4;

/// Caches whose `max` is below this value get a default watermark of 0 in
/// `ARCacheBuilder::build`.
const WATERMARK_DISABLE_MIN: usize = 128;

/// Divisor of the default watermark formula `(max / DIVISOR) * RATIO`.
const WATERMARK_DISABLE_DIVISOR: usize = 20;
/// Multiplier of the default watermark formula; with the divisor of 20 the
/// default watermark is roughly 90% of `max`.
const WATERMARK_DISABLE_RATIO: usize = 18;
61
/// State of a single key inside a write transaction's thread-local map
/// (`ARCacheWriteTxn::tlocal`).
enum ThreadCacheItem<V> {
    /// The key holds a value: `(value, clean, size)`. When the transaction is
    /// drained into the main cache, `clean` is asserted to be true and `size`
    /// is stored as the weight of the item's list node.
    Present(V, bool, usize),
    /// The key was removed in this transaction. The flag is the same `clean`
    /// marker, also asserted true when drained.
    Removed(bool),
}
66
/// A cache-hit notification carried on the hit queue and popped by the
/// writer when it drains that queue.
struct CacheHitEvent {
    /// Timestamp attached to the event.
    /// NOTE(review): presumably the instant of the hit — confirm at the producer.
    t: Instant,
    /// NOTE(review): presumably the hash of the key that was hit, judging by
    /// the name — confirm at the producer.
    k_hash: u64,
}
71
/// A request, carried on the include queue, to add a key/value pair to the
/// main cache.
struct CacheIncludeEvent<K, V> {
    /// Timestamp attached to the event.
    /// NOTE(review): presumably the instant the include was requested — confirm.
    t: Instant,
    /// Key to include.
    k: K,
    /// Value to include.
    v: V,
    /// Transaction id associated with the request.
    /// NOTE(review): presumably the id of the producing transaction — confirm.
    txid: u64,
    /// Weight of the item.
    size: usize,
}
79
/// Payload stored in every linked-list node of the cache's tracking lists
/// (`freq`, `rec`, `ghost_freq`, `ghost_rec`, `haunted`).
#[derive(Hash, Ord, PartialOrd, Eq, PartialEq, Clone, Debug)]
struct CacheItemInner<K>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
{
    /// The key this node tracks.
    k: K,
    /// Transaction id of the commit that last touched this node.
    txid: u64,
    /// Weight of the item; reported to the list through `LLWeight::ll_weight`.
    size: usize,
}
89
90impl<K> LLWeight for CacheItemInner<K>
91where
92    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
93{
94    #[inline]
95    fn ll_weight(&self) -> usize {
96        self.size
97    }
98}
99
/// Entry stored in the unified cache map for each known key. The variant
/// records which tracking list owns the key's node; only `Freq` and `Rec`
/// carry a value.
#[derive(Clone, Debug)]
enum CacheItem<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
{
    /// Resident item whose node is in the `freq` list.
    Freq(LLNodeRef<CacheItemInner<K>>, V),
    /// Resident item whose node is in the `rec` list.
    Rec(LLNodeRef<CacheItemInner<K>>, V),
    /// Valueless entry whose node is in the `ghost_freq` list.
    GhostFreq(LLNodeRef<CacheItemInner<K>>),
    /// Valueless entry whose node is in the `ghost_rec` list.
    GhostRec(LLNodeRef<CacheItemInner<K>>),
    /// Valueless entry for a key that was removed by a write transaction; its
    /// node is in the `haunted` list.
    Haunted(LLNodeRef<CacheItemInner<K>>),
}
111
112unsafe impl<
113        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
114        V: Clone + Debug + Sync + Send + 'static,
115    > Send for CacheItem<K, V>
116{
117}
118unsafe impl<
119        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
120        V: Clone + Debug + Sync + Send + 'static,
121    > Sync for CacheItem<K, V>
122{
123}
124
/// Test-only view of which state a key is in. Mirrors the variants of
/// `CacheItem` without their payloads, plus `None`.
#[cfg(test)]
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum CacheState {
    /// Corresponds to `CacheItem::Freq`.
    Freq,
    /// Corresponds to `CacheItem::Rec`.
    Rec,
    /// Corresponds to `CacheItem::GhostFreq`.
    GhostFreq,
    /// Corresponds to `CacheItem::GhostRec`.
    GhostRec,
    /// Corresponds to `CacheItem::Haunted`.
    Haunted,
    /// No `CacheItem` counterpart.
    /// NOTE(review): presumably "key unknown to the cache" — confirm in the tests.
    None,
}
135
/// Test-only snapshot of cache sizing figures.
/// NOTE(review): the field meanings below are inferred from their names and
/// the matching `ArcShared`/`ArcInner` fields — confirm where this is populated.
#[cfg(test)]
#[derive(Debug, PartialEq)]
pub(crate) struct CStat {
    // Presumably the configured `max`.
    max: usize,
    // Presumably the size of the main cache map.
    cache: usize,
    // Presumably the size of the transaction-local store.
    tlocal: usize,
    // Presumably the sizes of the five tracking lists.
    freq: usize,
    rec: usize,
    ghost_freq: usize,
    ghost_rec: usize,
    haunted: usize,
    // Presumably the current `p` weight.
    p: usize,
}
149
/// Mutable bookkeeping of the cache: the tracking lists, the adaptive weight
/// and the queue handles. Held behind `ARCache::inner`.
struct ArcInner<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    /// Weight of items between the two caches.
    p: usize,
    /// List of nodes for `CacheItem::Freq` entries.
    freq: LL<CacheItemInner<K>>,
    /// List of nodes for `CacheItem::Rec` entries.
    rec: LL<CacheItemInner<K>>,
    /// List of nodes for `CacheItem::GhostFreq` entries.
    ghost_freq: LL<CacheItemInner<K>>,
    /// List of nodes for `CacheItem::GhostRec` entries.
    ghost_rec: LL<CacheItemInner<K>>,
    /// List of nodes for `CacheItem::Haunted` entries.
    haunted: LL<CacheItemInner<K>>,
    /// Hit-event queue; the same `Arc` is also stored in `ArcShared`.
    hit_queue: Arc<ArrayQueue<CacheHitEvent>>,
    /// Include-event queue; the same `Arc` is also stored in `ArcShared`.
    inc_queue: Arc<ArrayQueue<CacheIncludeEvent<K, V>>>,
    /// Starts at 0 when the cache is built.
    /// NOTE(review): presumably the oldest transaction id still relevant — confirm.
    min_txid: u64,
}
166
/// Configuration and queue handles that new transactions read when they are
/// created. Held behind `ARCache::shared`.
struct ArcShared<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    /// Max number of elements to cache.
    max: usize,
    /// Max number of elements for a reader per thread. When 0, read
    /// transactions are created without a thread-local cache.
    read_max: usize,
    /// Hit-event queue handle, cloned into every read transaction.
    hit_queue: Arc<ArrayQueue<CacheHitEvent>>,
    /// Include-event queue handle, cloned into every read transaction.
    inc_queue: Arc<ArrayQueue<CacheIncludeEvent<K, V>>>,
    /// The number of items that are present in the cache before we start to process
    /// the arc sets/lists.
    watermark: usize,
    /// If readers should attempt to quiesce the cache. Default true
    reader_quiesce: bool,
}
184
/// A concurrently readable adaptive replacement cache. Operations are performed on the
/// cache via read and write operations.
///
/// Construct one with [`ARCacheBuilder`], then open transactions with
/// `read`/`read_stats` and `write`/`write_stats`.
pub struct ARCache<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    // Use a unified tree, allows simpler movement of items between the
    // cache types.
    cache: DataMap<K, CacheItem<K, V>>,
    // This is normally only ever taken in "read" mode, so it's effectively
    // an uncontended barrier.
    shared: RwLock<ArcShared<K, V>>,
    // These are only taken during a quiesce
    inner: Mutex<ArcInner<K, V>>,
    // stats: CowCell<CacheStats>,
    // Snapshotted (relaxed load) into each transaction when it is created.
    // Starts as `true` when the configured watermark is 0.
    above_watermark: AtomicBool,
    // Look-back limit chosen by the builder, widened from `u8` to `u64`.
    look_back_limit: u64,
}
204
205unsafe impl<
206        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
207        V: Clone + Debug + Sync + Send + 'static,
208    > Send for ARCache<K, V>
209{
210}
211unsafe impl<
212        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
213        V: Clone + Debug + Sync + Send + 'static,
214    > Sync for ARCache<K, V>
215{
216}
217
/// Payload of a node in a read transaction's thread-local list (`ReadCache::tlru`).
#[derive(Debug, Clone)]
struct ReadCacheItem<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    /// Key of the locally cached item.
    k: K,
    /// Value of the locally cached item.
    v: V,
    /// Weight of the item; reported to the list through `LLWeight::ll_weight`.
    size: usize,
}
228
229impl<K, V> LLWeight for ReadCacheItem<K, V>
230where
231    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
232    V: Clone + Debug + Sync + Send + 'static,
233{
234    #[inline]
235    fn ll_weight(&self) -> usize {
236        self.size
237    }
238}
239
/// Thread-local cache owned by a single read transaction. Only created when
/// the configured `read_max` is greater than zero.
struct ReadCache<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    // cache of our missed items to send forward.
    // On drop we drain this to the channel
    set: Map<K, LLNodeRef<ReadCacheItem<K, V>>>,
    // Initialised from the shared `read_max` when the transaction is created.
    read_size: usize,
    // List holding the nodes that `set` points to.
    // NOTE(review): presumably kept in LRU order, judging by the name — confirm.
    tlru: LL<ReadCacheItem<K, V>>,
}
251
/// An active read transaction over the cache. The data in this cache is guaranteed to be
/// valid at the point in time the read is created. You may include items during a cache
/// miss via the "insert" function.
pub struct ARCacheReadTxn<'a, K, V, S>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
    S: ARCacheReadStat + Clone,
{
    // The cache this transaction was opened on.
    caller: &'a ARCache<K, V>,
    // ro_txn to cache
    cache: DataMapReadTxn<'a, K, CacheItem<K, V>>,
    // Thread-local cache; `None` when the configured `read_max` is 0.
    tlocal: Option<ReadCache<K, V>>,
    // Handles to the shared hit and include queues.
    hit_queue: Arc<ArrayQueue<CacheHitEvent>>,
    inc_queue: Arc<ArrayQueue<CacheIncludeEvent<K, V>>>,
    // Snapshot of the cache's watermark flag taken when the transaction began.
    above_watermark: bool,
    // Copy of the shared `reader_quiesce` setting.
    reader_quiesce: bool,
    // Caller-supplied statistics collector.
    stats: S,
}
271
272unsafe impl<
273        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
274        V: Clone + Debug + Sync + Send + 'static,
275        S: ARCacheReadStat + Clone + Sync + Send + 'static,
276    > Send for ARCacheReadTxn<'_, K, V, S>
277{
278}
279unsafe impl<
280        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
281        V: Clone + Debug + Sync + Send + 'static,
282        S: ARCacheReadStat + Clone + Sync + Send + 'static,
283    > Sync for ARCacheReadTxn<'_, K, V, S>
284{
285}
286
/// An active write transaction over the cache. The data in this cache is isolated
/// from readers, and may be rolled-back if an error occurs. Changes only become
/// globally visible once you call "commit". Items may be added to the cache on
/// a miss via "insert", and you can explicitly remove items by calling "remove".
pub struct ARCacheWriteTxn<'a, K, V, S>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
    S: ARCacheWriteStat<K>,
{
    // The cache this transaction was opened on.
    caller: &'a ARCache<K, V>,
    // wr_txn to cache
    cache: DataMapWriteTxn<'a, K, CacheItem<K, V>>,
    // Cache of missed items (w_ dirty/clean)
    // On COMMIT we drain this to the main cache
    tlocal: Map<K, ThreadCacheItem<V>>,
    // Starts empty.
    // NOTE(review): presumably key hashes of hits recorded through `&self`
    // methods, hence the `UnsafeCell` — confirm.
    hit: UnsafeCell<Vec<u64>>,
    // Starts false.
    // NOTE(review): presumably set when the whole cache is to be cleared — confirm.
    clear: UnsafeCell<bool>,
    // Snapshot of the cache's watermark flag taken when the transaction began.
    above_watermark: bool,
    // read_ops: UnsafeCell<u32>,
    // Caller-supplied statistics collector.
    stats: S,
}
309
310impl<
311        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
312        V: Clone + Debug + Sync + Send + 'static,
313    > CacheItem<K, V>
314{
315    fn to_vref(&self) -> Option<&V> {
316        match &self {
317            CacheItem::Freq(_, v) | CacheItem::Rec(_, v) => Some(v),
318            _ => None,
319        }
320    }
321
322    fn to_kvsref(&self) -> Option<(&K, &V, usize)> {
323        match &self {
324            CacheItem::Freq(lln, v) | CacheItem::Rec(lln, v) => {
325                let cii = lln.as_ref();
326                Some((&cii.k, v, cii.size))
327            }
328            _ => None,
329        }
330    }
331
332    #[cfg(test)]
333    fn to_state(&self) -> CacheState {
334        match &self {
335            CacheItem::Freq(_, _v) => CacheState::Freq,
336            CacheItem::Rec(_, _v) => CacheState::Rec,
337            CacheItem::GhostFreq(_) => CacheState::GhostFreq,
338            CacheItem::GhostRec(_) => CacheState::GhostRec,
339            CacheItem::Haunted(_) => CacheState::Haunted,
340        }
341    }
342}
343
/// A configurable builder to create new concurrent Adaptive Replacement Caches.
///
/// `max` and `read_max` must both be set (through `set_size` or
/// `set_expected_workload`) before `build` will return a cache.
pub struct ARCacheBuilder {
    // Capacity of the main cache; `None` until configured.
    max: Option<usize>,
    // Per-reader thread-local capacity; `None` until configured.
    read_max: Option<usize>,
    // Explicit watermark; when `None`, `build` derives one from `max`.
    watermark: Option<usize>,
    // Whether readers attempt to quiesce the cache. Defaults to true.
    reader_quiesce: bool,
    // Explicit look-back limit; when `None`, `build` uses the default.
    look_back_limit: Option<u8>,
}
352
353impl Default for ARCacheBuilder {
354    fn default() -> Self {
355        ARCacheBuilder {
356            max: None,
357            read_max: None,
358            watermark: None,
359            reader_quiesce: true,
360            look_back_limit: None,
361        }
362    }
363}
364
365impl ARCacheBuilder {
366    /// Create a new ARCache builder that you can configure before creation.
367    pub fn new() -> Self {
368        Self::default()
369    }
370
371    /// Configure a new ARCache, that derives its size based on your expected workload.
372    ///
373    /// The values are:
374    /// * Total number of items you want to have in memory (soft upper bound)
375    /// * Number of reading threads you expect concurrently (excluding write thread)
376    /// * Average expected number of cache misses per read transaction
377    /// * Average expected number of writes and/or misses per write transaction
378    ///
379    /// The cache may still exceed your provided total, and inaccurate tuning numbers
380    /// will yield a situation where you may use too-little ram, or too much. This could
381    /// be to your read misses exceeding your expected amount causing the queues to have
382    /// more items in them at a time, or your writes are larger than expected.
383    ///
384    /// If you set ex_ro_miss to zero, no read thread local cache will be configured, but
385    /// space will still be reserved for channel communication.
386    #[must_use]
387    pub fn set_expected_workload(
388        self,
389        total: usize,
390        threads: usize,
391        ex_ro_miss: usize,
392        ex_rw_miss: usize,
393        read_cache: bool,
394    ) -> Self {
395        let total = isize::try_from(total).unwrap();
396        let threads = isize::try_from(threads).unwrap();
397        let ro_miss = isize::try_from(ex_ro_miss).unwrap();
398        let wr_miss = isize::try_from(ex_rw_miss).unwrap();
399
400        // We need to clamp the expected read-miss so that the calculation doesn't end up
401        // skewing the cache to all "read cache" allocation.
402        //
403        // We clamp the read-max to an 8th of total div thread. This is because 1/8th read
404        // cache is a pretty standard ratio for a shared to per thread cache.
405        let read_max = if read_cache {
406            let read_max_limit = total / READ_THREAD_CACHE_RATIO;
407            let read_max_thread_limit = read_max_limit / threads;
408            ro_miss.clamp(0, read_max_thread_limit)
409        } else {
410            // No read cache requested
411            0
412        };
413
414        // We have to clamp rw_miss, even though we could go over-size - this is because
415        // we need to ensure that rw_miss is always < total.
416        let wr_miss_thread_limit = total / WRITE_THREAD_CACHE_RATIO;
417        let wr_miss = wr_miss.clamp(0, wr_miss_thread_limit);
418
419        let max = total - (wr_miss + (read_max * threads));
420
421        // Now make them usize.
422        let max = usize::try_from(max).unwrap();
423        let read_max = usize::try_from(read_max).unwrap();
424
425        ARCacheBuilder {
426            // stats: self.stats,
427            max: Some(max),
428            read_max: Some(read_max),
429            watermark: self.watermark,
430            reader_quiesce: self.reader_quiesce,
431            look_back_limit: self.look_back_limit,
432        }
433    }
434
435    /// Configure a new ARCache, with a capacity of `max` main cache items and `read_max`
436    /// Note that due to the way the cache operates, the number of items can and
437    /// will exceed `max` on a regular basis, so you should consider using `set_expected_workload`
438    /// and specifying your expected workload parameters to have a better derived
439    /// cache size.
440    #[must_use]
441    pub fn set_size(mut self, max: usize, read_max: usize) -> Self {
442        self.max = Some(max);
443        self.read_max = Some(read_max);
444        self
445    }
446
447    /// See [ARCache::new_size] for more information. This allows manual configuration of the data
448    /// tracking watermark. To disable this, set to 0. If watermark is greater than
449    /// max, it will be clamped to max.
450    #[must_use]
451    pub fn set_watermark(mut self, watermark: usize) -> Self {
452        self.watermark = Some(watermark);
453        self
454    }
455
456    /// Set the look back limit for ghost lists. This is a balance between the cache's ability
457    /// to have "perfect information" about all past keys to tune the p weight, and memory
458    /// consumption of storing the ghost lists. If your dataset consists of small keys/large
459    /// values you should not change this value from it's default. If your dataset contains
460    /// equally sized keys/values, then you may wish to lower this value. The lowest value is
461    /// `4`, defaults to `32`.
462    #[must_use]
463    pub fn set_look_back_limit(mut self, look_back_limit: u8) -> Self {
464        self.look_back_limit = Some(look_back_limit);
465        self
466    }
467
468    /// Enable or Disable reader cache quiescing. In some cases this can improve
469    /// reader performance, at the expense that cache includes or hits may be delayed
470    /// before acknowledgement. You must MANUALLY run periodic quiesces if you mark
471    /// this as "false" to disable reader quiescing.
472    #[must_use]
473    pub fn set_reader_quiesce(mut self, reader_quiesce: bool) -> Self {
474        self.reader_quiesce = reader_quiesce;
475        self
476    }
477
478    /// Consume this builder, returning a cache if successful. If configured parameters are
479    /// missing or incorrect, a None will be returned.
480    pub fn build<K, V>(self) -> Option<ARCache<K, V>>
481    where
482        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
483        V: Clone + Debug + Sync + Send + 'static,
484    {
485        let ARCacheBuilder {
486            // stats,
487            max,
488            read_max,
489            watermark,
490            reader_quiesce,
491            look_back_limit,
492        } = self;
493
494        let (max, read_max) = max.zip(read_max)?;
495
496        let watermark = watermark.unwrap_or(if max < WATERMARK_DISABLE_MIN {
497            0
498        } else {
499            (max / WATERMARK_DISABLE_DIVISOR) * WATERMARK_DISABLE_RATIO
500        });
501        let watermark = watermark.clamp(0, max);
502        // If the watermark is 0, always track from the start.
503        let init_watermark = watermark == 0;
504
505        let look_back_limit = look_back_limit
506            .unwrap_or(TXN_LOOKBACK_LIMIT_DEFAULT)
507            .clamp(TXN_LOOKBACK_LIMIT_ABS_MIN, u8::MAX) as u64;
508
509        // The hit queue is reasonably cheap, so we can let this grow a bit.
510        /*
511        let chan_size = max / 20;
512        let chan_size = if chan_size < 16 { 16 } else { chan_size };
513        let chan_size = chan_size.clamp(0, 128);
514        */
515        let chan_size = WRITE_THREAD_CHANNEL_SIZE;
516        let hit_queue = Arc::new(ArrayQueue::new(chan_size));
517
518        // this can oversize and take a lot of time to drain and manage, so we keep this bounded.
519        // let chan_size = chan_size.clamp(0, 64);
520        let chan_size = READ_THREAD_CHANNEL_SIZE;
521        let inc_queue = Arc::new(ArrayQueue::new(chan_size));
522
523        let shared = RwLock::new(ArcShared {
524            max,
525            read_max,
526            // stat_tx,
527            hit_queue: hit_queue.clone(),
528            inc_queue: inc_queue.clone(),
529            watermark,
530            reader_quiesce,
531        });
532        let inner = Mutex::new(ArcInner {
533            // We use p from the former stats.
534            p: 0,
535            freq: LL::new(),
536            rec: LL::new(),
537            ghost_freq: LL::new(),
538            ghost_rec: LL::new(),
539            haunted: LL::new(),
540            // stat_rx,
541            hit_queue,
542            inc_queue,
543            min_txid: 0,
544        });
545
546        Some(ARCache {
547            cache: DataMap::new(),
548            shared,
549            inner,
550            // stats: CowCell::new(stats),
551            above_watermark: AtomicBool::new(init_watermark),
552            look_back_limit,
553        })
554    }
555}
556
557impl<
558        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
559        V: Clone + Debug + Sync + Send + 'static,
560    > ARCache<K, V>
561{
562    /// Use ARCacheBuilder instead
563    #[deprecated(since = "0.2.20", note = "please use`ARCacheBuilder` instead")]
564    pub fn new(
565        total: usize,
566        threads: usize,
567        ex_ro_miss: usize,
568        ex_rw_miss: usize,
569        read_cache: bool,
570    ) -> Self {
571        ARCacheBuilder::default()
572            .set_expected_workload(total, threads, ex_ro_miss, ex_rw_miss, read_cache)
573            .build()
574            .expect("Invalid cache parameters!")
575    }
576
577    /// Use ARCacheBuilder instead
578    #[deprecated(since = "0.2.20", note = "please use`ARCacheBuilder` instead")]
579    pub fn new_size(max: usize, read_max: usize) -> Self {
580        ARCacheBuilder::default()
581            .set_size(max, read_max)
582            .build()
583            .expect("Invalid cache parameters!")
584    }
585
586    /// Use ARCacheBuilder instead
587    #[deprecated(since = "0.2.20", note = "please use`ARCacheBuilder` instead")]
588    pub fn new_size_watermark(max: usize, read_max: usize, watermark: usize) -> Self {
589        ARCacheBuilder::default()
590            .set_size(max, read_max)
591            .set_watermark(watermark)
592            .build()
593            .expect("Invalid cache parameters!")
594    }
595
596    /// Begin a read operation on the cache. This reader has a thread-local cache for items
597    /// that are localled included via `insert`, and can communicate back to the main cache
598    /// to safely include items.
599    pub fn read_stats<S>(&self, stats: S) -> ARCacheReadTxn<'_, K, V, S>
600    where
601        S: ARCacheReadStat + Clone,
602    {
603        let rshared = self.shared.read().unwrap();
604        let tlocal = if rshared.read_max > 0 {
605            Some(ReadCache {
606                set: Map::new(),
607                read_size: rshared.read_max,
608                tlru: LL::new(),
609            })
610        } else {
611            None
612        };
613        let above_watermark = self.above_watermark.load(Ordering::Relaxed);
614        ARCacheReadTxn {
615            caller: self,
616            cache: self.cache.read(),
617            tlocal,
618            // stat_tx: rshared.stat_tx.clone(),
619            hit_queue: rshared.hit_queue.clone(),
620            inc_queue: rshared.inc_queue.clone(),
621            above_watermark,
622            reader_quiesce: rshared.reader_quiesce,
623            stats,
624        }
625    }
626
627    /// Begin a read operation on the cache. This reader has a thread-local cache for items
628    /// that are localled included via `insert`, and can communicate back to the main cache
629    /// to safely include items.
630    pub fn read(&self) -> ARCacheReadTxn<'_, K, V, ()> {
631        self.read_stats(())
632    }
633
634    /// Begin a write operation on the cache. This writer has a thread-local store
635    /// for all items that have been included or dirtied in the transactions, items
636    /// may be removed from this cache (ie deleted, invalidated).
637    pub fn write(&self) -> ARCacheWriteTxn<'_, K, V, ()> {
638        self.write_stats(())
639    }
640
641    /// _
642    pub fn write_stats<S>(&self, stats: S) -> ARCacheWriteTxn<'_, K, V, S>
643    where
644        S: ARCacheWriteStat<K>,
645    {
646        let above_watermark = self.above_watermark.load(Ordering::Relaxed);
647        ARCacheWriteTxn {
648            caller: self,
649            cache: self.cache.write(),
650            tlocal: Map::new(),
651            hit: UnsafeCell::new(Vec::new()),
652            clear: UnsafeCell::new(false),
653            above_watermark,
654            // read_ops: UnsafeCell::new(0),
655            stats,
656        }
657    }
658
659    fn try_write_stats<S>(&self, stats: S) -> Result<ARCacheWriteTxn<'_, K, V, S>, S>
660    where
661        S: ARCacheWriteStat<K>,
662    {
663        match self.cache.try_write() {
664            Some(cache) => {
665                let above_watermark = self.above_watermark.load(Ordering::Relaxed);
666                Ok(ARCacheWriteTxn {
667                    caller: self,
668                    cache,
669                    tlocal: Map::new(),
670                    hit: UnsafeCell::new(Vec::new()),
671                    clear: UnsafeCell::new(false),
672                    above_watermark,
673                    // read_ops: UnsafeCell::new(0),
674                    stats,
675                })
676            }
677            None => Err(stats),
678        }
679    }
680
681    /// If the lock is available, attempt to quiesce the cache's async channel states. If the lock
682    /// is currently held, no action is taken.
683    pub fn try_quiesce_stats<S>(&self, stats: S) -> S
684    where
685        S: ARCacheWriteStat<K>,
686    {
687        // It seems like a good idea to skip this when not at wmark, but
688        // that can cause low-pressure caches to no submit includes properly.
689        // if self.above_watermark.load(Ordering::Relaxed) {
690        match self.try_write_stats(stats) {
691            Ok(wr_txn) => wr_txn.commit(),
692            Err(stats) => stats,
693        }
694    }
695
696    /// If the lock is available, attempt to quiesce the cache's async channel states. If the lock
697    /// is currently held, no action is taken.
698    pub fn try_quiesce(&self) {
699        self.try_quiesce_stats(())
700    }
701
702    fn calc_p_freq(ghost_rec_len: usize, ghost_freq_len: usize, p: &mut usize, size: usize) {
703        let delta = if ghost_rec_len > ghost_freq_len {
704            ghost_rec_len / ghost_freq_len
705        } else {
706            1
707        } * size;
708        let p_was = *p;
709        if delta < *p {
710            *p -= delta
711        } else {
712            *p = 0
713        }
714        tracing::trace!("f {} >>> {}", p_was, *p);
715    }
716
717    fn calc_p_rec(
718        cap: usize,
719        ghost_rec_len: usize,
720        ghost_freq_len: usize,
721        p: &mut usize,
722        size: usize,
723    ) {
724        let delta = if ghost_freq_len > ghost_rec_len {
725            ghost_freq_len / ghost_rec_len
726        } else {
727            1
728        } * size;
729        let p_was = *p;
730        if delta <= cap - *p {
731            *p += delta
732        } else {
733            *p = cap
734        }
735        tracing::trace!("r {} >>> {}", p_was, *p);
736    }
737
    /// Apply a write transaction's thread-local changes to the main cache.
    ///
    /// For every `(key, ThreadCacheItem)` pair in `tlocal` the key is looked up
    /// in `cache` and the entry is moved between the tracking lists in `inner`:
    ///
    /// * key absent, `Present`: a new node is appended to `rec` and the key is
    ///   inserted as `CacheItem::Rec`.
    /// * key absent, `Removed`: a node of size 1 is appended to `haunted` and
    ///   the key is inserted as `CacheItem::Haunted`.
    /// * key known, `Removed`: the node is moved from its current list to
    ///   `haunted` (an already haunted node only has its txid refreshed).
    /// * key known, `Present`: `Freq` is re-appended to `freq`, `Rec` is
    ///   promoted to `freq`, `GhostFreq` is revived into `freq` after shrinking
    ///   `p`, `GhostRec` is revived into `rec` after growing `p`, and `Haunted`
    ///   is moved into `rec`.
    ///
    /// Every node touched has its `txid` set to `commit_txid`, and `Present`
    /// items also record the new `size`. `stats` is notified of includes,
    /// modifications and ghost/haunted revivals.
    ///
    /// # Panics
    ///
    /// Panics if any drained item has its `clean` flag set to `false`.
    fn drain_tlocal_inc<S>(
        &self,
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        inner: &mut ArcInner<K, V>,
        shared: &ArcShared<K, V>,
        tlocal: Map<K, ThreadCacheItem<V>>,
        commit_txid: u64,
        stats: &mut S,
    ) where
        S: ARCacheWriteStat<K>,
    {
        // drain tlocal into the main cache.
        tlocal.into_iter().for_each(|(k, tcio)| {
            let r = cache.get_mut(&k);
            match (r, tcio) {
                // Brand new key with a value: it starts life in the recent list.
                (None, ThreadCacheItem::Present(tci, clean, size)) => {
                    assert!(clean);
                    let llp = inner.rec.append_k(CacheItemInner {
                        k: k.clone(),
                        txid: commit_txid,
                        size,
                    });
                    // stats.write_includes += 1;
                    stats.include(&k);
                    cache.insert(k, CacheItem::Rec(llp, tci));
                }
                // Unknown key that was removed: remember the removal as haunted.
                (None, ThreadCacheItem::Removed(clean)) => {
                    assert!(clean);
                    // Mark this as haunted
                    let llp = inner.haunted.append_k(CacheItemInner {
                        k: k.clone(),
                        txid: commit_txid,
                        size: 1,
                    });
                    cache.insert(k, CacheItem::Haunted(llp));
                }
                (Some(ref mut ci), ThreadCacheItem::Removed(clean)) => {
                    assert!(clean);
                    // From whatever set we were in, pop and move to haunted.
                    let mut next_state = match ci {
                        CacheItem::Freq(llp, _v) => {
                            let mut owned = inner.freq.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            let pointer = inner.haunted.append_n(owned);
                            CacheItem::Haunted(pointer)
                        }
                        CacheItem::Rec(llp, _v) => {
                            // Remove the node from rec and put it into haunted.
                            let mut owned = inner.rec.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            let pointer = inner.haunted.append_n(owned);
                            CacheItem::Haunted(pointer)
                        }
                        CacheItem::GhostFreq(llp) => {
                            let mut owned = inner.ghost_freq.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            let pointer = inner.haunted.append_n(owned);
                            CacheItem::Haunted(pointer)
                        }
                        CacheItem::GhostRec(llp) => {
                            let mut owned = inner.ghost_rec.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            let pointer = inner.haunted.append_n(owned);
                            CacheItem::Haunted(pointer)
                        }
                        CacheItem::Haunted(llp) => {
                            // Already haunted: the node stays where it is and only
                            // its txid is refreshed in place.
                            // NOTE(review): `make_mut` is unsafe; presumably sound
                            // because the caller holds `&mut ArcInner` for the whole
                            // drain — confirm against the `ll` module.
                            unsafe { llp.make_mut().txid = commit_txid };
                            CacheItem::Haunted(llp.clone())
                        }
                    };
                    // Now change the state.
                    mem::swap(*ci, &mut next_state);
                }
                // The key is already known and the transaction holds a value for it.
                (Some(ref mut ci), ThreadCacheItem::Present(tci, clean, size)) => {
                    assert!(clean);
                    //   * as we include each item, what state was it in before?
                    // It's in the cache - what action must we take?
                    let mut next_state = match ci {
                        CacheItem::Freq(llp, _v) => {
                            let mut owned = inner.freq.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = size;
                            // Move the list item to its head.
                            stats.modify(&owned.as_ref().k);
                            let pointer = inner.freq.append_n(owned);
                            // Update v.
                            CacheItem::Freq(pointer, tci)
                        }
                        CacheItem::Rec(llp, _v) => {
                            // Remove the node and put it into freq.
                            let mut owned = inner.rec.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = size;
                            stats.modify(&owned.as_ref().k);
                            let pointer = inner.freq.append_n(owned);
                            CacheItem::Freq(pointer, tci)
                        }
                        CacheItem::GhostFreq(llp) => {
                            // Adjust p
                            Self::calc_p_freq(
                                inner.ghost_rec.len(),
                                inner.ghost_freq.len(),
                                &mut inner.p,
                                size,
                            );
                            let mut owned = inner.ghost_freq.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = size;
                            stats.ghost_frequent_revive(&owned.as_ref().k);
                            let pointer = inner.freq.append_n(owned);
                            CacheItem::Freq(pointer, tci)
                        }
                        CacheItem::GhostRec(llp) => {
                            // Adjust p
                            Self::calc_p_rec(
                                shared.max,
                                inner.ghost_rec.len(),
                                inner.ghost_freq.len(),
                                &mut inner.p,
                                size,
                            );
                            let mut owned = inner.ghost_rec.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = size;
                            stats.ghost_recent_revive(&owned.as_ref().k);
                            let pointer = inner.rec.append_n(owned);
                            CacheItem::Rec(pointer, tci)
                        }
                        CacheItem::Haunted(llp) => {
                            // A previously removed key comes back as a recent item.
                            let mut owned = inner.haunted.extract(llp.clone());
                            owned.as_mut().txid = commit_txid;
                            owned.as_mut().size = size;
                            stats.include_haunted(&owned.as_ref().k);
                            let pointer = inner.rec.append_n(owned);
                            CacheItem::Rec(pointer, tci)
                        }
                    };
                    // Now change the state.
                    mem::swap(*ci, &mut next_state);
                }
            }
        });
    }
883
884    fn drain_hit_rx(
885        &self,
886        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
887        inner: &mut ArcInner<K, V>,
888        commit_ts: Instant,
889    ) {
890        // * for each item
891        // while let Ok(ce) = inner.rx.try_recv() {
892        // TODO: Find a way to remove these clones here!
893        while let Some(ce) = inner.hit_queue.pop() {
894            let CacheHitEvent { t, k_hash } = ce;
895            if let Some(ref mut ci_slots) = unsafe { cache.get_slot_mut(k_hash) } {
896                for ref mut ci in ci_slots.iter_mut() {
897                    let mut next_state = match &ci.v {
898                        CacheItem::Freq(llp, v) => {
899                            inner.freq.touch(llp.to_owned());
900                            CacheItem::Freq(llp.to_owned(), v.to_owned())
901                        }
902                        CacheItem::Rec(llp, v) => {
903                            let owned = inner.rec.extract(llp.to_owned());
904                            let pointer = inner.freq.append_n(owned);
905                            CacheItem::Freq(pointer, v.to_owned())
906                        }
907                        // While we can't add this from nothing, we can
908                        // at least keep it in the ghost sets.
909                        CacheItem::GhostFreq(llp) => {
910                            inner.ghost_freq.touch(llp.to_owned());
911                            CacheItem::GhostFreq(llp.to_owned())
912                        }
913                        CacheItem::GhostRec(llp) => {
914                            inner.ghost_rec.touch(llp.to_owned());
915                            CacheItem::GhostRec(llp.to_owned())
916                        }
917                        CacheItem::Haunted(llp) => {
918                            // We can't do anything about this ...
919                            // Don't touch or rearrange the haunted list, it should be
920                            // in commit txid order.
921                            CacheItem::Haunted(llp.to_owned())
922                        }
923                    };
924                    mem::swap(&mut ci.v, &mut next_state);
925                } // for each item in the bucket.
926            }
927            // Do nothing, it must have been evicted.
928
929            // Stop processing the queue, we are up to "now".
930            if t >= commit_ts {
931                break;
932            }
933        }
934    }
935
936    fn drain_inc_rx<S>(
937        &self,
938        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
939        inner: &mut ArcInner<K, V>,
940        shared: &ArcShared<K, V>,
941        commit_ts: Instant,
942        stats: &mut S,
943    ) where
944        S: ARCacheWriteStat<K>,
945    {
946        while let Some(ce) = inner.inc_queue.pop() {
947            // Update if it was inc
948            let CacheIncludeEvent {
949                t,
950                k,
951                v: iv,
952                txid,
953                size,
954            } = ce;
955            let mut r = cache.get_mut(&k);
956            match r {
957                Some(ref mut ci) => {
958                    let mut next_state = match &ci {
959                        CacheItem::Freq(llp, _v) => {
960                            if llp.as_ref().txid >= txid || inner.min_txid > txid {
961                                // Our cache already has a newer value, keep it.
962                                inner.freq.touch(llp.to_owned());
963                                None
964                            } else {
965                                // The value is newer, update.
966                                let mut owned = inner.freq.extract(llp.to_owned());
967                                owned.as_mut().txid = txid;
968                                owned.as_mut().size = size;
969                                stats.modify(&owned.as_mut().k);
970                                let pointer = inner.freq.append_n(owned);
971                                Some(CacheItem::Freq(pointer, iv))
972                            }
973                        }
974                        CacheItem::Rec(llp, v) => {
975                            let mut owned = inner.rec.extract(llp.to_owned());
976                            if llp.as_ref().txid >= txid || inner.min_txid > txid {
977                                let pointer = inner.freq.append_n(owned);
978                                Some(CacheItem::Freq(pointer, v.to_owned()))
979                            } else {
980                                owned.as_mut().txid = txid;
981                                owned.as_mut().size = size;
982                                stats.modify(&owned.as_mut().k);
983                                let pointer = inner.freq.append_n(owned);
984                                Some(CacheItem::Freq(pointer, iv))
985                            }
986                        }
987                        CacheItem::GhostFreq(llp) => {
988                            // Adjust p
989                            if llp.as_ref().txid > txid || inner.min_txid > txid {
990                                // The cache version is newer, this is just a hit.
991                                let size = llp.as_ref().size;
992                                Self::calc_p_freq(
993                                    inner.ghost_rec.len(),
994                                    inner.ghost_freq.len(),
995                                    &mut inner.p,
996                                    size,
997                                );
998                                inner.ghost_freq.touch(llp.to_owned());
999                                None
1000                            } else {
1001                                // This item is newer, so we can include it.
1002                                Self::calc_p_freq(
1003                                    inner.ghost_rec.len(),
1004                                    inner.ghost_freq.len(),
1005                                    &mut inner.p,
1006                                    size,
1007                                );
1008                                let mut owned = inner.ghost_freq.extract(llp.to_owned());
1009                                owned.as_mut().txid = txid;
1010                                owned.as_mut().size = size;
1011                                stats.ghost_frequent_revive(&owned.as_mut().k);
1012                                let pointer = inner.freq.append_n(owned);
1013                                Some(CacheItem::Freq(pointer, iv))
1014                            }
1015                        }
1016                        CacheItem::GhostRec(llp) => {
1017                            // Adjust p
1018                            if llp.as_ref().txid > txid || inner.min_txid > txid {
1019                                let size = llp.as_ref().size;
1020                                Self::calc_p_rec(
1021                                    shared.max,
1022                                    inner.ghost_rec.len(),
1023                                    inner.ghost_freq.len(),
1024                                    &mut inner.p,
1025                                    size,
1026                                );
1027                                inner.ghost_rec.touch(llp.clone());
1028                                None
1029                            } else {
1030                                Self::calc_p_rec(
1031                                    shared.max,
1032                                    inner.ghost_rec.len(),
1033                                    inner.ghost_freq.len(),
1034                                    &mut inner.p,
1035                                    size,
1036                                );
1037                                let mut owned = inner.ghost_rec.extract(llp.to_owned());
1038                                owned.as_mut().txid = txid;
1039                                owned.as_mut().size = size;
1040                                stats.ghost_recent_revive(&owned.as_ref().k);
1041                                let pointer = inner.rec.append_n(owned);
1042                                Some(CacheItem::Rec(pointer, iv))
1043                            }
1044                        }
1045                        CacheItem::Haunted(llp) => {
1046                            if llp.as_ref().txid > txid || inner.min_txid > txid {
1047                                None
1048                            } else {
1049                                let mut owned = inner.haunted.extract(llp.to_owned());
1050                                owned.as_mut().txid = txid;
1051                                owned.as_mut().size = size;
1052                                stats.include_haunted(&owned.as_mut().k);
1053                                let pointer = inner.rec.append_n(owned);
1054                                Some(CacheItem::Rec(pointer, iv))
1055                            }
1056                        }
1057                    };
1058                    if let Some(ref mut next_state) = next_state {
1059                        mem::swap(*ci, next_state);
1060                    }
1061                }
1062                None => {
1063                    // This key has never been seen before.
1064                    // It's not present - include it!
1065                    if txid >= inner.min_txid {
1066                        let llp = inner.rec.append_k(CacheItemInner {
1067                            k: k.clone(),
1068                            txid,
1069                            size,
1070                        });
1071                        stats.include(&k);
1072                        cache.insert(k, CacheItem::Rec(llp, iv));
1073                    }
1074                }
1075            };
1076
1077            // Stop processing the queue, we are up to "now".
1078            if t >= commit_ts {
1079                break;
1080            }
1081        }
1082    }
1083
1084    fn drain_tlocal_hits(
1085        &self,
1086        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
1087        inner: &mut ArcInner<K, V>,
1088        // shared: &ArcShared<K, V>,
1089        commit_txid: u64,
1090        hit: Vec<u64>,
1091    ) {
1092        // Stats updated by caller
1093        hit.into_iter().for_each(|k_hash| {
1094            // * everything hit must be in main cache now, so bring these
1095            //   all to the relevant item heads.
1096            // * Why do this last? Because the write is the "latest" we want all the fresh
1097            //   written items in the cache over the "read" hits, it gives us some aprox
1098            //   of time ordering, but not perfect.
1099
1100            // Find the item in the cache.
1101            // * based on it's type, promote it in the correct list, or move it.
1102            // How does this prevent incorrect promotion from rec to freq? txid?
1103            let mut r = unsafe { cache.get_slot_mut(k_hash) };
1104            match r {
1105                Some(ref mut ci_slots) => {
1106                    for ref mut ci in ci_slots.iter_mut() {
1107                        // This differs from above - we skip if we don't touch anything
1108                        // that was added in this txn. This is to prevent double touching
1109                        // anything that was included in a write.
1110
1111                        // TODO: find a way to remove these clones
1112                        let mut next_state = match &ci.v {
1113                            CacheItem::Freq(llp, v) => {
1114                                // To prevent cache hysterisis, we require a hit over
1115                                // two transactions.
1116                                if llp.as_ref().txid != commit_txid {
1117                                    inner.freq.touch(llp.to_owned());
1118                                    Some(CacheItem::Freq(llp.to_owned(), v.to_owned()))
1119                                } else {
1120                                    None
1121                                }
1122                            }
1123                            CacheItem::Rec(llp, v) => {
1124                                if llp.as_ref().txid != commit_txid {
1125                                    // println!("hit {:?} Rec -> Freq", k);
1126                                    let owned = inner.rec.extract(llp.clone());
1127                                    let pointer = inner.freq.append_n(owned);
1128                                    Some(CacheItem::Freq(pointer, v.clone()))
1129                                } else {
1130                                    None
1131                                }
1132                            }
1133                            _ => {
1134                                // Ignore hits on items that may have been cleared.
1135                                None
1136                            }
1137                        };
1138                        // Now change the state.
1139                        if let Some(ref mut next_state) = next_state {
1140                            mem::swap(&mut ci.v, next_state);
1141                        }
1142                    } // for each ci in slots
1143                }
1144                None => {
1145                    // Impossible state!
1146                    unreachable!();
1147                }
1148            }
1149        });
1150    }
1151
1152    fn evict_to_haunted_len(
1153        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
1154        ll: &mut LL<CacheItemInner<K>>,
1155        to_ll: &mut LL<CacheItemInner<K>>,
1156        size: usize,
1157        txid: u64,
1158    ) {
1159        while ll.len() > size {
1160            let mut owned = ll.pop();
1161            debug_assert!(!owned.is_null());
1162
1163            // Set the item's evict txid.
1164            owned.as_mut().txid = txid;
1165
1166            let pointer = to_ll.append_n(owned);
1167            let mut r = cache.get_mut(&pointer.as_ref().k);
1168
1169            match r {
1170                Some(ref mut ci) => {
1171                    // Now change the state.
1172                    let mut next_state = CacheItem::Haunted(pointer);
1173                    mem::swap(*ci, &mut next_state);
1174                }
1175                None => {
1176                    // Impossible state!
1177                    unreachable!();
1178                }
1179            };
1180        }
1181    }
1182
1183    fn evict_to_len<S>(
1184        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
1185        ll: &mut LL<CacheItemInner<K>>,
1186        to_ll: &mut LL<CacheItemInner<K>>,
1187        size: usize,
1188        txid: u64,
1189        stats: &mut S,
1190    ) where
1191        S: ARCacheWriteStat<K>,
1192    {
1193        debug_assert!(ll.len() >= size);
1194
1195        while ll.len() > size {
1196            let mut owned = ll.pop();
1197            debug_assert!(!owned.is_null());
1198            let mut r = cache.get_mut(&owned.as_ref().k);
1199            // Set the item's evict txid.
1200            owned.as_mut().txid = txid;
1201            match r {
1202                Some(ref mut ci) => {
1203                    let mut next_state = match &ci {
1204                        CacheItem::Freq(llp, _v) => {
1205                            debug_assert!(llp == &owned);
1206                            // No need to extract, already popped!
1207                            // $ll.extract(*llp);
1208                            stats.evict_from_frequent(&owned.as_ref().k);
1209                            let pointer = to_ll.append_n(owned);
1210                            CacheItem::GhostFreq(pointer)
1211                        }
1212                        CacheItem::Rec(llp, _v) => {
1213                            debug_assert!(llp == &owned);
1214                            // No need to extract, already popped!
1215                            // $ll.extract(*llp);
1216                            stats.evict_from_recent(&owned.as_mut().k);
1217                            let pointer = to_ll.append_n(owned);
1218                            CacheItem::GhostRec(pointer)
1219                        }
1220                        _ => {
1221                            // Impossible state!
1222                            unreachable!();
1223                        }
1224                    };
1225                    // Now change the state.
1226                    mem::swap(*ci, &mut next_state);
1227                }
1228                None => {
1229                    // Impossible state!
1230                    unreachable!();
1231                }
1232            }
1233        }
1234    }
1235
1236    #[allow(clippy::cognitive_complexity)]
1237    fn evict<S>(
1238        &self,
1239        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
1240        inner: &mut ArcInner<K, V>,
1241        shared: &ArcShared<K, V>,
1242        commit_txid: u64,
1243        stats: &mut S,
1244    ) where
1245        S: ARCacheWriteStat<K>,
1246    {
1247        debug_assert!(inner.p <= shared.max);
1248        // Convince the compiler copying is okay.
1249        let p = inner.p;
1250
1251        if inner.rec.len() + inner.freq.len() > shared.max {
1252            // println!("Checking cache evict");
1253            trace!(
1254                "from -> rec {:?}, freq {:?}",
1255                inner.rec.len(),
1256                inner.freq.len()
1257            );
1258            let delta = (inner.rec.len() + inner.freq.len()) - shared.max;
1259            // We have overflowed by delta. As we are not "evicting as we go" we have to work out
1260            // what we should have evicted up to now.
1261            //
1262            // keep removing from rec until == p OR delta == 0, and if delta remains, then remove from freq.
1263            //
1264            // Remember P is "the maximum size of recent" or "presure on recent". If P is max then
1265            // we are pressuring churning on recent but not freq, so evict in freq.
1266            //
1267            // If P is toward 0 that means all our pressure is in frequent and we evicted things we
1268            // shouldn't have so we want more space in frequent and less in rec.
1269
1270            // delta here is the number of elements we need to remove to remain below shared.max.
1271
1272            let rec_to_len = if inner.p == 0 {
1273                trace!("p == 0 => {:?} - {}", inner.rec.len(), delta);
1274                if delta < inner.rec.len() {
1275                    // We are fully weighted to freq, so only remove in recent.
1276                    inner.rec.len() - delta
1277                } else {
1278                    // We need to fully clear rec *and* then from freq as well.
1279                    0
1280                }
1281            } else if inner.rec.len() > inner.p {
1282                // There is a partial weighting, how much do we need to move?
1283                let rec_delta = inner.rec.len() - inner.p;
1284                if rec_delta > delta {
1285                    /*
1286                    println!(
1287                        "p ({:?}) <= rec ({:?}), rec_delta ({:?}) > delta ({:?})",
1288                        inner.p,
1289                        inner.rec.len(),
1290                        rec_delta,
1291                        delta
1292                    );
1293                    */
1294                    // We will have removed enough through delta alone in rec. Technically
1295                    // this means we are still over p, but since we already removed delta
1296                    // number of elements, freq won't change dimensions.
1297                    inner.rec.len() - delta
1298                } else {
1299                    /*
1300                    println!(
1301                        "p ({:?}) <= rec ({:?}), rec_delta ({:?}) <= delta ({:?})",
1302                        inner.p,
1303                        inner.rec.len(),
1304                        rec_delta,
1305                        delta
1306                    );
1307                    */
1308                    // Remove the full recent delta, and excess will be removed from freq.
1309                    inner.rec.len() - rec_delta
1310                }
1311            } else {
1312                // rec is already below p, therefore we must need to remove in freq, and
1313                // we need to consider how much is in rec.
1314                // println!("p ({:?}) > rec ({:?})", inner.p, inner.rec.len());
1315                inner.rec.len()
1316            };
1317
1318            // Now we can get the expected sizes;
1319            let freq_to_len = shared.max - rec_to_len;
1320            // println!("move to -> rec {:?}, freq {:?}", rec_to_len, freq_to_len);
1321            debug_assert!(rec_to_len <= inner.rec.len());
1322            debug_assert!(freq_to_len <= inner.freq.len());
1323
1324            // stats.freq_evicts += (inner.freq.len() - freq_to_len) as u64;
1325            // stats.recent_evicts += (inner.rec.len() - rec_to_len) as u64;
1326            // stats.frequent_evict_add((inner.freq.len() - freq_to_len) as u64);
1327            // stats.recent_evict_add((inner.rec.len() - rec_to_len) as u64);
1328
1329            Self::evict_to_len(
1330                cache,
1331                &mut inner.rec,
1332                &mut inner.ghost_rec,
1333                rec_to_len,
1334                commit_txid,
1335                stats,
1336            );
1337            Self::evict_to_len(
1338                cache,
1339                &mut inner.freq,
1340                &mut inner.ghost_freq,
1341                freq_to_len,
1342                commit_txid,
1343                stats,
1344            );
1345
1346            // Finally, do an evict of the ghost sets if they are too long - these are weighted
1347            // inverse to the above sets. Note the freq to len in ghost rec, and rec to len in
1348            // ghost freq!
1349            if inner.ghost_rec.len() > (shared.max - p) {
1350                Self::evict_to_haunted_len(
1351                    cache,
1352                    &mut inner.ghost_rec,
1353                    &mut inner.haunted,
1354                    freq_to_len,
1355                    commit_txid,
1356                );
1357            }
1358
1359            if inner.ghost_freq.len() > p {
1360                Self::evict_to_haunted_len(
1361                    cache,
1362                    &mut inner.ghost_freq,
1363                    &mut inner.haunted,
1364                    rec_to_len,
1365                    commit_txid,
1366                );
1367            }
1368        }
1369    }
1370
1371    fn drain_ll_to_ghost<S>(
1372        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
1373        ll: &mut LL<CacheItemInner<K>>,
1374        gf: &mut LL<CacheItemInner<K>>,
1375        gr: &mut LL<CacheItemInner<K>>,
1376        txid: u64,
1377        stats: &mut S,
1378    ) where
1379        S: ARCacheWriteStat<K>,
1380    {
1381        while ll.len() > 0 {
1382            let mut owned = ll.pop();
1383            debug_assert!(!owned.is_null());
1384
1385            // Set the item's eviction txid.
1386            owned.as_mut().txid = txid;
1387
1388            let mut r = cache.get_mut(&owned.as_ref().k);
1389            match r {
1390                Some(ref mut ci) => {
1391                    let mut next_state = match &ci {
1392                        CacheItem::Freq(n, _) => {
1393                            debug_assert!(n == &owned);
1394                            stats.evict_from_frequent(&owned.as_ref().k);
1395                            let pointer = gf.append_n(owned);
1396                            CacheItem::GhostFreq(pointer)
1397                        }
1398                        CacheItem::Rec(n, _) => {
1399                            debug_assert!(n == &owned);
1400                            stats.evict_from_recent(&owned.as_ref().k);
1401                            let pointer = gr.append_n(owned);
1402                            CacheItem::GhostRec(pointer)
1403                        }
1404                        _ => {
1405                            // Impossible state!
1406                            unreachable!();
1407                        }
1408                    };
1409                    // Now change the state.
1410                    mem::swap(*ci, &mut next_state);
1411                }
1412                None => {
1413                    // Impossible state!
1414                    unreachable!();
1415                }
1416            }
1417        } // end while
1418    }
1419
1420    fn drain_ll_min_txid(
1421        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
1422        ll: &mut LL<CacheItemInner<K>>,
1423        min_txid: u64,
1424    ) {
1425        while let Some(node) = ll.peek_head() {
1426            if min_txid > node.txid {
1427                // Need to free from the cache.
1428                cache.remove(&node.k);
1429
1430                // Okay, this node can be trimmed.
1431                ll.drop_head();
1432            } else {
1433                // We are done with this loop, everything else
1434                // is newer.
1435                break;
1436            }
1437        }
1438    }
1439
1440    #[allow(clippy::unnecessary_mut_passed)]
1441    fn commit<S>(
1442        &self,
1443        mut cache: DataMapWriteTxn<K, CacheItem<K, V>>,
1444        tlocal: Map<K, ThreadCacheItem<V>>,
1445        hit: Vec<u64>,
1446        clear: bool,
1447        init_above_watermark: bool,
1448        // read_ops: u32,
1449        mut stats: S,
1450    ) -> S
1451    where
1452        S: ARCacheWriteStat<K>,
1453    {
1454        // What is the time?
1455        let commit_ts = Instant::now();
1456        let commit_txid = cache.get_txid();
1457        // Copy p + init cache sizes for adjustment.
1458        let mut inner = self.inner.lock().unwrap();
1459        let shared = self.shared.read().unwrap();
1460
1461        // Did we request to be cleared? If so, we move everything to a ghost set
1462        // that was live.
1463        //
1464        // we also set the min_txid watermark which prevents any inclusion of
1465        // any item that existed before this point in time.
1466        if clear {
1467            // Set the watermark of this txn.
1468            inner.min_txid = commit_txid;
1469
1470            // Indicate that we evicted all to ghost/freq
1471            // stats.frequent_evict_add(inner.freq.len() as u64);
1472            // stats.recent_evict_add(inner.rec.len() as u64);
1473
1474            // This weird looking dance is to convince rust that the mutable borrow is safe.
1475            let m_inner = inner.deref_mut();
1476
1477            let i_f = &mut m_inner.freq;
1478            let g_f = &mut m_inner.ghost_freq;
1479            let i_r = &mut m_inner.rec;
1480            let g_r = &mut m_inner.ghost_rec;
1481
1482            // Move everything active into ghost sets.
1483            Self::drain_ll_to_ghost(&mut cache, i_f, g_f, g_r, commit_txid, &mut stats);
1484            Self::drain_ll_to_ghost(&mut cache, i_r, g_f, g_r, commit_txid, &mut stats);
1485        } else {
1486            // Update the minimum txid we'll include from based on the look back limit
1487            //
1488            // If a clear happens, we don't want this setting the limit back lower than an existing
1489            // limit, so we take the greater of these values.
1490            let possible_new_limit = commit_txid.saturating_sub(self.look_back_limit);
1491            inner.min_txid = inner.min_txid.max(possible_new_limit);
1492        }
1493
1494        // Why is it okay to drain the rx/tlocal and create the cache in a temporary
1495        // oversize? Because these values in the queue/tlocal are already in memory
1496        // and we are moving them to the cache, we are not actually using any more
1497        // memory (well, not significantly more). By adding everything, then evicting
1498        // we also get better and more accurate hit patterns over the cache based on what
1499        // was used. This gives us an advantage over other cache types - we can see
1500        // patterns based on temporal usage that other caches can't, at the expense that
1501        // it may take some moments for that cache pattern to sync to the main thread.
1502
1503        self.drain_tlocal_inc(
1504            &mut cache,
1505            inner.deref_mut(),
1506            shared.deref(),
1507            tlocal,
1508            commit_txid,
1509            &mut stats,
1510        );
1511
1512        // drain rx until empty or time >= time.
1513        self.drain_inc_rx(
1514            &mut cache,
1515            inner.deref_mut(),
1516            shared.deref(),
1517            commit_ts,
1518            &mut stats,
1519        );
1520
1521        self.drain_hit_rx(&mut cache, inner.deref_mut(), commit_ts);
1522
1523        // drain the tlocal hits into the main cache.
1524
1525        // stats.write_hits += hit.len() as u64;
1526        // stats.write_read_ops += read_ops as u64;
1527
1528        self.drain_tlocal_hits(&mut cache, inner.deref_mut(), commit_txid, hit);
1529
1530        // now clean the space for each of the primary caches, evicting into the ghost sets.
1531        // * It's possible that both caches are now over-sized if rx was empty
1532        //   but wr inc many items.
1533        // * p has possibly changed width, causing a balance shift
1534        // * and ghost items have been included changing ghost list sizes.
1535        // so we need to do a clean up/balance of all the list lengths.
1536        self.evict(
1537            &mut cache,
1538            inner.deref_mut(),
1539            shared.deref(),
1540            commit_txid,
1541            &mut stats,
1542        );
1543
1544        // self.drain_stat_rx(inner.deref_mut(), stats, commit_ts);
1545
1546        // Now drain from the haunted set if required, this removes anything
1547        // lower than the min_txid.
1548        {
1549            // Tell the compiler copying is okay.
1550            let min_txid = inner.min_txid;
1551            Self::drain_ll_min_txid(&mut cache, &mut inner.haunted, min_txid);
1552        }
1553
1554        stats.p_weight(inner.p as u64);
1555        stats.shared_max(shared.max as u64);
1556        stats.freq(inner.freq.len() as u64);
1557        stats.recent(inner.rec.len() as u64);
1558        stats.all_seen_keys(cache.len() as u64);
1559
1560        // Indicate if we are at/above watermark, so that read/writers begin to indicate their
1561        // hit events so we can start to setup/order our arc sets correctly.
1562        //
1563        // If we drop below this again, they'll go back to just insert/remove content only mode.
1564        if init_above_watermark {
1565            if (inner.freq.len() + inner.rec.len()) < shared.watermark {
1566                self.above_watermark.store(false, Ordering::Relaxed);
1567            }
1568        } else if (inner.freq.len() + inner.rec.len()) >= shared.watermark {
1569            self.above_watermark.store(true, Ordering::Relaxed);
1570        }
1571
1572        // commit on the wr txn.
1573        cache.commit();
1574        // done!
1575
1576        // Return the stats to the caller.
1577        stats
1578    }
1579}
1580
1581impl<
1582        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
1583        V: Clone + Debug + Sync + Send + 'static,
1584        S: ARCacheWriteStat<K>,
1585    > ARCacheWriteTxn<'_, K, V, S>
1586{
1587    /// Commit the changes of this writer, making them globally visible. This causes
1588    /// all items written to this thread's local store to become visible in the main
1589    /// cache.
1590    ///
1591    /// To rollback (abort) and operation, just do not call commit (consider std::mem::drop
1592    /// on the write transaction)
1593    pub fn commit(self) -> S {
1594        self.caller.commit(
1595            self.cache,
1596            self.tlocal,
1597            self.hit.into_inner(),
1598            self.clear.into_inner(),
1599            self.above_watermark,
1600            // self.read_ops.into_inner(),
1601            self.stats,
1602        )
1603    }
1604
1605    /// Clear all items of the cache. This operation does not take effect until you commit.
1606    /// After calling "clear", you may then include new items which will be stored thread
1607    /// locally until you commit.
1608    pub fn clear(&mut self) {
1609        // Mark that we have been requested to clear the cache.
1610        unsafe {
1611            let clear_ptr = self.clear.get();
1612            *clear_ptr = true;
1613        }
1614        // Dump the hit log.
1615        unsafe {
1616            let hit_ptr = self.hit.get();
1617            (*hit_ptr).clear();
1618        }
1619
1620        // Throw away any read ops we did on the old values since they'll
1621        // mess up stat numbers.
1622        self.stats.cache_clear();
1623        /*
1624        unsafe {
1625            let op_ptr = self.read_ops.get();
1626            (*op_ptr) = 0;
1627        }
1628        */
1629
1630        // Dump the thread local state.
1631        self.tlocal.clear();
1632        // From this point any get will miss on the main cache.
1633        // Inserts are accepted.
1634    }
1635
1636    /// Attempt to retrieve a k-v pair from the cache. If it is present in the main cache OR
1637    /// the thread local cache, a `Some` is returned, else you will receive a `None`. On a
1638    /// `None`, you must then consult the external data source that this structure is acting
1639    /// as a cache for.
1640    pub fn get<Q>(&mut self, k: &Q) -> Option<&V>
1641    where
1642        K: Borrow<Q>,
1643        Q: Hash + Eq + Ord + ?Sized,
1644    {
1645        let k_hash: u64 = self.cache.prehash(k);
1646
1647        // Track the attempted read op
1648        /*
1649        unsafe {
1650            let op_ptr = self.read_ops.get();
1651            (*op_ptr) += 1;
1652        }
1653        */
1654        self.stats.cache_read();
1655
1656        let r: Option<&V> = if let Some(tci) = self.tlocal.get(k) {
1657            match tci {
1658                ThreadCacheItem::Present(v, _clean, _size) => {
1659                    let v = v as *const _;
1660                    unsafe { Some(&(*v)) }
1661                }
1662                ThreadCacheItem::Removed(_clean) => {
1663                    return None;
1664                }
1665            }
1666        } else {
1667            // If we have been requested to clear, the main cache is "empty"
1668            // but we can't do that until a commit, so we just flag it and avoid.
1669            let is_cleared = unsafe {
1670                let clear_ptr = self.clear.get();
1671                *clear_ptr
1672            };
1673            if !is_cleared {
1674                if let Some(v) = self.cache.get_prehashed(k, k_hash) {
1675                    (*v).to_vref()
1676                } else {
1677                    None
1678                }
1679            } else {
1680                None
1681            }
1682        };
1683
1684        if r.is_some() {
1685            self.stats.cache_hit();
1686        }
1687
1688        // How do we track this was a hit?
1689        // Remember, we don't track misses - they are *implied* by the fact they'll trigger
1690        // an inclusion from the external system. Subsequent, any further re-hit on an
1691        // included value WILL be tracked, allowing arc to adjust appropriately.
1692        if self.above_watermark && r.is_some() {
1693            unsafe {
1694                let hit_ptr = self.hit.get();
1695                (*hit_ptr).push(k_hash);
1696            }
1697        }
1698        r
1699    }
1700
1701    /// If a value is in the thread local cache, retrieve it for mutation. If the value
1702    /// is not in the thread local cache, it is retrieved and cloned from the main cache. If
1703    /// the value had been marked for removal, it must first be re-inserted.
1704    ///
1705    /// # Safety
1706    ///
1707    /// Since you are mutating the state of the value, if you have sized insertions you MAY
1708    /// break this since you can change the weight of the value to be inconsistent
1709    pub fn get_mut<Q>(&mut self, k: &Q, make_dirty: bool) -> Option<&mut V>
1710    where
1711        K: Borrow<Q>,
1712        Q: Hash + Eq + Ord + ?Sized,
1713    {
1714        // If we were requested to clear, we can not copy to the tlocal cache.
1715        let is_cleared = unsafe {
1716            let clear_ptr = self.clear.get();
1717            *clear_ptr
1718        };
1719
1720        // If the main cache has NOT been cleared (ie it has items) and our tlocal
1721        // does NOT contain this key, then we prime it.
1722        if !is_cleared && !self.tlocal.contains_key(k) {
1723            // Copy from the core cache into the tlocal.
1724            let k_hash: u64 = self.cache.prehash(k);
1725            if let Some(v) = self.cache.get_prehashed(k, k_hash) {
1726                if let Some((dk, dv, ds)) = v.to_kvsref() {
1727                    self.tlocal.insert(
1728                        dk.clone(),
1729                        ThreadCacheItem::Present(dv.clone(), !make_dirty, ds),
1730                    );
1731                }
1732            }
1733        };
1734
1735        // Now return from the tlocal, if present, a mut pointer.
1736
1737        match self.tlocal.get_mut(k) {
1738            Some(ThreadCacheItem::Present(v, clean, _size)) => {
1739                if make_dirty && *clean {
1740                    *clean = false;
1741                }
1742                let v = v as *mut _;
1743                unsafe { Some(&mut (*v)) }
1744            }
1745            _ => None,
1746        }
1747    }
1748
1749    /// Determine if this cache contains the following key.
1750    pub fn contains_key<Q>(&mut self, k: &Q) -> bool
1751    where
1752        K: Borrow<Q>,
1753        Q: Hash + Eq + Ord + ?Sized,
1754    {
1755        self.get(k).is_some()
1756    }
1757
1758    /// Add a value to the cache. This may be because you have had a cache miss and
1759    /// now wish to include in the thread local storage, or because you have written
1760    /// a new value and want it to be submitted for caching. This item is marked as
1761    /// clean, IE you have synced it to whatever associated store exists.
1762    pub fn insert(&mut self, k: K, v: V) {
1763        self.tlocal.insert(k, ThreadCacheItem::Present(v, true, 1));
1764    }
1765
1766    /// Insert an item to the cache, with an associated weight/size factor. See also `insert`
1767    pub fn insert_sized(&mut self, k: K, v: V, size: NonZeroUsize) {
1768        self.tlocal
1769            .insert(k, ThreadCacheItem::Present(v, true, size.get()));
1770    }
1771
1772    /// Remove this value from the thread local cache IE mask from from being
1773    /// returned until this thread performs an insert. This item is marked as clean
1774    /// IE you have synced it to whatever associated store exists.
1775    pub fn remove(&mut self, k: K) {
1776        self.tlocal.insert(k, ThreadCacheItem::Removed(true));
1777    }
1778
1779    /// Add a value to the cache. This may be because you have had a cache miss and
1780    /// now wish to include in the thread local storage, or because you have written
1781    /// a new value and want it to be submitted for caching. This item is marked as
1782    /// dirty, because you have *not* synced it. You MUST call iter_mut_mark_clean before calling
1783    /// `commit` on this transaction, or a panic will occur.
1784    pub fn insert_dirty(&mut self, k: K, v: V) {
1785        self.tlocal.insert(k, ThreadCacheItem::Present(v, false, 1));
1786    }
1787
1788    /// Insert a dirty item to the cache, with an associated weight/size factor. See also `insert_dirty`
1789    pub fn insert_dirty_sized(&mut self, k: K, v: V, size: NonZeroUsize) {
1790        self.tlocal
1791            .insert(k, ThreadCacheItem::Present(v, false, size.get()));
1792    }
1793
1794    /// Remove this value from the thread local cache IE mask from from being
1795    /// returned until this thread performs an insert. This item is marked as
1796    /// dirty, because you have *not* synced it. You MUST call iter_mut_mark_clean before calling
1797    /// `commit` on this transaction, or a panic will occur.
1798    pub fn remove_dirty(&mut self, k: K) {
1799        self.tlocal.insert(k, ThreadCacheItem::Removed(false));
1800    }
1801
1802    /// Determines if dirty elements exist in this cache or not.
1803    pub fn is_dirty(&self) -> bool {
1804        self.iter_dirty().take(1).next().is_some()
1805    }
1806
1807    /// Yields an iterator over all values that are currently dirty. As the iterator
1808    /// progresses, items will NOT be marked clean. This allows you to examine
1809    /// any currently dirty items in the cache.
1810    pub fn iter_dirty(&self) -> impl Iterator<Item = (&K, Option<&V>)> {
1811        self.tlocal
1812            .iter()
1813            .filter(|(_k, v)| match v {
1814                ThreadCacheItem::Present(_v, c, _size) => !c,
1815                ThreadCacheItem::Removed(c) => !c,
1816            })
1817            .map(|(k, v)| {
1818                // Get the data.
1819                let data = match v {
1820                    ThreadCacheItem::Present(v, _c, _size) => Some(v),
1821                    ThreadCacheItem::Removed(_c) => None,
1822                };
1823                (k, data)
1824            })
1825    }
1826
1827    /// Yields a mutable iterator over all values that are currently dirty. As the iterator
1828    /// progresses, items will NOT be marked clean. This allows you to modify and
1829    /// change any currently dirty items as required.
1830    pub fn iter_mut_dirty(&mut self) -> impl Iterator<Item = (&K, Option<&mut V>)> {
1831        self.tlocal
1832            .iter_mut()
1833            .filter(|(_k, v)| match v {
1834                ThreadCacheItem::Present(_v, c, _size) => !c,
1835                ThreadCacheItem::Removed(c) => !c,
1836            })
1837            .map(|(k, v)| {
1838                // Get the data.
1839                let data = match v {
1840                    ThreadCacheItem::Present(v, _c, _size) => Some(v),
1841                    ThreadCacheItem::Removed(_c) => None,
1842                };
1843                (k, data)
1844            })
1845    }
1846
1847    /// Yields an iterator over all values that are currently dirty. As the iterator
1848    /// progresses, items will be marked clean. This is where you should sync dirty
1849    /// cache content to your associated store. The iterator is `K, Option<V>`, where
1850    /// the `Option<V>` indicates if the item has been remove (None) or is updated (Some).
1851    pub fn iter_mut_mark_clean(&mut self) -> impl Iterator<Item = (&K, Option<&mut V>)> {
1852        self.tlocal
1853            .iter_mut()
1854            .filter(|(_k, v)| match v {
1855                ThreadCacheItem::Present(_v, c, _size) => !c,
1856                ThreadCacheItem::Removed(c) => !c,
1857            })
1858            .map(|(k, v)| {
1859                // Mark it clean.
1860                match v {
1861                    ThreadCacheItem::Present(_v, c, _size) => *c = true,
1862                    ThreadCacheItem::Removed(c) => *c = true,
1863                }
1864                // Get the data.
1865                let data = match v {
1866                    ThreadCacheItem::Present(v, _c, _size) => Some(v),
1867                    ThreadCacheItem::Removed(_c) => None,
1868                };
1869                (k, data)
1870            })
1871    }
1872
1873    /// Yield an iterator over all currently live and valid cache items.
1874    pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
1875        self.cache.values().filter_map(|ci| match &ci {
1876            CacheItem::Rec(lln, v) => {
1877                let cii = lln.as_ref();
1878                Some((&cii.k, v))
1879            }
1880            CacheItem::Freq(lln, v) => {
1881                let cii = lln.as_ref();
1882                Some((&cii.k, v))
1883            }
1884            _ => None,
1885        })
1886    }
1887
1888    /// Yield an iterator over all currently live and valid items in the
1889    /// recent access list.
1890    pub fn iter_rec(&self) -> impl Iterator<Item = &K> {
1891        self.cache.values().filter_map(|ci| match &ci {
1892            CacheItem::Rec(lln, _) => {
1893                let cii = lln.as_ref();
1894                Some(&cii.k)
1895            }
1896            _ => None,
1897        })
1898    }
1899
1900    /// Yield an iterator over all currently live and valid items in the
1901    /// frequent access list.
1902    pub fn iter_freq(&self) -> impl Iterator<Item = &K> {
1903        self.cache.values().filter_map(|ci| match &ci {
1904            CacheItem::Rec(lln, _) => {
1905                let cii = lln.as_ref();
1906                Some(&cii.k)
1907            }
1908            _ => None,
1909        })
1910    }
1911
1912    #[cfg(test)]
1913    pub(crate) fn iter_ghost_rec(&self) -> impl Iterator<Item = &K> {
1914        self.cache.values().filter_map(|ci| match &ci {
1915            CacheItem::GhostRec(lln) => {
1916                let cii = lln.as_ref();
1917                Some(&cii.k)
1918            }
1919            _ => None,
1920        })
1921    }
1922
1923    #[cfg(test)]
1924    pub(crate) fn iter_ghost_freq(&self) -> impl Iterator<Item = &K> {
1925        self.cache.values().filter_map(|ci| match &ci {
1926            CacheItem::GhostFreq(lln) => {
1927                let cii = lln.as_ref();
1928                Some(&cii.k)
1929            }
1930            _ => None,
1931        })
1932    }
1933
1934    #[cfg(test)]
1935    pub(crate) fn peek_hit(&self) -> &[u64] {
1936        let hit_ptr = self.hit.get();
1937        unsafe { &(*hit_ptr) }
1938    }
1939
1940    #[cfg(test)]
1941    pub(crate) fn peek_cache<Q: ?Sized>(&self, k: &Q) -> CacheState
1942    where
1943        K: Borrow<Q>,
1944        Q: Hash + Eq + Ord,
1945    {
1946        if let Some(v) = self.cache.get(k) {
1947            (*v).to_state()
1948        } else {
1949            CacheState::None
1950        }
1951    }
1952
1953    #[cfg(test)]
1954    pub(crate) fn peek_stat(&self) -> CStat {
1955        let inner = self.caller.inner.lock().unwrap();
1956        let shared = self.caller.shared.read().unwrap();
1957        CStat {
1958            max: shared.max,
1959            cache: self.cache.len(),
1960            tlocal: self.tlocal.len(),
1961            freq: inner.freq.len(),
1962            rec: inner.rec.len(),
1963            ghost_freq: inner.ghost_freq.len(),
1964            ghost_rec: inner.ghost_rec.len(),
1965            haunted: inner.haunted.len(),
1966            p: inner.p,
1967        }
1968    }
1969
1970    // to_snapshot
1971}
1972
1973impl<
1974        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
1975        V: Clone + Debug + Sync + Send + 'static,
1976        S: ARCacheReadStat + Clone,
1977    > ARCacheReadTxn<'_, K, V, S>
1978{
1979    /// Attempt to retrieve a k-v pair from the cache. If it is present in the main cache OR
1980    /// the thread local cache, a `Some` is returned, else you will receive a `None`. On a
1981    /// `None`, you must then consult the external data source that this structure is acting
1982    /// as a cache for.
1983    pub fn get<Q>(&mut self, k: &Q) -> Option<&V>
1984    where
1985        K: Borrow<Q>,
1986        Q: Hash + Eq + Ord + ?Sized,
1987    {
1988        let k_hash: u64 = self.cache.prehash(k);
1989
1990        self.stats.cache_read();
1991        // self.ops += 1;
1992
1993        let mut hits = false;
1994        let mut tlocal_hits = false;
1995
1996        let r: Option<&V> = self
1997            .tlocal
1998            .as_ref()
1999            .and_then(|cache| {
2000                cache.set.get(k).map(|v| {
2001                    // Indicate a hit on the tlocal cache.
2002                    tlocal_hits = true;
2003
2004                    if self.above_watermark {
2005                        let _ = self.hit_queue.push(CacheHitEvent {
2006                            t: Instant::now(),
2007                            k_hash,
2008                        });
2009                    }
2010                    unsafe {
2011                        let v = &v.as_ref().v as *const _;
2012                        // This discards the lifetime and repins it to the lifetime of `self`.
2013                        &(*v)
2014                    }
2015                })
2016            })
2017            .or_else(|| {
2018                self.cache.get_prehashed(k, k_hash).and_then(|v| {
2019                    (*v).to_vref().map(|vin| {
2020                        // Indicate a hit on the main cache.
2021                        hits = true;
2022
2023                        if self.above_watermark {
2024                            let _ = self.hit_queue.push(CacheHitEvent {
2025                                t: Instant::now(),
2026                                k_hash,
2027                            });
2028                        }
2029
2030                        unsafe {
2031                            let vin = vin as *const _;
2032                            &(*vin)
2033                        }
2034                    })
2035                })
2036            });
2037
2038        if tlocal_hits {
2039            self.stats.cache_local_hit()
2040        } else if hits {
2041            self.stats.cache_main_hit()
2042        };
2043
2044        r
2045    }
2046
2047    /// Determine if this cache contains the following key.
2048    pub fn contains_key<Q>(&mut self, k: &Q) -> bool
2049    where
2050        K: Borrow<Q>,
2051        Q: Hash + Eq + Ord + ?Sized,
2052    {
2053        self.get(k).is_some()
2054    }
2055
2056    /// Insert an item to the cache, with an associated weight/size factor. See also `insert`
2057    pub fn insert_sized(&mut self, k: K, v: V, size: NonZeroUsize) {
2058        let mut v = v;
2059        let size = size.get();
2060        // Send a copy forward through time and space.
2061        // let _ = self.tx.try_send(
2062        if self
2063            .inc_queue
2064            .push(CacheIncludeEvent {
2065                t: Instant::now(),
2066                k: k.clone(),
2067                v: v.clone(),
2068                txid: self.cache.get_txid(),
2069                size,
2070            })
2071            .is_ok()
2072        {
2073            self.stats.include();
2074        } else {
2075            self.stats.failed_include();
2076        }
2077
2078        // We have a cache, so lets update it.
2079        if let Some(ref mut cache) = self.tlocal {
2080            self.stats.local_include();
2081            let n = if cache.tlru.len() >= cache.read_size {
2082                let mut owned = cache.tlru.pop();
2083                // swap the old_key/old_val out
2084                let mut k_clone = k.clone();
2085                mem::swap(&mut k_clone, &mut owned.as_mut().k);
2086                mem::swap(&mut v, &mut owned.as_mut().v);
2087                // remove old K from the tree:
2088                cache.set.remove(&k_clone);
2089                // Return the owned node into the lru
2090                cache.tlru.append_n(owned)
2091            } else {
2092                // Just add it!
2093                cache.tlru.append_k(ReadCacheItem {
2094                    k: k.clone(),
2095                    v,
2096                    size,
2097                })
2098            };
2099            let r = cache.set.insert(k, n);
2100            // There should never be a previous value.
2101            assert!(r.is_none());
2102        }
2103    }
2104
2105    /// Add a value to the cache. This may be because you have had a cache miss and
2106    /// now wish to include in the thread local storage.
2107    ///
2108    /// Note that is invalid to insert an item who's key already exists in this thread local cache,
2109    /// and this is asserted IE will panic if you attempt this. It is also invalid for you to insert
2110    /// a value that does not match the source-of-truth state, IE inserting a different
2111    /// value than another thread may perceive. This is a *read* thread, so you should only be adding
2112    /// values that are relevant to this read transaction and this point in time. If you do not
2113    /// heed this warning, you may alter the fabric of time and space and have some interesting
2114    /// distortions in your data over time.
2115    pub fn insert(&mut self, k: K, v: V) {
2116        self.insert_sized(k, v, unsafe { NonZeroUsize::new_unchecked(1) })
2117    }
2118
2119    /// _
2120    pub fn finish(self) -> S {
2121        let stats = self.stats.clone();
2122        drop(self);
2123
2124        stats
2125    }
2126}
2127
2128impl<
2129        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
2130        V: Clone + Debug + Sync + Send + 'static,
2131        S: ARCacheReadStat + Clone,
2132    > Drop for ARCacheReadTxn<'_, K, V, S>
2133{
2134    fn drop(&mut self) {
2135        // We could make this check the queue sizes rather than blindly quiescing
2136        if self.reader_quiesce {
2137            self.caller.try_quiesce();
2138        }
2139    }
2140}
2141
2142#[cfg(test)]
2143mod tests {
2144    use super::stats::{TraceStat, WriteCountStat};
2145    use super::ARCache;
2146    use super::ARCacheBuilder;
2147    use super::CStat;
2148    use super::CacheState;
2149    use std::num::NonZeroUsize;
2150    use std::sync::Arc;
2151    use std::thread;
2152
2153    use std::sync::atomic::{AtomicBool, Ordering};
2154
2155    #[test]
2156    fn test_cache_arc_basic() {
2157        let arc: ARCache<usize, usize> = ARCacheBuilder::new()
2158            .set_size(4, 4)
2159            .build()
2160            .expect("Invalid cache parameters!");
2161        let mut wr_txn = arc.write();
2162
2163        assert!(wr_txn.get(&1).is_none());
2164        assert!(wr_txn.peek_hit().is_empty());
2165        wr_txn.insert(1, 1);
2166        assert!(wr_txn.get(&1) == Some(&1));
2167        assert!(wr_txn.peek_hit().len() == 1);
2168
2169        wr_txn.commit();
2170
2171        // Now we start the second txn, and see if it's in there.
2172        let mut wr_txn = arc.write();
2173        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
2174        assert!(wr_txn.get(&1) == Some(&1));
2175        assert!(wr_txn.peek_hit().len() == 1);
2176        wr_txn.commit();
2177        // And now check it's moved to Freq due to the extra
2178        let wr_txn = arc.write();
2179        assert!(wr_txn.peek_cache(&1) == CacheState::Freq);
2180        println!("{:?}", wr_txn.peek_stat());
2181    }
2182
2183    #[test]
2184    fn test_cache_evict() {
2185        let _ = tracing_subscriber::fmt::try_init();
2186        println!("== 1");
2187        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
2188            .set_size(4, 4)
2189            .build()
2190            .expect("Invalid cache parameters!");
2191        let stats = TraceStat {};
2192
2193        let mut wr_txn = arc.write_stats(stats);
2194        assert!(
2195            CStat {
2196                max: 4,
2197                cache: 0,
2198                tlocal: 0,
2199                freq: 0,
2200                rec: 0,
2201                ghost_freq: 0,
2202                ghost_rec: 0,
2203                haunted: 0,
2204                p: 0
2205            } == wr_txn.peek_stat()
2206        );
2207
2208        // In the first txn we insert 4 items.
2209        wr_txn.insert(1, 1);
2210        wr_txn.insert(2, 2);
2211        wr_txn.insert(3, 3);
2212        wr_txn.insert(4, 4);
2213
2214        assert!(
2215            CStat {
2216                max: 4,
2217                cache: 0,
2218                tlocal: 4,
2219                freq: 0,
2220                rec: 0,
2221                ghost_freq: 0,
2222                ghost_rec: 0,
2223                haunted: 0,
2224                p: 0
2225            } == wr_txn.peek_stat()
2226        );
2227        let stats = wr_txn.commit();
2228
2229        // Now we start the second txn, and check the stats.
2230        println!("== 2");
2231        let mut wr_txn = arc.write_stats(stats);
2232        assert!(
2233            CStat {
2234                max: 4,
2235                cache: 4,
2236                tlocal: 0,
2237                freq: 0,
2238                rec: 4,
2239                ghost_freq: 0,
2240                ghost_rec: 0,
2241                haunted: 0,
2242                p: 0
2243            } == wr_txn.peek_stat()
2244        );
2245
2246        // Now touch two items, this promote to the freq set.
2247        // Remember, a double hit doesn't weight any more than 1 hit.
2248        assert!(wr_txn.get(&1) == Some(&1));
2249        assert!(wr_txn.get(&1) == Some(&1));
2250        assert!(wr_txn.get(&2) == Some(&2));
2251
2252        let stats = wr_txn.commit();
2253
2254        // Now we start the third txn, and check the stats.
2255        println!("== 3");
2256        let mut wr_txn = arc.write_stats(stats);
2257        assert!(
2258            CStat {
2259                max: 4,
2260                cache: 4,
2261                tlocal: 0,
2262                freq: 2,
2263                rec: 2,
2264                ghost_freq: 0,
2265                ghost_rec: 0,
2266                haunted: 0,
2267                p: 0
2268            } == wr_txn.peek_stat()
2269        );
2270        // Add one more item, this will trigger an evict.
2271        wr_txn.insert(5, 5);
2272        let stats = wr_txn.commit();
2273
2274        // Now we start the fourth txn, and check the stats.
2275        println!("== 4");
2276        let mut wr_txn = arc.write_stats(stats);
2277        println!("stat -> {:?}", wr_txn.peek_stat());
2278        assert!(
2279            CStat {
2280                max: 4,
2281                cache: 5,
2282                tlocal: 0,
2283                freq: 2,
2284                rec: 2,
2285                ghost_freq: 0,
2286                ghost_rec: 1,
2287                haunted: 0,
2288                p: 0
2289            } == wr_txn.peek_stat()
2290        );
2291        // And assert what's in the sets to be sure of what went where.
2292        // 🚨 Can no longer peek these with hashmap backing as the keys may
2293        // be evicted out-of-order, but the stats are correct!
2294
2295        // Now touch the two recent items to bring them also to freq
2296
2297        let rec_set: Vec<usize> = wr_txn.iter_rec().take(2).copied().collect();
2298        assert!(wr_txn.get(&rec_set[0]) == Some(&rec_set[0]));
2299        assert!(wr_txn.get(&rec_set[1]) == Some(&rec_set[1]));
2300
2301        let stats = wr_txn.commit();
2302
2303        // Now we start the fifth txn, and check the stats.
2304        println!("== 5");
2305        let mut wr_txn = arc.write_stats(stats);
2306        println!("stat -> {:?}", wr_txn.peek_stat());
2307        assert!(
2308            CStat {
2309                max: 4,
2310                cache: 5,
2311                tlocal: 0,
2312                freq: 4,
2313                rec: 0,
2314                ghost_freq: 0,
2315                ghost_rec: 1,
2316                haunted: 0,
2317                p: 0
2318            } == wr_txn.peek_stat()
2319        );
2320        // And assert what's in the sets to be sure of what went where.
2321        // 🚨 Can no longer peek these with hashmap backing as the keys may
2322        // be evicted out-of-order, but the stats are correct!
2323
2324        // Now touch the one item that's in ghost rec - this will trigger
2325        // an evict from ghost freq
2326        let grec: usize = wr_txn.iter_ghost_rec().take(1).copied().next().unwrap();
2327        wr_txn.insert(grec, grec);
2328        assert!(wr_txn.get(&grec) == Some(&grec));
2329        // When we add 3, we are basically issuing a demand that the rec set should be
2330        // allowed to grow as we had a potential cache miss here.
2331        let stats = wr_txn.commit();
2332
2333        // Now we start the sixth txn, and check the stats.
2334        println!("== 6");
2335        let mut wr_txn = arc.write_stats(stats);
2336        println!("stat -> {:?}", wr_txn.peek_stat());
2337        assert!(
2338            CStat {
2339                max: 4,
2340                cache: 5,
2341                tlocal: 0,
2342                freq: 3,
2343                rec: 1,
2344                ghost_freq: 1,
2345                ghost_rec: 0,
2346                haunted: 0,
2347                p: 1
2348            } == wr_txn.peek_stat()
2349        );
2350        // And assert what's in the sets to be sure of what went where.
2351        // 🚨 Can no longer peek these with hashmap backing as the keys may
2352        // be evicted out-of-order, but the stats are correct!
2353        assert!(wr_txn.peek_cache(&grec) == CacheState::Rec);
2354
2355        // Right, seventh txn - we show how a cache scan doesn't cause p shifting or evict.
2356        // tl;dr - attempt to include a bunch in a scan, and it will be ignored as p is low,
2357        // and any miss on rec won't shift p unless it's in the ghost rec.
2358        wr_txn.insert(10, 10);
2359        wr_txn.insert(11, 11);
2360        wr_txn.insert(12, 12);
2361        let stats = wr_txn.commit();
2362
2363        println!("== 7");
2364        let mut wr_txn = arc.write_stats(stats);
2365        println!("stat -> {:?}", wr_txn.peek_stat());
2366        assert!(
2367            CStat {
2368                max: 4,
2369                cache: 8,
2370                tlocal: 0,
2371                freq: 3,
2372                rec: 1,
2373                ghost_freq: 1,
2374                ghost_rec: 3,
2375                haunted: 0,
2376                p: 1
2377            } == wr_txn.peek_stat()
2378        );
2379        // 🚨 Can no longer peek these with hashmap backing as the keys may
2380        // be evicted out-of-order, but the stats are correct!
2381
2382        // Eight txn - now that we had a demand for items before, we re-demand them - this will trigger
2383        // a shift in p, causing some more to be in the rec cache.
2384        let grec_set: Vec<usize> = wr_txn.iter_ghost_rec().take(3).copied().collect();
2385        println!("{:?}", grec_set);
2386
2387        grec_set
2388            .iter()
2389            .for_each(|i| println!("{:?}", wr_txn.peek_cache(i)));
2390
2391        grec_set.iter().for_each(|i| wr_txn.insert(*i, *i));
2392
2393        grec_set
2394            .iter()
2395            .for_each(|i| println!("{:?}", wr_txn.peek_cache(i)));
2396        wr_txn.commit();
2397
2398        println!("== 8");
2399        let mut wr_txn = arc.write();
2400        println!("stat -> {:?}", wr_txn.peek_stat());
2401        assert!(
2402            CStat {
2403                max: 4,
2404                cache: 8,
2405                tlocal: 0,
2406                freq: 0,
2407                rec: 4,
2408                ghost_freq: 4,
2409                ghost_rec: 0,
2410                haunted: 0,
2411                p: 4
2412            } == wr_txn.peek_stat()
2413        );
2414
2415        grec_set
2416            .iter()
2417            .for_each(|i| println!("{:?}", wr_txn.peek_cache(i)));
2418        grec_set
2419            .iter()
2420            .for_each(|i| assert!(wr_txn.peek_cache(i) == CacheState::Rec));
2421
2422        // Now lets go back the other way - we want freq items to come back.
2423        let gfreq_set: Vec<usize> = wr_txn.iter_ghost_freq().take(4).copied().collect();
2424
2425        gfreq_set.iter().for_each(|i| wr_txn.insert(*i, *i));
2426        wr_txn.commit();
2427
2428        println!("== 9");
2429        let wr_txn = arc.write();
2430        println!("stat -> {:?}", wr_txn.peek_stat());
2431        assert!(
2432            CStat {
2433                max: 4,
2434                cache: 8,
2435                tlocal: 0,
2436                freq: 4,
2437                rec: 0,
2438                ghost_freq: 0,
2439                ghost_rec: 4,
2440                haunted: 0,
2441                p: 0
2442            } == wr_txn.peek_stat()
2443        );
2444        // 🚨 Can no longer peek these with hashmap backing as the keys may
2445        // be evicted out-of-order, but the stats are correct!
2446        gfreq_set
2447            .iter()
2448            .for_each(|i| assert!(wr_txn.peek_cache(i) == CacheState::Freq));
2449
2450        // And done!
2451        let () = wr_txn.commit();
2452        // See what stats did
2453        // let stats = arc.view_stats();
2454        // println!("{:?}", *stats);
2455    }
2456
2457    #[test]
2458    fn test_cache_concurrent_basic() {
2459        // Now we want to check some basic interactions of read and write together.
2460        // Setup the cache.
2461        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
2462            .set_size(4, 4)
2463            .build()
2464            .expect("Invalid cache parameters!");
2465        // start a rd
2466        {
2467            let mut rd_txn = arc.read();
2468            // add items to the rd
2469            rd_txn.insert(1, 1);
2470            rd_txn.insert(2, 2);
2471            rd_txn.insert(3, 3);
2472            rd_txn.insert(4, 4);
2473            // Should be in the tlocal
2474            // assert!(rd_txn.get(&1).is_some());
2475            // assert!(rd_txn.get(&2).is_some());
2476            // assert!(rd_txn.get(&3).is_some());
2477            // assert!(rd_txn.get(&4).is_some());
2478            // end the rd
2479        }
2480        arc.try_quiesce();
2481        // What state is the cache now in?
2482        println!("== 2");
2483        let wr_txn = arc.write();
2484        println!("{:?}", wr_txn.peek_stat());
2485        assert!(
2486            CStat {
2487                max: 4,
2488                cache: 4,
2489                tlocal: 0,
2490                freq: 0,
2491                rec: 4,
2492                ghost_freq: 0,
2493                ghost_rec: 0,
2494                haunted: 0,
2495                p: 0
2496            } == wr_txn.peek_stat()
2497        );
2498        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
2499        assert!(wr_txn.peek_cache(&2) == CacheState::Rec);
2500        assert!(wr_txn.peek_cache(&3) == CacheState::Rec);
2501        assert!(wr_txn.peek_cache(&4) == CacheState::Rec);
2502        // Magic! Without a single write op we included items!
2503        // Lets have the read touch two items, and then add two new.
2504        // This will trigger evict on 1/2
2505        {
2506            let mut rd_txn = arc.read();
2507            // add items to the rd
2508            assert!(rd_txn.get(&3) == Some(&3));
2509            assert!(rd_txn.get(&4) == Some(&4));
2510            rd_txn.insert(5, 5);
2511            rd_txn.insert(6, 6);
2512            // end the rd
2513        }
2514        // Now commit and check the state.
2515        wr_txn.commit();
2516        println!("== 3");
2517        let wr_txn = arc.write();
2518        assert!(
2519            CStat {
2520                max: 4,
2521                cache: 6,
2522                tlocal: 0,
2523                freq: 2,
2524                rec: 2,
2525                ghost_freq: 0,
2526                ghost_rec: 2,
2527                haunted: 0,
2528                p: 0
2529            } == wr_txn.peek_stat()
2530        );
2531        assert!(wr_txn.peek_cache(&1) == CacheState::GhostRec);
2532        assert!(wr_txn.peek_cache(&2) == CacheState::GhostRec);
2533        assert!(wr_txn.peek_cache(&3) == CacheState::Freq);
2534        assert!(wr_txn.peek_cache(&4) == CacheState::Freq);
2535        assert!(wr_txn.peek_cache(&5) == CacheState::Rec);
2536        assert!(wr_txn.peek_cache(&6) == CacheState::Rec);
2537
2538        // Now trigger hits on 1/2 which will cause a shift in P.
2539        {
2540            let mut rd_txn = arc.read();
2541            // add items to the rd
2542            rd_txn.insert(1, 1);
2543            rd_txn.insert(2, 2);
2544            // end the rd
2545        }
2546
2547        wr_txn.commit();
2548        println!("== 4");
2549        let wr_txn = arc.write();
2550        assert!(
2551            CStat {
2552                max: 4,
2553                cache: 6,
2554                tlocal: 0,
2555                freq: 2,
2556                rec: 2,
2557                ghost_freq: 0,
2558                ghost_rec: 2,
2559                haunted: 0,
2560                p: 2
2561            } == wr_txn.peek_stat()
2562        );
2563        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
2564        assert!(wr_txn.peek_cache(&2) == CacheState::Rec);
2565        assert!(wr_txn.peek_cache(&3) == CacheState::Freq);
2566        assert!(wr_txn.peek_cache(&4) == CacheState::Freq);
2567        assert!(wr_txn.peek_cache(&5) == CacheState::GhostRec);
2568        assert!(wr_txn.peek_cache(&6) == CacheState::GhostRec);
2569        // See what stats did
2570        // let stats = arc.view_stats();
2571        // println!("stats 1: {:?}", *stats);
2572        // assert!(stats.reader_hits == 2);
2573        // assert!(stats.reader_includes == 8);
2574        // assert!(stats.reader_tlocal_includes == 8);
2575        // assert!(stats.reader_tlocal_hits == 0);
2576    }
2577
2578    // Test edge cases that are horrifying and could destroy peoples lives
2579    // and sanity.
2580    #[test]
2581    fn test_cache_concurrent_cursed_1() {
2582        // Case 1 - It's possible for a read transaction to last for a long time,
2583        // and then have a cache include, which may cause an attempt to include
2584        // an outdated value into the cache. To handle this the haunted set exists
2585        // so that all keys and their eviction ids are always tracked for all of time
2586        // to ensure that we never incorrectly include a value that may have been updated
2587        // more recently.
2588        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
2589            .set_size(4, 4)
2590            .build()
2591            .expect("Invalid cache parameters!");
2592
2593        // Start a wr
2594        let mut wr_txn = arc.write();
2595        // Start a rd
2596        let mut rd_txn = arc.read();
2597        // Add the value 1,1 via the wr.
2598        wr_txn.insert(1, 1);
2599
2600        // assert 1 is not in rd.
2601        assert!(rd_txn.get(&1).is_none());
2602
2603        // Commit wr
2604        wr_txn.commit();
2605        // Even after the commit, it's not in rd.
2606        assert!(rd_txn.get(&1).is_none());
2607        // begin wr
2608        let mut wr_txn = arc.write();
2609        // We now need to flood the cache, to cause ghost rec eviction.
2610        wr_txn.insert(10, 1);
2611        wr_txn.insert(11, 1);
2612        wr_txn.insert(12, 1);
2613        wr_txn.insert(13, 1);
2614        wr_txn.insert(14, 1);
2615        wr_txn.insert(15, 1);
2616        wr_txn.insert(16, 1);
2617        wr_txn.insert(17, 1);
2618        // commit wr
2619        wr_txn.commit();
2620
2621        // begin wr
2622        let wr_txn = arc.write();
2623        // assert that 1 is haunted.
2624        assert!(wr_txn.peek_cache(&1) == CacheState::Haunted);
2625        // assert 1 is not in rd.
2626        assert!(rd_txn.get(&1).is_none());
2627        // now that 1 is hanuted, in rd attempt to insert 1, X
2628        rd_txn.insert(1, 100);
2629        // commit wr
2630        wr_txn.commit();
2631
2632        // start wr
2633        let wr_txn = arc.write();
2634        // assert that 1 is still haunted.
2635        assert!(wr_txn.peek_cache(&1) == CacheState::Haunted);
2636        // assert that 1, x is in rd.
2637        assert!(rd_txn.get(&1) == Some(&100));
2638        // done!
2639    }
2640
2641    #[test]
2642    fn test_cache_clear() {
2643        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
2644            .set_size(4, 4)
2645            .build()
2646            .expect("Invalid cache parameters!");
2647
2648        // Start a wr
2649        let mut wr_txn = arc.write();
2650        // Add a bunch of values, and touch some twice.
2651        wr_txn.insert(10, 10);
2652        wr_txn.insert(11, 11);
2653        wr_txn.insert(12, 12);
2654        wr_txn.insert(13, 13);
2655        wr_txn.insert(14, 14);
2656        wr_txn.insert(15, 15);
2657        wr_txn.insert(16, 16);
2658        wr_txn.insert(17, 17);
2659        wr_txn.commit();
2660        // Begin a new write.
2661        let mut wr_txn = arc.write();
2662
2663        // Touch two values that are in the rec set.
2664        let rec_set: Vec<usize> = wr_txn.iter_rec().take(2).copied().collect();
2665        println!("{:?}", rec_set);
2666        assert!(wr_txn.get(&rec_set[0]) == Some(&rec_set[0]));
2667        assert!(wr_txn.get(&rec_set[1]) == Some(&rec_set[1]));
2668
2669        // commit wr
2670        wr_txn.commit();
2671        // Begin a new write.
2672        let mut wr_txn = arc.write();
2673        println!("stat -> {:?}", wr_txn.peek_stat());
2674        assert!(
2675            CStat {
2676                max: 4,
2677                cache: 8,
2678                tlocal: 0,
2679                freq: 2,
2680                rec: 2,
2681                ghost_freq: 0,
2682                ghost_rec: 4,
2683                haunted: 0,
2684                p: 0
2685            } == wr_txn.peek_stat()
2686        );
2687
2688        // Clear
2689        wr_txn.clear();
2690        // Now commit
2691        wr_txn.commit();
2692        // Now check their states.
2693        let wr_txn = arc.write();
2694        // See what stats did
2695        println!("stat -> {:?}", wr_txn.peek_stat());
2696        // stat -> CStat { max: 4, cache: 8, tlocal: 0, freq: 0, rec: 0, ghost_freq: 2, ghost_rec: 6, haunted: 0, p: 0 }
2697        assert!(
2698            CStat {
2699                max: 4,
2700                cache: 8,
2701                tlocal: 0,
2702                freq: 0,
2703                rec: 0,
2704                ghost_freq: 2,
2705                ghost_rec: 6,
2706                haunted: 0,
2707                p: 0
2708            } == wr_txn.peek_stat()
2709        );
2710        // let stats = arc.view_stats();
2711        // println!("{:?}", *stats);
2712    }
2713
2714    #[test]
2715    fn test_cache_clear_rollback() {
2716        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
2717            .set_size(4, 4)
2718            .build()
2719            .expect("Invalid cache parameters!");
2720
2721        // Start a wr
2722        let mut wr_txn = arc.write();
2723        // Add a bunch of values, and touch some twice.
2724        wr_txn.insert(10, 10);
2725        wr_txn.insert(11, 11);
2726        wr_txn.insert(12, 12);
2727        wr_txn.insert(13, 13);
2728        wr_txn.insert(14, 14);
2729        wr_txn.insert(15, 15);
2730        wr_txn.insert(16, 16);
2731        wr_txn.insert(17, 17);
2732        wr_txn.commit();
2733        // Begin a new write.
2734        let mut wr_txn = arc.write();
2735        let rec_set: Vec<usize> = wr_txn.iter_rec().take(2).copied().collect();
2736        println!("{:?}", rec_set);
2737        let r = wr_txn.get(&rec_set[0]);
2738        println!("{:?}", r);
2739        assert!(r == Some(&rec_set[0]));
2740        assert!(wr_txn.get(&rec_set[1]) == Some(&rec_set[1]));
2741
2742        // commit wr
2743        wr_txn.commit();
2744        // Begin a new write.
2745        let mut wr_txn = arc.write();
2746        println!("stat -> {:?}", wr_txn.peek_stat());
2747        assert!(
2748            CStat {
2749                max: 4,
2750                cache: 8,
2751                tlocal: 0,
2752                freq: 2,
2753                rec: 2,
2754                ghost_freq: 0,
2755                ghost_rec: 4,
2756                haunted: 0,
2757                p: 0
2758            } == wr_txn.peek_stat()
2759        );
2760
2761        // Clear
2762        wr_txn.clear();
2763        // Now abort the clear - should do nothing!
2764        drop(wr_txn);
2765        // Check the states, should not have changed
2766        let wr_txn = arc.write();
2767        println!("stat -> {:?}", wr_txn.peek_stat());
2768        assert!(
2769            CStat {
2770                max: 4,
2771                cache: 8,
2772                tlocal: 0,
2773                freq: 2,
2774                rec: 2,
2775                ghost_freq: 0,
2776                ghost_rec: 4,
2777                haunted: 0,
2778                p: 0
2779            } == wr_txn.peek_stat()
2780        );
2781    }
2782
2783    #[test]
2784    fn test_cache_clear_cursed() {
2785        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
2786            .set_size(4, 4)
2787            .build()
2788            .expect("Invalid cache parameters!");
2789        // Setup for the test
2790        // --
2791        let mut wr_txn = arc.write();
2792        wr_txn.insert(10, 1);
2793        wr_txn.commit();
2794        // --
2795        let wr_txn = arc.write();
2796        assert!(wr_txn.peek_cache(&10) == CacheState::Rec);
2797        wr_txn.commit();
2798        // --
2799        // Okay, now the test starts. First, we begin a read
2800        let mut rd_txn = arc.read();
2801        // Then while that read exists, we open a write, and conduct
2802        // a cache clear.
2803        let mut wr_txn = arc.write();
2804        wr_txn.clear();
2805        // Commit the clear write.
2806        wr_txn.commit();
2807
2808        // Now on the read, we perform a touch of an item, and we include
2809        // something that was not yet in the cache.
2810        assert!(rd_txn.get(&10) == Some(&1));
2811        rd_txn.insert(11, 1);
2812        // Complete the read
2813        std::mem::drop(rd_txn);
2814        // Perform a cache quiesce
2815        arc.try_quiesce();
2816        // --
2817
2818        // Assert that the items that we provided were NOT included, and are
2819        // in the correct states.
2820        let wr_txn = arc.write();
2821        assert!(wr_txn.peek_cache(&10) == CacheState::GhostRec);
2822        println!("--> {:?}", wr_txn.peek_cache(&11));
2823        assert!(wr_txn.peek_cache(&11) == CacheState::None);
2824    }
2825
2826    #[test]
2827    fn test_cache_p_weight_zero_churn() {
2828        let arc: ARCache<usize, usize> = ARCacheBuilder::new()
2829            .set_size(4, 4)
2830            .set_watermark(0)
2831            .build()
2832            .expect("Invalid cache parameters!");
2833
2834        let mut wr_txn = arc.write();
2835
2836        // First we need to load up frequent.
2837        wr_txn.insert(1, 1);
2838        wr_txn.insert(2, 2);
2839        wr_txn.insert(3, 3);
2840        wr_txn.insert(4, 4);
2841        assert_eq!(wr_txn.get(&1), Some(&1));
2842        assert_eq!(wr_txn.get(&2), Some(&2));
2843        assert_eq!(wr_txn.get(&3), Some(&3));
2844        assert_eq!(wr_txn.get(&4), Some(&4));
2845
2846        assert_eq!(wr_txn.peek_stat().p, 0);
2847        wr_txn.commit();
2848
2849        // Hitting again in a new txn moves to freq
2850        let mut wr_txn = arc.write();
2851        assert_eq!(wr_txn.get(&1), Some(&1));
2852        assert_eq!(wr_txn.get(&2), Some(&2));
2853        assert_eq!(wr_txn.get(&3), Some(&3));
2854        assert_eq!(wr_txn.get(&4), Some(&4));
2855
2856        assert_eq!(wr_txn.peek_stat().p, 0);
2857        wr_txn.commit();
2858
2859        // Now include new items. The goal is we want to shift p to at least 1.
2860        let mut wr_txn = arc.write();
2861        // Won't fit, moves to ghost recent
2862        wr_txn.insert(100, 100);
2863        println!("b {:?}", wr_txn.peek_stat());
2864        assert_eq!(wr_txn.peek_stat().p, 0);
2865        wr_txn.commit();
2866
2867        // Include again, causes evict in freq.
2868        let mut wr_txn = arc.write();
2869        assert_eq!(wr_txn.peek_stat().p, 0);
2870        assert_eq!(wr_txn.peek_stat().ghost_rec, 1);
2871
2872        // Causes shift in P, stays in recent.
2873        wr_txn.insert(100, 100);
2874        wr_txn.commit();
2875
2876        let wr_txn = arc.write();
2877        println!("c {:?}", wr_txn.peek_stat());
2878        assert_eq!(wr_txn.peek_stat().p, 1);
2879        assert_eq!(wr_txn.peek_stat().ghost_rec, 0);
2880        assert_eq!(wr_txn.peek_stat().ghost_freq, 1);
2881        wr_txn.commit();
2882
2883        // Now, we want to bring back from ghost freq.
2884        let mut wr_txn = arc.write();
2885        assert_eq!(wr_txn.get(&1), None);
2886        // We missed, so re-include.
2887        wr_txn.insert(1, 1);
2888        wr_txn.commit();
2889
2890        let wr_txn = arc.write();
2891        println!("d {:?}", wr_txn.peek_stat());
2892        // Weights P back to 0.
2893        assert_eq!(wr_txn.peek_stat().p, 0);
2894        assert_eq!(wr_txn.peek_stat().ghost_rec, 1);
2895        assert_eq!(wr_txn.peek_stat().ghost_freq, 0);
2896        wr_txn.commit();
2897    }
2898
2899    #[test]
2900    fn test_cache_haunted_bounds() {
2901        let arc: ARCache<usize, usize> = ARCacheBuilder::new()
2902            .set_size(4, 4)
2903            .set_watermark(0)
2904            .set_look_back_limit(4)
2905            .build()
2906            .expect("Invalid cache parameters!");
2907
2908        let mut wr_txn = arc.write();
2909
2910        // We want to test that haunted items are removed after 4 generations.
2911
2912        // Setup a removed key which immediately goes to haunted.
2913        wr_txn.remove(1);
2914        wr_txn.commit();
2915
2916        // Now write and commit 4 times to advance the txid.
2917        for _i in 0..5 {
2918            let wr_txn = arc.write();
2919            println!("l {:?}", wr_txn.peek_stat());
2920            assert_eq!(wr_txn.peek_stat().haunted, 1);
2921            assert!(wr_txn.peek_cache(&1) == CacheState::Haunted);
2922            wr_txn.commit();
2923        }
2924
2925        // Now it's removed.
2926        let wr_txn = arc.write();
2927        println!("d {:?}", wr_txn.peek_stat());
2928        assert_eq!(wr_txn.peek_stat().haunted, 0);
2929        assert_eq!(wr_txn.peek_cache(&1), CacheState::None);
2930        wr_txn.commit();
2931    }
2932
2933    #[test]
2934    fn test_cache_dirty_write() {
2935        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
2936            .set_size(4, 4)
2937            .build()
2938            .expect("Invalid cache parameters!");
2939        let mut wr_txn = arc.write();
2940        wr_txn.insert_dirty(10, 1);
2941        wr_txn.iter_mut_mark_clean().for_each(|(_k, _v)| {});
2942        wr_txn.commit();
2943    }
2944
2945    #[test]
2946    fn test_cache_read_no_tlocal() {
2947        // Check a cache with no read local thread capacity
2948        // Setup the cache.
2949        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
2950            .set_size(4, 0)
2951            .build()
2952            .expect("Invalid cache parameters!");
2953        // start a rd
2954        {
2955            let mut rd_txn = arc.read();
2956            // add items to the rd
2957            rd_txn.insert(1, 1);
2958            rd_txn.insert(2, 2);
2959            rd_txn.insert(3, 3);
2960            rd_txn.insert(4, 4);
2961            // end the rd
2962            // Everything should be missing frm the tlocal.
2963            assert!(rd_txn.get(&1).is_none());
2964            assert!(rd_txn.get(&2).is_none());
2965            assert!(rd_txn.get(&3).is_none());
2966            assert!(rd_txn.get(&4).is_none());
2967        }
2968        arc.try_quiesce();
2969        // What state is the cache now in?
2970        println!("== 2");
2971        let wr_txn = arc.write();
2972        assert!(
2973            CStat {
2974                max: 4,
2975                cache: 4,
2976                tlocal: 0,
2977                freq: 0,
2978                rec: 4,
2979                ghost_freq: 0,
2980                ghost_rec: 0,
2981                haunted: 0,
2982                p: 0
2983            } == wr_txn.peek_stat()
2984        );
2985        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
2986        assert!(wr_txn.peek_cache(&2) == CacheState::Rec);
2987        assert!(wr_txn.peek_cache(&3) == CacheState::Rec);
2988        assert!(wr_txn.peek_cache(&4) == CacheState::Rec);
2989        // let stats = arc.view_stats();
2990        // println!("stats 1: {:?}", *stats);
2991        // assert!(stats.reader_includes == 4);
2992        // assert!(stats.reader_tlocal_includes == 0);
2993        // assert!(stats.reader_tlocal_hits == 0);
2994    }
2995
2996    #[derive(Clone, Debug)]
2997    struct Weighted {
2998        _i: u64,
2999    }
3000
3001    #[test]
3002    fn test_cache_weighted() {
3003        let arc: ARCache<usize, Weighted> = ARCacheBuilder::default()
3004            .set_size(4, 0)
3005            .build()
3006            .expect("Invalid cache parameters!");
3007        let mut wr_txn = arc.write();
3008
3009        assert!(
3010            CStat {
3011                max: 4,
3012                cache: 0,
3013                tlocal: 0,
3014                freq: 0,
3015                rec: 0,
3016                ghost_freq: 0,
3017                ghost_rec: 0,
3018                haunted: 0,
3019                p: 0
3020            } == wr_txn.peek_stat()
3021        );
3022
3023        // In the first txn we insert 2 weight 2 items.
3024        wr_txn.insert_sized(1, Weighted { _i: 1 }, NonZeroUsize::new(2).unwrap());
3025        wr_txn.insert_sized(2, Weighted { _i: 2 }, NonZeroUsize::new(2).unwrap());
3026
3027        assert!(
3028            CStat {
3029                max: 4,
3030                cache: 0,
3031                tlocal: 2,
3032                freq: 0,
3033                rec: 0,
3034                ghost_freq: 0,
3035                ghost_rec: 0,
3036                haunted: 0,
3037                p: 0
3038            } == wr_txn.peek_stat()
3039        );
3040        wr_txn.commit();
3041
3042        // Now once committed, the proper sizes kick in.
3043
3044        let wr_txn = arc.write();
3045        assert!(
3046            CStat {
3047                max: 4,
3048                cache: 2,
3049                tlocal: 0,
3050                freq: 0,
3051                rec: 4,
3052                ghost_freq: 0,
3053                ghost_rec: 0,
3054                haunted: 0,
3055                p: 0
3056            } == wr_txn.peek_stat()
3057        );
3058        wr_txn.commit();
3059
3060        // Check the numbers move properly.
3061        let mut wr_txn = arc.write();
3062        wr_txn.get(&1);
3063        wr_txn.commit();
3064
3065        let mut wr_txn = arc.write();
3066        assert!(
3067            CStat {
3068                max: 4,
3069                cache: 2,
3070                tlocal: 0,
3071                freq: 2,
3072                rec: 2,
3073                ghost_freq: 0,
3074                ghost_rec: 0,
3075                haunted: 0,
3076                p: 0
3077            } == wr_txn.peek_stat()
3078        );
3079
3080        wr_txn.insert_sized(3, Weighted { _i: 3 }, NonZeroUsize::new(2).unwrap());
3081        wr_txn.insert_sized(4, Weighted { _i: 4 }, NonZeroUsize::new(2).unwrap());
3082        wr_txn.commit();
3083
3084        // Check the evicts
3085        let wr_txn = arc.write();
3086        assert!(
3087            CStat {
3088                max: 4,
3089                cache: 4,
3090                tlocal: 0,
3091                freq: 2,
3092                rec: 2,
3093                ghost_freq: 0,
3094                ghost_rec: 4,
3095                haunted: 0,
3096                p: 0
3097            } == wr_txn.peek_stat()
3098        );
3099        wr_txn.commit();
3100    }
3101
3102    #[test]
3103    fn test_cache_stats_reload() {
3104        let _ = tracing_subscriber::fmt::try_init();
3105
3106        // Make a cache
3107        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
3108            .set_size(4, 0)
3109            .build()
3110            .expect("Invalid cache parameters!");
3111
3112        let stats = WriteCountStat::default();
3113
3114        let mut wr_txn = arc.write_stats(stats);
3115        wr_txn.insert(1, 1);
3116        let stats = wr_txn.commit();
3117
3118        tracing::trace!("stats 1: {:?}", stats);
3119    }
3120
3121    #[test]
3122    fn test_cache_mut_inplace() {
3123        // Make a cache
3124        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
3125            .set_size(4, 0)
3126            .build()
3127            .expect("Invalid cache parameters!");
3128        let mut wr_txn = arc.write();
3129
3130        assert!(wr_txn.get_mut(&1, false).is_none());
3131        // It was inserted, can mutate. This is the tlocal present state.
3132        wr_txn.insert(1, 1);
3133        {
3134            let mref = wr_txn.get_mut(&1, false).unwrap();
3135            *mref = 2;
3136        }
3137        assert!(wr_txn.get_mut(&1, false) == Some(&mut 2));
3138        wr_txn.commit();
3139
3140        // It's in the main cache, can mutate immediately and the tlocal is primed.
3141        let mut wr_txn = arc.write();
3142        {
3143            let mref = wr_txn.get_mut(&1, false).unwrap();
3144            *mref = 3;
3145        }
3146        assert!(wr_txn.get_mut(&1, false) == Some(&mut 3));
3147        wr_txn.commit();
3148
3149        // Marked for remove, can not mut.
3150        let mut wr_txn = arc.write();
3151        wr_txn.remove(1);
3152        assert!(wr_txn.get_mut(&1, false).is_none());
3153        wr_txn.commit();
3154    }
3155
3156    #[allow(dead_code)]
3157
3158    pub static RUNNING: AtomicBool = AtomicBool::new(false);
3159
3160    #[cfg(test)]
3161    fn multi_thread_worker(arc: Arc<ARCache<Box<u32>, Box<u32>>>) {
3162        while RUNNING.load(Ordering::Relaxed) {
3163            let mut rd_txn = arc.read();
3164
3165            for _i in 0..128 {
3166                let x = rand::random::<u32>();
3167
3168                if rd_txn.get(&x).is_none() {
3169                    rd_txn.insert(Box::new(x), Box::new(x))
3170                }
3171            }
3172        }
3173    }
3174
3175    #[allow(dead_code)]
3176    #[cfg_attr(miri, ignore)]
3177    #[cfg_attr(feature = "dhat-heap", test)]
3178    #[cfg(test)]
3179    fn test_cache_stress_1() {
3180        #[cfg(feature = "dhat-heap")]
3181        let _profiler = dhat::Profiler::builder().trim_backtraces(None).build();
3182
3183        let arc: Arc<ARCache<Box<u32>, Box<u32>>> = Arc::new(
3184            ARCacheBuilder::default()
3185                .set_size(64, 4)
3186                .build()
3187                .expect("Invalid cache parameters!"),
3188        );
3189
3190        let thread_count = 4;
3191
3192        RUNNING.store(true, Ordering::Relaxed);
3193
3194        let handles: Vec<_> = (0..thread_count)
3195            .into_iter()
3196            .map(|_| {
3197                // Build the threads.
3198                let cache = arc.clone();
3199                thread::spawn(move || multi_thread_worker(cache))
3200            })
3201            .collect();
3202
3203        for x in 0..1024 {
3204            let mut wr_txn = arc.write();
3205
3206            if wr_txn.get(&x).is_none() {
3207                wr_txn.insert(Box::new(x), Box::new(x))
3208            }
3209
3210            wr_txn.commit();
3211        }
3212
3213        RUNNING.store(false, Ordering::Relaxed);
3214
3215        for handle in handles {
3216            handle.join().unwrap();
3217        }
3218
3219        drop(arc);
3220    }
3221
3222    #[test]
3223    fn test_set_expected_workload_negative() {
3224        let _arc: Arc<ARCache<Box<u32>, Box<u32>>> = Arc::new(
3225            ARCacheBuilder::default()
3226                .set_expected_workload(256, 31, 8, 16, true)
3227                .build()
3228                .expect("Invalid cache parameters!"),
3229        );
3230    }
3231}