//! Please see the struct-level documentation.

use ccl_crossbeam_epoch::{self as epoch, Atomic, Guard, Owned, Pointer};
use std::mem;
use std::ptr;
use std::sync::atomic::Ordering;

/// Acquire a guard. Guards are needed when accessing a stack. Since acquiring a guard has a significant cost,
/// you may wish to acquire a guard once and pass it around when doing bulk operations.
/// For most use cases you will not need this.
///
/// Please note that no memory consumed by objects removed after the guard was acquired can be reclaimed
/// until the guard has been dropped.
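///
/// # Example
///
/// A minimal sketch of reusing one guard for a batch of pushes; the
/// `ccl::stack` import path is assumed here rather than taken from this file:
///
/// ```ignore
/// // NOTE: the import path below is an assumption; adjust it to your crate layout.
/// use ccl::stack::{aquire_guard, ConcurrentStack};
///
/// let stack = ConcurrentStack::new();
/// let guard = aquire_guard();
///
/// // Pin the epoch once and reuse the same guard for every push in the batch.
/// for i in 0..64 {
///     stack.push_with_guard(i, &guard);
/// }
///
/// // Memory retired while the guard is alive is reclaimed only after it is dropped.
/// drop(guard);
/// ```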
#[inline]
pub fn aquire_guard() -> Guard {
    epoch::pin()
}

/// ConcurrentStack is a general-purpose, thread-safe, lock-free LIFO (last-in, first-out) stack.
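///
/// # Example
///
/// A minimal usage sketch; the `ccl::stack` import path is assumed rather than
/// taken from this file:
///
/// ```ignore
/// // NOTE: the import path below is an assumption; adjust it to your crate layout.
/// use ccl::stack::ConcurrentStack;
///
/// let stack = ConcurrentStack::new();
/// stack.push(1);
/// stack.push(2);
///
/// // Elements come back in last-in, first-out order.
/// assert_eq!(stack.pop(), Some(2));
/// assert_eq!(stack.pop(), Some(1));
/// assert_eq!(stack.pop(), None);
/// ```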
pub struct ConcurrentStack<T> {
    head: Atomic<Node<T>>,
}

impl<T> Drop for ConcurrentStack<T> {
    #[inline]
    fn drop(&mut self) {
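        // Destroying the head node also triggers, through `Node`'s own `Drop`
        // impl below, deferred destruction of the remainder of the list.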
        let guard = &aquire_guard();
        let head = self.head.load(Ordering::SeqCst, guard);

        if !head.is_null() {
            unsafe {
                guard.defer_destroy(head);
            }
        }
    }
}

struct Node<T> {
    data: T,
    next: Atomic<Node<T>>,
}

impl<T> Drop for Node<T> {
    #[inline]
    fn drop(&mut self) {
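        // When a node is destroyed, schedule destruction of the node it points
        // to as well, cascading reclamation down the chain.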
        let guard = &aquire_guard();
        let next = self.next.load(Ordering::SeqCst, guard);

        if !next.is_null() {
            unsafe {
                guard.defer_destroy(next);
            }
        }
    }
}

impl<T> ConcurrentStack<T> {
    /// Create a new, empty stack.
    pub fn new() -> Self {
        Self {
            head: Atomic::null(),
        }
    }

    /// Push an element to the top of the stack.
    #[inline]
    pub fn push(&self, data: T) {
        let guard = &aquire_guard();
        self.push_with_guard(data, guard);
    }

    /// Pop the top element of the stack, returning `None` if the stack is empty.
    #[inline]
    pub fn pop(&self) -> Option<T> {
        let guard = &aquire_guard();
        self.pop_with_guard(guard)
    }

    /// Create a draining iterator that pops elements from the top of the stack until it is empty.
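    ///
    /// A minimal sketch of draining a stack; the `ccl::stack` import path is
    /// assumed rather than taken from this file:
    ///
    /// ```ignore
    /// // NOTE: the import path below is an assumption; adjust it to your crate layout.
    /// use ccl::stack::ConcurrentStack;
    ///
    /// let stack = ConcurrentStack::new();
    /// stack.push(1);
    /// stack.push(2);
    ///
    /// // The iterator pops as it goes, so the stack is left empty afterwards.
    /// let drained: Vec<_> = stack.pop_iter().collect();
    /// assert_eq!(drained, vec![2, 1]);
    /// assert_eq!(stack.pop(), None);
    /// ```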
    #[inline]
    pub fn pop_iter(&self) -> StackIter<'_, T> {
        StackIter {
            guard: aquire_guard(),
            stack: self,
        }
    }

    /// Push an element with an existing guard.
    #[inline]
    pub fn push_with_guard(&self, data: T, guard: &Guard) {
        let mut node = Owned::new(Node {
            data,
            next: Atomic::null(),
        });

        loop {
            // Read the current head and link the new node in front of it.
            let head = self.head.load(Ordering::SeqCst, guard);

            node.next.store(head, Ordering::SeqCst);

            // Try to swing the head to the new node. If another thread won the
            // race, take ownership of the node back from the error and retry.
            match self
                .head
                .compare_and_set(head, node, Ordering::SeqCst, guard)
            {
                Ok(_) => return,
                Err(err) => node = err.new,
            }
        }
    }

    /// Pop an element with an existing guard.
    #[inline]
    pub fn pop_with_guard(&self, guard: &Guard) -> Option<T> {
        loop {
            let head_ptr = self.head.load(Ordering::SeqCst, guard);

            match unsafe { head_ptr.as_ref() } {
                Some(head) => unsafe {
                    let next = head.next.load(Ordering::SeqCst, guard);

                    // Try to swing the head to the next node. On failure another
                    // thread modified the stack concurrently, so retry.
                    if self
                        .head
                        .compare_and_set(head_ptr, next, Ordering::SeqCst, guard)
                        .is_ok()
                    {
                        // Move the payload out of the unlinked node.
                        let data = ptr::read(&head.data);

                        // Defer freeing the popped node until no pinned thread can still
                        // reference it. The allocation is released without running `Node`'s
                        // destructor: `data` has already been moved out and the `next`
                        // chain is still owned by the stack.
                        guard.defer_unchecked(move || {
                            mem::drop(Box::from_raw(
                                head_ptr.into_usize() as *mut mem::ManuallyDrop<Node<T>>,
                            ));
                        });

                        return Some(data);
                    }
                },
                None => return None,
            }
        }
    }
}

impl<T> Default for ConcurrentStack<T> {
    fn default() -> Self {
        Self::new()
    }
}

/// A draining iterator over a stack. Each call to `next` pops the top element,
/// reusing the guard held by the iterator.
pub struct StackIter<'a, T> {
    guard: Guard,
    stack: &'a ConcurrentStack<T>,
}

impl<'a, T> Iterator for StackIter<'a, T> {
    type Item = T;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        self.stack.pop_with_guard(&self.guard)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use rayon::prelude::*;

    #[test]
    fn insert_then_pop_assert_1024_st() {
        let stack = ConcurrentStack::new();

        for _ in 0..1024_i32 {
            stack.push(9);
        }

        for _ in 0..1024_i32 {
            assert_eq!(9, stack.pop().unwrap());
        }
    }

    #[test]
    fn insert_then_pop_assert_rayon() {
        let stack = ConcurrentStack::new();

        let iter_c: i32 = 1024 * 1024;

        (0..iter_c).into_par_iter().for_each(|_| {
            stack.push(9);
        });

        (0..iter_c).into_par_iter().for_each(|_| {
            assert_eq!(9, stack.pop().unwrap());
        });
    }
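
    // Illustrative usage sketch: reuse a single guard for a batch of pushes,
    // then drain the stack with `pop_iter`.
    #[test]
    fn push_with_guard_then_drain_with_pop_iter() {
        let stack = ConcurrentStack::new();
        let guard = aquire_guard();

        for i in 0..64_i32 {
            stack.push_with_guard(i, &guard);
        }

        drop(guard);

        // `pop_iter` pops as it iterates, yielding elements in LIFO order and
        // leaving the stack empty.
        let drained: Vec<i32> = stack.pop_iter().collect();
        assert_eq!(drained, (0..64_i32).rev().collect::<Vec<i32>>());
        assert_eq!(stack.pop(), None);
    }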
}