bounded_spsc_queue/
lib.rs

1#![feature(allocator_api)]
2
3extern crate core;
4
5use core::alloc::Layout;
6use core::{mem, ptr};
7use std::alloc;
8use std::cell::Cell;
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::usize;
12
/// Assumed cacheline size in bytes; fields are padded out to this to avoid
/// false sharing between the producer and consumer threads.
const CACHELINE_LEN: usize = 64;

/// Number of `usize` pad words needed to fill out a cacheline that already
/// holds `$N` `usize`-sized fields.
macro_rules! cacheline_pad {
    ($N:expr) => {
        CACHELINE_LEN / std::mem::size_of::<usize>() - $N
    };
}
20
/// The internal memory buffer used by the queue.
///
/// Buffer holds a pointer to allocated memory which represents the bounded
/// ring buffer, as well as a head and tail atomicUsize which the producer and consumer
/// use to track location in the ring.
///
/// The layout (`repr(C)` plus manual padding) places the shared read-only
/// metadata, the consumer-owned fields, and the producer-owned fields on three
/// separate cachelines to prevent false sharing.
#[repr(C)]
pub struct Buffer<T> {
    // --- Shared cacheline (read-only after construction) ---

    /// A pointer to the allocated ring buffer
    buffer: *mut T,

    /// The bounded size as specified by the user.  If the queue reaches capacity, it will block
    /// until values are popped off.
    capacity: usize,

    /// The allocated size of the ring buffer, in terms of number of values (not physical memory).
    /// This will be the next power of two larger than `capacity`
    allocated_size: usize,
    // Pads the three words above out to a full cacheline.
    _padding1: [usize; cacheline_pad!(3)],

    // --- Consumer cacheline ---

    /// Index position of the current head.  Monotonically increasing; wrapped
    /// into the ring by masking with `allocated_size - 1` when indexing.
    head: AtomicUsize,
    /// Consumer's cached copy of `tail`, refreshed only when the queue looks
    /// empty, so most pops avoid touching the producer's cacheline.
    shadow_tail: Cell<usize>,
    _padding2: [usize; cacheline_pad!(2)],

    // --- Producer cacheline ---

    /// Index position of current tail.  Monotonically increasing.
    tail: AtomicUsize,
    /// Producer's cached copy of `head`, refreshed only when the queue looks full.
    shadow_head: Cell<usize>,
    _padding3: [usize; cacheline_pad!(2)],
}
54
// SAFETY: the producer/consumer protocol ensures each side mutates only its own
// cacheline plus the atomics, so one producer and one consumer may share a
// `&Buffer` across threads.
// NOTE(review): values of `T` are moved between threads through the buffer, so
// `T: Send` would arguably be the more precise bound here than `T: Sync` —
// confirm before relying on this with non-Send types.
unsafe impl<T: Sync> Sync for Buffer<T> {}

/// A handle to the queue which allows consuming values from the buffer
pub struct Consumer<T> {
    buffer: Arc<Buffer<T>>,
}

/// A handle to the queue which allows adding values onto the buffer
pub struct Producer<T> {
    buffer: Arc<Buffer<T>>,
}

