Skip to main content

concread/arcache/
mod.rs

//! ARCache - A concurrently readable adaptive replacement cache.
//!
//! An ARCache is used in place of a `RwLock<LruCache>` or `Mutex<LruCache>`.
//! This structure is transactional, meaning that readers have guaranteed
//! point-in-time views of the cache and their items, while allowing writers
//! to proceed with inclusions and cache state management in parallel.
//!
//! This means that unlike a `RwLock`, which can have many readers OR one writer,
//! this cache is capable of many readers, over multiple data generations, AND
//! writers that are serialised. This formally means that this is an ACID
//! compliant Cache.
12
13mod ll;
14/// Stats collection for [ARCache]
15pub mod stats;
16
17use self::ll::{LLNodeRef, LLWeight, LL};
18use self::stats::{ARCacheReadStat, ARCacheWriteStat};
19
20#[cfg(feature = "arcache-is-hashmap")]
21use crate::hashmap::{
22    HashMap as DataMap, HashMapReadTxn as DataMapReadTxn, HashMapWriteTxn as DataMapWriteTxn,
23};
24
25#[cfg(all(feature = "arcache-is-hashtrie", not(feature = "arcache-is-hashmap")))]
26use crate::hashtrie::{
27    HashTrie as DataMap, HashTrieReadTxn as DataMapReadTxn, HashTrieWriteTxn as DataMapWriteTxn,
28};
29
30use crossbeam_queue::ArrayQueue;
31use std::collections::HashMap as Map;
32use std::sync::atomic::{AtomicBool, Ordering};
33use std::sync::Arc;
34use std::sync::{Mutex, RwLock};
35
36use std::borrow::Borrow;
37use std::cell::UnsafeCell;
38use std::fmt::Debug;
39use std::hash::Hash;
40use std::mem;
41use std::num::NonZeroUsize;
42use std::ops::Deref;
43use std::ops::DerefMut;
44use std::time::Instant;
45
46use tracing::trace;
47
// Fraction (1/8) of the total cache reserved for reader thread-local caches.
const READ_THREAD_CACHE_RATIO: isize = 8;
// Fraction (1/4) of the total cache reserved for the writer's expected misses.
const WRITE_THREAD_CACHE_RATIO: isize = 4;

// Capacity of the bounded hit-event queue from readers to the writer.
const WRITE_THREAD_CHANNEL_SIZE: usize = 64;
// Capacity of the bounded include-event queue from readers to the writer.
const READ_THREAD_CHANNEL_SIZE: usize = 64;

// Default and absolute-minimum ghost-list look back limits
// (see `ARCacheBuilder::set_look_back_limit`).
const TXN_LOOKBACK_LIMIT_DEFAULT: u8 = 32;
const TXN_LOOKBACK_LIMIT_ABS_MIN: u8 = 4;

// Caches with `max` below this value disable the watermark (track from the start).
const WATERMARK_DISABLE_MIN: usize = 128;

// The default watermark is (max / 20) * 18, i.e. 90% of the cache size.
const WATERMARK_DISABLE_DIVISOR: usize = 20;
const WATERMARK_DISABLE_RATIO: usize = 18;

// Weight assigned to entries when they are moved to the haunted set.
const HAUNTED_SIZE: usize = 1;
63
/// An entry in a write transaction's thread-local store, drained into the
/// main cache when the transaction commits.
enum ThreadCacheItem<V> {
    /// A value to include: (value, clean marker, item weight).
    /// NOTE(review): the commit drain asserts the marker is true — presumably
    /// dirty items are resolved before the drain; confirm at the call sites.
    Present(V, bool, usize),
    /// A removal of a key; carries the same clean marker.
    Removed(bool),
}
68
/// Queued event recording a cache hit observed by a reader.
struct CacheHitEvent {
    // Time the hit was observed.
    t: Instant,
    // Hash of the key that was hit — NOTE(review): inferred from the name;
    // confirm the hashing scheme at the producing site.
    k_hash: u64,
}
73
/// Queued event proposing that the writer include a key/value a reader missed on.
struct CacheIncludeEvent<K, V> {
    // Time the inclusion was requested.
    t: Instant,
    // Key to include.
    k: K,
    // Value to include.
    v: V,
    // Transaction id the reader observed at the time of the miss.
    txid: u64,
    // Weight of the item.
    size: usize,
}
81
/// Per-key metadata stored as the node payload in the ARC linked lists.
#[derive(Hash, Ord, PartialOrd, Eq, PartialEq, Clone, Debug)]
struct CacheItemInner<K>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
{
    // The cached key.
    k: K,
    // Transaction id of the last modification to this entry.
    txid: u64,
    // Weight of the item; doubles as the linked-list weight (see LLWeight).
    size: usize,
}
91
impl<K> LLWeight for CacheItemInner<K>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
{
    /// The list weight of this entry is simply its stored size.
    #[inline]
    fn ll_weight(&self) -> usize {
        self.size
    }
}
101
/// The state of a key within the ARC. `Freq`/`Rec` carry a live value; the
/// ghost and haunted variants retain only the list node (history, no value).
#[derive(Clone, Debug)]
enum CacheItem<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
{
    /// Live item in the frequent set.
    Freq(LLNodeRef<CacheItemInner<K>>, V),
    /// Live item in the recent set.
    Rec(LLNodeRef<CacheItemInner<K>>, V),
    /// History of an item evicted from the frequent set; value dropped.
    GhostFreq(LLNodeRef<CacheItemInner<K>>),
    /// History of an item evicted from the recent set; value dropped.
    GhostRec(LLNodeRef<CacheItemInner<K>>),
    /// A removed key — the removal itself is remembered via the haunted list.
    Haunted(LLNodeRef<CacheItemInner<K>>),
}
113
// SAFETY: NOTE(review) — these impls assert that CacheItem may be sent and
// shared across threads even though LLNodeRef does not derive Send/Sync
// automatically. This presumably relies on list nodes only being mutated under
// the cache's single-writer locks; confirm against the invariants in `ll.rs`.
unsafe impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
    > Send for CacheItem<K, V>
{
}
unsafe impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
    > Sync for CacheItem<K, V>
{
}
126
/// Test-only snapshot of which ARC set a key currently occupies.
#[cfg(test)]
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum CacheState {
    Freq,
    Rec,
    GhostFreq,
    GhostRec,
    Haunted,
    /// The key is not present in the cache at all.
    None,
}
137
/// Test-only aggregate snapshot of the cache's internal list sizes.
/// NOTE(review): the per-list figures are presumably ll weights rather than
/// element counts; confirm against the stat-collection code.
#[cfg(test)]
#[derive(Debug, PartialEq)]
pub(crate) struct CStat {
    // Configured maximum cache weight.
    max: usize,
    // Entries in the unified cache map.
    cache: usize,
    // Entries in the write txn's thread-local store.
    tlocal: usize,
    // Frequent list.
    freq: usize,
    // Recent list.
    rec: usize,
    // Ghost-frequent list.
    ghost_freq: usize,
    // Ghost-recent list.
    ghost_rec: usize,
    // Haunted list.
    haunted: usize,
    // Current target weight of the recent set.
    p: usize,
}
151
/// Writer-side ARC state: the five lists plus the consumer ends of the
/// reader event queues. Only ever accessed under the `inner` mutex.
struct ArcInner<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    /// Weight of items between the two caches.
    p: usize,
    // Frequent list.
    freq: LL<CacheItemInner<K>>,
    // Recent list.
    rec: LL<CacheItemInner<K>>,
    // Ghosts of items evicted from freq.
    ghost_freq: LL<CacheItemInner<K>>,
    // Ghosts of items evicted from rec.
    ghost_rec: LL<CacheItemInner<K>>,
    // Removed keys whose history is still tracked.
    haunted: LL<CacheItemInner<K>>,
    // Consumer side of the reader hit events (shared with ArcShared).
    hit_queue: Arc<ArrayQueue<CacheHitEvent>>,
    // Consumer side of the reader include events (shared with ArcShared).
    inc_queue: Arc<ArrayQueue<CacheIncludeEvent<K, V>>>,
    // NOTE(review): presumably events with a txid below this are discarded
    // during drain; confirm in the quiesce/drain paths.
    min_txid: u64,
}
168
/// Configuration and queue handles shared with readers. Normally only ever
/// read, so the RwLock wrapping it is effectively uncontended.
struct ArcShared<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    // Max number of elements to cache.
    max: usize,
    // Max number of elements for a reader per thread.
    read_max: usize,
    // Producer side of the hit event queue.
    hit_queue: Arc<ArrayQueue<CacheHitEvent>>,
    // Producer side of the include event queue.
    inc_queue: Arc<ArrayQueue<CacheIncludeEvent<K, V>>>,
    /// The number of items that are present in the cache before we start to process
    /// the arc sets/lists.
    watermark: usize,
    /// If readers should attempt to quiesce the cache. Default true
    reader_quiesce: bool,
}
186
/// A concurrently readable adaptive replacement cache. Operations are performed on the
/// cache via read and write operations.
pub struct ARCache<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    // Use a unified tree, allows simpler movement of items between the
    // cache types.
    cache: DataMap<K, CacheItem<K, V>>,
    // This is normally only ever taken in "read" mode, so it's effectively
    // an uncontended barrier.
    shared: RwLock<ArcShared<K, V>>,
    // These are only taken during a quiesce
    inner: Mutex<ArcInner<K, V>>,
    // True when ARC list maintenance is active; set from the start if the
    // watermark is configured as 0.
    above_watermark: AtomicBool,
    // Ghost-list look back limit (see ARCacheBuilder::set_look_back_limit),
    // widened to u64 at construction.
    look_back_limit: u64,
}
206
// SAFETY: NOTE(review) — ARCache holds raw-pointer based list structures via
// CacheItem/LL. These impls assert thread safety on the basis that all list
// mutation happens behind the `inner` Mutex and `shared` RwLock; confirm
// against the invariants documented in `ll.rs`.
unsafe impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
    > Send for ARCache<K, V>
{
}
unsafe impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
    > Sync for ARCache<K, V>
{
}
219
/// A key/value captured by a reader, stored in its thread-local LRU.
#[derive(Debug, Clone)]
struct ReadCacheItem<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    // The key.
    k: K,
    // The value.
    v: V,
    // Weight of this entry (used as its list weight).
    size: usize,
}
230
impl<K, V> LLWeight for ReadCacheItem<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    /// The list weight of a read-cache entry is its recorded size.
    #[inline]
    fn ll_weight(&self) -> usize {
        self.size
    }
}
241
/// A reader's thread-local cache of items it missed on.
struct ReadCache<K, V>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
{
    // cache of our missed items to send forward.
    // On drop we drain this to the channel
    set: Map<K, LLNodeRef<ReadCacheItem<K, V>>>,
    // Weight capacity of this thread-local cache (from ArcShared::read_max).
    read_size: usize,
    // Thread-local LRU ordering over the cached items.
    tlru: LL<ReadCacheItem<K, V>>,
}
253
/// An active read transaction over the cache. The data in this cache is guaranteed to be
/// valid at the point in time the read is created. You may include items during a cache
/// miss via the "insert" function.
pub struct ARCacheReadTxn<'a, K, V, S>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
    S: ARCacheReadStat + Clone,
{
    // The owning cache, used to reach shared state.
    caller: &'a ARCache<K, V>,
    // ro_txn to cache
    cache: DataMapReadTxn<'a, K, CacheItem<K, V>>,
    // Per-reader LRU; None when read_max was configured as 0.
    tlocal: Option<ReadCache<K, V>>,
    // Queue for reporting hits to the writer.
    hit_queue: Arc<ArrayQueue<CacheHitEvent>>,
    // Queue for proposing inclusions to the writer.
    inc_queue: Arc<ArrayQueue<CacheIncludeEvent<K, V>>>,
    // Snapshot of the cache's watermark flag at txn start.
    above_watermark: bool,
    // Whether this reader should attempt quiescing (from ArcShared).
    reader_quiesce: bool,
    // Stats sink for this transaction.
    stats: S,
}
273
// SAFETY: NOTE(review) — asserts the read transaction (which embeds
// LLNodeRef-bearing structures in its thread-local cache) may cross threads.
// Presumably sound because the tlocal list is owned exclusively by this txn;
// confirm against `ll.rs` before relying on it.
unsafe impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
        S: ARCacheReadStat + Clone + Sync + Send + 'static,
    > Send for ARCacheReadTxn<'_, K, V, S>
{
}
unsafe impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
        S: ARCacheReadStat + Clone + Sync + Send + 'static,
    > Sync for ARCacheReadTxn<'_, K, V, S>
{
}
288
/// An active write transaction over the cache. The data in this cache is isolated
/// from readers, and may be rolled-back if an error occurs. Changes only become
/// globally visible once you call "commit". Items may be added to the cache on
/// a miss via "insert", and you can explicitly remove items by calling "remove".
pub struct ARCacheWriteTxn<'a, K, V, S>
where
    K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
    V: Clone + Debug + Sync + Send + 'static,
    S: ARCacheWriteStat<K>,
{
    // The owning cache.
    caller: &'a ARCache<K, V>,
    // wr_txn to cache
    cache: DataMapWriteTxn<'a, K, CacheItem<K, V>>,
    // Cache of missed items (w_ dirty/clean)
    // On COMMIT we drain this to the main cache
    tlocal: Map<K, ThreadCacheItem<V>>,
    // Key hashes hit during this txn — NOTE(review): inferred from the name
    // and CacheHitEvent::k_hash; confirm at the recording site. UnsafeCell
    // implies single-threaded interior mutation.
    hit: UnsafeCell<Vec<u64>>,
    // Flag requesting a full cache clear — NOTE(review): inferred from the
    // name; confirm in the commit path.
    clear: UnsafeCell<bool>,
    // Snapshot of the watermark flag at txn start.
    above_watermark: bool,
    // read_ops: UnsafeCell<u32>,
    // Stats sink for this transaction.
    stats: S,
}
311
312impl<
313        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
314        V: Clone + Debug + Sync + Send + 'static,
315    > CacheItem<K, V>
316{
317    fn to_vref(&self) -> Option<&V> {
318        match &self {
319            CacheItem::Freq(_, v) | CacheItem::Rec(_, v) => Some(v),
320            _ => None,
321        }
322    }
323
324    fn to_kvsref(&self) -> Option<(&K, &V, usize)> {
325        match &self {
326            CacheItem::Freq(lln, v) | CacheItem::Rec(lln, v) => {
327                let cii = lln.as_ref();
328                Some((&cii.k, v, cii.size))
329            }
330            _ => None,
331        }
332    }
333
334    #[cfg(test)]
335    fn to_state(&self) -> CacheState {
336        match &self {
337            CacheItem::Freq(_, _v) => CacheState::Freq,
338            CacheItem::Rec(_, _v) => CacheState::Rec,
339            CacheItem::GhostFreq(_) => CacheState::GhostFreq,
340            CacheItem::GhostRec(_) => CacheState::GhostRec,
341            CacheItem::Haunted(_) => CacheState::Haunted,
342        }
343    }
344}
345
/// A configurable builder to create new concurrent Adaptive Replacement Caches.
pub struct ARCacheBuilder {
    // Max weight of the main cache; required (directly or via workload derivation).
    max: Option<usize>,
    // Per-reader thread-local cache size; required alongside max.
    read_max: Option<usize>,
    // Optional watermark override; derived from max when None.
    watermark: Option<usize>,
    // Whether readers attempt to quiesce the cache themselves.
    reader_quiesce: bool,
    // Optional ghost-list look back override; defaults when None.
    look_back_limit: Option<u8>,
}
354
355impl Default for ARCacheBuilder {
356    fn default() -> Self {
357        ARCacheBuilder {
358            max: None,
359            read_max: None,
360            watermark: None,
361            reader_quiesce: true,
362            look_back_limit: None,
363        }
364    }
365}
366
367impl ARCacheBuilder {
368    /// Create a new ARCache builder that you can configure before creation.
369    pub fn new() -> Self {
370        Self::default()
371    }
372
373    /// Configure a new ARCache, that derives its size based on your expected workload.
374    ///
375    /// The values are:
376    /// * Total number of items you want to have in memory (soft upper bound)
377    /// * Number of reading threads you expect concurrently (excluding write thread)
378    /// * Average expected number of cache misses per read transaction
379    /// * Average expected number of writes and/or misses per write transaction
380    ///
381    /// The cache may still exceed your provided total, and inaccurate tuning numbers
382    /// will yield a situation where you may use too-little ram, or too much. This could
383    /// be to your read misses exceeding your expected amount causing the queues to have
384    /// more items in them at a time, or your writes are larger than expected.
385    ///
386    /// If you set ex_ro_miss to zero, no read thread local cache will be configured, but
387    /// space will still be reserved for channel communication.
388    #[must_use]
389    pub fn set_expected_workload(
390        self,
391        total: usize,
392        threads: usize,
393        ex_ro_miss: usize,
394        ex_rw_miss: usize,
395        read_cache: bool,
396    ) -> Self {
397        let total = isize::try_from(total).unwrap();
398        let threads = isize::try_from(threads).unwrap();
399        let ro_miss = isize::try_from(ex_ro_miss).unwrap();
400        let wr_miss = isize::try_from(ex_rw_miss).unwrap();
401
402        // We need to clamp the expected read-miss so that the calculation doesn't end up
403        // skewing the cache to all "read cache" allocation.
404        //
405        // We clamp the read-max to an 8th of total div thread. This is because 1/8th read
406        // cache is a pretty standard ratio for a shared to per thread cache.
407        let read_max = if read_cache {
408            let read_max_limit = total / READ_THREAD_CACHE_RATIO;
409            let read_max_thread_limit = read_max_limit / threads;
410            ro_miss.clamp(0, read_max_thread_limit)
411        } else {
412            // No read cache requested
413            0
414        };
415
416        // We have to clamp rw_miss, even though we could go over-size - this is because
417        // we need to ensure that rw_miss is always < total.
418        let wr_miss_thread_limit = total / WRITE_THREAD_CACHE_RATIO;
419        let wr_miss = wr_miss.clamp(0, wr_miss_thread_limit);
420
421        let max = total - (wr_miss + (read_max * threads));
422
423        // Now make them usize.
424        let max = usize::try_from(max).unwrap();
425        let read_max = usize::try_from(read_max).unwrap();
426
427        ARCacheBuilder {
428            // stats: self.stats,
429            max: Some(max),
430            read_max: Some(read_max),
431            watermark: self.watermark,
432            reader_quiesce: self.reader_quiesce,
433            look_back_limit: self.look_back_limit,
434        }
435    }
436
437    /// Configure a new ARCache, with a capacity of `max` main cache items and `read_max`
438    /// Note that due to the way the cache operates, the number of items can and
439    /// will exceed `max` on a regular basis, so you should consider using `set_expected_workload`
440    /// and specifying your expected workload parameters to have a better derived
441    /// cache size.
442    #[must_use]
443    pub fn set_size(mut self, max: usize, read_max: usize) -> Self {
444        self.max = Some(max);
445        self.read_max = Some(read_max);
446        self
447    }
448
449    /// See [ARCache::new_size] for more information. This allows manual configuration of the data
450    /// tracking watermark. To disable this, set to 0. If watermark is greater than
451    /// max, it will be clamped to max.
452    #[must_use]
453    pub fn set_watermark(mut self, watermark: usize) -> Self {
454        self.watermark = Some(watermark);
455        self
456    }
457
458    /// Set the look back limit for ghost lists. This is a balance between the cache's ability
459    /// to have "perfect information" about all past keys to tune the p weight, and memory
460    /// consumption of storing the ghost lists. If your dataset consists of small keys/large
461    /// values you should not change this value from it's default. If your dataset contains
462    /// equally sized keys/values, then you may wish to lower this value. The lowest value is
463    /// `4`, defaults to `32`.
464    #[must_use]
465    pub fn set_look_back_limit(mut self, look_back_limit: u8) -> Self {
466        self.look_back_limit = Some(look_back_limit);
467        self
468    }
469
470    /// Enable or Disable reader cache quiescing. In some cases this can improve
471    /// reader performance, at the expense that cache includes or hits may be delayed
472    /// before acknowledgement. You must MANUALLY run periodic quiesces if you mark
473    /// this as "false" to disable reader quiescing.
474    #[must_use]
475    pub fn set_reader_quiesce(mut self, reader_quiesce: bool) -> Self {
476        self.reader_quiesce = reader_quiesce;
477        self
478    }
479
480    /// Consume this builder, returning a cache if successful. If configured parameters are
481    /// missing or incorrect, a None will be returned.
482    pub fn build<K, V>(self) -> Option<ARCache<K, V>>
483    where
484        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
485        V: Clone + Debug + Sync + Send + 'static,
486    {
487        let ARCacheBuilder {
488            // stats,
489            max,
490            read_max,
491            watermark,
492            reader_quiesce,
493            look_back_limit,
494        } = self;
495
496        let (max, read_max) = max.zip(read_max)?;
497
498        let watermark = watermark.unwrap_or(if max < WATERMARK_DISABLE_MIN {
499            0
500        } else {
501            (max / WATERMARK_DISABLE_DIVISOR) * WATERMARK_DISABLE_RATIO
502        });
503        let watermark = watermark.clamp(0, max);
504        // If the watermark is 0, always track from the start.
505        let init_watermark = watermark == 0;
506
507        let look_back_limit = look_back_limit
508            .unwrap_or(TXN_LOOKBACK_LIMIT_DEFAULT)
509            .clamp(TXN_LOOKBACK_LIMIT_ABS_MIN, u8::MAX) as u64;
510
511        // The hit queue is reasonably cheap, so we can let this grow a bit.
512        /*
513        let chan_size = max / 20;
514        let chan_size = if chan_size < 16 { 16 } else { chan_size };
515        let chan_size = chan_size.clamp(0, 128);
516        */
517        let chan_size = WRITE_THREAD_CHANNEL_SIZE;
518        let hit_queue = Arc::new(ArrayQueue::new(chan_size));
519
520        // this can oversize and take a lot of time to drain and manage, so we keep this bounded.
521        // let chan_size = chan_size.clamp(0, 64);
522        let chan_size = READ_THREAD_CHANNEL_SIZE;
523        let inc_queue = Arc::new(ArrayQueue::new(chan_size));
524
525        let shared = RwLock::new(ArcShared {
526            max,
527            read_max,
528            // stat_tx,
529            hit_queue: hit_queue.clone(),
530            inc_queue: inc_queue.clone(),
531            watermark,
532            reader_quiesce,
533        });
534        let inner = Mutex::new(ArcInner {
535            // We use p from the former stats.
536            p: 0,
537            freq: LL::new(),
538            rec: LL::new(),
539            ghost_freq: LL::new(),
540            ghost_rec: LL::new(),
541            haunted: LL::new(),
542            // stat_rx,
543            hit_queue,
544            inc_queue,
545            min_txid: 0,
546        });
547
548        Some(ARCache {
549            cache: DataMap::new(),
550            shared,
551            inner,
552            // stats: CowCell::new(stats),
553            above_watermark: AtomicBool::new(init_watermark),
554            look_back_limit,
555        })
556    }
557}
558
559impl<
560        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
561        V: Clone + Debug + Sync + Send + 'static,
562    > ARCache<K, V>
563{
564    /// Use ARCacheBuilder instead
565    #[deprecated(since = "0.2.20", note = "please use`ARCacheBuilder` instead")]
566    pub fn new(
567        total: usize,
568        threads: usize,
569        ex_ro_miss: usize,
570        ex_rw_miss: usize,
571        read_cache: bool,
572    ) -> Self {
573        ARCacheBuilder::default()
574            .set_expected_workload(total, threads, ex_ro_miss, ex_rw_miss, read_cache)
575            .build()
576            .expect("Invalid cache parameters!")
577    }
578
579    /// Use ARCacheBuilder instead
580    #[deprecated(since = "0.2.20", note = "please use`ARCacheBuilder` instead")]
581    pub fn new_size(max: usize, read_max: usize) -> Self {
582        ARCacheBuilder::default()
583            .set_size(max, read_max)
584            .build()
585            .expect("Invalid cache parameters!")
586    }
587
588    /// Use ARCacheBuilder instead
589    #[deprecated(since = "0.2.20", note = "please use`ARCacheBuilder` instead")]
590    pub fn new_size_watermark(max: usize, read_max: usize, watermark: usize) -> Self {
591        ARCacheBuilder::default()
592            .set_size(max, read_max)
593            .set_watermark(watermark)
594            .build()
595            .expect("Invalid cache parameters!")
596    }
597
598    /// Begin a read operation on the cache. This reader has a thread-local cache for items
599    /// that are localled included via `insert`, and can communicate back to the main cache
600    /// to safely include items.
601    pub fn read_stats<S>(&self, stats: S) -> ARCacheReadTxn<'_, K, V, S>
602    where
603        S: ARCacheReadStat + Clone,
604    {
605        let rshared = self.shared.read().unwrap();
606        let tlocal = if rshared.read_max > 0 {
607            Some(ReadCache {
608                set: Map::new(),
609                read_size: rshared.read_max,
610                tlru: LL::new(),
611            })
612        } else {
613            None
614        };
615        let above_watermark = self.above_watermark.load(Ordering::Relaxed);
616        ARCacheReadTxn {
617            caller: self,
618            cache: self.cache.read(),
619            tlocal,
620            // stat_tx: rshared.stat_tx.clone(),
621            hit_queue: rshared.hit_queue.clone(),
622            inc_queue: rshared.inc_queue.clone(),
623            above_watermark,
624            reader_quiesce: rshared.reader_quiesce,
625            stats,
626        }
627    }
628
    /// Begin a read operation on the cache. This reader has a thread-local cache for items
    /// that are locally included via `insert`, and can communicate back to the main cache
    /// to safely include items.
    pub fn read(&self) -> ARCacheReadTxn<'_, K, V, ()> {
        // Convenience wrapper over read_stats with the no-op () stat sink.
        self.read_stats(())
    }
