// syncmap/map.rs

1use std::borrow::Borrow;
2use std::collections::HashMap;
3use std::fmt;
4use std::fmt::{Debug, Formatter};
5use std::hash::{BuildHasher, Hash, Hasher};
6use std::ops::Deref;
7use std::sync::atomic::{AtomicIsize, AtomicUsize, Ordering};
8use std::sync::Mutex;
9use seize::{Collector, Guard};
10use crate::entry::Entry;
11use crate::reclaim::{Atomic, Shared};
12
/// Computes the load-factor threshold for a capacity `$n`: three quarters
/// of `n`, written as `n - n/4` using a shift.
///
/// The argument is bound to a local first so it is evaluated exactly once
/// (the previous expansion `$n - ($n >> 2)` evaluated it twice, which would
/// duplicate side effects of a non-trivial argument expression).
macro_rules! load_factor {
    ($n: expr) => {{
        // ¾ n = n - n/4 = n - (n >> 2)
        let n = $n;
        n - (n >> 2)
    }};
}
/// `Map` is like a `HashMap` but is safe for concurrent use by multiple
/// threads without additional locking or coordination. Loads, stores, and
/// deletes run in amortized constant time.
///
/// The `Map` type is specialized. Most code should use a plain Rust `HashMap`
/// instead, with separate locking or coordination, for better type safety and
/// to make it easier to maintain other invariants along with the map content.
///
/// The `Map` type is optimized for two common use cases: (1) when the entry
/// for a given key is only ever written once but read many times, as in
/// caches that only grow, or (2) when multiple threads read, write, and
/// overwrite entries for disjoint sets of keys. In these two cases, use of a
/// `Map` may significantly reduce lock contention compared to a Rust
/// `HashMap` paired with a separate `Mutex`.
pub struct Map<K, V, S = crate::DefaultHashBuilder> {
    // Lock-free snapshot consulted first by readers (see `get`).
    read: Atomic<ReadOnly<K, V>>,
    // Write-side map of raw entry pointers; mutated while holding `lock`.
    dirty: Atomic<HashMap<K, *mut Entry<V>>>,
    // Number of reads that had to fall through to `dirty`; once it reaches
    // `dirty.len()`, `miss_locked` promotes `dirty` into `read`.
    misses: AtomicUsize,
    // Control word for lazy initialization in `init_table`: negative while a
    // thread is initializing, otherwise a sizing hint / load-factor value.
    flag_ctl: AtomicIsize,
    // Builds hashers for keys.
    build_hasher: S,
    // Epoch-based reclamation for values/tables swapped out of the map.
    collector: Collector,
    // Serializes writers and the read->dirty fallback paths.
    lock: Mutex<()>,
}
37
38impl<K, V, S> fmt::Debug for Map<K, V, S>
39    where
40        K: Debug,
41        V: Debug,
42{
43    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
44        let guard = self.collector.enter();
45        f.debug_map().finish()
46    }
47}
48
impl<K, V, S> Clone for Map<K, V, S>
    where
        K: Sync + Send + Clone + Hash + Ord,
        V: Sync + Send + Clone,
        S: BuildHasher + Clone,
{
    // Clones the map by cloning its atomic pointers and copying the counters.
    //
    // NOTE(review): this clones the `read`/`dirty` *pointer cells*, not the
    // tables behind them. If `Atomic::clone` is a shallow copy, both maps
    // reference the same allocations while `Drop` frees whatever they point
    // to — a potential alias/double-free. A commented-out deep-copy loop
    // existed here previously; confirm `Atomic::clone` semantics before
    // relying on `clone()`.
    fn clone(&self) -> Map<K, V, S> {
        let mut cloned_map = Map::with_hasher(self.build_hasher.clone());

        {
            cloned_map.dirty = self.dirty.clone();
            cloned_map.read = self.read.clone();
            // Counters are plain values; snapshot them with SeqCst loads.
            cloned_map.misses = AtomicUsize::new(self.misses.load(Ordering::SeqCst));
            cloned_map.flag_ctl = AtomicIsize::new(self.flag_ctl.load(Ordering::SeqCst));
        }
        cloned_map
    }
}
75
76impl<K, V> Map<K, V, crate::DefaultHashBuilder> {
77    /// Creates an empty `HashMap`.
78    ///
79    /// The hash map is initially created with a capacity of 0, so it will not allocate until it
80    /// is first inserted into.
81    ///
82    /// # Examples
83    ///
84    /// ```
85    ///
86    /// use syncmap::map::Map;
87    /// let map: Map<&str, i32> = Map::new();
88    /// ```
89    pub fn new() -> Self {
90        Self::default()
91    }
92}
93
94impl<K, V, S> Default for Map<K, V, S>
95    where
96        S: Default,
97{
98    fn default() -> Self {
99        Self::with_hasher(S::default())
100    }
101}
102
103impl<K, V, S> Drop for Map<K, V, S> {
104    fn drop(&mut self) {
105        let guard = unsafe { Guard::unprotected() };
106
107        // let read = self.read.swap(Shared::null(), Ordering::SeqCst, &guard);
108        // assert!(
109        //     !read.is_null(),
110        //     "self.moved is initialized together with the table"
111        // );
112        //
113        // // safety: we have mut access to self, so no-one else will drop this value under us.
114        // let read = unsafe { read.into_box() };
115        // drop(read);
116         let read = self.read.swap(Shared::null(), Ordering::SeqCst, &guard);
117        if !read.is_null() {
118            let read = unsafe { read.into_box() };
119            drop(read);
120        }
121        let moved = self.dirty.swap(Shared::null(), Ordering::SeqCst, &guard);
122        if moved.is_null() {
123            return;
124        }
125        assert!(
126            !moved.is_null(),
127            "self.moved is initialized together with the table"
128        );
129
130        // safety: we have mut access to self, so no-one else will drop this value under us.
131        let moved = unsafe { moved.into_box() };
132        drop(moved);
133    }
134}
135
136
impl<K, V, S> Map<K, V, S> {
    /// Creates an empty map which will use `hash_builder` to hash keys.
    ///
    /// The created map has the default initial capacity.
    ///
    /// Warning: `hash_builder` is normally randomly generated, and is designed to
    /// allow the map to be resistant to attacks that cause many collisions and
    /// very poor performance. Setting it manually using this
    /// function can expose a DoS attack vector.
    ///
    /// # Examples
    ///
    /// ```
    ///
    /// use syncmap::DefaultHashBuilder;
    /// use syncmap::map::Map;
    /// let map = Map::with_hasher(DefaultHashBuilder::default());
    /// let guard = map.guard();
    /// map.insert(1, 2, &guard);
    /// ```
    pub fn with_hasher(hash_builder: S) -> Self {
        Self {
            read: Atomic::null(),
            dirty: Atomic::null(),
            misses: AtomicUsize::new(0),
            flag_ctl: AtomicIsize::new(0),
            build_hasher: hash_builder,
            collector: Collector::new(),
            lock: Mutex::new(()),
        }
    }

