//! bounded_spsc_queue — lib.rs
//!
//! A bounded single-producer single-consumer (SPSC) ring-buffer queue.
1extern crate core;
2
3use core::alloc::Layout;
4use core::{mem, ptr};
5use std::alloc;
6use std::cell::Cell;
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::usize;
10
/// Assumed size of a CPU cacheline, in bytes; used to pad the hot index fields
/// of `Buffer` onto separate cachelines to avoid false sharing.
const CACHELINE_LEN: usize = 64;

/// Number of `usize`-sized padding elements required to fill out a cacheline
/// that already holds `$N` `usize`-sized fields.
macro_rules! cacheline_pad {
    ($N:expr) => {
        CACHELINE_LEN / std::mem::size_of::<usize>() - $N
    };
}
18
/// The internal memory buffer used by the queue.
///
/// Buffer holds a pointer to allocated memory which represents the bounded
/// ring buffer, as well as a head and tail AtomicUsize which the producer and consumer
/// use to track location in the ring.
///
/// The struct is `#[repr(C)]` and manually padded so that the shared metadata,
/// the consumer-owned indices, and the producer-owned indices each occupy their
/// own cacheline, preventing false sharing between the two threads.
#[repr(C)]
pub struct Buffer<T> {
    /// A pointer to the allocated ring buffer
    buffer: *mut T,

    /// The bounded size as specified by the user.  If the queue reaches capacity, it will block
    /// until values are popped off.
    capacity: usize,

    /// The allocated size of the ring buffer, in terms of number of values (not physical memory).
    /// This will be the next power of two larger than `capacity`
    allocated_size: usize,
    // Pad out the first cacheline (3 usize-sized fields above).
    _padding1: [usize; cacheline_pad!(3)],

    // Consumer cacheline:

    /// Index position of the current head
    head: AtomicUsize,
    /// Consumer-local cache of `tail`; only refreshed from the shared atomic when
    /// the buffer appears empty, avoiding cross-core traffic on every pop.
    shadow_tail: Cell<usize>,
    _padding2: [usize; cacheline_pad!(2)],

    // Producer cacheline:

