// bounded_spsc_queue/lib.rs
1extern crate core;
2
3use core::alloc::Layout;
4use core::{mem, ptr};
5use std::alloc;
6use std::cell::Cell;
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::usize;
10
/// Assumed cacheline size in bytes. 64 is correct for essentially all modern
/// x86/ARM cores; the struct padding computed by `cacheline_pad!` relies on it.
const CACHELINE_LEN: usize = 64;

/// Expands to the number of `usize` words of padding needed to fill out a
/// cacheline that already holds `$N` `usize`-sized fields.
macro_rules! cacheline_pad {
    ($N:expr) => {
        CACHELINE_LEN / std::mem::size_of::<usize>() - $N
    };
}
18
/// The internal memory buffer used by the queue.
///
/// Buffer holds a pointer to allocated memory which represents the bounded
/// ring buffer, as well as a head and tail `AtomicUsize` which the producer and
/// consumer use to track location in the ring.
///
/// `#[repr(C)]` keeps the field order fixed so that the shared metadata, the
/// consumer-owned fields, and the producer-owned fields each sit on their own
/// cacheline (see the `_padding*` fields), avoiding false sharing.
#[repr(C)]
pub struct Buffer<T> {
    /// A pointer to the allocated ring buffer
    buffer: *mut T,

    /// The bounded size as specified by the user. If the queue reaches capacity, it will block
    /// until values are popped off.
    capacity: usize,

    /// The allocated size of the ring buffer, in terms of number of values (not physical memory).
    /// This will be the next power of two larger than `capacity`
    allocated_size: usize,
    // Pads the shared cacheline (3 usize-sized fields above) to CACHELINE_LEN bytes.
    _padding1: [usize; cacheline_pad!(3)],

    // Consumer cacheline:

    /// Index position of the current head
    head: AtomicUsize,
    /// Consumer-local cached copy of `tail`, refreshed only when the ring looks
    /// empty, so the consumer rarely reads the producer's cacheline.
    shadow_tail: Cell<usize>,
    _padding2: [usize; cacheline_pad!(2)],

    // Producer cacheline:

    /// Index position of current tail
    tail: AtomicUsize,
    /// Producer-local cached copy of `head`, refreshed only when the ring looks full.
    shadow_head: Cell<usize>,
    _padding3: [usize; cacheline_pad!(2)],
}
52
// SAFETY: Buffer is shared by exactly one producer and one consumer thread.
// Cross-thread hand-off goes through the atomic `head`/`tail` indices, and the
// non-Sync `Cell` shadow fields are each only ever touched by a single side
// (shadow_tail by the consumer, shadow_head by the producer).
unsafe impl<T: Sync> Sync for Buffer<T> {}
54
/// A handle to the queue which allows consuming values from the buffer
pub struct Consumer<T> {
    buffer: Arc<Buffer<T>>,
}

/// A handle to the queue which allows adding values onto the buffer
pub struct Producer<T> {
    buffer: Arc<Buffer<T>>,
}