635
    /// Begin a write operation on the cache. This writer has a thread-local store
    /// for all items that have been included or dirtied in the transactions, items
    /// may be removed from this cache (ie deleted, invalidated).
    pub fn write(&self) -> ARCacheWriteTxn<'_, K, V, ()> {
        // Convenience wrapper over write_stats with the no-op () stat sink.
        self.write_stats(())
    }
642
643    /// _
644    pub fn write_stats<S>(&self, stats: S) -> ARCacheWriteTxn<'_, K, V, S>
645    where
646        S: ARCacheWriteStat<K>,
647    {
648        let above_watermark = self.above_watermark.load(Ordering::Relaxed);
649        ARCacheWriteTxn {
650            caller: self,
651            cache: self.cache.write(),
652            tlocal: Map::new(),
653            hit: UnsafeCell::new(Vec::new()),
654            clear: UnsafeCell::new(false),
655            above_watermark,
656            // read_ops: UnsafeCell::new(0),
657            stats,
658        }
659    }
660
661    fn try_write_stats<S>(&self, stats: S) -> Result<ARCacheWriteTxn<'_, K, V, S>, S>
662    where
663        S: ARCacheWriteStat<K>,
664    {
665        match self.cache.try_write() {
666            Some(cache) => {
667                let above_watermark = self.above_watermark.load(Ordering::Relaxed);
668                Ok(ARCacheWriteTxn {
669                    caller: self,
670                    cache,
671                    tlocal: Map::new(),
672                    hit: UnsafeCell::new(Vec::new()),
673                    clear: UnsafeCell::new(false),
674                    above_watermark,
675                    // read_ops: UnsafeCell::new(0),
676                    stats,
677                })
678            }
679            None => Err(stats),
680        }
681    }
682
683    /// If the lock is available, attempt to quiesce the cache's async channel states. If the lock
684    /// is currently held, no action is taken.
685    pub fn try_quiesce_stats<S>(&self, stats: S) -> S
686    where
687        S: ARCacheWriteStat<K>,
688    {
689        // It seems like a good idea to skip this when not at wmark, but
690        // that can cause low-pressure caches to no submit includes properly.
691        // if self.above_watermark.load(Ordering::Relaxed) {
692        match self.try_write_stats(stats) {
693            Ok(wr_txn) => wr_txn.commit(),
694            Err(stats) => stats,
695        }
696    }
697
    /// If the lock is available, attempt to quiesce the cache's async channel states. If the lock
    /// is currently held, no action is taken.
    pub fn try_quiesce(&self) {
        // Convenience wrapper using the no-op () stat sink.
        self.try_quiesce_stats(())
    }
