lightning/
map.rs

1// usize to usize lock-free, wait free table
2use crate::align_padding;
3use alloc::vec::Vec;
4use core::alloc::{GlobalAlloc, Layout};
5use core::hash::Hasher;
6use core::marker::PhantomData;
7use core::ops::Deref;
8use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
9use core::sync::atomic::{compiler_fence, fence, AtomicU64, AtomicUsize};
10use core::{intrinsics, mem, ptr};
11use crossbeam_epoch::*;
12use std::alloc::System;
13use std::collections::hash_map::DefaultHasher;
14use std::hash::Hash;
15use std::ops::DerefMut;
16use std::os::raw::c_void;
17use std::time::{SystemTime, UNIX_EPOCH};
18
19pub struct EntryTemplate(usize, usize);
20
21const EMPTY_KEY: usize = 0;
22const EMPTY_VALUE: usize = 0;
23const SENTINEL_VALUE: usize = 1;
24const TOMBSTONE_VALUE: usize = 2;
25const VAL_BIT_MASK: usize = !0 << 1 >> 1;
26const INV_VAL_BIT_MASK: usize = !VAL_BIT_MASK;
27const MUTEX_BIT_MASK: usize = !WORD_MUTEX_DATA_BIT_MASK & VAL_BIT_MASK;
28const ENTRY_SIZE: usize = mem::size_of::<EntryTemplate>();
29
30struct Value {
31    raw: usize,
32    parsed: ParsedValue,
33}
34
35enum ParsedValue {
36    Val(usize), // None for tombstone
37    Prime(usize),
38    Sentinel,
39    Empty,
40}
41
42#[derive(Debug)]
43enum ModResult<V> {
44    Replaced(usize, V, usize), // (origin fval, val, index)
45    Existed(usize, V),
46    Fail,
47    Sentinel,
48    NotFound,
49    Done(usize, Option<V>, usize), // _, value, index
50    TableFull,
51    Aborted,
52}
53
54enum ModOp<'a, V> {
55    Insert(usize, &'a V),
56    UpsertFastVal(usize),
57    AttemptInsert(usize, &'a V),
58    SwapFastVal(Box<dyn Fn(usize) -> Option<usize>>),
59    Sentinel,
60    Tombstone,
61}
62
63pub enum InsertOp {
64    Insert,
65    UpsertFast,
66    TryInsert,
67}
68
69enum ResizeResult {
70    NoNeed,
71    SwapFailed,
72    ChunkChanged,
73    Done,
74}
75
76enum SwapResult<'a, K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> {
77    Succeed(usize, usize, Shared<'a, ChunkPtr<K, V, A, ALLOC>>),
78    NotFound,
79    Failed,
80    Aborted,
81}
82
83pub struct Chunk<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> {
84    capacity: usize,
85    base: usize,
86    occu_limit: usize,
87    occupation: AtomicUsize,
88    empty_entries: AtomicUsize,
89    total_size: usize,
90    attachment: A,
91    shadow: PhantomData<(K, V, ALLOC)>,
92}
93
94pub struct ChunkPtr<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> {
95    ptr: *mut Chunk<K, V, A, ALLOC>,
96}
97
98pub struct Table<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default, H: Hasher + Default> {
99    new_chunk: Atomic<ChunkPtr<K, V, A, ALLOC>>,
100    chunk: Atomic<ChunkPtr<K, V, A, ALLOC>>,
101    count: AtomicUsize,
102    epoch: AtomicUsize,
103    timestamp: AtomicU64,
104    mark: PhantomData<H>,
105}
106
107impl<
108        K: Clone + Hash + Eq,
109        V: Clone,
110        A: Attachment<K, V>,
111        ALLOC: GlobalAlloc + Default,
112        H: Hasher + Default,
113    > Table<K, V, A, ALLOC, H>
114{
115    pub fn with_capacity(cap: usize) -> Self {
116        if !is_power_of_2(cap) {
117            panic!("capacity is not power of 2");
118        }
119        // Each entry key value pair is 2 words
120        // steal 1 bit in the MSB of value indicate Prime(1)
121        let chunk = Chunk::alloc_chunk(cap);
122        Self {
123            chunk: Atomic::new(ChunkPtr::new(chunk)),
124            new_chunk: Atomic::null(),
125            count: AtomicUsize::new(0),
126            epoch: AtomicUsize::new(0),
127            timestamp: AtomicU64::new(timestamp()),
128            mark: PhantomData,
129        }
130    }
131
132    pub fn new() -> Self {
133        Self::with_capacity(64)
134    }
135
136    pub fn get(&self, key: &K, fkey: usize, read_attachment: bool) -> Option<(usize, Option<V>)> {
137        enum FromChunkRes<V> {
138            Value(usize, Value, Option<V>, usize, usize), // Last one is idx
139            Prime,
140            None,
141            Sentinel,
142        }
143        let guard = crossbeam_epoch::pin();
144        let backoff = crossbeam_utils::Backoff::new();
145        let hash = hash::<H>(fkey);
146        loop {
147            let epoch = self.now_epoch();
148            let chunk_ptr = self.chunk.load(Acquire, &guard);
149            let new_chunk_ptr = self.new_chunk.load(Acquire, &guard);
150            let chunk = unsafe { chunk_ptr.deref() };
151            let new_chunk = Self::to_chunk_ref(epoch, &chunk_ptr, &new_chunk_ptr);
152            debug_assert!(!chunk_ptr.is_null());
153            let get_from =
154                |chunk: &Chunk<K, V, A, ALLOC>, migrating: Option<&ChunkPtr<K, V, A, ALLOC>>| {
155                    let (val, idx, addr) = self.get_from_chunk(&*chunk, hash, key, fkey, migrating);
156                    match val.parsed {
157                        ParsedValue::Empty | ParsedValue::Val(0) => FromChunkRes::None,
158                        ParsedValue::Val(v) => FromChunkRes::Value(
159                            v,
160                            val,
161                            if Self::can_attach() && read_attachment {
162                                Some(chunk.attachment.get(idx).1)
163                            } else {
164                                None
165                            },
166                            idx,
167                            addr,
168                        ),
169                        ParsedValue::Prime(_) => FromChunkRes::Prime,
170                        ParsedValue::Sentinel => FromChunkRes::Sentinel,
171                    }
172                };
173            return match get_from(&chunk, new_chunk) {
174                FromChunkRes::Value(fval, val, attach_val, idx, addr) => {
175                    if let Some(new_chunk) = new_chunk {
176                        self.migrate_entry(fkey, idx, val, chunk, new_chunk, addr, &mut 0);
177                    }
178                    Some((fval, attach_val))
179                }
180                FromChunkRes::Sentinel => {
181                    if let Some(new_chunk) = new_chunk {
182                        dfence();
183                        match get_from(&new_chunk, None) {
184                            FromChunkRes::Value(fval, _, val, _, _) => Some((fval, val)),
185                            FromChunkRes::Sentinel => {
186                                // Sentinel in new chunk, should retry
187                                backoff.spin();
188                                continue;
189                            }
190                            FromChunkRes::None => {
191                                trace!(
192                                    "Got non from new chunk for {} at epoch {}",
193                                    fkey - 5,
194                                    epoch
195                                );
196                                None
197                            }
198                            FromChunkRes::Prime => {
199                                backoff.spin();
200                                continue;
201                            }
202                        }
203                    } else {
204                        warn!(
205                            "Got sentinel on get but new chunk is null for {}, retry. Copying {}, epoch {}, now epoch {}",
206                            fkey,
207                            new_chunk.is_some(),
208                            epoch,
209                            self.epoch.load(Acquire)
210                        );
211                        backoff.spin();
212                        continue;
213                    }
214                }
215                FromChunkRes::None => {
216                    if let Some(chunk) = new_chunk {
217                        dfence();
218                        match get_from(chunk, None) {
219                            FromChunkRes::Value(fval, _, val, _, _) => Some((fval, val)),
220                            FromChunkRes::Sentinel => {
221                                // Sentinel in new chunk, should retry
222                                backoff.spin();
223                                continue;
224                            }
225                            FromChunkRes::None => {
226                                trace!(
227                                    "Got non from new chunk for {} at epoch {}",
228                                    fkey - 5,
229                                    epoch
230                                );
231                                None
232                            }
233                            FromChunkRes::Prime => {
234                                backoff.spin();
235                                continue;
236                            }
237                        }
238                    } else {
239                        None
240                    }
241                }
242                FromChunkRes::Prime => {
243                    backoff.spin();
244                    continue;
245                }
246            };
247        }
248    }
249
250    pub fn insert(
251        &self,
252        op: InsertOp,
253        key: &K,
254        value: Option<V>,
255        fkey: usize,
256        fvalue: usize,
257    ) -> Option<(usize, V)> {
258        let backoff = crossbeam_utils::Backoff::new();
259        let guard = crossbeam_epoch::pin();
260        let hash = hash::<H>(fkey);
261        loop {
262            let epoch = self.now_epoch();
263            // trace!("Inserting key: {}, value: {}", fkey, fvalue);
264            let chunk_ptr = self.chunk.load(Acquire, &guard);
265            let new_chunk_ptr = self.new_chunk.load(Acquire, &guard);
266            let new_chunk = Self::to_chunk_ref(epoch, &chunk_ptr, &new_chunk_ptr);
267            if new_chunk.is_none() {
268                match self.check_migration(chunk_ptr, &guard) {
269                    ResizeResult::Done | ResizeResult::SwapFailed | ResizeResult::ChunkChanged => {
270                        debug!("Retry insert due to resize");
271                        backoff.spin();
272                        continue;
273                    }
274                    ResizeResult::NoNeed => {}
275                }
276            } else if new_chunk_ptr.is_null() {
277                // Copying, must have new chunk
278                warn!("Chunk ptrs does not consist with epoch");
279                continue;
280            }
281            let chunk = unsafe { chunk_ptr.deref() };
282            let modify_chunk = if let Some(new_chunk) = new_chunk {
283                new_chunk
284            } else {
285                chunk
286            };
287            let masked_value = fvalue & VAL_BIT_MASK;
288            let mod_op = match op {
289                InsertOp::Insert => ModOp::Insert(masked_value, value.as_ref().unwrap()),
290                InsertOp::UpsertFast => ModOp::UpsertFastVal(masked_value),
291                InsertOp::TryInsert => ModOp::AttemptInsert(masked_value, value.as_ref().unwrap()),
292            };
293            let value_insertion =
294                self.modify_entry(&*modify_chunk, hash, key, fkey, mod_op, None, &guard);
295            let mut result = None;
296            match value_insertion {
297                ModResult::Done(_, _, _) => {
298                    modify_chunk.occupation.fetch_add(1, Relaxed);
299                    self.count.fetch_add(1, AcqRel);
300                }
301                ModResult::Replaced(fv, v, _) | ModResult::Existed(fv, v) => result = Some((fv, v)),
302                ModResult::Fail => {
303                    // If fail insertion then retry
304                    warn!(
305                        "Insertion failed, do migration and retry. Copying {}, cap {}, count {}, old {:?}, new {:?}",
306                        new_chunk.is_some(),
307                        modify_chunk.capacity,
308                        modify_chunk.occupation.load(Relaxed),
309                        chunk_ptr,
310                        new_chunk_ptr
311                    );
312                    backoff.spin();
313                    continue;
314                }
315                ModResult::TableFull => {
316                    trace!(
317                        "Insertion is too fast, copying {}, cap {}, count {}, old {:?}, new {:?}.",
318                        new_chunk.is_some(),
319                        modify_chunk.capacity,
320                        modify_chunk.occupation.load(Relaxed),
321                        chunk_ptr,
322                        new_chunk_ptr
323                    );
324                    self.do_migration(chunk_ptr, &guard);
325                    backoff.spin();
326                    continue;
327                }
328                ModResult::Sentinel => {
329                    trace!("Discovered sentinel on insertion table upon probing, retry");
330                    backoff.spin();
331                    continue;
332                }
333                ModResult::NotFound => unreachable!("Not Found on insertion is impossible"),
334                ModResult::Aborted => unreachable!("Should no abort"),
335            }
336            if new_chunk.is_some() {
337                dfence();
338                assert_ne!(
339                    chunk_ptr, new_chunk_ptr,
340                    "at epoch {}, inserting k:{}, v:{}",
341                    epoch, fkey, fvalue
342                );
343                assert_ne!(
344                    new_chunk_ptr,
345                    Shared::null(),
346                    "at epoch {}, inserting k:{}, v:{}",
347                    epoch,
348                    fkey,
349                    fvalue
350                );
351                self.modify_entry(chunk, hash, key, fkey, ModOp::Sentinel, new_chunk, &guard);
352            }
353            // trace!("Inserted key {}, with value {}", fkey, fvalue);
354            return result;
355        }
356    }
357
358    #[inline(always)]
359    fn is_copying(epoch: usize) -> bool {
360        epoch | 1 == epoch
361    }
362
363    #[inline(always)]
364    fn epoch_changed(&self, epoch: usize) -> bool {
365        self.now_epoch() != epoch
366    }
367
368    fn swap<'a, F: Fn(usize) -> Option<usize> + Copy + 'static>(
369        &self,
370        fkey: usize,
371        key: &K,
372        func: F,
373        guard: &'a Guard,
374    ) -> SwapResult<'a, K, V, A, ALLOC> {
375        let backoff = crossbeam_utils::Backoff::new();
376        let hash = hash::<H>(fkey);
377        loop {
378            let epoch = self.now_epoch();
379            let chunk_ptr = self.chunk.load(Acquire, &guard);
380            let new_chunk_ptr = self.new_chunk.load(Acquire, &guard);
381            let chunk = unsafe { chunk_ptr.deref() };
382            let new_chunk = Self::to_chunk_ref(epoch, &chunk_ptr, &new_chunk_ptr);
383            if let Some(new_chunk) = new_chunk {
384                // && self.now_epoch() == epoch
385                // Copying is on the way, should try to get old value from old chunk then put new value in new chunk
386                let (old_parsed_val, old_index, _) =
387                    self.get_from_chunk(chunk, hash, key, fkey, Some(new_chunk));
388                let old_fval = old_parsed_val.raw;
389                if old_fval != SENTINEL_VALUE
390                    && old_fval != EMPTY_VALUE
391                    && old_fval != TOMBSTONE_VALUE
392                {
393                    if let Some(new_val) = func(old_fval) {
394                        let val = chunk.attachment.get(old_index).1;
395                        match self.modify_entry(
396                            new_chunk,
397                            hash,
398                            key,
399                            fkey,
400                            ModOp::AttemptInsert(new_val, &val),
401                            None,
402                            guard,
403                        ) {
404                            ModResult::Done(_, _, new_index)
405                            | ModResult::Replaced(_, _, new_index) => {
406                                let old_addr = chunk.base + old_index * ENTRY_SIZE;
407                                if self.cas_sentinel(old_addr, old_fval) {
408                                    // Put a sentinel in the old chunk
409                                    return SwapResult::Succeed(
410                                        old_fval & VAL_BIT_MASK,
411                                        new_index,
412                                        new_chunk_ptr,
413                                    );
414                                } else {
415                                    // If fail, we may have some problem here
416                                    // The best strategy can be CAS a tombstone to the new index and try everything again
417                                    // Note that we use attempt insert, it will be safe to just `remove` it
418                                    let new_addr = new_chunk.base + new_index * ENTRY_SIZE;
419                                    self.cas_tombstone(new_addr, new_val);
420                                    continue;
421                                }
422                            }
423                            _ => {}
424                        }
425                    }
426                }
427            }
428            let modify_chunk_ptr = if new_chunk.is_some() {
429                new_chunk_ptr
430            } else {
431                chunk_ptr
432            };
433            let modify_chunk = if let Some(new_chunk) = new_chunk {
434                new_chunk
435            } else {
436                chunk
437            };
438            trace!("Swaping for key {}, copying {}", fkey, new_chunk.is_some());
439            let mod_res = self.modify_entry(
440                modify_chunk,
441                hash,
442                key,
443                fkey,
444                ModOp::SwapFastVal(Box::new(func)),
445                None,
446                guard,
447            );
448            if new_chunk.is_some() {
449                assert_ne!(chunk_ptr, new_chunk_ptr);
450                assert_ne!(new_chunk_ptr, Shared::null());
451                self.modify_entry(chunk, hash, key, fkey, ModOp::Sentinel, new_chunk, &guard);
452            }
453            return match mod_res {
454                ModResult::Replaced(v, _, idx) => {
455                    SwapResult::Succeed(v & VAL_BIT_MASK, idx, modify_chunk_ptr)
456                }
457                ModResult::Aborted => SwapResult::Aborted,
458                ModResult::Fail => SwapResult::Failed,
459                ModResult::NotFound => SwapResult::NotFound,
460                ModResult::Sentinel => {
461                    backoff.spin();
462                    continue;
463                }
464                ModResult::Existed(_, _) => unreachable!("Swap have existed result"),
465                ModResult::Done(_, _, _) => unreachable!("Swap Done"),
466                ModResult::TableFull => unreachable!("Swap table full"),
467            };
468        }
469    }
470
471    #[inline(always)]
472    fn to_chunk_ref<'a>(
473        epoch: usize,
474        old_chunk_ptr: &'a Shared<ChunkPtr<K, V, A, ALLOC>>,
475        new_chunk_ptr: &'a Shared<ChunkPtr<K, V, A, ALLOC>>,
476    ) -> Option<&'a ChunkPtr<K, V, A, ALLOC>> {
477        if (Self::is_copying(epoch)) && (!old_chunk_ptr.eq(new_chunk_ptr)) {
478            unsafe { new_chunk_ptr.as_ref() }
479        } else {
480            None
481        }
482    }
483
484    #[inline(always)]
485    fn now_epoch(&self) -> usize {
486        self.epoch.load(Acquire)
487    }
488
489    pub fn remove(&self, key: &K, fkey: usize) -> Option<(usize, V)> {
490        let guard = crossbeam_epoch::pin();
491        let backoff = crossbeam_utils::Backoff::new();
492        let hash = hash::<H>(fkey);
493        loop {
494            let epoch = self.now_epoch();
495            let new_chunk_ptr = self.new_chunk.load(Acquire, &guard);
496            let old_chunk_ptr = self.chunk.load(Acquire, &guard);
497            let copying = Self::is_copying(epoch);
498            if copying && (new_chunk_ptr.is_null() || new_chunk_ptr == old_chunk_ptr) {
499                continue;
500            }
501            let new_chunk = unsafe { new_chunk_ptr.deref() };
502            let old_chunk = unsafe { old_chunk_ptr.deref() };
503            let mut retr = None;
504            if copying {
505                // Put sentinel to the old before putting tombstone to the new
506                // If not migration might put the old value back
507                trace!("Put sentinel in old chunk for removal");
508                assert_ne!(new_chunk_ptr, Shared::null());
509                let remove_from_old = self.modify_entry(
510                    &*old_chunk,
511                    hash,
512                    key,
513                    fkey,
514                    ModOp::Sentinel,
515                    Some(&new_chunk),
516                    &guard,
517                );
518                match remove_from_old {
519                    ModResult::Done(fvalue, Some(value), _)
520                    | ModResult::Replaced(fvalue, value, _) => {
521                        trace!("Sentinal placed");
522                        retr = Some((fvalue, value));
523                    }
524                    ModResult::Done(_, None, _) => {}
525                    _ => {
526                        trace!("Sentinal not placed");
527                    }
528                }
529            }
530            let modify_chunk = if copying { &new_chunk } else { &old_chunk };
531            let res = self.modify_entry(
532                &*modify_chunk,
533                hash,
534                key,
535                fkey,
536                ModOp::Tombstone,
537                None,
538                &guard,
539            );
540            match res {
541                ModResult::Replaced(fvalue, value, _) => {
542                    retr = Some((fvalue, value));
543                    self.count.fetch_sub(1, AcqRel);
544                }
545                ModResult::Done(_, _, _) => unreachable!("Remove shall not have done"),
546                ModResult::NotFound => {}
547                ModResult::Sentinel => {
548                    backoff.spin();
549                    continue;
550                }
551                ModResult::TableFull => unreachable!("TableFull on remove is not possible"),
552                _ => {}
553            };
554            if self.epoch_changed(epoch) {
555                if retr.is_none() {
556                    return self.remove(key, fkey);
557                }
558            }
559            return retr;
560        }
561    }
562
563    pub fn len(&self) -> usize {
564        self.count.load(Acquire)
565    }
566
567    fn get_from_chunk(
568        &self,
569        chunk: &Chunk<K, V, A, ALLOC>,
570        hash: usize,
571        key: &K,
572        fkey: usize,
573        migrating: Option<&ChunkPtr<K, V, A, ALLOC>>,
574    ) -> (Value, usize, usize) {
575        assert_ne!(chunk as *const Chunk<K, V, A, ALLOC> as usize, 0);
576        let mut idx = hash;
577        let cap = chunk.capacity;
578        let base = chunk.base;
579        let cap_mask = chunk.cap_mask();
580        let mut counter = 0;
581        while counter < cap {
582            idx &= cap_mask;
583            let addr = base + idx * ENTRY_SIZE;
584            let k = self.get_fast_key(addr);
585            if k == fkey && chunk.attachment.probe(idx, key) {
586                let val_res = self.get_fast_value(addr);
587                match val_res.parsed {
588                    ParsedValue::Empty => {}
589                    _ => return (val_res, idx, addr),
590                }
591            } else if k == EMPTY_KEY {
592                return (Value::new::<K, V, A, ALLOC, H>(0), 0, addr);
593            } else if let Some(new_chunk_ins) = migrating {
594                debug_assert!(new_chunk_ins.base != chunk.base);
595                let val_res = self.get_fast_value(addr);
596                if let &ParsedValue::Val(_) = &val_res.parsed {
597                    self.migrate_entry(k, idx, val_res, chunk, new_chunk_ins, addr, &mut 0);
598                }
599            }
600            idx += 1; // reprobe
601            counter += 1;
602        }
603
604        // not found
605        return (Value::new::<K, V, A, ALLOC, H>(0), 0, 0);
606    }
607
608    #[inline(always)]
609    fn modify_entry<'a>(
610        &self,
611        chunk: &'a Chunk<K, V, A, ALLOC>,
612        hash: usize,
613        key: &K,
614        fkey: usize,
615        op: ModOp<V>,
616        migration_chunk: Option<&ChunkPtr<K, V, A, ALLOC>>,
617        _guard: &'a Guard,
618    ) -> ModResult<V> {
619        let cap = chunk.capacity;
620        let base = chunk.base;
621        let mut idx = hash;
622        let mut count = 0;
623        let cap_mask = chunk.cap_mask();
624        let backoff = crossbeam_utils::Backoff::new();
625        while count <= cap {
626            idx &= cap_mask;
627            let addr = base + idx * ENTRY_SIZE;
628            let k = self.get_fast_key(addr);
629            let v = self.get_fast_value(addr);
630            {
631                // Early exit upon sentinel discovery
632                match v.parsed {
633                    ParsedValue::Sentinel => match &op {
634                        &ModOp::Sentinel => {
635                            // Sentinel op is allowed on old chunk
636                        }
637                        _ => {
638                            // Confirmed, this is possible
639                            return ModResult::Sentinel;
640                        }
641                    },
642                    _ => {}
643                }
644            }
645            if k == fkey && chunk.attachment.probe(idx, &key) {
646                // Probing non-empty entry
647                let val = v;
648                match &val.parsed {
649                    ParsedValue::Val(v) => {
650                        match &op {
651                            &ModOp::Sentinel => {
652                                if self.cas_sentinel(addr, val.raw) {
653                                    let (_, value) = chunk.attachment.get(idx);
654                                    chunk.attachment.erase(idx);
655                                    if *v == 0 {
656                                        return ModResult::Done(addr, None, idx);
657                                    } else {
658                                        return ModResult::Done(*v, Some(value), idx);
659                                    }
660                                } else {
661                                    return ModResult::Fail;
662                                }
663                            }
664                            &ModOp::Tombstone => {
665                                if *v == 0 {
666                                    // Already tombstone
667                                    return ModResult::NotFound;
668                                }
669                                if !self.cas_tombstone(addr, val.raw).1 {
670                                    // this insertion have conflict with others
671                                    // other thread changed the value (empty)
672                                    // should fail
673                                    return ModResult::Fail;
674                                } else {
675                                    // we have put tombstone on the value, get the attachment and erase it
676                                    let (_, value) = chunk.attachment.get(idx);
677                                    chunk.attachment.erase(idx);
678                                    chunk.empty_entries.fetch_add(1, Relaxed);
679                                    return ModResult::Replaced(*v, value, idx);
680                                }
681                            }
682                            &ModOp::UpsertFastVal(ref fv) => {
683                                if self.cas_value(addr, val.raw, *fv).1 {
684                                    let (_, value) = chunk.attachment.get(idx);
685                                    if *v == 0 {
686                                        return ModResult::Done(addr, None, idx);
687                                    } else {
688                                        return ModResult::Replaced(*v, value, idx);
689                                    }
690                                } else {
691                                    trace!("Cannot upsert fast value in place for {}", fkey);
692                                    return ModResult::Fail;
693                                }
694                            }
695                            &ModOp::AttemptInsert(fval, oval) => {
696                                if *v == 0 {
697                                    let primed_fval = if Self::can_attach() {
698                                        fval | INV_VAL_BIT_MASK
699                                    } else {
700                                        fval
701                                    };
702                                    let (act_val, replaced) =
703                                        self.cas_value(addr, val.raw, primed_fval);
704                                    if replaced {
705                                        let (_, prev_val) = chunk.attachment.get(idx);
706                                        if Self::can_attach() {
707                                            chunk.attachment.set(idx, key.clone(), (*oval).clone());
708                                            let stripped_prime =
709                                                self.cas_value(addr, primed_fval, fval).1;
710                                            debug_assert!(stripped_prime);
711                                        }
712                                        return ModResult::Replaced(val.raw, prev_val, idx);
713                                    } else {
714                                        let (_, value) = chunk.attachment.get(idx);
715                                        return ModResult::Existed(act_val, value);
716                                    }
717                                } else {
718                                    trace!(
719                                        "Attempting insert existed entry {}, {}, have key {:?}, skip",
720                                        k,
721                                        fval,
722                                        v
723                                    );
724                                    let (_, value) = chunk.attachment.get(idx);
725                                    return ModResult::Existed(*v, value);
726                                }
727                            }
728                            &ModOp::SwapFastVal(ref swap) => {
729                                trace!(
730                                    "Swaping found key {} have original value {:#064b}",
731                                    fkey,
732                                    val.raw
733                                );
734                                match &val.parsed {
735                                    ParsedValue::Val(pval) => {
736                                        let pval = *pval;
737                                        if pval == 0 {
738                                            return ModResult::NotFound;
739                                        }
740                                        let aval = chunk.attachment.get(idx).1;
741                                        if let Some(v) = swap(pval) {
742                                            if self.cas_value(addr, pval, v).1 {
743                                                // swap success
744                                                return ModResult::Replaced(val.raw, aval, idx);
745                                            } else {
746                                                return ModResult::Fail;
747                                            }
748                                        } else {
749                                            return ModResult::Aborted;
750                                        }
751                                    }
752                                    _ => {
753                                        return ModResult::Fail;
754                                    }
755                                }
756                            }
757                            &ModOp::Insert(fval, ref v) => {
758                                // Insert with attachment should prime value first when
759                                // duplicate key discovered
760                                debug!("Inserting in place for {}", fkey);
761                                let primed_fval = if Self::can_attach() {
762                                    fval | INV_VAL_BIT_MASK
763                                } else {
764                                    fval
765                                };
766                                if self.cas_value(addr, val.raw, primed_fval).1 {
767                                    let (_, prev_val) = chunk.attachment.get(idx);
768                                    if Self::can_attach() {
769                                        chunk.attachment.set(idx, key.clone(), (*v).clone());
770                                        let stripped_prime =
771                                            self.cas_value(addr, primed_fval, fval).1;
772                                        debug_assert!(stripped_prime);
773                                    }
774                                    return ModResult::Replaced(val.raw, prev_val, idx);
775                                } else {
776                                    trace!("Cannot insert in place for {}", fkey);
777                                    return ModResult::Fail;
778                                }
779                            }
780                        }
781                    }
782                    ParsedValue::Empty => {
783                        // found the key with empty value, shall do nothing and continue probing
784                        // because other thread is trying to write value into it
785                    }
786                    ParsedValue::Sentinel => return ModResult::Sentinel,
787                    ParsedValue::Prime(v) => {
788                        trace!(
789                            "Discovered prime for key {} with value {:#064b}, retry",
790                            fkey,
791                            v
792                        );
793                        backoff.spin();
794                        continue;
795                    }
796                }
797            } else if k == EMPTY_KEY {
798                match op {
799                    ModOp::Insert(fval, val) | ModOp::AttemptInsert(fval, val) => {
800                        trace!(
801                            "Inserting entry key: {}, value: {}, raw: {:b}, addr: {}",
802                            fkey,
803                            fval & VAL_BIT_MASK,
804                            fval,
805                            addr
806                        );
807                        if self.cas_value(addr, EMPTY_VALUE, fval).1 {
808                            // CAS value succeed, shall store key
809                            chunk.attachment.set(idx, key.clone(), (*val).clone());
810                            unsafe { intrinsics::atomic_store_rel(addr as *mut usize, fkey) }
811                            return ModResult::Done(addr, None, idx);
812                        } else {
813                            backoff.spin();
814                            continue;
815                        }
816                    }
817                    ModOp::UpsertFastVal(fval) => {
818                        trace!(
819                            "Upserting entry key: {}, value: {}, raw: {:b}, addr: {}",
820                            fkey,
821                            fval & VAL_BIT_MASK,
822                            fval,
823                            addr
824                        );
825                        if self.cas_value(addr, EMPTY_VALUE, fval).1 {
826                            unsafe { intrinsics::atomic_store_rel(addr as *mut usize, fkey) }
827                            return ModResult::Done(addr, None, idx);
828                        } else {
829                            backoff.spin();
830                            continue;
831                        }
832                    }
833                    ModOp::Sentinel => {
834                        if self.cas_sentinel(addr, 0) {
835                            // CAS value succeed, shall store key
836                            unsafe { intrinsics::atomic_store_rel(addr as *mut usize, fkey) }
837                            return ModResult::Done(addr, None, idx);
838                        } else {
839                            backoff.spin();
840                            continue;
841                        }
842                    }
843                    ModOp::Tombstone => return ModResult::Fail,
844                    ModOp::SwapFastVal(_) => return ModResult::NotFound,
845                };
846            } else if let (Some(migration_chunk), &ParsedValue::Val(_)) =
847                (migration_chunk, &v.parsed)
848            {
849                self.migrate_entry(k, idx, v, chunk, migration_chunk, addr, &mut 0);
850            }
851            idx += 1; // reprobe
852            count += 1;
853        }
854        match op {
855            ModOp::Insert(_fv, _v) | ModOp::AttemptInsert(_fv, _v) => ModResult::TableFull,
856            ModOp::UpsertFastVal(_fv) => ModResult::TableFull,
857            _ => ModResult::NotFound,
858        }
859    }
860
861    fn all_from_chunk(&self, chunk: &Chunk<K, V, A, ALLOC>) -> Vec<(usize, usize, K, V)> {
862        let mut idx = 0;
863        let cap = chunk.capacity;
864        let base = chunk.base;
865        let mut counter = 0;
866        let mut res = Vec::with_capacity(chunk.occupation.load(Relaxed));
867        let cap_mask = chunk.cap_mask();
868        while counter < cap {
869            idx &= cap_mask;
870            let addr = base + idx * ENTRY_SIZE;
871            let k = self.get_fast_key(addr);
872            if k != EMPTY_KEY {
873                let val_res = self.get_fast_value(addr);
874                match val_res.parsed {
875                    ParsedValue::Val(0) => {}
876                    ParsedValue::Val(v) => {
877                        let (key, value) = chunk.attachment.get(idx);
878                        res.push((k, v, key, value))
879                    }
880                    ParsedValue::Prime(_) => {
881                        continue;
882                    }
883                    _ => {}
884                }
885            }
886            idx += 1; // reprobe
887            counter += 1;
888        }
889        return res;
890    }
891
892    fn entries(&self) -> Vec<(usize, usize, K, V)> {
893        let guard = crossbeam_epoch::pin();
894        let old_chunk_ref = self.chunk.load(Acquire, &guard);
895        let new_chunk_ref = self.new_chunk.load(Acquire, &guard);
896        let old_chunk = unsafe { old_chunk_ref.deref() };
897        let new_chunk = unsafe { new_chunk_ref.deref() };
898        let mut res = self.all_from_chunk(&*old_chunk);
899        if !new_chunk_ref.is_null() && old_chunk_ref != new_chunk_ref {
900            res.append(&mut self.all_from_chunk(&*new_chunk));
901        }
902        return res;
903    }
904
905    #[inline(always)]
906    fn get_fast_key(&self, entry_addr: usize) -> usize {
907        debug_assert!(entry_addr > 0);
908        unsafe { intrinsics::atomic_load_acq(entry_addr as *mut usize) }
909    }
910
911    #[inline(always)]
912    fn get_fast_value(&self, entry_addr: usize) -> Value {
913        debug_assert!(entry_addr > 0);
914        let addr = entry_addr + mem::size_of::<usize>();
915        let val = unsafe { intrinsics::atomic_load_acq(addr as *mut usize) };
916        Value::new::<K, V, A, ALLOC, H>(val)
917    }
918
919    #[inline(always)]
920    fn cas_tombstone(&self, entry_addr: usize, original: usize) -> (usize, bool) {
921        debug_assert!(entry_addr > 0);
922        self.cas_value(entry_addr, original, TOMBSTONE_VALUE)
923    }
924    #[inline(always)]
925    fn cas_value(&self, entry_addr: usize, original: usize, value: usize) -> (usize, bool) {
926        debug_assert!(entry_addr > 0);
927        debug_assert_ne!(value & VAL_BIT_MASK, SENTINEL_VALUE);
928        let addr = entry_addr + mem::size_of::<usize>();
929        unsafe { intrinsics::atomic_cxchg_acqrel(addr as *mut usize, original, value) }
930    }
931    #[inline(always)]
932    fn cas_sentinel(&self, entry_addr: usize, original: usize) -> bool {
933        assert!(entry_addr > 0);
934        if cfg!(debug_assert) {
935            let guard = crossbeam_epoch::pin();
936            assert!(Self::is_copying(self.epoch.load(Acquire)));
937            assert!(!self.new_chunk.load(Acquire, &guard).is_null());
938            let chunk = self.chunk.load(Acquire, &guard);
939            let chunk_ref = unsafe { chunk.deref() };
940            assert!(entry_addr >= chunk_ref.base);
941            assert!(entry_addr < chunk_ref.base + chunk_ref.total_size);
942        }
943        let addr = entry_addr + mem::size_of::<usize>();
944        let (val, done) = unsafe {
945            intrinsics::atomic_cxchg_acqrel(addr as *mut usize, original, SENTINEL_VALUE)
946        };
947        done || val == SENTINEL_VALUE
948    }
949
950    /// Failed return old shared
951    fn check_migration<'a>(
952        &self,
953        old_chunk_ptr: Shared<'a, ChunkPtr<K, V, A, ALLOC>>,
954        guard: &crossbeam_epoch::Guard,
955    ) -> ResizeResult {
956        let old_chunk_ins = unsafe { old_chunk_ptr.deref() };
957        let occupation = old_chunk_ins.occupation.load(Relaxed);
958        let occu_limit = old_chunk_ins.occu_limit;
959        if occupation <= occu_limit {
960            return ResizeResult::NoNeed;
961        }
962        self.do_migration(old_chunk_ptr, guard)
963    }
964
965    fn do_migration<'a>(
966        &self,
967        old_chunk_ptr: Shared<'a, ChunkPtr<K, V, A, ALLOC>>,
968        guard: &crossbeam_epoch::Guard,
969    ) -> ResizeResult {
970        let epoch = self.now_epoch();
971        let old_chunk_ins = unsafe { old_chunk_ptr.deref() };
972        let empty_entries = old_chunk_ins.empty_entries.load(Relaxed);
973        let old_cap = old_chunk_ins.capacity;
974        let new_cap = if empty_entries > (old_cap >> 1) {
975            // Clear tombstones
976            old_cap
977        } else {
978            let mut cap = old_cap << 1;
979            if cap < 2048 {
980                cap <<= 1;
981            }
982            if epoch < 5 {
983                cap <<= 1;
984            }
985            if timestamp() - self.timestamp.load(Acquire) < 1000 {
986                cap <<= 1;
987            }
988            cap
989        };
990        debug!(
991            "New size for {:?} is {}, was {}",
992            old_chunk_ptr, new_cap, old_cap
993        );
994        // Swap in old chunk as placeholder for the lock
995        if let Err(_) = self
996            .new_chunk
997            .compare_and_set(Shared::null(), old_chunk_ptr, AcqRel, guard)
998        {
999            // other thread have allocated new chunk and wins the competition, exit
1000            trace!("Cannot obtain lock for resize, will retry");
1001            return ResizeResult::SwapFailed;
1002        }
1003        dfence();
1004        if self.chunk.load(Acquire, guard) != old_chunk_ptr {
1005            warn!(
1006                "Give up on resize due to old chunk changed after lock obtained, epoch {} to {}",
1007                epoch,
1008                self.now_epoch()
1009            );
1010            self.new_chunk.store(Shared::null(), Release);
1011            dfence();
1012            debug_assert_eq!(self.now_epoch() % 2, 0);
1013            return ResizeResult::ChunkChanged;
1014        }
1015        debug!("Resizing {:?}", old_chunk_ptr);
1016        let new_chunk_ptr =
1017            Owned::new(ChunkPtr::new(Chunk::alloc_chunk(new_cap))).into_shared(guard);
1018        let new_chunk_ins = unsafe { new_chunk_ptr.deref() };
1019        assert_ne!(new_chunk_ptr, old_chunk_ptr);
1020        self.new_chunk.store(new_chunk_ptr, Release); // Stump becasue we have the lock already
1021        dfence();
1022        let prev_epoch = self.epoch.fetch_add(1, AcqRel); // Increase epoch by one
1023        debug_assert_eq!(prev_epoch % 2, 0);
1024        dfence();
1025        // Migrate entries
1026        self.migrate_entries(old_chunk_ins, new_chunk_ins, guard);
1027        // Assertion check
1028        debug_assert_ne!(old_chunk_ins.ptr as usize, new_chunk_ins.base);
1029        debug_assert_ne!(old_chunk_ins.ptr, unsafe { new_chunk_ptr.deref().ptr });
1030        debug_assert!(!new_chunk_ptr.is_null());
1031        dfence();
1032        let prev_epoch = self.epoch.fetch_add(1, AcqRel); // Increase epoch by one
1033        debug_assert_eq!(prev_epoch % 2, 1);
1034        dfence();
1035        self.chunk.store(new_chunk_ptr, Release);
1036        self.timestamp.store(timestamp(), Release);
1037        dfence();
1038        unsafe {
1039            guard.defer_destroy(old_chunk_ptr);
1040            guard.flush();
1041        }
1042        self.new_chunk.store(Shared::null(), Release);
1043        debug!(
1044            "Migration for {:?} completed, new chunk is {:?}, size from {} to {}",
1045            old_chunk_ptr, new_chunk_ptr, old_cap, new_cap
1046        );
1047        ResizeResult::Done
1048    }
1049
1050    fn migrate_entries(
1051        &self,
1052        old_chunk_ins: &Chunk<K, V, A, ALLOC>,
1053        new_chunk_ins: &Chunk<K, V, A, ALLOC>,
1054        _guard: &crossbeam_epoch::Guard,
1055    ) -> usize {
1056        let mut old_address = old_chunk_ins.base as usize;
1057        let boundary = old_address + chunk_size_of(old_chunk_ins.capacity);
1058        let mut effective_copy = 0;
1059        let mut idx = 0;
1060        let backoff = crossbeam_utils::Backoff::new();
1061        while old_address < boundary {
1062            // iterate the old chunk to extract entries that is NOT empty
1063            let fvalue = self.get_fast_value(old_address);
1064            let fkey = self.get_fast_key(old_address);
1065            // Reasoning value states
1066            match &fvalue.parsed {
1067                ParsedValue::Empty | ParsedValue::Val(0) => {
1068                    if !self.cas_sentinel(old_address, fvalue.raw) {
1069                        warn!("Filling empty with sentinel for old table should succeed but not, retry");
1070                        backoff.spin();
1071                        continue;
1072                    }
1073                }
1074                ParsedValue::Val(_) => {
1075                    if !self.migrate_entry(
1076                        fkey,
1077                        idx,
1078                        fvalue,
1079                        old_chunk_ins,
1080                        new_chunk_ins,
1081                        old_address,
1082                        &mut effective_copy,
1083                    ) {
1084                        continue;
1085                    }
1086                }
1087                ParsedValue::Prime(_) => {
1088                    unreachable!("Shall not have prime in old table");
1089                }
1090                ParsedValue::Sentinel => {
1091                    // Sentinel, skip
1092                    // Sentinel in old chunk implies its new value have already in the new chunk
1093                    // It can also be other thread have moved this key-value pair to the new chunk
1094                    trace!("Skip copy sentinel");
1095                }
1096            }
1097            old_address += ENTRY_SIZE;
1098            idx += 1;
1099            dfence();
1100        }
1101        // resize finished, make changes on the numbers
1102        debug!("Migrated {} entries to new chunk", effective_copy);
1103        new_chunk_ins.occupation.fetch_add(effective_copy, Relaxed);
1104        return effective_copy;
1105    }
1106
1107    #[inline(always)]
1108    fn migrate_entry(
1109        &self,
1110        fkey: usize,
1111        old_idx: usize,
1112        fvalue: Value,
1113        old_chunk_ins: &Chunk<K, V, A, ALLOC>,
1114        new_chunk_ins: &Chunk<K, V, A, ALLOC>,
1115        old_address: usize,
1116        effective_copy: &mut usize,
1117    ) -> bool {
1118        debug_assert_ne!(old_chunk_ins.base, new_chunk_ins.base);
1119        if fkey == EMPTY_KEY {
1120            // Value have no key, insertion in progress
1121            return false;
1122        }
1123        // Insert entry into new chunk, in case of failure, skip this entry
1124        // Value should be primed
1125        assert_ne!(fvalue.raw & VAL_BIT_MASK, SENTINEL_VALUE);
1126        let (key, value) = old_chunk_ins.attachment.get(old_idx);
1127        let inserted_addr = {
1128            // Make insertion for migration inlined, hopefully the ordering will be right
1129            let cap = new_chunk_ins.capacity;
1130            let base = new_chunk_ins.base;
1131            let mut idx = hash::<H>(fkey);
1132            let cap_mask = new_chunk_ins.cap_mask();
1133            let mut count = 0;
1134            let mut res = None;
1135            while count < cap {
1136                idx &= cap_mask;
1137                let addr = base + idx * ENTRY_SIZE;
1138                let k = self.get_fast_key(addr);
1139                if k == fkey && new_chunk_ins.attachment.probe(idx, &key) {
1140                    // New value existed, skip with None result
1141                    break;
1142                } else if k == EMPTY_KEY {
1143                    // Try insert to this slot
1144                    let (val, done) = self.cas_value(addr, EMPTY_VALUE, fvalue.raw);
1145                    debug_assert_ne!(val & VAL_BIT_MASK, SENTINEL_VALUE);
1146                    if done {
1147                        new_chunk_ins.attachment.set(idx, key, value);
1148                        unsafe { intrinsics::atomic_store_rel(addr as *mut usize, fkey) }
1149                        res = Some(addr);
1150                        break;
1151                    }
1152                }
1153                idx += 1; // reprobe
1154                count += 1;
1155            }
1156            res
1157        };
1158        // CAS to ensure sentinel into old chunk (spec)
1159        // Use CAS for old threads may working on this one
1160        dfence(); // fence to ensure sentinel appears righr after pair copied to new chunk
1161        trace!("Copied key {} to new chunk", fkey);
1162        if self.cas_sentinel(old_address, fvalue.raw) {
1163            dfence();
1164            if let Some(_new_entry_addr) = inserted_addr {
1165                old_chunk_ins.attachment.erase(old_idx);
1166                *effective_copy += 1;
1167                return true;
1168            }
1169        }
1170        false
1171    }
1172
1173    pub fn map_is_copying(&self) -> bool {
1174        Self::is_copying(self.now_epoch())
1175    }
1176
1177    #[inline(always)]
1178    fn can_attach() -> bool {
1179        can_attach::<K, V, A>()
1180    }
1181}
1182
1183impl Value {
1184    pub fn new<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default, H: Hasher + Default>(
1185        val: usize,
1186    ) -> Self {
1187        let res = {
1188            if val == EMPTY_VALUE {
1189                ParsedValue::Empty
1190            } else if val == TOMBSTONE_VALUE {
1191                ParsedValue::Val(0)
1192            } else {
1193                let actual_val = val & VAL_BIT_MASK;
1194                let flag = val & INV_VAL_BIT_MASK;
1195                if flag != 0 {
1196                    ParsedValue::Prime(actual_val)
1197                } else if actual_val == SENTINEL_VALUE {
1198                    ParsedValue::Sentinel
1199                } else if actual_val == TOMBSTONE_VALUE {
1200                    unreachable!("");
1201                } else {
1202                    ParsedValue::Val(actual_val)
1203                }
1204            }
1205        };
1206        Value {
1207            raw: val,
1208            parsed: res,
1209        }
1210    }
1211}
1212
1213impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> Chunk<K, V, A, ALLOC> {
1214    fn alloc_chunk(capacity: usize) -> *mut Self {
1215        let capacity = capacity;
1216        let self_size = mem::size_of::<Self>();
1217        let self_align = align_padding(self_size, 64);
1218        let self_size_aligned = self_size + self_align;
1219        let chunk_size = chunk_size_of(capacity);
1220        let attachment_heap = A::heap_size_of(capacity);
1221        let total_size = self_size_aligned + chunk_size + attachment_heap;
1222        let ptr = alloc_mem::<ALLOC>(total_size) as *mut Self;
1223        let addr = ptr as usize;
1224        let data_base = addr + self_size_aligned;
1225        let attachment_base = data_base + chunk_size;
1226        unsafe {
1227            ptr::write(
1228                ptr,
1229                Self {
1230                    base: data_base,
1231                    capacity,
1232                    occupation: AtomicUsize::new(0),
1233                    empty_entries: AtomicUsize::new(0),
1234                    occu_limit: occupation_limit(capacity),
1235                    total_size,
1236                    attachment: A::new(capacity, attachment_base, attachment_heap),
1237                    shadow: PhantomData,
1238                },
1239            )
1240        };
1241        ptr
1242    }
1243
1244    unsafe fn gc(ptr: *mut Chunk<K, V, A, ALLOC>) {
1245        debug_assert_ne!(ptr as usize, 0);
1246        let chunk = &*ptr;
1247        chunk.attachment.dealloc();
1248        dealloc_mem::<ALLOC>(ptr as usize, chunk.total_size);
1249    }
1250
1251    #[inline]
1252    fn cap_mask(&self) -> usize {
1253        self.capacity - 1
1254    }
1255}
1256
1257impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Clone
1258    for Table<K, V, A, ALLOC, H>
1259{
1260    fn clone(&self) -> Self {
1261        let new_table = Table {
1262            chunk: Default::default(),
1263            new_chunk: Default::default(),
1264            count: AtomicUsize::new(0),
1265            epoch: AtomicUsize::new(0),
1266            timestamp: AtomicU64::new(timestamp()),
1267            mark: PhantomData,
1268        };
1269        let guard = crossbeam_epoch::pin();
1270        let old_chunk_ptr = self.chunk.load(Acquire, &guard);
1271        let new_chunk_ptr = self.new_chunk.load(Acquire, &guard);
1272        unsafe {
1273            // Hold references first so they won't get reclaimed
1274            let old_chunk = old_chunk_ptr.deref();
1275            let old_total_size = old_chunk.total_size;
1276
1277            let cloned_old_ptr = alloc_mem::<ALLOC>(old_total_size) as *mut Chunk<K, V, A, ALLOC>;
1278            debug_assert_ne!(cloned_old_ptr as usize, 0);
1279            debug_assert_ne!(old_chunk.ptr as usize, 0);
1280            libc::memcpy(
1281                cloned_old_ptr as *mut c_void,
1282                old_chunk.ptr as *const c_void,
1283                old_total_size,
1284            );
1285            let cloned_old_ref = Owned::new(ChunkPtr::new(cloned_old_ptr));
1286            new_table.chunk.store(cloned_old_ref, Release);
1287
1288            if new_chunk_ptr != Shared::null() {
1289                let new_chunk = new_chunk_ptr.deref();
1290                let new_total_size = new_chunk.total_size;
1291                let cloned_new_ptr =
1292                    alloc_mem::<ALLOC>(new_total_size) as *mut Chunk<K, V, A, ALLOC>;
1293                libc::memcpy(
1294                    cloned_new_ptr as *mut c_void,
1295                    new_chunk.ptr as *const c_void,
1296                    new_total_size,
1297                );
1298                let cloned_new_ref = Owned::new(ChunkPtr::new(cloned_new_ptr));
1299                new_table.new_chunk.store(cloned_new_ref, Release);
1300            } else {
1301                new_table.new_chunk.store(Shared::null(), Release);
1302            }
1303        }
1304        new_table.count.store(self.count.load(Acquire), Release);
1305        new_table
1306    }
1307}
1308
1309impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Drop
1310    for Table<K, V, A, ALLOC, H>
1311{
1312    fn drop(&mut self) {
1313        let guard = crossbeam_epoch::pin();
1314        unsafe {
1315            guard.defer_destroy(self.chunk.load(Acquire, &guard));
1316            let new_chunk_ptr = self.new_chunk.load(Acquire, &guard);
1317            if new_chunk_ptr != Shared::null() {
1318                guard.defer_destroy(new_chunk_ptr);
1319            }
1320            guard.flush();
1321        }
1322    }
1323}
1324
1325unsafe impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> Send
1326    for ChunkPtr<K, V, A, ALLOC>
1327{
1328}
1329unsafe impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> Sync
1330    for ChunkPtr<K, V, A, ALLOC>
1331{
1332}
1333
1334impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> Drop for ChunkPtr<K, V, A, ALLOC> {
1335    fn drop(&mut self) {
1336        debug_assert_ne!(self.ptr as usize, 0);
1337
1338        unsafe {
1339            Chunk::gc(self.ptr);
1340        }
1341    }
1342}
1343
1344impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> Deref for ChunkPtr<K, V, A, ALLOC> {
1345    type Target = Chunk<K, V, A, ALLOC>;
1346
1347    fn deref(&self) -> &Self::Target {
1348        debug_assert_ne!(self.ptr as usize, 0);
1349        unsafe { &*self.ptr }
1350    }
1351}
1352
1353impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> ChunkPtr<K, V, A, ALLOC> {
1354    fn new(ptr: *mut Chunk<K, V, A, ALLOC>) -> Self {
1355        debug_assert_ne!(ptr as usize, 0);
1356        Self { ptr }
1357    }
1358}
1359
1360#[inline(always)]
1361fn is_power_of_2(x: usize) -> bool {
1362    (x != 0) && ((x & (x - 1)) == 0)
1363}
1364
1365#[inline(always)]
1366fn occupation_limit(cap: usize) -> usize {
1367    (cap as f64 * 0.75f64) as usize
1368}
1369
1370#[inline(always)]
1371fn chunk_size_of(cap: usize) -> usize {
1372    cap * ENTRY_SIZE
1373}
1374
1375#[inline(always)]
1376pub fn hash<H: Hasher + Default>(num: usize) -> usize {
1377    let mut hasher = H::default();
1378    hasher.write_usize(num);
1379    hasher.finish() as usize
1380}
1381
1382#[inline(always)]
1383pub fn hash_key<K: Hash, H: Hasher + Default>(key: &K) -> usize {
1384    let mut hasher = H::default();
1385    key.hash(&mut hasher);
1386    hasher.finish() as usize
1387}
1388
1389#[inline(always)]
1390fn dfence() {
1391    compiler_fence(SeqCst);
1392    fence(SeqCst);
1393}
1394
1395const fn can_attach<K, V, A: Attachment<K, V>>() -> bool {
1396    mem::size_of::<(K, V)>() != 0
1397}
1398
1399pub trait Attachment<K, V> {
1400    fn heap_size_of(cap: usize) -> usize;
1401    fn new(cap: usize, heap_ptr: usize, heap_size: usize) -> Self;
1402    fn get(&self, index: usize) -> (K, V);
1403    fn set(&self, index: usize, key: K, value: V);
1404    fn erase(&self, index: usize);
1405    fn dealloc(&self);
1406    fn probe(&self, index: usize, probe_key: &K) -> bool;
1407}
1408
1409pub struct WordAttachment;
1410
1411// this attachment basically do nothing and sized zero
1412impl Attachment<(), ()> for WordAttachment {
1413    fn heap_size_of(_cap: usize) -> usize {
1414        0
1415    }
1416
1417    fn new(_cap: usize, _heap_ptr: usize, _heap_size: usize) -> Self {
1418        Self
1419    }
1420
1421    #[inline(always)]
1422    fn get(&self, _index: usize) -> ((), ()) {
1423        ((), ())
1424    }
1425
1426    #[inline(always)]
1427    fn set(&self, _index: usize, _key: (), _value: ()) {}
1428
1429    #[inline(always)]
1430    fn erase(&self, _index: usize) {}
1431
1432    #[inline(always)]
1433    fn dealloc(&self) {}
1434
1435    #[inline(always)]
1436    fn probe(&self, _index: usize, _value: &()) -> bool {
1437        true
1438    }
1439}
1440
1441pub type WordTable<H, ALLOC> = Table<(), (), WordAttachment, H, ALLOC>;
1442
1443pub struct WordObjectAttachment<T, A: GlobalAlloc + Default> {
1444    obj_chunk: usize,
1445    obj_size: usize,
1446    shadow: PhantomData<(T, A)>,
1447}
1448
1449impl<T: Clone, A: GlobalAlloc + Default> Attachment<(), T> for WordObjectAttachment<T, A> {
1450    fn heap_size_of(cap: usize) -> usize {
1451        let obj_size = mem::size_of::<T>();
1452        cap * obj_size
1453    }
1454
1455    fn new(_cap: usize, heap_ptr: usize, _heap_size: usize) -> Self {
1456        Self {
1457            obj_chunk: heap_ptr,
1458            obj_size: mem::size_of::<T>(),
1459            shadow: PhantomData,
1460        }
1461    }
1462
1463    #[inline(always)]
1464    fn get(&self, index: usize) -> ((), T) {
1465        let addr = self.addr_by_index(index);
1466        let v = unsafe { (*(addr as *mut T)).clone() };
1467        ((), v)
1468    }
1469
1470    #[inline(always)]
1471    fn set(&self, index: usize, _key: (), val: T) {
1472        let addr = self.addr_by_index(index);
1473        unsafe { ptr::write(addr as *mut T, val) }
1474    }
1475
1476    #[inline(always)]
1477    fn erase(&self, index: usize) {
1478        drop(self.addr_by_index(index) as *mut T)
1479    }
1480
1481    #[inline(always)]
1482    fn dealloc(&self) {}
1483
1484    fn probe(&self, _index: usize, _value: &()) -> bool {
1485        true
1486    }
1487}
1488
1489pub type HashTable<K, V, ALLOC> =
1490    Table<K, V, HashKVAttachment<K, V, ALLOC>, ALLOC, PassthroughHasher>;
1491
1492pub struct HashKVAttachment<K, V, A: GlobalAlloc + Default> {
1493    obj_chunk: usize,
1494    obj_size: usize,
1495    shadow: PhantomData<(K, V, A)>,
1496}
1497
1498impl<K: Clone + Hash + Eq, V: Clone, A: GlobalAlloc + Default> Attachment<K, V>
1499    for HashKVAttachment<K, V, A>
1500{
1501    fn heap_size_of(cap: usize) -> usize {
1502        let obj_size = mem::size_of::<(K, V)>();
1503        cap * obj_size
1504    }
1505
1506    fn new(_cap: usize, heap_ptr: usize, _heap_size: usize) -> Self {
1507        Self {
1508            obj_chunk: heap_ptr,
1509            obj_size: mem::size_of::<(K, V)>(),
1510            shadow: PhantomData,
1511        }
1512    }
1513
1514    #[inline(always)]
1515    fn get(&self, index: usize) -> (K, V) {
1516        let addr = self.addr_by_index(index);
1517        unsafe { (*(addr as *mut (K, V))).clone() }
1518    }
1519
1520    #[inline(always)]
1521    fn set(&self, index: usize, key: K, val: V) {
1522        let addr = self.addr_by_index(index);
1523        unsafe { ptr::write(addr as *mut (K, V), (key, val)) }
1524    }
1525
1526    #[inline(always)]
1527    fn erase(&self, index: usize) {
1528        drop(self.addr_by_index(index) as *mut (K, V))
1529    }
1530
1531    #[inline(always)]
1532    fn dealloc(&self) {}
1533
1534    fn probe(&self, index: usize, key: &K) -> bool {
1535        let addr = self.addr_by_index(index);
1536        let pos_key = unsafe { &*(addr as *mut K) };
1537        pos_key == key
1538    }
1539}
1540
1541pub trait Map<K, V: Clone> {
1542    fn with_capacity(cap: usize) -> Self;
1543    fn get(&self, key: &K) -> Option<V>;
1544    fn insert(&self, key: &K, value: V) -> Option<V>;
1545    // Return None if insertion successful
1546    fn try_insert(&self, key: &K, value: V) -> Option<V>;
1547    fn remove(&self, key: &K) -> Option<V>;
1548    fn entries(&self) -> Vec<(K, V)>;
1549    fn contains_key(&self, key: &K) -> bool;
1550    fn len(&self) -> usize;
1551    // The func function should  be pure and have no side effect
1552    fn get_or_insert<F: Fn() -> V>(&self, key: &K, func: F) -> V {
1553        loop {
1554            if self.contains_key(key) {
1555                if let Some(value) = self.get(key) {
1556                    return value;
1557                }
1558            } else {
1559                let value = func();
1560                if let Some(value) = self.try_insert(key, value.clone()) {
1561                    return value;
1562                }
1563                return value;
1564            }
1565        }
1566    }
1567}
1568
1569const NUM_FIX: usize = 5;
1570const PLACEHOLDER_VAL: usize = NUM_FIX + 1;
1571
1572impl<K: Clone + Hash + Eq, V: Clone, A: GlobalAlloc + Default> HashKVAttachment<K, V, A> {
1573    fn addr_by_index(&self, index: usize) -> usize {
1574        self.obj_chunk + index * self.obj_size
1575    }
1576}
1577
1578pub struct HashMap<
1579    K: Clone + Hash + Eq,
1580    V: Clone,
1581    ALLOC: GlobalAlloc + Default = System,
1582    H: Hasher + Default = DefaultHasher,
1583> {
1584    table: HashTable<K, V, ALLOC>,
1585    shadow: PhantomData<H>,
1586}
1587
1588impl<K: Clone + Hash + Eq, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default>
1589    HashMap<K, V, ALLOC, H>
1590{
1591    pub fn insert_with_op(&self, op: InsertOp, key: &K, value: V) -> Option<V> {
1592        let hash = hash_key::<K, H>(&key);
1593        self.table
1594            .insert(op, key, Some(value), hash, PLACEHOLDER_VAL)
1595            .map(|(_, v)| v)
1596    }
1597
1598    pub fn write(&self, key: &K) -> Option<HashMapWriteGuard<K, V, ALLOC, H>> {
1599        HashMapWriteGuard::new(&self.table, key)
1600    }
1601    pub fn read(&self, key: &K) -> Option<HashMapReadGuard<K, V, ALLOC, H>> {
1602        HashMapReadGuard::new(&self.table, key)
1603    }
1604}
1605
1606impl<K: Clone + Hash + Eq, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Map<K, V>
1607    for HashMap<K, V, ALLOC, H>
1608{
1609    fn with_capacity(cap: usize) -> Self {
1610        Self {
1611            table: Table::with_capacity(cap),
1612            shadow: PhantomData,
1613        }
1614    }
1615
1616    #[inline(always)]
1617    fn get(&self, key: &K) -> Option<V> {
1618        let hash = hash_key::<K, H>(key);
1619        self.table.get(key, hash, true).map(|v| v.1.unwrap())
1620    }
1621
1622    #[inline(always)]
1623    fn insert(&self, key: &K, value: V) -> Option<V> {
1624        self.insert_with_op(InsertOp::Insert, key, value)
1625    }
1626
1627    #[inline(always)]
1628    fn try_insert(&self, key: &K, value: V) -> Option<V> {
1629        self.insert_with_op(InsertOp::TryInsert, key, value)
1630    }
1631
1632    #[inline(always)]
1633    fn remove(&self, key: &K) -> Option<V> {
1634        let hash = hash_key::<K, H>(&key);
1635        self.table.remove(key, hash).map(|(_, v)| v)
1636    }
1637
1638    #[inline(always)]
1639    fn entries(&self) -> Vec<(K, V)> {
1640        self.table
1641            .entries()
1642            .into_iter()
1643            .map(|(_, _, k, v)| (k, v))
1644            .collect()
1645    }
1646
1647    #[inline(always)]
1648    fn contains_key(&self, key: &K) -> bool {
1649        let hash = hash_key::<K, H>(&key);
1650        self.table.get(key, hash, false).is_some()
1651    }
1652
1653    #[inline(always)]
1654    fn len(&self) -> usize {
1655        self.table.len()
1656    }
1657}
1658
1659impl<T, A: GlobalAlloc + Default> WordObjectAttachment<T, A> {
1660    fn addr_by_index(&self, index: usize) -> usize {
1661        self.obj_chunk + index * self.obj_size
1662    }
1663}
1664
1665type ObjectTable<V, ALLOC, H> = Table<(), V, WordObjectAttachment<V, ALLOC>, ALLOC, H>;
1666
1667#[derive(Clone)]
1668pub struct ObjectMap<
1669    V: Clone,
1670    ALLOC: GlobalAlloc + Default = System,
1671    H: Hasher + Default = DefaultHasher,
1672> {
1673    table: ObjectTable<V, ALLOC, H>,
1674}
1675
1676impl<V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> ObjectMap<V, ALLOC, H> {
1677    fn insert_with_op(&self, op: InsertOp, key: &usize, value: V) -> Option<V> {
1678        self.table
1679            .insert(op, &(), Some(value), key + NUM_FIX, PLACEHOLDER_VAL)
1680            .map(|(_, v)| v)
1681    }
1682
1683    pub fn read(&self, key: usize) -> Option<ObjectMapReadGuard<V, ALLOC, H>> {
1684        ObjectMapReadGuard::new(&self.table, key)
1685    }
1686
1687    pub fn write(&self, key: usize) -> Option<ObjectMapWriteGuard<V, ALLOC, H>> {
1688        ObjectMapWriteGuard::new(&self.table, key)
1689    }
1690}
1691
1692impl<V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Map<usize, V>
1693    for ObjectMap<V, ALLOC, H>
1694{
1695    fn with_capacity(cap: usize) -> Self {
1696        Self {
1697            table: Table::with_capacity(cap),
1698        }
1699    }
1700
1701    #[inline(always)]
1702    fn get(&self, key: &usize) -> Option<V> {
1703        self.table
1704            .get(&(), key + NUM_FIX, true)
1705            .map(|v| v.1.unwrap())
1706    }
1707
1708    #[inline(always)]
1709    fn insert(&self, key: &usize, value: V) -> Option<V> {
1710        self.insert_with_op(InsertOp::Insert, key, value)
1711    }
1712
1713    #[inline(always)]
1714    fn try_insert(&self, key: &usize, value: V) -> Option<V> {
1715        self.insert_with_op(InsertOp::TryInsert, key, value)
1716    }
1717
1718    #[inline(always)]
1719    fn remove(&self, key: &usize) -> Option<V> {
1720        self.table.remove(&(), key + NUM_FIX).map(|(_, v)| v)
1721    }
1722
1723    #[inline(always)]
1724    fn entries(&self) -> Vec<(usize, V)> {
1725        self.table
1726            .entries()
1727            .into_iter()
1728            .map(|(_, k, _, v)| (k - NUM_FIX, v))
1729            .collect()
1730    }
1731
1732    #[inline(always)]
1733    fn contains_key(&self, key: &usize) -> bool {
1734        self.table.get(&(), key + NUM_FIX, false).is_some()
1735    }
1736
1737    #[inline(always)]
1738    fn len(&self) -> usize {
1739        self.table.len()
1740    }
1741}
1742
1743#[derive(Clone)]
1744pub struct WordMap<ALLOC: GlobalAlloc + Default = System, H: Hasher + Default = DefaultHasher> {
1745    table: WordTable<ALLOC, H>,
1746}
1747
1748impl<ALLOC: GlobalAlloc + Default, H: Hasher + Default> WordMap<ALLOC, H> {
1749    fn insert_with_op(&self, op: InsertOp, key: &usize, value: usize) -> Option<usize> {
1750        self.table
1751            .insert(op, &(), None, key + NUM_FIX, value + NUM_FIX)
1752            .map(|(v, _)| v)
1753    }
1754
1755    pub fn get_from_mutex(&self, key: &usize) -> Option<usize> {
1756        self.get(key).map(|v| v & WORD_MUTEX_DATA_BIT_MASK)
1757    }
1758}
1759
1760impl<ALLOC: GlobalAlloc + Default, H: Hasher + Default> Map<usize, usize> for WordMap<ALLOC, H> {
1761    fn with_capacity(cap: usize) -> Self {
1762        Self {
1763            table: Table::with_capacity(cap),
1764        }
1765    }
1766
1767    #[inline(always)]
1768    fn get(&self, key: &usize) -> Option<usize> {
1769        self.table
1770            .get(&(), key + NUM_FIX, false)
1771            .map(|v| v.0 - NUM_FIX)
1772    }
1773
1774    #[inline(always)]
1775    fn insert(&self, key: &usize, value: usize) -> Option<usize> {
1776        self.insert_with_op(InsertOp::UpsertFast, key, value)
1777    }
1778
1779    #[inline(always)]
1780    fn try_insert(&self, key: &usize, value: usize) -> Option<usize> {
1781        self.insert_with_op(InsertOp::TryInsert, key, value)
1782    }
1783
1784    #[inline(always)]
1785    fn remove(&self, key: &usize) -> Option<usize> {
1786        self.table
1787            .remove(&(), key + NUM_FIX)
1788            .map(|(v, _)| v - NUM_FIX)
1789    }
1790    fn entries(&self) -> Vec<(usize, usize)> {
1791        self.table
1792            .entries()
1793            .into_iter()
1794            .map(|(k, v, _, _)| (k - NUM_FIX, v - NUM_FIX))
1795            .collect()
1796    }
1797
1798    #[inline(always)]
1799    fn contains_key(&self, key: &usize) -> bool {
1800        self.get(key).is_some()
1801    }
1802
1803    #[inline(always)]
1804    fn len(&self) -> usize {
1805        self.table.len()
1806    }
1807}
1808
1809const WORD_MUTEX_DATA_BIT_MASK: usize = !0 << 2 >> 2;
1810
1811pub struct WordMutexGuard<
1812    'a,
1813    ALLOC: GlobalAlloc + Default = System,
1814    H: Hasher + Default = DefaultHasher,
1815> {
1816    table: &'a WordTable<ALLOC, H>,
1817    key: usize,
1818    value: usize,
1819}
1820
1821impl<'a, ALLOC: GlobalAlloc + Default, H: Hasher + Default> WordMutexGuard<'a, ALLOC, H> {
1822    fn create(table: &'a WordTable<ALLOC, H>, key: usize) -> Option<Self> {
1823        let key = key + NUM_FIX;
1824        let value = 0;
1825        match table.insert(
1826            InsertOp::TryInsert,
1827            &(),
1828            Some(()),
1829            key,
1830            value | MUTEX_BIT_MASK,
1831        ) {
1832            None | Some((TOMBSTONE_VALUE, ())) | Some((EMPTY_VALUE, ())) => {
1833                trace!("Created locked key {}", key);
1834                Some(Self { table, key, value })
1835            }
1836            _ => {
1837                trace!("Cannot create locked key {} ", key);
1838                None
1839            }
1840        }
1841    }
1842    fn new(table: &'a WordTable<ALLOC, H>, key: usize) -> Option<Self> {
1843        let key = key + NUM_FIX;
1844        let backoff = crossbeam_utils::Backoff::new();
1845        let guard = crossbeam_epoch::pin();
1846        let value;
1847        loop {
1848            let swap_res = table.swap(
1849                key,
1850                &(),
1851                move |fast_value| {
1852                    trace!("The key {} have value {}", key, fast_value);
1853                    let locked_val = fast_value | MUTEX_BIT_MASK;
1854                    if fast_value == locked_val {
1855                        // Locked, unchanged
1856                        trace!("The key {} have locked, unchanged and try again", key);
1857                        None
1858                    } else {
1859                        // Obtain lock
1860                        trace!(
1861                            "The key {} have obtained, with value {}",
1862                            key,
1863                            fast_value & WORD_MUTEX_DATA_BIT_MASK
1864                        );
1865                        Some(locked_val)
1866                    }
1867                },
1868                &guard,
1869            );
1870            match swap_res {
1871                SwapResult::Succeed(val, _idx, _chunk) => {
1872                    trace!("Lock on key {} succeed with value {}", key, val);
1873                    value = val & WORD_MUTEX_DATA_BIT_MASK;
1874                    break;
1875                }
1876                SwapResult::Failed | SwapResult::Aborted => {
1877                    trace!("Lock on key {} failed, retry", key);
1878                    backoff.spin();
1879                    continue;
1880                }
1881                SwapResult::NotFound => {
1882                    trace!("Cannot found key {} to lock", key);
1883                    return None;
1884                }
1885            }
1886        }
1887        debug_assert_ne!(value, 0);
1888        let value = value - NUM_FIX;
1889        Some(Self { table, key, value })
1890    }
1891
1892    pub fn remove(self) -> usize {
1893        trace!("Removing {}", self.key);
1894        let res = self.table.remove(&(), self.key).unwrap().0;
1895        mem::forget(self);
1896        res | MUTEX_BIT_MASK
1897    }
1898}
1899
1900impl<'a, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Deref for WordMutexGuard<'a, ALLOC, H> {
1901    type Target = usize;
1902
1903    fn deref(&self) -> &Self::Target {
1904        &self.value
1905    }
1906}
1907
1908impl<'a, ALLOC: GlobalAlloc + Default, H: Hasher + Default> DerefMut
1909    for WordMutexGuard<'a, ALLOC, H>
1910{
1911    fn deref_mut(&mut self) -> &mut Self::Target {
1912        &mut self.value
1913    }
1914}
1915
1916impl<'a, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Drop for WordMutexGuard<'a, ALLOC, H> {
1917    fn drop(&mut self) {
1918        self.value += NUM_FIX;
1919        trace!(
1920            "Release lock for key {} with value {}",
1921            self.key,
1922            self.value
1923        );
1924        self.table.insert(
1925            InsertOp::UpsertFast,
1926            &(),
1927            None,
1928            self.key,
1929            self.value & WORD_MUTEX_DATA_BIT_MASK,
1930        );
1931    }
1932}
1933
1934impl<ALLOC: GlobalAlloc + Default, H: Hasher + Default> WordMap<ALLOC, H> {
1935    pub fn lock(&self, key: usize) -> Option<WordMutexGuard<ALLOC, H>> {
1936        WordMutexGuard::new(&self.table, key)
1937    }
1938    pub fn try_insert_locked(&self, key: usize) -> Option<WordMutexGuard<ALLOC, H>> {
1939        WordMutexGuard::create(&self.table, key)
1940    }
1941}
1942
1943pub struct HashMapReadGuard<
1944    'a,
1945    K: Clone + Eq + Hash,
1946    V: Clone,
1947    ALLOC: GlobalAlloc + Default = System,
1948    H: Hasher + Default = DefaultHasher,
1949> {
1950    table: &'a HashTable<K, V, ALLOC>,
1951    hash: usize,
1952    key: K,
1953    value: V,
1954    _mark: PhantomData<H>,
1955}
1956
1957impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default>
1958    HashMapReadGuard<'a, K, V, ALLOC, H>
1959{
1960    fn new(table: &'a HashTable<K, V, ALLOC>, key: &K) -> Option<Self> {
1961        let backoff = crossbeam_utils::Backoff::new();
1962        let guard = crossbeam_epoch::pin();
1963        let hash = hash_key::<K, H>(&key);
1964        let value: V;
1965        loop {
1966            let swap_res = table.swap(
1967                hash,
1968                key,
1969                move |fast_value| {
1970                    if fast_value != PLACEHOLDER_VAL - 1 {
1971                        // Not write locked, can bump it by one
1972                        trace!("Key hash {} is not write locked, will read lock", hash);
1973                        Some(fast_value + 1)
1974                    } else {
1975                        trace!("Key hash {} is write locked, unchanged", hash);
1976                        None
1977                    }
1978                },
1979                &guard,
1980            );
1981            match swap_res {
1982                SwapResult::Succeed(_, idx, chunk) => {
1983                    let chunk_ref = unsafe { chunk.deref() };
1984                    let (_, v) = chunk_ref.attachment.get(idx);
1985                    value = v;
1986                    break;
1987                }
1988                SwapResult::Failed | SwapResult::Aborted => {
1989                    trace!("Lock on key hash {} failed, retry", hash);
1990                    backoff.spin();
1991                    continue;
1992                }
1993                SwapResult::NotFound => {
1994                    debug!("Cannot found hash key {} to lock", hash);
1995                    return None;
1996                }
1997            }
1998        }
1999        Some(Self {
2000            table,
2001            key: key.clone(),
2002            value,
2003            hash,
2004            _mark: Default::default(),
2005        })
2006    }
2007}
2008
2009impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Deref
2010    for HashMapReadGuard<'a, K, V, ALLOC, H>
2011{
2012    type Target = V;
2013
2014    fn deref(&self) -> &Self::Target {
2015        &self.value
2016    }
2017}
2018
2019impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Drop
2020    for HashMapReadGuard<'a, K, V, ALLOC, H>
2021{
2022    fn drop(&mut self) {
2023        trace!("Release read lock for hash key {}", self.hash);
2024        let guard = crossbeam_epoch::pin();
2025        self.table.swap(
2026            self.hash,
2027            &self.key,
2028            |fast_value| {
2029                debug_assert!(fast_value > PLACEHOLDER_VAL);
2030                Some(fast_value - 1)
2031            },
2032            &guard,
2033        );
2034    }
2035}
2036
2037pub struct HashMapWriteGuard<
2038    'a,
2039    K: Clone + Eq + Hash,
2040    V: Clone,
2041    ALLOC: GlobalAlloc + Default = System,
2042    H: Hasher + Default = DefaultHasher,
2043> {
2044    table: &'a HashTable<K, V, ALLOC>,
2045    hash: usize,
2046    key: K,
2047    value: V,
2048    _mark: PhantomData<H>,
2049}
2050
2051impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default>
2052    HashMapWriteGuard<'a, K, V, ALLOC, H>
2053{
2054    fn new(table: &'a HashTable<K, V, ALLOC>, key: &K) -> Option<Self> {
2055        let backoff = crossbeam_utils::Backoff::new();
2056        let guard = crossbeam_epoch::pin();
2057        let hash = hash_key::<K, H>(&key);
2058        let value: V;
2059        loop {
2060            let swap_res = table.swap(
2061                hash,
2062                key,
2063                move |fast_value| {
2064                    if fast_value == PLACEHOLDER_VAL {
2065                        // Not write locked, can bump it by one
2066                        trace!("Key hash {} is write lockable, will write lock", hash);
2067                        Some(fast_value - 1)
2068                    } else {
2069                        trace!("Key hash {} is write locked, unchanged", hash);
2070                        None
2071                    }
2072                },
2073                &guard,
2074            );
2075            match swap_res {
2076                SwapResult::Succeed(_, idx, chunk) => {
2077                    let chunk_ref = unsafe { chunk.deref() };
2078                    let (_, v) = chunk_ref.attachment.get(idx);
2079                    value = v;
2080                    break;
2081                }
2082                SwapResult::Failed | SwapResult::Aborted => {
2083                    trace!("Lock on key hash {} failed, retry", hash);
2084                    backoff.spin();
2085                    continue;
2086                }
2087                SwapResult::NotFound => {
2088                    debug!("Cannot found hash key {} to lock", hash);
2089                    return None;
2090                }
2091            }
2092        }
2093        Some(Self {
2094            table,
2095            key: key.clone(),
2096            value,
2097            hash,
2098            _mark: Default::default(),
2099        })
2100    }
2101
2102    pub fn remove(self) -> V {
2103        let res = self.table.remove(&self.key, self.hash).unwrap().1;
2104        mem::forget(self);
2105        res
2106    }
2107}
2108
2109impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Deref
2110    for HashMapWriteGuard<'a, K, V, ALLOC, H>
2111{
2112    type Target = V;
2113
2114    fn deref(&self) -> &Self::Target {
2115        &self.value
2116    }
2117}
2118
2119impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> DerefMut
2120    for HashMapWriteGuard<'a, K, V, ALLOC, H>
2121{
2122    fn deref_mut(&mut self) -> &mut Self::Target {
2123        &mut self.value
2124    }
2125}
2126
2127impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Drop
2128    for HashMapWriteGuard<'a, K, V, ALLOC, H>
2129{
2130    fn drop(&mut self) {
2131        trace!("Release read lock for hash key {}", self.hash);
2132        let hash = hash_key::<K, H>(&self.key);
2133        self.table.insert(
2134            InsertOp::Insert,
2135            &self.key,
2136            Some(self.value.clone()),
2137            hash,
2138            PLACEHOLDER_VAL,
2139        );
2140    }
2141}
2142
2143pub struct ObjectMapReadGuard<
2144    'a,
2145    V: Clone,
2146    ALLOC: GlobalAlloc + Default = System,
2147    H: Hasher + Default = DefaultHasher,
2148> {
2149    table: &'a ObjectTable<V, ALLOC, H>,
2150    key: usize,
2151    value: V,
2152    _mark: PhantomData<H>,
2153}
2154
2155impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default>
2156    ObjectMapReadGuard<'a, V, ALLOC, H>
2157{
2158    fn new(table: &'a ObjectTable<V, ALLOC, H>, key: usize) -> Option<Self> {
2159        let backoff = crossbeam_utils::Backoff::new();
2160        let guard = crossbeam_epoch::pin();
2161        let hash = hash_key::<usize, H>(&key);
2162        let value: V;
2163        loop {
2164            let swap_res = table.swap(
2165                key,
2166                &(),
2167                move |fast_value| {
2168                    if fast_value != PLACEHOLDER_VAL - 1 {
2169                        // Not write locked, can bump it by one
2170                        trace!("Key {} is not write locked, will read lock", hash);
2171                        Some(fast_value + 1)
2172                    } else {
2173                        trace!("Key {} is write locked, unchanged", hash);
2174                        None
2175                    }
2176                },
2177                &guard,
2178            );
2179            match swap_res {
2180                SwapResult::Succeed(_, idx, chunk) => {
2181                    let chunk_ref = unsafe { chunk.deref() };
2182                    let (_, v) = chunk_ref.attachment.get(idx);
2183                    value = v;
2184                    break;
2185                }
2186                SwapResult::Failed | SwapResult::Aborted => {
2187                    trace!("Lock on key {} failed, retry", hash);
2188                    backoff.spin();
2189                    continue;
2190                }
2191                SwapResult::NotFound => {
2192                    debug!("Cannot found hash key {} to lock", hash);
2193                    return None;
2194                }
2195            }
2196        }
2197        Some(Self {
2198            table,
2199            key,
2200            value,
2201            _mark: Default::default(),
2202        })
2203    }
2204}
2205
2206impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Deref
2207    for ObjectMapReadGuard<'a, V, ALLOC, H>
2208{
2209    type Target = V;
2210
2211    fn deref(&self) -> &Self::Target {
2212        &self.value
2213    }
2214}
2215
2216impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Drop
2217    for ObjectMapReadGuard<'a, V, ALLOC, H>
2218{
2219    fn drop(&mut self) {
2220        trace!("Release read lock for hash key {}", self.key);
2221        let guard = crossbeam_epoch::pin();
2222        self.table.swap(
2223            self.key,
2224            &(),
2225            |fast_value| {
2226                debug_assert!(fast_value > PLACEHOLDER_VAL);
2227                Some(fast_value - 1)
2228            },
2229            &guard,
2230        );
2231    }
2232}
2233
2234pub struct ObjectMapWriteGuard<
2235    'a,
2236    V: Clone,
2237    ALLOC: GlobalAlloc + Default = System,
2238    H: Hasher + Default = DefaultHasher,
2239> {
2240    table: &'a ObjectTable<V, ALLOC, H>,
2241    key: usize,
2242    value: V,
2243    _mark: PhantomData<H>,
2244}
2245
2246impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default>
2247    ObjectMapWriteGuard<'a, V, ALLOC, H>
2248{
2249    fn new(table: &'a ObjectTable<V, ALLOC, H>, key: usize) -> Option<Self> {
2250        let backoff = crossbeam_utils::Backoff::new();
2251        let guard = crossbeam_epoch::pin();
2252        let value: V;
2253        let key = key + NUM_FIX;
2254        loop {
2255            let swap_res = table.swap(
2256                key,
2257                &(),
2258                move |fast_value| {
2259                    if fast_value == PLACEHOLDER_VAL {
2260                        // Not write locked, can bump it by one
2261                        trace!("Key {} is write lockable, will write lock", key);
2262                        Some(fast_value - 1)
2263                    } else {
2264                        trace!("Key {} is write locked, unchanged", key);
2265                        None
2266                    }
2267                },
2268                &guard,
2269            );
2270            match swap_res {
2271                SwapResult::Succeed(_, idx, chunk) => {
2272                    let chunk_ref = unsafe { chunk.deref() };
2273                    let (_, v) = chunk_ref.attachment.get(idx);
2274                    value = v;
2275                    break;
2276                }
2277                SwapResult::Failed | SwapResult::Aborted => {
2278                    trace!("Lock on key {} failed, retry", key);
2279                    backoff.spin();
2280                    continue;
2281                }
2282                SwapResult::NotFound => {
2283                    debug!("Cannot found key {} to lock", key);
2284                    return None;
2285                }
2286            }
2287        }
2288        Some(Self {
2289            table,
2290            key,
2291            value,
2292            _mark: Default::default(),
2293        })
2294    }
2295
2296    pub fn remove(self) -> V {
2297        let res = self.table.remove(&(), self.key).unwrap().1;
2298        mem::forget(self);
2299        res
2300    }
2301}
2302
2303impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Deref
2304    for ObjectMapWriteGuard<'a, V, ALLOC, H>
2305{
2306    type Target = V;
2307
2308    fn deref(&self) -> &Self::Target {
2309        &self.value
2310    }
2311}
2312
2313impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> DerefMut
2314    for ObjectMapWriteGuard<'a, V, ALLOC, H>
2315{
2316    fn deref_mut(&mut self) -> &mut Self::Target {
2317        &mut self.value
2318    }
2319}
2320
2321impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Drop
2322    for ObjectMapWriteGuard<'a, V, ALLOC, H>
2323{
2324    fn drop(&mut self) {
2325        trace!("Release read lock for key {}", self.key);
2326        self.table.insert(
2327            InsertOp::Insert,
2328            &(),
2329            Some(self.value.clone()),
2330            self.key,
2331            PLACEHOLDER_VAL,
2332        );
2333    }
2334}
2335
2336pub struct HashSet<
2337    T: Clone + Hash + Eq,
2338    ALLOC: GlobalAlloc + Default = System,
2339    H: Hasher + Default = DefaultHasher,
2340> {
2341    table: HashTable<T, (), ALLOC>,
2342    shadow: PhantomData<H>,
2343}
2344
2345impl<T: Clone + Hash + Eq, ALLOC: GlobalAlloc + Default, H: Hasher + Default> HashSet<T, ALLOC, H> {
2346    pub fn with_capacity(cap: usize) -> Self {
2347        Self {
2348            table: Table::with_capacity(cap),
2349            shadow: PhantomData,
2350        }
2351    }
2352
2353    pub fn contains(&self, item: &T) -> bool {
2354        let hash = hash_key::<T, H>(item);
2355        self.table.get(item, hash, false).is_some()
2356    }
2357
2358    pub fn insert(&self, item: &T) -> bool {
2359        let hash = hash_key::<T, H>(item);
2360        self.table
2361            .insert(InsertOp::TryInsert, item, None, hash, !0)
2362            .is_none()
2363    }
2364
2365    pub fn remove(&self, item: &T) -> bool {
2366        let hash = hash_key::<T, H>(item);
2367        self.table.remove(item, hash).is_some()
2368    }
2369
2370    pub fn items(&self) -> std::collections::HashSet<T> {
2371        self.table
2372            .entries()
2373            .into_iter()
2374            .map(|(_, _, item, _)| item)
2375            .collect()
2376    }
2377
2378    #[inline(always)]
2379    pub fn len(&self) -> usize {
2380        self.table.len()
2381    }
2382}
2383
2384#[inline(always)]
2385fn alloc_mem<A: GlobalAlloc + Default>(size: usize) -> usize {
2386    let align = 64;
2387    let layout = Layout::from_size_align(size, align).unwrap();
2388    let alloc = A::default();
2389    // must be all zeroed
2390    unsafe {
2391        let addr = alloc.alloc(layout) as usize;
2392        ptr::write_bytes(addr as *mut u8, 0, size);
2393        debug_assert_eq!(addr % 64, 0);
2394        addr
2395    }
2396}
2397
2398#[inline(always)]
2399fn dealloc_mem<A: GlobalAlloc + Default + Default>(ptr: usize, size: usize) {
2400    let align = 64;
2401    let layout = Layout::from_size_align(size, align).unwrap();
2402    let alloc = A::default();
2403    unsafe { alloc.dealloc(ptr as *mut u8, layout) }
2404}
2405
2406pub struct PassthroughHasher {
2407    num: u64,
2408}
2409
2410impl Hasher for PassthroughHasher {
2411    fn finish(&self) -> u64 {
2412        self.num
2413    }
2414
2415    fn write(&mut self, _bytes: &[u8]) {
2416        unimplemented!()
2417    }
2418
2419    fn write_usize(&mut self, i: usize) {
2420        self.num = i as u64
2421    }
2422}
2423
2424impl Default for PassthroughHasher {
2425    fn default() -> Self {
2426        Self { num: 0 }
2427    }
2428}
2429
2430fn timestamp() -> u64 {
2431    let start = SystemTime::now();
2432    let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap();
2433    since_the_epoch.as_millis() as u64
2434}
2435
2436#[cfg(test)]
2437mod tests {
2438    use crate::map::*;
2439    use alloc::sync::Arc;
2440    use chashmap::CHashMap;
2441    use rayon::prelude::*;
2442    use std::collections::HashMap;
2443    use std::thread;
2444    use std::{
2445        alloc::System,
2446        sync::{Mutex, RwLock},
2447    };
2448    use test::Bencher;
2449
2450    #[test]
2451    fn will_not_overflow() {
2452        let _ = env_logger::try_init();
2453        let table = WordMap::<System>::with_capacity(16);
2454        for i in 50..60 {
2455            assert_eq!(table.insert(&i, i), None);
2456        }
2457        for i in 50..60 {
2458            assert_eq!(table.get(&i), Some(i));
2459        }
2460        for i in 50..60 {
2461            assert_eq!(table.remove(&i), Some(i));
2462        }
2463    }
2464
2465    #[test]
2466    fn resize() {
2467        let _ = env_logger::try_init();
2468        let map = WordMap::<System>::with_capacity(16);
2469        for i in 5..2048 {
2470            map.insert(&i, i * 2);
2471        }
2472        for i in 5..2048 {
2473            match map.get(&i) {
2474                Some(r) => assert_eq!(r, i * 2),
2475                None => panic!("{}", i),
2476            }
2477        }
2478    }
2479
2480    #[test]
2481    fn parallel_no_resize() {
2482        let _ = env_logger::try_init();
2483        let map = Arc::new(WordMap::<System>::with_capacity(65536));
2484        let mut threads = vec![];
2485        for i in 5..99 {
2486            map.insert(&i, i * 10);
2487        }
2488        for i in 100..900 {
2489            let map = map.clone();
2490            threads.push(thread::spawn(move || {
2491                for j in 5..60 {
2492                    map.insert(&(i * 100 + j), i * j);
2493                }
2494            }));
2495        }
2496        for i in 5..9 {
2497            for j in 1..10 {
2498                map.remove(&(i * j));
2499            }
2500        }
2501        for thread in threads {
2502            let _ = thread.join();
2503        }
2504        for i in 100..900 {
2505            for j in 5..60 {
2506                assert_eq!(map.get(&(i * 100 + j)), Some(i * j))
2507            }
2508        }
2509        for i in 5..9 {
2510            for j in 1..10 {
2511                assert!(map.get(&(i * j)).is_none())
2512            }
2513        }
2514    }
2515
2516    #[test]
2517    fn parallel_with_resize() {
2518        let _ = env_logger::try_init();
2519        let num_threads = num_cpus::get();
2520        let test_load = 4096;
2521        let repeat_load = 16;
2522        let map = Arc::new(WordMap::<System>::with_capacity(32));
2523        let mut threads = vec![];
2524        for i in 0..num_threads {
2525            let map = map.clone();
2526            threads.push(thread::spawn(move || {
2527                for j in 5..test_load {
2528                    let key = i * 10000000 + j;
2529                    let value_prefix = i * j * 100;
2530                    for k in 1..repeat_load {
2531                        let value = value_prefix + k;
2532                        if k != 1 {
2533                            assert_eq!(map.get(&key), Some(value - 1));
2534                        }
2535                        let pre_insert_epoch = map.table.now_epoch();
2536                        map.insert(&key, value);
2537                        let post_insert_epoch = map.table.now_epoch();
2538                        for l in 1..128 {
2539                            let pre_fail_get_epoch = map.table.now_epoch();
2540                            let left = map.get(&key);
2541                            let post_fail_get_epoch = map.table.now_epoch();
2542                            let right = Some(value);
2543                            if left != right {
2544                                for m in 1..1024 {
2545                                    let left = map.get(&key);
2546                                    let right = Some(value);
2547                                    if left == right {
2548                                        panic!(
2549                                            "Recovered at turn {} for {}, copying {}, epoch {} to {}, now {}, PIE: {} to {}. Migration problem!!!", 
2550                                            m, 
2551                                            key, 
2552                                            map.table.map_is_copying(),
2553                                            pre_fail_get_epoch,
2554                                            post_fail_get_epoch,
2555                                            map.table.now_epoch(),
2556                                            pre_insert_epoch, 
2557                                            post_insert_epoch
2558                                        );
2559                                    }
2560                                }
2561                                panic!("Unable to recover for {}, round {}, copying {}", key, l , map.table.map_is_copying());
2562                            }
2563                        }
2564                        if j % 7 == 0 {
2565                            assert_eq!(
2566                                map.remove(&key),
2567                                Some(value),
2568                                "Remove result, get {:?}, copying {}, round {}",
2569                                map.get(&key),
2570                                map.table.map_is_copying(),
2571                                k
2572                            );
2573                            assert_eq!(map.get(&key), None, "Remove recursion");
2574                            assert!(map.lock(key).is_none(), "Remove recursion with lock");
2575                            map.insert(&key, value);
2576                        }
2577                        if j % 3 == 0 {
2578                            let new_value = value + 7;
2579                            let pre_insert_epoch = map.table.now_epoch();
2580                            map.insert(&key, new_value);
2581                            let post_insert_epoch = map.table.now_epoch();
2582                            assert_eq!(
2583                                map.get(&key), 
2584                                Some(new_value), 
2585                                "Checking immediate update, key {}, epoch {} to {}",
2586                                key, pre_insert_epoch, post_insert_epoch
2587                            );
2588                            map.insert(&key, value);
2589                        }
2590                    }
2591                }
2592            }));
2593        }
2594        info!("Waiting for intensive insertion to finish");
2595        for thread in threads {
2596            let _ = thread.join();
2597        }
2598        info!("Checking final value");
2599        (0..num_threads)
2600            .collect::<Vec<_>>()
2601            .par_iter()
2602            .for_each(|i| {
2603                for j in 5..test_load {
2604                    let k = i * 10000000 + j;
2605                    let value = i * j * 100 + repeat_load - 1;
2606                    let get_res = map.get(&k);
2607                    assert_eq!(
2608                        get_res,
2609                        Some(value),
2610                        "New k {}, i {}, j {}, epoch {}",
2611                        k,
2612                        i,
2613                        j,
2614                        map.table.now_epoch()
2615                    );
2616                }
2617            });
2618    }
2619
2620    #[test]
2621    fn parallel_hybrid() {
2622        let _ = env_logger::try_init();
2623        let map = Arc::new(WordMap::<System>::with_capacity(4));
2624        for i in 5..128 {
2625            map.insert(&i, i * 10);
2626        }
2627        let mut threads = vec![];
2628        for i in 256..265 {
2629            let map = map.clone();
2630            threads.push(thread::spawn(move || {
2631                for j in 5..60 {
2632                    map.insert(&(i * 10 + j), 10);
2633                }
2634            }));
2635        }
2636        for i in 5..8 {
2637            let map = map.clone();
2638            threads.push(thread::spawn(move || {
2639                for j in 5..8 {
2640                    map.remove(&(i * j));
2641                }
2642            }));
2643        }
2644        for thread in threads {
2645            let _ = thread.join();
2646        }
2647        for i in 256..265 {
2648            for j in 5..60 {
2649                assert_eq!(map.get(&(i * 10 + j)), Some(10))
2650            }
2651        }
2652    }
2653
2654    #[test]
2655    fn parallel_word_map_mutex() {
2656        let _ = env_logger::try_init();
2657        let map = Arc::new(WordMap::<System>::with_capacity(4));
2658        map.insert(&1, 0);
2659        let mut threads = vec![];
2660        let num_threads = 256;
2661        for _ in 0..num_threads {
2662            let map = map.clone();
2663            threads.push(thread::spawn(move || {
2664                let mut guard = map.lock(1).unwrap();
2665                *guard += 1;
2666            }));
2667        }
2668        for thread in threads {
2669            let _ = thread.join();
2670        }
2671        assert_eq!(map.get(&1).unwrap(), num_threads);
2672    }
2673
2674    #[test]
2675    fn parallel_word_map_multi_mutex() {
2676        let _ = env_logger::try_init();
2677        let map = Arc::new(WordMap::<System>::with_capacity(4));
2678        let mut threads = vec![];
2679        let num_threads = num_cpus::get();
2680        let test_load = 4096;
2681        let update_load = 128;
2682        for thread_id in 0..num_threads {
2683            let map = map.clone();
2684            threads.push(thread::spawn(move || {
2685                let target = thread_id;
2686                for i in 0..test_load {
2687                    let key = target * 1000000 + i;
2688                    {
2689                        let mut mutex = map.try_insert_locked(key).unwrap();
2690                        *mutex = 1;
2691                    }
2692                    for j in 1..update_load {
2693                        assert!(
2694                            map.get(&key).is_some(),
2695                            "Pre getting value for mutex, key {}, epoch {}",
2696                            key,
2697                            map.table.now_epoch()
2698                        );
2699                        let val = {
2700                            let mut mutex = map.lock(key).expect(&format!(
2701                                "Locking key {}, copying {}",
2702                                key,
2703                                map.table.now_epoch()
2704                            ));
2705                            assert_eq!(*mutex, j);
2706                            *mutex += 1;
2707                            *mutex
2708                        };
2709                        assert!(
2710                            map.get(&key).is_some(),
2711                            "Post getting value for mutex, key {}, epoch {}",
2712                            key,
2713                            map.table.now_epoch()
2714                        );
2715                        if j % 7 == 0 {
2716                            {
2717                                let mutex = map.lock(key).expect(&format!(
2718                                    "Remove locking key {}, copying {}",
2719                                    key,
2720                                    map.table.now_epoch()
2721                                ));
2722                                mutex.remove();
2723                            }
2724                            assert!(map.lock(key).is_none());
2725                            *map.try_insert_locked(key).unwrap() = val;
2726                        }
2727                    }
2728                    assert_eq!(*map.lock(key).unwrap(), update_load);
2729                }
2730            }));
2731        }
2732        for thread in threads {
2733            let _ = thread.join();
2734        }
2735    }
2736
2737    #[test]
2738    fn parallel_obj_map_rwlock() {
2739        let _ = env_logger::try_init();
2740        let map_cont = ObjectMap::<Obj, System, DefaultHasher>::with_capacity(4);
2741        let map = Arc::new(map_cont);
2742        map.insert(&1, Obj::new(0));
2743        let mut threads = vec![];
2744        let num_threads = 256;
2745        for i in 0..num_threads {
2746            let map = map.clone();
2747            threads.push(thread::spawn(move || {
2748                let mut guard = map.write(1).unwrap();
2749                let val = guard.get();
2750                guard.set(val + 1);
2751                trace!("Dealt with {}", i);
2752            }));
2753        }
2754        for thread in threads {
2755            let _ = thread.join();
2756        }
2757        map.get(&1).unwrap().validate(num_threads);
2758    }
2759
2760    #[test]
2761    fn parallel_hash_map_rwlock() {
2762        let _ = env_logger::try_init();
2763        let map_cont = super::HashMap::<u32, Obj, System, DefaultHasher>::with_capacity(4);
2764        let map = Arc::new(map_cont);
2765        map.insert(&1, Obj::new(0));
2766        let mut threads = vec![];
2767        let num_threads = 256;
2768        for i in 0..num_threads {
2769            let map = map.clone();
2770            threads.push(thread::spawn(move || {
2771                let mut guard = map.write(&1u32).unwrap();
2772                let val = guard.get();
2773                guard.set(val + 1);
2774                trace!("Dealt with {}", i);
2775            }));
2776        }
2777        for thread in threads {
2778            let _ = thread.join();
2779        }
2780        map.get(&1).unwrap().validate(num_threads);
2781    }
2782
2783    #[derive(Copy, Clone)]
2784    struct Obj {
2785        a: usize,
2786        b: usize,
2787        c: usize,
2788        d: usize,
2789    }
2790    impl Obj {
2791        fn new(num: usize) -> Self {
2792            Obj {
2793                a: num,
2794                b: num + 1,
2795                c: num + 2,
2796                d: num + 3,
2797            }
2798        }
2799        fn validate(&self, num: usize) {
2800            assert_eq!(self.a, num);
2801            assert_eq!(self.b, num + 1);
2802            assert_eq!(self.c, num + 2);
2803            assert_eq!(self.d, num + 3);
2804        }
2805        fn get(&self) -> usize {
2806            self.a
2807        }
2808        fn set(&mut self, num: usize) {
2809            *self = Self::new(num)
2810        }
2811    }
2812
2813    #[test]
2814    fn obj_map() {
2815        let _ = env_logger::try_init();
2816        let map = ObjectMap::<Obj>::with_capacity(16);
2817        for i in 5..2048 {
2818            map.insert(&i, Obj::new(i));
2819        }
2820        for i in 5..2048 {
2821            match map.get(&i) {
2822                Some(r) => r.validate(i),
2823                None => panic!("{}", i),
2824            }
2825        }
2826    }
2827
2828    #[test]
2829    fn parallel_obj_hybrid() {
2830        let _ = env_logger::try_init();
2831        let map = Arc::new(ObjectMap::<Obj>::with_capacity(4));
2832        for i in 5..128 {
2833            map.insert(&i, Obj::new(i * 10));
2834        }
2835        let mut threads = vec![];
2836        for i in 256..265 {
2837            let map = map.clone();
2838            threads.push(thread::spawn(move || {
2839                for j in 5..60 {
2840                    map.insert(&(i * 10 + j), Obj::new(10));
2841                }
2842            }));
2843        }
2844        for i in 5..8 {
2845            let map = map.clone();
2846            threads.push(thread::spawn(move || {
2847                for j in 5..8 {
2848                    map.remove(&(i * j));
2849                }
2850            }));
2851        }
2852        for thread in threads {
2853            let _ = thread.join();
2854        }
2855        for i in 256..265 {
2856            for j in 5..60 {
2857                match map.get(&(i * 10 + j)) {
2858                    Some(r) => r.validate(10),
2859                    None => panic!("{}", i),
2860                }
2861            }
2862        }
2863    }
2864
2865    #[test]
2866    fn parallel_hashmap_hybrid() {
2867        let _ = env_logger::try_init();
2868        let map = Arc::new(super::HashMap::<u32, Obj>::with_capacity(4));
2869        for i in 5..128u32 {
2870            map.insert(&i, Obj::new((i * 10) as usize));
2871        }
2872        let mut threads = vec![];
2873        for i in 256..265u32 {
2874            let map = map.clone();
2875            threads.push(thread::spawn(move || {
2876                for j in 5..60u32 {
2877                    map.insert(&(i * 10 + j), Obj::new(10usize));
2878                }
2879            }));
2880        }
2881        for i in 5..8 {
2882            let map = map.clone();
2883            threads.push(thread::spawn(move || {
2884                for j in 5..8 {
2885                    map.remove(&(i * j));
2886                }
2887            }));
2888        }
2889        for thread in threads {
2890            let _ = thread.join();
2891        }
2892        for i in 256..265 {
2893            for j in 5..60 {
2894                match map.get(&(i * 10 + j)) {
2895                    Some(r) => r.validate(10),
2896                    None => panic!("{}", i),
2897                }
2898            }
2899        }
2900    }
2901
2902    use std::thread::JoinHandle;
2903    #[test]
2904    fn atomic_ordering() {
2905        let test_load = 102400;
2906        let epoch = Arc::new(AtomicUsize::new(0));
2907        let old_ptr = Arc::new(AtomicUsize::new(0));
2908        let new_ptr = Arc::new(AtomicUsize::new(0));
2909        let write = || -> JoinHandle<()> {
2910            let epoch = epoch.clone();
2911            let old_ptr = old_ptr.clone();
2912            let new_ptr = new_ptr.clone();
2913            thread::spawn(move || {
2914                for _ in 0..test_load {
2915                    let old = old_ptr.load(Acquire);
2916                    if new_ptr.compare_and_swap(0, old, AcqRel) != 0 {
2917                        return;
2918                    }
2919                    dfence();
2920                    if old_ptr.load(Acquire) != old {
2921                        new_ptr.store(0, Release);
2922                        dfence();
2923                        return;
2924                    }
2925                    let new = old + 1;
2926                    new_ptr.store(new, Release);
2927                    dfence();
2928                    assert_eq!(epoch.fetch_add(1, AcqRel) % 2, 0);
2929                    // Do something
2930                    for _ in 0..1000 {
2931                        std::sync::atomic::spin_loop_hint();
2932                    }
2933                    old_ptr.store(new, Release);
2934                    dfence();
2935                    assert_eq!(epoch.fetch_add(1, AcqRel) % 2, 1);
2936                    dfence();
2937                    new_ptr.store(0, Release);
2938                }
2939            })
2940        };
2941        let read = || -> JoinHandle<()> {
2942            let epoch = epoch.clone();
2943            let old_ptr = old_ptr.clone();
2944            let new_ptr = new_ptr.clone();
2945            thread::spawn(move || {
2946                for _ in 0..test_load {
2947                    let epoch_val = epoch.load(Acquire);
2948                    let old = old_ptr.load(Acquire);
2949                    let new = new_ptr.load(Acquire);
2950                    let changing = epoch_val % 2 == 1;
2951                    for _ in 0..500 {
2952                        std::sync::atomic::spin_loop_hint();
2953                    }
2954                    if changing && epoch.load(Acquire) == epoch_val {
2955                        assert_ne!(old, new);
2956                        assert_ne!(new, 0);
2957                    }
2958                }
2959            })
2960        };
2961        let num_writers = 5;
2962        let mut writers = Vec::with_capacity(num_writers);
2963        for _ in 0..num_writers {
2964            writers.push(write());
2965        }
2966        let num_readers = num_cpus::get();
2967        let mut readers = Vec::with_capacity(num_readers);
2968        for _ in 0..num_readers {
2969            readers.push(read());
2970        }
2971        for reader in readers {
2972            reader.join().unwrap();
2973        }
2974        for writer in writers {
2975            writer.join().unwrap();
2976        }
2977    }
2978
2979    #[bench]
2980    fn lfmap(b: &mut Bencher) {
2981        let _ = env_logger::try_init();
2982        let map = WordMap::<System, DefaultHasher>::with_capacity(8);
2983        let mut i = 5;
2984        b.iter(|| {
2985            map.insert(&i, i);
2986            i += 1;
2987        });
2988    }
2989
2990    #[bench]
2991    fn hashmap(b: &mut Bencher) {
2992        let _ = env_logger::try_init();
2993        let mut map = HashMap::new();
2994        let mut i = 5;
2995        b.iter(|| {
2996            map.insert(i, i);
2997            i += 1;
2998        });
2999    }
3000
3001    #[bench]
3002    fn mutex_hashmap(b: &mut Bencher) {
3003        let _ = env_logger::try_init();
3004        let map = Mutex::new(HashMap::new());
3005        let mut i = 5;
3006        b.iter(|| {
3007            map.lock().unwrap().insert(i, i);
3008            i += 1;
3009        });
3010    }
3011
3012    #[bench]
3013    fn rwlock_hashmap(b: &mut Bencher) {
3014        let _ = env_logger::try_init();
3015        let map = RwLock::new(HashMap::new());
3016        let mut i = 5;
3017        b.iter(|| {
3018            map.write().unwrap().insert(i, i);
3019            i += 1;
3020        });
3021    }
3022
3023    #[bench]
3024    fn chashmap(b: &mut Bencher) {
3025        let _ = env_logger::try_init();
3026        let map = CHashMap::new();
3027        let mut i = 5;
3028        b.iter(|| {
3029            map.insert(i, i);
3030            i += 1;
3031        });
3032    }
3033
3034    #[bench]
3035    fn default_hasher(b: &mut Bencher) {
3036        let _ = env_logger::try_init();
3037        b.iter(|| {
3038            let mut hasher = DefaultHasher::default();
3039            hasher.write_u64(123);
3040            hasher.finish();
3041        });
3042    }
3043}