use crate::align_padding;
use core::alloc::Layout;
use core::borrow::BorrowMut;
use core::cmp::min;
use core::ops::Deref;
use core::ptr;
use core::ptr::null_mut;
use core::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use core::sync::atomic::{AtomicPtr, AtomicUsize};
use core::{intrinsics, mem};
use crossbeam_utils::Backoff;
#[cfg(feature = "exchange_backoff")]
use exchange::*;
use std::alloc::{GlobalAlloc, System};

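// Flag values stored in each slot's header word: 0 marks an empty slot and 1
// marks a sentinel, i.e. a slot whose value was consumed but whose position
// could not be reclaimed (see `pop`). Real flags must therefore be >= 2.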
const CACHE_LINE_SIZE: usize = 64;
const EMPTY_SLOT: usize = 0;
const SENTINEL_SLOT: usize = 1;

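// Metadata header for one buffer page. A page is a single allocation: this
// header (padded to a multiple of the cache line) followed by `buffer_cap`
// slots, each a usize flag word immediately followed by the inlined `T`
// payload. `head` is the index of the next free slot, `next` links to the
// previous (older) page, and `refs` is the page's reference count.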
struct BufferMeta<T: Default, A: GlobalAlloc + Default> {
    head: AtomicUsize,
    next: AtomicPtr<BufferMeta<T, A>>,
    refs: AtomicUsize,
    lower_bound: usize,
    tuple_size: usize,
    total_size: usize,
}

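// Lock-free list of (flag, T) pairs backed by a linked chain of buffer pages.
// `head` always points at the newest page; `count` tracks the number of live
// entries across the whole chain.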
pub struct List<T: Default + Copy, A: GlobalAlloc + Default> {
    head: AtomicPtr<BufferMeta<T, A>>,
    count: AtomicUsize,
    buffer_cap: usize,
    #[cfg(feature = "exchange_backoff")]
    exchange: ExchangeArray<T, A>,
}

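// Iterator that walks slots from the top of the current page downwards and
// follows `next` pointers to older pages, holding a reference on the page it
// is currently reading.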
pub struct ListIterator<T: Default + Copy, A: GlobalAlloc + Default> {
    buffer: BufferRef<T, A>,
    current: usize,
}

impl<T: Default + Copy, A: GlobalAlloc + Default> List<T, A> {
    pub fn new(buffer_cap: usize) -> Self {
        let first_buffer = BufferMeta::new(buffer_cap);
        Self {
            head: AtomicPtr::new(first_buffer),
            count: AtomicUsize::new(0),
            #[cfg(feature = "exchange_backoff")]
            exchange: ExchangeArray::new(),
            buffer_cap,
        }
    }

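    // Pushes one (flag, data) pair and bumps the global count. `flag` must not
    // collide with the reserved EMPTY_SLOT / SENTINEL_SLOT values.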
    pub fn push(&self, flag: usize, data: T) {
        self.do_push(flag, data);
        self.count.fetch_add(1, AcqRel);
    }

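    // Core push loop: claim a slot index with a CAS on the page head, write the
    // payload, then publish it by swapping the slot flag from EMPTY_SLOT to
    // `flag`. If the current page is full, install a freshly allocated page as
    // the new head and retry. With the `exchange_backoff` feature, a failed
    // attempt first tries to hand the item directly to a concurrent `pop`.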
    fn do_push(&self, mut flag: usize, mut data: T) {
        debug_assert_ne!(flag, EMPTY_SLOT);
        debug_assert_ne!(flag, SENTINEL_SLOT);
        loop {
            let obj_size = mem::size_of::<T>();
            let head_ptr = self.head.load(Acquire);
            let page = BufferMeta::borrow(head_ptr);
            let slot_pos = page.head.load(Acquire);
            let next_pos = slot_pos + 1;
            if next_pos > self.buffer_cap {
                let new_head = BufferMeta::new(self.buffer_cap);
                unsafe {
                    (*new_head).next.store(head_ptr, Release);
                    debug_assert_eq!((*new_head).total_size, page.total_size);
                }
                if self.head.compare_and_swap(head_ptr, new_head, AcqRel) != head_ptr {
                    BufferMeta::unref(new_head);
                }
            } else {
                if page.head.compare_and_swap(slot_pos, next_pos, AcqRel) == slot_pos {
                    let slot_ptr = page.flag_ptr_of(slot_pos);
                    unsafe {
                        if obj_size != 0 {
                            let obj_ptr = page.object_ptr_of(slot_ptr);
                            ptr::write(obj_ptr, data);
                        }
                        let slot_flag =
                            intrinsics::atomic_cxchg_relaxed(slot_ptr, EMPTY_SLOT, flag).0;
                        assert_eq!(
                            slot_flag, EMPTY_SLOT,
                            "Cannot swap flag for push. Flag is {} expect empty",
                            slot_flag
                        );
                    }
                    return;
                }
            }
            #[cfg(feature = "exchange_backoff")]
            match self.exchange.exchange(Some((flag, data))) {
                Ok(Some(tuple)) | Err(Some(tuple)) => {
                    flag = tuple.0;
                    data = tuple.1;
                }
                Ok(None) | Err(None) => {
                    return;
                }
            }
        }
    }

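    // Variant of `push` for callers with exclusive access to the list: slot
    // positions are claimed with plain stores instead of CAS loops.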
    pub fn exclusive_push(&self, flag: usize, data: T) {
        let obj_size = mem::size_of::<T>();
        loop {
            let head_ptr = self.head.load(Acquire);
            let page = BufferMeta::borrow(head_ptr);
            let slot_pos = page.head.load(Acquire);
            let next_pos = slot_pos + 1;
            if next_pos > self.buffer_cap {
                let new_head = BufferMeta::new(self.buffer_cap);
                unsafe {
                    (*new_head).next.store(head_ptr, Release);
                }
                if self.head.compare_and_swap(head_ptr, new_head, Release) != head_ptr {
                    BufferMeta::unref(new_head);
                }
            } else {
                page.head.store(next_pos, Release);
                let slot_ptr = page.flag_ptr_of(slot_pos);
                unsafe {
                    if obj_size != 0 {
                        let obj_ptr = page.object_ptr_of(slot_ptr);
                        ptr::write(obj_ptr, data);
                    }
                    intrinsics::atomic_store_relaxed(slot_ptr, flag);
                }
                self.count.fetch_add(1, AcqRel);
                return;
            }
        }
    }

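    // Pops the most recently pushed live entry. The top slot's flag is swapped
    // back to EMPTY_SLOT before the page head index is retired; if retiring the
    // index races with another thread, the slot is marked SENTINEL_SLOT so later
    // passes skip it. When a page is exhausted, the head is advanced to the next
    // page and the old one is drained, re-pushing any entries still live in it.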
    pub fn pop(&self) -> Option<(usize, T)> {
        if self.count.load(Acquire) == 0 {
            return None;
        }
        let backoff = Backoff::new();
        loop {
            let head_ptr = self.head.load(Acquire);
            let page = BufferMeta::borrow(head_ptr);
            let slot = page.head.load(Acquire);
            let obj_size = mem::size_of::<T>();
            let next_buffer_ptr = page.next.load(Acquire);
            if slot == 0 && next_buffer_ptr == null_mut() {
                return None;
            }
            if slot == 0 && next_buffer_ptr != null_mut() {
                if self
                    .head
                    .compare_and_swap(head_ptr, next_buffer_ptr, AcqRel)
                    == head_ptr
                {
                    drop(page);
                    let dropped_next = BufferMeta::drop_out(
                        head_ptr,
                        &mut Some(|(flag, data)| {
                            if flag != EMPTY_SLOT && flag != SENTINEL_SLOT {
                                self.do_push(flag, data);
                            }
                        }),
                        &mut 0,
                    );
                    debug_assert_eq!(dropped_next.unwrap_or(null_mut()), next_buffer_ptr);
                } else {
                    backoff.spin();
                }
                continue;
            }
            let mut res = None;
            if slot > 0 {
                unsafe {
                    let new_slot = slot - 1;
                    let new_slot_ptr = page.flag_ptr_of(new_slot);
                    let new_slot_flag = intrinsics::atomic_load_relaxed(new_slot_ptr);
                    if new_slot_flag != 0
                        && intrinsics::atomic_cxchg_relaxed(new_slot_ptr, new_slot_flag, EMPTY_SLOT).1
                    {
                        res = Some((new_slot_flag, T::default()));
                        if obj_size != 0 && new_slot_flag != SENTINEL_SLOT {
                            res.as_mut().map(|(_, obj)| {
                                let obj_ptr = page.object_ptr_of(new_slot_ptr) as *mut T;
                                *obj = ptr::read(obj_ptr as *mut T)
                            });
                        }
                        let swapped = page.head.compare_and_swap(slot, new_slot, AcqRel);
                        debug_assert!(
                            swapped >= slot,
                            "Exclusive pop failed, {} expect {}",
                            swapped,
                            slot
                        );
                        if swapped != slot {
                            intrinsics::atomic_store(new_slot_ptr, SENTINEL_SLOT);
                        }
                        if new_slot_flag != SENTINEL_SLOT {
                            self.count.fetch_sub(1, AcqRel);
                            return res;
                        }
                    }
                }
            } else {
                return res;
            }
            #[cfg(feature = "exchange_backoff")]
            match self.exchange.exchange(None) {
                Ok(Some(tuple)) | Err(Some(tuple)) => {
                    self.count.fetch_sub(1, AcqRel);
                    return Some(tuple);
                }
                Ok(None) | Err(None) => {}
            }
        }
    }
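
    // Drains every entry, feeding each (flag, value) pair to `retain` if given.
    // Small lists are drained by repeated `pop`; otherwise a fresh head page is
    // swapped in and the detached chain is flushed page by page.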
    pub fn drop_out_all<F>(&self, mut retain: Option<F>)
    where
        F: FnMut((usize, T)),
    {
        let count = self.count.load(Acquire);
        if count == 0 {
            return;
        }
        let retain = retain.borrow_mut();
        let pop_threshold = min(self.buffer_cap >> 1, 64);
        if count < pop_threshold {
            let pop_amount = pop_threshold << 1;
            for _ in 0..pop_amount {
                if let Some(pair) = self.pop() {
                    if let Some(retain) = retain {
                        retain(pair);
                    }
                } else {
                    return;
                }
            }
        }
        let new_head_buffer = BufferMeta::new(self.buffer_cap);
        let mut buffer_ptr = self.head.swap(new_head_buffer, AcqRel);
        let null = null_mut();
        let mut counter = 0;
        while buffer_ptr != null {
            buffer_ptr = BufferMeta::drop_out(buffer_ptr, retain, &mut counter).unwrap_or(null);
        }
        self.count.fetch_sub(counter, AcqRel);
    }

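    // Steals the entire buffer chain from `other` and splices it in front of
    // this list's chain, transferring `other`'s count to `self`.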
    pub fn prepend_with(&self, other: &Self) {
        if other.count.load(Acquire) == 0 {
            return;
        }
        let other_head = other.head.swap(BufferMeta::new(self.buffer_cap), AcqRel);
        let other_count = other.count.swap(0, AcqRel);
        let mut other_tail = BufferMeta::borrow(other_head);
        loop {
            while other_tail.refs.load(Acquire) > 2 {}
            let next_ptr = other_tail.next.load(Acquire);
            if next_ptr == null_mut() {
                break;
            }
            other_tail = BufferMeta::borrow(next_ptr);
        }

        loop {
            let this_head = self.head.load(Acquire);
            if self.head.compare_and_swap(this_head, other_head, AcqRel) != this_head {
                continue;
            } else {
                other_tail.next.store(this_head, Release);
                break;
            }
        }
        self.count.fetch_add(other_count, AcqRel);
    }

    pub fn count(&self) -> usize {
        self.count.load(Acquire)
    }

    pub fn iter(&self) -> ListIterator<T, A> {
        let buffer = BufferMeta::borrow(self.head.load(Acquire));
        ListIterator {
            current: buffer.head.load(Acquire),
            buffer,
        }
    }
}

impl<T: Default + Copy, A: GlobalAlloc + Default> Drop for List<T, A> {
    fn drop(&mut self) {
        unsafe {
            let mut node_ptr = self.head.load(Acquire);
            while node_ptr as usize != 0 {
                let next_ptr = (&*node_ptr).next.load(Acquire);
                BufferMeta::unref(node_ptr);
                node_ptr = next_ptr;
            }
        }
    }
}

impl<T: Default + Copy, A: GlobalAlloc + Default> Default for List<T, A> {
    fn default() -> Self {
        Self::new(32)
    }
}

impl<T: Default, A: GlobalAlloc + Default> BufferMeta<T, A> {
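    // Allocates one page: a header padded to a cache-line multiple, followed by
    // `buffer_cap` slots. The allocation is zeroed, so every slot starts out as
    // EMPTY_SLOT, and the page begins with a reference count of 1.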
    pub fn new(buffer_cap: usize) -> *mut BufferMeta<T, A> {
        let self_size = mem::size_of::<Self>();
        let meta_size = self_size + align_padding(self_size, CACHE_LINE_SIZE);
        let slots_size = mem::size_of::<usize>();
        let data_size = mem::size_of::<T>();
        let tuple_size = slots_size + data_size;
        let tuple_size_aligned = if tuple_size <= 8 {
            8
        } else if tuple_size <= 16 {
            16
        } else if tuple_size <= 32 {
            32
        } else {
            tuple_size + align_padding(tuple_size, CACHE_LINE_SIZE)
        };
        let total_size = meta_size + tuple_size_aligned * buffer_cap;
        let head_page = alloc_mem::<A>(total_size) as *mut Self;
        let head_page_addr = head_page as usize;
        let slots_start = head_page_addr + meta_size;
        unsafe {
            ptr::write(
                head_page,
                Self {
                    head: AtomicUsize::new(0),
                    next: AtomicPtr::new(null_mut()),
                    refs: AtomicUsize::new(1),
                    lower_bound: slots_start,
                    tuple_size,
                    total_size,
                },
            );
        }
        head_page
    }

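    // Drops one reference; the thread that releases the last reference frees
    // the page.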
    pub fn unref(buffer: *mut Self) {
        let rc = {
            let buffer = unsafe { &*buffer };
            buffer.refs.fetch_sub(1, AcqRel)
        };
        if rc == 1 {
            Self::gc(buffer);
        }
    }

    fn gc(buffer: *mut Self) {
        let buffer_ref = unsafe { &*buffer };
        let total_size = buffer_ref.total_size;
        if mem::needs_drop::<T>() {
            Self::flush_buffer(buffer_ref, &mut Some(|x| drop(x)), &mut 0);
        }
        dealloc_mem::<A>(buffer as usize, total_size)
    }

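    // Walks every claimed slot (0..head), hands non-empty, non-sentinel entries
    // to `retain`, counts them, and finally resets the page head to 0.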
    fn flush_buffer<F>(buffer: &Self, retain: &mut Option<F>, counter: &mut usize)
    where
        F: FnMut((usize, T)),
    {
        let size_of_obj = mem::size_of::<T>();
        let data_bound = buffer.head.load(Acquire);
        let mut slot_addr = buffer.lower_bound;
        debug_assert!(
            buffer.refs.load(Acquire) <= 2 || buffer.refs.load(Acquire) >= 256,
            "Reference counting check failed"
        );
        for _ in 0..data_bound {
            unsafe {
                let slot = intrinsics::atomic_load_relaxed(slot_addr as *const usize);
                if slot != EMPTY_SLOT && slot != SENTINEL_SLOT {
                    let mut rest = (slot, T::default());
                    if size_of_obj > 0 {
                        rest.1 = ptr::read((slot_addr + mem::size_of::<usize>()) as *const T);
                    }
                    if let Some(retain) = retain {
                        retain(rest);
                    }
                    *counter += 1;
                }
            }
            slot_addr += buffer.tuple_size;
        }
        buffer.head.store(0, Release);
    }

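    // Detaches a page for draining: the high bit of `refs` is used as a
    // "dropping" flag so only one thread can claim the page at a time. Returns
    // the page's `next` pointer once the page has been claimed, or None if
    // another thread is already dropping it.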
    fn drop_out<F>(
        buffer_ptr: *mut Self,
        retain: &mut Option<F>,
        counter: &mut usize,
    ) -> Option<*mut Self>
    where
        F: FnMut((usize, T)),
    {
        let buffer = BufferMeta::borrow(buffer_ptr);
        let next_ptr = buffer.next.load(Acquire);
        let backoff = Backoff::new();
        let word_bits = mem::size_of::<usize>() << 3;
        let flag = 1 << (word_bits - 1);
        loop {
            let rc = buffer.refs.load(Acquire);
            if rc > flag {
                return None;
            }
            let flag_swap = buffer.refs.compare_and_swap(rc, rc | flag, AcqRel);
            if flag_swap == rc {
                break;
            } else if flag_swap > flag {
                return None;
            } else {
                backoff.spin();
            }
        }
        loop {
            let rc = buffer.refs.load(Acquire);
            debug_assert!(rc > flag, "get reference {:x}, value {}", rc, rc & !flag);
            let rc = rc & !flag;
            if rc <= 1 {
                buffer.refs.store(2, Release);
                return Some(next_ptr);
            } else if rc == 2 {
                buffer.refs.store(rc, Release);
                BufferMeta::flush_buffer(&*buffer, retain, counter);
                BufferMeta::unref(buffer_ptr);
                return Some(next_ptr);
            }
            backoff.spin();
        }
    }

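    // Takes an additional reference on the page and wraps it in an RAII guard
    // that releases the reference on drop.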
    fn borrow(buffer: *mut Self) -> BufferRef<T, A> {
        {
            let buffer = unsafe { &*buffer };
            buffer.refs.fetch_add(1, AcqRel);
        }
        BufferRef { ptr: buffer }
    }

    fn flag_ptr_of(&self, index: usize) -> *mut usize {
        (self.lower_bound + index * self.tuple_size) as *mut usize
    }

    fn object_ptr_of(&self, flag_ptr: *mut usize) -> *mut T {
        (flag_ptr as usize + mem::size_of::<usize>()) as *mut T
    }
}

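// RAII reference to a buffer page; dereferences to the page header and calls
// `unref` when dropped.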
struct BufferRef<T: Default, A: GlobalAlloc + Default> {
    ptr: *mut BufferMeta<T, A>,
}

impl<T: Default, A: GlobalAlloc + Default> Drop for BufferRef<T, A> {
    fn drop(&mut self) {
        BufferMeta::unref(self.ptr);
    }
}

impl<T: Default, A: GlobalAlloc + Default> Deref for BufferRef<T, A> {
    type Target = BufferMeta<T, A>;

    fn deref(&self) -> &Self::Target {
        unsafe { &*self.ptr }
    }
}

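// Iterates entries from the newest slot of the current page downwards, skipping
// empty and sentinel slots and moving to the next (older) page when a page is
// exhausted. Values are copied out; the underlying list is not modified.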
impl<T: Default + Clone + Copy, A: GlobalAlloc + Default> Iterator for ListIterator<T, A> {
    type Item = (usize, T);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if self.current == 0 {
                let next_buffer_ptr = self.buffer.next.load(Acquire);
                if next_buffer_ptr == null_mut() {
                    return None;
                } else {
                    self.buffer = BufferMeta::borrow(next_buffer_ptr);
                    self.current = self.buffer.head.load(Acquire);
                    continue;
                }
            }
            let current_flag_ptr = self.buffer.flag_ptr_of(self.current - 1);
            unsafe {
                let mut result = (*current_flag_ptr, T::default());
                if mem::size_of::<T>() > 0 {
                    result.1 = (*self.buffer.object_ptr_of(current_flag_ptr)).clone()
                }
                self.current -= 1;
                if result.0 != EMPTY_SLOT && result.0 != SENTINEL_SLOT {
                    return Some(result);
                }
            };
        }
    }
}

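// List specialization that stores bare usize values in the flag word itself
// (payload type `()`). Values 0 and 1 are reserved and must not be pushed.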
pub struct WordList<A: GlobalAlloc + Default = System> {
    inner: List<(), A>,
}

impl<A: GlobalAlloc + Default> WordList<A> {
    pub fn with_capacity(cap: usize) -> Self {
        Self {
            inner: List::new(cap),
        }
    }
    pub fn new() -> Self {
        Self::with_capacity(512)
    }
    pub fn push(&self, data: usize) {
        debug_assert_ne!(data, 0);
        debug_assert_ne!(data, 1);
        self.inner.push(data, ())
    }
    pub fn exclusive_push(&self, data: usize) {
        debug_assert_ne!(data, 0);
        debug_assert_ne!(data, 1);
        self.inner.exclusive_push(data, ())
    }
    pub fn pop(&self) -> Option<usize> {
        self.inner.pop().map(|(data, _)| data)
    }

    pub fn drop_out_all<F>(&self, retain: Option<F>)
    where
        F: FnMut((usize, ())),
    {
        self.inner.drop_out_all(retain);
    }
    pub fn prepend_with(&self, other: &Self) {
        self.inner.prepend_with(&other.inner)
    }
    pub fn count(&self) -> usize {
        self.inner.count()
    }
    pub fn iter(&self) -> ListIterator<(), A> {
        self.inner.iter()
    }
}

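// List specialization for arbitrary `T: Default + Copy` payloads; the flag word
// is fixed to !0 and only the payload is exposed.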
pub struct ObjectList<T: Default + Copy, A: GlobalAlloc + Default> {
    inner: List<T, A>,
}

impl<T: Default + Copy, A: GlobalAlloc + Default> ObjectList<T, A> {
    pub fn with_capacity(cap: usize) -> Self {
        Self {
            inner: List::new(cap),
        }
    }
    pub fn new() -> Self {
        Self::with_capacity(512)
    }
    pub fn push(&self, data: T) {
        self.inner.push(!0, data)
    }
    pub fn exclusive_push(&self, data: T) {
        self.inner.exclusive_push(!0, data)
    }
    pub fn pop(&self) -> Option<T> {
        self.inner.pop().map(|(_, obj)| obj)
    }

    pub fn drop_out_all<F>(&self, retain: Option<F>)
    where
        F: FnMut((usize, T)),
    {
        self.inner.drop_out_all(retain)
    }

    pub fn prepend_with(&self, other: &Self) {
        self.inner.prepend_with(&other.inner)
    }
    pub fn count(&self) -> usize {
        self.inner.count()
    }
    pub fn iter(&self) -> ListIterator<T, A> {
        self.inner.iter()
    }
}

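// Thin wrappers over the chosen GlobalAlloc: 16-byte aligned, zeroed
// allocations addressed as raw usize pointers.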
#[inline]
pub fn dealloc_mem<A: GlobalAlloc + Default>(ptr: usize, size: usize) {
    let a = A::default();
    let align = 16;
    let layout = Layout::from_size_align(size, align).unwrap();
    unsafe { a.dealloc(ptr as *mut u8, layout) }
}

#[inline]
pub fn alloc_mem<A: GlobalAlloc + Default>(size: usize) -> usize {
    let a = A::default();
    let align = 16;
    let layout = Layout::from_size_align(size, align).unwrap();
    (unsafe { a.alloc_zeroed(layout) }) as usize
}

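// Elimination-style backoff (behind the `exchange_backoff` feature): when the
// list head is contended, a pushing thread and a popping thread can meet in an
// exchange slot and hand the (flag, value) pair over directly, skipping the
// list entirely.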
#[cfg(feature = "exchange_backoff")]
mod exchange {

    use super::*;
    use crate::rand::XorRand;
    use core::cell::UnsafeCell;
    use core::marker::PhantomData;
    use smallvec::SmallVec;
    use std::cmp::max;
    use std::sync::atomic::fence;
    use std::sync::atomic::Ordering::SeqCst;
    use std::time::Instant;

    const EXCHANGE_EMPTY: usize = 0;
    const EXCHANGE_WAITING: usize = 1;
    const EXCHANGE_BUSY: usize = 2;
    const EXCHANGE_SPIN_WAIT_NS: usize = 150;
    const MAXIMUM_EXCHANGE_SLOTS: usize = 16;

    type ExchangeData<T> = Option<(usize, T)>;
    type ExchangeArrayVec<T> = SmallVec<[ExchangeSlot<T>; MAXIMUM_EXCHANGE_SLOTS]>;

    pub struct ExchangeSlot<T: Default + Copy> {
        state: AtomicUsize,
        data: UnsafeCell<Option<ExchangeData<T>>>,
        data_state: AtomicUsize,
    }

    pub struct ExchangeArray<T: Default + Copy, A: GlobalAlloc + Default> {
        rand: XorRand,
        shadow: PhantomData<A>,
        capacity: usize,
        slots: ExchangeArrayVec<T>,
    }

    impl<T: Default + Copy> ExchangeSlot<T> {
        fn new() -> Self {
            Self {
                state: AtomicUsize::new(EXCHANGE_EMPTY),
                data: UnsafeCell::new(None),
                data_state: AtomicUsize::new(EXCHANGE_EMPTY),
            }
        }

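        // Attempts a rendezvous on this slot. A waiter publishes its data and
        // spins for up to EXCHANGE_SPIN_WAIT_NS; a partner that finds the slot
        // in the WAITING state swaps its own data in and takes the waiter's.
        // Ok(..) carries the partner's data, Err(..) returns the caller's own
        // data when no exchange happened.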
        fn exchange(&self, data: ExchangeData<T>) -> Result<ExchangeData<T>, ExchangeData<T>> {
            let state = self.state.load(Acquire);
            let backoff = Backoff::new();
            if state == EXCHANGE_EMPTY {
                self.wait_state_data_until(state, &backoff);
                if self
                    .state
                    .compare_and_swap(EXCHANGE_EMPTY, EXCHANGE_WAITING, AcqRel)
                    == EXCHANGE_EMPTY
                {
                    self.store_state_data(Some(data));
                    let now = Instant::now();
                    loop {
                        if (now.elapsed().as_nanos() as usize) < EXCHANGE_SPIN_WAIT_NS
                            || self
                                .state
                                .compare_and_swap(EXCHANGE_WAITING, EXCHANGE_EMPTY, AcqRel)
                                == EXCHANGE_BUSY
                        {
                            if self.state.load(Acquire) != EXCHANGE_BUSY {
                                continue;
                            }
                            self.wait_state_data_until(EXCHANGE_BUSY, &backoff);
                            self.state.store(EXCHANGE_EMPTY, Release);
                            let mut data_result = None;
                            self.swap_state_data(&mut data_result);
                            if let Some(res) = data_result {
                                return Ok(res);
                            } else {
                                unreachable!();
                            }
                        } else {
                            assert_eq!(
                                self.state.load(Acquire),
                                EXCHANGE_EMPTY,
                                "Bad state after bail"
                            );
                            let mut returned_data_state = None;
                            self.swap_state_data(&mut returned_data_state);
                            if let Some(returned_data) = returned_data_state {
                                return Err(returned_data);
                            } else {
                                unreachable!()
                            }
                        }
                    }
                } else {
                    return Err(data);
                }
            } else if state == EXCHANGE_WAITING {
                if self
                    .state
                    .compare_and_swap(EXCHANGE_WAITING, EXCHANGE_BUSY, AcqRel)
                    == EXCHANGE_WAITING
                {
                    self.wait_state_data_until(EXCHANGE_WAITING, &backoff);
                    let mut data_result = Some(data);
                    self.swap_state_data(&mut data_result);
                    if let Some(res) = data_result {
                        return Ok(res);
                    } else {
                        unreachable!()
                    }
                } else {
                    return Err(data);
                }
            } else if state == EXCHANGE_BUSY {
                return Err(data);
            } else {
                unreachable!(
                    "Got state {}, real state {}",
                    state,
                    self.state.load(Acquire)
                );
            }
        }

        fn store_state_data(&self, data: Option<ExchangeData<T>>) {
            let data_content_ptr = self.data.get();
            unsafe { ptr::write(data_content_ptr, data) };
            fence(SeqCst);
            self.data_state.store(self.state.load(Acquire), Release);
        }

        fn wait_state_data_until(&self, expecting: usize, backoff: &Backoff) {
            while self.data_state.load(Acquire) != expecting {
                backoff.spin();
            }
        }

        fn wait_state_data_sync(&self, backoff: &Backoff) {
            self.wait_state_data_until(self.state.load(Acquire), backoff);
        }

        fn swap_state_data(&self, data: &mut Option<ExchangeData<T>>) {
            let data_content_mut = unsafe { &mut *self.data.get() };
            mem::swap(data, data_content_mut);
            fence(SeqCst);
            self.data_state.store(self.state.load(Acquire), Release);
        }
    }

    unsafe impl<T: Default + Copy> Sync for ExchangeSlot<T> {}
    unsafe impl<T: Default + Copy> Send for ExchangeSlot<T> {}

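    // A small pool of exchange slots (sized from the CPU count, capped at
    // MAXIMUM_EXCHANGE_SLOTS); each call picks a slot at random to spread
    // contention.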
    impl<T: Default + Copy, A: GlobalAlloc + Default> ExchangeArray<T, A> {
        pub fn new() -> Self {
            let num_cpus = num_cpus::get();
            let default_capacity = num_cpus >> 3;
            Self::with_capacity(min(
                max(default_capacity, 2) as usize,
                MAXIMUM_EXCHANGE_SLOTS,
            ))
        }

        pub fn with_capacity(cap: usize) -> Self {
            let mut slots = SmallVec::with_capacity(cap);
            for _ in 0..cap {
                slots.push(ExchangeSlot::new());
            }
            Self {
                slots,
                rand: XorRand::new(cap),
                shadow: PhantomData,
                capacity: cap,
            }
        }

        pub fn exchange(&self, data: ExchangeData<T>) -> Result<ExchangeData<T>, ExchangeData<T>> {
            let slot_num = self.rand.rand_range(0, self.capacity - 1);
            let slot = &self.slots[slot_num];
            slot.exchange(data)
        }

        pub fn worth_exchange(&self, rc: usize) -> bool {
            rc >= self.slots.capacity()
        }
    }

    unsafe impl<T: Default + Copy, A: GlobalAlloc + Default> Send for ExchangeArray<T, A> {}
    unsafe impl<T: Default + Copy, A: GlobalAlloc + Default> Sync for ExchangeArray<T, A> {}

    #[cfg(test)]
    mod test {
        use super::*;
        use crate::list::*;
        use std::alloc::{Global, System};
        use std::collections::BTreeSet;
        use std::sync::atomic::AtomicUsize;
        use std::sync::atomic::Ordering::Relaxed;
        use std::sync::{Arc, Mutex};
        use std::thread;

        #[test]
        #[ignore]
        pub fn exchange() {
            let exchg = Arc::new(ExchangeSlot::new());
            let exchg_1 = exchg.clone();
            let exchg_2 = exchg.clone();
            let attempt_cycles = 10000;
            let sum_board = Arc::new(Mutex::new(BTreeSet::new()));
            let sum_board_1 = sum_board.clone();
            let sum_board_2 = sum_board.clone();
            let hit_count = Arc::new(AtomicUsize::new(0));
            let hit_count_1 = hit_count.clone();
            let hit_count_2 = hit_count.clone();
            assert_eq!(
                exchg.exchange(Some((0, ()))),
                Err(Some((0, ()))),
                "Exchange without a pairing thread shall return the parameter"
            );
            let th1 = thread::spawn(move || {
                for i in 0..attempt_cycles {
                    let res = exchg_2.exchange(Some((i, ())));
                    if res.is_ok() {
                        hit_count_2.fetch_add(1, Relaxed);
                    }
                    assert!(sum_board_2
                        .lock()
                        .unwrap()
                        .insert(res.unwrap_or_else(|err| err)));
                }
            });
            let th2 = thread::spawn(move || {
                for i in attempt_cycles..attempt_cycles * 2 {
                    let res = exchg_1.exchange(Some((i, ())));
                    if res.is_ok() {
                        hit_count_1.fetch_add(1, Relaxed);
                    }
                    assert!(sum_board_1
                        .lock()
                        .unwrap()
                        .insert(res.unwrap_or_else(|err| err)));
                }
            });
            th1.join().unwrap();
            th2.join().unwrap();
            assert!(hit_count.load(Acquire) > 0);
            assert_eq!(sum_board.lock().unwrap().len(), attempt_cycles * 2);
            for i in 0..attempt_cycles * 2 {
                assert!(
                    sum_board.lock().unwrap().contains(&Some((i, ()))),
                    "expecting {} but not found",
                    i
                );
            }
        }
    }
}

#[cfg(test)]
mod test {
    use crate::list::*;
    use std::alloc::{Global, System};
    use std::collections::BTreeSet;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering::Relaxed;
    use std::sync::{Arc, Mutex};
    use std::thread;

    #[test]
    pub fn general() {
        let list = WordList::<System>::new();
        let page_size = page_size::get();
        for i in 2..page_size {
            list.push(i);
        }
        for i in (2..page_size).rev() {
            assert_eq!(list.pop(), Some(i));
        }
        for _ in 2..page_size {
            assert_eq!(list.pop(), None);
        }
        list.push(32);
        list.push(25);
        let mut iter = list.iter();
        assert_eq!(iter.next().unwrap().0, 25);
        assert_eq!(iter.next().unwrap().0, 32);
        assert_eq!(list.count(), 2);
        let mut dropped = vec![];
        list.drop_out_all(Some(|x| {
            dropped.push(x);
        }));
        assert_eq!(dropped, vec![(25, ()), (32, ())]);
        assert_eq!(list.count(), 0);
    }

    #[test]
    pub fn parallel_insertion() {}

    #[test]
    pub fn parallel() {
        let page_size = page_size::get();
        let list = Arc::new(ObjectList::<usize, System>::with_capacity(64));
        let mut threads = (2..page_size)
            .map(|i| {
                let list = list.clone();
                thread::spawn(move || {
                    list.push(i);
                })
            })
            .collect::<Vec<_>>();
        for t in threads {
            t.join().unwrap();
        }

        let mut counter = 0;
        while list.pop().is_some() {
            counter += 1;
        }
        assert_eq!(counter, page_size - 2);

        for i in 2..page_size {
            list.push(i);
        }
        let recev_list = Arc::new(WordList::<System>::with_capacity(64));
        threads = (page_size..(page_size * 2))
            .map(|i| {
                let list = list.clone();
                let recev_list = recev_list.clone();
                thread::spawn(move || {
                    if i % 2 == 0 {
                        list.push(i);
                    } else {
                        let pop_val = list.pop().unwrap();
                        recev_list.push(pop_val);
                    }
                })
            })
            .collect::<Vec<_>>();
        for t in threads {
            t.join().unwrap();
        }

        let mut agg = vec![];
        while let Some(v) = list.pop() {
            agg.push(v);
        }
        while let Some(v) = recev_list.pop() {
            agg.push(v);
        }
        assert_eq!(recev_list.count(), 0, "receive counter not match");
        assert_eq!(list.count(), 0, "origin counter not match");
        let total_insertion = page_size + page_size / 2 - 2;
        assert_eq!(agg.len(), total_insertion, "unmatch before dedup");
        agg.sort();
        agg.dedup_by_key(|k| *k);
        assert_eq!(agg.len(), total_insertion, "unmatch after dedup");
    }
}
1014}