    /// Index position of current tail
    tail: AtomicUsize,
    /// Producer-local cache of `head`; only refreshed from the shared atomic when
    /// the buffer appears full, avoiding cross-core traffic on every push.
    shadow_head: Cell<usize>,
    _padding3: [usize; cacheline_pad!(2)],
}
52
// SAFETY: a Buffer is only ever accessed through exactly one Producer and one
// Consumer handle; all cross-thread access to the ring is ordered by the
// head/tail atomics, and each `shadow_*` Cell is touched by a single side only.
// NOTE(review): values of T are moved across threads through the ring, so a
// `T: Send` bound looks like the relevant requirement here — confirm before
// relying on the `T: Sync` bound.
unsafe impl<T: Sync> Sync for Buffer<T> {}
54
/// A handle to the queue which allows consuming values from the buffer
pub struct Consumer<T> {
    // Shared ownership of the ring; the matching Producer holds the other Arc.
    buffer: Arc<Buffer<T>>,
}
59
/// A handle to the queue which allows adding values onto the buffer
pub struct Producer<T> {
    // Shared ownership of the ring; the matching Consumer holds the other Arc.
    buffer: Arc<Buffer<T>>,
}
64
// SAFETY: each handle is the sole owner of its side of the queue, so moving a
// handle to another thread is sound as long as the element type itself is Send.
unsafe impl<T: Send> Send for Consumer<T> {}
unsafe impl<T: Send> Send for Producer<T> {}
67
68impl<T> Buffer<T> {
69    /// Attempt to pop a value off the buffer.
70    ///
71    /// If the buffer is empty, this method will not block.  Instead, it will return `None`
72    /// signifying the buffer was empty.  The caller may then decide what to do next (e.g. spin-wait,
73    /// sleep, process something else, etc)
74    ///
75    /// # Examples
76    ///
77    /// ```
78    /// // Attempt to pop off a value
79    /// let t = buffer.try_pop();
80    /// match t {
81    ///   Some(v) => {}, // Got a value
82    ///   None => {}     // Buffer empty, try again later
83    /// }
84    /// ```
85    pub fn try_pop(&self) -> Option<T> {
86        let current_head = self.head.load(Ordering::Relaxed);
87
88        if current_head == self.shadow_tail.get() {
89            self.shadow_tail.set(self.tail.load(Ordering::Acquire));
90            if current_head == self.shadow_tail.get() {
91                return None;
92            }
93        }
94
95        let v = unsafe { ptr::read(self.load(current_head)) };
96        self.head
97            .store(current_head.wrapping_add(1), Ordering::Release);
98        Some(v)
99    }
100
101    /// Attempts to pop (and discard) at most `n` values off the buffer.
102    ///
103    /// Returns the amount of values successfully skipped.
104    ///
105    /// # Safety
106    ///
107    /// *WARNING:* This will leak at most `n` values from the buffer, i.e. the destructors of the
108    /// objects skipped over will not be called. This function is intended to be used on buffers that
109    /// contain non-`Drop` data, such as a `Buffer<f32>`.
110    pub fn skip_n(&self, n: usize) -> usize {
111        let current_head = self.head.load(Ordering::Relaxed);
112
113        self.shadow_tail.set(self.tail.load(Ordering::Acquire));
114        if current_head == self.shadow_tail.get() {
115            return 0;
116        }
117        let mut diff = self.shadow_tail.get().wrapping_sub(current_head);
118        if diff > n {
119            diff = n
120        }
121        self.head
122            .store(current_head.wrapping_add(diff), Ordering::Release);
123        diff
124    }
125
126    /// Pop a value off the buffer.
127    ///
128    /// This method will block until the buffer is non-empty.  The waiting strategy is a simple
129    /// spin-wait and will repeatedly call `try_pop()` until a value is available.  If you do not
130    /// want a spin-wait burning CPU, you should call `try_pop()` directly and implement a different
131    /// waiting strategy.
132    ///
133    /// # Examples
134    ///
135    /// ```
136    /// // Block until a value is ready
137    /// let t = buffer.pop();
138    /// ```
139    pub fn pop(&self) -> T {
140        loop {
141            match self.try_pop() {
142                None => {}
143                Some(v) => return v,
144            }
145        }
146    }
147
148    /// Attempt to push a value onto the buffer.
149    ///
150    /// If the buffer is full, this method will not block.  Instead, it will return `Some(v)`, where
151    /// `v` was the value attempting to be pushed onto the buffer.  If the value was successfully
152    /// pushed onto the buffer, `None` will be returned signifying success.
153    ///
154    /// # Examples
155    ///
156    /// ```
157    /// // Attempt to push a value onto the buffer
158    /// let t = buffer.try_push(123);
159    /// match t {
160    ///   Some(v) => {}, // Buffer was full, try again later
161    ///   None => {}     // Value was successfully pushed onto the buffer
162    /// }
163    /// ```
164    pub fn try_push(&self, v: T) -> Option<T> {
165        let current_tail = self.tail.load(Ordering::Relaxed);
166
167        if self.shadow_head.get() + self.capacity <= current_tail {
168            self.shadow_head.set(self.head.load(Ordering::Relaxed));
169            if self.shadow_head.get() + self.capacity <= current_tail {
170                return Some(v);
171            }
172        }
173
174        unsafe {
175            self.store(current_tail, v);
176        }
177        self.tail
178            .store(current_tail.wrapping_add(1), Ordering::Release);
179        None
180    }
181
182    /// Push a value onto the buffer.
183    ///
184    /// This method will block until the buffer is non-full.  The waiting strategy is a simple
185    /// spin-wait and will repeatedly call `try_push()` until the value can be added.  If you do not
186    /// want a spin-wait burning CPU, you should call `try_push()` directly and implement a different
187    /// waiting strategy.
188    ///
189    /// # Examples
190    ///
191    /// ```
192    /// // Block until we can push this value onto the buffer
193    /// buffer.try_push(123);
194    /// ```
195    pub fn push(&self, v: T) {
196        let mut t = v;
197        loop {
198            match self.try_push(t) {
199                Some(rv) => t = rv,
200                None => return,
201            }
202        }
203    }
204
205    /// Load a value out of the buffer
206    ///
207    /// # Safety
208    ///
209    /// This method assumes the caller has:
210    /// - Initialized a valid block of memory
211    /// - Specified an index position that contains valid data
212    ///
213    /// The caller can use either absolute or monotonically increasing index positions, since
214    /// buffer wrapping is handled inside the method.
215    #[inline]
216    unsafe fn load(&self, pos: usize) -> &T {
217        &*self.buffer
218            .offset((pos & (self.allocated_size - 1)) as isize)
219    }
220
221    /// Store a value in the buffer
222    ///
223    /// # Safety
224    ///
225    /// This method assumes the caller has:
226    /// - Initialized a valid block of memory
227    #[inline]
228    unsafe fn store(&self, pos: usize, v: T) {
229        let end = self.buffer
230            .offset((pos & (self.allocated_size - 1)) as isize);
231        ptr::write(&mut *end, v);
232    }
233}
234
235/// Handles deallocation of heap memory when the buffer is dropped
236impl<T> Drop for Buffer<T> {
237    fn drop(&mut self) {
238        // Pop the rest of the values off the queue.  By moving them into this scope,
239        // we implicitly call their destructor
240
241        // TODO this could be optimized to avoid the atomic operations / book-keeping...but
242        // since this is the destructor, there shouldn't be any contention... so meh?
243        while let Some(_) = self.try_pop() {}
244
245        if mem::size_of::<T>() > 0 {
246            unsafe {
247                let layout = Layout::from_size_align(
248                    self.allocated_size * mem::size_of::<T>(),
249                    mem::align_of::<T>(),
250                ).unwrap();
251                alloc::dealloc(self.buffer as *mut u8, layout);
252            }
253        }
254    }
255}
256
257/// Creates a new SPSC Queue, returning a Producer and Consumer handle
258///
259/// Capacity specifies the size of the bounded queue to create.  Actual memory usage
260/// will be `capacity.next_power_of_two() * size_of::<T>()`, since ringbuffers with
261/// power of two sizes are more efficient to operate on (can use a bitwise AND to index
262/// into the ring instead of a more expensive modulo operator).
263///
264/// # Examples
265///
266/// Here is a simple usage of make, using the queue within the same thread:
267///
268/// ```
269/// // Create a queue with capacity to hold 100 values
270/// let (p, c) = make(100);
271///
272/// // Push `123` onto the queue
273/// p.push(123);
274///
275/// // Pop the value back off
276/// let t = c.pop();
277/// assert!(t == 123);
278/// ```
279///
280/// Of course, a SPSC queue is really only useful if you plan to use it in a multi-threaded
281/// environment.  The Producer and Consumer can both be sent to a thread, providing a fast, bounded
282/// one-way communication channel between those threads:
283///
284/// ```
285/// use std::thread;
286///
287/// let (p, c) = make(500);
288///
289/// // Spawn a new thread and move the Producer into it
290/// thread::spawn(move|| {
291///   for i in 0..100000 {
292///     p.push(i as u32);
293///   }
294/// });
295///
296/// // Back in the first thread, start Pop'ing values off the queue
297/// for i in 0..100000 {
298///   let t = c.pop();
299///   assert!(t == i);
300/// }
301///
302/// ```
303///
304/// # Panics
305///
306/// If the requested queue size is larger than available memory (e.g.
307/// `capacity.next_power_of_two() * size_of::<T>() > available memory` ), this function will abort
308/// with an OOM panic.
309pub fn make<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
310    let ptr = unsafe { allocate_buffer(capacity) };
311
312    let arc = Arc::new(Buffer {
313        buffer: ptr,
314        capacity,
315        allocated_size: capacity.next_power_of_two(),
316        _padding1: [0; cacheline_pad!(3)],
317
318        head: AtomicUsize::new(0),
319        shadow_tail: Cell::new(0),
320        _padding2: [0; cacheline_pad!(2)],
321
322        tail: AtomicUsize::new(0),
323        shadow_head: Cell::new(0),
324        _padding3: [0; cacheline_pad!(2)],
325    });
326
327    (
328        Producer {
329            buffer: arc.clone(),
330        },
331        Consumer {
332            buffer: arc.clone(),
333        },
334    )
335}
336
/// Allocates a memory buffer on the heap and returns a pointer to it.
///
/// The requested `capacity` is rounded up to the next power of two.  When the
/// rounded byte size is zero (zero-sized `T`), no real allocation is performed
/// and a well-aligned, non-null dangling pointer is returned instead.
unsafe fn allocate_buffer<T>(capacity: usize) -> *mut T {
    let rounded = capacity.next_power_of_two();
    let bytes = rounded
        .checked_mul(mem::size_of::<T>())
        .expect("capacity overflow");

    let layout = Layout::from_size_align(bytes, mem::align_of::<T>()).unwrap();

    if bytes == 0 {
        // Zero-sized allocation: hand back an aligned dangling pointer.
        return mem::align_of::<T>() as *mut T;
    }

    let ptr = alloc::alloc(layout) as *mut T;
    if ptr.is_null() {
        alloc::handle_alloc_error(layout)
    }
    ptr
}
358
359impl<T> Producer<T> {
360    /// Push a value onto the buffer.
361    ///
362    /// If the buffer is non-full, the operation will execute immediately.  If the buffer is full,
363    /// this method will block until the buffer is non-full.  The waiting strategy is a simple
364    /// spin-wait. If you do not want a spin-wait burning CPU, you should call `try_push()`
365    /// directly and implement a different waiting strategy.
366    ///
367    /// # Examples
368    ///
369    /// ```
370    /// let (producer, _) = make(100);
371    ///
372    /// // Block until we can push this value onto the queue
373    /// producer.push(123);
374    /// ```
375    pub fn push(&self, v: T) {
376        (*self.buffer).push(v);
377    }
378
379    /// Attempt to push a value onto the buffer.
380    ///
381    /// This method does not block.  If the queue is not full, the value will be added to the
382    /// queue and the method will return `None`, signifying success.  If the queue is full,
383    /// this method will return `Some(v)``, where `v` is your original value.
384    ///
385    /// # Examples
386    ///
387    /// ```
388    /// let (producer, _) = make(100);
389    ///
390    /// // Attempt to add this value to the queue
391    /// match producer.try push(123) {
392    ///     Some(v) => {}, // Queue full, try again later
393    ///     None => {}     // Value added to queue
394    /// }
395    /// ```
396    pub fn try_push(&self, v: T) -> Option<T> {
397        (*self.buffer).try_push(v)
398    }
399
400    /// Returns the total capacity of this queue
401    ///
402    /// This value represents the total capacity of the queue when it is full.  It does not
403    /// represent the current usage.  For that, call `size()`.
404    ///
405    /// # Examples
406    ///
407    /// ```
408    /// let (producer, _) = make(100);
409    ///
410    /// assert!(producer.capacity() == 100);
411    /// producer.push(123);
412    /// assert!(producer.capacity() == 100);
413    /// ```
414    pub fn capacity(&self) -> usize {
415        (*self.buffer).capacity
416    }
417
418    /// Returns the current size of the queue
419    ///
420    /// This value represents the current size of the queue.  This value can be from 0-`capacity`
421    /// inclusive.
422    ///
423    /// # Examples
424    ///
425    /// ```
426    /// let (producer, _) = make(100);
427    ///
428    /// assert!(producer.size() == 0);
429    /// producer.push(123);
430    /// assert!(producer.size() == 1);
431    /// ```
432    pub fn size(&self) -> usize {
433        (*self.buffer).tail.load(Ordering::Acquire) - (*self.buffer).head.load(Ordering::Acquire)
434    }
435
436    /// Returns the available space in the queue
437    ///
438    /// This value represents the number of items that can be pushed onto the queue before it
439    /// becomes full.
440    ///
441    /// # Examples
442    ///
443    /// ```
444    /// let (producer, _) = make(100);
445    ///
446    /// assert!(producer.free_space() == 100);
447    /// producer.push(123);
448    /// assert!(producer.free_space() == 99);
449    /// ```
450    pub fn free_space(&self) -> usize {
451        self.capacity() - self.size()
452    }
453}
454
455impl<T> Consumer<T> {
456    /// Pop a value off the queue.
457    ///
458    /// If the buffer contains values, this method will execute immediately and return a value.
459    /// If the buffer is empty, this method will block until a value becomes available.  The
460    /// waiting strategy is a simple spin-wait. If you do not want a spin-wait burning CPU, you
461    /// should call `try_push()` directly and implement a different waiting strategy.
462    ///
463    /// # Examples
464    ///
465    /// ```
466    /// let (_, consumer) = make(100);
467    ///
468    /// // Block until a value becomes available
469    /// let t = consumer.pop();
470    /// ```
471    pub fn pop(&self) -> T {
472        (*self.buffer).pop()
473    }
474
475    /// Attempt to pop a value off the queue.
476    ///
477    /// This method does not block.  If the queue is empty, the method will return `None`.  If
478    /// there is a value available, the method will return `Some(v)`, where `v` is the value
479    /// being popped off the queue.
480    ///
481    /// # Examples
482    ///
483    /// ```
484    /// use bounded_spsc_queue::*;
485    ///
486    /// let (_, consumer) = make(100);
487    ///
488    /// // Attempt to pop a value off the queue
489    /// let t = consumer.try_pop();
490    /// match t {
491    ///     Some(v) => {},      // Successfully popped a value
492    ///     None => {}          // Queue empty, try again later
493    /// }
494    /// ```
495    pub fn try_pop(&self) -> Option<T> {
496        (*self.buffer).try_pop()
497    }
498
499    /// Attempts to pop (and discard) at most `n` values off the buffer.
500    ///
501    /// Returns the amount of values successfully skipped.
502    ///
503    /// # Safety
504    ///
505    /// *WARNING:* This will leak at most `n` values from the buffer, i.e. the destructors of the
506    /// objects skipped over will not be called. This function is intended to be used on buffers that
507    /// contain non-`Drop` data, such as a `Buffer<f32>`.
508    ///
509    /// # Examples
510    ///
511    /// ```
512    /// use bounded_spsc_queue::*;
513    ///
514    /// let (_, consumer) = make(100);
515    ///
516    /// let mut read_position = 0; // current buffer index
517    /// read_position += consumer.skip_n(512); // try to skip at most 512 elements
518    /// ```
519    pub fn skip_n(&self, n: usize) -> usize {
520        (*self.buffer).skip_n(n)
521    }
522    /// Returns the total capacity of this queue
523    ///
524    /// This value represents the total capacity of the queue when it is full.  It does not
525    /// represent the current usage.  For that, call `size()`.
526    ///
527    /// # Examples
528    ///
529    /// ```
530    /// let (_, consumer) = make(100);
531    ///
532    /// assert!(consumer.capacity() == 100);
533    /// let t = consumer.pop();
534    /// assert!(producer.capacity() == 100);
535    /// ```
536    pub fn capacity(&self) -> usize {
537        (*self.buffer).capacity
538    }
539
540    /// Returns the current size of the queue
541    ///
542    /// This value represents the current size of the queue.  This value can be from 0-`capacity`
543    /// inclusive.
544    ///
545    /// # Examples
546    ///
547    /// ```
548    /// let (_, consumer) = make(100);
549    ///
550    /// //... producer pushes somewhere ...
551    ///
552    /// assert!(consumer.size() == 10);
553    /// consumer.pop();
554    /// assert!(producer.size() == 9);
555    /// ```
556    pub fn size(&self) -> usize {
557        (*self.buffer).tail.load(Ordering::Acquire) - (*self.buffer).head.load(Ordering::Acquire)
558    }
559}
560
#[cfg(test)]
mod tests {

