1use std::borrow::Borrow;
2use std::collections::HashMap;
3use std::fmt;
4use std::fmt::{Debug, Formatter};
5use std::hash::{BuildHasher, Hash, Hasher};
6use std::ops::Deref;
7use std::sync::atomic::{AtomicIsize, AtomicUsize, Ordering};
8use std::sync::Mutex;
9use seize::{Collector, Guard};
10use crate::entry::Entry;
11use crate::reclaim::{Atomic, Shared};
12
/// Computes three quarters of `$n` (`n - n / 4`, using a right shift by two).
///
/// The argument is evaluated exactly once, so expressions with side effects
/// (function calls, atomic loads, ...) are safe to pass.
macro_rules! load_factor {
    ($n: expr) => {{
        // Bind first: expanding `$n` twice would run the expression twice.
        let n = $n;
        n - (n >> 2)
    }};
}
/// A concurrent map built from two tables: a `read` snapshot that lookups
/// consult first, and a `dirty` table that is consulted and modified while
/// holding `lock`.
pub struct Map<K, V, S = crate::DefaultHashBuilder> {
    /// Snapshot consulted first by `get`, `put` and `remove`; null until the
    /// first insertion initializes the tables.
    read: Atomic<ReadOnly<K, V>>,
    /// Table holding keys that are not (yet) in `read`; only consulted when
    /// `read.amended` is set. Nulled when its content is promoted to `read`.
    dirty: Atomic<HashMap<K, *mut Entry<V>>>,
    /// Number of lookups that had to fall through to `dirty` since the last
    /// promotion (see `miss_locked`).
    misses: AtomicUsize,
    /// Initialization flag used by `init_table`; a negative value means a
    /// thread is currently creating the tables.
    flag_ctl: AtomicIsize,
    /// Hasher factory used by `hash`.
    build_hasher: S,
    /// Collector that hands out guards and owns the allocations made through
    /// `Shared::boxed`.
    collector: Collector,
    /// Serializes the slow paths that touch `dirty`.
    lock: Mutex<()>,
}
37
38impl<K, V, S> fmt::Debug for Map<K, V, S>
39 where
40 K: Debug,
41 V: Debug,
42{
43 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
44 let guard = self.collector.enter();
45 f.debug_map().finish()
46 }
47}
48
49impl<K, V, S> Clone for Map<K, V, S>
50 where
51 K: Sync + Send + Clone + Hash + Ord,
52 V: Sync + Send + Clone,
53 S: BuildHasher + Clone,
54{
55 fn clone(&self) -> Map<K, V, S> {
56 let mut cloned_map = Map::with_hasher(self.build_hasher.clone());
57
58 {
59 cloned_map.dirty = self.dirty.clone();
60 cloned_map.read = self.read.clone();
61 cloned_map.misses = AtomicUsize::new(self.misses.load(Ordering::SeqCst));
62 cloned_map.flag_ctl = AtomicIsize::new(self.flag_ctl.load(Ordering::SeqCst));
63
64 }
72 cloned_map
73 }
74}
75
76impl<K, V> Map<K, V, crate::DefaultHashBuilder> {
77 pub fn new() -> Self {
90 Self::default()
91 }
92}
93
94impl<K, V, S> Default for Map<K, V, S>
95 where
96 S: Default,
97{
98 fn default() -> Self {
99 Self::with_hasher(S::default())
100 }
101}
102
103impl<K, V, S> Drop for Map<K, V, S> {
104 fn drop(&mut self) {
105 let guard = unsafe { Guard::unprotected() };
106
107 let read = self.read.swap(Shared::null(), Ordering::SeqCst, &guard);
117 if !read.is_null() {
118 let read = unsafe { read.into_box() };
119 drop(read);
120 }
121 let moved = self.dirty.swap(Shared::null(), Ordering::SeqCst, &guard);
122 if moved.is_null() {
123 return;
124 }
125 assert!(
126 !moved.is_null(),
127 "self.moved is initialized together with the table"
128 );
129
130 let moved = unsafe { moved.into_box() };
132 drop(moved);
133 }
134}
135
136
137impl<K, V, S> Map<K, V, S> {
138 pub fn with_hasher(hash_builder: S) -> Self {
158 Self {
159 read: Atomic::null(),
160 dirty: Atomic::null(),
161 misses: AtomicUsize::new(0),
162 flag_ctl: AtomicIsize::new(0),
163 build_hasher: hash_builder,
164 collector: Collector::new(),
165 lock: Mutex::new(()),
166 }
167 }
168
169 pub fn guard(&self) -> Guard<'_> {
174 self.collector.enter()
175 }
176
177 #[inline]
178 fn check_guard(&self, guard: &Guard<'_>) {
179 if let Some(c) = guard.collector() {
181 assert!(Collector::ptr_eq(c, &self.collector));
182 }
183 }
184
185 fn init_table<'g>(&'g self, guard: &'g Guard<'_>) -> Shared<'g, ReadOnly<K, V>> {
186 loop {
187 let table = self.read.load(Ordering::SeqCst, guard);
188 if !table.is_null() {
191 break table;
192 }
193 let mut flag = self.flag_ctl.load(Ordering::SeqCst);
195 if flag < 0 {
196 std::thread::yield_now();
198 continue;
199 }
200
201 if self.flag_ctl
202 .compare_exchange(flag, -1, Ordering::SeqCst, Ordering::Relaxed).is_ok() {
203 let mut table = self.read.load(Ordering::SeqCst, guard);
204 if table.is_null() {
205 let n = if flag > 0 {
206 flag as usize
207 } else {
208 1
209 };
210 table = Shared::boxed(ReadOnly::new(), &self.collector);
211 self.read.store(table, Ordering::SeqCst);
212 let m = Shared::boxed(HashMap::new(), &self.collector);
213 self.dirty.store(m, Ordering::SeqCst);
214 flag = load_factor!(n as isize)
215 }
216 self.flag_ctl.store(flag, Ordering::SeqCst);
217 break table;
218 }
219 }
220 }
221}
222
223impl<K, V, S> Map<K, V, S>
224 where
225 K: Clone + Hash + Ord,
226 S: BuildHasher,
227{
228 #[inline]
229 fn hash<Q: ?Sized + Hash>(&self, key: &Q) -> u64 {
230 let mut h = self.build_hasher.build_hasher();
231 key.hash(&mut h);
232 h.finish()
233 }
234
235 pub fn len(&self) -> usize {
249 let guard = self.guard();
250 let map = self.dirty.load(Ordering::SeqCst, &guard);
251 if map.is_null() {
252 return 0;
253 }
254 unsafe { map.deref() }.len()
255 }
256
257 #[inline]
280 pub fn get<'g, Q>(&'g self, key: &Q, guard: &'g Guard<'_>) -> Option<&'g V>
281 where
282 K: Borrow<Q>,
283 Q: ?Sized + Hash + Ord,
284 {
285 self.check_guard(guard);
286
287 let read = self.read.load(Ordering::SeqCst, guard);
288 if read.is_null() {
289 return None;
290 }
291 let r = unsafe { read.deref() };
292 let mut e = r.m.get(key);
293 if e.is_none() && r.amended {
294 let lock = self.lock.lock();
295 let read = self.read.load(Ordering::SeqCst, guard);
296 let r = unsafe { read.deref() };
297 e = r.m.get(key);
298 if e.is_none() && r.amended {
299 let dirty = self.dirty.load(Ordering::SeqCst, guard);
300 if dirty.is_null() {
301 drop(lock);
302 return None;
303 }
304 e = unsafe { dirty.deref() }.get(key);
305 self.miss_locked(guard);
306 }
307 drop(lock)
308 }
309 if e.is_none() {
310 return None;
311 }
312
313 unsafe { e.unwrap().as_ref().unwrap().load(guard) }
323 }
324
325
326 fn miss_locked<'g>(&'g self, guard: &'g Guard) {
327 let miss = self.misses.fetch_add(1, Ordering::SeqCst);
328
329 let dirty = self.dirty.load(Ordering::SeqCst, guard);
330 if dirty.is_null() {
331 return;
332 }
333 if miss < unsafe { dirty.deref() }.len() {
334 return;
335 }
336 let mut map = HashMap::new();
337
338 for (key, value) in unsafe { dirty.deref() }.deref() {
339 map.insert(key.clone(), *value);
340 }
341 let read_only_map = Shared::boxed(ReadOnly {
342 m: map,
343 amended: false,
344 }, &self.collector);
345 self.read.store(read_only_map, Ordering::SeqCst);
346 let old_map = self.dirty.load(Ordering::SeqCst, guard);
347 if !old_map.is_null() {
348 self.dirty.compare_exchange(old_map, Shared::null(), Ordering::AcqRel, Ordering::Acquire, guard);
349 }
350 self.misses.store(0, Ordering::SeqCst);
351 }
352}
353
354impl<K, V, S> Map<K, V, S>
355 where
356 K: Sync + Send + Clone + Hash + Ord,
357 V: Sync + Send,
358 S: BuildHasher,
359{
360 pub fn insert<'g>(&'g self, key: K, value: V, guard: &'g Guard<'_>) {
380 self.check_guard(guard);
381 self.put(key, value, false, guard)
382 }
383
384 fn put<'g>(
385 &'g self,
386 mut key: K,
387 value: V,
388 no_replacement: bool,
389 guard: &'g Guard<'_>,
390 ) {
391 let mut table = self.read.load(Ordering::SeqCst, guard);
392 let entry_value = Shared::boxed(value, &self.collector);
393 loop {
394 if table.is_null() {
395 table = self.init_table(guard);
396 continue;
397 }
398
399 let read = unsafe { table.deref() };
400
401
402 if let Some(v) = read.m.get(&key) {
403 if unsafe { v.as_ref().unwrap() }.try_store(entry_value, guard) {
404 return;
405 }
406 }
407
408 let lock = self.lock.lock();
409 match read.m.get(&key) {
411 Some(e) => {
412 if unsafe { e.as_ref().unwrap() }.unexpunge_locked(guard) {
413 let mut table = self.read.load(Ordering::SeqCst, guard);
416 unsafe {
417 let read = table.as_ptr();
418 let read = read.as_mut().unwrap();
419 read
420 }.m.insert(key.clone(), *e);
421 }
422 unsafe { e.as_ref().unwrap() }.store_locked(entry_value, guard);
423 }
424 None => {
425 let dirty = self.dirty.load(Ordering::SeqCst, guard);
426 if dirty.is_null() {
427 table = self.read.load(Ordering::SeqCst, guard);
428 let m = Shared::boxed(HashMap::new(), &self.collector);
429 self.dirty.store(m, Ordering::SeqCst);
430 drop(lock);
431 continue;
432 }
433 let d = unsafe { dirty.deref() };
435 if !d.is_empty() {
436 if let Some(e) = d.get(&key) {
437 unsafe { e.as_ref() }.unwrap().store_locked(entry_value, guard);
438 drop(lock);
439 break;
440 }
441 }
442
443 if !read.amended {
444 self.dirty_locked(key, entry_value, guard);
447 let shard = self.read.load(Ordering::SeqCst, guard);
448 let mut map = HashMap::new();
449 for (key, value) in &unsafe { shard.deref() }.m {
450 map.insert(key.clone(), *value);
451 }
452 let shard_map = Shared::boxed(ReadOnly {
453 m: map,
454 amended: true,
455 }, &self.collector);
456 self.read.store(shard_map, Ordering::SeqCst);
457 drop(lock);
458 break;
459 }
460 let dirty2 = self.dirty.load(Ordering::SeqCst, guard);
461 if dirty != dirty2 {
462 continue;
463 }
464 let mut entry = Entry {
466 p: Atomic::null(),
467 expunged: Atomic::null(),
468 };
469
470 entry.p.store(entry_value, Ordering::SeqCst);
471 unsafe {
472 let dirty2 = dirty2.as_ptr();
473 dirty2.as_mut().unwrap().insert(key.clone(), Box::into_raw(Box::new(entry)));
474 };
475 }
476 }
477
478 drop(lock);
479 break;
480 }
481 }
482
483
484 pub fn remove<'g, Q>(&'g self, key: &Q, guard: &'g Guard<'_>) -> Option<&'g V>
504 where
505 K: Borrow<Q>,
506 Q: ?Sized + Hash + Ord,
507 {
508 self.check_guard(guard);
512
513 let mut read = self.read.load(Ordering::SeqCst, guard);
514 loop {
515 if read.is_null() {
516 break None;
517 }
518
519 let r = unsafe { read.deref() };
520 let mut remove_el: Option<*mut Entry<V>> = None;
521 let mut e = r.m.get(&key);
522 if e.is_none() && r.amended {
523 let lock = self.lock.lock();
524
525 e = r.m.get(&key);
526 if e.is_none() && r.amended {
527 let dirty = self.dirty.load(Ordering::SeqCst, guard);
528 if dirty.is_null() {
529 read = self.read.load(Ordering::SeqCst, guard);
530 drop(lock);
531 continue;
532 }
533 e = unsafe { dirty.deref() }.get(&key);
534
535 let dirty = unsafe { dirty.as_ptr() };
536
537 remove_el = unsafe { dirty.as_mut().unwrap().remove(&key) };
538
539 self.miss_locked(guard);
540 }
541 drop(lock)
542 } else {
543 if let Some(e) = e {
544 return unsafe { e.as_mut().unwrap().remove(guard) };
545 }
546 }
547
548 if remove_el.is_some() {
549 let data = unsafe { remove_el.unwrap().as_mut().unwrap().remove(guard) };
550 break data;
551 }
552 break None;
553 }
554 }
555
556 fn dirty_locked<'g>(&'g self, key: K, entry_value: Shared<V>, guard: &Guard<'_>) {
557 let dirty = self.dirty.load(Ordering::SeqCst, guard);
558 if dirty.is_null() {
559 return;
560 }
561 let read = self.read.load(Ordering::SeqCst, guard);
562 let mut map = HashMap::with_capacity(unsafe { read.deref() }.m.len());
563 for (key, value) in &unsafe { read.deref() }.m {
564 if !unsafe { value.as_ref().unwrap() }.try_unexpunge_locked(guard) {
565 map.insert(key.clone(), *value);
566 }
567 }
568 let entry = Entry {
569 p: Atomic::null(),
570 expunged: Atomic::null(),
571 };
572
573 entry.p.store(entry_value, Ordering::SeqCst);
574
575 map.insert(key, Box::into_raw(Box::new(entry)));
576 self.dirty.store(Shared::boxed(map, &self.collector), Ordering::SeqCst)
577 }
578}
579
580impl<K, V, S> Map<K, V, S>
581 where
582 K: Clone + Ord,
583{
584 pub fn clear<'g>(&'g self, guard: &'g Guard<'_>) {
597 let lock = self.lock.lock();
598
599 self.dirty.store(Shared::boxed(HashMap::new(), &self.collector), Ordering::SeqCst);
600 let read = self.read.load(Ordering::SeqCst, guard);
601 self.read.store(Shared::boxed(ReadOnly::new(), &self.collector), Ordering::SeqCst);
602 let sc = self.misses.load(Ordering::SeqCst);
603 self.misses.compare_exchange(sc, 0, Ordering::AcqRel, Ordering::Acquire).expect("change miess");
604
605 drop(lock);
606 }
607}
608
609
/// Snapshot table published through `Map::read`.
struct ReadOnly<K, V> {
    /// Key to entry-pointer table.
    m: HashMap<K, *mut Entry<V>>,
    /// `true` when `dirty` holds keys that are not in `m`; lookups only fall
    /// through to `dirty` when this is set.
    amended: bool,
}
614
615impl<K, V> ReadOnly<K, V> {
616 fn new() -> Self <> {
617 Self {
618 m: HashMap::new(),
619 amended: false,
620 }
621 }
622}
623
624
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// A removed key yields its value once and is gone afterwards.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn remove_and_insert() {
        let map = Arc::new(Map::<usize, usize>::new());
        let guard = map.guard();
        map.insert(1, 1, &guard);
        assert_eq!(map.remove(&1, &guard), Some(&1));
        assert_eq!(map.remove(&1, &guard), None)
    }

    /// Two writers insert the same key range with different values; every key
    /// that can be read back must hold one of the two written values. Keys
    /// that cannot be read back are only counted and reported.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn concurrent_insert() {
        let map = Arc::new(Map::<usize, usize>::new());

        let writers: Vec<_> = (0..2usize)
            .map(|tag| {
                let map = Arc::clone(&map);
                thread::spawn(move || {
                    for i in 0..5000 {
                        map.insert(i, tag, &map.guard());
                    }
                })
            })
            .collect();
        // `join` already waits for the writers; no extra sleep is needed.
        for writer in writers {
            writer.join().unwrap();
        }

        let mut missed = 0;
        let guard = map.guard();
        for i in 0..5000 {
            let v = map.get(&i, &guard);
            if v.is_some() {
                assert!(v == Some(&0) || v == Some(&1));
            } else {
                missed += 1;
            }
        }

        println!("missed {}", missed)
    }
}