// SAFETY: each handle may move to another thread; the shared Buffer coordinates
// the two sides via its atomic indices, and `T: Send` allows the values
// themselves to cross the thread boundary.
unsafe impl<T: Send> Send for Consumer<T> {}
unsafe impl<T: Send> Send for Producer<T> {}
67
68impl<T> Buffer<T> {
69 /// Attempt to pop a value off the buffer.
70 ///
71 /// If the buffer is empty, this method will not block. Instead, it will return `None`
72 /// signifying the buffer was empty. The caller may then decide what to do next (e.g. spin-wait,
73 /// sleep, process something else, etc)
74 ///
75 /// # Examples
76 ///
77 /// ```
78 /// // Attempt to pop off a value
79 /// let t = buffer.try_pop();
80 /// match t {
81 /// Some(v) => {}, // Got a value
82 /// None => {} // Buffer empty, try again later
83 /// }
84 /// ```
85 pub fn try_pop(&self) -> Option<T> {
86 let current_head = self.head.load(Ordering::Relaxed);
87
88 if current_head == self.shadow_tail.get() {
89 self.shadow_tail.set(self.tail.load(Ordering::Acquire));
90 if current_head == self.shadow_tail.get() {
91 return None;
92 }
93 }
94
95 let v = unsafe { ptr::read(self.load(current_head)) };
96 self.head
97 .store(current_head.wrapping_add(1), Ordering::Release);
98 Some(v)
99 }
100
101 /// Attempts to pop (and discard) at most `n` values off the buffer.
102 ///
103 /// Returns the amount of values successfully skipped.
104 ///
105 /// # Safety
106 ///
107 /// *WARNING:* This will leak at most `n` values from the buffer, i.e. the destructors of the
108 /// objects skipped over will not be called. This function is intended to be used on buffers that
109 /// contain non-`Drop` data, such as a `Buffer<f32>`.
110 pub fn skip_n(&self, n: usize) -> usize {
111 let current_head = self.head.load(Ordering::Relaxed);
112
113 self.shadow_tail.set(self.tail.load(Ordering::Acquire));
114 if current_head == self.shadow_tail.get() {
115 return 0;
116 }
117 let mut diff = self.shadow_tail.get().wrapping_sub(current_head);
118 if diff > n {
119 diff = n
120 }
121 self.head
122 .store(current_head.wrapping_add(diff), Ordering::Release);
123 diff
124 }
125
126 /// Pop a value off the buffer.
127 ///
128 /// This method will block until the buffer is non-empty. The waiting strategy is a simple
129 /// spin-wait and will repeatedly call `try_pop()` until a value is available. If you do not
130 /// want a spin-wait burning CPU, you should call `try_pop()` directly and implement a different
131 /// waiting strategy.
132 ///
133 /// # Examples
134 ///
135 /// ```
136 /// // Block until a value is ready
137 /// let t = buffer.pop();
138 /// ```
139 pub fn pop(&self) -> T {
140 loop {
141 match self.try_pop() {
142 None => {}
143 Some(v) => return v,
144 }
145 }
146 }
147
148 /// Attempt to push a value onto the buffer.
149 ///
150 /// If the buffer is full, this method will not block. Instead, it will return `Some(v)`, where
151 /// `v` was the value attempting to be pushed onto the buffer. If the value was successfully
152 /// pushed onto the buffer, `None` will be returned signifying success.
153 ///
154 /// # Examples
155 ///
156 /// ```
157 /// // Attempt to push a value onto the buffer
158 /// let t = buffer.try_push(123);
159 /// match t {
160 /// Some(v) => {}, // Buffer was full, try again later
161 /// None => {} // Value was successfully pushed onto the buffer
162 /// }
163 /// ```
164 pub fn try_push(&self, v: T) -> Option<T> {
165 let current_tail = self.tail.load(Ordering::Relaxed);
166
167 if self.shadow_head.get() + self.capacity <= current_tail {
168 self.shadow_head.set(self.head.load(Ordering::Relaxed));
169 if self.shadow_head.get() + self.capacity <= current_tail {
170 return Some(v);
171 }
172 }
173
174 unsafe {
175 self.store(current_tail, v);
176 }
177 self.tail
178 .store(current_tail.wrapping_add(1), Ordering::Release);
179 None
180 }
181
182 /// Push a value onto the buffer.
183 ///
184 /// This method will block until the buffer is non-full. The waiting strategy is a simple
185 /// spin-wait and will repeatedly call `try_push()` until the value can be added. If you do not
186 /// want a spin-wait burning CPU, you should call `try_push()` directly and implement a different
187 /// waiting strategy.
188 ///
189 /// # Examples
190 ///
191 /// ```
192 /// // Block until we can push this value onto the buffer
193 /// buffer.try_push(123);
194 /// ```
195 pub fn push(&self, v: T) {
196 let mut t = v;
197 loop {
198 match self.try_push(t) {
199 Some(rv) => t = rv,
200 None => return,
201 }
202 }
203 }
204
205 /// Load a value out of the buffer
206 ///
207 /// # Safety
208 ///
209 /// This method assumes the caller has:
210 /// - Initialized a valid block of memory
211 /// - Specified an index position that contains valid data
212 ///
213 /// The caller can use either absolute or monotonically increasing index positions, since
214 /// buffer wrapping is handled inside the method.
215 #[inline]
216 unsafe fn load(&self, pos: usize) -> &T {
217 &*self.buffer
218 .offset((pos & (self.allocated_size - 1)) as isize)
219 }
220
221 /// Store a value in the buffer
222 ///
223 /// # Safety
224 ///
225 /// This method assumes the caller has:
226 /// - Initialized a valid block of memory
227 #[inline]
228 unsafe fn store(&self, pos: usize, v: T) {
229 let end = self.buffer
230 .offset((pos & (self.allocated_size - 1)) as isize);
231 ptr::write(&mut *end, v);
232 }
233}
234
235/// Handles deallocation of heap memory when the buffer is dropped
236impl<T> Drop for Buffer<T> {
237 fn drop(&mut self) {
238 // Pop the rest of the values off the queue. By moving them into this scope,
239 // we implicitly call their destructor
240
241 // TODO this could be optimized to avoid the atomic operations / book-keeping...but
242 // since this is the destructor, there shouldn't be any contention... so meh?
243 while let Some(_) = self.try_pop() {}
244
245 if mem::size_of::<T>() > 0 {
246 unsafe {
247 let layout = Layout::from_size_align(
248 self.allocated_size * mem::size_of::<T>(),
249 mem::align_of::<T>(),
250 ).unwrap();
251 alloc::dealloc(self.buffer as *mut u8, layout);
252 }
253 }
254 }
255}
256
257/// Creates a new SPSC Queue, returning a Producer and Consumer handle
258///
259/// Capacity specifies the size of the bounded queue to create. Actual memory usage
260/// will be `capacity.next_power_of_two() * size_of::<T>()`, since ringbuffers with
261/// power of two sizes are more efficient to operate on (can use a bitwise AND to index
262/// into the ring instead of a more expensive modulo operator).
263///
264/// # Examples
265///
266/// Here is a simple usage of make, using the queue within the same thread:
267///
268/// ```
269/// // Create a queue with capacity to hold 100 values
270/// let (p, c) = make(100);
271///
272/// // Push `123` onto the queue
273/// p.push(123);
274///
275/// // Pop the value back off
276/// let t = c.pop();
277/// assert!(t == 123);
278/// ```
279///
280/// Of course, a SPSC queue is really only useful if you plan to use it in a multi-threaded
281/// environment. The Producer and Consumer can both be sent to a thread, providing a fast, bounded
282/// one-way communication channel between those threads:
283///
284/// ```
285/// use std::thread;
286///
287/// let (p, c) = make(500);
288///
289/// // Spawn a new thread and move the Producer into it
290/// thread::spawn(move|| {
291/// for i in 0..100000 {
292/// p.push(i as u32);
293/// }
294/// });
295///
296/// // Back in the first thread, start Pop'ing values off the queue
297/// for i in 0..100000 {
298/// let t = c.pop();
299/// assert!(t == i);
300/// }
301///
302/// ```
303///
304/// # Panics
305///
306/// If the requested queue size is larger than available memory (e.g.
307/// `capacity.next_power_of_two() * size_of::<T>() > available memory` ), this function will abort
308/// with an OOM panic.
309pub fn make<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
310 let ptr = unsafe { allocate_buffer(capacity) };
311
312 let arc = Arc::new(Buffer {
313 buffer: ptr,
314 capacity,
315 allocated_size: capacity.next_power_of_two(),
316 _padding1: [0; cacheline_pad!(3)],
317
318 head: AtomicUsize::new(0),
319 shadow_tail: Cell::new(0),
320 _padding2: [0; cacheline_pad!(2)],
321
322 tail: AtomicUsize::new(0),
323 shadow_head: Cell::new(0),
324 _padding3: [0; cacheline_pad!(2)],
325 });
326
327 (
328 Producer {
329 buffer: arc.clone(),
330 },
331 Consumer {
332 buffer: arc.clone(),
333 },
334 )
335}
336
/// Allocates a heap buffer large enough for `capacity.next_power_of_two()`
/// values of `T` and returns a raw pointer to it.
///
/// For a zero-byte layout (e.g. zero-sized `T`) no allocation happens; an
/// aligned, non-null dangling pointer is returned instead.
///
/// # Safety
///
/// The caller owns the returned memory and must free it with `alloc::dealloc`
/// using the same layout (unless the layout size was zero).
unsafe fn allocate_buffer<T>(capacity: usize) -> *mut T {
    let slots = capacity.next_power_of_two();
    let bytes = slots
        .checked_mul(mem::size_of::<T>())
        .expect("capacity overflow");

    let layout = Layout::from_size_align(bytes, mem::align_of::<T>()).unwrap();

    // A zero-sized request must not hit the allocator; use the alignment as a
    // dangling-but-well-aligned address, exactly as before.
    let ptr = match bytes {
        0 => mem::align_of::<T>() as *mut T,
        _ => alloc::alloc(layout) as *mut T,
    };

    if ptr.is_null() {
        alloc::handle_alloc_error(layout)
    } else {
        ptr
    }
}
358
359impl<T> Producer<T> {
360 /// Push a value onto the buffer.
361 ///
362 /// If the buffer is non-full, the operation will execute immediately. If the buffer is full,
363 /// this method will block until the buffer is non-full. The waiting strategy is a simple
364 /// spin-wait. If you do not want a spin-wait burning CPU, you should call `try_push()`
365 /// directly and implement a different waiting strategy.
366 ///
367 /// # Examples
368 ///
369 /// ```
370 /// let (producer, _) = make(100);
371 ///
372 /// // Block until we can push this value onto the queue
373 /// producer.push(123);
374 /// ```
375 pub fn push(&self, v: T) {
376 (*self.buffer).push(v);
377 }
378
379 /// Attempt to push a value onto the buffer.
380 ///
381 /// This method does not block. If the queue is not full, the value will be added to the
382 /// queue and the method will return `None`, signifying success. If the queue is full,
383 /// this method will return `Some(v)``, where `v` is your original value.
384 ///
385 /// # Examples
386 ///
387 /// ```
388 /// let (producer, _) = make(100);
389 ///
390 /// // Attempt to add this value to the queue
391 /// match producer.try push(123) {
392 /// Some(v) => {}, // Queue full, try again later
393 /// None => {} // Value added to queue
394 /// }
395 /// ```
396 pub fn try_push(&self, v: T) -> Option<T> {
397 (*self.buffer).try_push(v)
398 }
399
400 /// Returns the total capacity of this queue
401 ///
402 /// This value represents the total capacity of the queue when it is full. It does not
403 /// represent the current usage. For that, call `size()`.
404 ///
405 /// # Examples
406 ///
407 /// ```
408 /// let (producer, _) = make(100);
409 ///
410 /// assert!(producer.capacity() == 100);
411 /// producer.push(123);
412 /// assert!(producer.capacity() == 100);
413 /// ```
414 pub fn capacity(&self) -> usize {
415 (*self.buffer).capacity
416 }
417
418 /// Returns the current size of the queue
419 ///
420 /// This value represents the current size of the queue. This value can be from 0-`capacity`
421 /// inclusive.
422 ///
423 /// # Examples
424 ///
425 /// ```
426 /// let (producer, _) = make(100);
427 ///
428 /// assert!(producer.size() == 0);
429 /// producer.push(123);
430 /// assert!(producer.size() == 1);
431 /// ```
432 pub fn size(&self) -> usize {
433 (*self.buffer).tail.load(Ordering::Acquire) - (*self.buffer).head.load(Ordering::Acquire)
434 }
435
436 /// Returns the available space in the queue
437 ///
438 /// This value represents the number of items that can be pushed onto the queue before it
439 /// becomes full.
440 ///
441 /// # Examples
442 ///
443 /// ```
444 /// let (producer, _) = make(100);
445 ///
446 /// assert!(producer.free_space() == 100);
447 /// producer.push(123);
448 /// assert!(producer.free_space() == 99);
449 /// ```
450 pub fn free_space(&self) -> usize {
451 self.capacity() - self.size()
452 }
453}
454
455impl<T> Consumer<T> {
456 /// Pop a value off the queue.
457 ///
458 /// If the buffer contains values, this method will execute immediately and return a value.
459 /// If the buffer is empty, this method will block until a value becomes available. The
460 /// waiting strategy is a simple spin-wait. If you do not want a spin-wait burning CPU, you
461 /// should call `try_push()` directly and implement a different waiting strategy.
462 ///
463 /// # Examples
464 ///
465 /// ```
466 /// let (_, consumer) = make(100);
467 ///
468 /// // Block until a value becomes available
469 /// let t = consumer.pop();
470 /// ```
471 pub fn pop(&self) -> T {
472 (*self.buffer).pop()
473 }
474
475 /// Attempt to pop a value off the queue.
476 ///
477 /// This method does not block. If the queue is empty, the method will return `None`. If
478 /// there is a value available, the method will return `Some(v)`, where `v` is the value
479 /// being popped off the queue.
480 ///
481 /// # Examples
482 ///
483 /// ```
484 /// use bounded_spsc_queue::*;
485 ///
486 /// let (_, consumer) = make(100);
487 ///
488 /// // Attempt to pop a value off the queue
489 /// let t = consumer.try_pop();
490 /// match t {
491 /// Some(v) => {}, // Successfully popped a value
492 /// None => {} // Queue empty, try again later
493 /// }
494 /// ```
495 pub fn try_pop(&self) -> Option<T> {
496 (*self.buffer).try_pop()
497 }
498
499 /// Attempts to pop (and discard) at most `n` values off the buffer.
500 ///
501 /// Returns the amount of values successfully skipped.
502 ///
503 /// # Safety
504 ///
505 /// *WARNING:* This will leak at most `n` values from the buffer, i.e. the destructors of the
506 /// objects skipped over will not be called. This function is intended to be used on buffers that
507 /// contain non-`Drop` data, such as a `Buffer<f32>`.
508 ///
509 /// # Examples
510 ///
511 /// ```
512 /// use bounded_spsc_queue::*;
513 ///
514 /// let (_, consumer) = make(100);
515 ///
516 /// let mut read_position = 0; // current buffer index
517 /// read_position += consumer.skip_n(512); // try to skip at most 512 elements
518 /// ```
519 pub fn skip_n(&self, n: usize) -> usize {
520 (*self.buffer).skip_n(n)
521 }
522 /// Returns the total capacity of this queue
523 ///
524 /// This value represents the total capacity of the queue when it is full. It does not
525 /// represent the current usage. For that, call `size()`.
526 ///
527 /// # Examples
528 ///
529 /// ```
530 /// let (_, consumer) = make(100);
531 ///
532 /// assert!(consumer.capacity() == 100);
533 /// let t = consumer.pop();
534 /// assert!(producer.capacity() == 100);
535 /// ```
536 pub fn capacity(&self) -> usize {
537 (*self.buffer).capacity
538 }
539
540 /// Returns the current size of the queue
541 ///
542 /// This value represents the current size of the queue. This value can be from 0-`capacity`
543 /// inclusive.
544 ///
545 /// # Examples
546 ///
547 /// ```
548 /// let (_, consumer) = make(100);
549 ///
550 /// //... producer pushes somewhere ...
551 ///
552 /// assert!(consumer.size() == 10);
553 /// consumer.pop();
554 /// assert!(producer.size() == 9);
555 /// ```
556 pub fn size(&self) -> usize {
557 (*self.buffer).tail.load(Ordering::Acquire) - (*self.buffer).head.load(Ordering::Acquire)
558 }
559}
560
#[cfg(test)]
mod tests {

