lightning/list.rs

// Lock-free, wait-free paged linked-list stack of usize flags with optional data payloads
use crate::align_padding;
use core::alloc::Layout;
use core::borrow::BorrowMut;
use core::cmp::min;
use core::ops::Deref;
use core::ptr;
use core::ptr::null_mut;
use core::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use core::sync::atomic::{AtomicPtr, AtomicUsize};
use core::{intrinsics, mem};
use crossbeam_utils::Backoff;
#[cfg(feature = "exchange_backoff")]
use exchange::*;
use std::alloc::{GlobalAlloc, System};

const CACHE_LINE_SIZE: usize = 64;
const EMPTY_SLOT: usize = 0;
const SENTINEL_SLOT: usize = 1;
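// Each slot in a buffer is a usize flag followed by the (optional) object.
// The flag encodes the slot's lifecycle:
//   EMPTY_SLOT (0)    - unclaimed, or claimed by a push that has not finished
//                       writing yet; a concurrent pop backs off and retries.
//   SENTINEL_SLOT (1) - abandoned by a racing pop; carries no data and is
//                       skipped by pops and iterators.
//   >= 2              - a user-supplied flag; the slot holds live data.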

struct BufferMeta<T: Default, A: GlobalAlloc + Default> {
    head: AtomicUsize,
    next: AtomicPtr<BufferMeta<T, A>>,
    refs: AtomicUsize,
    lower_bound: usize,
    tuple_size: usize,
    total_size: usize,
}

pub struct List<T: Default + Copy, A: GlobalAlloc + Default> {
    head: AtomicPtr<BufferMeta<T, A>>,
    count: AtomicUsize,
    buffer_cap: usize,
    #[cfg(feature = "exchange_backoff")]
    exchange: ExchangeArray<T, A>,
}

pub struct ListIterator<T: Default + Copy, A: GlobalAlloc + Default> {
    buffer: BufferRef<T, A>,
    current: usize,
}

impl<T: Default + Copy, A: GlobalAlloc + Default> List<T, A> {
    pub fn new(buffer_cap: usize) -> Self {
        let first_buffer = BufferMeta::new(buffer_cap);
        Self {
            head: AtomicPtr::new(first_buffer),
            count: AtomicUsize::new(0),
            #[cfg(feature = "exchange_backoff")]
            exchange: ExchangeArray::new(),
            buffer_cap,
        }
    }

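    /// Pushes `data` tagged with `flag`. Flags 0 and 1 are reserved for the
    /// EMPTY and SENTINEL slot states, so `flag` must be at least 2.
    ///
    /// A minimal usage sketch (marked `ignore` since the doc-test path depends
    /// on how this module is mounted):
    /// ```ignore
    /// use std::alloc::System;
    /// let list: List<u64, System> = List::new(64);
    /// list.push(2, 99); // flag 2 tags the value 99
    /// assert_eq!(list.pop(), Some((2, 99)));
    /// ```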
    pub fn push(&self, flag: usize, data: T) {
        self.do_push(flag, data);
        self.count.fetch_add(1, AcqRel);
    }

    #[allow(unused_mut)] // `flag` and `data` are only reassigned under `exchange_backoff`
    fn do_push(&self, mut flag: usize, mut data: T) {
        debug_assert_ne!(flag, EMPTY_SLOT);
        debug_assert_ne!(flag, SENTINEL_SLOT);
        loop {
            let obj_size = mem::size_of::<T>();
            let head_ptr = self.head.load(Acquire);
            let page = BufferMeta::borrow(head_ptr);
            let slot_pos = page.head.load(Acquire);
            let next_pos = slot_pos + 1;
            if next_pos > self.buffer_cap {
                // Buffer overflow: allocate a new buffer and link it to the last one
                let new_head = BufferMeta::new(self.buffer_cap);
                unsafe {
                    (*new_head).next.store(head_ptr, Release);
                    debug_assert_eq!((*new_head).total_size, page.total_size);
                }
                if self.head.compare_and_swap(head_ptr, new_head, AcqRel) != head_ptr {
                    BufferMeta::unref(new_head);
                }
            // in either case, retry
            } else {
                // Attempt the push on the current buffer: first CAS the buffer
                // head, then write the item, and finally store a non-zero flag
                // (or value) into the slot.

                // A zero flag tells a concurrent pop that the write is not yet
                // complete, so it will back off and try again.
                if page.head.compare_and_swap(slot_pos, next_pos, AcqRel) == slot_pos {
                    let slot_ptr = page.flag_ptr_of(slot_pos);
                    unsafe {
                        if obj_size != 0 {
                            let obj_ptr = page.object_ptr_of(slot_ptr);
                            ptr::write(obj_ptr, data);
                        }
                        let slot_flag =
                            intrinsics::atomic_cxchg_relaxed(slot_ptr, EMPTY_SLOT, flag).0;
                        assert_eq!(
                            slot_flag, EMPTY_SLOT,
                            "Cannot swap flag for push. Flag is {}, expected empty",
                            slot_flag
                        );
                    }
                    return;
                }
            }
            #[cfg(feature = "exchange_backoff")]
            match self.exchange.exchange(Some((flag, data))) {
                Ok(Some(tuple)) | Err(Some(tuple)) => {
                    // exchanged with another push; retry with its parameters
                    flag = tuple.0;
                    data = tuple.1;
                }
                Ok(None) | Err(None) => {
                    // handed the item to a popping thread
                    return;
                }
            }
        }
    }

    pub fn exclusive_push(&self, flag: usize, data: T) {
        // The caller ensures this push is exclusive, so no CAS is needed
        // except on the list head
        let obj_size = mem::size_of::<T>();
        loop {
            let head_ptr = self.head.load(Acquire);
            let page = BufferMeta::borrow(head_ptr);
            let slot_pos = page.head.load(Acquire);
            let next_pos = slot_pos + 1;
            if next_pos > self.buffer_cap {
                // Buffer overflow: allocate a new buffer and link it to the last one
                let new_head = BufferMeta::new(self.buffer_cap);
                unsafe {
                    (*new_head).next.store(head_ptr, Release);
                }
                if self.head.compare_and_swap(head_ptr, new_head, Release) != head_ptr {
                    BufferMeta::unref(new_head);
                }
            // in either case, retry
            } else {
                page.head.store(next_pos, Release);
                let slot_ptr = page.flag_ptr_of(slot_pos);
                unsafe {
                    if obj_size != 0 {
                        let obj_ptr = page.object_ptr_of(slot_ptr);
                        ptr::write(obj_ptr, data);
                    }
                    intrinsics::atomic_store_relaxed(slot_ptr, flag);
                }
                self.count.fetch_add(1, AcqRel);
                return;
            }
        }
    }

    pub fn pop(&self) -> Option<(usize, T)> {
        if self.count.load(Acquire) == 0 {
            return None;
        }
        let backoff = Backoff::new();
        loop {
            let head_ptr = self.head.load(Acquire);
            let page = BufferMeta::borrow(head_ptr);
            let slot = page.head.load(Acquire);
            let obj_size = mem::size_of::<T>();
            let next_buffer_ptr = page.next.load(Acquire);
            if slot == 0 && next_buffer_ptr == null_mut() {
                // empty buffer chain
                return None;
            }
            if slot == 0 && next_buffer_ptr != null_mut() {
                // Last item of this buffer was taken: remove this head and
                // swap in the next buffer as the new head
                if self
                    .head
                    .compare_and_swap(head_ptr, next_buffer_ptr, AcqRel)
                    == head_ptr
                {
                    // At this point there may still be items in the old head.
                    // Spin-wait for other threads to finish working on this
                    // buffer, then push any remaining items back into the
                    // list. This may break ordering, but there is no other
                    // choice here and the side effect is not significant for
                    // the intended use cases.
                    drop(page);
                    let dropped_next = BufferMeta::drop_out(
                        head_ptr,
                        &mut Some(|(flag, data)| {
                            if flag != EMPTY_SLOT && flag != SENTINEL_SLOT {
                                self.do_push(flag, data); // push without bumping the counter
                            }
                        }),
                        &mut 0,
                    );
                    debug_assert_eq!(dropped_next.unwrap_or(null_mut()), next_buffer_ptr);
                // no need to unref here; drop_out did that for us
                } else {
                    backoff.spin();
                }
                continue;
            }
            let mut res = None;
            if slot > 0 {
                unsafe {
                    let new_slot = slot - 1;
                    let new_slot_ptr = page.flag_ptr_of(new_slot);
                    let new_slot_flag = intrinsics::atomic_load_relaxed(new_slot_ptr);
                    if new_slot_flag != 0
                        // first things first, swap the slot to zero if it is not zero
                        && intrinsics::atomic_cxchg_relaxed(new_slot_ptr, new_slot_flag, EMPTY_SLOT).1
                    {
                        res = Some((new_slot_flag, T::default()));
                        if obj_size != 0 && new_slot_flag != SENTINEL_SLOT {
                            res.as_mut().map(|(_, obj)| {
                                let obj_ptr = page.object_ptr_of(new_slot_ptr) as *mut T;
                                *obj = ptr::read(obj_ptr as *mut T)
                            });
                        }
                        let swapped = page.head.compare_and_swap(slot, new_slot, AcqRel);
                        debug_assert!(
                            swapped >= slot,
                            "Pop head CAS went backwards, got {}, expected at least {}",
                            swapped,
                            slot
                        );
                        if swapped != slot {
                            // Swapping the page head failed. The only possible
                            // scenario is that a concurrent push has already
                            // advanced the head past this slot. That leaves a
                            // hole in the list, so store a sentinel to tell
                            // later pops that this slot carries no useful data
                            // and they should pop again.
                            intrinsics::atomic_store(new_slot_ptr, SENTINEL_SLOT);
                        }
                        if new_slot_flag != SENTINEL_SLOT {
                            self.count.fetch_sub(1, AcqRel);
                            return res;
                        }
                    }
                }
            } else {
                return res;
            }
            #[cfg(feature = "exchange_backoff")]
            match self.exchange.exchange(None) {
                Ok(Some(tuple)) | Err(Some(tuple)) => {
                    // exchanged with a push, return its item
                    self.count.fetch_sub(1, AcqRel);
                    return Some(tuple);
                }
                Ok(None) | Err(None) => {
                    // met another pop
                }
            }
        }
    }
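
    /// Drains the list, invoking `retain` on every `(flag, data)` pair that
    /// is removed. A minimal sketch of the intended call shape:
    /// ```ignore
    /// let mut drained = Vec::new();
    /// list.drop_out_all(Some(|pair| drained.push(pair)));
    /// assert_eq!(list.count(), 0);
    /// ```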
    pub fn drop_out_all<F>(&self, mut retain: Option<F>)
    where
        F: FnMut((usize, T)),
    {
        let count = self.count.load(Acquire);
        if count == 0 {
            return;
        }
        let retain = retain.borrow_mut();
        let pop_threshold = min(self.buffer_cap >> 1, 64);
        if count < pop_threshold {
            let pop_amount = pop_threshold << 1; // double the threshold
            for _ in 0..pop_amount {
                if let Some(pair) = self.pop() {
                    if let Some(retain) = retain {
                        retain(pair);
                    }
                } else {
                    // The only early-exit condition is that there are no more
                    // elements to pop; if the list is still not empty after
                    // `pop_amount` pops, fall through to the buffer-swap
                    // approach below
                    return;
                }
            }
        }
        let new_head_buffer = BufferMeta::new(self.buffer_cap);
        let mut buffer_ptr = self.head.swap(new_head_buffer, AcqRel);
        let null = null_mut();
        let mut counter = 0;
        while buffer_ptr != null {
            buffer_ptr = BufferMeta::drop_out(buffer_ptr, retain, &mut counter).unwrap_or(null);
        }
        self.count.fetch_sub(counter, AcqRel);
    }

    pub fn prepend_with(&self, other: &Self) {
        if other.count.load(Acquire) == 0 {
            return;
        }
        let other_head = other.head.swap(BufferMeta::new(self.buffer_cap), AcqRel);
        let other_count = other.count.swap(0, AcqRel);
        let mut other_tail = BufferMeta::borrow(other_head);
        // probe for the last buffer in the other list's chain
        loop {
            // wait for all other borrowers of this buffer to release it
            while other_tail.refs.load(Acquire) > 2 {}
            let next_ptr = other_tail.next.load(Acquire);
            if next_ptr == null_mut() {
                break;
            }
            other_tail = BufferMeta::borrow(next_ptr);
        }

        // CAS this list's head to the other head, then point the other
        // tail's next buffer at the old head
        loop {
            let this_head = self.head.load(Acquire);
            if self.head.compare_and_swap(this_head, other_head, AcqRel) != this_head {
                continue;
            } else {
                other_tail.next.store(this_head, Release);
                break;
            }
        }
        self.count.fetch_add(other_count, AcqRel);
    }

    pub fn count(&self) -> usize {
        self.count.load(Acquire)
    }

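    /// Iterates from the most recently pushed element toward the oldest,
    /// skipping empty and sentinel slots. A minimal sketch:
    /// ```ignore
    /// for (flag, value) in list.iter() {
    ///     // flags are always >= 2 here
    /// }
    /// ```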
    pub fn iter(&self) -> ListIterator<T, A> {
        let buffer = BufferMeta::borrow(self.head.load(Acquire));
        ListIterator {
            current: buffer.head.load(Acquire),
            buffer,
        }
    }
}

impl<T: Default + Copy, A: GlobalAlloc + Default> Drop for List<T, A> {
    fn drop(&mut self) {
        unsafe {
            let mut node_ptr = self.head.load(Acquire);
            while !node_ptr.is_null() {
                let next_ptr = (&*node_ptr).next.load(Acquire);
                BufferMeta::unref(node_ptr);
                node_ptr = next_ptr;
            }
        }
    }
}

impl<T: Default + Copy, A: GlobalAlloc + Default> Default for List<T, A> {
    fn default() -> Self {
        Self::new(32)
    }
}

impl<T: Default, A: GlobalAlloc + Default> BufferMeta<T, A> {
    pub fn new(buffer_cap: usize) -> *mut BufferMeta<T, A> {
        let self_size = mem::size_of::<Self>();
        let meta_size = self_size + align_padding(self_size, CACHE_LINE_SIZE);
        let slots_size = mem::size_of::<usize>();
        let data_size = mem::size_of::<T>();
        let tuple_size = slots_size + data_size;
        let tuple_size_aligned = if tuple_size <= 8 {
            8
        } else if tuple_size <= 16 {
            16
        } else if tuple_size <= 32 {
            32
        } else {
            tuple_size + align_padding(tuple_size, CACHE_LINE_SIZE)
        };
        let total_size = meta_size + tuple_size_aligned * buffer_cap;
        let head_page = alloc_mem::<A>(total_size) as *mut Self;
        let head_page_addr = head_page as usize;
        let slots_start = head_page_addr + meta_size;
        unsafe {
            ptr::write(
                head_page,
                Self {
                    head: AtomicUsize::new(0),
                    next: AtomicPtr::new(null_mut()),
                    refs: AtomicUsize::new(1),
                    lower_bound: slots_start,
                    // the slot stride must match the aligned size used for the
                    // allocation above, or flags could land on unaligned addresses
                    tuple_size: tuple_size_aligned,
                    total_size,
                },
            );
        }
        head_page
    }
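
    // Worked layout example (assuming a 64-bit target and T = usize): the
    // 48-byte meta struct is padded to one cache line (meta_size = 64), each
    // flag + object tuple is 8 + 8 = 16 bytes (already aligned), so a buffer
    // of capacity 32 occupies 64 + 16 * 32 = 576 bytes, with the slots
    // starting at `lower_bound`, right after the meta block.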

    pub fn unref(buffer: *mut Self) {
        let rc = {
            let buffer = unsafe { &*buffer };
            buffer.refs.fetch_sub(1, AcqRel)
        };
        if rc == 1 {
            Self::gc(buffer);
        }
    }

    fn gc(buffer: *mut Self) {
        let buffer_ref = unsafe { &*buffer };
        let total_size = buffer_ref.total_size;
        if mem::needs_drop::<T>() {
            Self::flush_buffer(buffer_ref, &mut Some(|x| drop(x)), &mut 0);
        }
        dealloc_mem::<A>(buffer as usize, total_size)
    }

    // Only use when the buffer is about to die; this requires reference
    // counting checks by the caller
    fn flush_buffer<F>(buffer: &Self, retain: &mut Option<F>, counter: &mut usize)
    where
        F: FnMut((usize, T)),
    {
        let size_of_obj = mem::size_of::<T>();
        let data_bound = buffer.head.load(Acquire);
        let mut slot_addr = buffer.lower_bound;
        debug_assert!(
            buffer.refs.load(Acquire) <= 2 || buffer.refs.load(Acquire) >= 256,
            "Reference counting check failed"
        );
        for _ in 0..data_bound {
            unsafe {
                let slot = intrinsics::atomic_load_relaxed(slot_addr as *const usize);
                if slot != EMPTY_SLOT && slot != SENTINEL_SLOT {
                    let mut rest = (slot, T::default());
                    if size_of_obj > 0 {
                        rest.1 = ptr::read((slot_addr + mem::size_of::<usize>()) as *const T);
                    }
                    if let Some(retain) = retain {
                        retain(rest);
                    }
                    *counter += 1;
                }
            }
            slot_addr += buffer.tuple_size;
        }
        buffer.head.store(0, Release);
    }

    fn drop_out<F>(
        buffer_ptr: *mut Self,
        retain: &mut Option<F>,
        counter: &mut usize,
    ) -> Option<*mut Self>
    where
        F: FnMut((usize, T)),
    {
        let buffer = BufferMeta::borrow(buffer_ptr);
        let next_ptr = buffer.next.load(Acquire);
        let backoff = Backoff::new();
        let word_bits = mem::size_of::<usize>() << 3;
        // the highest bit of the reference counter marks a drop_out in progress
        let flag = 1 << (word_bits - 1);
        loop {
            let rc = buffer.refs.load(Acquire);
            if rc > flag {
                // discovered another drop_out, give up
                return None;
            }
            let flag_swap = buffer.refs.compare_and_swap(rc, rc | flag, AcqRel);
            if flag_swap == rc {
                break;
            } else if flag_swap > flag {
                // discovered another drop_out, give up
                return None;
            } else {
                backoff.spin();
            }
        }
        loop {
            // Wait until the reference counter reaches 2 (one held because
            // the buffer is not yet garbage, one for the current borrow)
            let rc = buffer.refs.load(Acquire);
            debug_assert!(rc > flag, "got reference {:x}, value {}", rc, rc & !flag);
            let rc = rc & !flag;
            if rc <= 1 {
                // this buffer is already marked for gc; leave it untouched
                buffer.refs.store(2, Release);
                return Some(next_ptr);
            } else if rc == 2 {
                // no other reference; flush and break out of the wait
                buffer.refs.store(rc, Release);
                BufferMeta::flush_buffer(&*buffer, retain, counter);
                BufferMeta::unref(buffer_ptr);
                return Some(next_ptr);
            }
            backoff.spin();
        }
    }

    fn borrow(buffer: *mut Self) -> BufferRef<T, A> {
        {
            let buffer = unsafe { &*buffer };
            buffer.refs.fetch_add(1, AcqRel);
        }
        BufferRef { ptr: buffer }
    }

    fn flag_ptr_of(&self, index: usize) -> *mut usize {
        (self.lower_bound + index * self.tuple_size) as *mut usize
    }

    fn object_ptr_of(&self, flag_ptr: *mut usize) -> *mut T {
        (flag_ptr as usize + mem::size_of::<usize>()) as *mut T
    }
}

struct BufferRef<T: Default, A: GlobalAlloc + Default> {
    ptr: *mut BufferMeta<T, A>,
}

impl<T: Default, A: GlobalAlloc + Default> Drop for BufferRef<T, A> {
    fn drop(&mut self) {
        BufferMeta::unref(self.ptr);
    }
}

impl<T: Default, A: GlobalAlloc + Default> Deref for BufferRef<T, A> {
    type Target = BufferMeta<T, A>;

    fn deref(&self) -> &Self::Target {
        unsafe { &*self.ptr }
    }
}

impl<T: Default + Clone + Copy, A: GlobalAlloc + Default> Iterator for ListIterator<T, A> {
    type Item = (usize, T);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if self.current == 0 {
                let next_buffer_ptr = self.buffer.next.load(Acquire);
                if next_buffer_ptr == null_mut() {
                    return None;
                } else {
                    self.buffer = BufferMeta::borrow(next_buffer_ptr);
                    self.current = self.buffer.head.load(Acquire);
                    continue;
                }
            }
            let current_flag_ptr = self.buffer.flag_ptr_of(self.current - 1);
            unsafe {
                let mut result = (*current_flag_ptr, T::default());
                if mem::size_of::<T>() > 0 {
                    result.1 = (*self.buffer.object_ptr_of(current_flag_ptr)).clone()
                }
                self.current -= 1;
                if result.0 != EMPTY_SLOT && result.0 != SENTINEL_SLOT {
                    return Some(result);
                }
            };
        }
    }
}

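// `WordList` stores bare `usize` values in the flag word itself (the payload
// type is `()`), which is why 0 and 1 are rejected below: they would collide
// with the EMPTY_SLOT and SENTINEL_SLOT markers.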
pub struct WordList<A: GlobalAlloc + Default = System> {
    inner: List<(), A>,
}

impl<A: GlobalAlloc + Default> WordList<A> {
    pub fn with_capacity(cap: usize) -> Self {
        Self {
            inner: List::new(cap),
        }
    }
    pub fn new() -> Self {
        Self::with_capacity(512)
    }
    pub fn push(&self, data: usize) {
        debug_assert_ne!(data, 0);
        debug_assert_ne!(data, 1);
        self.inner.push(data, ())
    }
    pub fn exclusive_push(&self, data: usize) {
        debug_assert_ne!(data, 0);
        debug_assert_ne!(data, 1);
        self.inner.exclusive_push(data, ())
    }
    pub fn pop(&self) -> Option<usize> {
        self.inner.pop().map(|(data, _)| data)
    }

    pub fn drop_out_all<F>(&self, retain: Option<F>)
    where
        F: FnMut((usize, ())),
    {
        self.inner.drop_out_all(retain);
    }
    pub fn prepend_with(&self, other: &Self) {
        self.inner.prepend_with(&other.inner)
    }
    pub fn count(&self) -> usize {
        self.inner.count()
    }
    pub fn iter(&self) -> ListIterator<(), A> {
        self.inner.iter()
    }
}

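// `ObjectList` stores arbitrary `Copy` values, using the all-ones flag `!0`
// purely as a "slot occupied" marker; the flag half of each popped pair is
// discarded.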
pub struct ObjectList<T: Default + Copy, A: GlobalAlloc + Default> {
    inner: List<T, A>,
}

impl<T: Default + Copy, A: GlobalAlloc + Default> ObjectList<T, A> {
    pub fn with_capacity(cap: usize) -> Self {
        Self {
            inner: List::new(cap),
        }
    }
    pub fn new() -> Self {
        Self::with_capacity(512)
    }
    pub fn push(&self, data: T) {
        self.inner.push(!0, data)
    }
    pub fn exclusive_push(&self, data: T) {
        self.inner.exclusive_push(!0, data)
    }
    pub fn pop(&self) -> Option<T> {
        self.inner.pop().map(|(_, obj)| obj)
    }

    pub fn drop_out_all<F>(&self, retain: Option<F>)
    where
        F: FnMut((usize, T)),
    {
        self.inner.drop_out_all(retain)
    }

    pub fn prepend_with(&self, other: &Self) {
        self.inner.prepend_with(&other.inner)
    }
    pub fn count(&self) -> usize {
        self.inner.count()
    }
    pub fn iter(&self) -> ListIterator<T, A> {
        self.inner.iter()
    }
}

#[inline]
pub fn dealloc_mem<A: GlobalAlloc + Default>(ptr: usize, size: usize) {
    let a = A::default();
    let align = 16;
    let layout = Layout::from_size_align(size, align).unwrap();
    unsafe { a.dealloc(ptr as *mut u8, layout) }
}

#[inline]
pub fn alloc_mem<A: GlobalAlloc + Default>(size: usize) -> usize {
    let a = A::default();
    let align = 16;
    let layout = Layout::from_size_align(size, align).unwrap();
    // must be all zeroed
    (unsafe { a.alloc_zeroed(layout) }) as usize
}
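
// Both helpers assume a fixed 16-byte alignment. `alloc_mem` must return
// zeroed memory so that freshly allocated slots read as EMPTY_SLOT (0).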

#[cfg(feature = "exchange_backoff")]
mod exchange {

    use super::*;
    use crate::rand::XorRand;
    use core::cell::UnsafeCell;
    use core::marker::PhantomData;
    use smallvec::SmallVec;
    use std::cmp::max;
    use std::sync::atomic::fence;
    use std::sync::atomic::Ordering::SeqCst;
    use std::time::Instant;

    const EXCHANGE_EMPTY: usize = 0;
    const EXCHANGE_WAITING: usize = 1;
    const EXCHANGE_BUSY: usize = 2;
    const EXCHANGE_SPIN_WAIT_NS: usize = 150;
    const MAXIMUM_EXCHANGE_SLOTS: usize = 16;

    type ExchangeData<T> = Option<(usize, T)>;
    type ExchangeArrayVec<T> = SmallVec<[ExchangeSlot<T>; MAXIMUM_EXCHANGE_SLOTS]>;

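    // Each slot is a small rendezvous point for elimination backoff: `state`
    // moves EXCHANGE_EMPTY -> EXCHANGE_WAITING when a thread parks an offer,
    // EXCHANGE_WAITING -> EXCHANGE_BUSY when a partner arrives, and back to
    // EXCHANGE_EMPTY once the pair has swapped. `data_state` mirrors `state`
    // and is only advanced after the data write, so a thread that waits for
    // `data_state` to catch up never reads a half-written offer.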
    pub struct ExchangeSlot<T: Default + Copy> {
        state: AtomicUsize,
        data: UnsafeCell<Option<ExchangeData<T>>>,
        data_state: AtomicUsize,
    }

    pub struct ExchangeArray<T: Default + Copy, A: GlobalAlloc + Default> {
        rand: XorRand,
        shadow: PhantomData<A>,
        capacity: usize,
        slots: ExchangeArrayVec<T>,
    }

    impl<T: Default + Copy> ExchangeSlot<T> {
        fn new() -> Self {
            Self {
                state: AtomicUsize::new(EXCHANGE_EMPTY),
                data: UnsafeCell::new(None),
                data_state: AtomicUsize::new(EXCHANGE_EMPTY),
            }
        }

        fn exchange(&self, data: ExchangeData<T>) -> Result<ExchangeData<T>, ExchangeData<T>> {
            // Memory ordering matters here
            let state = self.state.load(Acquire);
            let backoff = Backoff::new();
            if state == EXCHANGE_EMPTY {
                self.wait_state_data_until(state, &backoff);
                if self
                    .state
                    .compare_and_swap(EXCHANGE_EMPTY, EXCHANGE_WAITING, AcqRel)
                    == EXCHANGE_EMPTY
                {
                    self.store_state_data(Some(data));
                    let now = Instant::now();
                    loop {
                        // check whether it can keep spinning
                        if (now.elapsed().as_nanos() as usize) < EXCHANGE_SPIN_WAIT_NS
                            // if not, CAS back to empty; this can fail when another thread set BUSY
                            || self.state.compare_and_swap(EXCHANGE_WAITING, EXCHANGE_EMPTY, AcqRel) == EXCHANGE_BUSY
                        {
                            if self.state.load(Acquire) != EXCHANGE_BUSY {
                                continue;
                            }
                            self.wait_state_data_until(EXCHANGE_BUSY, &backoff);
                            self.state.store(EXCHANGE_EMPTY, Release);
                            let mut data_result = None;
                            self.swap_state_data(&mut data_result);
                            if let Some(res) = data_result {
                                return Ok(res);
                            } else {
                                unreachable!();
                            }
                        } else {
                            // no other thread came to take over; return the input
                            assert_eq!(
                                self.state.load(Acquire),
                                EXCHANGE_EMPTY,
                                "Bad state after bail"
                            );
                            let mut returned_data_state = None;
                            self.swap_state_data(&mut returned_data_state);
                            if let Some(returned_data) = returned_data_state {
                                return Err(returned_data);
                            } else {
                                unreachable!()
                            }
                        }
                    }
                } else {
                    return Err(data);
                }
            } else if state == EXCHANGE_WAITING {
                // found a waiting partner; claim it first
                if self
                    .state
                    .compare_and_swap(EXCHANGE_WAITING, EXCHANGE_BUSY, AcqRel)
                    == EXCHANGE_WAITING
                {
                    self.wait_state_data_until(EXCHANGE_WAITING, &backoff);
                    let mut data_result = Some(data);
                    self.swap_state_data(&mut data_result);
                    if let Some(res) = data_result {
                        return Ok(res);
                    } else {
                        unreachable!()
                    }
                } else {
                    return Err(data);
                }
            } else if state == EXCHANGE_BUSY {
                return Err(data);
            } else {
                unreachable!(
                    "Got state {}, real state {}",
                    state,
                    self.state.load(Acquire)
                );
            }
        }

        fn store_state_data(&self, data: Option<ExchangeData<T>>) {
            let data_content_ptr = self.data.get();
            unsafe { ptr::write(data_content_ptr, data) }
            fence(SeqCst);
            self.data_state.store(self.state.load(Acquire), Release);
        }

        fn wait_state_data_until(&self, expecting: usize, backoff: &Backoff) {
            while self.data_state.load(Acquire) != expecting {
                backoff.spin();
            }
        }

        fn wait_state_data_sync(&self, backoff: &Backoff) {
            self.wait_state_data_until(self.state.load(Acquire), backoff);
        }

        fn swap_state_data(&self, data: &mut Option<ExchangeData<T>>) {
            let data_content_mut = unsafe { &mut *self.data.get() };
            mem::swap(data, data_content_mut);
            fence(SeqCst);
            self.data_state.store(self.state.load(Acquire), Release);
        }
    }

    unsafe impl<T: Default + Copy> Sync for ExchangeSlot<T> {}
    unsafe impl<T: Default + Copy> Send for ExchangeSlot<T> {}

    impl<T: Default + Copy, A: GlobalAlloc + Default> ExchangeArray<T, A> {
        pub fn new() -> Self {
            let num_cpus = num_cpus::get();
            let default_capacity = num_cpus >> 3;
            Self::with_capacity(min(max(default_capacity, 2), MAXIMUM_EXCHANGE_SLOTS))
        }

        pub fn with_capacity(cap: usize) -> Self {
            let mut slots = SmallVec::with_capacity(cap);
            for _ in 0..cap {
                slots.push(ExchangeSlot::new());
            }
            Self {
                slots,
                rand: XorRand::new(cap),
                shadow: PhantomData,
                capacity: cap,
            }
        }

        pub fn exchange(&self, data: ExchangeData<T>) -> Result<ExchangeData<T>, ExchangeData<T>> {
            let slot_num = self.rand.rand_range(0, self.capacity - 1);
            let slot = &self.slots[slot_num];
            slot.exchange(data)
        }

        pub fn worth_exchange(&self, rc: usize) -> bool {
            rc >= self.slots.capacity()
        }
    }

    unsafe impl<T: Default + Copy, A: GlobalAlloc + Default> Send for ExchangeArray<T, A> {}
    unsafe impl<T: Default + Copy, A: GlobalAlloc + Default> Sync for ExchangeArray<T, A> {}

    #[cfg(test)]
    mod test {
        use super::*;
        use crate::list::*;
        use std::alloc::{Global, System};
        use std::collections::BTreeSet;
        use std::sync::atomic::AtomicUsize;
        use std::sync::atomic::Ordering::Relaxed;
        use std::sync::{Arc, Mutex};
        use std::thread;

        #[test]
        #[ignore]
        pub fn exchange() {
            let exchg = Arc::new(ExchangeSlot::new());
            let exchg_1 = exchg.clone();
            let exchg_2 = exchg.clone();
            let attempt_cycles = 10000;
            let sum_board = Arc::new(Mutex::new(BTreeSet::new()));
            let sum_board_1 = sum_board.clone();
            let sum_board_2 = sum_board.clone();
            let hit_count = Arc::new(AtomicUsize::new(0));
            let hit_count_1 = hit_count.clone();
            let hit_count_2 = hit_count.clone();
            assert_eq!(
                exchg.exchange(Some((0, ()))),
                Err(Some((0, ()))),
                "An unpaired exchange shall return its own parameter"
            );
            let th1 = thread::spawn(move || {
                for i in 0..attempt_cycles {
                    let res = exchg_2.exchange(Some((i, ())));
                    if res.is_ok() {
                        hit_count_2.fetch_add(1, Relaxed);
                    }
                    assert!(sum_board_2
                        .lock()
                        .unwrap()
                        .insert(res.unwrap_or_else(|err| err)));
                }
            });
            let th2 = thread::spawn(move || {
                for i in attempt_cycles..attempt_cycles * 2 {
                    let res = exchg_1.exchange(Some((i, ())));
                    if res.is_ok() {
                        hit_count_1.fetch_add(1, Relaxed);
                    }
                    assert!(sum_board_1
                        .lock()
                        .unwrap()
                        .insert(res.unwrap_or_else(|err| err)));
                }
            });
            th1.join().unwrap();
            th2.join().unwrap();
            assert!(hit_count.load(Acquire) > 0);
            assert_eq!(sum_board.lock().unwrap().len(), attempt_cycles * 2);
            for i in 0..attempt_cycles * 2 {
                assert!(
                    sum_board.lock().unwrap().contains(&Some((i, ()))),
                    "expecting {} but not found",
                    i
                );
            }
        }
    }
}

#[cfg(test)]
mod test {
    use crate::list::*;
    use std::alloc::{Global, System};
    use std::collections::BTreeSet;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering::Relaxed;
    use std::sync::{Arc, Mutex};
    use std::thread;

    #[test]
    pub fn general() {
        let list = WordList::<System>::new();
        let page_size = page_size::get();
        for i in 2..page_size {
            list.push(i);
        }
        for i in (2..page_size).rev() {
            assert_eq!(list.pop(), Some(i));
        }
        for _ in 2..page_size {
            assert_eq!(list.pop(), None);
        }
        list.push(32);
        list.push(25);
        let mut iter = list.iter();
        assert_eq!(iter.next().unwrap().0, 25);
        assert_eq!(iter.next().unwrap().0, 32);
        assert_eq!(list.count(), 2);
        let mut dropped = vec![];
        list.drop_out_all(Some(|x| {
            dropped.push(x);
        }));
        assert_eq!(dropped, vec![(25, ()), (32, ())]);
        assert_eq!(list.count(), 0);
    }
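
    // A minimal sketch (not part of the original suite) exercising
    // `prepend_with`: all elements of `source` move into `target`.
    #[test]
    pub fn prepend() {
        let target = WordList::<System>::with_capacity(64);
        let source = WordList::<System>::with_capacity(64);
        target.push(2);
        source.push(3);
        source.push(4);
        target.prepend_with(&source);
        assert_eq!(source.count(), 0);
        assert_eq!(target.count(), 3);
        let mut popped = BTreeSet::new();
        while let Some(v) = target.pop() {
            popped.insert(v);
        }
        assert_eq!(popped, [2usize, 3, 4].iter().cloned().collect());
        assert_eq!(target.count(), 0);
    }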

    #[test]
    pub fn parallel_insertion() {}

    #[test]
    pub fn parallel() {
        let page_size = page_size::get();
        let list = Arc::new(ObjectList::<usize, System>::with_capacity(64));
        let mut threads = (2..page_size)
            .map(|i| {
                let list = list.clone();
                thread::spawn(move || {
                    list.push(i);
                })
            })
            .collect::<Vec<_>>();
        for t in threads {
            t.join().unwrap();
        }

        let mut counter = 0;
        while list.pop().is_some() {
            counter += 1;
        }
        assert_eq!(counter, page_size - 2);

        // push alone is fine; now mix pushes and pops

        for i in 2..page_size {
            list.push(i);
        }
        let recev_list = Arc::new(WordList::<System>::with_capacity(64));
        threads = (page_size..(page_size * 2))
            .map(|i| {
                let list = list.clone();
                let recev_list = recev_list.clone();
                thread::spawn(move || {
                    if i % 2 == 0 {
                        list.push(i);
                    } else {
                        let pop_val = list.pop().unwrap();
                        recev_list.push(pop_val);
                    }
                })
            })
            .collect::<Vec<_>>();
        for t in threads {
            t.join().unwrap();
        }

        let mut agg = vec![];
        while let Some(v) = list.pop() {
            agg.push(v);
        }
        while let Some(v) = recev_list.pop() {
            agg.push(v);
        }
        assert_eq!(recev_list.count(), 0, "receive counter does not match");
        assert_eq!(list.count(), 0, "origin counter does not match");
        let total_insertion = page_size + page_size / 2 - 2;
        assert_eq!(agg.len(), total_insertion, "length mismatch before dedup");
        agg.sort();
        agg.dedup();
        assert_eq!(agg.len(), total_insertion, "length mismatch after dedup");
    }
}