fast_spsc_queue/
lib.rs

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::{mem, ptr};
2
/// Ring-buffer state shared by the producer and consumer halves.
///
/// `read_next` and `write_next` are ever-increasing (wrapping) counters; the slot belonging to a
/// counter value is `counter & capacity_mask`. Slots for counters in `[read_next, write_next)`
/// hold initialized values, every other slot is uninitialized memory.
struct SpscQueue<V: Send + Sync> {
    buffer: *mut V,
    // Capacity reported by the backing `Vec`, which may exceed `capacity`. It is needed to hand
    // the allocation back to `Vec` on drop.
    alloc_capacity: usize,
    capacity: usize,
    capacity_mask: usize,
    // End-of-stream is tracked at the queue level as it's a common requirement and so that V
    // doesn't have to be a heavier enum with an end message variant.
    ended: AtomicBool,
    // Stored only by the consumer, loaded by both halves.
    read_next: AtomicUsize,
    // Stored only by the producer, loaded by both halves.
    write_next: AtomicUsize,
}

// SAFETY: the raw buffer is owned exclusively by the queue, and `V: Send` allows the values it
// holds to be moved to (and dropped on) another thread.
unsafe impl<V: Send + Sync> Send for SpscQueue<V> {}

// SAFETY: slots are only written by the single producer and only read by the single consumer.
// Ownership of a slot is handed over through release stores / acquire loads of `write_next` and
// `read_next`, so both halves never access the same slot at the same time.
unsafe impl<V: Send + Sync> Sync for SpscQueue<V> {}

impl<V: Send + Sync> Drop for SpscQueue<V> {
    /// Drops every value that was enqueued but never dequeued, then frees the buffer.
    fn drop(&mut self) {
        let end = *self.write_next.get_mut();
        let mut index = *self.read_next.get_mut();
        if mem::needs_drop::<V>() {
            while index != end {
                // SAFETY: counters in `[read_next, write_next)` refer to initialized slots, the
                // masked index is below `capacity`, and `&mut self` rules out concurrent access.
                unsafe { ptr::drop_in_place(self.buffer.add(index & self.capacity_mask)) };
                index = index.wrapping_add(1);
            }
        }
        // SAFETY: `buffer` and `alloc_capacity` come from the `Vec` created in `new`. A length of
        // zero is used because the live elements have already been dropped above.
        unsafe { drop(Vec::from_raw_parts(self.buffer, 0, self.alloc_capacity)) };
    }
}

impl<V: Send + Sync> SpscQueue<V> {
    /// Creates an empty queue with room for `1 << capacity_exponent` values.
    ///
    /// # Panics
    ///
    /// Panics if `capacity_exponent` is not smaller than the bit width of `usize`, or if the
    /// buffer allocation itself fails or overflows.
    pub fn new(capacity_exponent: usize) -> SpscQueue<V> {
        assert!(
            capacity_exponent < mem::size_of::<usize>() * 8,
            "capacity exponent {} does not fit in usize",
            capacity_exponent
        );
        let capacity = 1usize << capacity_exponent;
        // The allocation is kept alive manually and released again in `Drop`.
        let mut storage = mem::ManuallyDrop::new(Vec::<V>::with_capacity(capacity));
        SpscQueue {
            buffer: storage.as_mut_ptr(),
            alloc_capacity: storage.capacity(),
            capacity,
            capacity_mask: capacity - 1,
            ended: AtomicBool::new(false),
            read_next: AtomicUsize::new(0),
            write_next: AtomicUsize::new(0),
        }
    }
}

/// Sending half of the queue, created by [`create_spsc_queue`].
///
/// The underlying queue is shared with the consumer and is released once both halves have been
/// dropped. Dropping the producer marks the stream as ended.
pub struct SpscQueueProducer<V: Send + Sync> {
    queue: Arc<SpscQueue<V>>,
}

impl<V: Send + Sync> Drop for SpscQueueProducer<V> {
    /// Marks the stream as ended so that a consumer waiting in `dequeue` does not spin forever
    /// when the producer goes away without calling `finish`.
    fn drop(&mut self) {
        self.queue.ended.store(true, Ordering::Release);
    }
}