    use super::*;
    use std::thread;

    #[test]
    fn test_buffer_size() {
        // The padding arithmetic must put the struct at exactly three cachelines.
        assert_eq!(::std::mem::size_of::<Buffer<()>>(), 3 * CACHELINE_LEN);
    }

    #[test]
    fn test_producer_push() {
        let (p, _) = super::make(10);

        for i in 0..9 {
            p.push(i);
            assert!(p.capacity() == 10);
            assert!(p.size() == i + 1);
        }
    }

    #[test]
    fn test_consumer_pop() {
        let (p, c) = super::make(10);

        for i in 0..9 {
            p.push(i);
            assert!(p.capacity() == 10);
            assert!(p.size() == i + 1);
        }

        for i in 0..9 {
            assert!(c.size() == 9 - i);
            let t = c.pop();
            assert!(c.capacity() == 10);
            assert!(c.size() == 9 - i - 1);
            assert!(t == i);
        }
    }

    #[test]
    fn test_consumer_skip() {
        let (p, c) = super::make(10);

        for i in 0..9 {
            p.push(i);
            assert!(p.capacity() == 10);
            assert!(p.size() == i + 1);
        }
        assert!(c.size() == 9);
        assert!(c.skip_n(5) == 5);
        assert!(c.size() == 4);
        for i in 0..4 {
            assert!(c.size() == 4 - i);
            let t = c.pop();
            assert!(c.capacity() == 10);
            assert!(c.size() == 4 - i - 1);
            assert!(t == i + 5);
        }
        assert!(c.size() == 0);
        assert!(c.skip_n(5) == 0);
    }

