memory_cache_rust/
store.rs

1use std::collections::HashMap;
2use std::ops::{Deref, DerefMut};
3use std::sync::atomic::Ordering;
4use std::time::{Duration, Instant};
5
6use parking_lot::Mutex;
7use seize::Guard;
8
9use crate::cache::{Item, NUM_SHARDS};
10use crate::policy::DefaultPolicy;
11use crate::reclaim::Atomic;
12use crate::ttl::ExpirationMap;
13
14pub struct Node<V> {
15    pub key: u64,
16    pub conflict: u64,
17    pub(crate) value: Atomic<V>,
18    pub expiration: Option<Duration>,
19
20}
21
22impl<V> Node<V> {
23    pub(crate) fn new<AV>(key: u64, conflict: u64, value: AV, expiration: Option<Duration>) -> Self
24        where AV: Into<Atomic<V>>,
25    {
26        Node {
27            key,
28            conflict,
29            value: value.into(),
30            expiration,
31        }
32    }
33}
34
35impl<V> Clone for Node<V> {
36    fn clone(&self) -> Self {
37        Self {
38            key: self.key,
39            conflict: self.conflict,
40            value: self.value.clone(),
41            expiration: self.expiration,
42        }
43    }
44}
45
46pub(crate) struct Store<V> {
47    pub data: Vec<HashMap<u64, Node<V>>>,
48    em: ExpirationMap,
49    lock: Mutex<()>,
50}
51
52// impl<V> Clone for Store<V> {
53//     fn clone(&self) -> Self {
54//         let mut store = Store::new();
55//         for map in &self.data {
56//             store.data.push(map.clone())
57//         }
58//         store
59//     }
60// }
61
62impl<V> Deref for Store<V> {
63    type Target = ();
64
65    fn deref(&self) -> &Self::Target {
66        todo!()
67    }
68}
69
70impl<V> DerefMut for Store<V> {
71    fn deref_mut(&mut self) -> &mut Self::Target {
72        self
73    }
74}
75
76impl<V> Store<V> {
77    pub fn new() -> Self {
78        Self::from(Vec::with_capacity(NUM_SHARDS))
79    }
80    pub fn from(mut data: Vec<HashMap<u64, Node<V>>>) -> Self {
81        for _i in 0..NUM_SHARDS {
82            data.push(HashMap::new());
83        }
84
85        Self {
86            data: data,
87            em: ExpirationMap::new(),
88            lock: Default::default(),
89        }
90    }
91    pub(crate) fn clear<'g>(&'g mut self, _guard: &'g Guard) {
92        self.data = Vec::with_capacity(NUM_SHARDS);
93        for _i in 0..NUM_SHARDS {
94            self.data.push(HashMap::new());
95        }
96    }
97    pub(crate) fn is_empty(&self) -> bool {
98        self.data.is_empty()
99    }
100
101
102    pub(crate) fn bini(&self, hash: u64) -> usize {
103        (hash % NUM_SHARDS as u64) as usize
104    }
105
106
107    /*   pub(crate) fn bin<'g>(&'g self, i: usize, guard: &'g Guard<'_>) -> Shared<'g, HashMap<u64, Node<V>>> {
108           self.data[i].load(Ordering::Acquire, guard)
109       }
110
111
112       pub(crate) fn cas_bin<'g>(
113           &'g self,
114           i: usize,
115           current: Shared<'_, HashMap<u64, Node<V>>>,
116           new: Shared<'g, HashMap<u64, Node<V>>>,
117           guard: &'g Guard<'_>,
118       ) -> Result<Shared<'g, HashMap<u64, Node<V>>>, reclaim::CompareExchangeError<'g, HashMap<u64, Node<V>>>> {
119           self.data[i].compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire, guard)
120       }*/
121    pub(crate) fn expiration<'g>(&'g mut self, key: &u64, _guard: &'g Guard<'_>) -> Option<Duration> {
122        let index = self.bini(*key);
123
124        return match self.data[index].get(key) {
125            None => None,
126            Some(v) => {
127                v.expiration
128            }
129        };
130    }
131
132    pub fn get<'g>(&'g self, key_hash: u64, confilict_hash: u64, guard: &'g Guard<'_>) -> Option<&'g V> {
133        let lock = self.lock.lock();
134        let index = self.bini(key_hash);
135
136        return match self.data[index].get(&key_hash) {
137            None => {
138                drop(lock);
139                None
140            }
141            Some(v) => {
142                if confilict_hash != 0 && confilict_hash != v.conflict {
143                    drop(lock);
144                    return None;
145                }
146                let now = Instant::now();
147                if v.expiration.is_some() && v.expiration.unwrap().as_millis() > now.elapsed().as_millis() {
148                    drop(lock);
149                    None
150                } else {
151                    let item = v.value.load(Ordering::SeqCst, guard);
152                    if let Some(v) = unsafe { item.as_ref() } {
153                        let v = &**v;
154                        drop(lock);
155                        return Some(v);
156                    }
157                    drop(lock);
158                    return None;
159                }
160            }
161        };
162    }
163
164    pub(crate) fn set<'g>(&'g mut self, item: Node<V>, guard: &'g Guard<'_>) {
165        let lock = self.lock.lock();
166
167
168        let index = self.bini(item.key);
169
170        match self.data[index].get(&item.key) {
171            None => {
172                if item.expiration.is_some() {
173                    self.em.add(item.key, item.conflict, item.expiration.unwrap(), guard);
174                }
175
176                self.data[index].insert(item.key, item);
177                drop(lock);
178                return;
179            }
180            Some(v) if v.conflict != item.conflict && item.conflict != 0 => {
181                drop(lock);
182                return;
183            }
184            Some(v) => {
185                if v.expiration.is_some() {
186                    self.em.update(item.key, item.conflict, v.expiration.unwrap(), item.expiration.unwrap(), guard);
187                }
188
189                self.data[index].insert(item.key, item);
190                drop(lock);
191                return;
192            }
193        }
194    }
195
196    pub(crate) fn update<'g>(&'g mut self, item: &Item<V>, guard: &'g Guard<'_>) -> bool {
197        let index = self.bini(item.key);
198
199
200        return match self.data[index].get_mut(&item.key) {
201            None => {
202                false
203            }
204            Some(v) if v.conflict != item.conflict && item.conflict != 0 => {
205                false
206            }
207            Some(v) => {
208                if v.expiration.is_some() {
209                    //todo
210                    self.em.update(item.key, item.conflict, v.expiration.unwrap(), item.expiration.unwrap(), guard);
211                }
212                self.data[index].insert(item.key, Node {
213                    key: item.key,
214                    conflict: item.conflict,
215                    value: item.value.clone(),
216                    expiration: item.expiration,
217
218                });
219
220                true
221            }
222        };
223    }
224
225    pub(crate) fn del<'g>(&'g mut self, key_hash: &u64, conflict: &u64, guard: &'g Guard<'_>) -> Option<(u64, &'g V)> {
226        let index = self.bini(*key_hash);
227
228
229        return match self.data[index].get_mut(key_hash) {
230            None => {
231                None
232            }
233            Some(v) if v.conflict != *conflict && *conflict != 0 => {
234                None
235            }
236            Some(v) => {
237                if v.expiration.is_some() {
238                    self.em.del(&v.key, v.expiration.unwrap(), guard);
239                }
240                if let Some(item) = self.data[index].remove(key_hash) {
241                    let v = item.value.load(Ordering::SeqCst, guard);
242                    assert!(!v.is_null());
243                    return Some((item.conflict, unsafe { v.as_ref().unwrap().deref() }));
244                }
245                None
246            }
247        };
248    }
249
250    pub(crate) fn clean_up<'g>(&'g mut self, policy: &mut DefaultPolicy<V>, guard: &'g Guard<'_>) {
251        let maps = self.em.cleanup(policy, None, guard);
252        for (key, conflict) in maps {
253            match self.expiration(&key,
254                                  guard) {
255                None => { continue; }
256                Some(_v) => {
257                    let _cost = policy.cost(&key, guard);
258                    policy.del(&key, guard);
259                    let _value = self.del(&key, &conflict, guard);
260                    //ToDO for evict
261                    // if f.is_some(){
262                    // //ToDO for evict
263                    // // f.unwrap()(*key,*confilct,value.unwrap().1,cost)
264                    // }
265                }
266            }
267        }
268    }
269}
270
271
272
#[cfg(test)]
mod tests {
    use seize::Collector;