    use super::*;
    use std::sync::mpsc::sync_channel;
    use std::thread;
    use std::time::Instant;

    #[test]
    fn test_buffer_size() {
        // Three cacheline-padded sections must occupy exactly three cachelines.
        assert_eq!(::std::mem::size_of::<Buffer<()>>(), 3 * CACHELINE_LEN);
    }

    #[test]
    fn test_producer_push() {
        let (p, _) = super::make(10);

        for i in 0..9 {
            p.push(i);
            assert_eq!(p.capacity(), 10);
            assert_eq!(p.size(), i + 1);
        }
    }

    #[test]
    fn test_consumer_pop() {
        let (p, c) = super::make(10);

        for i in 0..9 {
            p.push(i);
            assert_eq!(p.capacity(), 10);
            assert_eq!(p.size(), i + 1);
        }

        for i in 0..9 {
            assert_eq!(c.size(), 9 - i);
            let t = c.pop();
            assert_eq!(c.capacity(), 10);
            assert_eq!(c.size(), 9 - i - 1);
            assert_eq!(t, i);
        }
    }

    #[test]
    fn test_consumer_skip() {
        let (p, c) = super::make(10);

        for i in 0..9 {
            p.push(i);
            assert_eq!(p.capacity(), 10);
            assert_eq!(p.size(), i + 1);
        }
        assert_eq!(c.size(), 9);
        assert_eq!(c.skip_n(5), 5);
        assert_eq!(c.size(), 4);
        for i in 0..4 {
            assert_eq!(c.size(), 4 - i);
            let t = c.pop();
            assert_eq!(c.capacity(), 10);
            assert_eq!(c.size(), 4 - i - 1);
            assert_eq!(t, i + 5);
        }
        assert_eq!(c.size(), 0);
        // Skipping on an empty queue must be a no-op.
        assert_eq!(c.skip_n(5), 0);
    }