    #[test]
    fn test_consumer_skip_whole_buf() {
        let (p, c) = super::make(9);

        for i in 0..9 {
            p.push(i);
            assert!(p.capacity() == 9);
            assert!(p.size() == i + 1);
        }
        assert!(c.size() == 9);
        assert!(c.skip_n(9) == 9);
        assert!(c.size() == 0);
    }

    #[test]
    fn test_try_push() {
        let (p, _) = super::make(10);

        for i in 0..10 {
            p.push(i);
            assert!(p.capacity() == 10);
            assert!(p.size() == i + 1);
        }

        // The 11th push must bounce back with the original value.
        match p.try_push(10) {
            Some(v) => {
                assert!(v == 10);
            }
            None => assert!(false, "Queue should not have accepted another write!"),
        }
    }

    #[test]
    fn test_try_poll() {
        let (p, c) = super::make(10);

        match c.try_pop() {
            Some(_) => assert!(false, "Queue was empty but a value was read!"),
            None => {}
        }

        p.push(123);

        match c.try_pop() {
            Some(v) => assert!(v == 123),
            None => assert!(false, "Queue was not empty but poll() returned nothing!"),
        }

        match c.try_pop() {
            Some(_) => assert!(false, "Queue was empty but a value was read!"),
            None => {}
        }
    }