    use crate::bloom::haskey::key_to_hash;
    use crate::cache::Item;
    use crate::cache::ItemFlag::ItemNew;
    use crate::reclaim::Shared;
    use crate::store::{Node, Store};

    const ITER: u64 = 32 * 1024;

    /// A value is readable under its key and conflict hash right after `set`.
    #[test]
    fn test_set_get() {
        let collector = Collector::new();
        let guard = collector.enter();
        let mut store = Store::new();

        for n in 0..20 {
            let (key_hash, conflict_hash) = key_to_hash(&n);
            let boxed = Shared::boxed(n + 2, &collector);
            store.set(Node::new(key_hash, conflict_hash, boxed, None), &guard);

            let expected = n + 2;
            let found = store.get(key_hash, conflict_hash, &guard);
            assert_eq!(found, Some(&expected))
        }
    }

    /// `del` hands back the stored value and the key is gone afterwards.
    #[test]
    fn test_set_del() {
        let collector = Collector::new();
        let guard = collector.enter();
        let mut store = Store::new();

        for n in 0..20 {
            let (key_hash, conflict_hash) = key_to_hash(&n);
            let boxed = Shared::boxed(n + 2, &collector);
            store.set(Node::new(key_hash, conflict_hash, boxed, None), &guard);

            let removed = store.del(&key_hash, &conflict_hash, &guard);
            assert_eq!(removed.unwrap().1, &(n + 2));

            let found = store.get(key_hash, conflict_hash, &guard);
            assert_eq!(found, None);
        }
    }