703
704    fn calc_p_freq(ghost_rec_len: usize, ghost_freq_len: usize, p: &mut usize, size: usize) {
705        let delta = if ghost_rec_len > ghost_freq_len {
706            ghost_rec_len / ghost_freq_len
707        } else {
708            1
709        } * size;
710        let p_was = *p;
711        if delta < *p {
712            *p -= delta
713        } else {
714            *p = 0
715        }
716        tracing::trace!("f {} >>> {}", p_was, *p);
717    }
718
719    fn calc_p_rec(
720        cap: usize,
721        ghost_rec_len: usize,
722        ghost_freq_len: usize,
723        p: &mut usize,
724        size: usize,
725    ) {
726        let delta = if ghost_freq_len > ghost_rec_len {
727            ghost_freq_len / ghost_rec_len
728        } else {
729            1
730        } * size;
731        let p_was = *p;
732        if delta <= cap - *p {
733            *p += delta
734        } else {
735            *p = cap
736        }
737        tracing::trace!("r {} >>> {}", p_was, *p);
738    }
739
740    fn drain_tlocal_inc<S>(
741        &self,
742        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
743        inner: &mut ArcInner<K, V>,
744        shared: &ArcShared<K, V>,
745        tlocal: Map<K, ThreadCacheItem<V>>,
746        commit_txid: u64,
747        stats: &mut S,
748    ) where
749        S: ARCacheWriteStat<K>,
750    {
751        // drain tlocal into the main cache.
752        tlocal.into_iter().for_each(|(k, tcio)| {
753            #[cfg(test)]
754            {
755                inner.rec.verify();
756                inner.freq.verify();
757                inner.ghost_rec.verify();
758                inner.ghost_freq.verify();
759                inner.haunted.verify();
760            }
761
762            let r = cache.get_mut(&k);
763            match (r, tcio) {
764                (None, ThreadCacheItem::Present(tci, clean, size)) => {
765                    assert!(clean);
766                    let llp = inner.rec.append_k(CacheItemInner {
767                        k: k.clone(),
768                        txid: commit_txid,
769                        size,
770                    });
771                    // stats.write_includes += 1;
772                    stats.include(&k);
773                    // The key MUST NOT exist in the cache already.
774                    let existing = cache.insert(k, CacheItem::Rec(llp, tci));
775                    assert!(
776                        existing.is_none(),
777                        "Impossible state! Key must not already exist in cache!"
778                    );
779                }
780                (None, ThreadCacheItem::Removed(clean)) => {
781                    assert!(clean);
782                    // Mark this as haunted
783                    let llp = inner.haunted.append_k(CacheItemInner {
784                        k: k.clone(),
785                        txid: commit_txid,
786                        size: HAUNTED_SIZE,
787                    });
788                    // The key MUST NOT exist in the cache already.
789                    let existing = cache.insert(k, CacheItem::Haunted(llp));
790                    assert!(
791                        existing.is_none(),
792                        "Impossible state! Key must not already exist in cache!"
793                    );
794                    // Must be now in haunted!
795                    debug_assert!(inner.haunted.len() > 0);
796                }
797                (Some(ref mut ci), ThreadCacheItem::Removed(clean)) => {
798                    assert!(clean);
799                    // From whatever set we were in, pop and move to haunted.
800                    let mut next_state = match ci {
801                        CacheItem::Freq(llp, _v) => {
802                            let mut owned = inner.freq.extract(llp.clone());
803                            owned.as_mut().txid = commit_txid;
804                            owned.as_mut().size = HAUNTED_SIZE;
805                            let pointer = inner.haunted.append_n(owned);
806                            debug_assert!(inner.haunted.len() > 0);
807                            CacheItem::Haunted(pointer)
808                        }
809                        CacheItem::Rec(llp, _v) => {
810                            // Remove the node and put it into freq.
811                            let mut owned = inner.rec.extract(llp.clone());
812                            owned.as_mut().txid = commit_txid;
813                            owned.as_mut().size = HAUNTED_SIZE;
814                            let pointer = inner.haunted.append_n(owned);
815                            debug_assert!(inner.haunted.len() > 0);
816                            CacheItem::Haunted(pointer)
817                        }
818                        CacheItem::GhostFreq(llp) => {
819                            let mut owned = inner.ghost_freq.extract(llp.clone());
820                            owned.as_mut().txid = commit_txid;
821                            owned.as_mut().size = HAUNTED_SIZE;
822                            let pointer = inner.haunted.append_n(owned);
823                            debug_assert!(inner.haunted.len() > 0);
824                            CacheItem::Haunted(pointer)
825                        }
826                        CacheItem::GhostRec(llp) => {
827                            let mut owned = inner.ghost_rec.extract(llp.clone());
828                            owned.as_mut().txid = commit_txid;
829                            owned.as_mut().size = HAUNTED_SIZE;
830                            let pointer = inner.haunted.append_n(owned);
831                            debug_assert!(inner.haunted.len() > 0);
832                            CacheItem::Haunted(pointer)
833                        }
834                        CacheItem::Haunted(llp) => {
835                            unsafe { llp.make_mut().txid = commit_txid };
836                            debug_assert!(inner.haunted.len() > 0);
837                            CacheItem::Haunted(llp.clone())
838                        }
839                    };
840                    // Now change the state.
841                    mem::swap(*ci, &mut next_state);
842                }
843                // Done! https://github.com/rust-lang/rust/issues/68354 will stabilise
844                // in 1.44 so we can prevent a need for a clone.
845                (Some(ref mut ci), ThreadCacheItem::Present(tci, clean, size)) => {
846                    assert!(clean);
847                    //   * as we include each item, what state was it in before?
848                    // It's in the cache - what action must we take?
849                    let mut next_state = match ci {
850                        CacheItem::Freq(llp, _v) => {
851                            let mut owned = inner.freq.extract(llp.clone());
852                            owned.as_mut().txid = commit_txid;
853                            owned.as_mut().size = size;
854                            // Move the list item to it's head.
855                            stats.modify(&owned.as_ref().k);
856                            let pointer = inner.freq.append_n(owned);
857                            // Update v.
858                            CacheItem::Freq(pointer, tci)
859                        }
860                        CacheItem::Rec(llp, _v) => {
861                            // Remove the node and put it into freq.
862                            let mut owned = inner.rec.extract(llp.clone());
863                            owned.as_mut().txid = commit_txid;
864                            owned.as_mut().size = size;
865                            stats.modify(&owned.as_ref().k);
866                            let pointer = inner.freq.append_n(owned);
867                            CacheItem::Freq(pointer, tci)
868                        }
869                        CacheItem::GhostFreq(llp) => {
870                            // Adjust p
871                            Self::calc_p_freq(
872                                inner.ghost_rec.len(),
873                                inner.ghost_freq.len(),
874                                &mut inner.p,
875                                size,
876                            );
877                            let mut owned = inner.ghost_freq.extract(llp.clone());
878                            owned.as_mut().txid = commit_txid;
879                            owned.as_mut().size = size;
880                            stats.ghost_frequent_revive(&owned.as_ref().k);
881                            let pointer = inner.freq.append_n(owned);
882                            CacheItem::Freq(pointer, tci)
883                        }
884                        CacheItem::GhostRec(llp) => {
885                            // Adjust p
886                            Self::calc_p_rec(
887                                shared.max,
888                                inner.ghost_rec.len(),
889                                inner.ghost_freq.len(),
890                                &mut inner.p,
891                                size,
892                            );
893                            let mut owned = inner.ghost_rec.extract(llp.clone());
894                            owned.as_mut().txid = commit_txid;
895                            owned.as_mut().size = size;
896                            stats.ghost_recent_revive(&owned.as_ref().k);
897                            let pointer = inner.rec.append_n(owned);
898                            CacheItem::Rec(pointer, tci)
899                        }
900                        CacheItem::Haunted(llp) => {
901                            // Moving from haunted to recent.
902                            // How can an item be Haunted, but then not be in the haunted set?
903
904                            // eprintln!("{:?}", inner.haunted.len());
905                            let before_len = inner.haunted.len();
906                            debug_assert!(inner.haunted.len() > 0);
907                            let mut owned = inner.haunted.extract(llp.clone());
908                            debug_assert!(before_len - HAUNTED_SIZE == inner.haunted.len());
909
910                            owned.as_mut().txid = commit_txid;
911                            owned.as_mut().size = size;
912                            stats.include_haunted(&owned.as_ref().k);
913                            let pointer = inner.rec.append_n(owned);
914                            CacheItem::Rec(pointer, tci)
915                        }
916                    };
917                    // Now change the state.
918                    mem::swap(*ci, &mut next_state);
919                }
920            }
921
922            #[cfg(test)]
923            {
924                inner.rec.verify();
925                inner.freq.verify();
926                inner.ghost_rec.verify();
927                inner.ghost_freq.verify();
928                inner.haunted.verify();
929            }
930        });
931    }
932
    /// Drain the lock-free queue of read-side cache hit events into the
    /// cache, promoting each hit item within its current list.
    ///
    /// Events are processed in arrival order until the queue is empty or an
    /// event timestamped at/after `commit_ts` is seen, so we only account
    /// for hits that occurred before this commit began.
    fn drain_hit_rx(
        &self,
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        inner: &mut ArcInner<K, V>,
        commit_ts: Instant,
    ) {
        // * for each item
        // while let Ok(ce) = inner.rx.try_recv() {
        // TODO: Find a way to remove these clones here!
        while let Some(ce) = inner.hit_queue.pop() {
            let CacheHitEvent { t, k_hash } = ce;
            // Hit events only carry the key hash, so we operate on the whole
            // hash slot: every entry sharing this hash gets touched.
            if let Some(ref mut ci_slots) = unsafe { cache.get_slot_mut(k_hash) } {
                for ref mut ci in ci_slots.iter_mut() {
                    let mut next_state = match &ci.v {
                        CacheItem::Freq(llp, v) => {
                            // Already frequent - move to the head of freq.
                            inner.freq.touch(llp.to_owned());
                            CacheItem::Freq(llp.to_owned(), v.to_owned())
                        }
                        CacheItem::Rec(llp, v) => {
                            // A repeat access on a recent item promotes it to freq.
                            let owned = inner.rec.extract(llp.to_owned());
                            let pointer = inner.freq.append_n(owned);
                            CacheItem::Freq(pointer, v.to_owned())
                        }
                        // While we can't add this from nothing, we can
                        // at least keep it in the ghost sets.
                        CacheItem::GhostFreq(llp) => {
                            inner.ghost_freq.touch(llp.to_owned());
                            CacheItem::GhostFreq(llp.to_owned())
                        }
                        CacheItem::GhostRec(llp) => {
                            inner.ghost_rec.touch(llp.to_owned());
                            CacheItem::GhostRec(llp.to_owned())
                        }
                        CacheItem::Haunted(llp) => {
                            // We can't do anything about this ...
                            // Don't touch or rearrange the haunted list, it should be
                            // in commit txid order.
                            debug_assert!(inner.haunted.len() > 0);
                            CacheItem::Haunted(llp.to_owned())
                        }
                    };
                    mem::swap(&mut ci.v, &mut next_state);
                } // for each item in the bucket.
            }
            // Do nothing, it must have been evicted.

            // Stop processing the queue, we are up to "now".
            if t >= commit_ts {
                break;
            }
        }
    }
985
    /// Drain the lock-free queue of read-side inclusion events into the
    /// cache, inserting or refreshing items as their transaction ids allow.
    ///
    /// An event is only applied when it is newer than both the value already
    /// in the cache and `inner.min_txid`; stale events degrade to plain hits
    /// (a list touch) or are dropped entirely. Processing stops once the
    /// queue is empty or an event timestamped at/after `commit_ts` is seen.
    fn drain_inc_rx<S>(
        &self,
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        inner: &mut ArcInner<K, V>,
        shared: &ArcShared<K, V>,
        commit_ts: Instant,
        stats: &mut S,
    ) where
        S: ARCacheWriteStat<K>,
    {
        while let Some(ce) = inner.inc_queue.pop() {
            // Update if it was inc
            let CacheIncludeEvent {
                t,
                k,
                v: iv,
                txid,
                size,
            } = ce;
            let mut r = cache.get_mut(&k);
            match r {
                Some(ref mut ci) => {
                    // `None` below means "leave the current state untouched".
                    let mut next_state = match &ci {
                        CacheItem::Freq(llp, _v) => {
                            if llp.as_ref().txid >= txid || inner.min_txid > txid {
                                // Our cache already has a newer value, keep it.
                                inner.freq.touch(llp.to_owned());
                                None
                            } else {
                                // The value is newer, update.
                                let mut owned = inner.freq.extract(llp.to_owned());
                                owned.as_mut().txid = txid;
                                owned.as_mut().size = size;
                                stats.modify(&owned.as_mut().k);
                                let pointer = inner.freq.append_n(owned);
                                Some(CacheItem::Freq(pointer, iv))
                            }
                        }
                        CacheItem::Rec(llp, v) => {
                            // Either way this counts as a repeat access, so the
                            // item moves from recent to frequent.
                            let mut owned = inner.rec.extract(llp.to_owned());
                            if llp.as_ref().txid >= txid || inner.min_txid > txid {
                                // The cached value is newer - promote it as-is.
                                let pointer = inner.freq.append_n(owned);
                                Some(CacheItem::Freq(pointer, v.to_owned()))
                            } else {
                                // The included value is newer - promote with it.
                                owned.as_mut().txid = txid;
                                owned.as_mut().size = size;
                                stats.modify(&owned.as_mut().k);
                                let pointer = inner.freq.append_n(owned);
                                Some(CacheItem::Freq(pointer, iv))
                            }
                        }
                        CacheItem::GhostFreq(llp) => {
                            // Adjust p
                            if llp.as_ref().txid > txid || inner.min_txid > txid {
                                // The cache version is newer, this is just a hit.
                                let size = llp.as_ref().size;
                                Self::calc_p_freq(
                                    inner.ghost_rec.len(),
                                    inner.ghost_freq.len(),
                                    &mut inner.p,
                                    size,
                                );
                                inner.ghost_freq.touch(llp.to_owned());
                                None
                            } else {
                                // This item is newer, so we can include it.
                                Self::calc_p_freq(
                                    inner.ghost_rec.len(),
                                    inner.ghost_freq.len(),
                                    &mut inner.p,
                                    size,
                                );
                                let mut owned = inner.ghost_freq.extract(llp.to_owned());
                                owned.as_mut().txid = txid;
                                owned.as_mut().size = size;
                                stats.ghost_frequent_revive(&owned.as_mut().k);
                                let pointer = inner.freq.append_n(owned);
                                Some(CacheItem::Freq(pointer, iv))
                            }
                        }
                        CacheItem::GhostRec(llp) => {
                            // Adjust p
                            if llp.as_ref().txid > txid || inner.min_txid > txid {
                                // Stale inclusion - only count the ghost hit.
                                let size = llp.as_ref().size;
                                Self::calc_p_rec(
                                    shared.max,
                                    inner.ghost_rec.len(),
                                    inner.ghost_freq.len(),
                                    &mut inner.p,
                                    size,
                                );
                                inner.ghost_rec.touch(llp.clone());
                                None
                            } else {
                                // Newer - revive the ghost back into recent.
                                Self::calc_p_rec(
                                    shared.max,
                                    inner.ghost_rec.len(),
                                    inner.ghost_freq.len(),
                                    &mut inner.p,
                                    size,
                                );
                                let mut owned = inner.ghost_rec.extract(llp.to_owned());
                                owned.as_mut().txid = txid;
                                owned.as_mut().size = size;
                                stats.ghost_recent_revive(&owned.as_ref().k);
                                let pointer = inner.rec.append_n(owned);
                                Some(CacheItem::Rec(pointer, iv))
                            }
                        }
                        CacheItem::Haunted(llp) => {
                            // if
                            //     the haunted item is newer
                            // OR
                            //     inclusion is older than our minimum
                            // then the item is skipped.
                            if llp.as_ref().txid > txid || inner.min_txid > txid {
                                None
                            } else {
                                // ELSE we need to update the txid of the haunted item
                                // to ensure that it's at the list head.
                                debug_assert!(inner.haunted.len() > 0);
                                let before_len = inner.haunted.len();

                                let mut owned = inner.haunted.extract(llp.to_owned());

                                debug_assert!(before_len - HAUNTED_SIZE == inner.haunted.len());

                                owned.as_mut().txid = txid;
                                debug_assert!(owned.as_mut().size == HAUNTED_SIZE);
                                stats.include_haunted(&owned.as_mut().k);
                                let pointer = inner.rec.append_n(owned);
                                Some(CacheItem::Rec(pointer, iv))
                            }
                        }
                    };
                    if let Some(ref mut next_state) = next_state {
                        mem::swap(*ci, next_state);
                    }
                }
                None => {
                    // This key has never been seen before.
                    // It's not present - include it!
                    if txid >= inner.min_txid {
                        let llp = inner.rec.append_k(CacheItemInner {
                            k: k.clone(),
                            txid,
                            size,
                        });
                        stats.include(&k);
                        // The key MUST NOT exist in the cache already.
                        let existing = cache.insert(k, CacheItem::Rec(llp, iv));
                        assert!(
                            existing.is_none(),
                            "Impossible state! Key must not already exist in cache!"
                        );
                    }
                }
            };

            // Stop processing the queue, we are up to "now".
            if t >= commit_ts {
                break;
            }
        }
    }
1151
    /// Promote items that were hit through this write transaction's
    /// thread-local cache. `hit` holds the key hashes recorded during the
    /// transaction.
    ///
    /// Items whose list node was already updated in this commit (node txid
    /// equals `commit_txid`) are skipped so a write-inclusion is not double
    /// touched by its own hit record.
    fn drain_tlocal_hits(
        &self,
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        inner: &mut ArcInner<K, V>,
        // shared: &ArcShared<K, V>,
        commit_txid: u64,
        hit: Vec<u64>,
    ) {
        // Stats updated by caller
        hit.into_iter().for_each(|k_hash| {
            // * everything hit must be in main cache now, so bring these
            //   all to the relevant item heads.
            // * Why do this last? Because the write is the "latest" we want all the fresh
            //   written items in the cache over the "read" hits, it gives us some appximation
            //   of time ordering, but not perfect.

            // Find the item in the cache.
            // * based on it's type, promote it in the correct list, or move it.
            // How does this prevent incorrect promotion from rec to freq? txid?
            let mut r = unsafe { cache.get_slot_mut(k_hash) };
            match r {
                Some(ref mut ci_slots) => {
                    // Hashes map to a slot; touch each entry that shares it.
                    for ref mut ci in ci_slots.iter_mut() {
                        // This differs from above - we skip if we don't touch anything
                        // that was added in this txn. This is to prevent double touching
                        // anything that was included in a write.

                        // TODO: find a way to remove these clones
                        let mut next_state = match &ci.v {
                            CacheItem::Freq(llp, v) => {
                                // To prevent cache hysterisis, we require a hit over
                                // two transactions.
                                if llp.as_ref().txid != commit_txid {
                                    inner.freq.touch(llp.to_owned());
                                    Some(CacheItem::Freq(llp.to_owned(), v.to_owned()))
                                } else {
                                    None
                                }
                            }
                            CacheItem::Rec(llp, v) => {
                                if llp.as_ref().txid != commit_txid {
                                    // println!("hit {:?} Rec -> Freq", k);
                                    let owned = inner.rec.extract(llp.clone());
                                    let pointer = inner.freq.append_n(owned);
                                    Some(CacheItem::Freq(pointer, v.clone()))
                                } else {
                                    None
                                }
                            }
                            _ => {
                                // Ignore hits on items that may have been cleared.
                                None
                            }
                        };
                        // Now change the state.
                        if let Some(ref mut next_state) = next_state {
                            mem::swap(&mut ci.v, next_state);
                        }
                    } // for each ci in slots
                }
                None => {
                    // Impossible state!
                    unreachable!();
                }
            }
        });
    }