impl<V: Send + Sync> SpscQueueProducer<V> {
    /// Appends `value` to the queue.
    ///
    /// Busy-waits while the queue is full, i.e. until the consumer has dequeued at least one
    /// value. It never returns if the queue is full and nothing is dequeued any more.
    pub fn enqueue(&mut self, value: V) {
        let queue = &*self.queue;
        // Only the producer stores to `write_next`, so a relaxed load sees its latest value.
        let write = queue.write_next.load(Ordering::Relaxed);
        // Acquire pairs with the consumer's release store: once a slot is observed as free, the
        // consumer has finished reading it.
        while write.wrapping_sub(queue.read_next.load(Ordering::Acquire)) >= queue.capacity {
            // Wait for consumer to catch up.
            std::hint::spin_loop();
        }
        // SAFETY: the masked index is below `capacity`, and the check above proves the slot is
        // not in the live range, so it is uninitialized and not being read by the consumer.
        unsafe { ptr::write(queue.buffer.add(write & queue.capacity_mask), value) };
        // Publish the slot only after it has been written.
        queue.write_next.store(write.wrapping_add(1), Ordering::Release);
    }

    /// Marks the stream as ended. Call it after the last `enqueue`.
    ///
    /// Values that are already queued are still delivered to the consumer.
    pub fn finish(&mut self) {
        self.queue.ended.store(true, Ordering::Release);
    }
}

/// Outcome of a non-blocking dequeue attempt.
#[derive(Debug, PartialEq, Eq)]
pub enum MaybeDequeued<V> {
    /// The queue is empty and the stream has ended.
    Ended,
    /// The queue is currently empty but the stream has not ended.
    None,
    /// The next value in the queue.
    Some(V),
}

/// Receiving half of the queue, created by [`create_spsc_queue`].
pub struct SpscQueueConsumer<V: Send + Sync> {
    queue: Arc<SpscQueue<V>>,
}

impl<V: Send + Sync> SpscQueueConsumer<V> {
    /// Returns `true` if no value is currently available for dequeueing.
    #[inline]
    pub fn is_empty(&self) -> bool {
        let queue = &*self.queue;
        queue.read_next.load(Ordering::Relaxed) == queue.write_next.load(Ordering::Acquire)
    }

    /// Takes the next value if one is available, without waiting.
    ///
    /// Returns [`MaybeDequeued::None`] when the queue is empty but the stream is still open, and
    /// [`MaybeDequeued::Ended`] when it is empty and the stream has ended.
    pub fn maybe_dequeue(&mut self) -> MaybeDequeued<V> {
        let queue = &*self.queue;
        // `ended` must be read before `write_next`: if the end mark is visible, every enqueue
        // that preceded it is visible too, so an empty queue really means the stream is over.
        let ended = queue.ended.load(Ordering::Acquire);
        // Only the consumer stores to `read_next`, so a relaxed load sees its latest value.
        let read = queue.read_next.load(Ordering::Relaxed);
        // Acquire pairs with the producer's release store and makes the slot contents visible.
        let write = queue.write_next.load(Ordering::Acquire);
        if read == write {
            return if ended {
                MaybeDequeued::Ended
            } else {
                MaybeDequeued::None
            };
        }
        // SAFETY: `read != write` means the slot is in the live range and therefore initialized.
        // The masked index is below `capacity`, and the producer does not touch the slot again
        // until `read_next` has moved past it.
        let value = unsafe { ptr::read(queue.buffer.add(read & queue.capacity_mask)) };
        // Hand the slot back to the producer only after the value has been moved out.
        queue.read_next.store(read.wrapping_add(1), Ordering::Release);
        MaybeDequeued::Some(value)
    }

    /// Takes the next value, busy-waiting until one is available.
    ///
    /// Returns `None` once the queue is empty and the stream has ended.
    pub fn dequeue(&mut self) -> Option<V> {
        loop {
            match self.maybe_dequeue() {
                // Wait for producer to provide values.
                MaybeDequeued::None => std::hint::spin_loop(),
                // We've caught up to the end.
                MaybeDequeued::Ended => return None,
                MaybeDequeued::Some(value) => return Some(value),
            }
        }
    }
}

/// Creates a single-producer single-consumer queue holding up to `1 << capacity_exponent` values
/// and returns its two halves.
///
/// # Panics
///
/// Panics if `capacity_exponent` is not smaller than the bit width of `usize`, or if the buffer
/// cannot be allocated.
pub fn create_spsc_queue<V: Send + Sync>(
    capacity_exponent: usize,
) -> (SpscQueueProducer<V>, SpscQueueConsumer<V>) {
    let queue = Arc::new(SpscQueue::new(capacity_exponent));
    (
        SpscQueueProducer {
            queue: Arc::clone(&queue),
        },
        SpscQueueConsumer { queue },
    )
}