    #[test]
    fn test_consumer_skip_whole_buf() {
        let (p, c) = super::make(9);

        for i in 0..9 {
            p.push(i);
            assert_eq!(p.capacity(), 9);
            assert_eq!(p.size(), i + 1);
        }
        assert_eq!(c.size(), 9);
        assert_eq!(c.skip_n(9), 9);
        assert_eq!(c.size(), 0);
    }

    #[test]
    fn test_try_push() {
        let (p, _) = super::make(10);

        for i in 0..10 {
            p.push(i);
            assert_eq!(p.capacity(), 10);
            assert_eq!(p.size(), i + 1);
        }

        // Queue is now at capacity: the value must be handed back unchanged.
        match p.try_push(10) {
            Some(v) => assert_eq!(v, 10),
            None => panic!("Queue should not have accepted another write!"),
        }
    }

    #[test]
    fn test_try_poll() {
        let (p, c) = super::make(10);

        assert!(c.try_pop().is_none(), "Queue was empty but a value was read!");

        p.push(123);

        match c.try_pop() {
            Some(v) => assert_eq!(v, 123),
            None => panic!("Queue was not empty but poll() returned nothing!"),
        }

        assert!(c.try_pop().is_none(), "Queue was empty but a value was read!");
    }

    #[test]
    fn test_threaded() {
        let (p, c) = super::make(500);

        thread::spawn(move || {
            for i in 0..100000 {
                p.push(i);
            }
        });

        for i in 0..100000 {
            let t = c.pop();
            assert_eq!(t, i);
        }
    }