// SAFETY: each handle is the sole owner of its side of the protocol, so it may
// be moved to another thread as long as the element type itself is `Send`.
unsafe impl<T: Send> Send for Consumer<T> {}
unsafe impl<T: Send> Send for Producer<T> {}
69
70impl<T> Buffer<T> {
71    /// Attempt to pop a value off the buffer.
72    ///
73    /// If the buffer is empty, this method will not block.  Instead, it will return `None`
74    /// signifying the buffer was empty.  The caller may then decide what to do next (e.g. spin-wait,
75    /// sleep, process something else, etc)
76    ///
77    /// # Examples
78    ///
79    /// ```
80    /// // Attempt to pop off a value
81    /// let t = buffer.try_pop();
82    /// match t {
83    ///   Some(v) => {}, // Got a value
84    ///   None => {}     // Buffer empty, try again later
85    /// }
86    /// ```
87    pub fn try_pop(&self) -> Option<T> {
88        let current_head = self.head.load(Ordering::Relaxed);
89
90        if current_head == self.shadow_tail.get() {
91            self.shadow_tail.set(self.tail.load(Ordering::Acquire));
92            if current_head == self.shadow_tail.get() {
93                return None;
94            }
95        }
96
97        let v = unsafe { ptr::read(self.load(current_head)) };
98        self.head
99            .store(current_head.wrapping_add(1), Ordering::Release);
100        Some(v)
101    }
102
103    /// Attempts to pop (and discard) at most `n` values off the buffer.
104    ///
105    /// Returns the amount of values successfully skipped.
106    ///
107    /// # Safety
108    ///
109    /// *WARNING:* This will leak at most `n` values from the buffer, i.e. the destructors of the
110    /// objects skipped over will not be called. This function is intended to be used on buffers that
111    /// contain non-`Drop` data, such as a `Buffer<f32>`.
112    pub fn skip_n(&self, n: usize) -> usize {
113        let current_head = self.head.load(Ordering::Relaxed);
114
115        self.shadow_tail.set(self.tail.load(Ordering::Acquire));
116        if current_head == self.shadow_tail.get() {
117            return 0;
118        }
119        let mut diff = self.shadow_tail.get().wrapping_sub(current_head);
120        if diff > n {
121            diff = n
122        }
123        self.head
124            .store(current_head.wrapping_add(diff), Ordering::Release);
125        diff
126    }
127
128    /// Pop a value off the buffer.
129    ///
130    /// This method will block until the buffer is non-empty.  The waiting strategy is a simple
131    /// spin-wait and will repeatedly call `try_pop()` until a value is available.  If you do not
132    /// want a spin-wait burning CPU, you should call `try_pop()` directly and implement a different
133    /// waiting strategy.
134    ///
135    /// # Examples
136    ///
137    /// ```
138    /// // Block until a value is ready
139    /// let t = buffer.pop();
140    /// ```
141    pub fn pop(&self) -> T {
142        loop {
143            match self.try_pop() {
144                None => {}
145                Some(v) => return v,
146            }
147        }
148    }
149
150    /// Attempt to push a value onto the buffer.
151    ///
152    /// If the buffer is full, this method will not block.  Instead, it will return `Some(v)`, where
153    /// `v` was the value attempting to be pushed onto the buffer.  If the value was successfully
154    /// pushed onto the buffer, `None` will be returned signifying success.
155    ///
156    /// # Examples
157    ///
158    /// ```
159    /// // Attempt to push a value onto the buffer
160    /// let t = buffer.try_push(123);
161    /// match t {
162    ///   Some(v) => {}, // Buffer was full, try again later
163    ///   None => {}     // Value was successfully pushed onto the buffer
164    /// }
165    /// ```
166    pub fn try_push(&self, v: T) -> Option<T> {
167        let current_tail = self.tail.load(Ordering::Relaxed);
168
169        if self.shadow_head.get() + self.capacity <= current_tail {
170            self.shadow_head.set(self.head.load(Ordering::Relaxed));
171            if self.shadow_head.get() + self.capacity <= current_tail {
172                return Some(v);
173            }
174        }
175
176        unsafe {
177            self.store(current_tail, v);
178        }
179        self.tail
180            .store(current_tail.wrapping_add(1), Ordering::Release);
181        None
182    }
183
184    /// Push a value onto the buffer.
185    ///
186    /// This method will block until the buffer is non-full.  The waiting strategy is a simple
187    /// spin-wait and will repeatedly call `try_push()` until the value can be added.  If you do not
188    /// want a spin-wait burning CPU, you should call `try_push()` directly and implement a different
189    /// waiting strategy.
190    ///
191    /// # Examples
192    ///
193    /// ```
194    /// // Block until we can push this value onto the buffer
195    /// buffer.try_push(123);
196    /// ```
197    pub fn push(&self, v: T) {
198        let mut t = v;
199        loop {
200            match self.try_push(t) {
201                Some(rv) => t = rv,
202                None => return,
203            }
204        }
205    }
206
207    /// Load a value out of the buffer
208    ///
209    /// # Safety
210    ///
211    /// This method assumes the caller has:
212    /// - Initialized a valid block of memory
213    /// - Specified an index position that contains valid data
214    ///
215    /// The caller can use either absolute or monotonically increasing index positions, since
216    /// buffer wrapping is handled inside the method.
217    #[inline]
218    unsafe fn load(&self, pos: usize) -> &T {
219        &*self.buffer
220            .offset((pos & (self.allocated_size - 1)) as isize)
221    }
222
223    /// Store a value in the buffer
224    ///
225    /// # Safety
226    ///
227    /// This method assumes the caller has:
228    /// - Initialized a valid block of memory
229    #[inline]
230    unsafe fn store(&self, pos: usize, v: T) {
231        let end = self.buffer
232            .offset((pos & (self.allocated_size - 1)) as isize);
233        ptr::write(&mut *end, v);
234    }
235}
236
237/// Handles deallocation of heap memory when the buffer is dropped
238impl<T> Drop for Buffer<T> {
239    fn drop(&mut self) {
240        // Pop the rest of the values off the queue.  By moving them into this scope,
241        // we implicitly call their destructor
242
243        // TODO this could be optimized to avoid the atomic operations / book-keeping...but
244        // since this is the destructor, there shouldn't be any contention... so meh?
245        while let Some(_) = self.try_pop() {}
246
247        unsafe {
248            let layout = Layout::from_size_align(
249                self.allocated_size * mem::size_of::<T>(),
250                mem::align_of::<T>(),
251            ).unwrap();
252            alloc::dealloc(self.buffer as *mut u8, layout);
253        }
254    }
255}
256
257/// Creates a new SPSC Queue, returning a Producer and Consumer handle
258///
259/// Capacity specifies the size of the bounded queue to create.  Actual memory usage
260/// will be `capacity.next_power_of_two() * size_of::<T>()`, since ringbuffers with
261/// power of two sizes are more efficient to operate on (can use a bitwise AND to index
262/// into the ring instead of a more expensive modulo operator).
263///
264/// # Examples
265///
266/// Here is a simple usage of make, using the queue within the same thread:
267///
268/// ```
269/// // Create a queue with capacity to hold 100 values
270/// let (p, c) = make(100);
271///
272/// // Push `123` onto the queue
273/// p.push(123);
274///
275/// // Pop the value back off
276/// let t = c.pop();
277/// assert!(t == 123);
278/// ```
279///
280/// Of course, a SPSC queue is really only useful if you plan to use it in a multi-threaded
281/// environment.  The Producer and Consumer can both be sent to a thread, providing a fast, bounded
282/// one-way communication channel between those threads:
283///
284/// ```
285/// use std::thread;
286///
287/// let (p, c) = make(500);
288///
289/// // Spawn a new thread and move the Producer into it
290/// thread::spawn(move|| {
291///   for i in 0..100000 {
292///     p.push(i as u32);
293///   }
294/// });
295///
296/// // Back in the first thread, start Pop'ing values off the queue
297/// for i in 0..100000 {
298///   let t = c.pop();
299///   assert!(t == i);
300/// }
301///
302/// ```
303///
304/// # Panics
305///
306/// If the requested queue size is larger than available memory (e.g.
307/// `capacity.next_power_of_two() * size_of::<T>() > available memory` ), this function will abort
308/// with an OOM panic.
309pub fn make<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
310    let ptr = unsafe { allocate_buffer(capacity) };
311
312    let arc = Arc::new(Buffer {
313        buffer: ptr,
314        capacity,
315        allocated_size: capacity.next_power_of_two(),
316        _padding1: [0; cacheline_pad!(3)],
317
318        head: AtomicUsize::new(0),
319        shadow_tail: Cell::new(0),
320        _padding2: [0; cacheline_pad!(2)],
321
322        tail: AtomicUsize::new(0),
323        shadow_head: Cell::new(0),
324        _padding3: [0; cacheline_pad!(2)],
325    });
326
327    (
328        Producer {
329            buffer: arc.clone(),
330        },
331        Consumer {
332            buffer: arc.clone(),
333        },
334    )
335}
336
/// Allocates a memory buffer on the heap and returns a pointer to it
///
/// The allocation holds `capacity.next_power_of_two()` elements of `T`.
///
/// # Panics
///
/// Panics with "capacity overflow" if the byte size overflows `usize`, and
/// calls `handle_alloc_error` if the allocator reports failure.
///
/// # Safety
///
/// The caller takes ownership of the raw allocation and must eventually free
/// it with a matching layout (no-op for zero-sized `T`, see below).
unsafe fn allocate_buffer<T>(capacity: usize) -> *mut T {
    let adjusted_size = capacity.next_power_of_two();
    let size = adjusted_size
        .checked_mul(mem::size_of::<T>())
        .expect("capacity overflow");

    // BUGFIX: `alloc::alloc` with a zero-size layout is undefined behavior.
    // `size == 0` only happens for zero-sized `T` (adjusted_size is >= 1), so
    // hand back the conventional well-aligned dangling pointer instead.
    if size == 0 {
        return ptr::NonNull::<T>::dangling().as_ptr();
    }

    let layout = Layout::from_size_align(size, mem::align_of::<T>()).unwrap();
    let ptr = alloc::alloc(layout);
    if ptr.is_null() {
        alloc::handle_alloc_error(layout)
    } else {
        ptr as *mut T
    }
}
352
353impl<T> Producer<T> {
354    /// Push a value onto the buffer.
355    ///
356    /// If the buffer is non-full, the operation will execute immediately.  If the buffer is full,
357    /// this method will block until the buffer is non-full.  The waiting strategy is a simple
358    /// spin-wait. If you do not want a spin-wait burning CPU, you should call `try_push()`
359    /// directly and implement a different waiting strategy.
360    ///
361    /// # Examples
362    ///
363    /// ```
364    /// let (producer, _) = make(100);
365    ///
366    /// // Block until we can push this value onto the queue
367    /// producer.push(123);
368    /// ```
369    pub fn push(&self, v: T) {
370        (*self.buffer).push(v);
371    }
372
373    /// Attempt to push a value onto the buffer.
374    ///
375    /// This method does not block.  If the queue is not full, the value will be added to the
376    /// queue and the method will return `None`, signifying success.  If the queue is full,
377    /// this method will return `Some(v)``, where `v` is your original value.
378    ///
379    /// # Examples
380    ///
381    /// ```
382    /// let (producer, _) = make(100);
383    ///
384    /// // Attempt to add this value to the queue
385    /// match producer.try push(123) {
386    ///     Some(v) => {}, // Queue full, try again later
387    ///     None => {}     // Value added to queue
388    /// }
389    /// ```
390    pub fn try_push(&self, v: T) -> Option<T> {
391        (*self.buffer).try_push(v)
392    }
393
394    /// Returns the total capacity of this queue
395    ///
396    /// This value represents the total capacity of the queue when it is full.  It does not
397    /// represent the current usage.  For that, call `size()`.
398    ///
399    /// # Examples
400    ///
401    /// ```
402    /// let (producer, _) = make(100);
403    ///
404    /// assert!(producer.capacity() == 100);
405    /// producer.push(123);
406    /// assert!(producer.capacity() == 100);
407    /// ```
408    pub fn capacity(&self) -> usize {
409        (*self.buffer).capacity
410    }
411
412    /// Returns the current size of the queue
413    ///
414    /// This value represents the current size of the queue.  This value can be from 0-`capacity`
415    /// inclusive.
416    ///
417    /// # Examples
418    ///
419    /// ```
420    /// let (producer, _) = make(100);
421    ///
422    /// assert!(producer.size() == 0);
423    /// producer.push(123);
424    /// assert!(producer.size() == 1);
425    /// ```
426    pub fn size(&self) -> usize {
427        (*self.buffer).tail.load(Ordering::Acquire) - (*self.buffer).head.load(Ordering::Acquire)
428    }
429
430    /// Returns the available space in the queue
431    ///
432    /// This value represents the number of items that can be pushed onto the queue before it
433    /// becomes full.
434    ///
435    /// # Examples
436    ///
437    /// ```
438    /// let (producer, _) = make(100);
439    ///
440    /// assert!(producer.free_space() == 100);
441    /// producer.push(123);
442    /// assert!(producer.free_space() == 99);
443    /// ```
444    pub fn free_space(&self) -> usize {
445        self.capacity() - self.size()
446    }
447}
448
449impl<T> Consumer<T> {
450    /// Pop a value off the queue.
451    ///
452    /// If the buffer contains values, this method will execute immediately and return a value.
453    /// If the buffer is empty, this method will block until a value becomes available.  The
454    /// waiting strategy is a simple spin-wait. If you do not want a spin-wait burning CPU, you
455    /// should call `try_push()` directly and implement a different waiting strategy.
456    ///
457    /// # Examples
458    ///
459    /// ```
460    /// let (_, consumer) = make(100);
461    ///
462    /// // Block until a value becomes available
463    /// let t = consumer.pop();
464    /// ```
465    pub fn pop(&self) -> T {
466        (*self.buffer).pop()
467    }
468
469    /// Attempt to pop a value off the queue.
470    ///
471    /// This method does not block.  If the queue is empty, the method will return `None`.  If
472    /// there is a value available, the method will return `Some(v)`, where `v` is the value
473    /// being popped off the queue.
474    ///
475    /// # Examples
476    ///
477    /// ```
478    /// use bounded_spsc_queue::*;
479    ///
480    /// let (_, consumer) = make(100);
481    ///
482    /// // Attempt to pop a value off the queue
483    /// let t = consumer.try_pop();
484    /// match t {
485    ///     Some(v) => {},      // Successfully popped a value
486    ///     None => {}          // Queue empty, try again later
487    /// }
488    /// ```
489    pub fn try_pop(&self) -> Option<T> {
490        (*self.buffer).try_pop()
491    }
492
493    /// Attempts to pop (and discard) at most `n` values off the buffer.
494    ///
495    /// Returns the amount of values successfully skipped.
496    ///
497    /// # Safety
498    ///
499    /// *WARNING:* This will leak at most `n` values from the buffer, i.e. the destructors of the
500    /// objects skipped over will not be called. This function is intended to be used on buffers that
501    /// contain non-`Drop` data, such as a `Buffer<f32>`.
502    ///
503    /// # Examples
504    ///
505    /// ```
506    /// use bounded_spsc_queue::*;
507    ///
508    /// let (_, consumer) = make(100);
509    ///
510    /// let mut read_position = 0; // current buffer index
511    /// read_position += consumer.skip_n(512); // try to skip at most 512 elements
512    /// ```
513    pub fn skip_n(&self, n: usize) -> usize {
514        (*self.buffer).skip_n(n)
515    }
516    /// Returns the total capacity of this queue
517    ///
518    /// This value represents the total capacity of the queue when it is full.  It does not
519    /// represent the current usage.  For that, call `size()`.
520    ///
521    /// # Examples
522    ///
523    /// ```
524    /// let (_, consumer) = make(100);
525    ///
526    /// assert!(consumer.capacity() == 100);
527    /// let t = consumer.pop();
528    /// assert!(producer.capacity() == 100);
529    /// ```
530    pub fn capacity(&self) -> usize {
531        (*self.buffer).capacity
532    }
533
534    /// Returns the current size of the queue
535    ///
536    /// This value represents the current size of the queue.  This value can be from 0-`capacity`
537    /// inclusive.
538    ///
539    /// # Examples
540    ///
541    /// ```
542    /// let (_, consumer) = make(100);
543    ///
544    /// //... producer pushes somewhere ...
545    ///
546    /// assert!(consumer.size() == 10);
547    /// consumer.pop();
548    /// assert!(producer.size() == 9);
549    /// ```
550    pub fn size(&self) -> usize {
551        (*self.buffer).tail.load(Ordering::Acquire) - (*self.buffer).head.load(Ordering::Acquire)
552    }
553}
554
#[cfg(test)]
mod tests {