    #[test]
    fn test_threaded() {
        let (p, c) = super::make(500);

        thread::spawn(move || {
            for i in 0..100000 {
                p.push(i);
            }
        });

        for i in 0..100000 {
            let t = c.pop();
            assert!(t == i);
        }
    }

    // Benchmarks use std::time::Instant rather than the external `time` crate;
    // the standard library clock is monotonic and sufficient here.
    use std::sync::mpsc::sync_channel;
    use std::time::Instant;

    /// Duration -> f64 nanoseconds, without requiring `Duration::as_nanos`.
    fn ns(start: Instant) -> f64 {
        let d = start.elapsed();
        d.as_secs() as f64 * 1_000_000_000f64 + d.subsec_nanos() as f64
    }

    #[test]
    #[ignore]
    fn bench_spsc_throughput() {
        let iterations: i64 = 2i64.pow(14);

        let (p, c) = make(iterations as usize);

        let start = Instant::now();
        for i in 0..iterations as usize {
            p.push(i);
        }
        let t = c.pop();
        assert!(t == 0);
        let elapsed_ns = ns(start);
        let throughput = (iterations as f64 / elapsed_ns) * 1000000000f64;
        println!(
            "Spsc Throughput: {}/s -- (iterations: {} in {} ns)",
            throughput, iterations, elapsed_ns
        );
    }

    #[test]
    #[ignore]
    fn bench_chan_throughput() {
        let iterations: i64 = 2i64.pow(14);

        let (tx, rx) = sync_channel(iterations as usize);

        let start = Instant::now();
        for i in 0..iterations as usize {
            tx.send(i).unwrap();
        }
        let t = rx.recv().unwrap();
        assert!(t == 0);
        let elapsed_ns = ns(start);
        let throughput = (iterations as f64 / elapsed_ns) * 1000000000f64;
        println!(
            "Chan Throughput: {}/s -- (iterations: {} in {} ns)",
            throughput, iterations, elapsed_ns
        );
    }

}