1use crate::align_padding;
3use alloc::vec::Vec;
4use core::alloc::{GlobalAlloc, Layout};
5use core::hash::Hasher;
6use core::marker::PhantomData;
7use core::ops::Deref;
8use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
9use core::sync::atomic::{compiler_fence, fence, AtomicU64, AtomicUsize};
10use core::{intrinsics, mem, ptr};
11use crossbeam_epoch::*;
12use std::alloc::System;
13use std::collections::hash_map::DefaultHasher;
14use std::hash::Hash;
15use std::ops::DerefMut;
16use std::os::raw::c_void;
17use std::time::{SystemTime, UNIX_EPOCH};
18
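// Every bucket is a pair of machine words, modelled by EntryTemplate: the first word
// holds the fast key and the second the fast value. Both are accessed through raw
// atomic loads, stores and CAS on the entry address.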
19pub struct EntryTemplate(usize, usize);
20
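// Special fast-value encodings. The raw value word reserves its highest bit as a
// "prime" flag (INV_VAL_BIT_MASK); VAL_BIT_MASK strips that flag. Values 0, 1 and 2
// are reserved for the empty, sentinel and tombstone states respectively, which is
// why the user-facing maps offset real keys and values by NUM_FIX. MUTEX_BIT_MASK is
// the second-highest bit and is used by WordMutexGuard for word-level locking.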
21const EMPTY_KEY: usize = 0;
22const EMPTY_VALUE: usize = 0;
23const SENTINEL_VALUE: usize = 1;
24const TOMBSTONE_VALUE: usize = 2;
25const VAL_BIT_MASK: usize = !0 << 1 >> 1;
26const INV_VAL_BIT_MASK: usize = !VAL_BIT_MASK;
27const MUTEX_BIT_MASK: usize = !WORD_MUTEX_DATA_BIT_MASK & VAL_BIT_MASK;
28const ENTRY_SIZE: usize = mem::size_of::<EntryTemplate>();
29
30struct Value {
31 raw: usize,
32 parsed: ParsedValue,
33}
34
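// Decoded view of a raw value word (see Value::new):
// - Val(v): a live value with the prime flag cleared (0 doubles as "deleted").
// - Prime(v): a value whose prime flag is set, i.e. an in-flight update.
// - Sentinel: the entry has been migrated to the new chunk.
// - Empty: the slot has never been written.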
enum ParsedValue {
    Val(usize),
    Prime(usize),
    Sentinel,
    Empty,
}
41
42#[derive(Debug)]
enum ModResult<V> {
    Replaced(usize, V, usize),
    Existed(usize, V),
    Fail,
    Sentinel,
    NotFound,
    Done(usize, Option<V>, usize),
    TableFull,
    Aborted,
}
53
54enum ModOp<'a, V> {
55 Insert(usize, &'a V),
56 UpsertFastVal(usize),
57 AttemptInsert(usize, &'a V),
58 SwapFastVal(Box<dyn Fn(usize) -> Option<usize>>),
59 Sentinel,
60 Tombstone,
61}
62
63pub enum InsertOp {
64 Insert,
65 UpsertFast,
66 TryInsert,
67}
68
69enum ResizeResult {
70 NoNeed,
71 SwapFailed,
72 ChunkChanged,
73 Done,
74}
75
76enum SwapResult<'a, K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> {
77 Succeed(usize, usize, Shared<'a, ChunkPtr<K, V, A, ALLOC>>),
78 NotFound,
79 Failed,
80 Aborted,
81}
82
83pub struct Chunk<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> {
84 capacity: usize,
85 base: usize,
86 occu_limit: usize,
87 occupation: AtomicUsize,
88 empty_entries: AtomicUsize,
89 total_size: usize,
90 attachment: A,
91 shadow: PhantomData<(K, V, ALLOC)>,
92}
93
94pub struct ChunkPtr<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> {
95 ptr: *mut Chunk<K, V, A, ALLOC>,
96}
97
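// The table keeps two chunk pointers: `chunk` is the current chunk and `new_chunk`
// is only non-null while a migration is in flight. The epoch counter is bumped to an
// odd number when copying starts and back to even once the new chunk has been
// published (see `is_copying` and `do_migration`).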
98pub struct Table<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default, H: Hasher + Default> {
99 new_chunk: Atomic<ChunkPtr<K, V, A, ALLOC>>,
100 chunk: Atomic<ChunkPtr<K, V, A, ALLOC>>,
101 count: AtomicUsize,
102 epoch: AtomicUsize,
103 timestamp: AtomicU64,
104 mark: PhantomData<H>,
105}
106
107impl<
108 K: Clone + Hash + Eq,
109 V: Clone,
110 A: Attachment<K, V>,
111 ALLOC: GlobalAlloc + Default,
112 H: Hasher + Default,
113 > Table<K, V, A, ALLOC, H>
114{
115 pub fn with_capacity(cap: usize) -> Self {
116 if !is_power_of_2(cap) {
            panic!("capacity is not a power of 2");
118 }
119 let chunk = Chunk::alloc_chunk(cap);
122 Self {
123 chunk: Atomic::new(ChunkPtr::new(chunk)),
124 new_chunk: Atomic::null(),
125 count: AtomicUsize::new(0),
126 epoch: AtomicUsize::new(0),
127 timestamp: AtomicU64::new(timestamp()),
128 mark: PhantomData,
129 }
130 }
131
132 pub fn new() -> Self {
133 Self::with_capacity(64)
134 }
135
136 pub fn get(&self, key: &K, fkey: usize, read_attachment: bool) -> Option<(usize, Option<V>)> {
        enum FromChunkRes<V> {
            Value(usize, Value, Option<V>, usize, usize),
            Prime,
            None,
            Sentinel,
        }
143 let guard = crossbeam_epoch::pin();
144 let backoff = crossbeam_utils::Backoff::new();
145 let hash = hash::<H>(fkey);
146 loop {
147 let epoch = self.now_epoch();
148 let chunk_ptr = self.chunk.load(Acquire, &guard);
149 let new_chunk_ptr = self.new_chunk.load(Acquire, &guard);
150 let chunk = unsafe { chunk_ptr.deref() };
151 let new_chunk = Self::to_chunk_ref(epoch, &chunk_ptr, &new_chunk_ptr);
152 debug_assert!(!chunk_ptr.is_null());
153 let get_from =
154 |chunk: &Chunk<K, V, A, ALLOC>, migrating: Option<&ChunkPtr<K, V, A, ALLOC>>| {
155 let (val, idx, addr) = self.get_from_chunk(&*chunk, hash, key, fkey, migrating);
156 match val.parsed {
157 ParsedValue::Empty | ParsedValue::Val(0) => FromChunkRes::None,
158 ParsedValue::Val(v) => FromChunkRes::Value(
159 v,
160 val,
161 if Self::can_attach() && read_attachment {
162 Some(chunk.attachment.get(idx).1)
163 } else {
164 None
165 },
166 idx,
167 addr,
168 ),
169 ParsedValue::Prime(_) => FromChunkRes::Prime,
170 ParsedValue::Sentinel => FromChunkRes::Sentinel,
171 }
172 };
173 return match get_from(&chunk, new_chunk) {
174 FromChunkRes::Value(fval, val, attach_val, idx, addr) => {
175 if let Some(new_chunk) = new_chunk {
176 self.migrate_entry(fkey, idx, val, chunk, new_chunk, addr, &mut 0);
177 }
178 Some((fval, attach_val))
179 }
180 FromChunkRes::Sentinel => {
181 if let Some(new_chunk) = new_chunk {
182 dfence();
183 match get_from(&new_chunk, None) {
184 FromChunkRes::Value(fval, _, val, _, _) => Some((fval, val)),
185 FromChunkRes::Sentinel => {
186 backoff.spin();
188 continue;
189 }
190 FromChunkRes::None => {
                                trace!(
                                    "Got none from new chunk for {} at epoch {}",
                                    fkey - 5,
                                    epoch
                                );
196 None
197 }
198 FromChunkRes::Prime => {
199 backoff.spin();
200 continue;
201 }
202 }
203 } else {
204 warn!(
205 "Got sentinel on get but new chunk is null for {}, retry. Copying {}, epoch {}, now epoch {}",
206 fkey,
207 new_chunk.is_some(),
208 epoch,
209 self.epoch.load(Acquire)
210 );
211 backoff.spin();
212 continue;
213 }
214 }
215 FromChunkRes::None => {
216 if let Some(chunk) = new_chunk {
217 dfence();
218 match get_from(chunk, None) {
219 FromChunkRes::Value(fval, _, val, _, _) => Some((fval, val)),
220 FromChunkRes::Sentinel => {
221 backoff.spin();
223 continue;
224 }
225 FromChunkRes::None => {
                                trace!(
                                    "Got none from new chunk for {} at epoch {}",
                                    fkey - 5,
                                    epoch
                                );
231 None
232 }
233 FromChunkRes::Prime => {
234 backoff.spin();
235 continue;
236 }
237 }
238 } else {
239 None
240 }
241 }
242 FromChunkRes::Prime => {
243 backoff.spin();
244 continue;
245 }
246 };
247 }
248 }
249
250 pub fn insert(
251 &self,
252 op: InsertOp,
253 key: &K,
254 value: Option<V>,
255 fkey: usize,
256 fvalue: usize,
257 ) -> Option<(usize, V)> {
258 let backoff = crossbeam_utils::Backoff::new();
259 let guard = crossbeam_epoch::pin();
260 let hash = hash::<H>(fkey);
261 loop {
262 let epoch = self.now_epoch();
263 let chunk_ptr = self.chunk.load(Acquire, &guard);
265 let new_chunk_ptr = self.new_chunk.load(Acquire, &guard);
266 let new_chunk = Self::to_chunk_ref(epoch, &chunk_ptr, &new_chunk_ptr);
267 if new_chunk.is_none() {
268 match self.check_migration(chunk_ptr, &guard) {
269 ResizeResult::Done | ResizeResult::SwapFailed | ResizeResult::ChunkChanged => {
270 debug!("Retry insert due to resize");
271 backoff.spin();
272 continue;
273 }
274 ResizeResult::NoNeed => {}
275 }
276 } else if new_chunk_ptr.is_null() {
                warn!("Chunk ptrs are not consistent with the epoch");
279 continue;
280 }
281 let chunk = unsafe { chunk_ptr.deref() };
282 let modify_chunk = if let Some(new_chunk) = new_chunk {
283 new_chunk
284 } else {
285 chunk
286 };
287 let masked_value = fvalue & VAL_BIT_MASK;
288 let mod_op = match op {
289 InsertOp::Insert => ModOp::Insert(masked_value, value.as_ref().unwrap()),
290 InsertOp::UpsertFast => ModOp::UpsertFastVal(masked_value),
291 InsertOp::TryInsert => ModOp::AttemptInsert(masked_value, value.as_ref().unwrap()),
292 };
293 let value_insertion =
294 self.modify_entry(&*modify_chunk, hash, key, fkey, mod_op, None, &guard);
295 let mut result = None;
296 match value_insertion {
297 ModResult::Done(_, _, _) => {
298 modify_chunk.occupation.fetch_add(1, Relaxed);
299 self.count.fetch_add(1, AcqRel);
300 }
301 ModResult::Replaced(fv, v, _) | ModResult::Existed(fv, v) => result = Some((fv, v)),
302 ModResult::Fail => {
303 warn!(
305 "Insertion failed, do migration and retry. Copying {}, cap {}, count {}, old {:?}, new {:?}",
306 new_chunk.is_some(),
307 modify_chunk.capacity,
308 modify_chunk.occupation.load(Relaxed),
309 chunk_ptr,
310 new_chunk_ptr
311 );
312 backoff.spin();
313 continue;
314 }
315 ModResult::TableFull => {
316 trace!(
317 "Insertion is too fast, copying {}, cap {}, count {}, old {:?}, new {:?}.",
318 new_chunk.is_some(),
319 modify_chunk.capacity,
320 modify_chunk.occupation.load(Relaxed),
321 chunk_ptr,
322 new_chunk_ptr
323 );
324 self.do_migration(chunk_ptr, &guard);
325 backoff.spin();
326 continue;
327 }
328 ModResult::Sentinel => {
329 trace!("Discovered sentinel on insertion table upon probing, retry");
330 backoff.spin();
331 continue;
332 }
333 ModResult::NotFound => unreachable!("Not Found on insertion is impossible"),
                ModResult::Aborted => unreachable!("Should not abort"),
335 }
336 if new_chunk.is_some() {
337 dfence();
338 assert_ne!(
339 chunk_ptr, new_chunk_ptr,
340 "at epoch {}, inserting k:{}, v:{}",
341 epoch, fkey, fvalue
342 );
343 assert_ne!(
344 new_chunk_ptr,
345 Shared::null(),
346 "at epoch {}, inserting k:{}, v:{}",
347 epoch,
348 fkey,
349 fvalue
350 );
351 self.modify_entry(chunk, hash, key, fkey, ModOp::Sentinel, new_chunk, &guard);
352 }
353 return result;
355 }
356 }
357
358 #[inline(always)]
359 fn is_copying(epoch: usize) -> bool {
        (epoch | 1) == epoch
361 }
362
363 #[inline(always)]
364 fn epoch_changed(&self, epoch: usize) -> bool {
365 self.now_epoch() != epoch
366 }
367
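    // Atomically applies `func` to the stored fast value for `key`. While a migration
    // is in flight, a live value found in the old chunk is first re-inserted into the
    // new chunk and a sentinel is planted over the old slot; otherwise the swap is
    // performed in place via ModOp::SwapFastVal.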
368 fn swap<'a, F: Fn(usize) -> Option<usize> + Copy + 'static>(
369 &self,
370 fkey: usize,
371 key: &K,
372 func: F,
373 guard: &'a Guard,
374 ) -> SwapResult<'a, K, V, A, ALLOC> {
375 let backoff = crossbeam_utils::Backoff::new();
376 let hash = hash::<H>(fkey);
377 loop {
378 let epoch = self.now_epoch();
379 let chunk_ptr = self.chunk.load(Acquire, &guard);
380 let new_chunk_ptr = self.new_chunk.load(Acquire, &guard);
381 let chunk = unsafe { chunk_ptr.deref() };
382 let new_chunk = Self::to_chunk_ref(epoch, &chunk_ptr, &new_chunk_ptr);
383 if let Some(new_chunk) = new_chunk {
384 let (old_parsed_val, old_index, _) =
387 self.get_from_chunk(chunk, hash, key, fkey, Some(new_chunk));
388 let old_fval = old_parsed_val.raw;
389 if old_fval != SENTINEL_VALUE
390 && old_fval != EMPTY_VALUE
391 && old_fval != TOMBSTONE_VALUE
392 {
393 if let Some(new_val) = func(old_fval) {
394 let val = chunk.attachment.get(old_index).1;
395 match self.modify_entry(
396 new_chunk,
397 hash,
398 key,
399 fkey,
400 ModOp::AttemptInsert(new_val, &val),
401 None,
402 guard,
403 ) {
404 ModResult::Done(_, _, new_index)
405 | ModResult::Replaced(_, _, new_index) => {
406 let old_addr = chunk.base + old_index * ENTRY_SIZE;
407 if self.cas_sentinel(old_addr, old_fval) {
408 return SwapResult::Succeed(
410 old_fval & VAL_BIT_MASK,
411 new_index,
412 new_chunk_ptr,
413 );
414 } else {
415 let new_addr = new_chunk.base + new_index * ENTRY_SIZE;
419 self.cas_tombstone(new_addr, new_val);
420 continue;
421 }
422 }
423 _ => {}
424 }
425 }
426 }
427 }
428 let modify_chunk_ptr = if new_chunk.is_some() {
429 new_chunk_ptr
430 } else {
431 chunk_ptr
432 };
433 let modify_chunk = if let Some(new_chunk) = new_chunk {
434 new_chunk
435 } else {
436 chunk
437 };
            trace!("Swapping for key {}, copying {}", fkey, new_chunk.is_some());
439 let mod_res = self.modify_entry(
440 modify_chunk,
441 hash,
442 key,
443 fkey,
444 ModOp::SwapFastVal(Box::new(func)),
445 None,
446 guard,
447 );
448 if new_chunk.is_some() {
449 assert_ne!(chunk_ptr, new_chunk_ptr);
450 assert_ne!(new_chunk_ptr, Shared::null());
451 self.modify_entry(chunk, hash, key, fkey, ModOp::Sentinel, new_chunk, &guard);
452 }
453 return match mod_res {
454 ModResult::Replaced(v, _, idx) => {
455 SwapResult::Succeed(v & VAL_BIT_MASK, idx, modify_chunk_ptr)
456 }
457 ModResult::Aborted => SwapResult::Aborted,
458 ModResult::Fail => SwapResult::Failed,
459 ModResult::NotFound => SwapResult::NotFound,
460 ModResult::Sentinel => {
461 backoff.spin();
462 continue;
463 }
                ModResult::Existed(_, _) => unreachable!("Swap should not return Existed"),
465 ModResult::Done(_, _, _) => unreachable!("Swap Done"),
466 ModResult::TableFull => unreachable!("Swap table full"),
467 };
468 }
469 }
470
471 #[inline(always)]
472 fn to_chunk_ref<'a>(
473 epoch: usize,
474 old_chunk_ptr: &'a Shared<ChunkPtr<K, V, A, ALLOC>>,
475 new_chunk_ptr: &'a Shared<ChunkPtr<K, V, A, ALLOC>>,
476 ) -> Option<&'a ChunkPtr<K, V, A, ALLOC>> {
        if Self::is_copying(epoch) && !old_chunk_ptr.eq(new_chunk_ptr) {
478 unsafe { new_chunk_ptr.as_ref() }
479 } else {
480 None
481 }
482 }
483
484 #[inline(always)]
485 fn now_epoch(&self) -> usize {
486 self.epoch.load(Acquire)
487 }
488
489 pub fn remove(&self, key: &K, fkey: usize) -> Option<(usize, V)> {
490 let guard = crossbeam_epoch::pin();
491 let backoff = crossbeam_utils::Backoff::new();
492 let hash = hash::<H>(fkey);
493 loop {
494 let epoch = self.now_epoch();
495 let new_chunk_ptr = self.new_chunk.load(Acquire, &guard);
496 let old_chunk_ptr = self.chunk.load(Acquire, &guard);
497 let copying = Self::is_copying(epoch);
498 if copying && (new_chunk_ptr.is_null() || new_chunk_ptr == old_chunk_ptr) {
499 continue;
500 }
501 let new_chunk = unsafe { new_chunk_ptr.deref() };
502 let old_chunk = unsafe { old_chunk_ptr.deref() };
503 let mut retr = None;
504 if copying {
505 trace!("Put sentinel in old chunk for removal");
508 assert_ne!(new_chunk_ptr, Shared::null());
509 let remove_from_old = self.modify_entry(
510 &*old_chunk,
511 hash,
512 key,
513 fkey,
514 ModOp::Sentinel,
515 Some(&new_chunk),
516 &guard,
517 );
518 match remove_from_old {
519 ModResult::Done(fvalue, Some(value), _)
520 | ModResult::Replaced(fvalue, value, _) => {
                        trace!("Sentinel placed");
522 retr = Some((fvalue, value));
523 }
524 ModResult::Done(_, None, _) => {}
525 _ => {
                        trace!("Sentinel not placed");
527 }
528 }
529 }
530 let modify_chunk = if copying { &new_chunk } else { &old_chunk };
531 let res = self.modify_entry(
532 &*modify_chunk,
533 hash,
534 key,
535 fkey,
536 ModOp::Tombstone,
537 None,
538 &guard,
539 );
540 match res {
541 ModResult::Replaced(fvalue, value, _) => {
542 retr = Some((fvalue, value));
543 self.count.fetch_sub(1, AcqRel);
544 }
                ModResult::Done(_, _, _) => unreachable!("Remove shall not return Done"),
546 ModResult::NotFound => {}
547 ModResult::Sentinel => {
548 backoff.spin();
549 continue;
550 }
551 ModResult::TableFull => unreachable!("TableFull on remove is not possible"),
552 _ => {}
553 };
554 if self.epoch_changed(epoch) {
555 if retr.is_none() {
556 return self.remove(key, fkey);
557 }
558 }
559 return retr;
560 }
561 }
562
563 pub fn len(&self) -> usize {
564 self.count.load(Acquire)
565 }
566
567 fn get_from_chunk(
568 &self,
569 chunk: &Chunk<K, V, A, ALLOC>,
570 hash: usize,
571 key: &K,
572 fkey: usize,
573 migrating: Option<&ChunkPtr<K, V, A, ALLOC>>,
574 ) -> (Value, usize, usize) {
575 assert_ne!(chunk as *const Chunk<K, V, A, ALLOC> as usize, 0);
576 let mut idx = hash;
577 let cap = chunk.capacity;
578 let base = chunk.base;
579 let cap_mask = chunk.cap_mask();
580 let mut counter = 0;
581 while counter < cap {
582 idx &= cap_mask;
583 let addr = base + idx * ENTRY_SIZE;
584 let k = self.get_fast_key(addr);
585 if k == fkey && chunk.attachment.probe(idx, key) {
586 let val_res = self.get_fast_value(addr);
587 match val_res.parsed {
588 ParsedValue::Empty => {}
589 _ => return (val_res, idx, addr),
590 }
591 } else if k == EMPTY_KEY {
592 return (Value::new::<K, V, A, ALLOC, H>(0), 0, addr);
593 } else if let Some(new_chunk_ins) = migrating {
594 debug_assert!(new_chunk_ins.base != chunk.base);
595 let val_res = self.get_fast_value(addr);
596 if let &ParsedValue::Val(_) = &val_res.parsed {
597 self.migrate_entry(k, idx, val_res, chunk, new_chunk_ins, addr, &mut 0);
598 }
599 }
            idx += 1;
            counter += 1;
602 }
603
604 return (Value::new::<K, V, A, ALLOC, H>(0), 0, 0);
606 }
607
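    // Core linear-probing routine shared by insert, upsert, tombstone, sentinel and
    // swap operations. Values are updated with CAS; when attachments are enabled, an
    // in-place update of an existing key first writes the new value with the prime
    // bit set, fills in the attachment, and then strips the prime bit so readers
    // never observe a half-written key/value pair.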
608 #[inline(always)]
609 fn modify_entry<'a>(
610 &self,
611 chunk: &'a Chunk<K, V, A, ALLOC>,
612 hash: usize,
613 key: &K,
614 fkey: usize,
615 op: ModOp<V>,
616 migration_chunk: Option<&ChunkPtr<K, V, A, ALLOC>>,
617 _guard: &'a Guard,
618 ) -> ModResult<V> {
619 let cap = chunk.capacity;
620 let base = chunk.base;
621 let mut idx = hash;
622 let mut count = 0;
623 let cap_mask = chunk.cap_mask();
624 let backoff = crossbeam_utils::Backoff::new();
625 while count <= cap {
626 idx &= cap_mask;
627 let addr = base + idx * ENTRY_SIZE;
628 let k = self.get_fast_key(addr);
629 let v = self.get_fast_value(addr);
630 {
631 match v.parsed {
633 ParsedValue::Sentinel => match &op {
634 &ModOp::Sentinel => {
635 }
637 _ => {
638 return ModResult::Sentinel;
640 }
641 },
642 _ => {}
643 }
644 }
645 if k == fkey && chunk.attachment.probe(idx, &key) {
646 let val = v;
648 match &val.parsed {
649 ParsedValue::Val(v) => {
650 match &op {
651 &ModOp::Sentinel => {
652 if self.cas_sentinel(addr, val.raw) {
653 let (_, value) = chunk.attachment.get(idx);
654 chunk.attachment.erase(idx);
655 if *v == 0 {
656 return ModResult::Done(addr, None, idx);
657 } else {
658 return ModResult::Done(*v, Some(value), idx);
659 }
660 } else {
661 return ModResult::Fail;
662 }
663 }
664 &ModOp::Tombstone => {
665 if *v == 0 {
666 return ModResult::NotFound;
668 }
669 if !self.cas_tombstone(addr, val.raw).1 {
670 return ModResult::Fail;
674 } else {
675 let (_, value) = chunk.attachment.get(idx);
677 chunk.attachment.erase(idx);
678 chunk.empty_entries.fetch_add(1, Relaxed);
679 return ModResult::Replaced(*v, value, idx);
680 }
681 }
682 &ModOp::UpsertFastVal(ref fv) => {
683 if self.cas_value(addr, val.raw, *fv).1 {
684 let (_, value) = chunk.attachment.get(idx);
685 if *v == 0 {
686 return ModResult::Done(addr, None, idx);
687 } else {
688 return ModResult::Replaced(*v, value, idx);
689 }
690 } else {
691 trace!("Cannot upsert fast value in place for {}", fkey);
692 return ModResult::Fail;
693 }
694 }
695 &ModOp::AttemptInsert(fval, oval) => {
696 if *v == 0 {
697 let primed_fval = if Self::can_attach() {
698 fval | INV_VAL_BIT_MASK
699 } else {
700 fval
701 };
702 let (act_val, replaced) =
703 self.cas_value(addr, val.raw, primed_fval);
704 if replaced {
705 let (_, prev_val) = chunk.attachment.get(idx);
706 if Self::can_attach() {
707 chunk.attachment.set(idx, key.clone(), (*oval).clone());
708 let stripped_prime =
709 self.cas_value(addr, primed_fval, fval).1;
710 debug_assert!(stripped_prime);
711 }
712 return ModResult::Replaced(val.raw, prev_val, idx);
713 } else {
714 let (_, value) = chunk.attachment.get(idx);
715 return ModResult::Existed(act_val, value);
716 }
717 } else {
                                    trace!(
                                        "Attempt to insert existing entry {}, {}, value {:?}, skip",
                                        k,
                                        fval,
                                        v
                                    );
724 let (_, value) = chunk.attachment.get(idx);
725 return ModResult::Existed(*v, value);
726 }
727 }
728 &ModOp::SwapFastVal(ref swap) => {
                                trace!(
                                    "Swapping found key {} with original value {:#064b}",
                                    fkey,
                                    val.raw
                                );
734 match &val.parsed {
735 ParsedValue::Val(pval) => {
736 let pval = *pval;
737 if pval == 0 {
738 return ModResult::NotFound;
739 }
740 let aval = chunk.attachment.get(idx).1;
741 if let Some(v) = swap(pval) {
742 if self.cas_value(addr, pval, v).1 {
743 return ModResult::Replaced(val.raw, aval, idx);
745 } else {
746 return ModResult::Fail;
747 }
748 } else {
749 return ModResult::Aborted;
750 }
751 }
752 _ => {
753 return ModResult::Fail;
754 }
755 }
756 }
757 &ModOp::Insert(fval, ref v) => {
758 debug!("Inserting in place for {}", fkey);
761 let primed_fval = if Self::can_attach() {
762 fval | INV_VAL_BIT_MASK
763 } else {
764 fval
765 };
766 if self.cas_value(addr, val.raw, primed_fval).1 {
767 let (_, prev_val) = chunk.attachment.get(idx);
768 if Self::can_attach() {
769 chunk.attachment.set(idx, key.clone(), (*v).clone());
770 let stripped_prime =
771 self.cas_value(addr, primed_fval, fval).1;
772 debug_assert!(stripped_prime);
773 }
774 return ModResult::Replaced(val.raw, prev_val, idx);
775 } else {
776 trace!("Cannot insert in place for {}", fkey);
777 return ModResult::Fail;
778 }
779 }
780 }
781 }
782 ParsedValue::Empty => {
783 }
786 ParsedValue::Sentinel => return ModResult::Sentinel,
787 ParsedValue::Prime(v) => {
788 trace!(
789 "Discovered prime for key {} with value {:#064b}, retry",
790 fkey,
791 v
792 );
793 backoff.spin();
794 continue;
795 }
796 }
797 } else if k == EMPTY_KEY {
798 match op {
799 ModOp::Insert(fval, val) | ModOp::AttemptInsert(fval, val) => {
800 trace!(
801 "Inserting entry key: {}, value: {}, raw: {:b}, addr: {}",
802 fkey,
803 fval & VAL_BIT_MASK,
804 fval,
805 addr
806 );
807 if self.cas_value(addr, EMPTY_VALUE, fval).1 {
808 chunk.attachment.set(idx, key.clone(), (*val).clone());
810 unsafe { intrinsics::atomic_store_rel(addr as *mut usize, fkey) }
811 return ModResult::Done(addr, None, idx);
812 } else {
813 backoff.spin();
814 continue;
815 }
816 }
817 ModOp::UpsertFastVal(fval) => {
818 trace!(
819 "Upserting entry key: {}, value: {}, raw: {:b}, addr: {}",
820 fkey,
821 fval & VAL_BIT_MASK,
822 fval,
823 addr
824 );
825 if self.cas_value(addr, EMPTY_VALUE, fval).1 {
826 unsafe { intrinsics::atomic_store_rel(addr as *mut usize, fkey) }
827 return ModResult::Done(addr, None, idx);
828 } else {
829 backoff.spin();
830 continue;
831 }
832 }
833 ModOp::Sentinel => {
834 if self.cas_sentinel(addr, 0) {
835 unsafe { intrinsics::atomic_store_rel(addr as *mut usize, fkey) }
837 return ModResult::Done(addr, None, idx);
838 } else {
839 backoff.spin();
840 continue;
841 }
842 }
843 ModOp::Tombstone => return ModResult::Fail,
844 ModOp::SwapFastVal(_) => return ModResult::NotFound,
845 };
846 } else if let (Some(migration_chunk), &ParsedValue::Val(_)) =
847 (migration_chunk, &v.parsed)
848 {
849 self.migrate_entry(k, idx, v, chunk, migration_chunk, addr, &mut 0);
850 }
            idx += 1;
            count += 1;
853 }
854 match op {
855 ModOp::Insert(_fv, _v) | ModOp::AttemptInsert(_fv, _v) => ModResult::TableFull,
856 ModOp::UpsertFastVal(_fv) => ModResult::TableFull,
857 _ => ModResult::NotFound,
858 }
859 }
860
861 fn all_from_chunk(&self, chunk: &Chunk<K, V, A, ALLOC>) -> Vec<(usize, usize, K, V)> {
862 let mut idx = 0;
863 let cap = chunk.capacity;
864 let base = chunk.base;
865 let mut counter = 0;
866 let mut res = Vec::with_capacity(chunk.occupation.load(Relaxed));
867 let cap_mask = chunk.cap_mask();
868 while counter < cap {
869 idx &= cap_mask;
870 let addr = base + idx * ENTRY_SIZE;
871 let k = self.get_fast_key(addr);
872 if k != EMPTY_KEY {
873 let val_res = self.get_fast_value(addr);
874 match val_res.parsed {
875 ParsedValue::Val(0) => {}
876 ParsedValue::Val(v) => {
877 let (key, value) = chunk.attachment.get(idx);
878 res.push((k, v, key, value))
879 }
880 ParsedValue::Prime(_) => {
881 continue;
882 }
883 _ => {}
884 }
885 }
            idx += 1;
            counter += 1;
888 }
889 return res;
890 }
891
892 fn entries(&self) -> Vec<(usize, usize, K, V)> {
893 let guard = crossbeam_epoch::pin();
894 let old_chunk_ref = self.chunk.load(Acquire, &guard);
895 let new_chunk_ref = self.new_chunk.load(Acquire, &guard);
896 let old_chunk = unsafe { old_chunk_ref.deref() };
897 let new_chunk = unsafe { new_chunk_ref.deref() };
898 let mut res = self.all_from_chunk(&*old_chunk);
899 if !new_chunk_ref.is_null() && old_chunk_ref != new_chunk_ref {
900 res.append(&mut self.all_from_chunk(&*new_chunk));
901 }
902 return res;
903 }
904
905 #[inline(always)]
906 fn get_fast_key(&self, entry_addr: usize) -> usize {
907 debug_assert!(entry_addr > 0);
908 unsafe { intrinsics::atomic_load_acq(entry_addr as *mut usize) }
909 }
910
911 #[inline(always)]
912 fn get_fast_value(&self, entry_addr: usize) -> Value {
913 debug_assert!(entry_addr > 0);
914 let addr = entry_addr + mem::size_of::<usize>();
915 let val = unsafe { intrinsics::atomic_load_acq(addr as *mut usize) };
916 Value::new::<K, V, A, ALLOC, H>(val)
917 }
918
919 #[inline(always)]
920 fn cas_tombstone(&self, entry_addr: usize, original: usize) -> (usize, bool) {
921 debug_assert!(entry_addr > 0);
922 self.cas_value(entry_addr, original, TOMBSTONE_VALUE)
923 }
924 #[inline(always)]
925 fn cas_value(&self, entry_addr: usize, original: usize, value: usize) -> (usize, bool) {
926 debug_assert!(entry_addr > 0);
927 debug_assert_ne!(value & VAL_BIT_MASK, SENTINEL_VALUE);
928 let addr = entry_addr + mem::size_of::<usize>();
929 unsafe { intrinsics::atomic_cxchg_acqrel(addr as *mut usize, original, value) }
930 }
931 #[inline(always)]
932 fn cas_sentinel(&self, entry_addr: usize, original: usize) -> bool {
933 assert!(entry_addr > 0);
        if cfg!(debug_assertions) {
935 let guard = crossbeam_epoch::pin();
936 assert!(Self::is_copying(self.epoch.load(Acquire)));
937 assert!(!self.new_chunk.load(Acquire, &guard).is_null());
938 let chunk = self.chunk.load(Acquire, &guard);
939 let chunk_ref = unsafe { chunk.deref() };
940 assert!(entry_addr >= chunk_ref.base);
941 assert!(entry_addr < chunk_ref.base + chunk_ref.total_size);
942 }
943 let addr = entry_addr + mem::size_of::<usize>();
944 let (val, done) = unsafe {
945 intrinsics::atomic_cxchg_acqrel(addr as *mut usize, original, SENTINEL_VALUE)
946 };
947 done || val == SENTINEL_VALUE
948 }
949
950 fn check_migration<'a>(
952 &self,
953 old_chunk_ptr: Shared<'a, ChunkPtr<K, V, A, ALLOC>>,
954 guard: &crossbeam_epoch::Guard,
955 ) -> ResizeResult {
956 let old_chunk_ins = unsafe { old_chunk_ptr.deref() };
957 let occupation = old_chunk_ins.occupation.load(Relaxed);
958 let occu_limit = old_chunk_ins.occu_limit;
959 if occupation <= occu_limit {
960 return ResizeResult::NoNeed;
961 }
962 self.do_migration(old_chunk_ptr, guard)
963 }
964
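    // Performs the resize: take the migration lock by CAS-ing `new_chunk` from null to
    // the old chunk pointer, allocate the new chunk, bump the epoch to odd, copy all
    // live entries, bump the epoch back to even, publish the new chunk as `chunk`,
    // retire the old chunk through the epoch-based reclaimer, and finally clear
    // `new_chunk`.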
965 fn do_migration<'a>(
966 &self,
967 old_chunk_ptr: Shared<'a, ChunkPtr<K, V, A, ALLOC>>,
968 guard: &crossbeam_epoch::Guard,
969 ) -> ResizeResult {
970 let epoch = self.now_epoch();
971 let old_chunk_ins = unsafe { old_chunk_ptr.deref() };
972 let empty_entries = old_chunk_ins.empty_entries.load(Relaxed);
973 let old_cap = old_chunk_ins.capacity;
974 let new_cap = if empty_entries > (old_cap >> 1) {
975 old_cap
977 } else {
978 let mut cap = old_cap << 1;
979 if cap < 2048 {
980 cap <<= 1;
981 }
982 if epoch < 5 {
983 cap <<= 1;
984 }
985 if timestamp() - self.timestamp.load(Acquire) < 1000 {
986 cap <<= 1;
987 }
988 cap
989 };
990 debug!(
991 "New size for {:?} is {}, was {}",
992 old_chunk_ptr, new_cap, old_cap
993 );
994 if let Err(_) = self
996 .new_chunk
997 .compare_and_set(Shared::null(), old_chunk_ptr, AcqRel, guard)
998 {
999 trace!("Cannot obtain lock for resize, will retry");
1001 return ResizeResult::SwapFailed;
1002 }
1003 dfence();
1004 if self.chunk.load(Acquire, guard) != old_chunk_ptr {
1005 warn!(
1006 "Give up on resize due to old chunk changed after lock obtained, epoch {} to {}",
1007 epoch,
1008 self.now_epoch()
1009 );
1010 self.new_chunk.store(Shared::null(), Release);
1011 dfence();
1012 debug_assert_eq!(self.now_epoch() % 2, 0);
1013 return ResizeResult::ChunkChanged;
1014 }
1015 debug!("Resizing {:?}", old_chunk_ptr);
1016 let new_chunk_ptr =
1017 Owned::new(ChunkPtr::new(Chunk::alloc_chunk(new_cap))).into_shared(guard);
1018 let new_chunk_ins = unsafe { new_chunk_ptr.deref() };
1019 assert_ne!(new_chunk_ptr, old_chunk_ptr);
        self.new_chunk.store(new_chunk_ptr, Release);
        dfence();
        let prev_epoch = self.epoch.fetch_add(1, AcqRel);
        debug_assert_eq!(prev_epoch % 2, 0);
1024 dfence();
1025 self.migrate_entries(old_chunk_ins, new_chunk_ins, guard);
1027 debug_assert_ne!(old_chunk_ins.ptr as usize, new_chunk_ins.base);
1029 debug_assert_ne!(old_chunk_ins.ptr, unsafe { new_chunk_ptr.deref().ptr });
1030 debug_assert!(!new_chunk_ptr.is_null());
1031 dfence();
        let prev_epoch = self.epoch.fetch_add(1, AcqRel);
        debug_assert_eq!(prev_epoch % 2, 1);
1034 dfence();
1035 self.chunk.store(new_chunk_ptr, Release);
1036 self.timestamp.store(timestamp(), Release);
1037 dfence();
1038 unsafe {
1039 guard.defer_destroy(old_chunk_ptr);
1040 guard.flush();
1041 }
1042 self.new_chunk.store(Shared::null(), Release);
1043 debug!(
1044 "Migration for {:?} completed, new chunk is {:?}, size from {} to {}",
1045 old_chunk_ptr, new_chunk_ptr, old_cap, new_cap
1046 );
1047 ResizeResult::Done
1048 }
1049
1050 fn migrate_entries(
1051 &self,
1052 old_chunk_ins: &Chunk<K, V, A, ALLOC>,
1053 new_chunk_ins: &Chunk<K, V, A, ALLOC>,
1054 _guard: &crossbeam_epoch::Guard,
1055 ) -> usize {
1056 let mut old_address = old_chunk_ins.base as usize;
1057 let boundary = old_address + chunk_size_of(old_chunk_ins.capacity);
1058 let mut effective_copy = 0;
1059 let mut idx = 0;
1060 let backoff = crossbeam_utils::Backoff::new();
1061 while old_address < boundary {
1062 let fvalue = self.get_fast_value(old_address);
1064 let fkey = self.get_fast_key(old_address);
1065 match &fvalue.parsed {
1067 ParsedValue::Empty | ParsedValue::Val(0) => {
1068 if !self.cas_sentinel(old_address, fvalue.raw) {
                        warn!("Filling empty entry with sentinel in old table should succeed but did not, retry");
1070 backoff.spin();
1071 continue;
1072 }
1073 }
1074 ParsedValue::Val(_) => {
1075 if !self.migrate_entry(
1076 fkey,
1077 idx,
1078 fvalue,
1079 old_chunk_ins,
1080 new_chunk_ins,
1081 old_address,
1082 &mut effective_copy,
1083 ) {
1084 continue;
1085 }
1086 }
1087 ParsedValue::Prime(_) => {
1088 unreachable!("Shall not have prime in old table");
1089 }
1090 ParsedValue::Sentinel => {
1091 trace!("Skip copy sentinel");
1095 }
1096 }
1097 old_address += ENTRY_SIZE;
1098 idx += 1;
1099 dfence();
1100 }
1101 debug!("Migrated {} entries to new chunk", effective_copy);
1103 new_chunk_ins.occupation.fetch_add(effective_copy, Relaxed);
1104 return effective_copy;
1105 }
1106
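    // Moves a single live entry into the new chunk: the key/value pair is inserted
    // into the new chunk first, and only then is a sentinel CAS-ed into the old slot.
    // If the sentinel CAS fails, another thread already migrated (or removed) the
    // entry and this copy does not count towards `effective_copy`.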
1107 #[inline(always)]
1108 fn migrate_entry(
1109 &self,
1110 fkey: usize,
1111 old_idx: usize,
1112 fvalue: Value,
1113 old_chunk_ins: &Chunk<K, V, A, ALLOC>,
1114 new_chunk_ins: &Chunk<K, V, A, ALLOC>,
1115 old_address: usize,
1116 effective_copy: &mut usize,
1117 ) -> bool {
1118 debug_assert_ne!(old_chunk_ins.base, new_chunk_ins.base);
1119 if fkey == EMPTY_KEY {
1120 return false;
1122 }
1123 assert_ne!(fvalue.raw & VAL_BIT_MASK, SENTINEL_VALUE);
1126 let (key, value) = old_chunk_ins.attachment.get(old_idx);
1127 let inserted_addr = {
1128 let cap = new_chunk_ins.capacity;
1130 let base = new_chunk_ins.base;
1131 let mut idx = hash::<H>(fkey);
1132 let cap_mask = new_chunk_ins.cap_mask();
1133 let mut count = 0;
1134 let mut res = None;
1135 while count < cap {
1136 idx &= cap_mask;
1137 let addr = base + idx * ENTRY_SIZE;
1138 let k = self.get_fast_key(addr);
1139 if k == fkey && new_chunk_ins.attachment.probe(idx, &key) {
1140 break;
1142 } else if k == EMPTY_KEY {
1143 let (val, done) = self.cas_value(addr, EMPTY_VALUE, fvalue.raw);
1145 debug_assert_ne!(val & VAL_BIT_MASK, SENTINEL_VALUE);
1146 if done {
1147 new_chunk_ins.attachment.set(idx, key, value);
1148 unsafe { intrinsics::atomic_store_rel(addr as *mut usize, fkey) }
1149 res = Some(addr);
1150 break;
1151 }
1152 }
                idx += 1;
                count += 1;
1155 }
1156 res
1157 };
        dfence();
        trace!("Copied key {} to new chunk", fkey);
1162 if self.cas_sentinel(old_address, fvalue.raw) {
1163 dfence();
1164 if let Some(_new_entry_addr) = inserted_addr {
1165 old_chunk_ins.attachment.erase(old_idx);
1166 *effective_copy += 1;
1167 return true;
1168 }
1169 }
1170 false
1171 }
1172
1173 pub fn map_is_copying(&self) -> bool {
1174 Self::is_copying(self.now_epoch())
1175 }
1176
1177 #[inline(always)]
1178 fn can_attach() -> bool {
1179 can_attach::<K, V, A>()
1180 }
1181}
1182
1183impl Value {
1184 pub fn new<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default, H: Hasher + Default>(
1185 val: usize,
1186 ) -> Self {
1187 let res = {
1188 if val == EMPTY_VALUE {
1189 ParsedValue::Empty
1190 } else if val == TOMBSTONE_VALUE {
1191 ParsedValue::Val(0)
1192 } else {
1193 let actual_val = val & VAL_BIT_MASK;
1194 let flag = val & INV_VAL_BIT_MASK;
1195 if flag != 0 {
1196 ParsedValue::Prime(actual_val)
1197 } else if actual_val == SENTINEL_VALUE {
1198 ParsedValue::Sentinel
1199 } else if actual_val == TOMBSTONE_VALUE {
                    unreachable!("tombstone value should have been handled above");
1201 } else {
1202 ParsedValue::Val(actual_val)
1203 }
1204 }
1205 };
1206 Value {
1207 raw: val,
1208 parsed: res,
1209 }
1210 }
1211}
1212
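// A chunk is a single allocation laid out as: [Chunk header padded to a 64-byte
// boundary][capacity * ENTRY_SIZE bucket array][attachment heap], with `base`
// pointing at the bucket array and the attachment constructed over the tail region.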
1213impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> Chunk<K, V, A, ALLOC> {
1214 fn alloc_chunk(capacity: usize) -> *mut Self {
1215 let capacity = capacity;
1216 let self_size = mem::size_of::<Self>();
1217 let self_align = align_padding(self_size, 64);
1218 let self_size_aligned = self_size + self_align;
1219 let chunk_size = chunk_size_of(capacity);
1220 let attachment_heap = A::heap_size_of(capacity);
1221 let total_size = self_size_aligned + chunk_size + attachment_heap;
1222 let ptr = alloc_mem::<ALLOC>(total_size) as *mut Self;
1223 let addr = ptr as usize;
1224 let data_base = addr + self_size_aligned;
1225 let attachment_base = data_base + chunk_size;
1226 unsafe {
1227 ptr::write(
1228 ptr,
1229 Self {
1230 base: data_base,
1231 capacity,
1232 occupation: AtomicUsize::new(0),
1233 empty_entries: AtomicUsize::new(0),
1234 occu_limit: occupation_limit(capacity),
1235 total_size,
1236 attachment: A::new(capacity, attachment_base, attachment_heap),
1237 shadow: PhantomData,
1238 },
1239 )
1240 };
1241 ptr
1242 }
1243
1244 unsafe fn gc(ptr: *mut Chunk<K, V, A, ALLOC>) {
1245 debug_assert_ne!(ptr as usize, 0);
1246 let chunk = &*ptr;
1247 chunk.attachment.dealloc();
1248 dealloc_mem::<ALLOC>(ptr as usize, chunk.total_size);
1249 }
1250
1251 #[inline]
1252 fn cap_mask(&self) -> usize {
1253 self.capacity - 1
1254 }
1255}
1256
1257impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Clone
1258 for Table<K, V, A, ALLOC, H>
1259{
1260 fn clone(&self) -> Self {
1261 let new_table = Table {
1262 chunk: Default::default(),
1263 new_chunk: Default::default(),
1264 count: AtomicUsize::new(0),
1265 epoch: AtomicUsize::new(0),
1266 timestamp: AtomicU64::new(timestamp()),
1267 mark: PhantomData,
1268 };
1269 let guard = crossbeam_epoch::pin();
1270 let old_chunk_ptr = self.chunk.load(Acquire, &guard);
1271 let new_chunk_ptr = self.new_chunk.load(Acquire, &guard);
1272 unsafe {
1273 let old_chunk = old_chunk_ptr.deref();
1275 let old_total_size = old_chunk.total_size;
1276
1277 let cloned_old_ptr = alloc_mem::<ALLOC>(old_total_size) as *mut Chunk<K, V, A, ALLOC>;
1278 debug_assert_ne!(cloned_old_ptr as usize, 0);
1279 debug_assert_ne!(old_chunk.ptr as usize, 0);
1280 libc::memcpy(
1281 cloned_old_ptr as *mut c_void,
1282 old_chunk.ptr as *const c_void,
1283 old_total_size,
1284 );
1285 let cloned_old_ref = Owned::new(ChunkPtr::new(cloned_old_ptr));
1286 new_table.chunk.store(cloned_old_ref, Release);
1287
1288 if new_chunk_ptr != Shared::null() {
1289 let new_chunk = new_chunk_ptr.deref();
1290 let new_total_size = new_chunk.total_size;
1291 let cloned_new_ptr =
1292 alloc_mem::<ALLOC>(new_total_size) as *mut Chunk<K, V, A, ALLOC>;
1293 libc::memcpy(
1294 cloned_new_ptr as *mut c_void,
1295 new_chunk.ptr as *const c_void,
1296 new_total_size,
1297 );
1298 let cloned_new_ref = Owned::new(ChunkPtr::new(cloned_new_ptr));
1299 new_table.new_chunk.store(cloned_new_ref, Release);
1300 } else {
1301 new_table.new_chunk.store(Shared::null(), Release);
1302 }
1303 }
1304 new_table.count.store(self.count.load(Acquire), Release);
1305 new_table
1306 }
1307}
1308
1309impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Drop
1310 for Table<K, V, A, ALLOC, H>
1311{
1312 fn drop(&mut self) {
1313 let guard = crossbeam_epoch::pin();
1314 unsafe {
1315 guard.defer_destroy(self.chunk.load(Acquire, &guard));
1316 let new_chunk_ptr = self.new_chunk.load(Acquire, &guard);
1317 if new_chunk_ptr != Shared::null() {
1318 guard.defer_destroy(new_chunk_ptr);
1319 }
1320 guard.flush();
1321 }
1322 }
1323}
1324
1325unsafe impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> Send
1326 for ChunkPtr<K, V, A, ALLOC>
1327{
1328}
1329unsafe impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> Sync
1330 for ChunkPtr<K, V, A, ALLOC>
1331{
1332}
1333
1334impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> Drop for ChunkPtr<K, V, A, ALLOC> {
1335 fn drop(&mut self) {
1336 debug_assert_ne!(self.ptr as usize, 0);
1337
1338 unsafe {
1339 Chunk::gc(self.ptr);
1340 }
1341 }
1342}
1343
1344impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> Deref for ChunkPtr<K, V, A, ALLOC> {
1345 type Target = Chunk<K, V, A, ALLOC>;
1346
1347 fn deref(&self) -> &Self::Target {
1348 debug_assert_ne!(self.ptr as usize, 0);
1349 unsafe { &*self.ptr }
1350 }
1351}
1352
1353impl<K, V, A: Attachment<K, V>, ALLOC: GlobalAlloc + Default> ChunkPtr<K, V, A, ALLOC> {
1354 fn new(ptr: *mut Chunk<K, V, A, ALLOC>) -> Self {
1355 debug_assert_ne!(ptr as usize, 0);
1356 Self { ptr }
1357 }
1358}
1359
1360#[inline(always)]
1361fn is_power_of_2(x: usize) -> bool {
1362 (x != 0) && ((x & (x - 1)) == 0)
1363}
1364
1365#[inline(always)]
1366fn occupation_limit(cap: usize) -> usize {
1367 (cap as f64 * 0.75f64) as usize
1368}
1369
1370#[inline(always)]
1371fn chunk_size_of(cap: usize) -> usize {
1372 cap * ENTRY_SIZE
1373}
1374
1375#[inline(always)]
1376pub fn hash<H: Hasher + Default>(num: usize) -> usize {
1377 let mut hasher = H::default();
1378 hasher.write_usize(num);
1379 hasher.finish() as usize
1380}
1381
1382#[inline(always)]
1383pub fn hash_key<K: Hash, H: Hasher + Default>(key: &K) -> usize {
1384 let mut hasher = H::default();
1385 key.hash(&mut hasher);
1386 hasher.finish() as usize
1387}
1388
1389#[inline(always)]
1390fn dfence() {
1391 compiler_fence(SeqCst);
1392 fence(SeqCst);
1393}
1394
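// Attachment storage (and the prime-bit publication protocol in modify_entry) is only
// used when the (K, V) pair is non-zero-sized; the word-only maps skip it entirely.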
1395const fn can_attach<K, V, A: Attachment<K, V>>() -> bool {
1396 mem::size_of::<(K, V)>() != 0
1397}
1398
1399pub trait Attachment<K, V> {
1400 fn heap_size_of(cap: usize) -> usize;
1401 fn new(cap: usize, heap_ptr: usize, heap_size: usize) -> Self;
1402 fn get(&self, index: usize) -> (K, V);
1403 fn set(&self, index: usize, key: K, value: V);
1404 fn erase(&self, index: usize);
1405 fn dealloc(&self);
1406 fn probe(&self, index: usize, probe_key: &K) -> bool;
1407}
1408
1409pub struct WordAttachment;
1410
1411impl Attachment<(), ()> for WordAttachment {
1413 fn heap_size_of(_cap: usize) -> usize {
1414 0
1415 }
1416
1417 fn new(_cap: usize, _heap_ptr: usize, _heap_size: usize) -> Self {
1418 Self
1419 }
1420
1421 #[inline(always)]
1422 fn get(&self, _index: usize) -> ((), ()) {
1423 ((), ())
1424 }
1425
1426 #[inline(always)]
1427 fn set(&self, _index: usize, _key: (), _value: ()) {}
1428
1429 #[inline(always)]
1430 fn erase(&self, _index: usize) {}
1431
1432 #[inline(always)]
1433 fn dealloc(&self) {}
1434
1435 #[inline(always)]
1436 fn probe(&self, _index: usize, _value: &()) -> bool {
1437 true
1438 }
1439}
1440
pub type WordTable<ALLOC, H> = Table<(), (), WordAttachment, ALLOC, H>;
1442
1443pub struct WordObjectAttachment<T, A: GlobalAlloc + Default> {
1444 obj_chunk: usize,
1445 obj_size: usize,
1446 shadow: PhantomData<(T, A)>,
1447}
1448
1449impl<T: Clone, A: GlobalAlloc + Default> Attachment<(), T> for WordObjectAttachment<T, A> {
1450 fn heap_size_of(cap: usize) -> usize {
1451 let obj_size = mem::size_of::<T>();
1452 cap * obj_size
1453 }
1454
1455 fn new(_cap: usize, heap_ptr: usize, _heap_size: usize) -> Self {
1456 Self {
1457 obj_chunk: heap_ptr,
1458 obj_size: mem::size_of::<T>(),
1459 shadow: PhantomData,
1460 }
1461 }
1462
1463 #[inline(always)]
1464 fn get(&self, index: usize) -> ((), T) {
1465 let addr = self.addr_by_index(index);
1466 let v = unsafe { (*(addr as *mut T)).clone() };
1467 ((), v)
1468 }
1469
1470 #[inline(always)]
1471 fn set(&self, index: usize, _key: (), val: T) {
1472 let addr = self.addr_by_index(index);
1473 unsafe { ptr::write(addr as *mut T, val) }
1474 }
1475
1476 #[inline(always)]
1477 fn erase(&self, index: usize) {
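        // Note: `drop` of a raw pointer is a no-op, so the attached value's destructor
        // is not actually run here; the slot is left to be overwritten or released
        // together with the chunk's memory.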
1478 drop(self.addr_by_index(index) as *mut T)
1479 }
1480
1481 #[inline(always)]
1482 fn dealloc(&self) {}
1483
1484 fn probe(&self, _index: usize, _value: &()) -> bool {
1485 true
1486 }
1487}
1488
1489pub type HashTable<K, V, ALLOC> =
1490 Table<K, V, HashKVAttachment<K, V, ALLOC>, ALLOC, PassthroughHasher>;
1491
1492pub struct HashKVAttachment<K, V, A: GlobalAlloc + Default> {
1493 obj_chunk: usize,
1494 obj_size: usize,
1495 shadow: PhantomData<(K, V, A)>,
1496}
1497
1498impl<K: Clone + Hash + Eq, V: Clone, A: GlobalAlloc + Default> Attachment<K, V>
1499 for HashKVAttachment<K, V, A>
1500{
1501 fn heap_size_of(cap: usize) -> usize {
1502 let obj_size = mem::size_of::<(K, V)>();
1503 cap * obj_size
1504 }
1505
1506 fn new(_cap: usize, heap_ptr: usize, _heap_size: usize) -> Self {
1507 Self {
1508 obj_chunk: heap_ptr,
1509 obj_size: mem::size_of::<(K, V)>(),
1510 shadow: PhantomData,
1511 }
1512 }
1513
1514 #[inline(always)]
1515 fn get(&self, index: usize) -> (K, V) {
1516 let addr = self.addr_by_index(index);
1517 unsafe { (*(addr as *mut (K, V))).clone() }
1518 }
1519
1520 #[inline(always)]
1521 fn set(&self, index: usize, key: K, val: V) {
1522 let addr = self.addr_by_index(index);
1523 unsafe { ptr::write(addr as *mut (K, V), (key, val)) }
1524 }
1525
1526 #[inline(always)]
1527 fn erase(&self, index: usize) {
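        // As in WordObjectAttachment::erase above, `drop` of a raw pointer is a no-op,
        // so the stored (K, V) pair is not destructed in place here.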
1528 drop(self.addr_by_index(index) as *mut (K, V))
1529 }
1530
1531 #[inline(always)]
1532 fn dealloc(&self) {}
1533
1534 fn probe(&self, index: usize, key: &K) -> bool {
1535 let addr = self.addr_by_index(index);
1536 let pos_key = unsafe { &*(addr as *mut K) };
1537 pos_key == key
1538 }
1539}
1540
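// Common map facade implemented by HashMap, ObjectMap and WordMap below.
// `get_or_insert` loops and uses `try_insert`, so two threads racing on the same key
// both end up returning the value that actually landed in the map.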
1541pub trait Map<K, V: Clone> {
1542 fn with_capacity(cap: usize) -> Self;
1543 fn get(&self, key: &K) -> Option<V>;
1544 fn insert(&self, key: &K, value: V) -> Option<V>;
1545 fn try_insert(&self, key: &K, value: V) -> Option<V>;
1547 fn remove(&self, key: &K) -> Option<V>;
1548 fn entries(&self) -> Vec<(K, V)>;
1549 fn contains_key(&self, key: &K) -> bool;
1550 fn len(&self) -> usize;
1551 fn get_or_insert<F: Fn() -> V>(&self, key: &K, func: F) -> V {
1553 loop {
1554 if self.contains_key(key) {
1555 if let Some(value) = self.get(key) {
1556 return value;
1557 }
1558 } else {
1559 let value = func();
1560 if let Some(value) = self.try_insert(key, value.clone()) {
1561 return value;
1562 }
1563 return value;
1564 }
1565 }
1566 }
1567}
1568
1569const NUM_FIX: usize = 5;
1570const PLACEHOLDER_VAL: usize = NUM_FIX + 1;
1571
1572impl<K: Clone + Hash + Eq, V: Clone, A: GlobalAlloc + Default> HashKVAttachment<K, V, A> {
1573 fn addr_by_index(&self, index: usize) -> usize {
1574 self.obj_chunk + index * self.obj_size
1575 }
1576}
1577
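// HashMap hashes the user key with `H`, keeps the (K, V) pair in a HashKVAttachment,
// and stores PLACEHOLDER_VAL as the fast value (which the read/write guards below
// also use as their lock word). A minimal usage sketch (illustrative only, assuming
// this module compiles as-is):
//
//     let map: HashMap<String, u32> = Map::with_capacity(16);
//     map.insert(&"answer".to_string(), 42);
//     assert_eq!(map.get(&"answer".to_string()), Some(42));
//     assert_eq!(map.remove(&"answer".to_string()), Some(42));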
1578pub struct HashMap<
1579 K: Clone + Hash + Eq,
1580 V: Clone,
1581 ALLOC: GlobalAlloc + Default = System,
1582 H: Hasher + Default = DefaultHasher,
1583> {
1584 table: HashTable<K, V, ALLOC>,
1585 shadow: PhantomData<H>,
1586}
1587
1588impl<K: Clone + Hash + Eq, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default>
1589 HashMap<K, V, ALLOC, H>
1590{
1591 pub fn insert_with_op(&self, op: InsertOp, key: &K, value: V) -> Option<V> {
1592 let hash = hash_key::<K, H>(&key);
1593 self.table
1594 .insert(op, key, Some(value), hash, PLACEHOLDER_VAL)
1595 .map(|(_, v)| v)
1596 }
1597
1598 pub fn write(&self, key: &K) -> Option<HashMapWriteGuard<K, V, ALLOC, H>> {
1599 HashMapWriteGuard::new(&self.table, key)
1600 }
1601 pub fn read(&self, key: &K) -> Option<HashMapReadGuard<K, V, ALLOC, H>> {
1602 HashMapReadGuard::new(&self.table, key)
1603 }
1604}
1605
1606impl<K: Clone + Hash + Eq, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Map<K, V>
1607 for HashMap<K, V, ALLOC, H>
1608{
1609 fn with_capacity(cap: usize) -> Self {
1610 Self {
1611 table: Table::with_capacity(cap),
1612 shadow: PhantomData,
1613 }
1614 }
1615
1616 #[inline(always)]
1617 fn get(&self, key: &K) -> Option<V> {
1618 let hash = hash_key::<K, H>(key);
1619 self.table.get(key, hash, true).map(|v| v.1.unwrap())
1620 }
1621
1622 #[inline(always)]
1623 fn insert(&self, key: &K, value: V) -> Option<V> {
1624 self.insert_with_op(InsertOp::Insert, key, value)
1625 }
1626
1627 #[inline(always)]
1628 fn try_insert(&self, key: &K, value: V) -> Option<V> {
1629 self.insert_with_op(InsertOp::TryInsert, key, value)
1630 }
1631
1632 #[inline(always)]
1633 fn remove(&self, key: &K) -> Option<V> {
1634 let hash = hash_key::<K, H>(&key);
1635 self.table.remove(key, hash).map(|(_, v)| v)
1636 }
1637
1638 #[inline(always)]
1639 fn entries(&self) -> Vec<(K, V)> {
1640 self.table
1641 .entries()
1642 .into_iter()
1643 .map(|(_, _, k, v)| (k, v))
1644 .collect()
1645 }
1646
1647 #[inline(always)]
1648 fn contains_key(&self, key: &K) -> bool {
1649 let hash = hash_key::<K, H>(&key);
1650 self.table.get(key, hash, false).is_some()
1651 }
1652
1653 #[inline(always)]
1654 fn len(&self) -> usize {
1655 self.table.len()
1656 }
1657}
1658
1659impl<T, A: GlobalAlloc + Default> WordObjectAttachment<T, A> {
1660 fn addr_by_index(&self, index: usize) -> usize {
1661 self.obj_chunk + index * self.obj_size
1662 }
1663}
1664
1665type ObjectTable<V, ALLOC, H> = Table<(), V, WordObjectAttachment<V, ALLOC>, ALLOC, H>;
1666
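// ObjectMap keys are plain usize values (offset internally by NUM_FIX to avoid the
// reserved fast-key encodings); the values live in a WordObjectAttachment next to the
// bucket array while the fast value stays PLACEHOLDER_VAL.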
1667#[derive(Clone)]
1668pub struct ObjectMap<
1669 V: Clone,
1670 ALLOC: GlobalAlloc + Default = System,
1671 H: Hasher + Default = DefaultHasher,
1672> {
1673 table: ObjectTable<V, ALLOC, H>,
1674}
1675
1676impl<V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> ObjectMap<V, ALLOC, H> {
1677 fn insert_with_op(&self, op: InsertOp, key: &usize, value: V) -> Option<V> {
1678 self.table
1679 .insert(op, &(), Some(value), key + NUM_FIX, PLACEHOLDER_VAL)
1680 .map(|(_, v)| v)
1681 }
1682
1683 pub fn read(&self, key: usize) -> Option<ObjectMapReadGuard<V, ALLOC, H>> {
1684 ObjectMapReadGuard::new(&self.table, key)
1685 }
1686
1687 pub fn write(&self, key: usize) -> Option<ObjectMapWriteGuard<V, ALLOC, H>> {
1688 ObjectMapWriteGuard::new(&self.table, key)
1689 }
1690}
1691
1692impl<V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Map<usize, V>
1693 for ObjectMap<V, ALLOC, H>
1694{
1695 fn with_capacity(cap: usize) -> Self {
1696 Self {
1697 table: Table::with_capacity(cap),
1698 }
1699 }
1700
1701 #[inline(always)]
1702 fn get(&self, key: &usize) -> Option<V> {
1703 self.table
1704 .get(&(), key + NUM_FIX, true)
1705 .map(|v| v.1.unwrap())
1706 }
1707
1708 #[inline(always)]
1709 fn insert(&self, key: &usize, value: V) -> Option<V> {
1710 self.insert_with_op(InsertOp::Insert, key, value)
1711 }
1712
1713 #[inline(always)]
1714 fn try_insert(&self, key: &usize, value: V) -> Option<V> {
1715 self.insert_with_op(InsertOp::TryInsert, key, value)
1716 }
1717
1718 #[inline(always)]
1719 fn remove(&self, key: &usize) -> Option<V> {
1720 self.table.remove(&(), key + NUM_FIX).map(|(_, v)| v)
1721 }
1722
1723 #[inline(always)]
1724 fn entries(&self) -> Vec<(usize, V)> {
1725 self.table
1726 .entries()
1727 .into_iter()
1728 .map(|(_, k, _, v)| (k - NUM_FIX, v))
1729 .collect()
1730 }
1731
1732 #[inline(always)]
1733 fn contains_key(&self, key: &usize) -> bool {
1734 self.table.get(&(), key + NUM_FIX, false).is_some()
1735 }
1736
1737 #[inline(always)]
1738 fn len(&self) -> usize {
1739 self.table.len()
1740 }
1741}
1742
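// WordMap stores usize keys and values directly in the table words (both offset by
// NUM_FIX internally), so no attachment memory is used. A minimal usage sketch
// (illustrative only, assuming this module compiles as-is):
//
//     let map: WordMap = Map::with_capacity(64);
//     map.insert(&21, 42);
//     assert_eq!(map.get(&21), Some(42));
//     if let Some(mut guard) = map.lock(21) {
//         *guard += 1; // written back, without the lock bit, when the guard drops
//     }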
1743#[derive(Clone)]
1744pub struct WordMap<ALLOC: GlobalAlloc + Default = System, H: Hasher + Default = DefaultHasher> {
1745 table: WordTable<ALLOC, H>,
1746}
1747
1748impl<ALLOC: GlobalAlloc + Default, H: Hasher + Default> WordMap<ALLOC, H> {
1749 fn insert_with_op(&self, op: InsertOp, key: &usize, value: usize) -> Option<usize> {
1750 self.table
1751 .insert(op, &(), None, key + NUM_FIX, value + NUM_FIX)
1752 .map(|(v, _)| v)
1753 }
1754
1755 pub fn get_from_mutex(&self, key: &usize) -> Option<usize> {
1756 self.get(key).map(|v| v & WORD_MUTEX_DATA_BIT_MASK)
1757 }
1758}
1759
1760impl<ALLOC: GlobalAlloc + Default, H: Hasher + Default> Map<usize, usize> for WordMap<ALLOC, H> {
1761 fn with_capacity(cap: usize) -> Self {
1762 Self {
1763 table: Table::with_capacity(cap),
1764 }
1765 }
1766
1767 #[inline(always)]
1768 fn get(&self, key: &usize) -> Option<usize> {
1769 self.table
1770 .get(&(), key + NUM_FIX, false)
1771 .map(|v| v.0 - NUM_FIX)
1772 }
1773
1774 #[inline(always)]
1775 fn insert(&self, key: &usize, value: usize) -> Option<usize> {
1776 self.insert_with_op(InsertOp::UpsertFast, key, value)
1777 }
1778
1779 #[inline(always)]
1780 fn try_insert(&self, key: &usize, value: usize) -> Option<usize> {
1781 self.insert_with_op(InsertOp::TryInsert, key, value)
1782 }
1783
1784 #[inline(always)]
1785 fn remove(&self, key: &usize) -> Option<usize> {
1786 self.table
1787 .remove(&(), key + NUM_FIX)
1788 .map(|(v, _)| v - NUM_FIX)
1789 }
1790 fn entries(&self) -> Vec<(usize, usize)> {
1791 self.table
1792 .entries()
1793 .into_iter()
1794 .map(|(k, v, _, _)| (k - NUM_FIX, v - NUM_FIX))
1795 .collect()
1796 }
1797
1798 #[inline(always)]
1799 fn contains_key(&self, key: &usize) -> bool {
1800 self.get(key).is_some()
1801 }
1802
1803 #[inline(always)]
1804 fn len(&self) -> usize {
1805 self.table.len()
1806 }
1807}
1808
1809const WORD_MUTEX_DATA_BIT_MASK: usize = !0 << 2 >> 2;
1810
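// Word-level mutex: a key is locked by swapping MUTEX_BIT_MASK into its stored value,
// leaving the low bits (WORD_MUTEX_DATA_BIT_MASK) as the protected data. Dropping the
// guard writes the (possibly modified) data back without the lock bit.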
1811pub struct WordMutexGuard<
1812 'a,
1813 ALLOC: GlobalAlloc + Default = System,
1814 H: Hasher + Default = DefaultHasher,
1815> {
1816 table: &'a WordTable<ALLOC, H>,
1817 key: usize,
1818 value: usize,
1819}
1820
1821impl<'a, ALLOC: GlobalAlloc + Default, H: Hasher + Default> WordMutexGuard<'a, ALLOC, H> {
1822 fn create(table: &'a WordTable<ALLOC, H>, key: usize) -> Option<Self> {
1823 let key = key + NUM_FIX;
1824 let value = 0;
1825 match table.insert(
1826 InsertOp::TryInsert,
1827 &(),
1828 Some(()),
1829 key,
1830 value | MUTEX_BIT_MASK,
1831 ) {
1832 None | Some((TOMBSTONE_VALUE, ())) | Some((EMPTY_VALUE, ())) => {
1833 trace!("Created locked key {}", key);
1834 Some(Self { table, key, value })
1835 }
1836 _ => {
1837 trace!("Cannot create locked key {} ", key);
1838 None
1839 }
1840 }
1841 }
1842 fn new(table: &'a WordTable<ALLOC, H>, key: usize) -> Option<Self> {
1843 let key = key + NUM_FIX;
1844 let backoff = crossbeam_utils::Backoff::new();
1845 let guard = crossbeam_epoch::pin();
1846 let value;
1847 loop {
1848 let swap_res = table.swap(
1849 key,
1850 &(),
1851 move |fast_value| {
                    trace!("The key {} has value {}", key, fast_value);
                    let locked_val = fast_value | MUTEX_BIT_MASK;
                    if fast_value == locked_val {
                        trace!("The key {} is already locked, unchanged, try again", key);
                        None
                    } else {
                        trace!(
                            "The key {} was obtained, with value {}",
                            key,
                            fast_value & WORD_MUTEX_DATA_BIT_MASK
                        );
1865 Some(locked_val)
1866 }
1867 },
1868 &guard,
1869 );
1870 match swap_res {
1871 SwapResult::Succeed(val, _idx, _chunk) => {
1872 trace!("Lock on key {} succeed with value {}", key, val);
1873 value = val & WORD_MUTEX_DATA_BIT_MASK;
1874 break;
1875 }
1876 SwapResult::Failed | SwapResult::Aborted => {
1877 trace!("Lock on key {} failed, retry", key);
1878 backoff.spin();
1879 continue;
1880 }
1881 SwapResult::NotFound => {
                    trace!("Cannot find key {} to lock", key);
1883 return None;
1884 }
1885 }
1886 }
1887 debug_assert_ne!(value, 0);
1888 let value = value - NUM_FIX;
1889 Some(Self { table, key, value })
1890 }
1891
1892 pub fn remove(self) -> usize {
1893 trace!("Removing {}", self.key);
1894 let res = self.table.remove(&(), self.key).unwrap().0;
1895 mem::forget(self);
1896 res | MUTEX_BIT_MASK
1897 }
1898}
1899
1900impl<'a, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Deref for WordMutexGuard<'a, ALLOC, H> {
1901 type Target = usize;
1902
1903 fn deref(&self) -> &Self::Target {
1904 &self.value
1905 }
1906}
1907
1908impl<'a, ALLOC: GlobalAlloc + Default, H: Hasher + Default> DerefMut
1909 for WordMutexGuard<'a, ALLOC, H>
1910{
1911 fn deref_mut(&mut self) -> &mut Self::Target {
1912 &mut self.value
1913 }
1914}
1915
1916impl<'a, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Drop for WordMutexGuard<'a, ALLOC, H> {
1917 fn drop(&mut self) {
1918 self.value += NUM_FIX;
1919 trace!(
1920 "Release lock for key {} with value {}",
1921 self.key,
1922 self.value
1923 );
1924 self.table.insert(
1925 InsertOp::UpsertFast,
1926 &(),
1927 None,
1928 self.key,
1929 self.value & WORD_MUTEX_DATA_BIT_MASK,
1930 );
1931 }
1932}
1933
1934impl<ALLOC: GlobalAlloc + Default, H: Hasher + Default> WordMap<ALLOC, H> {
1935 pub fn lock(&self, key: usize) -> Option<WordMutexGuard<ALLOC, H>> {
1936 WordMutexGuard::new(&self.table, key)
1937 }
1938 pub fn try_insert_locked(&self, key: usize) -> Option<WordMutexGuard<ALLOC, H>> {
1939 WordMutexGuard::create(&self.table, key)
1940 }
1941}
1942
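// Reader/writer guards for HashMap and ObjectMap piggyback on the fast value:
// PLACEHOLDER_VAL means unlocked, PLACEHOLDER_VAL - 1 means write-locked, and anything
// above PLACEHOLDER_VAL counts outstanding readers. Read guards increment on acquire
// and decrement on drop; write guards restore PLACEHOLDER_VAL by re-inserting the
// (possibly modified) value when dropped.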
1943pub struct HashMapReadGuard<
1944 'a,
1945 K: Clone + Eq + Hash,
1946 V: Clone,
1947 ALLOC: GlobalAlloc + Default = System,
1948 H: Hasher + Default = DefaultHasher,
1949> {
1950 table: &'a HashTable<K, V, ALLOC>,
1951 hash: usize,
1952 key: K,
1953 value: V,
1954 _mark: PhantomData<H>,
1955}
1956
1957impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default>
1958 HashMapReadGuard<'a, K, V, ALLOC, H>
1959{
1960 fn new(table: &'a HashTable<K, V, ALLOC>, key: &K) -> Option<Self> {
1961 let backoff = crossbeam_utils::Backoff::new();
1962 let guard = crossbeam_epoch::pin();
1963 let hash = hash_key::<K, H>(&key);
1964 let value: V;
1965 loop {
1966 let swap_res = table.swap(
1967 hash,
1968 key,
1969 move |fast_value| {
1970 if fast_value != PLACEHOLDER_VAL - 1 {
1971 trace!("Key hash {} is not write locked, will read lock", hash);
1973 Some(fast_value + 1)
1974 } else {
1975 trace!("Key hash {} is write locked, unchanged", hash);
1976 None
1977 }
1978 },
1979 &guard,
1980 );
1981 match swap_res {
1982 SwapResult::Succeed(_, idx, chunk) => {
1983 let chunk_ref = unsafe { chunk.deref() };
1984 let (_, v) = chunk_ref.attachment.get(idx);
1985 value = v;
1986 break;
1987 }
1988 SwapResult::Failed | SwapResult::Aborted => {
1989 trace!("Lock on key hash {} failed, retry", hash);
1990 backoff.spin();
1991 continue;
1992 }
1993 SwapResult::NotFound => {
                    debug!("Cannot find hash key {} to lock", hash);
1995 return None;
1996 }
1997 }
1998 }
1999 Some(Self {
2000 table,
2001 key: key.clone(),
2002 value,
2003 hash,
2004 _mark: Default::default(),
2005 })
2006 }
2007}
2008
2009impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Deref
2010 for HashMapReadGuard<'a, K, V, ALLOC, H>
2011{
2012 type Target = V;
2013
2014 fn deref(&self) -> &Self::Target {
2015 &self.value
2016 }
2017}
2018
2019impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Drop
2020 for HashMapReadGuard<'a, K, V, ALLOC, H>
2021{
2022 fn drop(&mut self) {
2023 trace!("Release read lock for hash key {}", self.hash);
2024 let guard = crossbeam_epoch::pin();
2025 self.table.swap(
2026 self.hash,
2027 &self.key,
2028 |fast_value| {
2029 debug_assert!(fast_value > PLACEHOLDER_VAL);
2030 Some(fast_value - 1)
2031 },
2032 &guard,
2033 );
2034 }
2035}
2036
2037pub struct HashMapWriteGuard<
2038 'a,
2039 K: Clone + Eq + Hash,
2040 V: Clone,
2041 ALLOC: GlobalAlloc + Default = System,
2042 H: Hasher + Default = DefaultHasher,
2043> {
2044 table: &'a HashTable<K, V, ALLOC>,
2045 hash: usize,
2046 key: K,
2047 value: V,
2048 _mark: PhantomData<H>,
2049}
2050
2051impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default>
2052 HashMapWriteGuard<'a, K, V, ALLOC, H>
2053{
2054 fn new(table: &'a HashTable<K, V, ALLOC>, key: &K) -> Option<Self> {
2055 let backoff = crossbeam_utils::Backoff::new();
2056 let guard = crossbeam_epoch::pin();
2057 let hash = hash_key::<K, H>(&key);
2058 let value: V;
2059 loop {
2060 let swap_res = table.swap(
2061 hash,
2062 key,
2063 move |fast_value| {
2064 if fast_value == PLACEHOLDER_VAL {
2065 trace!("Key hash {} is write lockable, will write lock", hash);
2067 Some(fast_value - 1)
2068 } else {
2069 trace!("Key hash {} is write locked, unchanged", hash);
2070 None
2071 }
2072 },
2073 &guard,
2074 );
2075 match swap_res {
2076 SwapResult::Succeed(_, idx, chunk) => {
2077 let chunk_ref = unsafe { chunk.deref() };
2078 let (_, v) = chunk_ref.attachment.get(idx);
2079 value = v;
2080 break;
2081 }
2082 SwapResult::Failed | SwapResult::Aborted => {
2083 trace!("Lock on key hash {} failed, retry", hash);
2084 backoff.spin();
2085 continue;
2086 }
2087 SwapResult::NotFound => {
                    debug!("Cannot find hash key {} to lock", hash);
2089 return None;
2090 }
2091 }
2092 }
2093 Some(Self {
2094 table,
2095 key: key.clone(),
2096 value,
2097 hash,
2098 _mark: Default::default(),
2099 })
2100 }
2101
2102 pub fn remove(self) -> V {
2103 let res = self.table.remove(&self.key, self.hash).unwrap().1;
2104 mem::forget(self);
2105 res
2106 }
2107}

impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Deref
    for HashMapWriteGuard<'a, K, V, ALLOC, H>
{
    type Target = V;

    fn deref(&self) -> &Self::Target {
        &self.value
    }
}

impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> DerefMut
    for HashMapWriteGuard<'a, K, V, ALLOC, H>
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.value
    }
}

impl<'a, K: Clone + Eq + Hash, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Drop
    for HashMapWriteGuard<'a, K, V, ALLOC, H>
{
    fn drop(&mut self) {
        trace!("Release write lock for hash key {}", self.hash);
        let hash = hash_key::<K, H>(&self.key);
        self.table.insert(
            InsertOp::Insert,
            &self.key,
            Some(self.value.clone()),
            hash,
            PLACEHOLDER_VAL,
        );
    }
}

pub struct ObjectMapReadGuard<
    'a,
    V: Clone,
    ALLOC: GlobalAlloc + Default = System,
    H: Hasher + Default = DefaultHasher,
> {
    table: &'a ObjectTable<V, ALLOC, H>,
    key: usize,
    value: V,
    _mark: PhantomData<H>,
}

impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default>
    ObjectMapReadGuard<'a, V, ALLOC, H>
{
    fn new(table: &'a ObjectTable<V, ALLOC, H>, key: usize) -> Option<Self> {
        let backoff = crossbeam_utils::Backoff::new();
        let guard = crossbeam_epoch::pin();
        let hash = hash_key::<usize, H>(&key);
        let value: V;
        loop {
            let swap_res = table.swap(
                key,
                &(),
                move |fast_value| {
                    if fast_value != PLACEHOLDER_VAL - 1 {
                        trace!("Key {} is not write locked, will read lock", hash);
                        Some(fast_value + 1)
                    } else {
                        trace!("Key {} is write locked, unchanged", hash);
                        None
                    }
                },
                &guard,
            );
            match swap_res {
                SwapResult::Succeed(_, idx, chunk) => {
                    let chunk_ref = unsafe { chunk.deref() };
                    let (_, v) = chunk_ref.attachment.get(idx);
                    value = v;
                    break;
                }
                SwapResult::Failed | SwapResult::Aborted => {
                    trace!("Lock on key {} failed, retry", hash);
                    backoff.spin();
                    continue;
                }
                SwapResult::NotFound => {
                    debug!("Cannot find hash key {} to lock", hash);
                    return None;
                }
            }
        }
        Some(Self {
            table,
            key,
            value,
            _mark: Default::default(),
        })
    }
}

impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Deref
    for ObjectMapReadGuard<'a, V, ALLOC, H>
{
    type Target = V;

    fn deref(&self) -> &Self::Target {
        &self.value
    }
}

impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Drop
    for ObjectMapReadGuard<'a, V, ALLOC, H>
{
    fn drop(&mut self) {
        trace!("Release read lock for key {}", self.key);
        let guard = crossbeam_epoch::pin();
        self.table.swap(
            self.key,
            &(),
            |fast_value| {
                debug_assert!(fast_value > PLACEHOLDER_VAL);
                Some(fast_value - 1)
            },
            &guard,
        );
    }
}

pub struct ObjectMapWriteGuard<
    'a,
    V: Clone,
    ALLOC: GlobalAlloc + Default = System,
    H: Hasher + Default = DefaultHasher,
> {
    table: &'a ObjectTable<V, ALLOC, H>,
    key: usize,
    value: V,
    _mark: PhantomData<H>,
}

impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default>
    ObjectMapWriteGuard<'a, V, ALLOC, H>
{
    fn new(table: &'a ObjectTable<V, ALLOC, H>, key: usize) -> Option<Self> {
        let backoff = crossbeam_utils::Backoff::new();
        let guard = crossbeam_epoch::pin();
        let value: V;
        let key = key + NUM_FIX;
        loop {
            let swap_res = table.swap(
                key,
                &(),
                move |fast_value| {
                    if fast_value == PLACEHOLDER_VAL {
                        trace!("Key {} is write lockable, will write lock", key);
                        Some(fast_value - 1)
                    } else {
                        trace!("Key {} is write locked, unchanged", key);
                        None
                    }
                },
                &guard,
            );
            match swap_res {
                SwapResult::Succeed(_, idx, chunk) => {
                    let chunk_ref = unsafe { chunk.deref() };
                    let (_, v) = chunk_ref.attachment.get(idx);
                    value = v;
                    break;
                }
                SwapResult::Failed | SwapResult::Aborted => {
                    trace!("Lock on key {} failed, retry", key);
                    backoff.spin();
                    continue;
                }
                SwapResult::NotFound => {
                    debug!("Cannot find key {} to lock", key);
                    return None;
                }
            }
        }
        Some(Self {
            table,
            key,
            value,
            _mark: Default::default(),
        })
    }

    pub fn remove(self) -> V {
        let res = self.table.remove(&(), self.key).unwrap().1;
        mem::forget(self);
        res
    }
}

impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Deref
    for ObjectMapWriteGuard<'a, V, ALLOC, H>
{
    type Target = V;

    fn deref(&self) -> &Self::Target {
        &self.value
    }
}

impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> DerefMut
    for ObjectMapWriteGuard<'a, V, ALLOC, H>
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.value
    }
}

impl<'a, V: Clone, ALLOC: GlobalAlloc + Default, H: Hasher + Default> Drop
    for ObjectMapWriteGuard<'a, V, ALLOC, H>
{
    fn drop(&mut self) {
        trace!("Release write lock for key {}", self.key);
        self.table.insert(
            InsertOp::Insert,
            &(),
            Some(self.value.clone()),
            self.key,
            PLACEHOLDER_VAL,
        );
    }
}

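// HashSet is a thin wrapper over the hash table: it stores `()` as the attached
// value and passes `!0` as the fast value on insert, so set membership reduces to
// key presence in the underlying table.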
pub struct HashSet<
    T: Clone + Hash + Eq,
    ALLOC: GlobalAlloc + Default = System,
    H: Hasher + Default = DefaultHasher,
> {
    table: HashTable<T, (), ALLOC>,
    shadow: PhantomData<H>,
}

impl<T: Clone + Hash + Eq, ALLOC: GlobalAlloc + Default, H: Hasher + Default> HashSet<T, ALLOC, H> {
    pub fn with_capacity(cap: usize) -> Self {
        Self {
            table: Table::with_capacity(cap),
            shadow: PhantomData,
        }
    }

    pub fn contains(&self, item: &T) -> bool {
        let hash = hash_key::<T, H>(item);
        self.table.get(item, hash, false).is_some()
    }

    pub fn insert(&self, item: &T) -> bool {
        let hash = hash_key::<T, H>(item);
        self.table
            .insert(InsertOp::TryInsert, item, None, hash, !0)
            .is_none()
    }

    pub fn remove(&self, item: &T) -> bool {
        let hash = hash_key::<T, H>(item);
        self.table.remove(item, hash).is_some()
    }

    pub fn items(&self) -> std::collections::HashSet<T> {
        self.table
            .entries()
            .into_iter()
            .map(|(_, _, item, _)| item)
            .collect()
    }

    #[inline(always)]
    pub fn len(&self) -> usize {
        self.table.len()
    }
}

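// Raw memory helpers for chunk storage: allocations are 64-byte aligned (a common
// cache-line size) and zero-filled, so freshly allocated entry slots read back as
// EMPTY_KEY / EMPTY_VALUE.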
#[inline(always)]
fn alloc_mem<A: GlobalAlloc + Default>(size: usize) -> usize {
    let align = 64;
    let layout = Layout::from_size_align(size, align).unwrap();
    let alloc = A::default();
    unsafe {
        let addr = alloc.alloc(layout) as usize;
        debug_assert_ne!(addr, 0, "allocation failed");
        debug_assert_eq!(addr % align, 0);
        ptr::write_bytes(addr as *mut u8, 0, size);
        addr
    }
}

#[inline(always)]
fn dealloc_mem<A: GlobalAlloc + Default>(ptr: usize, size: usize) {
    let align = 64;
    let layout = Layout::from_size_align(size, align).unwrap();
    let alloc = A::default();
    unsafe { alloc.dealloc(ptr as *mut u8, layout) }
}

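// PassthroughHasher returns whatever was last written via `write_usize`, so a
// machine-word key is used directly as its own hash; hashing arbitrary byte slices
// is deliberately unimplemented.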
pub struct PassthroughHasher {
    num: u64,
}

impl Hasher for PassthroughHasher {
    fn finish(&self) -> u64 {
        self.num
    }

    fn write(&mut self, _bytes: &[u8]) {
        unimplemented!()
    }

    fn write_usize(&mut self, i: usize) {
        self.num = i as u64
    }
}

impl Default for PassthroughHasher {
    fn default() -> Self {
        Self { num: 0 }
    }
}

fn timestamp() -> u64 {
    let start = SystemTime::now();
    let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap();
    since_the_epoch.as_millis() as u64
}

#[cfg(test)]
mod tests {
    use crate::map::*;
    use alloc::sync::Arc;
    use chashmap::CHashMap;
    use rayon::prelude::*;
    use std::collections::HashMap;
    use std::thread;
    use std::{
        alloc::System,
        sync::{Mutex, RwLock},
    };
    use test::Bencher;

    #[test]
    fn will_not_overflow() {
        let _ = env_logger::try_init();
        let table = WordMap::<System>::with_capacity(16);
        for i in 50..60 {
            assert_eq!(table.insert(&i, i), None);
        }
        for i in 50..60 {
            assert_eq!(table.get(&i), Some(i));
        }
        for i in 50..60 {
            assert_eq!(table.remove(&i), Some(i));
        }
    }
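
    // Not part of the original suite: a minimal smoke test for the HashSet wrapper
    // defined above, exercising only the methods it declares (insert, contains,
    // remove).
    #[test]
    fn hash_set_smoke() {
        let _ = env_logger::try_init();
        let set = super::HashSet::<usize>::with_capacity(16);
        for i in 5..20 {
            assert!(set.insert(&i), "first insert of {} should succeed", i);
        }
        for i in 5..20 {
            assert!(set.contains(&i));
        }
        assert!(!set.insert(&5), "duplicate insert should be rejected");
        assert!(set.remove(&5));
        assert!(!set.contains(&5));
    }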

    #[test]
    fn resize() {
        let _ = env_logger::try_init();
        let map = WordMap::<System>::with_capacity(16);
        for i in 5..2048 {
            map.insert(&i, i * 2);
        }
        for i in 5..2048 {
            match map.get(&i) {
                Some(r) => assert_eq!(r, i * 2),
                None => panic!("{}", i),
            }
        }
    }

    #[test]
    fn parallel_no_resize() {
        let _ = env_logger::try_init();
        let map = Arc::new(WordMap::<System>::with_capacity(65536));
        let mut threads = vec![];
        for i in 5..99 {
            map.insert(&i, i * 10);
        }
        for i in 100..900 {
            let map = map.clone();
            threads.push(thread::spawn(move || {
                for j in 5..60 {
                    map.insert(&(i * 100 + j), i * j);
                }
            }));
        }
        for i in 5..9 {
            for j in 1..10 {
                map.remove(&(i * j));
            }
        }
        for thread in threads {
            let _ = thread.join();
        }
        for i in 100..900 {
            for j in 5..60 {
                assert_eq!(map.get(&(i * 100 + j)), Some(i * j))
            }
        }
        for i in 5..9 {
            for j in 1..10 {
                assert!(map.get(&(i * j)).is_none())
            }
        }
    }

    #[test]
    fn parallel_with_resize() {
        let _ = env_logger::try_init();
        let num_threads = num_cpus::get();
        let test_load = 4096;
        let repeat_load = 16;
        let map = Arc::new(WordMap::<System>::with_capacity(32));
        let mut threads = vec![];
        for i in 0..num_threads {
            let map = map.clone();
            threads.push(thread::spawn(move || {
                for j in 5..test_load {
                    let key = i * 10000000 + j;
                    let value_prefix = i * j * 100;
                    for k in 1..repeat_load {
                        let value = value_prefix + k;
                        if k != 1 {
                            assert_eq!(map.get(&key), Some(value - 1));
                        }
                        let pre_insert_epoch = map.table.now_epoch();
                        map.insert(&key, value);
                        let post_insert_epoch = map.table.now_epoch();
                        for l in 1..128 {
                            let pre_fail_get_epoch = map.table.now_epoch();
                            let left = map.get(&key);
                            let post_fail_get_epoch = map.table.now_epoch();
                            let right = Some(value);
                            if left != right {
                                for m in 1..1024 {
                                    let left = map.get(&key);
                                    let right = Some(value);
                                    if left == right {
                                        panic!(
                                            "Recovered at turn {} for {}, copying {}, epoch {} to {}, now {}, PIE: {} to {}. Migration problem!!!",
                                            m,
                                            key,
                                            map.table.map_is_copying(),
                                            pre_fail_get_epoch,
                                            post_fail_get_epoch,
                                            map.table.now_epoch(),
                                            pre_insert_epoch,
                                            post_insert_epoch
                                        );
                                    }
                                }
                                panic!("Unable to recover for {}, round {}, copying {}", key, l, map.table.map_is_copying());
                            }
                        }
                        if j % 7 == 0 {
                            assert_eq!(
                                map.remove(&key),
                                Some(value),
                                "Remove result, get {:?}, copying {}, round {}",
                                map.get(&key),
                                map.table.map_is_copying(),
                                k
                            );
                            assert_eq!(map.get(&key), None, "Remove recursion");
                            assert!(map.lock(key).is_none(), "Remove recursion with lock");
                            map.insert(&key, value);
                        }
                        if j % 3 == 0 {
                            let new_value = value + 7;
                            let pre_insert_epoch = map.table.now_epoch();
                            map.insert(&key, new_value);
                            let post_insert_epoch = map.table.now_epoch();
                            assert_eq!(
                                map.get(&key),
                                Some(new_value),
                                "Checking immediate update, key {}, epoch {} to {}",
                                key, pre_insert_epoch, post_insert_epoch
                            );
                            map.insert(&key, value);
                        }
                    }
                }
            }));
        }
        info!("Waiting for intensive insertion to finish");
        for thread in threads {
            let _ = thread.join();
        }
        info!("Checking final value");
        (0..num_threads)
            .collect::<Vec<_>>()
            .par_iter()
            .for_each(|i| {
                for j in 5..test_load {
                    let k = i * 10000000 + j;
                    let value = i * j * 100 + repeat_load - 1;
                    let get_res = map.get(&k);
                    assert_eq!(
                        get_res,
                        Some(value),
                        "New k {}, i {}, j {}, epoch {}",
                        k,
                        i,
                        j,
                        map.table.now_epoch()
                    );
                }
            });
    }

    #[test]
    fn parallel_hybrid() {
        let _ = env_logger::try_init();
        let map = Arc::new(WordMap::<System>::with_capacity(4));
        for i in 5..128 {
            map.insert(&i, i * 10);
        }
        let mut threads = vec![];
        for i in 256..265 {
            let map = map.clone();
            threads.push(thread::spawn(move || {
                for j in 5..60 {
                    map.insert(&(i * 10 + j), 10);
                }
            }));
        }
        for i in 5..8 {
            let map = map.clone();
            threads.push(thread::spawn(move || {
                for j in 5..8 {
                    map.remove(&(i * j));
                }
            }));
        }
        for thread in threads {
            let _ = thread.join();
        }
        for i in 256..265 {
            for j in 5..60 {
                assert_eq!(map.get(&(i * 10 + j)), Some(10))
            }
        }
    }

    #[test]
    fn parallel_word_map_mutex() {
        let _ = env_logger::try_init();
        let map = Arc::new(WordMap::<System>::with_capacity(4));
        map.insert(&1, 0);
        let mut threads = vec![];
        let num_threads = 256;
        for _ in 0..num_threads {
            let map = map.clone();
            threads.push(thread::spawn(move || {
                let mut guard = map.lock(1).unwrap();
                *guard += 1;
            }));
        }
        for thread in threads {
            let _ = thread.join();
        }
        assert_eq!(map.get(&1).unwrap(), num_threads);
    }

    #[test]
    fn parallel_word_map_multi_mutex() {
        let _ = env_logger::try_init();
        let map = Arc::new(WordMap::<System>::with_capacity(4));
        let mut threads = vec![];
        let num_threads = num_cpus::get();
        let test_load = 4096;
        let update_load = 128;
        for thread_id in 0..num_threads {
            let map = map.clone();
            threads.push(thread::spawn(move || {
                let target = thread_id;
                for i in 0..test_load {
                    let key = target * 1000000 + i;
                    {
                        let mut mutex = map.try_insert_locked(key).unwrap();
                        *mutex = 1;
                    }
                    for j in 1..update_load {
                        assert!(
                            map.get(&key).is_some(),
                            "Pre getting value for mutex, key {}, epoch {}",
                            key,
                            map.table.now_epoch()
                        );
                        let val = {
                            let mut mutex = map.lock(key).expect(&format!(
                                "Locking key {}, copying {}",
                                key,
                                map.table.now_epoch()
                            ));
                            assert_eq!(*mutex, j);
                            *mutex += 1;
                            *mutex
                        };
                        assert!(
                            map.get(&key).is_some(),
                            "Post getting value for mutex, key {}, epoch {}",
                            key,
                            map.table.now_epoch()
                        );
                        if j % 7 == 0 {
                            {
                                let mutex = map.lock(key).expect(&format!(
                                    "Remove locking key {}, copying {}",
                                    key,
                                    map.table.now_epoch()
                                ));
                                mutex.remove();
                            }
                            assert!(map.lock(key).is_none());
                            *map.try_insert_locked(key).unwrap() = val;
                        }
                    }
                    assert_eq!(*map.lock(key).unwrap(), update_load);
                }
            }));
        }
        for thread in threads {
            let _ = thread.join();
        }
    }

    #[test]
    fn parallel_obj_map_rwlock() {
        let _ = env_logger::try_init();
        let map_cont = ObjectMap::<Obj, System, DefaultHasher>::with_capacity(4);
        let map = Arc::new(map_cont);
        map.insert(&1, Obj::new(0));
        let mut threads = vec![];
        let num_threads = 256;
        for i in 0..num_threads {
            let map = map.clone();
            threads.push(thread::spawn(move || {
                let mut guard = map.write(1).unwrap();
                let val = guard.get();
                guard.set(val + 1);
                trace!("Dealt with {}", i);
            }));
        }
        for thread in threads {
            let _ = thread.join();
        }
        map.get(&1).unwrap().validate(num_threads);
    }

    #[test]
    fn parallel_hash_map_rwlock() {
        let _ = env_logger::try_init();
        let map_cont = super::HashMap::<u32, Obj, System, DefaultHasher>::with_capacity(4);
        let map = Arc::new(map_cont);
        map.insert(&1, Obj::new(0));
        let mut threads = vec![];
        let num_threads = 256;
        for i in 0..num_threads {
            let map = map.clone();
            threads.push(thread::spawn(move || {
                let mut guard = map.write(&1u32).unwrap();
                let val = guard.get();
                guard.set(val + 1);
                trace!("Dealt with {}", i);
            }));
        }
        for thread in threads {
            let _ = thread.join();
        }
        map.get(&1).unwrap().validate(num_threads);
    }
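
    // Not part of the original suite: a small sketch of the consuming `remove` on
    // HashMapWriteGuard (defined above), using only APIs the existing tests already
    // exercise (`insert`, `write`, `get`).
    #[test]
    fn write_guard_remove() {
        let _ = env_logger::try_init();
        let map = super::HashMap::<u32, Obj>::with_capacity(16);
        map.insert(&1u32, Obj::new(0));
        let guard = map.write(&1u32).unwrap();
        let removed = guard.remove();
        removed.validate(0);
        assert!(map.get(&1u32).is_none());
        assert!(map.write(&1u32).is_none());
    }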

    #[derive(Copy, Clone)]
    struct Obj {
        a: usize,
        b: usize,
        c: usize,
        d: usize,
    }
    impl Obj {
        fn new(num: usize) -> Self {
            Obj {
                a: num,
                b: num + 1,
                c: num + 2,
                d: num + 3,
            }
        }
        fn validate(&self, num: usize) {
            assert_eq!(self.a, num);
            assert_eq!(self.b, num + 1);
            assert_eq!(self.c, num + 2);
            assert_eq!(self.d, num + 3);
        }
        fn get(&self) -> usize {
            self.a
        }
        fn set(&mut self, num: usize) {
            *self = Self::new(num)
        }
    }

    #[test]
    fn obj_map() {
        let _ = env_logger::try_init();
        let map = ObjectMap::<Obj>::with_capacity(16);
        for i in 5..2048 {
            map.insert(&i, Obj::new(i));
        }
        for i in 5..2048 {
            match map.get(&i) {
                Some(r) => r.validate(i),
                None => panic!("{}", i),
            }
        }
    }

    #[test]
    fn parallel_obj_hybrid() {
        let _ = env_logger::try_init();
        let map = Arc::new(ObjectMap::<Obj>::with_capacity(4));
        for i in 5..128 {
            map.insert(&i, Obj::new(i * 10));
        }
        let mut threads = vec![];
        for i in 256..265 {
            let map = map.clone();
            threads.push(thread::spawn(move || {
                for j in 5..60 {
                    map.insert(&(i * 10 + j), Obj::new(10));
                }
            }));
        }
        for i in 5..8 {
            let map = map.clone();
            threads.push(thread::spawn(move || {
                for j in 5..8 {
                    map.remove(&(i * j));
                }
            }));
        }
        for thread in threads {
            let _ = thread.join();
        }
        for i in 256..265 {
            for j in 5..60 {
                match map.get(&(i * 10 + j)) {
                    Some(r) => r.validate(10),
                    None => panic!("{}", i),
                }
            }
        }
    }

    #[test]
    fn parallel_hashmap_hybrid() {
        let _ = env_logger::try_init();
        let map = Arc::new(super::HashMap::<u32, Obj>::with_capacity(4));
        for i in 5..128u32 {
            map.insert(&i, Obj::new((i * 10) as usize));
        }
        let mut threads = vec![];
        for i in 256..265u32 {
            let map = map.clone();
            threads.push(thread::spawn(move || {
                for j in 5..60u32 {
                    map.insert(&(i * 10 + j), Obj::new(10usize));
                }
            }));
        }
        for i in 5..8 {
            let map = map.clone();
            threads.push(thread::spawn(move || {
                for j in 5..8 {
                    map.remove(&(i * j));
                }
            }));
        }
        for thread in threads {
            let _ = thread.join();
        }
        for i in 256..265 {
            for j in 5..60 {
                match map.get(&(i * 10 + j)) {
                    Some(r) => r.validate(10),
                    None => panic!("{}", i),
                }
            }
        }
    }

    use std::thread::JoinHandle;
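    // The test below seems intended as a self-contained model of the epoch/pointer
    // handshake used during chunk migration (a seqlock-style scheme): writers
    // publish new_ptr, bump the epoch to an odd value while both pointers are live,
    // advance old_ptr, then bump the epoch back to even; readers that observe an
    // odd, unchanged epoch must also see distinct, non-zero old/new pointers.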
    #[test]
    fn atomic_ordering() {
        let test_load = 102400;
        let epoch = Arc::new(AtomicUsize::new(0));
        let old_ptr = Arc::new(AtomicUsize::new(0));
        let new_ptr = Arc::new(AtomicUsize::new(0));
        let write = || -> JoinHandle<()> {
            let epoch = epoch.clone();
            let old_ptr = old_ptr.clone();
            let new_ptr = new_ptr.clone();
            thread::spawn(move || {
                for _ in 0..test_load {
                    let old = old_ptr.load(Acquire);
                    if new_ptr.compare_and_swap(0, old, AcqRel) != 0 {
                        return;
                    }
                    dfence();
                    if old_ptr.load(Acquire) != old {
                        new_ptr.store(0, Release);
                        dfence();
                        return;
                    }
                    let new = old + 1;
                    new_ptr.store(new, Release);
                    dfence();
                    assert_eq!(epoch.fetch_add(1, AcqRel) % 2, 0);
                    for _ in 0..1000 {
                        std::sync::atomic::spin_loop_hint();
                    }
                    old_ptr.store(new, Release);
                    dfence();
                    assert_eq!(epoch.fetch_add(1, AcqRel) % 2, 1);
                    dfence();
                    new_ptr.store(0, Release);
                }
            })
        };
        let read = || -> JoinHandle<()> {
            let epoch = epoch.clone();
            let old_ptr = old_ptr.clone();
            let new_ptr = new_ptr.clone();
            thread::spawn(move || {
                for _ in 0..test_load {
                    let epoch_val = epoch.load(Acquire);
                    let old = old_ptr.load(Acquire);
                    let new = new_ptr.load(Acquire);
                    let changing = epoch_val % 2 == 1;
                    for _ in 0..500 {
                        std::sync::atomic::spin_loop_hint();
                    }
                    if changing && epoch.load(Acquire) == epoch_val {
                        assert_ne!(old, new);
                        assert_ne!(new, 0);
                    }
                }
            })
        };
        let num_writers = 5;
        let mut writers = Vec::with_capacity(num_writers);
        for _ in 0..num_writers {
            writers.push(write());
        }
        let num_readers = num_cpus::get();
        let mut readers = Vec::with_capacity(num_readers);
        for _ in 0..num_readers {
            readers.push(read());
        }
        for reader in readers {
            reader.join().unwrap();
        }
        for writer in writers {
            writer.join().unwrap();
        }
    }

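    // The #[bench] functions below need a nightly toolchain with the `test` crate
    // feature enabled at the crate root; they are single-threaded insert loops and
    // only give a rough relative comparison between map implementations.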
    #[bench]
    fn lfmap(b: &mut Bencher) {
        let _ = env_logger::try_init();
        let map = WordMap::<System, DefaultHasher>::with_capacity(8);
        let mut i = 5;
        b.iter(|| {
            map.insert(&i, i);
            i += 1;
        });
    }

    #[bench]
    fn hashmap(b: &mut Bencher) {
        let _ = env_logger::try_init();
        let mut map = HashMap::new();
        let mut i = 5;
        b.iter(|| {
            map.insert(i, i);
            i += 1;
        });
    }

    #[bench]
    fn mutex_hashmap(b: &mut Bencher) {
        let _ = env_logger::try_init();
        let map = Mutex::new(HashMap::new());
        let mut i = 5;
        b.iter(|| {
            map.lock().unwrap().insert(i, i);
            i += 1;
        });
    }

    #[bench]
    fn rwlock_hashmap(b: &mut Bencher) {
        let _ = env_logger::try_init();
        let map = RwLock::new(HashMap::new());
        let mut i = 5;
        b.iter(|| {
            map.write().unwrap().insert(i, i);
            i += 1;
        });
    }

    #[bench]
    fn chashmap(b: &mut Bencher) {
        let _ = env_logger::try_init();
        let map = CHashMap::new();
        let mut i = 5;
        b.iter(|| {
            map.insert(i, i);
            i += 1;
        });
    }

    #[bench]
    fn default_hasher(b: &mut Bencher) {
        let _ = env_logger::try_init();
        b.iter(|| {
            let mut hasher = DefaultHasher::default();
            hasher.write_u64(123);
            hasher.finish();
        });
    }
}