memory_cache_rust/
cache.rs

use std::{ptr, time};
use std::any::TypeId;
use std::fmt::{Debug, Formatter};
use std::hash::{BuildHasher, Hash, Hasher};
use std::marker::PhantomData;
use std::ops::{Add, Deref};
use std::sync::atomic::{AtomicIsize, Ordering};
use std::time::Duration;

use seize::{Collector, Guard, Linked};
use xxhash_rust::const_xxh3::xxh3_64 as const_xxh3;

use crate::cache::ItemFlag::{ItemDelete, ItemNew, ItemUpdate};
use crate::policy::DefaultPolicy;
use crate::reclaim::{Atomic, Shared};
use crate::ring::RingBuffer;
use crate::store::{Node, Store};

/// Number of shards the store is split into.
pub const NUM_SHARDS: usize = 256;

pub enum ItemFlag {
    ItemNew,
    ItemDelete,
    ItemUpdate,
}

macro_rules! load_factor {
    ($n: expr) => {
        // ¾ n = n - n/4 = n - (n >> 2)
        $n - ($n >> 2)
    };
}
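
// For example, `load_factor!(256)` evaluates to `256 - (256 >> 2) = 192`,
// i.e. a table is considered full at 75% occupancy.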

pub struct Item<V> {
    pub flag: ItemFlag,
    pub key: u64,
    pub conflict: u64,
    pub(crate) value: Atomic<V>,
    pub cost: i64,
    pub expiration: Option<Duration>,
}

/// Config is passed to `Cache::with_config` to create new `Cache` instances.
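///
/// A minimal usage sketch (the values are hypothetical; `key_to_hash` keeps the
/// zero-returning default here and would normally be a real hash function):
///
/// ```ignore
/// let config: Config<u64, u64> = Config {
///     numb_counters: 1_000_000, // track roughly 10x the expected item count
///     max_cost: 1 << 20,        // e.g. capacity in bytes
///     ..Config::default()
/// };
/// let cache = Cache::with_config(config);
/// ```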
pub struct Config<K, V> {
    // numb_counters determines the number of counters (keys) to keep that hold
    // access frequency information. It's generally a good idea to have more
    // counters than the max cache capacity, as this improves eviction accuracy
    // and subsequent hit ratios.
    //
    // For example, if you expect your cache to hold 1,000,000 items when full,
    // numb_counters should be 10,000,000 (10x). Each counter takes up 4 bits,
    // so keeping 10,000,000 counters requires 5MB of memory.
    pub numb_counters: i64,

    // max_cost can be considered as the cache capacity, in whatever units you
    // choose to use.
    //
    // For example, if you want the cache to have a max capacity of 100MB, you
    // would set max_cost to 100,000,000 and pass an item's number of bytes as
    // the `cost` parameter for calls to `set`. If new items are accepted, the
    // eviction process will take care of making room for the new item without
    // overflowing the max_cost value.
    pub max_cost: i64,

    // buffer_items determines the size of get buffers.
    //
    // Unless you have a rare use case, using `64` as the buffer_items value
    // results in good performance.
    pub buffer_items: usize,

    // metrics determines whether cache statistics are kept during the cache's
    // lifetime. There *is* some overhead to keeping statistics, so you should
    // only set this to true when testing or when throughput performance isn't
    // a major factor.
    pub metrics: bool,

    pub key_to_hash: fn(&K) -> (u64, u64),

    pub on_evict: Option<fn(u64, u64, V, i64)>,
    pub cost: Option<fn(V) -> i64>,
}

impl<K, V> Default for Config<K, V> {
    fn default() -> Self {
        Config {
            numb_counters: 1e7 as i64, // number of keys to track frequency of (10M)
            max_cost: 1 << 30,         // maximum cost of cache
            buffer_items: 64,          // number of keys per get buffer
            metrics: false,
            key_to_hash: |_x| (0, 0),
            on_evict: None,
            cost: None,
        }
    }
}

/// Cache is a thread-safe implementation of a hashmap with a TinyLFU admission
/// policy and a sampled LFU eviction policy. You can use the same Cache instance
/// from as many threads as you want.
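///
/// A short usage sketch (assuming `u64` keys and values):
///
/// ```ignore
/// let cache: Cache<u64, u64> = Cache::new();
/// let guard = cache.guard();
/// // an individual set may still be rejected by the admission policy
/// if cache.set(1, 100, 1, &guard) {
///     println!("{:?}", cache.get(&1, &guard));
/// }
/// ```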
pub struct Cache<K, V, S = crate::DefaultHashBuilder> {
    pub(crate) store: Atomic<Store<V>>,
    pub(crate) policy: Atomic<DefaultPolicy<V>>,
    pub(crate) get_buf: Atomic<RingBuffer<V>>,
    collector: Collector,
    // key_to_hash: fn(&K) -> (u64, u64),

    /// Table initialization and resizing control. When negative, the
    /// table is being initialized or resized: -1 for initialization,
    /// else -(1 + the number of active resizing threads). Otherwise,
    /// when the table is null, holds the initial table size to use upon
    /// creation, or 0 for the default. After initialization, holds the
    /// next element count value upon which to resize the table.
    size_ctl: AtomicIsize,

    size_buf_ctl: AtomicIsize,
    build_hasher: S,
    pub on_evict: Option<fn(u64, u64, &V, i64)>,
    cost: Option<fn(&V) -> i64>,

    _marker: PhantomData<K>,

    pub numb_counters: i64,
    pub buffer_items: usize,
    // The cache capacity, in whatever units you choose to use; see the field
    // of the same name on `Config`.
    pub max_cost: i64,

    pub(crate) metrics: Option<Box<Metrics>>,
}

impl<K, V, S> Debug for Cache<K, V, S> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("Cache")
            .field(&self.numb_counters)
            .finish()
    }
}

impl<K, V, S> Clone for Cache<K, V, S>
    where
        K: Sync + Send + Clone + Hash + Ord,
        V: Sync + Send + Clone,
        S: BuildHasher + Clone,
{
    fn clone(&self) -> Cache<K, V, S> {
        Self {
            store: self.store.clone(),
            policy: Atomic::from(self.policy.load(Ordering::SeqCst, &self.guard())),
            get_buf: Atomic::from(self.get_buf.load(Ordering::SeqCst, &self.guard())),
            collector: self.collector.clone(),
            size_ctl: AtomicIsize::from(self.size_ctl.load(Ordering::SeqCst)),
            size_buf_ctl: AtomicIsize::from(self.size_buf_ctl.load(Ordering::SeqCst)),
            build_hasher: self.build_hasher.clone(),
            // callback functions are not carried over into the clone
            on_evict: None,
            cost: None,
            _marker: Default::default(),
            numb_counters: self.numb_counters,
            buffer_items: self.buffer_items,
            max_cost: self.max_cost,
            metrics: self.metrics.clone(),
        }
    }
}

impl<K, V> Cache<K, V, crate::DefaultHashBuilder> {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_config(c: Config<K, V>) -> Self {
        Self::with_hasher(crate::DefaultHashBuilder::default(), c)
    }
}

impl<K, V, S> Default for Cache<K, V, S>
    where
        S: Default,
{
    fn default() -> Self {
        Self::with_hasher(S::default(), Default::default())
    }
}

impl<K, V, S> Drop for Cache<K, V, S> {
    fn drop(&mut self) {
        let guard = unsafe { Guard::unprotected() };

        let table = self.store.swap(Shared::null(), Ordering::SeqCst, &guard);
        if !table.is_null() {
            // the store was allocated; drop its contents along with the box
            let mut table = unsafe { table.into_box() };
            table.as_mut().data.clear()
        }
        let table = self.policy.swap(Shared::null(), Ordering::SeqCst, &guard);
        if !table.is_null() {
            // the policy was allocated; drop its contents along with the box
            let mut table = unsafe { table.into_box() };
            table.evict.key_costs.clear();
        }
        let table = self.get_buf.swap(Shared::null(), Ordering::SeqCst, &guard);
        if !table.is_null() {
            // the ring buffer was allocated; drop the box
            let _ = unsafe { table.into_box() };
        }
    }
}

impl<K, V, S> Cache<K, V, S>
{
    pub fn with_hasher(hash_builder: S, c: Config<K, V>) -> Self {
        let collector = Collector::new();
        let mut ca = Cache {
            store: Atomic::null(),
            policy: Atomic::null(),
            get_buf: Atomic::null(),
            collector,
            size_ctl: AtomicIsize::new(0),
            size_buf_ctl: AtomicIsize::new(0),
            build_hasher: hash_builder,
            on_evict: None,
            cost: None,
            buffer_items: c.buffer_items,
            _marker: Default::default(),
            numb_counters: c.numb_counters,
            max_cost: c.max_cost,
            metrics: None,
        };

        if c.metrics {
            ca.metrics = Some(Box::new(Metrics::new(DO_NOT_USE, &ca.collector)));
        }

        // the policy holds a raw pointer to the metrics (null when metrics are
        // disabled), and the ring buffer in turn points at the policy
        let metrics_ptr: *const Metrics = match &ca.metrics {
            Some(m) => &**m,
            None => ptr::null(),
        };

        let table = Shared::boxed(
            DefaultPolicy::new(ca.numb_counters, ca.max_cost, metrics_ptr),
            &ca.collector,
        );
        ca.policy.store(table, Ordering::SeqCst);

        let table = Shared::boxed(RingBuffer::new(table, ca.buffer_items), &ca.collector);
        ca.get_buf.store(table, Ordering::SeqCst);

        ca
    }

    /// Pin a `Guard` for use with this map.
    ///
    /// Keep in mind that for as long as you hold onto this `Guard`, you are preventing the
    /// collection of garbage generated by the map.
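    ///
    /// A hedged sketch of typical guard usage:
    ///
    /// ```ignore
    /// let guard = cache.guard();
    /// let value = cache.get(&key, &guard);
    /// drop(guard); // releasing the guard allows garbage to be reclaimed
    /// ```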
    pub fn guard(&self) -> Guard<'_> {
        self.collector.enter()
    }

    fn check_guard(&self, guard: &Guard<'_>) {
        if let Some(c) = guard.collector() {
            assert!(Collector::ptr_eq(c, &self.collector))
        }
    }

    /*    fn init_metrics<'g>(&'g self, guard: &'g Guard<'_>) -> Shared<'g, Metrics> {
            loop {
                let table = self.metrics.load(Ordering::SeqCst, guard);
                // safety: we loaded the table while the thread was marked as active.
                // table won't be deallocated until the guard is dropped at the earliest.
                if !table.is_null() {
                    break table;
                }

                // try to allocate the table
                let mut sc = self.size_metrics_ctl.load(Ordering::SeqCst);
                if sc < 0 {
                    // we lost the initialization race; just spin
                    std::thread::yield_now();
                    continue;
                }

                if self
                    .size_metrics_ctl
                    .compare_exchange(sc, -1, Ordering::SeqCst, Ordering::Relaxed)
                    .is_ok() {
                    // we get to do it!
                    let mut table = self.metrics.load(Ordering::SeqCst, guard);

                    // safety: we loaded the table while the thread was marked as active.
                    // table won't be deallocated until the guard is dropped at the earliest.
                    if table.is_null() {
                        let n = if sc > 0 {
                            sc as usize
                        } else {
                            DO_NOT_USE
                        };
                        table = Shared::boxed(Metrics::new(n, &self.collector), &self.collector);
                        self.metrics.store(table, Ordering::SeqCst);
                        sc = load_factor!(n as isize);
                    }
                    self.size_metrics_ctl.store(sc, Ordering::SeqCst);
                    break table;
                }
            }
        }*/
    fn init_ringbuf<'g>(&'g self, guard: &'g Guard<'_>) -> Shared<'g, RingBuffer<V>> {
        loop {
            let table = self.get_buf.load(Ordering::SeqCst, guard);
            // safety: we loaded the table while the thread was marked as active.
            // table won't be deallocated until the guard is dropped at the earliest.
            if !table.is_null() {
                break table;
            }

            // try to allocate the table
            let mut sc = self.size_buf_ctl.load(Ordering::SeqCst);
            if sc < 0 {
                // we lost the initialization race; just spin
                std::thread::yield_now();
                continue;
            }

            if self
                .size_buf_ctl
                .compare_exchange(sc, -1, Ordering::SeqCst, Ordering::Relaxed)
                .is_ok() {
                // we get to do it!
                let mut table = self.get_buf.load(Ordering::SeqCst, guard);

                // safety: we loaded the table while the thread was marked as active.
                // table won't be deallocated until the guard is dropped at the earliest.
                if table.is_null() {
                    let n = if sc > 0 {
                        sc as usize
                    } else {
                        DO_NOT_USE
                    };
                    let p = self.policy.load(Ordering::SeqCst, guard);

                    table = Shared::boxed(RingBuffer::new(p, self.buffer_items), &self.collector);
                    self.get_buf.store(table, Ordering::SeqCst);
                    sc = load_factor!(n as isize);
                }
                self.size_buf_ctl.store(sc, Ordering::SeqCst);
                break table;
            }
        }
    }

    fn init_store<'g>(&'g self, guard: &'g Guard<'_>) -> Shared<'g, Store<V>> {
        loop {
            let table = self.store.load(Ordering::SeqCst, guard);
            // safety: we loaded the table while the thread was marked as active.
            // table won't be deallocated until the guard is dropped at the earliest.
            if !table.is_null() && !unsafe { table.deref() }.is_empty() {
                break table;
            }

            // try to allocate the table
            let mut sc = self.size_ctl.load(Ordering::SeqCst);
            if sc < 0 {
                // we lost the initialization race; just spin
                std::thread::yield_now();
                continue;
            }

            if self
                .size_ctl
                .compare_exchange(sc, -1, Ordering::SeqCst, Ordering::Relaxed)
                .is_ok() {
                // we get to do it!
                let mut table = self.store.load(Ordering::SeqCst, guard);

                // safety: we loaded the table while the thread was marked as active.
                // table won't be deallocated until the guard is dropped at the earliest.
                if table.is_null() || unsafe { table.deref() }.is_empty() {
                    let n = if sc > 0 {
                        sc as usize
                    } else {
                        NUM_SHARDS
                    };
                    table = Shared::boxed(Store::new(), &self.collector);
                    self.store.store(table, Ordering::SeqCst);
                    sc = load_factor!(n as isize);
                }

                self.size_ctl.store(sc, Ordering::SeqCst);
                break table;
            }
        }
    }

    fn init_policy<'g>(&'g self, guard: &'g Guard<'_>) -> Shared<'g, DefaultPolicy<V>> {
        loop {
            let mut table = self.policy.load(Ordering::SeqCst, guard);
            // safety: we loaded the table while the thread was marked as active.
            // table won't be deallocated until the guard is dropped at the earliest.
            if !table.is_null() {
                break table;
            }

            // try to allocate the policy; it holds a raw pointer to the
            // metrics, which is null when metrics are disabled
            let metrics_ptr: *const Metrics = match &self.metrics {
                Some(m) => &**m,
                None => ptr::null(),
            };
            let p = DefaultPolicy::new(self.numb_counters, self.max_cost, metrics_ptr);
            table = Shared::boxed(p, &self.collector);
            self.policy.store(table, Ordering::SeqCst);
            self.init_ringbuf(guard);
            break table;
        }
    }
}

impl<V, K, S> Cache<K, V, S>
    where K: Hash + Ord,
          S: BuildHasher,
{
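    /// Maps a key to a `(key_hash, conflict_hash)` pair. Primitive integer
    /// keys take a fast path and are used directly as the key hash; all other
    /// types fall back to the configured `BuildHasher` for the key hash, plus
    /// an xxh3 digest of the key's raw bytes for the conflict hash.
    ///
    /// A sketch of the fast path (`u64` keys hash to themselves):
    ///
    /// ```ignore
    /// let (key_hash, conflict) = cache.hash(&42u64);
    /// assert_eq!((key_hash, conflict), (42, 0));
    /// ```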
    pub fn hash<Q: ?Sized + Hash + 'static>(&self, key: &Q) -> (u64, u64) {
        // fast path: primitive integer keys are used directly as the key
        // hash, with a zero conflict hash
        let t = TypeId::of::<&Q>();
        if t == TypeId::of::<&i64>() {
            let v = key as *const Q as *const i64;
            let v = unsafe { v.as_ref().unwrap() };
            if *v == 0 {
                return (0, 0);
            }
            return (*v as u64, 0);
        }
        if t == TypeId::of::<&i32>() {
            let v = key as *const Q as *const i32;
            let v = unsafe { v.as_ref().unwrap() };
            return (*v as u64, 0);
        }
        if t == TypeId::of::<&u64>() {
            let v = key as *const Q as *const u64;
            let v = unsafe { v.as_ref().unwrap() };
            return (*v, 0);
        }
        if t == TypeId::of::<&u32>() {
            let v = key as *const Q as *const u32;
            let v = unsafe { v.as_ref().unwrap() };
            return (*v as u64, 0);
        }
        if t == TypeId::of::<&u8>() {
            let v = key as *const Q as *const u8;
            let v = unsafe { v.as_ref().unwrap() };
            return (*v as u64, 0);
        }
        if t == TypeId::of::<&usize>() {
            let v = key as *const Q as *const usize;
            let v = unsafe { v.as_ref().unwrap() };
            return (*v as u64, 0);
        }
        if t == TypeId::of::<&i16>() {
            let v = key as *const Q as *const i16;
            let v = unsafe { v.as_ref().unwrap() };
            return (*v as u64, 0);
        }
        if t == TypeId::of::<&i8>() {
            let v = key as *const Q as *const i8;
            let v = unsafe { v.as_ref().unwrap() };
            return (*v as u64, 0);
        }

        // slow path: the configured hasher produces the key hash, and xxh3
        // over the key's raw bytes produces the conflict hash (this assumes
        // `Q` is plain data without indirection)
        let mut h = self.build_hasher.build_hasher();
        key.hash(&mut h);

        let slice = unsafe {
            std::slice::from_raw_parts(key as *const Q as *const u8, std::mem::size_of_val(key))
        };

        (h.finish(), const_xxh3(slice))
    }

    /// Returns a reference to the value if the key is present in the cache,
    /// or `None` otherwise. The lookup is also recorded in the access buffer
    /// so the admission policy can track key frequency.
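    ///
    /// A hedged example:
    ///
    /// ```ignore
    /// let guard = cache.guard();
    /// match cache.get(&1, &guard) {
    ///     Some(v) => println!("hit: {v:?}"),
    ///     None => println!("miss"),
    /// }
    /// ```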
    pub fn get<'g, Q: ?Sized + Hash + 'static>(&'g self, key: &Q, guard: &'g Guard) -> Option<&'g V> {
        let (key_hash, conflict) = self.hash(key);

        // record the access so the policy can learn this key's frequency
        let buf = self.get_buf.load(Ordering::SeqCst, guard);
        if buf.is_null() {
            return None;
        }
        unsafe { buf.deref() }.push(key_hash, guard);

        let store = self.store.load(Ordering::SeqCst, guard);
        if store.is_null() {
            return None;
        }

        let result = unsafe { store.deref() }.get(key_hash, conflict, guard);
        match result {
            None => {
                if let Some(metrics) = &self.metrics {
                    metrics.add(MISS, key_hash, 1, guard);
                }
                None
            }
            Some(ref _v) => {
                if let Some(metrics) = &self.metrics {
                    metrics.add(HIT, key_hash, 1, guard);
                }
                result
            }
        }
    }
}

impl<V, K, S> Cache<K, V, S>
    where
        K: Sync + Send + Clone + Hash + Ord + 'static,
        V: Sync + Send,
        S: BuildHasher,
{
    /*    fn init_metrics2<'g>(&'g self, guard: &'g Guard<'_>) -> Shared<'g, Metrics> {
            loop {
                let mut metrics = self.metrics.load(Ordering::SeqCst, guard);
                if !metrics.is_null() {
                    break metrics;
                }

                metrics = Shared::boxed(Metrics::new(, &self.collector), &self.collector);
                self.metrics.store(metrics, Ordering::SeqCst);
                break metrics;
            }
        }*/

    /// Attempts to add the key-value item to the cache. If it returns false,
    /// the set was dropped and the key-value item isn't added to the cache.
    /// If it returns true, there's still a chance it could be dropped by the
    /// policy if it's determined that the key-value item isn't worth keeping,
    /// but otherwise the item will be added and other items will be evicted
    /// in order to make room.
    ///
    /// To dynamically evaluate an item's cost using the `Config.cost`
    /// function, set the cost parameter to 0 and the cost function will be
    /// run when needed in order to find the item's true cost.
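    ///
    /// A hedged example of checking the return value:
    ///
    /// ```ignore
    /// let guard = cache.guard();
    /// if !cache.set(key, value, 1, &guard) {
    ///     // the item was dropped and never reached the store
    /// }
    /// ```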
    pub fn set<'g>(&'g self, key: K, value: V, cost: i64, guard: &'g Guard<'_>) -> bool {
        self.check_guard(guard);
        self.set_with_ttl(key, value, cost, Duration::from_millis(0), guard)
    }

    /// Works like `set` but adds a key-value pair to the cache that will
    /// expire after the specified TTL (time to live) has passed. A zero
    /// duration means the value never expires, which is identical to calling
    /// `set`.
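    ///
    /// A sketch (assuming the zero-means-no-expiry convention below):
    ///
    /// ```ignore
    /// let guard = cache.guard();
    /// cache.set_with_ttl(1, 1, 1, Duration::from_secs(5), &guard); // expires
    /// cache.set_with_ttl(2, 2, 1, Duration::ZERO, &guard);         // never expires
    /// ```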
    pub fn set_with_ttl<'g>(&'g self, key: K, value: V, cost: i64, ttl: Duration, guard: &'g Guard) -> bool {
        // `Duration` is unsigned, so a negative TTL cannot occur here; a zero
        // TTL means the entry never expires, and any other value is kept as-is
        let expiration = if ttl.is_zero() { None } else { Some(ttl) };

        let (key_hash, conflict) = self.hash(&key);

        let mut store = self.store.load(Ordering::SeqCst, guard);
        let value = Shared::boxed(value, &self.collector);
        let policy = self.policy.load(Ordering::SeqCst, guard);
        loop {
            if store.is_null() {
                store = self.init_store(guard);
                continue;
            }

            let dstore = unsafe { store.as_ptr() };
            let dstore = unsafe { dstore.as_mut().unwrap() };

            let mut item = Item {
                flag: ItemNew,
                key: key_hash,
                conflict,
                value: Atomic::null(),
                cost,
                expiration,
            };
            item.value.store(value, Ordering::SeqCst);

            // if the key is already present, this set becomes an update
            if dstore.update(&item, guard) {
                item.flag = ItemUpdate;
            }

            let node = Node {
                key: key_hash,
                conflict,
                value: Atomic::null(),
                expiration,
            };
            node.value.store(value, Ordering::SeqCst);

            match item.flag {
                ItemNew | ItemUpdate => unsafe {
                    // a cost of 0 defers to the configured cost function
                    if item.cost == 0 && self.cost.is_some() {
                        item.cost = (self.cost.unwrap())(item.value.load(Ordering::SeqCst, guard).deref());
                    }
                }
                _ => {}
            }

            match item.flag {
                ItemNew => {
                    let (victims, added) = unsafe {
                        let policy = policy.as_ptr();
                        policy.as_mut().unwrap().add(item.key, item.cost, guard)
                    };

                    if added {
                        dstore.set(node, guard);
                        if let Some(metrics) = &self.metrics {
                            metrics.add(KEY_ADD, item.key, 1, guard)
                        }
                    }

                    // evict the victims the policy chose to make room
                    for i in 0..victims.len() {
                        let del_val = dstore.del(&victims[i].key, &0, guard);
                        match del_val {
                            Some((_c, _v)) => {
                                if self.on_evict.is_some() {
                                    let v = victims[i].value.load(Ordering::SeqCst, guard);
                                    (self.on_evict.unwrap())(victims[i].key, victims[i].conflict, unsafe { v.deref().deref().deref() }, victims[i].cost)
                                }
                                if let Some(metrics) = &self.metrics {
                                    metrics.add(KEY_EVICT, victims[i].key, 1, guard);
                                    metrics.add(COST_EVICT, victims[i].key, victims[i].cost as u64, guard);
                                }
                            }
                            None => { continue; }
                        }
                    }
                    break true;
                }
                ItemDelete => {
                    unsafe {
                        let policy = policy.as_ptr();
                        policy.as_mut().unwrap().del(&item.key, guard)
                    }
                    dstore.del(&item.key, &item.conflict, guard);
                }
                ItemUpdate => {
                    unsafe {
                        let policy = policy.as_ptr();
                        policy.as_mut().unwrap().update(item.key, item.cost, guard);
                    }
                }
            }

            break true;
        }
    }

    /// Deletes the key-value item from the cache if it exists.
    pub fn del<'g, Q: ?Sized + Hash + 'static>(&'g self, key: &Q, guard: &'g Guard) {
        let (key_hash, conflict) = self.hash(key);
        let item = Item {
            flag: ItemDelete,
            key: key_hash,
            conflict,
            value: Atomic::null(),
            cost: 0,
            expiration: None,
        };

        let node = Node {
            key: 0,
            conflict: 0,
            value: Atomic::null(),
            expiration: None,
        };

        self.process_items(node, item, 0, guard);
    }

    /// Empties the hashmap and zeroes all policy counters. Note that this is
    /// not an atomic operation (but that shouldn't be a problem, as it's
    /// assumed that `set`/`get` calls won't be occurring until after this).
    pub fn clear<'g>(&'g self, guard: &'g Guard) {
        let store = self.store.load(Ordering::SeqCst, guard);
        let policy = self.policy.load(Ordering::SeqCst, guard);

        unsafe {
            if !policy.is_null() {
                let policy = policy.as_ptr();
                policy.as_mut().unwrap().clear(guard);
            }
        }
        if !store.is_null() {
            unsafe {
                let p = store.as_ptr();
                p.as_mut().unwrap().clear(guard);
            };
        }

        // TODO: restart the processing thread after a clear once the buffered
        // set path is restored
    }

    pub fn process_items<'g>(&'g self, node: Node<V>, mut item: Item<V>, cost: i64, guard: &'g Guard) {
        let _cost = cost;
        match item.flag {
            ItemNew | ItemUpdate => unsafe {
                // a cost of 0 defers to the configured cost function
                if item.cost == 0 && self.cost.is_some() {
                    item.cost = (self.cost.unwrap())(item.value.load(Ordering::SeqCst, guard).deref());
                }
            }
            _ => {}
        }

        match item.flag {
            ItemNew => {
                let mut policy = self.policy.load(Ordering::SeqCst, guard);
                loop {
                    if policy.is_null() {
                        policy = self.init_policy(guard);
                        continue;
                    }
                    let (victims, added) = unsafe {
                        let p = policy.as_ptr();
                        let p = p.as_mut().unwrap();
                        p.add(item.key, item.cost, guard)
                    };

                    let store = self.store.load(Ordering::SeqCst, guard);
                    if added {
                        let store = unsafe { store.as_ptr() };
                        let store = unsafe { store.as_mut().unwrap() };
                        store.set(node, guard);
                        if let Some(metrics) = &self.metrics {
                            metrics.add(KEY_ADD, item.key, 1, guard)
                        }
                    }

                    // evict the victims the policy chose to make room
                    for i in 0..victims.len() {
                        let store = unsafe { store.as_ptr() };
                        let store = unsafe { store.as_mut().unwrap() };
                        let del_val = store.del(&victims[i].key, &0, guard);
                        match del_val {
                            Some((_c, _v)) => {
                                if self.on_evict.is_some() {
                                    let v = victims[i].value.load(Ordering::SeqCst, guard);
                                    (self.on_evict.unwrap())(victims[i].key, victims[i].conflict, unsafe { v.deref().deref().deref() }, victims[i].cost)
                                }
                                if let Some(metrics) = &self.metrics {
                                    metrics.add(KEY_EVICT, victims[i].key, 1, guard);
                                    metrics.add(COST_EVICT, victims[i].key, victims[i].cost as u64, guard);
                                }
                            }
                            None => { continue; }
                        }
                    }
                    break;
                }
            }
            ItemDelete => {
                let policy = self.policy.load(Ordering::SeqCst, guard);
                if policy.is_null() {
                    return;
                }

                let store = self.store.load(Ordering::SeqCst, guard);
                unsafe {
                    let p = policy.as_ptr();
                    let p = p.as_mut().unwrap();
                    p.del(&item.key, guard)
                }

                let store = unsafe { store.as_ptr() };
                let store = unsafe { store.as_mut().unwrap() };
                store.del(&item.key, &item.conflict, guard);
            }
            ItemUpdate => {
                let policy = self.policy.load(Ordering::SeqCst, guard);
                if policy.is_null() {
                    return;
                }
                unsafe {
                    let p = policy.as_ptr();
                    let p = p.as_mut().unwrap();
                    p.update(item.key, item.cost, guard);
                }
            }
        }
    }

    pub fn clean_up<'g>(&'g self, guard: &'g Guard<'_>) {
        self.check_guard(guard);
        let store = self.store.load(Ordering::SeqCst, guard);
        let policy = self.policy.load(Ordering::SeqCst, guard);
        if store.is_null() || policy.is_null() {
            return;
        }
        unsafe { store.as_ptr().as_mut().unwrap() }.clean_up(unsafe { policy.as_ptr().as_mut().unwrap() }, guard)
    }
}

type MetricType = usize;

pub const HIT: MetricType = 0;
pub const MISS: MetricType = 1;
// The following 3 keep track of the number of keys added, updated and evicted.
const KEY_ADD: MetricType = 2;
pub const KEY_UPDATE: MetricType = 3;
pub const KEY_EVICT: MetricType = 4;
// The following 2 keep track of the cost of keys added and evicted.
pub const COST_ADD: MetricType = 5;
pub const COST_EVICT: MetricType = 6;
// The following 2 keep track of how many sets were dropped or rejected later.
pub const DROP_SETS: MetricType = 7;
pub const REJECT_SETS: MetricType = 8;
// The following 2 keep track of how many gets were kept and dropped on the
// floor.
pub const DROP_GETS: MetricType = 9;
pub const KEEP_GETS: MetricType = 10;
// This should be the final value. Other values should be set before this.
pub const DO_NOT_USE: MetricType = 11;
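
// A hit ratio can be derived from these counters as
// `hits / (hits + misses)`, using `Metrics::get(HIT, ...)` and
// `Metrics::get(MISS, ...)`.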

pub struct Metrics {
    pub(crate) all: Box<[Atomic<[u64; 256]>]>,
}

impl Clone for Metrics {
    fn clone(&self) -> Self {
        Self {
            all: self.all.clone(),
        }
    }
}

impl Metrics {
    pub(crate) fn new(n: usize, collector: &Collector) -> Self {
        let data = vec![Atomic::from(Shared::boxed([0u64; 256], collector)); n];
        Metrics {
            all: data.into_boxed_slice(),
        }
    }

    pub(crate) fn get<'g>(&'g self, t: MetricType, guard: &'g Guard) -> u64 {
        let all = self.all[t].load(Ordering::SeqCst, guard);
        if all.is_null() {
            return 0;
        }

        // sum every striped slot for this metric
        let data = unsafe { all.as_ptr() };
        let data = unsafe { data.as_ref().unwrap() };
        let mut total = 0;
        for i in 0..data.len() {
            total += data[i];
        }
        total
    }

    #[allow(non_snake_case)]
    pub(crate) fn SetsDropped<'g>(&'g self, guard: &'g Guard) -> u64 {
        self.get(DROP_SETS, guard)
    }

    pub(crate) fn add<'g>(&self, t: MetricType, hash: u64, delta: u64, guard: &'g Guard) {
        // spread increments of the same metric over slots picked by the key
        // hash to reduce contention between threads
        let idx = (hash % 5) * 10;
        let all = self.all[t].load(Ordering::SeqCst, guard);
        if all.is_null() {
            panic!("metric all is null");
        }
        let data = unsafe { all.as_ptr() };
        let data = unsafe { data.as_mut().unwrap() };

        data[idx as usize] = data[idx as usize].wrapping_add(delta);
    }

    pub fn clear<'g>(&self, _guard: &'g Guard) {
        // TODO: reset all counters to zero; a fresh table is needed here but
        // `all` cannot be replaced through a shared reference yet
    }
}

#[derive(Eq, PartialEq, Debug)]
pub enum PutResult<'a, T> {
    Inserted {
        new: &'a T,
    },
    Replaced {
        old: &'a T,
        new: &'a T,
    },
    Exists {
        current: &'a T,
        not_inserted: Box<Linked<T>>,
    },
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::Ordering;
    use std::thread;
    use std::time::Duration;

    use hashbrown::HashSet;
    use rayon;
    use rayon::prelude::*;

    use crate::bloom::haskey::key_to_hash;
    use crate::cache::{Cache, Item};
    use crate::cache::ItemFlag::ItemUpdate;
    use crate::reclaim::{Atomic, Shared};
    use crate::store::Node;

    const ITER: u64 = 32 * 1024;

    #[test]
    fn check() {
        let _v = 1e7 as i64;
        let _s = 1 << 30;
        let _v2 = 1e6 as i64;
        let _s2 = 1 << 20;
        println!();
    }

    #[test]
    fn test_cache_key_to_hash() {
        let _key_to_hash_count = 0;
        let cache = Cache::new();

        let guard = cache.guard();
        cache.set(1, 2, 1, &guard);
        cache.set(2, 2, 1, &guard);
        println!("{:?}", cache.get(&1, &guard));
        println!("{:?}", cache.get(&2, &guard));
    }

    #[test]
    fn test_cache_insert_thread() {
        let map = Cache::<u64, u64>::new();
        let mut hashset = HashSet::new();

        let guard = map.guard();
        for i in 0..300_u64 {
            let (key_hash, _conflict) = map.hash(&i);
            hashset.insert(key_hash);
        }

        let _size = hashset.len();

        for i in 0..300 {
            if let Some(v) = map.get(&i, &guard) {
                if &i != v {
                    panic!("i not equal {i}")
                }
                println!("key:{i}, value: {:?}", map.get(&i, &guard));
            } else {
                println!("key:{i}, value: None");
            }
        }
    }

    #[test]
    fn test_cache_key_to_hash_thread() {
        let _key_to_hash_count = 0;
        let cache = Cache::new();

        let arcc = Arc::new(cache);
        let c1 = Arc::clone(&arcc);
        let c2 = Arc::clone(&arcc);
        let c3 = Arc::clone(&arcc);

        let t1 = thread::spawn(move || {
            let guard = c1.guard();
            for i in 0..100000 {
                c1.set(i, i + 7, 1, &guard);
            }
        });

        let t2 = thread::spawn(move || {
            let guard = c2.guard();
            for i in 0..100000 {
                c2.set(i, i + 7, 1, &guard);
            }
        });

        let t3 = thread::spawn(move || {
            let guard = c3.guard();
            for i in 0..100000 {
                c3.set(i, i + 7, 1, &guard);
            }
        });
        let c41 = Arc::clone(&arcc);
        let t4 = thread::spawn(move || {
            thread::sleep(Duration::from_millis(1000));
            let guard = c41.guard();
            for i in 0..300 {
                println!("{:?}", c41.get(&i, &guard))
            }
        });

        t1.join().unwrap();
        t2.join().unwrap();
        t3.join().unwrap();
        t4.join().unwrap();
        let c4 = Arc::clone(&arcc);
        let guard = c4.guard();
        c4.set(1, 2, 1, &guard);
        c4.set(2, 2, 1, &guard);
        println!("{:?}", c4.get(&1, &guard));
        println!("{:?}", c4.get(&2, &guard));
    }

    #[test]
    fn test_cache_with_ttl2() {
        let _key_to_hash_count = 0;
        let cache = Cache::new();

        (0..ITER).into_par_iter().for_each(|i| {
            let guard = cache.guard();
            cache.set(i, i + 7, 1, &guard);
        });
    }

    #[test]
    fn test_cache_with_ttl() {
        let _key_to_hash_count = 0;
        let cache = Cache::new();

        let guard = cache.guard();

        let key = 1;
        let value = 1;
        let _cost = 1;
        let _ttl = Duration::from_millis(0);

        loop {
            if !cache.set_with_ttl(1, 1, 1, Duration::from_millis(0), &guard) {
                thread::sleep(Duration::from_millis(10));
                continue;
            }
            thread::sleep(Duration::from_millis(50));
            match cache.get(&key, &guard) {
                None => panic!("expected a value for key {key}"),
                Some(v) => assert_eq!(v, &value),
            }
            break;
        }

        cache.set(1, 2, 2, &guard);
        let (key_hash, conflict_hash) = cache.hash(&1);
        let store = cache.store.load(Ordering::SeqCst, &guard);
        assert!(!store.is_null());
        let some = unsafe { store.deref() }.get(key_hash, conflict_hash, &guard);
        assert!(some.is_some());
        assert_eq!(some.unwrap(), &2);

        thread::sleep(Duration::from_millis(10));

        for _i in 0..1000 {
            let (key_hash, conflict) = cache.hash(&1);
            cache.process_items(Node {
                key: 0,
                conflict,
                value: Atomic::null(),
                expiration: None,
            }, Item {
                flag: ItemUpdate,
                key: key_hash,
                conflict,
                value: Atomic::from(Shared::boxed(1, &cache.collector)),
                cost: 1,
                expiration: Some(Duration::from_millis(200u64)),
            }, 1, &guard);
        }
        thread::sleep(Duration::from_millis(50));
        let v = cache.set(2, 2, 1, &guard);
        assert!(v);

        /*        assert_eq!(cache.metrics.is_none(), false);
                assert_eq!(cache.metrics.unwrap().SetsDropped(&guard), 0)*/
    }

    #[test]
    fn test_store_set_get_thread() {
        let map = Arc::new(Cache::<u64, u64>::new());

        let threads: Vec<_> = (0..10).map(|_| {
            let map1 = map.clone();
            thread::spawn(move || {
                let guard = map1.guard();
                let s = map1.store.load(Ordering::SeqCst, &guard);
                if s.is_null() {
                    panic!("store is null");
                }
                for i in 0..ITER {
                    let (key, conflict) = key_to_hash(&i);
                    let value = Shared::boxed(i + 2, &map1.collector);
                    let node = Node::new(key, conflict, value, None);
                    unsafe { s.as_ptr().as_mut().unwrap().set(node, &guard) };
                    let v = unsafe { s.deref() }.get(key, conflict, &guard);
                    assert_eq!(v, Some(&(i + 2)))
                }
            })
        }).collect();

        for t in threads {
            t.join().unwrap();
        }
    }

    // A unit struct without resources
    #[derive(Debug, Clone, Copy)]
    struct Unit;

    // A tuple struct with resources that implements the `Clone` trait
    #[derive(Clone, Debug)]
    struct Pair(Box<i32>, Box<i32>);

    #[test]
    fn testclone() {
        let unit = Unit;
        // Copy `Unit`; there are no resources to move
        let copied_unit = unit;

        // Both `Unit`s can be used independently
        println!("original: {:?}", unit);
        println!("copy: {:?}", copied_unit);

        // Instantiate `Pair`
        let pair = Pair(Box::new(1), Box::new(2));
        println!("original: {:?}", pair);

        // Move `pair` into `moved_pair`, moving its resources
        let moved_pair = pair;
        println!("moved: {:?}", moved_pair);

        // Error! `pair` has lost its resources
        // println!("original: {:?}", pair);
        // TODO ^ Try uncommenting this line

        // Clone `moved_pair` into `cloned_pair` (resources are included)
        let cloned_pair = moved_pair.clone();
        // Drop the original pair using std::mem::drop
        drop(moved_pair);

        // Error! `moved_pair` has been dropped
        // println!("copy: {:?}", moved_pair);
        // TODO ^ Try uncommenting this line

        // The result from .clone() can still be used!
        println!("clone: {:?}", cloned_pair);
    }
}