bounded_spsc_queue/lib.rs
#![feature(allocator_api)]

extern crate core;

use core::alloc::Layout;
use core::{mem, ptr};
use std::alloc;
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::usize;

const CACHELINE_LEN: usize = 64;

macro_rules! cacheline_pad {
    ($N:expr) => {
        CACHELINE_LEN / std::mem::size_of::<usize>() - $N
    };
}
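
// As a concrete example (assuming a typical 64-bit target where
// `size_of::<usize>() == 8`): `cacheline_pad!(3)` expands to `64 / 8 - 3 == 5`,
// so three `usize`-sized fields plus five padding words fill one 64-byte cacheline.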

/// The internal memory buffer used by the queue.
///
/// Buffer holds a pointer to allocated memory which represents the bounded
/// ring buffer, as well as head and tail `AtomicUsize`s which the producer and consumer
/// use to track their location in the ring.
#[repr(C)]
pub struct Buffer<T> {
    /// A pointer to the allocated ring buffer
    buffer: *mut T,

    /// The bounded size as specified by the user. If the queue reaches capacity, pushes will
    /// block until values are popped off.
    capacity: usize,

    /// The allocated size of the ring buffer, in terms of number of values (not physical memory).
    /// This will be the next power of two greater than or equal to `capacity`
    allocated_size: usize,
    _padding1: [usize; cacheline_pad!(3)],

    // Consumer cacheline:

    /// Index position of the current head
    head: AtomicUsize,
    shadow_tail: Cell<usize>,
    _padding2: [usize; cacheline_pad!(2)],

    // Producer cacheline:

    /// Index position of the current tail
    tail: AtomicUsize,
    shadow_head: Cell<usize>,
    _padding3: [usize; cacheline_pad!(2)],
}
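
// Note: with the `_padding` arrays above, the shared metadata, the consumer
// indices, and the producer indices each occupy their own cacheline; the
// `test_buffer_size` test below checks that the struct is exactly three
// cachelines long.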

unsafe impl<T: Sync> Sync for Buffer<T> {}

/// A handle to the queue which allows consuming values from the buffer
pub struct Consumer<T> {
    buffer: Arc<Buffer<T>>,
}

/// A handle to the queue which allows adding values onto the buffer
pub struct Producer<T> {
    buffer: Arc<Buffer<T>>,
}

unsafe impl<T: Send> Send for Consumer<T> {}
unsafe impl<T: Send> Send for Producer<T> {}

impl<T> Buffer<T> {
    /// Attempt to pop a value off the buffer.
    ///
    /// If the buffer is empty, this method will not block. Instead, it will return `None`,
    /// signifying the buffer was empty. The caller may then decide what to do next (e.g. spin-wait,
    /// sleep, process something else, etc.)
    ///
    /// # Examples
    ///
    /// ```ignore
    /// // Attempt to pop off a value
    /// let t = buffer.try_pop();
    /// match t {
    ///     Some(v) => {},  // Got a value
    ///     None => {}      // Buffer empty, try again later
    /// }
    /// ```
    pub fn try_pop(&self) -> Option<T> {
        let current_head = self.head.load(Ordering::Relaxed);

        // The consumer keeps a cached copy of the tail (`shadow_tail`) so it only
        // needs to re-read the producer's cacheline when the cached view looks empty.
        if current_head == self.shadow_tail.get() {
            self.shadow_tail.set(self.tail.load(Ordering::Acquire));
            if current_head == self.shadow_tail.get() {
                return None;
            }
        }

        let v = unsafe { ptr::read(self.load(current_head)) };
        self.head
            .store(current_head.wrapping_add(1), Ordering::Release);
        Some(v)
    }

    /// Attempts to pop (and discard) at most `n` values off the buffer.
    ///
    /// Returns the number of values successfully skipped.
    ///
    /// # Safety
    ///
    /// *WARNING:* This will leak at most `n` values from the buffer, i.e. the destructors of the
    /// objects skipped over will not be called. This function is intended to be used on buffers
    /// that contain non-`Drop` data, such as a `Buffer<f32>`.
    pub fn skip_n(&self, n: usize) -> usize {
        let current_head = self.head.load(Ordering::Relaxed);

        self.shadow_tail.set(self.tail.load(Ordering::Acquire));
        if current_head == self.shadow_tail.get() {
            return 0;
        }
        let mut diff = self.shadow_tail.get().wrapping_sub(current_head);
        if diff > n {
            diff = n;
        }
        self.head
            .store(current_head.wrapping_add(diff), Ordering::Release);
        diff
    }

    /// Pop a value off the buffer.
    ///
    /// This method will block until the buffer is non-empty. The waiting strategy is a simple
    /// spin-wait and will repeatedly call `try_pop()` until a value is available. If you do not
    /// want a spin-wait burning CPU, you should call `try_pop()` directly and implement a
    /// different waiting strategy.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// // Block until a value is ready
    /// let t = buffer.pop();
    /// ```
    pub fn pop(&self) -> T {
        loop {
            match self.try_pop() {
                None => {}
                Some(v) => return v,
            }
        }
    }

    /// Attempt to push a value onto the buffer.
    ///
    /// If the buffer is full, this method will not block. Instead, it will return `Some(v)`, where
    /// `v` was the value attempting to be pushed onto the buffer. If the value was successfully
    /// pushed onto the buffer, `None` will be returned, signifying success.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// // Attempt to push a value onto the buffer
    /// let t = buffer.try_push(123);
    /// match t {
    ///     Some(v) => {},  // Buffer was full, try again later
    ///     None => {}      // Value was successfully pushed onto the buffer
    /// }
    /// ```
    pub fn try_push(&self, v: T) -> Option<T> {
        let current_tail = self.tail.load(Ordering::Relaxed);

        // The producer keeps a cached copy of the head (`shadow_head`) so it only
        // needs to re-read the consumer's cacheline when the cached view looks full.
        if self.shadow_head.get() + self.capacity <= current_tail {
            self.shadow_head.set(self.head.load(Ordering::Relaxed));
            if self.shadow_head.get() + self.capacity <= current_tail {
                return Some(v);
            }
        }

        unsafe {
            self.store(current_tail, v);
        }
        self.tail
            .store(current_tail.wrapping_add(1), Ordering::Release);
        None
    }

    /// Push a value onto the buffer.
    ///
    /// This method will block until the buffer is non-full. The waiting strategy is a simple
    /// spin-wait and will repeatedly call `try_push()` until the value can be added. If you do
    /// not want a spin-wait burning CPU, you should call `try_push()` directly and implement a
    /// different waiting strategy.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// // Block until we can push this value onto the buffer
    /// buffer.push(123);
    /// ```
    pub fn push(&self, v: T) {
        let mut t = v;
        loop {
            match self.try_push(t) {
                Some(rv) => t = rv,
                None => return,
            }
        }
    }

    /// Load a value out of the buffer
    ///
    /// # Safety
    ///
    /// This method assumes the caller has:
    /// - Initialized a valid block of memory
    /// - Specified an index position that contains valid data
    ///
    /// The caller can use either absolute or monotonically increasing index positions, since
    /// buffer wrapping is handled inside the method.
    #[inline]
    unsafe fn load(&self, pos: usize) -> &T {
        &*self.buffer
            .offset((pos & (self.allocated_size - 1)) as isize)
    }
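
    // For example, if `allocated_size` is 8 the mask is `0b111`, so a monotonically
    // increasing position of 10 maps to slot `10 & 7 == 2`; this is why the ring is
    // sized up to the next power of two.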

    /// Store a value in the buffer
    ///
    /// # Safety
    ///
    /// This method assumes the caller has:
    /// - Initialized a valid block of memory
    #[inline]
    unsafe fn store(&self, pos: usize, v: T) {
        let end = self.buffer
            .offset((pos & (self.allocated_size - 1)) as isize);
        ptr::write(end, v);
    }
}

/// Handles deallocation of heap memory when the buffer is dropped
impl<T> Drop for Buffer<T> {
    fn drop(&mut self) {
        // Pop the rest of the values off the queue. By moving them into this scope,
        // we implicitly call their destructors.

        // TODO this could be optimized to avoid the atomic operations / book-keeping...but
        // since this is the destructor, there shouldn't be any contention... so meh?
        while let Some(_) = self.try_pop() {}

        unsafe {
            let layout = Layout::from_size_align(
                self.allocated_size * mem::size_of::<T>(),
                mem::align_of::<T>(),
            ).unwrap();
            alloc::dealloc(self.buffer as *mut u8, layout);
        }
    }
}

/// Creates a new SPSC Queue, returning a Producer and Consumer handle
///
/// Capacity specifies the size of the bounded queue to create. Actual memory usage
/// will be `capacity.next_power_of_two() * size_of::<T>()`, since ring buffers with
/// power-of-two sizes are more efficient to operate on (the index can be wrapped with a
/// bitwise AND instead of a more expensive modulo operation).
///
/// # Examples
///
/// Here is a simple usage of make, using the queue within the same thread:
///
/// ```
/// use bounded_spsc_queue::make;
///
/// // Create a queue with capacity to hold 100 values
/// let (p, c) = make(100);
///
/// // Push `123` onto the queue
/// p.push(123);
///
/// // Pop the value back off
/// let t = c.pop();
/// assert!(t == 123);
/// ```
///
/// Of course, a SPSC queue is really only useful if you plan to use it in a multi-threaded
/// environment. The Producer and Consumer can both be sent to a thread, providing a fast, bounded
/// one-way communication channel between those threads:
///
/// ```
/// use bounded_spsc_queue::make;
/// use std::thread;
///
/// let (p, c) = make(500);
///
/// // Spawn a new thread and move the Producer into it
/// thread::spawn(move || {
///     for i in 0..100000 {
///         p.push(i as u32);
///     }
/// });
///
/// // Back in the first thread, start popping values off the queue
/// for i in 0..100000 {
///     let t = c.pop();
///     assert!(t == i);
/// }
/// ```
///
/// # Panics
///
/// If the requested queue size is larger than available memory (e.g.
/// `capacity.next_power_of_two() * size_of::<T>()` exceeds available memory), the allocation
/// will fail and the process will abort via `handle_alloc_error`.
pub fn make<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
    let ptr = unsafe { allocate_buffer(capacity) };

    let arc = Arc::new(Buffer {
        buffer: ptr,
        capacity,
        allocated_size: capacity.next_power_of_two(),
        _padding1: [0; cacheline_pad!(3)],

        head: AtomicUsize::new(0),
        shadow_tail: Cell::new(0),
        _padding2: [0; cacheline_pad!(2)],

        tail: AtomicUsize::new(0),
        shadow_head: Cell::new(0),
        _padding3: [0; cacheline_pad!(2)],
    });

    (
        Producer {
            buffer: arc.clone(),
        },
        Consumer { buffer: arc },
    )
}

/// Allocates a memory buffer on the heap and returns a pointer to it
unsafe fn allocate_buffer<T>(capacity: usize) -> *mut T {
    let adjusted_size = capacity.next_power_of_two();
    let size = adjusted_size
        .checked_mul(mem::size_of::<T>())
        .expect("capacity overflow");

    let layout = Layout::from_size_align(size, mem::align_of::<T>()).unwrap();
    let ptr = alloc::alloc(layout);
    if ptr.is_null() {
        alloc::handle_alloc_error(layout)
    } else {
        ptr as *mut T
    }
}

impl<T> Producer<T> {
    /// Push a value onto the buffer.
    ///
    /// If the buffer is non-full, the operation will execute immediately. If the buffer is full,
    /// this method will block until the buffer is non-full. The waiting strategy is a simple
    /// spin-wait. If you do not want a spin-wait burning CPU, you should call `try_push()`
    /// directly and implement a different waiting strategy.
    ///
    /// # Examples
    ///
    /// ```
    /// use bounded_spsc_queue::make;
    ///
    /// let (producer, _) = make(100);
    ///
    /// // Block until we can push this value onto the queue
    /// producer.push(123);
    /// ```
    pub fn push(&self, v: T) {
        (*self.buffer).push(v);
    }

    /// Attempt to push a value onto the buffer.
    ///
    /// This method does not block. If the queue is not full, the value will be added to the
    /// queue and the method will return `None`, signifying success. If the queue is full,
    /// this method will return `Some(v)`, where `v` is your original value.
    ///
    /// # Examples
    ///
    /// ```
    /// use bounded_spsc_queue::make;
    ///
    /// let (producer, _) = make(100);
    ///
    /// // Attempt to add this value to the queue
    /// match producer.try_push(123) {
    ///     Some(v) => {},  // Queue full, try again later
    ///     None => {}      // Value added to queue
    /// }
    /// ```
    pub fn try_push(&self, v: T) -> Option<T> {
        (*self.buffer).try_push(v)
    }

    /// Returns the total capacity of this queue
    ///
    /// This value represents the total capacity of the queue when it is full. It does not
    /// represent the current usage. For that, call `size()`.
    ///
    /// # Examples
    ///
    /// ```
    /// use bounded_spsc_queue::make;
    ///
    /// let (producer, _) = make(100);
    ///
    /// assert!(producer.capacity() == 100);
    /// producer.push(123);
    /// assert!(producer.capacity() == 100);
    /// ```
    pub fn capacity(&self) -> usize {
        (*self.buffer).capacity
    }

    /// Returns the current size of the queue
    ///
    /// This value represents the current size of the queue. This value can be from 0-`capacity`
    /// inclusive.
    ///
    /// # Examples
    ///
    /// ```
    /// use bounded_spsc_queue::make;
    ///
    /// let (producer, _) = make(100);
    ///
    /// assert!(producer.size() == 0);
    /// producer.push(123);
    /// assert!(producer.size() == 1);
    /// ```
    pub fn size(&self) -> usize {
        (*self.buffer).tail.load(Ordering::Acquire) - (*self.buffer).head.load(Ordering::Acquire)
    }

    /// Returns the available space in the queue
    ///
    /// This value represents the number of items that can be pushed onto the queue before it
    /// becomes full.
    ///
    /// # Examples
    ///
    /// ```
    /// use bounded_spsc_queue::make;
    ///
    /// let (producer, _) = make(100);
    ///
    /// assert!(producer.free_space() == 100);
    /// producer.push(123);
    /// assert!(producer.free_space() == 99);
    /// ```
    pub fn free_space(&self) -> usize {
        self.capacity() - self.size()
    }
}

impl<T> Consumer<T> {
    /// Pop a value off the queue.
    ///
    /// If the buffer contains values, this method will execute immediately and return a value.
    /// If the buffer is empty, this method will block until a value becomes available. The
    /// waiting strategy is a simple spin-wait. If you do not want a spin-wait burning CPU, you
    /// should call `try_pop()` directly and implement a different waiting strategy.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use bounded_spsc_queue::make;
    ///
    /// let (_, consumer) = make::<u32>(100);
    ///
    /// // Block until a value becomes available
    /// let t = consumer.pop();
    /// ```
    pub fn pop(&self) -> T {
        (*self.buffer).pop()
    }

    /// Attempt to pop a value off the queue.
    ///
    /// This method does not block. If the queue is empty, the method will return `None`. If
    /// there is a value available, the method will return `Some(v)`, where `v` is the value
    /// being popped off the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use bounded_spsc_queue::*;
    ///
    /// let (_, consumer) = make::<u32>(100);
    ///
    /// // Attempt to pop a value off the queue
    /// let t = consumer.try_pop();
    /// match t {
    ///     Some(v) => {},  // Successfully popped a value
    ///     None => {}      // Queue empty, try again later
    /// }
    /// ```
    pub fn try_pop(&self) -> Option<T> {
        (*self.buffer).try_pop()
    }

    /// Attempts to pop (and discard) at most `n` values off the buffer.
    ///
    /// Returns the number of values successfully skipped.
    ///
    /// # Safety
    ///
    /// *WARNING:* This will leak at most `n` values from the buffer, i.e. the destructors of the
    /// objects skipped over will not be called. This function is intended to be used on buffers
    /// that contain non-`Drop` data, such as a `Buffer<f32>`.
    ///
    /// # Examples
    ///
    /// ```
    /// use bounded_spsc_queue::*;
    ///
    /// let (_, consumer) = make::<u32>(100);
    ///
    /// let mut read_position = 0; // current buffer index
    /// read_position += consumer.skip_n(512); // try to skip at most 512 elements
    /// ```
    pub fn skip_n(&self, n: usize) -> usize {
        (*self.buffer).skip_n(n)
    }

    /// Returns the total capacity of this queue
    ///
    /// This value represents the total capacity of the queue when it is full. It does not
    /// represent the current usage. For that, call `size()`.
    ///
    /// # Examples
    ///
    /// ```
    /// use bounded_spsc_queue::make;
    ///
    /// let (producer, consumer) = make(100);
    ///
    /// assert!(consumer.capacity() == 100);
    /// producer.push(123);
    /// let t = consumer.pop();
    /// assert!(consumer.capacity() == 100);
    /// ```
    pub fn capacity(&self) -> usize {
        (*self.buffer).capacity
    }

    /// Returns the current size of the queue
    ///
    /// This value represents the current size of the queue. This value can be from 0-`capacity`
    /// inclusive.
    ///
    /// # Examples
    ///
    /// ```
    /// use bounded_spsc_queue::make;
    ///
    /// let (producer, consumer) = make(100);
    ///
    /// // ... the producer pushes a value somewhere ...
    /// producer.push(123);
    ///
    /// assert!(consumer.size() == 1);
    /// consumer.pop();
    /// assert!(consumer.size() == 0);
    /// ```
    pub fn size(&self) -> usize {
        (*self.buffer).tail.load(Ordering::Acquire) - (*self.buffer).head.load(Ordering::Acquire)
    }
}

#[cfg(test)]
mod tests {

    use super::*;
    use std::thread;

    #[test]
    fn test_buffer_size() {
        assert_eq!(::std::mem::size_of::<Buffer<()>>(), 3 * CACHELINE_LEN);
    }

    #[test]
    fn test_producer_push() {
        let (p, _) = super::make(10);

        for i in 0..9 {
            p.push(i);
            assert!(p.capacity() == 10);
            assert!(p.size() == i + 1);
        }
    }
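
    // A small additional check of free_space(), mirroring test_producer_push above;
    // the expected values simply follow from free_space() == capacity() - size().
    #[test]
    fn test_producer_free_space() {
        let (p, _) = super::make(10);

        assert!(p.free_space() == 10);
        for i in 0..9 {
            p.push(i);
            assert!(p.free_space() == 10 - (i + 1));
        }
    }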

    #[test]
    fn test_consumer_pop() {
        let (p, c) = super::make(10);

        for i in 0..9 {
            p.push(i);
            assert!(p.capacity() == 10);
            assert!(p.size() == i + 1);
        }

        for i in 0..9 {
            assert!(c.size() == 9 - i);
            let t = c.pop();
            assert!(c.capacity() == 10);
            assert!(c.size() == 9 - i - 1);
            assert!(t == i);
        }
    }

    #[test]
    fn test_consumer_skip() {
        let (p, c) = super::make(10);

        for i in 0..9 {
            p.push(i);
            assert!(p.capacity() == 10);
            assert!(p.size() == i + 1);
        }
        assert!(c.size() == 9);
        assert!(c.skip_n(5) == 5);
        assert!(c.size() == 4);
        for i in 0..4 {
            assert!(c.size() == 4 - i);
            let t = c.pop();
            assert!(c.capacity() == 10);
            assert!(c.size() == 4 - i - 1);
            assert!(t == i + 5);
        }
        assert!(c.size() == 0);
        assert!(c.skip_n(5) == 0);
    }

    #[test]
    fn test_consumer_skip_whole_buf() {
        let (p, c) = super::make(9);

        for i in 0..9 {
            p.push(i);
            assert!(p.capacity() == 9);
            assert!(p.size() == i + 1);
        }
        assert!(c.size() == 9);
        assert!(c.skip_n(9) == 9);
        assert!(c.size() == 0);
    }

    #[test]
    fn test_try_push() {
        let (p, _) = super::make(10);

        for i in 0..10 {
            p.push(i);
            assert!(p.capacity() == 10);
            assert!(p.size() == i + 1);
        }

        match p.try_push(10) {
            Some(v) => {
                assert!(v == 10);
            }
            None => assert!(false, "Queue should not have accepted another write!"),
        }
    }

    #[test]
    fn test_try_poll() {
        let (p, c) = super::make(10);

        match c.try_pop() {
            Some(_) => assert!(false, "Queue was empty but a value was read!"),
            None => {}
        }

        p.push(123);

        match c.try_pop() {
            Some(v) => assert!(v == 123),
            None => assert!(false, "Queue was not empty but poll() returned nothing!"),
        }

        match c.try_pop() {
            Some(_) => assert!(false, "Queue was empty but a value was read!"),
            None => {}
        }
    }

    #[test]
    fn test_threaded() {
        let (p, c) = super::make(500);

        thread::spawn(move || {
            for i in 0..100000 {
                p.push(i);
            }
        });

        for i in 0..100000 {
            let t = c.pop();
            assert!(t == i);
        }
    }
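
    // Sketch of an extra single-threaded check that indices wrap correctly once they
    // pass the allocated (power-of-two) size; with capacity 4 the ring has 4 slots,
    // so 100 push/pop pairs cycle through it many times.
    #[test]
    fn test_wrap_around() {
        let (p, c) = super::make(4);

        for i in 0..100 {
            p.push(i);
            assert!(c.pop() == i);
        }
        assert!(c.size() == 0);
    }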

    extern crate time;
    use self::time::PreciseTime;
    use std::sync::mpsc::sync_channel;

    #[test]
    #[ignore]
    fn bench_spsc_throughput() {
        let iterations: i64 = 2i64.pow(14);

        let (p, c) = make(iterations as usize);

        let start = PreciseTime::now();
        for i in 0..iterations as usize {
            p.push(i);
        }
        let t = c.pop();
        assert!(t == 0);
        let end = PreciseTime::now();
        let throughput =
            (iterations as f64 / (start.to(end)).num_nanoseconds().unwrap() as f64) * 1000000000f64;
        println!(
            "Spsc Throughput: {}/s -- (iterations: {} in {} ns)",
            throughput,
            iterations,
            (start.to(end)).num_nanoseconds().unwrap()
        );
    }

    #[test]
    #[ignore]
    fn bench_chan_throughput() {
        let iterations: i64 = 2i64.pow(14);

        let (tx, rx) = sync_channel(iterations as usize);

        let start = PreciseTime::now();
        for i in 0..iterations as usize {
            tx.send(i).unwrap();
        }
        let t = rx.recv().unwrap();
        assert!(t == 0);
        let end = PreciseTime::now();
        let throughput =
            (iterations as f64 / (start.to(end)).num_nanoseconds().unwrap() as f64) * 1000000000f64;
        println!(
            "Chan Throughput: {}/s -- (iterations: {} in {} ns)",
            throughput,
            iterations,
            (start.to(end)).num_nanoseconds().unwrap()
        );
    }

}