1219
    /// Pop items from the tail of the ghost list `ll` until its weighted
    /// length is at most `size`, moving each into the haunted list `to_ll`
    /// and rewriting its map entry to `CacheItem::Haunted`.
    ///
    /// Each moved item records `txid` as its evict txid and has its size
    /// collapsed to `HAUNTED_SIZE`, since haunted entries hold no value.
    fn evict_to_haunted_len(
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        ll: &mut LL<CacheItemInner<K>>,
        to_ll: &mut LL<CacheItemInner<K>>,
        size: usize,
        txid: u64,
    ) {
        // Starting weights, used to assert that both list lengths stay
        // consistent with what we have removed/added so far.
        let to_ll_before = to_ll.len();
        let ll_before = ll.len();
        let mut added = 0;
        let mut removed = 0;

        while ll.len() > size {
            #[cfg(test)]
            {
                ll.verify();
                to_ll.verify();
            }

            if let Some(mut owned) = ll.pop() {
                debug_assert!(!owned.is_null());

                // Track the sizes.
                removed += owned.as_mut().size;

                assert_eq!(ll.len(), ll_before - removed);

                // Set the item's evict txid.
                owned.as_mut().txid = txid;
                // Trim the haunted size as needed.
                owned.as_mut().size = HAUNTED_SIZE;
                added += HAUNTED_SIZE;

                let pointer = to_ll.append_n(owned);

                assert_eq!(
                    to_ll.len(),
                    to_ll_before + added,
                    "Impossible State! List lengths are no longer consistent!"
                );

                let mut r = cache.get_mut(&pointer.as_ref().k);

                match r {
                    Some(ref mut ci) => {
                        // Now change the state.
                        let mut next_state = CacheItem::Haunted(pointer);
                        mem::swap(*ci, &mut next_state);
                    }
                    None => {
                        // Impossible state!
                        unreachable!();
                    }
                };
            } else {
                // Impossible state!
                unreachable!();
            }

            #[cfg(test)]
            {
                ll.verify();
                to_ll.verify();
            }
        }
    }
1286
1287    fn evict_to_len<S>(
1288        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
1289        ll: &mut LL<CacheItemInner<K>>,
1290        to_ll: &mut LL<CacheItemInner<K>>,
1291        size: usize,
1292        txid: u64,
1293        stats: &mut S,
1294    ) where
1295        S: ARCacheWriteStat<K>,
1296    {
1297        debug_assert!(ll.len() >= size);
1298
1299        while ll.len() > size {
1300            #[cfg(test)]
1301            {
1302                ll.verify();
1303                to_ll.verify();
1304            }
1305
1306            if let Some(mut owned) = ll.pop() {
1307                debug_assert!(!owned.is_null());
1308                let mut r = cache.get_mut(&owned.as_ref().k);
1309                // Set the item's evict txid.
1310                owned.as_mut().txid = txid;
1311                match r {
1312                    Some(ref mut ci) => {
1313                        let mut next_state = match &ci {
1314                            CacheItem::Freq(llp, _v) => {
1315                                // The pointer from any key MUST be unique!
1316                                assert!(llp == &owned, "Impossible State! Pointer in map does not match the pointer from the list!");
1317                                // No need to extract, already popped!
1318                                // $ll.extract(*llp);
1319                                stats.evict_from_frequent(&owned.as_ref().k);
1320                                let pointer = to_ll.append_n(owned);
1321                                CacheItem::GhostFreq(pointer)
1322                            }
1323                            CacheItem::Rec(llp, _v) => {
1324                                // The pointer from any key MUST be unique!
1325                                assert!(llp == &owned, "Impossible State! Pointer in map does not match the pointer from the list!");
1326                                // No need to extract, already popped!
1327                                // $ll.extract(*llp);
1328                                stats.evict_from_recent(&owned.as_mut().k);
1329                                let pointer = to_ll.append_n(owned);
1330                                CacheItem::GhostRec(pointer)
1331                            }
1332                            _ => {
1333                                // Impossible state! All members of the from-ll, must be
1334                                // in either the frequent or recent state.
1335                                unreachable!();
1336                            }
1337                        };
1338                        // Now change the state.
1339                        mem::swap(*ci, &mut next_state);
1340                    }
1341                    None => {
1342                        // Impossible state! This indicates that the key was already
1343                        // removed. Only one key -> linked-list-pointer should exist at
1344                        // anytime. If we already removed this, that indicates there were
1345                        // two llp's with the same key!
1346                        unreachable!();
1347                    }
1348                };
1349            } else {
1350                //  Impossible state!
1351                unreachable!();
1352            }
1353
1354            #[cfg(test)]
1355            {
1356                ll.verify();
1357                to_ll.verify();
1358            }
1359        } // end while
1360    }
1361
1362    #[allow(clippy::cognitive_complexity)]
1363    fn evict<S>(
1364        &self,
1365        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
1366        inner: &mut ArcInner<K, V>,
1367        shared: &ArcShared<K, V>,
1368        commit_txid: u64,
1369        stats: &mut S,
1370    ) where
1371        S: ARCacheWriteStat<K>,
1372    {
1373        debug_assert!(inner.p <= shared.max);
1374        // Convince the compiler copying is okay.
1375        let p = inner.p;
1376
1377        if inner.rec.len() + inner.freq.len() > shared.max {
1378            // println!("Checking cache evict");
1379            trace!(
1380                "from -> rec {:?}, freq {:?}",
1381                inner.rec.len(),
1382                inner.freq.len()
1383            );
1384            let delta = (inner.rec.len() + inner.freq.len()) - shared.max;
1385            // We have overflowed by delta. As we are not "evicting as we go" we have to work out
1386            // what we should have evicted up to now.
1387            //
1388            // keep removing from rec until == p OR delta == 0, and if delta remains, then remove from freq.
1389            //
1390            // Remember P is "the maximum size of recent" or "pressure on recent". If P is max then
1391            // we are pressuring churning on recent but not freq, so evict in freq.
1392            //
1393            // If P is toward 0 that means all our pressure is in frequent and we evicted things we
1394            // shouldn't have so we want more space in frequent and less in rec.
1395
1396            // delta here is the number of elements we need to remove to remain below shared.max.
1397
1398            let rec_to_len = if inner.p == 0 {
1399                trace!("p == 0 => {:?} - {}", inner.rec.len(), delta);
1400                if delta < inner.rec.len() {
1401                    // We are fully weighted to freq, so only remove in recent.
1402                    inner.rec.len() - delta
1403                } else {
1404                    // We need to fully clear rec *and* then from freq as well.
1405                    0
1406                }
1407            } else if inner.rec.len() > inner.p {
1408                // There is a partial weighting, how much do we need to move?
1409                let rec_delta = inner.rec.len() - inner.p;
1410                if rec_delta > delta {
1411                    /*
1412                    println!(
1413                        "p ({:?}) <= rec ({:?}), rec_delta ({:?}) > delta ({:?})",
1414                        inner.p,
1415                        inner.rec.len(),
1416                        rec_delta,
1417                        delta
1418                    );
1419                    */
1420                    // We will have removed enough through delta alone in rec. Technically
1421                    // this means we are still over p, but since we already removed delta
1422                    // number of elements, freq won't change dimensions.
1423                    inner.rec.len() - delta
1424                } else {
1425                    /*
1426                    println!(
1427                        "p ({:?}) <= rec ({:?}), rec_delta ({:?}) <= delta ({:?})",
1428                        inner.p,
1429                        inner.rec.len(),
1430                        rec_delta,
1431                        delta
1432                    );
1433                    */
1434                    // Remove the full recent delta, and excess will be removed from freq.
1435                    inner.rec.len() - rec_delta
1436                }
1437            } else {
1438                // rec is already below p, therefore we must need to remove in freq, and
1439                // we need to consider how much is in rec.
1440                // println!("p ({:?}) > rec ({:?})", inner.p, inner.rec.len());
1441                inner.rec.len()
1442            };
1443
1444            // Now we can get the expected sizes;
1445            let freq_to_len = shared.max - rec_to_len;
1446            // println!("move to -> rec {:?}, freq {:?}", rec_to_len, freq_to_len);
1447            debug_assert!(rec_to_len <= inner.rec.len());
1448            debug_assert!(freq_to_len <= inner.freq.len());
1449
1450            // stats.freq_evicts += (inner.freq.len() - freq_to_len) as u64;
1451            // stats.recent_evicts += (inner.rec.len() - rec_to_len) as u64;
1452            // stats.frequent_evict_add((inner.freq.len() - freq_to_len) as u64);
1453            // stats.recent_evict_add((inner.rec.len() - rec_to_len) as u64);
1454
1455            Self::evict_to_len(
1456                cache,
1457                &mut inner.rec,
1458                &mut inner.ghost_rec,
1459                rec_to_len,
1460                commit_txid,
1461                stats,
1462            );
1463            Self::evict_to_len(
1464                cache,
1465                &mut inner.freq,
1466                &mut inner.ghost_freq,
1467                freq_to_len,
1468                commit_txid,
1469                stats,
1470            );
1471
1472            // Finally, do an evict of the ghost sets if they are too long - these are weighted
1473            // inverse to the above sets. Note the freq to len in ghost rec, and rec to len in
1474            // ghost freq!
1475            if inner.ghost_rec.len() > (shared.max - p) {
1476                Self::evict_to_haunted_len(
1477                    cache,
1478                    &mut inner.ghost_rec,
1479                    &mut inner.haunted,
1480                    freq_to_len,
1481                    commit_txid,
1482                );
1483            }
1484
1485            if inner.ghost_freq.len() > p {
1486                Self::evict_to_haunted_len(
1487                    cache,
1488                    &mut inner.ghost_freq,
1489                    &mut inner.haunted,
1490                    rec_to_len,
1491                    commit_txid,
1492                );
1493            }
1494        }
1495    }
1496
    /// Completely drain `ll`, moving every item into the matching ghost
    /// list (`gf` for frequent items, `gr` for recent items) and rewriting
    /// each map entry to the corresponding ghost state.
    ///
    /// `txid` is recorded as the eviction txid of every drained item, and
    /// each eviction is reported to `stats`.
    fn drain_ll_to_ghost<S>(
        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
        ll: &mut LL<CacheItemInner<K>>,
        gf: &mut LL<CacheItemInner<K>>,
        gr: &mut LL<CacheItemInner<K>>,
        txid: u64,
        stats: &mut S,
    ) where
        S: ARCacheWriteStat<K>,
    {
        while let Some(mut owned) = ll.pop() {
            #[cfg(test)]
            {
                ll.verify();
                gf.verify();
                gr.verify();
            }

            debug_assert!(!owned.is_null());

            // Set the item's eviction txid.
            owned.as_mut().txid = txid;

            let mut r = cache.get_mut(&owned.as_ref().k);
            match r {
                Some(ref mut ci) => {
                    let mut next_state = match &ci {
                        CacheItem::Freq(n, _) => {
                            // The map entry must point at the node we popped.
                            debug_assert!(n == &owned);
                            stats.evict_from_frequent(&owned.as_ref().k);
                            let pointer = gf.append_n(owned);
                            CacheItem::GhostFreq(pointer)
                        }
                        CacheItem::Rec(n, _) => {
                            debug_assert!(n == &owned);
                            stats.evict_from_recent(&owned.as_ref().k);
                            let pointer = gr.append_n(owned);
                            CacheItem::GhostRec(pointer)
                        }
                        _ => {
                            // Impossible state!
                            unreachable!();
                        }
                    };
                    // Now change the state.
                    mem::swap(*ci, &mut next_state);
                }
                None => {
                    // Impossible state! This indicates that the key was already
                    // removed. Only one key -> linked-list-pointer should exist at
                    // anytime. If we already removed this, that indicates there were
                    // two llp's with the same key!
                    unreachable!();
                }
            }

            #[cfg(test)]
            {
                ll.verify();
                gf.verify();
                gr.verify();
            }
        } // end while
    }
1561
1562    fn drain_ll_min_txid(
1563        cache: &mut DataMapWriteTxn<K, CacheItem<K, V>>,
1564        ll: &mut LL<CacheItemInner<K>>,
1565        min_txid: u64,
1566    ) {
1567        while let Some(node) = ll.peek_head() {
1568            #[cfg(test)]
1569            {
1570                ll.verify();
1571            }
1572
1573            // if the node is older than our min txid.
1574            if node.txid < min_txid {
1575                let before_len = ll.len();
1576                debug_assert!(ll.len() > 0);
1577
1578                // Need to free from the cache.
1579                cache.remove(&node.k);
1580
1581                // Okay, this node can be trimmed.
1582                ll.drop_head();
1583
1584                debug_assert!(before_len - HAUNTED_SIZE == ll.len());
1585            } else {
1586                // We are done with this loop, everything else
1587                // is newer.
1588                break;
1589            }
1590
1591            #[cfg(test)]
1592            {
1593                ll.verify();
1594            }
1595        }
1596    }
1597
    /// Apply a write transaction's accumulated state to the shared cache.
    ///
    /// Steps, in order: optionally clear (drain live lists to ghosts), include
    /// thread-local writes, drain queued reader inclusions and hit events, apply
    /// this writer's hit log, evict back to size, trim the haunted set, publish
    /// stats, update the watermark flag, and finally commit the map txn.
    /// Serialised against other writers via `self.inner` mutex.
    #[allow(clippy::unnecessary_mut_passed)]
    fn commit<S>(
        &self,
        mut cache: DataMapWriteTxn<K, CacheItem<K, V>>,
        tlocal: Map<K, ThreadCacheItem<V>>,
        hit: Vec<u64>,
        clear: bool,
        init_above_watermark: bool,
        // read_ops: u32,
        mut stats: S,
    ) -> S
    where
        S: ARCacheWriteStat<K>,
    {
        // What is the time?
        let commit_ts = Instant::now();
        let commit_txid = cache.get_txid();
        // Copy p + init cache sizes for adjustment.
        let mut inner = self.inner.lock().unwrap();
        let shared = self.shared.read().unwrap();

        // Did we request to be cleared? If so, we move everything to a ghost set
        // that was live.
        //
        // we also set the min_txid watermark which prevents any inclusion of
        // any item that existed before this point in time.
        if clear {
            // Set the watermark of this txn.
            inner.min_txid = commit_txid;

            // Indicate that we evicted all to ghost/freq
            // stats.frequent_evict_add(inner.freq.len() as u64);
            // stats.recent_evict_add(inner.rec.len() as u64);

            // This weird looking dance is to convince rust that the mutable borrow is safe.
            let m_inner = inner.deref_mut();

            let i_f = &mut m_inner.freq;
            let g_f = &mut m_inner.ghost_freq;
            let i_r = &mut m_inner.rec;
            let g_r = &mut m_inner.ghost_rec;

            // Move everything active into ghost sets.
            Self::drain_ll_to_ghost(&mut cache, i_f, g_f, g_r, commit_txid, &mut stats);
            Self::drain_ll_to_ghost(&mut cache, i_r, g_f, g_r, commit_txid, &mut stats);
        } else {
            // Update the minimum txid we'll include from based on the look back limit
            //
            // If a clear happens, we don't want this setting the limit back lower than an existing
            // limit, so we take the greater of these values.
            let possible_new_limit = commit_txid.saturating_sub(self.look_back_limit);
            inner.min_txid = inner.min_txid.max(possible_new_limit);
        }

        // Why is it okay to drain the rx/tlocal and create the cache in a temporary
        // oversize? Because these values in the queue/tlocal are already in memory
        // and we are moving them to the cache, we are not actually using any more
        // memory (well, not significantly more). By adding everything, then evicting
        // we also get better and more accurate hit patterns over the cache based on what
        // was used. This gives us an advantage over other cache types - we can see
        // patterns based on temporal usage that other caches can't, at the expense that
        // it may take some moments for that cache pattern to sync to the main thread.

        self.drain_tlocal_inc(
            &mut cache,
            inner.deref_mut(),
            shared.deref(),
            tlocal,
            commit_txid,
            &mut stats,
        );

        // drain rx until empty or time >= time.
        self.drain_inc_rx(
            &mut cache,
            inner.deref_mut(),
            shared.deref(),
            commit_ts,
            &mut stats,
        );

        self.drain_hit_rx(&mut cache, inner.deref_mut(), commit_ts);

        // drain the tlocal hits into the main cache.

        // stats.write_hits += hit.len() as u64;
        // stats.write_read_ops += read_ops as u64;

        self.drain_tlocal_hits(&mut cache, inner.deref_mut(), commit_txid, hit);

        // now clean the space for each of the primary caches, evicting into the ghost sets.
        // * It's possible that both caches are now over-sized if rx was empty
        //   but wr inc many items.
        // * p has possibly changed width, causing a balance shift
        // * and ghost items have been included changing ghost list sizes.
        // so we need to do a clean up/balance of all the list lengths.
        self.evict(
            &mut cache,
            inner.deref_mut(),
            shared.deref(),
            commit_txid,
            &mut stats,
        );

        // self.drain_stat_rx(inner.deref_mut(), stats, commit_ts);

        // Now drain from the haunted set if required, this removes anything
        // lower than the min_txid.
        {
            // Tell the compiler copying is okay.
            let min_txid = inner.min_txid;
            Self::drain_ll_min_txid(&mut cache, &mut inner.haunted, min_txid);
        }

        // Publish post-commit sizing/stat data to the caller's stat collector.
        stats.p_weight(inner.p as u64);
        stats.shared_max(shared.max as u64);
        stats.freq(inner.freq.len() as u64);
        stats.recent(inner.rec.len() as u64);
        stats.all_seen_keys(cache.len() as u64);

        // Indicate if we are at/above watermark, so that read/writers begin to indicate their
        // hit events so we can start to setup/order our arc sets correctly.
        //
        // If we drop below this again, they'll go back to just insert/remove content only mode.
        if init_above_watermark {
            // We were above, now we are below the limit, go back to just insert/remove only mode.
            if (inner.freq.len() + inner.rec.len()) < shared.watermark {
                self.above_watermark.store(false, Ordering::Relaxed);
            }
        } else if (inner.freq.len() + inner.rec.len()) >= shared.watermark {
            // we were not above above the watermark but now the cache is large enough
            // to demand that we should be tracking data.
            self.above_watermark.store(true, Ordering::Relaxed);
        }

        // commit on the wr txn.
        cache.commit();
        // done!

        // Return the stats to the caller.
        stats
    }