    /// Pin a `Guard` for use with this map.
    ///
    /// Keep in mind that for as long as you hold onto this `Guard`, you are preventing the
    /// collection of garbage generated by the map.
    pub fn guard(&self) -> Guard<'_> {
        self.collector.enter()
    }

    // Asserts that `guard` was created by this map's collector; a guard from
    // another collector would not protect this map's allocations.
    #[inline]
    fn check_guard(&self, guard: &Guard<'_>) {
        // guard.collector() may be `None` if it is unprotected
        if let Some(c) = guard.collector() {
            assert!(Collector::ptr_eq(c, &self.collector));
        }
    }

    // Lazily allocates the `read` table (and an empty `dirty` map), racing
    // other initializers via `flag_ctl`:
    //   flag_ctl < 0  => another thread is initializing; spin-yield.
    //   flag_ctl >= 0 => free; CAS it to -1 to claim initialization.
    // Returns the (possibly freshly allocated) non-null `read` table.
    fn init_table<'g>(&'g self, guard: &'g Guard<'_>) -> Shared<'g, ReadOnly<K, V>> {
        loop {
            let table = self.read.load(Ordering::SeqCst, guard);
            // safety: we loaded the ReadOnly while the thread was marked as active.
            // ReadOnly won't be deallocated until the guard is dropped at the earliest.
            if !table.is_null() {
                break table;
            }
            // try to allocate the ReadOnly table
            let mut flag = self.flag_ctl.load(Ordering::SeqCst);
            if flag < 0 {
                // lost the init race; just spin
                std::thread::yield_now();
                continue;
            }

            if self.flag_ctl
                .compare_exchange(flag, -1, Ordering::SeqCst, Ordering::Relaxed).is_ok() {
                // We won the claim; re-check that no-one initialized between
                // our load and the CAS.
                let mut table = self.read.load(Ordering::SeqCst, guard);
                if table.is_null() {
                    // A positive flag doubles as a capacity hint.
                    let n = if flag > 0 {
                        flag as usize
                    } else {
                        1
                    };
                    table = Shared::boxed(ReadOnly::new(), &self.collector);
                    self.read.store(table, Ordering::SeqCst);
                    let m = Shared::boxed(HashMap::new(), &self.collector);
                    self.dirty.store(m, Ordering::SeqCst);
                    // Record ¾·n as the load-factor threshold.
                    flag = load_factor!(n as isize)
                }
                // Release the init claim (restore a non-negative flag).
                self.flag_ctl.store(flag, Ordering::SeqCst);
                break table;
            }
        }
    }
}
222
impl<K, V, S> Map<K, V, S>
    where
        K: Clone + Hash + Ord,
        S: BuildHasher,
{
    // Hashes `key` with this map's configured `BuildHasher`.
    // NOTE(review): not referenced by any code visible in this file — confirm
    // it has callers before removing.
    #[inline]
    fn hash<Q: ?Sized + Hash>(&self, key: &Q) -> u64 {
        let mut h = self.build_hasher.build_hasher();
        key.hash(&mut h);
        h.finish()
    }

    /// Returns the number of entries in the map.
    ///
    /// NOTE(review): this counts only the `dirty` table; after `miss_locked`
    /// promotes `dirty` into `read` (leaving `dirty` null), this returns 0
    /// even though the map still holds entries — confirm intended.
    ///
    /// # Examples
    ///
    /// ```
    /// use syncmap::map::Map;
    ///
    /// let map = Map::new();
    /// let guard = map.guard();
    /// map.insert(1, "a",&guard);
    /// map.insert(2, "b",&guard);
    /// assert!(map.len() == 2);
    /// ```
    pub fn len(&self) -> usize {
        let guard = self.guard();
        let map = self.dirty.load(Ordering::SeqCst, &guard);
        if map.is_null() {
            return 0;
        }
        unsafe { map.deref() }.len()
    }

    /// Returns a reference to the value corresponding to the key.
    ///
    /// The key may be any borrowed form of the map's key type, but
    /// [`Hash`] and [`Ord`] on the borrowed form *must* match those for
    /// the key type.
    ///
    /// [`Ord`]: std::cmp::Ord
    /// [`Hash`]: std::hash::Hash
    ///
    /// To obtain a `Guard`, use [`Map::guard`].
    ///
    /// # Examples
    ///
    /// ```
    ///
    /// use syncmap::map::Map;
    /// let map = Map::new();
    /// let guard = map.guard();
    /// map.insert(1,"a",&guard);
    /// assert_eq!(map.get(&1,&guard), Some(&"a"));
    /// assert_eq!(map.get(&2,&guard), None);
    /// ```
    #[inline]
    pub fn get<'g, Q>(&'g self, key: &Q, guard: &'g Guard<'_>) -> Option<&'g V>
        where
            K: Borrow<Q>,
            Q: ?Sized + Hash + Ord,
    {
        self.check_guard(guard);

        // Fast path: consult the lock-free read snapshot.
        let read = self.read.load(Ordering::SeqCst, guard);
        if read.is_null() {
            return None;
        }
        let r = unsafe { read.deref() };
        let mut e = r.m.get(key);
        if e.is_none() && r.amended {
            // Slow path: `read` is marked incomplete, so the key may live only
            // in `dirty`. Take the lock, re-check `read` (it may have been
            // promoted meanwhile), then fall back to `dirty`.
            // NOTE(review): the `lock()` Result is held but never unwrapped —
            // a poisoned mutex is silently tolerated; confirm intended.
            let lock = self.lock.lock();
            let read = self.read.load(Ordering::SeqCst, guard);
            let r = unsafe { read.deref() };
            e = r.m.get(key);
            if e.is_none() && r.amended {
                let dirty = self.dirty.load(Ordering::SeqCst, guard);
                if dirty.is_null() {
                    drop(lock);
                    return None;
                }
                e = unsafe { dirty.deref() }.get(key);
                // Record the miss regardless of whether the key was found;
                // enough misses promote `dirty` into `read`.
                self.miss_locked(guard);
            }
            drop(lock)
        }
        if e.is_none() {
            return None;
        }

        // safety: `e` is a live entry pointer owned by the map; the caller's
        // guard keeps its allocation alive for 'g.
        unsafe { e.unwrap().as_ref().unwrap().load(guard) }
    }

    // Registers one read miss; must be called while holding `self.lock`.
    // Once the miss count reaches `dirty.len()`, copies `dirty` into a fresh
    // non-amended `read` snapshot, clears `dirty`, and resets the counter.
    fn miss_locked<'g>(&'g self, guard: &'g Guard) {
        let miss = self.misses.fetch_add(1, Ordering::SeqCst);

        let dirty = self.dirty.load(Ordering::SeqCst, guard);
        if dirty.is_null() {
            return;
        }
        if miss < unsafe { dirty.deref() }.len() {
            return;
        }
        // Promotion: snapshot the dirty entries into a new read-only table.
        let mut map = HashMap::new();

        for (key, value) in unsafe { dirty.deref() }.deref() {
            map.insert(key.clone(), *value);
        }
        let read_only_map = Shared::boxed(ReadOnly {
            m: map,
            amended: false,
        }, &self.collector);
        self.read.store(read_only_map, Ordering::SeqCst);
        let old_map = self.dirty.load(Ordering::SeqCst, guard);
        if !old_map.is_null() {
            // NOTE(review): the CAS result is ignored; failure means `dirty`
            // changed concurrently (should be impossible under `lock`) —
            // confirm that silently skipping the clear is acceptable.
            self.dirty.compare_exchange(old_map, Shared::null(), Ordering::AcqRel, Ordering::Acquire, guard);
        }
        self.misses.store(0, Ordering::SeqCst);
    }
}
353
impl<K, V, S> Map<K, V, S>
    where
        K: Sync + Send + Clone + Hash + Ord,
        V: Sync + Send,
        S: BuildHasher,
{
    /// Inserts a key-value pair into the map.
    ///
    /// If the map did have this key present, the value is updated. The key is
    /// left unchanged. See the [std-collections documentation] for more.
    ///
    /// NOTE(review): unlike `HashMap::insert`, this currently returns `()`;
    /// the previous value (if any) is not returned.
    ///
    /// [std-collections documentation]: https://doc.rust-lang.org/std/collections/index.html#insert-and-complex-keys
    ///
    /// # Examples
    ///
    /// ```
    ///
    /// use syncmap::map::Map;
    /// let map = Map::new();
    /// let guard = map.guard();
    /// map.insert(1,1,&guard)
    /// ```
    pub fn insert<'g>(&'g self, key: K, value: V, guard: &'g Guard<'_>) {
        self.check_guard(guard);
        self.put(key, value, false, guard)
    }

    // Core write path.
    // NOTE(review): `no_replacement` is accepted but never read in this body —
    // confirm whether "insert if absent" semantics were intended.
    fn put<'g>(
        &'g self,
        mut key: K,
        value: V,
        no_replacement: bool,
        guard: &'g Guard<'_>,
    ) {
        let mut table = self.read.load(Ordering::SeqCst, guard);
        // Box the value once up front; every path below stores this pointer.
        let entry_value = Shared::boxed(value, &self.collector);
        loop {
            if table.is_null() {
                // First write: allocate the tables, then retry.
                table = self.init_table(guard);
                continue;
            }

            let read = unsafe { table.deref() };

            // Fast path: key already present in `read`; try a lock-free store.
            if let Some(v) = read.m.get(&key) {
                if unsafe { v.as_ref().unwrap() }.try_store(entry_value, guard) {
                    return;
                }
            }

            let lock = self.lock.lock();
            // TODO need to load readonly again
            match read.m.get(&key) {
                Some(e) => {
                    if unsafe { e.as_ref().unwrap() }.unexpunge_locked(guard) {
                        // The entry was previously expunged, which implies that there is a
                        // non-nil dirty map and this entry is not in it.
                        // NOTE(review): this mutates the shared read table
                        // through a raw pointer while lock-free readers may be
                        // traversing it — confirm soundness.
                        let mut table = self.read.load(Ordering::SeqCst, guard);
                        unsafe {
                            let read = table.as_ptr();
                            let read = read.as_mut().unwrap();
                            read
                        }.m.insert(key.clone(), *e);
                    }
                    unsafe { e.as_ref().unwrap() }.store_locked(entry_value, guard);
                }
                None => {
                    let dirty = self.dirty.load(Ordering::SeqCst, guard);
                    if dirty.is_null() {
                        // `dirty` was promoted away; recreate it and retry.
                        table = self.read.load(Ordering::SeqCst, guard);
                        let m = Shared::boxed(HashMap::new(), &self.collector);
                        self.dirty.store(m, Ordering::SeqCst);
                        drop(lock);
                        continue;
                    }
                    // TODO: check the dirty is null here
                    let d = unsafe { dirty.deref() };
                    if !d.is_empty() {
                        // Key already staged in `dirty`: overwrite in place.
                        if let Some(e) = d.get(&key) {
                            unsafe { e.as_ref() }.unwrap().store_locked(entry_value, guard);
                            drop(lock);
                            break;
                        }
                    }

                    if !read.amended {
                        // We're adding the first new key to the dirty map.
                        // Make sure it is allocated and mark the read-only map as incomplete.
                        self.dirty_locked(key, entry_value, guard);
                        // Republish `read` with `amended = true` so readers
                        // know to fall back to `dirty` on a miss.
                        let shard = self.read.load(Ordering::SeqCst, guard);
                        let mut map = HashMap::new();
                        for (key, value) in &unsafe { shard.deref() }.m {
                            map.insert(key.clone(), *value);
                        }
                        let shard_map = Shared::boxed(ReadOnly {
                            m: map,
                            amended: true,
                        }, &self.collector);
                        self.read.store(shard_map, Ordering::SeqCst);
                        drop(lock);
                        break;
                    }
                    // Re-check that `dirty` was not swapped while we worked.
                    let dirty2 = self.dirty.load(Ordering::SeqCst, guard);
                    if dirty != dirty2 {
                        continue;
                    }
                    // save entry: allocate a new Entry and stage it in `dirty`.
                    let mut entry = Entry {
                        p: Atomic::null(),
                        expunged: Atomic::null(),
                    };

                    entry.p.store(entry_value, Ordering::SeqCst);
                    // NOTE(review): the Entry is leaked into the map as a raw
                    // Box pointer; confirm it is freed on removal/drop.
                    unsafe {
                        let dirty2 = dirty2.as_ptr();
                        dirty2.as_mut().unwrap().insert(key.clone(), Box::into_raw(Box::new(entry)));
                    };
                }
            }

            drop(lock);
            break;
        }
    }


    /// Removes a key-value pair from the map, and returns the removed value (if any).
    ///
    /// The key may be any borrowed form of the map's key type, but
    /// [`Hash`] and [`Ord`] on the borrowed form *must* match those for
    /// the key type.
    ///
    /// [`Ord`]: std::cmp::Ord
    /// [`Hash`]: std::hash::Hash
    ///
    /// # Examples
    ///
    /// ```
    ///
    /// use syncmap::map::Map;
    /// let map = Map::new();
    /// map.insert(1, "a",&map.guard());
    /// assert_eq!(map.remove(&1,&map.guard()), Some(&"a"));
    /// assert_eq!(map.remove(&1,&map.guard()), None);
    /// ```
    pub fn remove<'g, Q>(&'g self, key: &Q, guard: &'g Guard<'_>) -> Option<&'g V>
        where
            K: Borrow<Q>,
            Q: ?Sized + Hash + Ord,
    {
        // NOTE: _technically_, this method shouldn't require the thread-safety bounds, but a) that
        // would require special-casing replace_node for when new_value.is_none(), and b) it's sort
        // of useless to call remove on a collection that you know you can never insert into.
        self.check_guard(guard);

        let mut read = self.read.load(Ordering::SeqCst, guard);
        loop {
            if read.is_null() {
                break None;
            }

            let r = unsafe { read.deref() };
            let mut remove_el: Option<*mut Entry<V>> = None;
            let mut e = r.m.get(&key);
            if e.is_none() && r.amended {
                let lock = self.lock.lock();

                // NOTE(review): unlike `get`, `read` is not reloaded after
                // taking the lock before this re-check — confirm a promotion
                // racing with us cannot be missed here.
                e = r.m.get(&key);
                if e.is_none() && r.amended {
                    let dirty = self.dirty.load(Ordering::SeqCst, guard);
                    if dirty.is_null() {
                        read = self.read.load(Ordering::SeqCst, guard);
                        drop(lock);
                        continue;
                    }
                    e = unsafe { dirty.deref() }.get(&key);

                    let dirty = unsafe { dirty.as_ptr() };

                    // Physically remove the staged entry from `dirty`.
                    remove_el = unsafe { dirty.as_mut().unwrap().remove(&key) };

                    self.miss_locked(guard);
                }
                drop(lock)
            } else {
                // Key found in `read`: logically delete via the entry itself.
                if let Some(e) = e {
                    return unsafe { e.as_mut().unwrap().remove(guard) };
                }
            }

            if remove_el.is_some() {
                let data = unsafe { remove_el.unwrap().as_mut().unwrap().remove(guard) };
                break data;
            }
            break None;
        }
    }

    // Rebuilds `dirty` from the current `read` table and stages `key` ->
    // `entry_value` in it. Entries whose `try_unexpunge_locked` reports
    // expunged are skipped (they are logically deleted). Must be called while
    // holding `self.lock`.
    fn dirty_locked<'g>(&'g self, key: K, entry_value: Shared<V>, guard: &Guard<'_>) {
        let dirty = self.dirty.load(Ordering::SeqCst, guard);
        if dirty.is_null() {
            return;
        }
        let read = self.read.load(Ordering::SeqCst, guard);
        let mut map = HashMap::with_capacity(unsafe { read.deref() }.m.len());
        for (key, value) in &unsafe { read.deref() }.m {
            if !unsafe { value.as_ref().unwrap() }.try_unexpunge_locked(guard) {
                map.insert(key.clone(), *value);
            }
        }
        let entry = Entry {
            p: Atomic::null(),
            expunged: Atomic::null(),
        };

        entry.p.store(entry_value, Ordering::SeqCst);

        map.insert(key, Box::into_raw(Box::new(entry)));
        // NOTE(review): the previous `dirty` allocation is overwritten without
        // being retired — confirm the collector reclaims it.
        self.dirty.store(Shared::boxed(map, &self.collector), Ordering::SeqCst)
    }
}
579
580impl<K, V, S> Map<K, V, S>
581    where
582        K: Clone + Ord,
583{
584    /// Clears the map, removing all key-value pairs.
585    ///
586    /// # Examples
587    ///
588    /// ```
589    ///
590    /// use syncmap::map::Map;
591    /// let map = Map::new();
592    /// let guard = map.guard();
593    /// map.insert(1, "a",&guard);
594    /// map.clear(&guard);
595    /// ```
596    pub fn clear<'g>(&'g self, guard: &'g Guard<'_>) {
597        let lock = self.lock.lock();
598
599        self.dirty.store(Shared::boxed(HashMap::new(), &self.collector), Ordering::SeqCst);
600        let read = self.read.load(Ordering::SeqCst, guard);
601        self.read.store(Shared::boxed(ReadOnly::new(), &self.collector), Ordering::SeqCst);
602        let sc = self.misses.load(Ordering::SeqCst);
603        self.misses.compare_exchange(sc, 0, Ordering::AcqRel, Ordering::Acquire).expect("change miess");
604
605        drop(lock);
606    }
607}
608
609
// An immutable snapshot of (part of) the map's contents, consulted by
// readers without taking `Map::lock`.
struct ReadOnly<K, V> {
    // Key -> raw entry pointer; entry allocations are shared with `Map::dirty`.
    m: HashMap<K, *mut Entry<V>>,
    // True when `Map::dirty` contains keys absent from `m`, so a miss here
    // must fall back to the dirty map under the lock.
    amended: bool,
}
614
615impl<K, V> ReadOnly<K, V> {
616    fn new() -> Self <> {
617        Self {
618            m: HashMap::new(),
619            amended: false,
620        }
621    }
622}
623
624
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;
    use rayon;
    use rayon::prelude::*;

    use crate::reclaim::Shared;
    use super::*;

    // Iteration count available to stress tests.
    const ITER: u64 = 32 * 1024;

    // Insert then remove a key: the first remove returns the value, the
    // second returns None.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn remove_and_insert() {
        let map = Arc::new(Map::<usize, usize>::new());
        let guard = map.guard();
        map.insert(1, 1, &guard);
        assert_eq!(map.remove(&1, &guard), Some(&1));
        assert_eq!(map.remove(&1, &guard), None)
    }

    // Two threads race to insert the same key range; afterwards every present
    // key must hold one of the two written values.
    // NOTE(review): missing keys are only counted (`missed`), not asserted
    // against — confirm that tolerating lost inserts is intended.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn concurrent_insert() {
        let map = Arc::new(Map::<usize, usize>::new());

        let map1 = map.clone();
        let t1 = std::thread::spawn(move || {
            for i in 0..5000 {
                map1.insert(i, 0, &map1.guard());
            }
        });
        let map2 = map.clone();
        let t2 = std::thread::spawn(move || {
            for i in 0..5000 {
                map2.insert(i, 1, &map2.guard());
            }
        });

        t1.join().unwrap();
        t2.join().unwrap();

        thread::sleep(Duration::from_micros(1000));
        let mut missed = 0;
        let guard = map.guard();
        for i in 0..5000 {
            let v = map.get(&i, &guard);
            if v.is_some() {
                // A present value must come from one of the two writers.
                assert!(v == Some(&0) || v == Some(&1));
            } else {
                missed += 1;
            }
        }

        println!("missed {}", missed)
    }
}