    /// Total elapsed nanoseconds since `start`, using the std monotonic clock
    /// (replaces the old third-party `time::PreciseTime` dev-dependency).
    fn elapsed_ns(start: Instant) -> u64 {
        let d = start.elapsed();
        d.as_secs() * 1_000_000_000 + u64::from(d.subsec_nanos())
    }

    #[test]
    #[ignore]
    fn bench_spsc_throughput() {
        let iterations: i64 = 2i64.pow(14);

        let (p, c) = make(iterations as usize);

        let start = Instant::now();
        for i in 0..iterations as usize {
            p.push(i);
        }
        let t = c.pop();
        assert_eq!(t, 0);
        let ns = elapsed_ns(start);
        let throughput = (iterations as f64 / ns as f64) * 1000000000f64;
        println!(
            "Spsc Throughput: {}/s -- (iterations: {} in {} ns)",
            throughput, iterations, ns
        );
    }

    #[test]
    #[ignore]
    fn bench_chan_throughput() {
        let iterations: i64 = 2i64.pow(14);

        let (tx, rx) = sync_channel(iterations as usize);

        let start = Instant::now();
        for i in 0..iterations as usize {
            tx.send(i).unwrap();
        }
        let t = rx.recv().unwrap();
        assert_eq!(t, 0);
        let ns = elapsed_ns(start);
        let throughput = (iterations as f64 / ns as f64) * 1000000000f64;
        println!(
            "Chan Throughput: {}/s -- (iterations: {} in {} ns)",
            throughput, iterations, ns
        );
    }
}