    use super::*;
    use std::thread;

    // The manual padding should place shared/consumer/producer state on three
    // separate cachelines, making the struct exactly 3 * CACHELINE_LEN bytes.
    #[test]
    fn test_buffer_size() {
        assert_eq!(::std::mem::size_of::<Buffer<()>>(), 3 * CACHELINE_LEN);
    }

    // Pushing below capacity never blocks and size tracks each push.
    #[test]
    fn test_producer_push() {
        let (p, _) = super::make(10);

        for i in 0..9 {
            p.push(i);
            assert!(p.capacity() == 10);
            assert!(p.size() == i + 1);
        }
    }

    // Values come back out in FIFO order and size decreases with each pop.
    #[test]
    fn test_consumer_pop() {
        let (p, c) = super::make(10);

        for i in 0..9 {
            p.push(i);
            assert!(p.capacity() == 10);
            assert!(p.size() == i + 1);
        }

        for i in 0..9 {
            assert!(c.size() == 9 - i);
            let t = c.pop();
            assert!(c.capacity() == 10);
            assert!(c.size() == 9 - i - 1);
            assert!(t == i);
        }
    }

    // skip_n discards exactly min(n, size) values; subsequent pops resume
    // after the skipped region, and skipping an empty queue returns 0.
    #[test]
    fn test_consumer_skip() {
        let (p, c) = super::make(10);

        for i in 0..9 {
            p.push(i);
            assert!(p.capacity() == 10);
            assert!(p.size() == i + 1);
        }
        assert!(c.size() == 9);
        assert!(c.skip_n(5) == 5);
        assert!(c.size() == 4);
        for i in 0..4 {
            assert!(c.size() == 4 - i);
            let t = c.pop();
            assert!(c.capacity() == 10);
            assert!(c.size() == 4 - i - 1);
            assert!(t == i + 5);
        }
        assert!(c.size() == 0);
        assert!(c.skip_n(5) == 0);
    }