1740}
1741
1742impl<
1743        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
1744        V: Clone + Debug + Sync + Send + 'static,
1745        S: ARCacheWriteStat<K>,
1746    > ARCacheWriteTxn<'_, K, V, S>
1747{
1748    /// Commit the changes of this writer, making them globally visible. This causes
1749    /// all items written to this thread's local store to become visible in the main
1750    /// cache.
1751    ///
1752    /// To rollback (abort) and operation, just do not call commit (consider std::mem::drop
1753    /// on the write transaction)
1754    pub fn commit(self) -> S {
1755        self.caller.commit(
1756            self.cache,
1757            self.tlocal,
1758            self.hit.into_inner(),
1759            self.clear.into_inner(),
1760            self.above_watermark,
1761            // self.read_ops.into_inner(),
1762            self.stats,
1763        )
1764    }
1765
1766    /// Clear all items of the cache. This operation does not take effect until you commit.
1767    /// After calling "clear", you may then include new items which will be stored thread
1768    /// locally until you commit.
1769    pub fn clear(&mut self) {
1770        // Mark that we have been requested to clear the cache.
1771        unsafe {
1772            let clear_ptr = self.clear.get();
1773            *clear_ptr = true;
1774        }
1775        // Dump the hit log.
1776        unsafe {
1777            let hit_ptr = self.hit.get();
1778            (*hit_ptr).clear();
1779        }
1780
1781        // Throw away any read ops we did on the old values since they'll
1782        // mess up stat numbers.
1783        self.stats.cache_clear();
1784        /*
1785        unsafe {
1786            let op_ptr = self.read_ops.get();
1787            (*op_ptr) = 0;
1788        }
1789        */
1790
1791        // Dump the thread local state.
1792        self.tlocal.clear();
1793        // From this point any get will miss on the main cache.
1794        // Inserts are accepted.
1795    }
1796
1797    /// Attempt to retrieve a k-v pair from the cache. If it is present in the main cache OR
1798    /// the thread local cache, a `Some` is returned, else you will receive a `None`. On a
1799    /// `None`, you must then consult the external data source that this structure is acting
1800    /// as a cache for.
1801    pub fn get<Q>(&mut self, k: &Q) -> Option<&V>
1802    where
1803        K: Borrow<Q>,
1804        Q: Hash + Eq + Ord + ?Sized,
1805    {
1806        let k_hash: u64 = self.cache.prehash(k);
1807
1808        // Track the attempted read op
1809        /*
1810        unsafe {
1811            let op_ptr = self.read_ops.get();
1812            (*op_ptr) += 1;
1813        }
1814        */
1815        self.stats.cache_read();
1816
1817        let r: Option<&V> = if let Some(tci) = self.tlocal.get(k) {
1818            match tci {
1819                ThreadCacheItem::Present(v, _clean, _size) => {
1820                    let v = v as *const _;
1821                    unsafe { Some(&(*v)) }
1822                }
1823                ThreadCacheItem::Removed(_clean) => {
1824                    return None;
1825                }
1826            }
1827        } else {
1828            // If we have been requested to clear, the main cache is "empty"
1829            // but we can't do that until a commit, so we just flag it and avoid.
1830            let is_cleared = unsafe {
1831                let clear_ptr = self.clear.get();
1832                *clear_ptr
1833            };
1834            if !is_cleared {
1835                if let Some(v) = self.cache.get_prehashed(k, k_hash) {
1836                    (*v).to_vref()
1837                } else {
1838                    None
1839                }
1840            } else {
1841                None
1842            }
1843        };
1844
1845        if r.is_some() {
1846            self.stats.cache_hit();
1847        }
1848
1849        // How do we track this was a hit?
1850        // Remember, we don't track misses - they are *implied* by the fact they'll trigger
1851        // an inclusion from the external system. Subsequent, any further re-hit on an
1852        // included value WILL be tracked, allowing arc to adjust appropriately.
1853        if self.above_watermark && r.is_some() {
1854            unsafe {
1855                let hit_ptr = self.hit.get();
1856                (*hit_ptr).push(k_hash);
1857            }
1858        }
1859        r
1860    }
1861
1862    /// If a value is in the thread local cache, retrieve it for mutation. If the value
1863    /// is not in the thread local cache, it is retrieved and cloned from the main cache. If
1864    /// the value had been marked for removal, it must first be re-inserted.
1865    ///
1866    /// # Safety
1867    ///
1868    /// Since you are mutating the state of the value, if you have sized insertions you MAY
1869    /// break this since you can change the weight of the value to be inconsistent
1870    pub fn get_mut<Q>(&mut self, k: &Q, make_dirty: bool) -> Option<&mut V>
1871    where
1872        K: Borrow<Q>,
1873        Q: Hash + Eq + Ord + ?Sized,
1874    {
1875        // If we were requested to clear, we can not copy to the tlocal cache.
1876        let is_cleared = unsafe {
1877            let clear_ptr = self.clear.get();
1878            *clear_ptr
1879        };
1880
1881        // If the main cache has NOT been cleared (ie it has items) and our tlocal
1882        // does NOT contain this key, then we prime it.
1883        if !is_cleared && !self.tlocal.contains_key(k) {
1884            // Copy from the core cache into the tlocal.
1885            let k_hash: u64 = self.cache.prehash(k);
1886            if let Some(v) = self.cache.get_prehashed(k, k_hash) {
1887                if let Some((dk, dv, ds)) = v.to_kvsref() {
1888                    self.tlocal.insert(
1889                        dk.clone(),
1890                        ThreadCacheItem::Present(dv.clone(), !make_dirty, ds),
1891                    );
1892                }
1893            }
1894        };
1895
1896        // Now return from the tlocal, if present, a mut pointer.
1897
1898        match self.tlocal.get_mut(k) {
1899            Some(ThreadCacheItem::Present(v, clean, _size)) => {
1900                if make_dirty && *clean {
1901                    *clean = false;
1902                }
1903                let v = v as *mut _;
1904                unsafe { Some(&mut (*v)) }
1905            }
1906            _ => None,
1907        }
1908    }
1909
1910    /// Determine if this cache contains the following key.
1911    pub fn contains_key<Q>(&mut self, k: &Q) -> bool
1912    where
1913        K: Borrow<Q>,
1914        Q: Hash + Eq + Ord + ?Sized,
1915    {
1916        self.get(k).is_some()
1917    }
1918
1919    /// Add a value to the cache. This may be because you have had a cache miss and
1920    /// now wish to include in the thread local storage, or because you have written
1921    /// a new value and want it to be submitted for caching. This item is marked as
1922    /// clean, IE you have synced it to whatever associated store exists.
1923    pub fn insert(&mut self, k: K, v: V) {
1924        self.tlocal.insert(k, ThreadCacheItem::Present(v, true, 1));
1925    }
1926
1927    /// Insert an item to the cache, with an associated weight/size factor. See also `insert`
1928    pub fn insert_sized(&mut self, k: K, v: V, size: NonZeroUsize) {
1929        self.tlocal
1930            .insert(k, ThreadCacheItem::Present(v, true, size.get()));
1931    }
1932
1933    /// Remove this value from the thread local cache IE mask from from being
1934    /// returned until this thread performs an insert. This item is marked as clean
1935    /// IE you have synced it to whatever associated store exists.
1936    pub fn remove(&mut self, k: K) {
1937        self.tlocal.insert(k, ThreadCacheItem::Removed(true));
1938    }
1939
1940    /// Add a value to the cache. This may be because you have had a cache miss and
1941    /// now wish to include in the thread local storage, or because you have written
1942    /// a new value and want it to be submitted for caching. This item is marked as
1943    /// dirty, because you have *not* synced it. You MUST call iter_mut_mark_clean before calling
1944    /// `commit` on this transaction, or a panic will occur.
1945    pub fn insert_dirty(&mut self, k: K, v: V) {
1946        self.tlocal.insert(k, ThreadCacheItem::Present(v, false, 1));
1947    }
1948
1949    /// Insert a dirty item to the cache, with an associated weight/size factor. See also `insert_dirty`
1950    pub fn insert_dirty_sized(&mut self, k: K, v: V, size: NonZeroUsize) {
1951        self.tlocal
1952            .insert(k, ThreadCacheItem::Present(v, false, size.get()));
1953    }
1954
1955    /// Remove this value from the thread local cache IE mask from from being
1956    /// returned until this thread performs an insert. This item is marked as
1957    /// dirty, because you have *not* synced it. You MUST call iter_mut_mark_clean before calling
1958    /// `commit` on this transaction, or a panic will occur.
1959    pub fn remove_dirty(&mut self, k: K) {
1960        self.tlocal.insert(k, ThreadCacheItem::Removed(false));
1961    }
1962
1963    /// Determines if dirty elements exist in this cache or not.
1964    pub fn is_dirty(&self) -> bool {
1965        self.iter_dirty().take(1).next().is_some()
1966    }
1967
1968    /// Yields an iterator over all values that are currently dirty. As the iterator
1969    /// progresses, items will NOT be marked clean. This allows you to examine
1970    /// any currently dirty items in the cache.
1971    pub fn iter_dirty(&self) -> impl Iterator<Item = (&K, Option<&V>)> {
1972        self.tlocal
1973            .iter()
1974            .filter(|(_k, v)| match v {
1975                ThreadCacheItem::Present(_v, c, _size) => !c,
1976                ThreadCacheItem::Removed(c) => !c,
1977            })
1978            .map(|(k, v)| {
1979                // Get the data.
1980                let data = match v {
1981                    ThreadCacheItem::Present(v, _c, _size) => Some(v),
1982                    ThreadCacheItem::Removed(_c) => None,
1983                };
1984                (k, data)
1985            })
1986    }
1987
1988    /// Yields a mutable iterator over all values that are currently dirty. As the iterator
1989    /// progresses, items will NOT be marked clean. This allows you to modify and
1990    /// change any currently dirty items as required.
1991    pub fn iter_mut_dirty(&mut self) -> impl Iterator<Item = (&K, Option<&mut V>)> {
1992        self.tlocal
1993            .iter_mut()
1994            .filter(|(_k, v)| match v {
1995                ThreadCacheItem::Present(_v, c, _size) => !c,
1996                ThreadCacheItem::Removed(c) => !c,
1997            })
1998            .map(|(k, v)| {
1999                // Get the data.
2000                let data = match v {
2001                    ThreadCacheItem::Present(v, _c, _size) => Some(v),
2002                    ThreadCacheItem::Removed(_c) => None,
2003                };
2004                (k, data)
2005            })
2006    }
2007
2008    /// Yields an iterator over all values that are currently dirty. As the iterator
2009    /// progresses, items will be marked clean. This is where you should sync dirty
2010    /// cache content to your associated store. The iterator is `K, Option<V>`, where
2011    /// the `Option<V>` indicates if the item has been remove (None) or is updated (Some).
2012    pub fn iter_mut_mark_clean(&mut self) -> impl Iterator<Item = (&K, Option<&mut V>)> {
2013        self.tlocal
2014            .iter_mut()
2015            .filter(|(_k, v)| match v {
2016                ThreadCacheItem::Present(_v, c, _size) => !c,
2017                ThreadCacheItem::Removed(c) => !c,
2018            })
2019            .map(|(k, v)| {
2020                // Mark it clean.
2021                match v {
2022                    ThreadCacheItem::Present(_v, c, _size) => *c = true,
2023                    ThreadCacheItem::Removed(c) => *c = true,
2024                }
2025                // Get the data.
2026                let data = match v {
2027                    ThreadCacheItem::Present(v, _c, _size) => Some(v),
2028                    ThreadCacheItem::Removed(_c) => None,
2029                };
2030                (k, data)
2031            })
2032    }
2033
2034    /// Yield an iterator over all currently live and valid cache items.
2035    pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
2036        self.cache.values().filter_map(|ci| match &ci {
2037            CacheItem::Rec(lln, v) => {
2038                let cii = lln.as_ref();
2039                Some((&cii.k, v))
2040            }
2041            CacheItem::Freq(lln, v) => {
2042                let cii = lln.as_ref();
2043                Some((&cii.k, v))
2044            }
2045            _ => None,
2046        })
2047    }
2048
2049    /// Yield an iterator over all currently live and valid items in the
2050    /// recent access list.
2051    pub fn iter_rec(&self) -> impl Iterator<Item = &K> {
2052        self.cache.values().filter_map(|ci| match &ci {
2053            CacheItem::Rec(lln, _) => {
2054                let cii = lln.as_ref();
2055                Some(&cii.k)
2056            }
2057            _ => None,
2058        })
2059    }
2060
2061    /// Yield an iterator over all currently live and valid items in the
2062    /// frequent access list.
2063    pub fn iter_freq(&self) -> impl Iterator<Item = &K> {
2064        self.cache.values().filter_map(|ci| match &ci {
2065            CacheItem::Rec(lln, _) => {
2066                let cii = lln.as_ref();
2067                Some(&cii.k)
2068            }
2069            _ => None,
2070        })
2071    }
2072
2073    #[cfg(test)]
2074    pub(crate) fn iter_ghost_rec(&self) -> impl Iterator<Item = &K> {
2075        self.cache.values().filter_map(|ci| match &ci {
2076            CacheItem::GhostRec(lln) => {
2077                let cii = lln.as_ref();
2078                Some(&cii.k)
2079            }
2080            _ => None,
2081        })
2082    }
2083
2084    #[cfg(test)]
2085    pub(crate) fn iter_ghost_freq(&self) -> impl Iterator<Item = &K> {
2086        self.cache.values().filter_map(|ci| match &ci {
2087            CacheItem::GhostFreq(lln) => {
2088                let cii = lln.as_ref();
2089                Some(&cii.k)
2090            }
2091            _ => None,
2092        })
2093    }
2094
2095    #[cfg(test)]
2096    pub(crate) fn peek_hit(&self) -> &[u64] {
2097        let hit_ptr = self.hit.get();
2098        unsafe { &(*hit_ptr) }
2099    }
2100
2101    #[cfg(test)]
2102    pub(crate) fn peek_cache<Q: ?Sized + Hash + Eq + Ord>(&self, k: &Q) -> CacheState
2103    where
2104        K: Borrow<Q>,
2105    {
2106        if let Some(v) = self.cache.get(k) {
2107            (*v).to_state()
2108        } else {
2109            CacheState::None
2110        }
2111    }
2112
2113    #[cfg(test)]
2114    pub(crate) fn peek_stat(&self) -> CStat {
2115        let inner = self.caller.inner.lock().unwrap();
2116        let shared = self.caller.shared.read().unwrap();
2117        CStat {
2118            max: shared.max,
2119            cache: self.cache.len(),
2120            tlocal: self.tlocal.len(),
2121            freq: inner.freq.len(),
2122            rec: inner.rec.len(),
2123            ghost_freq: inner.ghost_freq.len(),
2124            ghost_rec: inner.ghost_rec.len(),
2125            haunted: inner.haunted.len(),
2126            p: inner.p,
2127        }
2128    }
2129
2130    // to_snapshot
2131}
2132
2133impl<
2134        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
2135        V: Clone + Debug + Sync + Send + 'static,
2136        S: ARCacheReadStat + Clone,
2137    > ARCacheReadTxn<'_, K, V, S>
2138{
2139    /// Attempt to retrieve a k-v pair from the cache. If it is present in the main cache OR
2140    /// the thread local cache, a `Some` is returned, else you will receive a `None`. On a
2141    /// `None`, you must then consult the external data source that this structure is acting
2142    /// as a cache for.
2143    pub fn get<Q>(&mut self, k: &Q) -> Option<&V>
2144    where
2145        K: Borrow<Q>,
2146        Q: Hash + Eq + Ord + ?Sized,
2147    {
2148        let k_hash: u64 = self.cache.prehash(k);
2149
2150        self.stats.cache_read();
2151        // self.ops += 1;
2152
2153        let mut hits = false;
2154        let mut tlocal_hits = false;
2155
2156        let r: Option<&V> = self
2157            .tlocal
2158            .as_ref()
2159            .and_then(|cache| {
2160                cache.set.get(k).map(|v| {
2161                    // Indicate a hit on the tlocal cache.
2162                    tlocal_hits = true;
2163
2164                    if self.above_watermark {
2165                        let _ = self.hit_queue.push(CacheHitEvent {
2166                            t: Instant::now(),
2167                            k_hash,
2168                        });
2169                    }
2170                    unsafe {
2171                        let v = &v.as_ref().v as *const _;
2172                        // This discards the lifetime and repins it to the lifetime of `self`.
2173                        &(*v)
2174                    }
2175                })
2176            })
2177            .or_else(|| {
2178                self.cache.get_prehashed(k, k_hash).and_then(|v| {
2179                    (*v).to_vref().map(|vin| {
2180                        // Indicate a hit on the main cache.
2181                        hits = true;
2182
2183                        if self.above_watermark {
2184                            let _ = self.hit_queue.push(CacheHitEvent {
2185                                t: Instant::now(),
2186                                k_hash,
2187                            });
2188                        }
2189
2190                        unsafe {
2191                            let vin = vin as *const _;
2192                            &(*vin)
2193                        }
2194                    })
2195                })
2196            });
2197
2198        if tlocal_hits {
2199            self.stats.cache_local_hit()
2200        } else if hits {
2201            self.stats.cache_main_hit()
2202        };
2203
2204        r
2205    }
2206
2207    /// Determine if this cache contains the following key.
2208    pub fn contains_key<Q>(&mut self, k: &Q) -> bool
2209    where
2210        K: Borrow<Q>,
2211        Q: Hash + Eq + Ord + ?Sized,
2212    {
2213        self.get(k).is_some()
2214    }
2215
2216    /// Insert an item to the cache, with an associated weight/size factor. See also `insert`
2217    pub fn insert_sized(&mut self, k: K, v: V, size: NonZeroUsize) {
2218        let size = size.get();
2219        // Send a copy forward through time and space.
2220        // let _ = self.tx.try_send(
2221        if self
2222            .inc_queue
2223            .push(CacheIncludeEvent {
2224                t: Instant::now(),
2225                k: k.clone(),
2226                v: v.clone(),
2227                txid: self.cache.get_txid(),
2228                size,
2229            })
2230            .is_ok()
2231        {
2232            self.stats.include();
2233        } else {
2234            self.stats.failed_include();
2235        }
2236
2237        // We have a cache, so lets update it.
2238        if let Some(ref mut cache) = self.tlocal {
2239            self.stats.local_include();
2240            while cache.tlru.len() >= cache.read_size {
2241                if let Some(owned_inner) = cache.tlru.pop_n_free() {
2242                    let existing = cache.set.remove(&owned_inner.k);
2243                    // Must have been present.
2244                    assert!(
2245                        existing.is_some(),
2246                        "Impossible state! Key was NOT present in cache!"
2247                    );
2248                } else {
2249                    // Somehow the list is empty, but we still are oversize?
2250                    debug_assert!(false);
2251                    break;
2252                }
2253            }
2254
2255            // Now add it, as we have enough space.
2256            let n = cache.tlru.append_k(ReadCacheItem {
2257                k: k.clone(),
2258                v,
2259                size,
2260            });
2261            let r = cache.set.insert(k, n);
2262            // There should never be a previous value.
2263            assert!(r.is_none());
2264        }
2265    }
2266
2267    /// Add a value to the cache. This may be because you have had a cache miss and
2268    /// now wish to include in the thread local storage.
2269    ///
2270    /// Note that is invalid to insert an item who's key already exists in this thread local cache,
2271    /// and this is asserted IE will panic if you attempt this. It is also invalid for you to insert
2272    /// a value that does not match the source-of-truth state, IE inserting a different
2273    /// value than another thread may perceive. This is a *read* thread, so you should only be adding
2274    /// values that are relevant to this read transaction and this point in time. If you do not
2275    /// heed this warning, you may alter the fabric of time and space and have some interesting
2276    /// distortions in your data over time.
2277    pub fn insert(&mut self, k: K, v: V) {
2278        self.insert_sized(k, v, unsafe { NonZeroUsize::new_unchecked(1) })
2279    }
2280
2281    /// _
2282    pub fn finish(self) -> S {
2283        let stats = self.stats.clone();
2284        drop(self);
2285
2286        stats
2287    }
2288}
2289
impl<
        K: Hash + Eq + Ord + Clone + Debug + Sync + Send + 'static,
        V: Clone + Debug + Sync + Send + 'static,
        S: ARCacheReadStat + Clone,
    > Drop for ARCacheReadTxn<'_, K, V, S>
{
    // On read-txn end, give the parent cache a chance to quiesce - presumably
    // draining this reader's queued events into the main state; confirm against
    // `try_quiesce` - unless the builder disabled reader quiescing.
    fn drop(&mut self) {
        // We could make this check the queue sizes rather than blindly quiescing
        if self.reader_quiesce {
            self.caller.try_quiesce();
        }
    }
}
2303
2304#[cfg(test)]
2305mod tests {
2306    use super::stats::{TraceStat, WriteCountStat};
2307    use super::ARCache;
2308    use super::ARCacheBuilder;
2309    use super::CStat;
2310    use super::CacheState;
2311    use std::num::NonZeroUsize;
2312    use std::sync::Arc;
2313    use std::thread;
2314
2315    use std::sync::atomic::{AtomicBool, Ordering};
2316
2317    #[test]
2318    fn test_cache_arc_basic() {
2319        let arc: ARCache<usize, usize> = ARCacheBuilder::new()
2320            .set_size(4, 4)
2321            .build()
2322            .expect("Invalid cache parameters!");
2323        let mut wr_txn = arc.write();
2324
2325        assert!(wr_txn.get(&1).is_none());
2326        assert!(wr_txn.peek_hit().is_empty());
2327        wr_txn.insert(1, 1);
2328        assert!(wr_txn.get(&1) == Some(&1));
2329        assert!(wr_txn.peek_hit().len() == 1);
2330
2331        wr_txn.commit();
2332
2333        // Now we start the second txn, and see if it's in there.
2334        let mut wr_txn = arc.write();
2335        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
2336        assert!(wr_txn.get(&1) == Some(&1));
2337        assert!(wr_txn.peek_hit().len() == 1);
2338        wr_txn.commit();
2339        // And now check it's moved to Freq due to the extra
2340        let wr_txn = arc.write();
2341        assert!(wr_txn.peek_cache(&1) == CacheState::Freq);
2342        println!("{:?}", wr_txn.peek_stat());
2343    }
2344
2345    #[test]
2346    fn test_cache_evict() {
2347        let _ = tracing_subscriber::fmt::try_init();
2348        println!("== 1");
2349        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
2350            .set_size(4, 4)
2351            .build()
2352            .expect("Invalid cache parameters!");
2353        let stats = TraceStat {};
2354
2355        let mut wr_txn = arc.write_stats(stats);
2356        assert!(
2357            CStat {
2358                max: 4,
2359                cache: 0,
2360                tlocal: 0,
2361                freq: 0,
2362                rec: 0,
2363                ghost_freq: 0,
2364                ghost_rec: 0,
2365                haunted: 0,
2366                p: 0
2367            } == wr_txn.peek_stat()
2368        );
2369
2370        // In the first txn we insert 4 items.
2371        wr_txn.insert(1, 1);
2372        wr_txn.insert(2, 2);
2373        wr_txn.insert(3, 3);
2374        wr_txn.insert(4, 4);
2375
2376        assert!(
2377            CStat {
2378                max: 4,
2379                cache: 0,
2380                tlocal: 4,
2381                freq: 0,
2382                rec: 0,
2383                ghost_freq: 0,
2384                ghost_rec: 0,
2385                haunted: 0,
2386                p: 0
2387            } == wr_txn.peek_stat()
2388        );
2389        let stats = wr_txn.commit();
2390
2391        // Now we start the second txn, and check the stats.
2392        println!("== 2");
2393        let mut wr_txn = arc.write_stats(stats);
2394        assert!(
2395            CStat {
2396                max: 4,
2397                cache: 4,
2398                tlocal: 0,
2399                freq: 0,
2400                rec: 4,
2401                ghost_freq: 0,
2402                ghost_rec: 0,
2403                haunted: 0,
2404                p: 0
2405            } == wr_txn.peek_stat()
2406        );
2407
2408        // Now touch two items, this promote to the freq set.
2409        // Remember, a double hit doesn't weight any more than 1 hit.
2410        assert!(wr_txn.get(&1) == Some(&1));
2411        assert!(wr_txn.get(&1) == Some(&1));
2412        assert!(wr_txn.get(&2) == Some(&2));
2413
2414        let stats = wr_txn.commit();
2415
2416        // Now we start the third txn, and check the stats.
2417        println!("== 3");
2418        let mut wr_txn = arc.write_stats(stats);
2419        assert!(
2420            CStat {
2421                max: 4,
2422                cache: 4,
2423                tlocal: 0,
2424                freq: 2,
2425                rec: 2,
2426                ghost_freq: 0,
2427                ghost_rec: 0,
2428                haunted: 0,
2429                p: 0
2430            } == wr_txn.peek_stat()
2431        );
2432        // Add one more item, this will trigger an evict.
2433        wr_txn.insert(5, 5);
2434        let stats = wr_txn.commit();
2435
2436        // Now we start the fourth txn, and check the stats.
2437        println!("== 4");
2438        let mut wr_txn = arc.write_stats(stats);
2439        println!("stat -> {:?}", wr_txn.peek_stat());
2440        assert!(
2441            CStat {
2442                max: 4,
2443                cache: 5,
2444                tlocal: 0,
2445                freq: 2,
2446                rec: 2,
2447                ghost_freq: 0,
2448                ghost_rec: 1,
2449                haunted: 0,
2450                p: 0
2451            } == wr_txn.peek_stat()
2452        );
2453        // And assert what's in the sets to be sure of what went where.
2454        // 🚨 Can no longer peek these with hashmap backing as the keys may
2455        // be evicted out-of-order, but the stats are correct!
2456
2457        // Now touch the two recent items to bring them also to freq
2458
2459        let rec_set: Vec<usize> = wr_txn.iter_rec().take(2).copied().collect();
2460        assert!(wr_txn.get(&rec_set[0]) == Some(&rec_set[0]));
2461        assert!(wr_txn.get(&rec_set[1]) == Some(&rec_set[1]));
2462
2463        let stats = wr_txn.commit();
2464
2465        // Now we start the fifth txn, and check the stats.
2466        println!("== 5");
2467        let mut wr_txn = arc.write_stats(stats);
2468        println!("stat -> {:?}", wr_txn.peek_stat());
2469        assert!(
2470            CStat {
2471                max: 4,
2472                cache: 5,
2473                tlocal: 0,
2474                freq: 4,
2475                rec: 0,
2476                ghost_freq: 0,
2477                ghost_rec: 1,
2478                haunted: 0,
2479                p: 0
2480            } == wr_txn.peek_stat()
2481        );
2482        // And assert what's in the sets to be sure of what went where.
2483        // 🚨 Can no longer peek these with hashmap backing as the keys may
2484        // be evicted out-of-order, but the stats are correct!
2485
2486        // Now touch the one item that's in ghost rec - this will trigger
2487        // an evict from ghost freq
2488        let grec: usize = wr_txn.iter_ghost_rec().take(1).copied().next().unwrap();
2489        wr_txn.insert(grec, grec);
2490        assert!(wr_txn.get(&grec) == Some(&grec));
2491        // When we add 3, we are basically issuing a demand that the rec set should be
2492        // allowed to grow as we had a potential cache miss here.
2493        let stats = wr_txn.commit();
2494
2495        // Now we start the sixth txn, and check the stats.
2496        println!("== 6");
2497        let mut wr_txn = arc.write_stats(stats);
2498        println!("stat -> {:?}", wr_txn.peek_stat());
2499        assert!(
2500            CStat {
2501                max: 4,
2502                cache: 5,
2503                tlocal: 0,
2504                freq: 3,
2505                rec: 1,
2506                ghost_freq: 1,
2507                ghost_rec: 0,
2508                haunted: 0,
2509                p: 1
2510            } == wr_txn.peek_stat()
2511        );
2512        // And assert what's in the sets to be sure of what went where.
2513        // 🚨 Can no longer peek these with hashmap backing as the keys may
2514        // be evicted out-of-order, but the stats are correct!
2515        assert!(wr_txn.peek_cache(&grec) == CacheState::Rec);
2516
2517        // Right, seventh txn - we show how a cache scan doesn't cause p shifting or evict.
2518        // tl;dr - attempt to include a bunch in a scan, and it will be ignored as p is low,
2519        // and any miss on rec won't shift p unless it's in the ghost rec.
2520        wr_txn.insert(10, 10);
2521        wr_txn.insert(11, 11);
2522        wr_txn.insert(12, 12);
2523        let stats = wr_txn.commit();
2524
2525        println!("== 7");
2526        let mut wr_txn = arc.write_stats(stats);
2527        println!("stat -> {:?}", wr_txn.peek_stat());
2528        assert!(
2529            CStat {
2530                max: 4,
2531                cache: 8,
2532                tlocal: 0,
2533                freq: 3,
2534                rec: 1,
2535                ghost_freq: 1,
2536                ghost_rec: 3,
2537                haunted: 0,
2538                p: 1
2539            } == wr_txn.peek_stat()
2540        );
2541        // 🚨 Can no longer peek these with hashmap backing as the keys may
2542        // be evicted out-of-order, but the stats are correct!
2543
2544        // Eight txn - now that we had a demand for items before, we re-demand them - this will trigger
2545        // a shift in p, causing some more to be in the rec cache.
2546        let grec_set: Vec<usize> = wr_txn.iter_ghost_rec().take(3).copied().collect();
2547        println!("{:?}", grec_set);
2548
2549        grec_set
2550            .iter()
2551            .for_each(|i| println!("{:?}", wr_txn.peek_cache(i)));
2552
2553        grec_set.iter().for_each(|i| wr_txn.insert(*i, *i));
2554
2555        grec_set
2556            .iter()
2557            .for_each(|i| println!("{:?}", wr_txn.peek_cache(i)));
2558        wr_txn.commit();
2559
2560        println!("== 8");
2561        let mut wr_txn = arc.write();
2562        println!("stat -> {:?}", wr_txn.peek_stat());
2563        assert!(
2564            CStat {
2565                max: 4,
2566                cache: 8,
2567                tlocal: 0,
2568                freq: 0,
2569                rec: 4,
2570                ghost_freq: 4,
2571                ghost_rec: 0,
2572                haunted: 0,
2573                p: 4
2574            } == wr_txn.peek_stat()
2575        );
2576
2577        grec_set
2578            .iter()
2579            .for_each(|i| println!("{:?}", wr_txn.peek_cache(i)));
2580        grec_set
2581            .iter()
2582            .for_each(|i| assert!(wr_txn.peek_cache(i) == CacheState::Rec));
2583
2584        // Now lets go back the other way - we want freq items to come back.
2585        let gfreq_set: Vec<usize> = wr_txn.iter_ghost_freq().take(4).copied().collect();
2586
2587        gfreq_set.iter().for_each(|i| wr_txn.insert(*i, *i));
2588        wr_txn.commit();
2589
2590        println!("== 9");
2591        let wr_txn = arc.write();
2592        println!("stat -> {:?}", wr_txn.peek_stat());
2593        assert!(
2594            CStat {
2595                max: 4,
2596                cache: 8,
2597                tlocal: 0,
2598                freq: 4,
2599                rec: 0,
2600                ghost_freq: 0,
2601                ghost_rec: 4,
2602                haunted: 0,
2603                p: 0
2604            } == wr_txn.peek_stat()
2605        );
2606        // 🚨 Can no longer peek these with hashmap backing as the keys may
2607        // be evicted out-of-order, but the stats are correct!
2608        gfreq_set
2609            .iter()
2610            .for_each(|i| assert!(wr_txn.peek_cache(i) == CacheState::Freq));
2611
2612        // And done!
2613        let () = wr_txn.commit();
2614        // See what stats did
2615        // let stats = arc.view_stats();
2616        // println!("{:?}", *stats);
2617    }
2618
2619    #[test]
2620    fn test_cache_concurrent_basic() {
2621        // Now we want to check some basic interactions of read and write together.
2622        // Setup the cache.
2623        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
2624            .set_size(4, 4)
2625            .build()
2626            .expect("Invalid cache parameters!");
2627        // start a rd
2628        {
2629            let mut rd_txn = arc.read();
2630            // add items to the rd
2631            rd_txn.insert(1, 1);
2632            rd_txn.insert(2, 2);
2633            rd_txn.insert(3, 3);
2634            rd_txn.insert(4, 4);
2635            // Should be in the tlocal
2636            // assert!(rd_txn.get(&1).is_some());
2637            // assert!(rd_txn.get(&2).is_some());
2638            // assert!(rd_txn.get(&3).is_some());
2639            // assert!(rd_txn.get(&4).is_some());
2640            // end the rd
2641        }
2642        arc.try_quiesce();
2643        // What state is the cache now in?
2644        println!("== 2");
2645        let wr_txn = arc.write();
2646        println!("{:?}", wr_txn.peek_stat());
2647        assert!(
2648            CStat {
2649                max: 4,
2650                cache: 4,
2651                tlocal: 0,
2652                freq: 0,
2653                rec: 4,
2654                ghost_freq: 0,
2655                ghost_rec: 0,
2656                haunted: 0,
2657                p: 0
2658            } == wr_txn.peek_stat()
2659        );
2660        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
2661        assert!(wr_txn.peek_cache(&2) == CacheState::Rec);
2662        assert!(wr_txn.peek_cache(&3) == CacheState::Rec);
2663        assert!(wr_txn.peek_cache(&4) == CacheState::Rec);
2664        // Magic! Without a single write op we included items!
2665        // Lets have the read touch two items, and then add two new.
2666        // This will trigger evict on 1/2
2667        {
2668            let mut rd_txn = arc.read();
2669            // add items to the rd
2670            assert!(rd_txn.get(&3) == Some(&3));
2671            assert!(rd_txn.get(&4) == Some(&4));
2672            rd_txn.insert(5, 5);
2673            rd_txn.insert(6, 6);
2674            // end the rd
2675        }
2676        // Now commit and check the state.
2677        wr_txn.commit();
2678        println!("== 3");
2679        let wr_txn = arc.write();
2680        assert!(
2681            CStat {
2682                max: 4,
2683                cache: 6,
2684                tlocal: 0,
2685                freq: 2,
2686                rec: 2,
2687                ghost_freq: 0,
2688                ghost_rec: 2,
2689                haunted: 0,
2690                p: 0
2691            } == wr_txn.peek_stat()
2692        );
2693        assert!(wr_txn.peek_cache(&1) == CacheState::GhostRec);
2694        assert!(wr_txn.peek_cache(&2) == CacheState::GhostRec);
2695        assert!(wr_txn.peek_cache(&3) == CacheState::Freq);
2696        assert!(wr_txn.peek_cache(&4) == CacheState::Freq);
2697        assert!(wr_txn.peek_cache(&5) == CacheState::Rec);
2698        assert!(wr_txn.peek_cache(&6) == CacheState::Rec);
2699
2700        // Now trigger hits on 1/2 which will cause a shift in P.
2701        {
2702            let mut rd_txn = arc.read();
2703            // add items to the rd
2704            rd_txn.insert(1, 1);
2705            rd_txn.insert(2, 2);
2706            // end the rd
2707        }
2708
2709        wr_txn.commit();
2710        println!("== 4");
2711        let wr_txn = arc.write();
2712        assert!(
2713            CStat {
2714                max: 4,
2715                cache: 6,
2716                tlocal: 0,
2717                freq: 2,
2718                rec: 2,
2719                ghost_freq: 0,
2720                ghost_rec: 2,
2721                haunted: 0,
2722                p: 2
2723            } == wr_txn.peek_stat()
2724        );
2725        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
2726        assert!(wr_txn.peek_cache(&2) == CacheState::Rec);
2727        assert!(wr_txn.peek_cache(&3) == CacheState::Freq);
2728        assert!(wr_txn.peek_cache(&4) == CacheState::Freq);
2729        assert!(wr_txn.peek_cache(&5) == CacheState::GhostRec);
2730        assert!(wr_txn.peek_cache(&6) == CacheState::GhostRec);
2731        // See what stats did
2732        // let stats = arc.view_stats();
2733        // println!("stats 1: {:?}", *stats);
2734        // assert!(stats.reader_hits == 2);
2735        // assert!(stats.reader_includes == 8);
2736        // assert!(stats.reader_tlocal_includes == 8);
2737        // assert!(stats.reader_tlocal_hits == 0);
2738    }
2739
    // Test edge cases that are horrifying and could destroy people's lives
    // and sanity.
2742    #[test]
2743    fn test_cache_concurrent_cursed_1() {
2744        // Case 1 - It's possible for a read transaction to last for a long time,
2745        // and then have a cache include, which may cause an attempt to include
2746        // an outdated value into the cache. To handle this the haunted set exists
2747        // so that all keys and their eviction ids are always tracked for all of time
2748        // to ensure that we never incorrectly include a value that may have been updated
2749        // more recently.
2750        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
2751            .set_size(4, 4)
2752            .build()
2753            .expect("Invalid cache parameters!");
2754
2755        // Start a wr
2756        let mut wr_txn = arc.write();
2757        // Start a rd
2758        let mut rd_txn = arc.read();
2759        // Add the value 1,1 via the wr.
2760        wr_txn.insert(1, 1);
2761
2762        // assert 1 is not in rd.
2763        assert!(rd_txn.get(&1).is_none());
2764
2765        // Commit wr
2766        wr_txn.commit();
2767        // Even after the commit, it's not in rd.
2768        assert!(rd_txn.get(&1).is_none());
2769        // begin wr
2770        let mut wr_txn = arc.write();
2771        // We now need to flood the cache, to cause ghost rec eviction.
2772        wr_txn.insert(10, 1);
2773        wr_txn.insert(11, 1);
2774        wr_txn.insert(12, 1);
2775        wr_txn.insert(13, 1);
2776        wr_txn.insert(14, 1);
2777        wr_txn.insert(15, 1);
2778        wr_txn.insert(16, 1);
2779        wr_txn.insert(17, 1);
2780        // commit wr
2781        wr_txn.commit();
2782
2783        // begin wr
2784        let wr_txn = arc.write();
2785        // assert that 1 is haunted.
2786        assert!(wr_txn.peek_cache(&1) == CacheState::Haunted);
2787        // assert 1 is not in rd.
2788        assert!(rd_txn.get(&1).is_none());
2789        // now that 1 is hanuted, in rd attempt to insert 1, X
2790        rd_txn.insert(1, 100);
2791        // commit wr
2792        wr_txn.commit();
2793
2794        // start wr
2795        let wr_txn = arc.write();
2796        // assert that 1 is still haunted.
2797        assert!(wr_txn.peek_cache(&1) == CacheState::Haunted);
2798        // assert that 1, x is in rd.
2799        assert!(rd_txn.get(&1) == Some(&100));
2800        // done!
2801    }
2802
2803    #[test]
2804    fn test_cache_clear() {
2805        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
2806            .set_size(4, 4)
2807            .build()
2808            .expect("Invalid cache parameters!");
2809
2810        // Start a wr
2811        let mut wr_txn = arc.write();
2812        // Add a bunch of values, and touch some twice.
2813        wr_txn.insert(10, 10);
2814        wr_txn.insert(11, 11);
2815        wr_txn.insert(12, 12);
2816        wr_txn.insert(13, 13);
2817        wr_txn.insert(14, 14);
2818        wr_txn.insert(15, 15);
2819        wr_txn.insert(16, 16);
2820        wr_txn.insert(17, 17);
2821        wr_txn.commit();
2822        // Begin a new write.
2823        let mut wr_txn = arc.write();
2824
2825        // Touch two values that are in the rec set.
2826        let rec_set: Vec<usize> = wr_txn.iter_rec().take(2).copied().collect();
2827        println!("{:?}", rec_set);
2828        assert!(wr_txn.get(&rec_set[0]) == Some(&rec_set[0]));
2829        assert!(wr_txn.get(&rec_set[1]) == Some(&rec_set[1]));
2830
2831        // commit wr
2832        wr_txn.commit();
2833        // Begin a new write.
2834        let mut wr_txn = arc.write();
2835        println!("stat -> {:?}", wr_txn.peek_stat());
2836        assert!(
2837            CStat {
2838                max: 4,
2839                cache: 8,
2840                tlocal: 0,
2841                freq: 2,
2842                rec: 2,
2843                ghost_freq: 0,
2844                ghost_rec: 4,
2845                haunted: 0,
2846                p: 0
2847            } == wr_txn.peek_stat()
2848        );
2849
2850        // Clear
2851        wr_txn.clear();
2852        // Now commit
2853        wr_txn.commit();
2854        // Now check their states.
2855        let wr_txn = arc.write();
2856        // See what stats did
2857        println!("stat -> {:?}", wr_txn.peek_stat());
2858        // stat -> CStat { max: 4, cache: 8, tlocal: 0, freq: 0, rec: 0, ghost_freq: 2, ghost_rec: 6, haunted: 0, p: 0 }
2859        assert!(
2860            CStat {
2861                max: 4,
2862                cache: 8,
2863                tlocal: 0,
2864                freq: 0,
2865                rec: 0,
2866                ghost_freq: 2,
2867                ghost_rec: 6,
2868                haunted: 0,
2869                p: 0
2870            } == wr_txn.peek_stat()
2871        );
2872        // let stats = arc.view_stats();
2873        // println!("{:?}", *stats);
2874    }
2875
2876    #[test]
2877    fn test_cache_clear_rollback() {
2878        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
2879            .set_size(4, 4)
2880            .build()
2881            .expect("Invalid cache parameters!");
2882
2883        // Start a wr
2884        let mut wr_txn = arc.write();
2885        // Add a bunch of values, and touch some twice.
2886        wr_txn.insert(10, 10);
2887        wr_txn.insert(11, 11);
2888        wr_txn.insert(12, 12);
2889        wr_txn.insert(13, 13);
2890        wr_txn.insert(14, 14);
2891        wr_txn.insert(15, 15);
2892        wr_txn.insert(16, 16);
2893        wr_txn.insert(17, 17);
2894        wr_txn.commit();
2895        // Begin a new write.
2896        let mut wr_txn = arc.write();
2897        let rec_set: Vec<usize> = wr_txn.iter_rec().take(2).copied().collect();
2898        println!("{:?}", rec_set);
2899        let r = wr_txn.get(&rec_set[0]);
2900        println!("{:?}", r);
2901        assert!(r == Some(&rec_set[0]));
2902        assert!(wr_txn.get(&rec_set[1]) == Some(&rec_set[1]));
2903
2904        // commit wr
2905        wr_txn.commit();
2906        // Begin a new write.
2907        let mut wr_txn = arc.write();
2908        println!("stat -> {:?}", wr_txn.peek_stat());
2909        assert!(
2910            CStat {
2911                max: 4,
2912                cache: 8,
2913                tlocal: 0,
2914                freq: 2,
2915                rec: 2,
2916                ghost_freq: 0,
2917                ghost_rec: 4,
2918                haunted: 0,
2919                p: 0
2920            } == wr_txn.peek_stat()
2921        );
2922
2923        // Clear
2924        wr_txn.clear();
2925        // Now abort the clear - should do nothing!
2926        drop(wr_txn);
2927        // Check the states, should not have changed
2928        let wr_txn = arc.write();
2929        println!("stat -> {:?}", wr_txn.peek_stat());
2930        assert!(
2931            CStat {
2932                max: 4,
2933                cache: 8,
2934                tlocal: 0,
2935                freq: 2,
2936                rec: 2,
2937                ghost_freq: 0,
2938                ghost_rec: 4,
2939                haunted: 0,
2940                p: 0
2941            } == wr_txn.peek_stat()
2942        );
2943    }
2944
2945    #[test]
2946    fn test_cache_clear_cursed() {
2947        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
2948            .set_size(4, 4)
2949            .build()
2950            .expect("Invalid cache parameters!");
2951        // Setup for the test
2952        // --
2953        let mut wr_txn = arc.write();
2954        wr_txn.insert(10, 1);
2955        wr_txn.commit();
2956        // --
2957        let wr_txn = arc.write();
2958        assert!(wr_txn.peek_cache(&10) == CacheState::Rec);
2959        wr_txn.commit();
2960        // --
2961        // Okay, now the test starts. First, we begin a read
2962        let mut rd_txn = arc.read();
2963        // Then while that read exists, we open a write, and conduct
2964        // a cache clear.
2965        let mut wr_txn = arc.write();
2966        wr_txn.clear();
2967        // Commit the clear write.
2968        wr_txn.commit();
2969
2970        // Now on the read, we perform a touch of an item, and we include
2971        // something that was not yet in the cache.
2972        assert!(rd_txn.get(&10) == Some(&1));
2973        rd_txn.insert(11, 1);
2974        // Complete the read
2975        std::mem::drop(rd_txn);
2976        // Perform a cache quiesce
2977        arc.try_quiesce();
2978        // --
2979
2980        // Assert that the items that we provided were NOT included, and are
2981        // in the correct states.
2982        let wr_txn = arc.write();
2983        assert!(wr_txn.peek_cache(&10) == CacheState::GhostRec);
2984        println!("--> {:?}", wr_txn.peek_cache(&11));
2985        assert!(wr_txn.peek_cache(&11) == CacheState::None);
2986    }
2987
2988    #[test]
2989    fn test_cache_p_weight_zero_churn() {
2990        let arc: ARCache<usize, usize> = ARCacheBuilder::new()
2991            .set_size(4, 4)
2992            .set_watermark(0)
2993            .build()
2994            .expect("Invalid cache parameters!");
2995
2996        let mut wr_txn = arc.write();
2997
2998        // First we need to load up frequent.
2999        wr_txn.insert(1, 1);
3000        wr_txn.insert(2, 2);
3001        wr_txn.insert(3, 3);
3002        wr_txn.insert(4, 4);
3003        assert_eq!(wr_txn.get(&1), Some(&1));
3004        assert_eq!(wr_txn.get(&2), Some(&2));
3005        assert_eq!(wr_txn.get(&3), Some(&3));
3006        assert_eq!(wr_txn.get(&4), Some(&4));
3007
3008        assert_eq!(wr_txn.peek_stat().p, 0);
3009        wr_txn.commit();
3010
3011        // Hitting again in a new txn moves to freq
3012        let mut wr_txn = arc.write();
3013        assert_eq!(wr_txn.get(&1), Some(&1));
3014        assert_eq!(wr_txn.get(&2), Some(&2));
3015        assert_eq!(wr_txn.get(&3), Some(&3));
3016        assert_eq!(wr_txn.get(&4), Some(&4));
3017
3018        assert_eq!(wr_txn.peek_stat().p, 0);
3019        wr_txn.commit();
3020
3021        // Now include new items. The goal is we want to shift p to at least 1.
3022        let mut wr_txn = arc.write();
3023        // Won't fit, moves to ghost recent
3024        wr_txn.insert(100, 100);
3025        println!("b {:?}", wr_txn.peek_stat());
3026        assert_eq!(wr_txn.peek_stat().p, 0);
3027        wr_txn.commit();
3028
3029        // Include again, causes evict in freq.
3030        let mut wr_txn = arc.write();
3031        assert_eq!(wr_txn.peek_stat().p, 0);
3032        assert_eq!(wr_txn.peek_stat().ghost_rec, 1);
3033
3034        // Causes shift in P, stays in recent.
3035        wr_txn.insert(100, 100);
3036        wr_txn.commit();
3037
3038        let wr_txn = arc.write();
3039        println!("c {:?}", wr_txn.peek_stat());
3040        assert_eq!(wr_txn.peek_stat().p, 1);
3041        assert_eq!(wr_txn.peek_stat().ghost_rec, 0);
3042        assert_eq!(wr_txn.peek_stat().ghost_freq, 1);
3043        wr_txn.commit();
3044
3045        // Now, we want to bring back from ghost freq.
3046        let mut wr_txn = arc.write();
3047        assert_eq!(wr_txn.get(&1), None);
3048        // We missed, so re-include.
3049        wr_txn.insert(1, 1);
3050        wr_txn.commit();
3051
3052        let wr_txn = arc.write();
3053        println!("d {:?}", wr_txn.peek_stat());
3054        // Weights P back to 0.
3055        assert_eq!(wr_txn.peek_stat().p, 0);
3056        assert_eq!(wr_txn.peek_stat().ghost_rec, 1);
3057        assert_eq!(wr_txn.peek_stat().ghost_freq, 0);
3058        wr_txn.commit();
3059    }
3060
3061    #[test]
3062    fn test_cache_haunted_bounds() {
3063        let arc: ARCache<usize, usize> = ARCacheBuilder::new()
3064            .set_size(4, 4)
3065            .set_watermark(0)
3066            .set_look_back_limit(4)
3067            .build()
3068            .expect("Invalid cache parameters!");
3069
3070        let mut wr_txn = arc.write();
3071
3072        // We want to test that haunted items are removed after 4 generations.
3073
3074        // Setup a removed key which immediately goes to haunted.
3075        wr_txn.remove(1);
3076        wr_txn.commit();
3077
3078        // Now write and commit 4 times to advance the txid.
3079        for _i in 0..5 {
3080            let wr_txn = arc.write();
3081            println!("l {:?}", wr_txn.peek_stat());
3082            assert_eq!(wr_txn.peek_stat().haunted, 1);
3083            assert!(wr_txn.peek_cache(&1) == CacheState::Haunted);
3084            wr_txn.commit();
3085        }
3086
3087        // Now it's removed.
3088        let wr_txn = arc.write();
3089        println!("d {:?}", wr_txn.peek_stat());
3090        assert_eq!(wr_txn.peek_stat().haunted, 0);
3091        assert_eq!(wr_txn.peek_cache(&1), CacheState::None);
3092        wr_txn.commit();
3093    }
3094
    #[test]
    fn test_cache_dirty_write() {
        // Smoke test for the dirty-item API: insert a value marked dirty,
        // then walk the dirty iterator marking everything clean, and commit.
        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
            .set_size(4, 4)
            .build()
            .expect("Invalid cache parameters!");
        let mut wr_txn = arc.write();
        wr_txn.insert_dirty(10, 1);
        wr_txn.iter_mut_mark_clean().for_each(|(_k, _v)| {});
        wr_txn.commit();
    }
3106
3107    #[test]
3108    fn test_cache_read_no_tlocal() {
3109        // Check a cache with no read local thread capacity
3110        // Setup the cache.
3111        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
3112            .set_size(4, 0)
3113            .build()
3114            .expect("Invalid cache parameters!");
3115        // start a rd
3116        {
3117            let mut rd_txn = arc.read();
3118            // add items to the rd
3119            rd_txn.insert(1, 1);
3120            rd_txn.insert(2, 2);
3121            rd_txn.insert(3, 3);
3122            rd_txn.insert(4, 4);
3123            // end the rd
3124            // Everything should be missing frm the tlocal.
3125            assert!(rd_txn.get(&1).is_none());
3126            assert!(rd_txn.get(&2).is_none());
3127            assert!(rd_txn.get(&3).is_none());
3128            assert!(rd_txn.get(&4).is_none());
3129        }
3130        arc.try_quiesce();
3131        // What state is the cache now in?
3132        println!("== 2");
3133        let wr_txn = arc.write();
3134        assert!(
3135            CStat {
3136                max: 4,
3137                cache: 4,
3138                tlocal: 0,
3139                freq: 0,
3140                rec: 4,
3141                ghost_freq: 0,
3142                ghost_rec: 0,
3143                haunted: 0,
3144                p: 0
3145            } == wr_txn.peek_stat()
3146        );
3147        assert!(wr_txn.peek_cache(&1) == CacheState::Rec);
3148        assert!(wr_txn.peek_cache(&2) == CacheState::Rec);
3149        assert!(wr_txn.peek_cache(&3) == CacheState::Rec);
3150        assert!(wr_txn.peek_cache(&4) == CacheState::Rec);
3151        // let stats = arc.view_stats();
3152        // println!("stats 1: {:?}", *stats);
3153        // assert!(stats.reader_includes == 4);
3154        // assert!(stats.reader_tlocal_includes == 0);
3155        // assert!(stats.reader_tlocal_hits == 0);
3156    }
3157
    // A value type used to exercise `insert_sized` with weights greater
    // than 1 in test_cache_weighted below.
    #[derive(Clone, Debug)]
    struct Weighted {
        _i: u64,
    }
3162
3163    #[test]
3164    fn test_cache_weighted() {
3165        let arc: ARCache<usize, Weighted> = ARCacheBuilder::default()
3166            .set_size(4, 0)
3167            .build()
3168            .expect("Invalid cache parameters!");
3169        let mut wr_txn = arc.write();
3170
3171        assert!(
3172            CStat {
3173                max: 4,
3174                cache: 0,
3175                tlocal: 0,
3176                freq: 0,
3177                rec: 0,
3178                ghost_freq: 0,
3179                ghost_rec: 0,
3180                haunted: 0,
3181                p: 0
3182            } == wr_txn.peek_stat()
3183        );
3184
3185        // In the first txn we insert 2 weight 2 items.
3186        wr_txn.insert_sized(1, Weighted { _i: 1 }, NonZeroUsize::new(2).unwrap());
3187        wr_txn.insert_sized(2, Weighted { _i: 2 }, NonZeroUsize::new(2).unwrap());
3188
3189        assert!(
3190            CStat {
3191                max: 4,
3192                cache: 0,
3193                tlocal: 2,
3194                freq: 0,
3195                rec: 0,
3196                ghost_freq: 0,
3197                ghost_rec: 0,
3198                haunted: 0,
3199                p: 0
3200            } == wr_txn.peek_stat()
3201        );
3202        wr_txn.commit();
3203
3204        // Now once committed, the proper sizes kick in.
3205
3206        let wr_txn = arc.write();
3207        assert!(
3208            CStat {
3209                max: 4,
3210                cache: 2,
3211                tlocal: 0,
3212                freq: 0,
3213                rec: 4,
3214                ghost_freq: 0,
3215                ghost_rec: 0,
3216                haunted: 0,
3217                p: 0
3218            } == wr_txn.peek_stat()
3219        );
3220        wr_txn.commit();
3221
3222        // Check the numbers move properly.
3223        let mut wr_txn = arc.write();
3224        wr_txn.get(&1);
3225        wr_txn.commit();
3226
3227        let mut wr_txn = arc.write();
3228        assert!(
3229            CStat {
3230                max: 4,
3231                cache: 2,
3232                tlocal: 0,
3233                freq: 2,
3234                rec: 2,
3235                ghost_freq: 0,
3236                ghost_rec: 0,
3237                haunted: 0,
3238                p: 0
3239            } == wr_txn.peek_stat()
3240        );
3241
3242        wr_txn.insert_sized(3, Weighted { _i: 3 }, NonZeroUsize::new(2).unwrap());
3243        wr_txn.insert_sized(4, Weighted { _i: 4 }, NonZeroUsize::new(2).unwrap());
3244        wr_txn.commit();
3245
3246        // Check the evicts
3247        let wr_txn = arc.write();
3248        assert!(
3249            CStat {
3250                max: 4,
3251                cache: 4,
3252                tlocal: 0,
3253                freq: 2,
3254                rec: 2,
3255                ghost_freq: 0,
3256                ghost_rec: 4,
3257                haunted: 0,
3258                p: 0
3259            } == wr_txn.peek_stat()
3260        );
3261        wr_txn.commit();
3262    }
3263
3264    #[test]
3265    fn test_cache_stats_reload() {
3266        let _ = tracing_subscriber::fmt::try_init();
3267
3268        // Make a cache
3269        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
3270            .set_size(4, 0)
3271            .build()
3272            .expect("Invalid cache parameters!");
3273
3274        let stats = WriteCountStat::default();
3275
3276        let mut wr_txn = arc.write_stats(stats);
3277        wr_txn.insert(1, 1);
3278        let stats = wr_txn.commit();
3279
3280        tracing::trace!("stats 1: {:?}", stats);
3281    }
3282
3283    #[test]
3284    fn test_cache_mut_inplace() {
3285        // Make a cache
3286        let arc: ARCache<usize, usize> = ARCacheBuilder::default()
3287            .set_size(4, 0)
3288            .build()
3289            .expect("Invalid cache parameters!");
3290        let mut wr_txn = arc.write();
3291
3292        assert!(wr_txn.get_mut(&1, false).is_none());
3293        // It was inserted, can mutate. This is the tlocal present state.
3294        wr_txn.insert(1, 1);
3295        {
3296            let mref = wr_txn.get_mut(&1, false).unwrap();
3297            *mref = 2;
3298        }
3299        assert!(wr_txn.get_mut(&1, false) == Some(&mut 2));
3300        wr_txn.commit();
3301
3302        // It's in the main cache, can mutate immediately and the tlocal is primed.
3303        let mut wr_txn = arc.write();
3304        {
3305            let mref = wr_txn.get_mut(&1, false).unwrap();
3306            *mref = 3;
3307        }
3308        assert!(wr_txn.get_mut(&1, false) == Some(&mut 3));
3309        wr_txn.commit();
3310
3311        // Marked for remove, can not mut.
3312        let mut wr_txn = arc.write();
3313        wr_txn.remove(1);
3314        assert!(wr_txn.get_mut(&1, false).is_none());
3315        wr_txn.commit();
3316    }
3317
3318    #[allow(dead_code)]
3319    pub static RUNNING: AtomicBool = AtomicBool::new(false);
3320
3321    #[allow(dead_code)]
3322    pub static READ_OPERATIONS: u32 = 1024;
3323    #[allow(dead_code)]
3324    pub static WRITE_OPERATIONS: u32 = 10240;
3325
3326    #[allow(dead_code)]
3327    pub static CACHE_SIZE: u32 = 64;
3328    pub static VALUE_MAX_RANGE: u32 = CACHE_SIZE * 8;
3329
3330    #[cfg(test)]
3331    fn multi_thread_worker(arc: Arc<ARCache<Box<u32>, Box<u32>>>) {
3332        while RUNNING.load(Ordering::Relaxed) {
3333            let mut rd_txn = arc.read();
3334
3335            use rand::Rng;
3336            let mut rng = rand::rng();
3337
3338            for _i in 0..VALUE_MAX_RANGE {
3339                let x = rng.random_range(0..VALUE_MAX_RANGE);
3340
3341                if rd_txn.get(&x).is_none() {
3342                    rd_txn.insert(Box::new(x), Box::new(x))
3343                }
3344            }
3345        }
3346    }
3347
3348    #[allow(dead_code)]
3349    #[cfg_attr(miri, ignore)]
3350    #[cfg_attr(feature = "dhat-heap", test)]
3351    fn test_cache_stress_1() {
3352        #[cfg(feature = "dhat-heap")]
3353        let _profiler = dhat::Profiler::builder().trim_backtraces(None).build();
3354
3355        use rand::Rng;
3356        let mut rng = rand::rng();
3357
3358        let arc: Arc<ARCache<Box<u32>, Box<u32>>> = Arc::new(
3359            ARCacheBuilder::default()
3360                .set_size(CACHE_SIZE as usize, 4)
3361                .build()
3362                .expect("Invalid cache parameters!"),
3363        );
3364
3365        // Do some writes ...
3366        for _i in 0..WRITE_OPERATIONS {
3367            let mut wr_txn = arc.write();
3368
3369            let x = rng.random_range(0..VALUE_MAX_RANGE);
3370
3371            if wr_txn.get(&x).is_none() {
3372                wr_txn.insert(Box::new(x), Box::new(x))
3373            }
3374
3375            // Can corrupt here no issue.
3376            wr_txn.commit();
3377        }
3378
3379        eprintln!("writes pass");
3380
3381        let thread_count = 8;
3382
3383        RUNNING.store(true, Ordering::Relaxed);
3384
3385        // Now do writes and reads concurrently
3386        let handles: Vec<_> = (0..thread_count)
3387            .map(|_| {
3388                // Build the threads.
3389                let cache = arc.clone();
3390                thread::spawn(move || multi_thread_worker(cache))
3391            })
3392            .collect();
3393
3394        std::thread::sleep(std::time::Duration::from_secs(5));
3395
3396        // Everything is fine until we write.
3397
3398        for _i in 0..WRITE_OPERATIONS {
3399            let mut wr_txn = arc.write();
3400
3401            let x = rng.random_range(0..VALUE_MAX_RANGE);
3402
3403            if wr_txn.get(&x).is_none() {
3404                wr_txn.insert(Box::new(x), Box::new(x))
3405            }
3406
3407            wr_txn.commit();
3408        }
3409
3410        RUNNING.store(false, Ordering::Relaxed);
3411
3412        for handle in handles {
3413            if let Err(err) = handle.join() {
3414                std::panic::resume_unwind(err)
3415            }
3416        }
3417    }
3418
3419    #[test]
3420    fn test_set_expected_workload_negative() {
3421        let _arc: Arc<ARCache<Box<u32>, Box<u32>>> = Arc::new(
3422            ARCacheBuilder::default()
3423                .set_expected_workload(256, 31, 8, 16, true)
3424                .build()
3425                .expect("Invalid cache parameters!"),
3426        );
3427    }
3428}