1use std::collections::HashMap;
2use std::ops::{Deref, DerefMut};
3use std::sync::atomic::Ordering;
4use std::time::{Duration, Instant};
5
6use parking_lot::Mutex;
7use seize::Guard;
8
9use crate::cache::{Item, NUM_SHARDS};
10use crate::policy::DefaultPolicy;
11use crate::reclaim::Atomic;
12use crate::ttl::ExpirationMap;
13
14pub struct Node<V> {
15 pub key: u64,
16 pub conflict: u64,
17 pub(crate) value: Atomic<V>,
18 pub expiration: Option<Duration>,
19
20}
21
22impl<V> Node<V> {
23 pub(crate) fn new<AV>(key: u64, conflict: u64, value: AV, expiration: Option<Duration>) -> Self
24 where AV: Into<Atomic<V>>,
25 {
26 Node {
27 key,
28 conflict,
29 value: value.into(),
30 expiration,
31 }
32 }
33}
34
35impl<V> Clone for Node<V> {
36 fn clone(&self) -> Self {
37 Self {
38 key: self.key,
39 conflict: self.conflict,
40 value: self.value.clone(),
41 expiration: self.expiration,
42 }
43 }
44}
45
46pub(crate) struct Store<V> {
47 pub data: Vec<HashMap<u64, Node<V>>>,
48 em: ExpirationMap,
49 lock: Mutex<()>,
50}
51
52impl<V> Deref for Store<V> {
63 type Target = ();
64
65 fn deref(&self) -> &Self::Target {
66 todo!()
67 }
68}
69
70impl<V> DerefMut for Store<V> {
71 fn deref_mut(&mut self) -> &mut Self::Target {
72 self
73 }
74}
75
76impl<V> Store<V> {
77 pub fn new() -> Self {
78 Self::from(Vec::with_capacity(NUM_SHARDS))
79 }
80 pub fn from(mut data: Vec<HashMap<u64, Node<V>>>) -> Self {
81 for _i in 0..NUM_SHARDS {
82 data.push(HashMap::new());
83 }
84
85 Self {
86 data: data,
87 em: ExpirationMap::new(),
88 lock: Default::default(),
89 }
90 }
91 pub(crate) fn clear<'g>(&'g mut self, _guard: &'g Guard) {
92 self.data = Vec::with_capacity(NUM_SHARDS);
93 for _i in 0..NUM_SHARDS {
94 self.data.push(HashMap::new());
95 }
96 }
97 pub(crate) fn is_empty(&self) -> bool {
98 self.data.is_empty()
99 }
100
101
102 pub(crate) fn bini(&self, hash: u64) -> usize {
103 (hash % NUM_SHARDS as u64) as usize
104 }
105
106
107 pub(crate) fn expiration<'g>(&'g mut self, key: &u64, _guard: &'g Guard<'_>) -> Option<Duration> {
122 let index = self.bini(*key);
123
124 return match self.data[index].get(key) {
125 None => None,
126 Some(v) => {
127 v.expiration
128 }
129 };
130 }
131
132 pub fn get<'g>(&'g self, key_hash: u64, confilict_hash: u64, guard: &'g Guard<'_>) -> Option<&'g V> {
133 let lock = self.lock.lock();
134 let index = self.bini(key_hash);
135
136 return match self.data[index].get(&key_hash) {
137 None => {
138 drop(lock);
139 None
140 }
141 Some(v) => {
142 if confilict_hash != 0 && confilict_hash != v.conflict {
143 drop(lock);
144 return None;
145 }
146 let now = Instant::now();
147 if v.expiration.is_some() && v.expiration.unwrap().as_millis() > now.elapsed().as_millis() {
148 drop(lock);
149 None
150 } else {
151 let item = v.value.load(Ordering::SeqCst, guard);
152 if let Some(v) = unsafe { item.as_ref() } {
153 let v = &**v;
154 drop(lock);
155 return Some(v);
156 }
157 drop(lock);
158 return None;
159 }
160 }
161 };
162 }
163
164 pub(crate) fn set<'g>(&'g mut self, item: Node<V>, guard: &'g Guard<'_>) {
165 let lock = self.lock.lock();
166
167
168 let index = self.bini(item.key);
169
170 match self.data[index].get(&item.key) {
171 None => {
172 if item.expiration.is_some() {
173 self.em.add(item.key, item.conflict, item.expiration.unwrap(), guard);
174 }
175
176 self.data[index].insert(item.key, item);
177 drop(lock);
178 return;
179 }
180 Some(v) if v.conflict != item.conflict && item.conflict != 0 => {
181 drop(lock);
182 return;
183 }
184 Some(v) => {
185 if v.expiration.is_some() {
186 self.em.update(item.key, item.conflict, v.expiration.unwrap(), item.expiration.unwrap(), guard);
187 }
188
189 self.data[index].insert(item.key, item);
190 drop(lock);
191 return;
192 }
193 }
194 }
195
196 pub(crate) fn update<'g>(&'g mut self, item: &Item<V>, guard: &'g Guard<'_>) -> bool {
197 let index = self.bini(item.key);
198
199
200 return match self.data[index].get_mut(&item.key) {
201 None => {
202 false
203 }
204 Some(v) if v.conflict != item.conflict && item.conflict != 0 => {
205 false
206 }
207 Some(v) => {
208 if v.expiration.is_some() {
209 self.em.update(item.key, item.conflict, v.expiration.unwrap(), item.expiration.unwrap(), guard);
211 }
212 self.data[index].insert(item.key, Node {
213 key: item.key,
214 conflict: item.conflict,
215 value: item.value.clone(),
216 expiration: item.expiration,
217
218 });
219
220 true
221 }
222 };
223 }
224
225 pub(crate) fn del<'g>(&'g mut self, key_hash: &u64, conflict: &u64, guard: &'g Guard<'_>) -> Option<(u64, &'g V)> {
226 let index = self.bini(*key_hash);
227
228
229 return match self.data[index].get_mut(key_hash) {
230 None => {
231 None
232 }
233 Some(v) if v.conflict != *conflict && *conflict != 0 => {
234 None
235 }
236 Some(v) => {
237 if v.expiration.is_some() {
238 self.em.del(&v.key, v.expiration.unwrap(), guard);
239 }
240 if let Some(item) = self.data[index].remove(key_hash) {
241 let v = item.value.load(Ordering::SeqCst, guard);
242 assert!(!v.is_null());
243 return Some((item.conflict, unsafe { v.as_ref().unwrap().deref() }));
244 }
245 None
246 }
247 };
248 }
249
250 pub(crate) fn clean_up<'g>(&'g mut self, policy: &mut DefaultPolicy<V>, guard: &'g Guard<'_>) {
251 let maps = self.em.cleanup(policy, None, guard);
252 for (key, conflict) in maps {
253 match self.expiration(&key,
254 guard) {
255 None => { continue; }
256 Some(_v) => {
257 let _cost = policy.cost(&key, guard);
258 policy.del(&key, guard);
259 let _value = self.del(&key, &conflict, guard);
260 }
266 }
267 }
268 }
269}
270
271
272
273#[cfg(test)]
274mod tests {
275 use seize::Collector;
276
277 use crate::bloom::haskey::key_to_hash;
278 use crate::cache::Item;
279 use crate::cache::ItemFlag::ItemNew;
280 use crate::reclaim::Shared;
281 use crate::store::{Node, Store};
282
283 const ITER: u64 = 32 * 1024;
284
285 #[test]
286 fn test_set_get() {
287 let collector = Collector::new();
288 let guard = collector.enter();
289 let mut s = Store::new();
290
291 for i in 0..20 {
292 let (key, confilict) = key_to_hash(&i);
293 let value = Shared::boxed(i + 2, &collector);
294 let node = Node::new(key, confilict, value, None);
295
296 s.set(node, &guard);
297 let v = s.get(key, confilict, &guard);
298 assert_eq!(v, Some(&(i + 2)))
299 }
300 }
301
302 #[test]
303 fn test_set_del() {
304 let collector = Collector::new();
305 let guard = collector.enter();
306 let mut s = Store::new();
307
308 for i in 0..20 {
309 let (key, confilict) = key_to_hash(&i);
310 let value = Shared::boxed(i + 2, &collector);
311 let node = Node::new(key, confilict, value, None);
312
313 s.set(node, &guard);
314 let d = s.del(&key, &confilict, &guard);
315 assert_eq!(d.unwrap().1, &(i + 2));
316
317 let v = s.get(key, confilict, &guard);
318 assert_eq!(v, None);
319 }
320 }
321
322
323 #[test]
324 fn test_set_clear() {
325 let collector = Collector::new();
326 let guard = collector.enter();
327 let mut s = Store::new();
328
329 for i in 0..20 {
330 let (key, confilict) = key_to_hash(&i);
331 let value = Shared::boxed(i + 2, &collector);
332 let node = Node::new(key, confilict, value, None);
333
334 s.set(node, &guard);
335 }
336 s.clear(&guard);
337
338 for i in 0..20 {
339 let (key, confilict) = key_to_hash(&i);
340 let v = s.get(key, confilict, &guard);
341 assert_eq!(v, None)
342 }
343 }
344
345 #[test]
346 fn test_set_update() {
347 let collector = Collector::new();
348 let guard = collector.enter();
349 let mut s = Store::new();
350
351 for i in 0..20 {
352 let (key, confilict) = key_to_hash(&i);
353 let value = Shared::boxed(i + 2, &collector);
354 let node = Node::new(key, confilict, value, None);
355
356 s.set(node, &guard);
357 let v = s.get(key, confilict, &guard);
358 assert_eq!(v, Some(&(i + 2)))
359 }
360
361 for i in 0..20 {
362 let (key, conflict) = key_to_hash(&i);
363 let value = Shared::boxed(i + 4, &collector);
364 let item = Item {
365 flag: ItemNew,
366 key: key,
367 conflict: conflict,
368 value: value.into(),
369 cost: 0,
370 expiration: None,
371 };
372 s.update(&item, &guard);
373 let v = s.get(key, conflict, &guard);
374 assert_eq!(v, Some(&(i + 4)))
375 }
376 }
377
378 #[test]
379 fn test_set_collision() {
380 let collector = Collector::new();
381 let guard = collector.enter();
382 let mut s = Store::new();
383 let value = Shared::boxed(1, &collector);
384
385 let node = Node::new(1, 0, value, None);
386 s.data.get_mut(1).unwrap().insert(1, node);
387 let v = s.get(1, 1, &guard);
388 assert_eq!(v, None);
389
390
391 let value = Shared::boxed(2, &collector);
392 let node = Node::new(1, 1, value, None);
393 s.set(node, &guard);
394 let v = s.get(1, 0, &guard);
395 assert_ne!(v, Some(&2));
396 let item = Item {
397 flag: ItemNew,
398 key: 1,
399 conflict: 1,
400 value: value.into(),
401 cost: 0,
402 expiration: None,
403 };
404 assert_eq!(s.update(&item, &guard), false);
405
406 s.del(&1, &1, &guard);
407 let v = s.get(1, 0, &guard);
408 assert_eq!(v, Some(&1));
409 }
410}
411
412