    // Skipping exactly `size` values empties the queue in one call.
    #[test]
    fn test_consumer_skip_whole_buf() {
        let (p, c) = super::make(9);

        for i in 0..9 {
            p.push(i);
            assert!(p.capacity() == 9);
            assert!(p.size() == i + 1);
        }
        assert!(c.size() == 9);
        assert!(c.skip_n(9) == 9);
        assert!(c.size() == 0);
    }

    // Once the queue is full, try_push hands the value back instead of
    // blocking or overwriting.
    #[test]
    fn test_try_push() {
        let (p, _) = super::make(10);

        for i in 0..10 {
            p.push(i);
            assert!(p.capacity() == 10);
            assert!(p.size() == i + 1);
        }

        match p.try_push(10) {
            Some(v) => {
                assert!(v == 10);
            }
            None => assert!(false, "Queue should not have accepted another write!"),
        }
    }

    // try_pop returns None on empty, Some(v) when a value is available, and
    // None again once drained.
    #[test]
    fn test_try_poll() {
        let (p, c) = super::make(10);

        match c.try_pop() {
            Some(_) => assert!(false, "Queue was empty but a value was read!"),
            None => {}
        }

        p.push(123);

        match c.try_pop() {
            Some(v) => assert!(v == 123),
            None => assert!(false, "Queue was not empty but poll() returned nothing!"),
        }

        match c.try_pop() {
            Some(_) => assert!(false, "Queue was empty but a value was read!"),
            None => {}
        }
    }