    /// After `clear` none of the previously stored keys can be read.
    #[test]
    fn test_set_clear() {
        let collector = Collector::new();
        let guard = collector.enter();
        let mut store = Store::new();

        for n in 0..20 {
            let (key_hash, conflict_hash) = key_to_hash(&n);
            let boxed = Shared::boxed(n + 2, &collector);
            store.set(Node::new(key_hash, conflict_hash, boxed, None), &guard);
        }
        store.clear(&guard);

        for n in 0..20 {
            let (key_hash, conflict_hash) = key_to_hash(&n);
            let found = store.get(key_hash, conflict_hash, &guard);
            assert_eq!(found, None)
        }
    }

    /// `update` replaces the value of an existing key.
    #[test]
    fn test_set_update() {
        let collector = Collector::new();
        let guard = collector.enter();
        let mut store = Store::new();

        // Seed the store and make sure the initial values are visible.
        for n in 0..20 {
            let (key_hash, conflict_hash) = key_to_hash(&n);
            let boxed = Shared::boxed(n + 2, &collector);
            store.set(Node::new(key_hash, conflict_hash, boxed, None), &guard);

            let expected = n + 2;
            let found = store.get(key_hash, conflict_hash, &guard);
            assert_eq!(found, Some(&expected))
        }

        // Overwrite every key through `update` and read the new value back.
        for n in 0..20 {
            let (key, conflict) = key_to_hash(&n);
            let boxed = Shared::boxed(n + 4, &collector);
            let item = Item {
                flag: ItemNew,
                key,
                conflict,
                value: boxed.into(),
                cost: 0,
                expiration: None,
            };
            store.update(&item, &guard);

            let expected = n + 4;
            let found = store.get(key, conflict, &guard);
            assert_eq!(found, Some(&expected))
        }
    }

    /// Writes and deletes carrying a mismatching, non-zero conflict hash leave the
    /// stored entry untouched.
    #[test]
    fn test_set_collision() {
        let collector = Collector::new();
        let guard = collector.enter();
        let mut store = Store::new();

        // Place an entry with conflict hash 0 directly into shard 1.
        let first = Shared::boxed(1, &collector);
        store.data[1].insert(1, Node::new(1, 0, first, None));
        let found = store.get(1, 1, &guard);
        assert_eq!(found, None);

        // `set` with a different conflict hash must not overwrite it.
        let second = Shared::boxed(2, &collector);
        store.set(Node::new(1, 1, second, None), &guard);
        let found = store.get(1, 0, &guard);
        assert_ne!(found, Some(&2));

        // Neither must `update`.
        let item = Item {
            flag: ItemNew,
            key: 1,
            conflict: 1,
            value: second.into(),
            cost: 0,
            expiration: None,
        };
        assert!(!store.update(&item, &guard));

        // A mismatching `del` keeps the entry in place.
        store.del(&1, &1, &guard);
        let found = store.get(1, 0, &guard);
        assert_eq!(found, Some(&1));
    }
}
411
412