    // Cross-thread smoke test: 100k values pushed from a spawned thread arrive
    // in order on this one.  (The producer thread is implicitly joined by the
    // consumer loop draining all 100000 values.)
    #[test]
    fn test_threaded() {
        let (p, c) = super::make(500);

        thread::spawn(move || {
            for i in 0..100000 {
                p.push(i);
            }
        });

        for i in 0..100000 {
            let t = c.pop();
            assert!(t == i);
        }
    }

    extern crate time;
    use self::time::PreciseTime;
    use std::sync::mpsc::sync_channel;

    // Single-threaded throughput benchmark for the SPSC queue; #[ignore]d so
    // normal test runs stay fast.
    #[test]
    #[ignore]
    fn bench_spsc_throughput() {
        let iterations: i64 = 2i64.pow(14);

        let (p, c) = make(iterations as usize);

        let start = PreciseTime::now();
        for i in 0..iterations as usize {
            p.push(i);
        }
        let t = c.pop();
        assert!(t == 0);
        let end = PreciseTime::now();
        let throughput =
            (iterations as f64 / (start.to(end)).num_nanoseconds().unwrap() as f64) * 1000000000f64;
        println!(
            "Spsc Throughput: {}/s -- (iterations: {} in {} ns)",
            throughput,
            iterations,
            (start.to(end)).num_nanoseconds().unwrap()
        );
    }

    // Equivalent benchmark against std's sync_channel for comparison.
    #[test]
    #[ignore]
    fn bench_chan_throughput() {
        let iterations: i64 = 2i64.pow(14);

        let (tx, rx) = sync_channel(iterations as usize);

        let start = PreciseTime::now();
        for i in 0..iterations as usize {
            tx.send(i).unwrap();
        }
        let t = rx.recv().unwrap();
        assert!(t == 0);
        let end = PreciseTime::now();
        let throughput =
            (iterations as f64 / (start.to(end)).num_nanoseconds().unwrap() as f64) * 1000000000f64;
        println!(
            "Chan Throughput: {}/s -- (iterations: {} in {} ns)",
            throughput,
            iterations,
            (start.to(end)).num_nanoseconds().unwrap()
        );
    }

}