// fq/lib.rs

/*!
A fast and simple ring-buffer-based single-producer, single-consumer queue with no dependencies. You can use this to write Rust programs with low-latency message passing.

## Installation
Add this to your `Cargo.toml`:
```TOML
[dependencies]
fq = "0.0.4"
```

## Quickstart
```rust
use fq::FastQueue;
use std::thread;

let (mut producer, mut consumer) = FastQueue::<String>::new(2);

let sender = thread::spawn(move || {
    producer.push("Hello, thread".to_owned())
        .expect("Unable to send to queue");
});

let receiver = thread::spawn(move || {
    while let Some(value) = consumer.next() {
        assert_eq!(value, "Hello, thread");
    }
});

sender.join().expect("The sender thread has panicked");
receiver.join().expect("The receiver thread has panicked");
```

## How does it work?
The ring buffer keeps its elements in one contiguous allocation. The idea is that if we are able to get extreme
cache locality, we can improve performance by reducing cache misses. This is also the reason why, if you use
smart pointers like `Box<T>`, performance *may* degrade: the extra indirection hurts cache locality. For very large
`T` types, you are limited more by `memcpy()` performance and less by the queue implementation. As such,
ring buffers can be considered strongly optimized for data of a few word sizes, with some non-linear performance
degradation for larger sizes. Additional optimizations are provided for CPUs that support the `sse` and `prfchw`
instructions. As and when Rust `std` provides more relevant instructions, they will be added. This is simply a
high-level explanation of some of the techniques employed by this crate; you can read the code to gain a better
understanding of what's happening under the hood.

## Profiles
The crate is fully synchronous and runtime-agnostic. We are heavily reliant on `std` for memory management, so
it's unlikely that we will support `#[no_std]` runtimes anytime soon. You should be using the `release` or
`maxperf` profiles for optimal performance.

## Principles
* This crate will always prioritize message throughput over memory usage.
* This crate will always support generic types.
* This crate will always provide a wait-free **and** lock-free API.
* This crate will use unsafe Rust where possible for maximal throughput.

## CPU Features
On `x86` and `x86_64` targets, prefetch instructions are available on the `stable` toolchain. To make use of prefetch instructions on the `aarch64` target, you should enable the `unstable` feature and use the `nightly`
toolchain.
```TOML
[dependencies]
fq = { version = "0.0.4", features = ["unstable"] }
```

## Benchmarks
Benchmarking is notoriously difficult for code like this: a same-CPU benchmark is fairly simple to set up, but
performance will still be affected by the core type and by cache contention. Benchmarks are
provided in the [benches](benches/bench.rs) directory and can be run with `cargo bench`. Contributions via
PRs for additional benchmarks are welcome.
*/
// Enable the unstable aarch64 prefetch intrinsics on nightly builds.
// NOTE(review): `nightly` is a custom cfg, presumably set by a build script
// or RUSTFLAGS — confirm it is actually emitted somewhere in the build.
#![cfg_attr(
    all(nightly, target_arch = "aarch64"),
    feature(stdarch_aarch64_prefetch)
)]
use core::alloc::Layout;
use core::cell::UnsafeCell;
use core::marker::PhantomData;
use core::mem::{MaybeUninit, size_of};
use core::ptr;
use core::sync::atomic::{AtomicUsize, Ordering};
use std::alloc::{alloc, dealloc, handle_alloc_error};
use std::sync::Arc;

/// Padding to prevent false sharing: aligns the wrapped value to (at least)
/// the target's cache line size, so fields owned by different threads never
/// end up on the same line.
// 128 bytes on x86_64/aarch64/arm64ec/powerpc64: these CPUs may fetch cache
// lines in adjacent pairs, so two full lines of separation are used.
#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "arm64ec",
        target_arch = "powerpc64",
    ),
    repr(C, align(128))
)]
// 32 bytes on targets with smaller cache lines.
#[cfg_attr(
    any(
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips32r6",
        target_arch = "mips64",
        target_arch = "mips64r6",
        target_arch = "sparc",
        target_arch = "hexagon",
    ),
    repr(C, align(32))
)]
// 64 bytes is the default for everything else.
#[cfg_attr(
    not(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "arm64ec",
        target_arch = "powerpc64",
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips32r6",
        target_arch = "mips64",
        target_arch = "mips64r6",
        target_arch = "sparc",
        target_arch = "hexagon",
    )),
    repr(C, align(64))
)]
struct CachePadded<T>(T);

/// Cache line size used as the buffer allocation alignment.
///
/// These values mirror the per-target alignments chosen for [`CachePadded`];
/// keep the two in sync if either is changed.
#[cfg(any(
    target_arch = "x86_64",
    target_arch = "aarch64",
    target_arch = "arm64ec",
    target_arch = "powerpc64",
))]
const CACHE_LINE_SIZE: usize = 128;

/// Cache line size used as the buffer allocation alignment (small-line targets).
#[cfg(any(
    target_arch = "arm",
    target_arch = "mips",
    target_arch = "mips32r6",
    target_arch = "mips64",
    target_arch = "mips64r6",
    target_arch = "sparc",
    target_arch = "hexagon",
))]
const CACHE_LINE_SIZE: usize = 32;

/// Cache line size used as the buffer allocation alignment (default targets).
#[cfg(not(any(
    target_arch = "x86_64",
    target_arch = "aarch64",
    target_arch = "arm64ec",
    target_arch = "powerpc64",
    target_arch = "arm",
    target_arch = "mips",
    target_arch = "mips32r6",
    target_arch = "mips64",
    target_arch = "mips64r6",
    target_arch = "sparc",
    target_arch = "hexagon",
)))]
const CACHE_LINE_SIZE: usize = 64;

/// A fast lock-free single-producer, single-consumer queue.
///
/// Shared (via `Arc`) between exactly one [`Producer`] and one [`Consumer`].
/// Indices are free-running wrapping counters; a slot is located by masking
/// the counter with `mask`.
pub struct FastQueue<T> {
    /// Capacity mask (capacity - 1) for fast modulo; capacity is a power of two.
    mask: CachePadded<usize>,

    /// The actual capacity
    capacity: CachePadded<usize>,

    /// Buffer storing elements directly. Raw allocation of `capacity` slots,
    /// aligned to `CACHE_LINE_SIZE`; owned by this struct, freed in `Drop`.
    buffer: CachePadded<*mut MaybeUninit<T>>,

    /// Written by producer, read by consumer.
    head: CachePadded<AtomicUsize>,

    /// Written by consumer, read by producer.
    tail: CachePadded<AtomicUsize>,

    _pd: PhantomData<T>,
}

// SAFETY: the queue owns its elements, so moving it between threads is sound
// whenever `T: Send`.
unsafe impl<T: Send> Send for FastQueue<T> {}
// SAFETY: cross-thread access goes through the atomic head/tail indices; the
// single producer and single consumer never access the same slot concurrently.
unsafe impl<T: Send> Sync for FastQueue<T> {}

179impl<T> FastQueue<T> {
180    /// Creates a SPSC queue with the given capacity. The allocated capacity may be higher.
181    ///
182    /// Capacity is rounded to the next power of two. The minimum allocated capacity is 2.
183    ///
184    /// # Example
185    /// ```
186    /// use fq::FastQueue;
187    /// struct Message {
188    ///     from: String,
189    ///     value: usize,
190    /// }
191    /// let (producer, consumer) = FastQueue::<Message>::new(2);
192    /// ```
193    #[allow(clippy::new_ret_no_self)]
194    pub fn new(capacity: usize) -> (Producer<T>, Consumer<T>) {
195        let capacity = capacity.next_power_of_two().max(2);
196        let mask = capacity - 1;
197
198        let layout =
199            Layout::from_size_align(capacity * size_of::<MaybeUninit<T>>(), CACHE_LINE_SIZE)
200                .expect("layout");
201        let buffer = unsafe { alloc(layout) as *mut MaybeUninit<T> };
202
203        if buffer.is_null() {
204            handle_alloc_error(layout);
205        }
206
207        let queue = Arc::new(FastQueue {
208            mask: CachePadded(mask),
209            capacity: CachePadded(capacity),
210            buffer: CachePadded(buffer),
211            head: CachePadded(AtomicUsize::new(0)),
212            tail: CachePadded(AtomicUsize::new(0)),
213            _pd: PhantomData,
214        });
215
216        let producer = Producer {
217            queue: CachePadded(Arc::clone(&queue)),
218            head: CachePadded(UnsafeCell::new(0)),
219            cached_tail: CachePadded(UnsafeCell::new(0)),
220            _pd: PhantomData,
221        };
222
223        let consumer = Consumer {
224            queue: CachePadded(queue),
225            tail: CachePadded(UnsafeCell::new(0)),
226            cached_head: CachePadded(UnsafeCell::new(0)),
227            _pd: PhantomData,
228        };
229
230        (producer, consumer)
231    }
232}
233
impl<T> Drop for FastQueue<T> {
    /// Drops all elements still in the queue, then frees the buffer.
    fn drop(&mut self) {
        // `&mut self` gives exclusive access here, so Relaxed loads suffice.
        let head = self.head.0.load(Ordering::Relaxed);
        let mut tail = self.tail.0.load(Ordering::Relaxed);

        // Drop every element that was pushed but never popped: the slots in
        // [tail, head), walked with wrapping counters.
        while tail != head {
            unsafe {
                // SAFETY: every index in [tail, head) refers to an initialized
                // slot, and masking keeps the pointer inside the buffer.
                let index = tail & self.mask.0;
                let slot = self.buffer.0.add(index);
                ptr::drop_in_place((*slot).as_mut_ptr());
            }
            tail = tail.wrapping_add(1);
        }

        unsafe {
            // SAFETY: this layout matches the one the buffer was allocated
            // with in `FastQueue::new` (same size and alignment), and the
            // pointer came from that allocation.
            let layout = Layout::from_size_align(
                self.capacity.0 * size_of::<MaybeUninit<T>>(),
                CACHE_LINE_SIZE,
            )
            .expect("layout");
            dealloc(self.buffer.0 as *mut u8, layout);
        }
    }
}

/// A producer for the `FastQueue`. This is used to send elements to the queue.
pub struct Producer<T> {
    /// Shared queue state, kept alive jointly with the `Consumer`.
    queue: CachePadded<Arc<FastQueue<T>>>,
    /// Producer-local copy of the head index; read/written without atomics.
    head: CachePadded<UnsafeCell<usize>>,
    /// Last observed consumer tail; refreshed only when the queue looks full,
    /// avoiding an atomic load on the push fast path.
    cached_tail: CachePadded<UnsafeCell<usize>>,
    _pd: PhantomData<T>,
}

// SAFETY: the `Producer` is the unique writer handle, so it may move to
// another thread whenever `T: Send`. It is deliberately not `Sync` (the
// `UnsafeCell` fields make it `!Sync` automatically).
unsafe impl<T: Send> Send for Producer<T> {}

/// A producer for the `FastQueue`. This is used to send elements to the queue.
impl<T> Producer<T> {
    /// Pushes a value into the queue. Returns `Ok(())` on success or `Err(T)` if the queue is full.
    ///
    /// # Example
    /// ```
    /// use fq::FastQueue;
    /// let (mut producer, mut consumer) = FastQueue::new(2);
    /// producer.push(42).unwrap();
    /// assert_eq!(consumer.pop(), Some(42));
    /// ```
    #[inline(always)]
    pub fn push(&mut self, value: T) -> Result<(), T> {
        // SAFETY: the producer-local cells are only ever accessed from this
        // single producer handle, which we hold by `&mut self`.
        let head = unsafe { *self.head.0.get() };
        let next_head = head.wrapping_add(1);

        // Hint the CPU that the next slot will be written soon.
        self.prefetch_write(next_head);

        // SAFETY: single-owner access, as for `head` above.
        let cached_tail = unsafe { *self.cached_tail.0.get() };

        // Fast path: judge fullness against the cached tail so the common
        // case needs no atomic load (and no cross-core cache traffic).
        if next_head.wrapping_sub(cached_tail) > self.queue.0.capacity.0 {
            // Reload actual tail (slow path)
            let tail = self.queue.0.tail.0.load(Ordering::Acquire);

            if tail != cached_tail {
                // Update cached tail
                unsafe {
                    *self.cached_tail.0.get() = tail;
                }
            }

            // Check again with fresh tail
            if next_head.wrapping_sub(tail) > self.queue.0.capacity.0 {
                return Err(value);
            }
        }

        // SAFETY: `index` is masked into bounds, and the fullness check above
        // guarantees the consumer has finished with this slot, so the write
        // cannot clobber a live element.
        unsafe {
            let index = head & self.queue.0.mask.0;
            let slot = self.queue.0.buffer.0.add(index);
            (*slot).as_mut_ptr().write(value);
            *self.head.0.get() = next_head;
        }

        // Release store publishes the slot write to the consumer's Acquire load.
        self.queue.0.head.0.store(next_head, Ordering::Release);

        Ok(())
    }

    /// Returns the current number of elements in the queue (may be stale)
    ///
    /// The consumer runs concurrently, so the value may already be out of
    /// date by the time this function returns.
    /// # Example
    /// ```
    /// use fq::FastQueue;
    /// let (mut producer, mut consumer) = FastQueue::new(2);
    /// assert_eq!(consumer.len(), 0);
    /// producer.push(42).unwrap();
    /// assert_eq!(consumer.len(), 1);
    /// ```
    #[inline(always)]
    pub fn len(&self) -> usize {
        // SAFETY: producer-local cell, single-owner access.
        let head = unsafe { *self.head.0.get() };
        let tail = self.queue.0.tail.0.load(Ordering::Relaxed);
        head.wrapping_sub(tail)
    }

    /// Checks if the queue is empty (may be stale). This function will return `true` if the queue is empty, and `false` otherwise.
    ///
    /// The consumer runs concurrently, so the answer may already be stale
    /// by the time this function returns.
    /// # Example
    /// ```
    /// use fq::FastQueue;
    /// let (mut producer, mut consumer) = FastQueue::new(2);
    /// assert!(consumer.is_empty());
    /// producer.push(42).unwrap();
    /// assert!(!consumer.is_empty());
    /// ```
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Checks if the queue is full (may be stale). This function will return `true` if the queue is full, and `false` otherwise.
    ///
    /// # Example
    /// ```
    /// use fq::FastQueue;
    /// let (mut producer, mut consumer) = FastQueue::<usize>::new(2);
    /// producer.push(42).unwrap(); // ⚠️ Prefer handling the error over using unwrap()
    /// assert_eq!(producer.is_full(), false);
    /// producer.push(43).unwrap();
    /// assert_eq!(producer.is_full(), true);
    /// ```
    #[inline(always)]
    pub fn is_full(&self) -> bool {
        self.len() >= self.queue.0.capacity.0
    }

    /// Best-effort prefetch of the slot that will be written next.
    /// Compiles to nothing on targets without a supported prefetch intrinsic.
    #[inline(always)]
    fn prefetch_write(&self, index: usize) {
        let slot_index = index & self.queue.0.mask.0;
        // SAFETY: `slot_index` is masked by `mask`, so the pointer stays
        // within the allocation. It is only used as a prefetch hint and is
        // never dereferenced here.
        let _slot = unsafe { self.queue.0.buffer.0.add(slot_index) };

        // NOTE(review): when both `sse` and `prfchw` are enabled on x86_64,
        // two prefetches are emitted for the same address (T0 then ET0); the
        // first looks redundant — confirm this is intended.
        #[cfg(all(target_arch = "x86_64", target_feature = "sse"))]
        unsafe {
            core::arch::x86_64::_mm_prefetch(_slot as *const i8, core::arch::x86_64::_MM_HINT_T0);
        }

        #[cfg(all(target_arch = "x86_64", target_feature = "prfchw"))]
        unsafe {
            core::arch::x86_64::_mm_prefetch(_slot as *const i8, core::arch::x86_64::_MM_HINT_ET0);
        }

        #[cfg(target_arch = "x86")]
        unsafe {
            core::arch::x86::_mm_prefetch(_slot as *const i8, core::arch::x86::_MM_HINT_ET0);
        }

        #[cfg(all(feature = "unstable", nightly, target_arch = "aarch64"))]
        unsafe {
            core::arch::aarch64::_prefetch::<
                { core::arch::aarch64::_PREFETCH_WRITE },
                { core::arch::aarch64::_PREFETCH_LOCALITY0 },
            >(_slot as *const i8);
        }
    }
}

/// A consumer for the `FastQueue`. This is used to receive items from the queue.
pub struct Consumer<T> {
    /// Shared queue state, kept alive jointly with the `Producer`.
    queue: CachePadded<Arc<FastQueue<T>>>,
    /// Consumer-local copy of the tail index; read/written without atomics.
    tail: CachePadded<UnsafeCell<usize>>,
    /// Last observed producer head; refreshed only when the queue looks empty,
    /// avoiding an atomic load on the pop fast path.
    cached_head: CachePadded<UnsafeCell<usize>>,
    _pd: PhantomData<T>,
}

// SAFETY: the `Consumer` is the unique reader handle, so it may move to
// another thread whenever `T: Send`. It is deliberately not `Sync` (the
// `UnsafeCell` fields make it `!Sync` automatically).
unsafe impl<T: Send> Send for Consumer<T> {}

/// A consumer for the `FastQueue`. This is used to receive items from the queue.
impl<T> Consumer<T> {
    /// Pops a value from the queue. Returns `Some(T)` on success or `None` if the queue is empty.
    ///
    /// # Example
    /// ```
    /// use fq::FastQueue;
    /// let (mut producer, mut consumer) = FastQueue::new(2);
    /// producer.push(42).unwrap();
    /// assert_eq!(consumer.pop(), Some(42));
    /// ```
    #[inline(always)]
    pub fn pop(&mut self) -> Option<T> {
        // SAFETY: the consumer-local cells are only ever accessed from this
        // single consumer handle, which we hold by `&mut self`.
        let tail = unsafe { *self.tail.0.get() };

        // Hint the CPU that the slot after this one will be read soon.
        self.prefetch_read(tail.wrapping_add(1));

        // Check cached head first (fast path)
        let cached_head = unsafe { *self.cached_head.0.get() };

        if tail == cached_head {
            // Reload actual head (slow path)
            let head = self.queue.0.head.0.load(Ordering::Acquire);

            if head != cached_head {
                // Update cached head
                unsafe {
                    *self.cached_head.0.get() = head;
                }
            }

            // Check if still empty
            if tail == head {
                return None;
            }
        }

        // SAFETY: the emptiness check proved an element exists at `tail`, and
        // the Acquire load pairs with the producer's Release store, so the
        // slot is initialized. `read` moves the value out; the slot is only
        // marked reusable when the shared tail advances below.
        let value = unsafe {
            let index = tail & self.queue.0.mask.0;
            let slot = self.queue.0.buffer.0.add(index);
            (*slot).as_ptr().read()
        };

        let next_tail = tail.wrapping_add(1);
        unsafe { *self.tail.0.get() = next_tail };
        // Release store tells the producer this slot may now be overwritten.
        self.queue.0.tail.0.store(next_tail, Ordering::Release);

        Some(value)
    }

    /// Peeks at the front element without removing it.
    ///
    /// # Example
    /// ```
    /// use fq::FastQueue;
    /// let (mut producer, mut consumer) = FastQueue::new(2);
    /// producer.push(42).unwrap();
    /// assert_eq!(consumer.peek(), Some(&42));
    /// ```
    #[inline(always)]
    pub fn peek(&self) -> Option<&T> {
        // SAFETY: consumer-local cell, single-owner access.
        let tail = unsafe { *self.tail.0.get() };
        let head = self.queue.0.head.0.load(Ordering::Acquire);

        if tail == head {
            return None;
        }

        // SAFETY: as in `pop`, the slot at `tail` is initialized. The
        // returned reference borrows `self`, and the producer cannot reuse
        // this slot until the tail advances — which requires `&mut self`
        // via `pop`, so the borrow checker protects the reference.
        unsafe {
            let index = tail & self.queue.0.mask.0;
            let slot = self.queue.0.buffer.0.add(index);
            Some(&*(*slot).as_ptr())
        }
    }

    /// Returns the current size of the queue (may be stale).
    ///
    /// The producer runs concurrently, so the value may already be out of
    /// date by the time this function returns.
    /// # Example
    /// ```
    /// use fq::FastQueue;
    /// let (mut producer, mut consumer) = FastQueue::new(2);
    /// assert_eq!(consumer.len(), 0);
    /// producer.push(42).unwrap();
    /// assert_eq!(consumer.len(), 1);
    /// ```
    #[inline(always)]
    pub fn len(&self) -> usize {
        let head = self.queue.0.head.0.load(Ordering::Relaxed);
        // SAFETY: consumer-local cell, single-owner access.
        let tail = unsafe { *self.tail.0.get() };
        head.wrapping_sub(tail)
    }

    /// Checks if the queue is empty (may be stale). Returns `true` if the queue is empty, and `false` otherwise.
    ///
    /// The producer runs concurrently, so the answer may already be stale
    /// by the time this function returns.
    /// # Example
    /// ```
    /// use fq::FastQueue;
    /// let (mut producer, mut consumer) = FastQueue::new(2);
    /// assert_eq!(consumer.is_empty(), true);
    /// producer.push(42).unwrap();
    /// assert_eq!(consumer.is_empty(), false);
    /// ```
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Best-effort prefetch of the next slot to be read.
    /// Compiles to nothing on targets without a supported prefetch intrinsic.
    #[inline(always)]
    fn prefetch_read(&self, index: usize) {
        let slot_index = index & self.queue.0.mask.0;
        // SAFETY: `slot_index` is masked by `mask`, so the pointer stays
        // within the allocation. It is only used as a prefetch hint and is
        // never dereferenced here.
        let _slot = unsafe { self.queue.0.buffer.0.add(slot_index) };

        #[cfg(all(target_arch = "x86_64", target_feature = "sse"))]
        unsafe {
            core::arch::x86_64::_mm_prefetch(_slot as *const i8, core::arch::x86_64::_MM_HINT_T0);
        }

        #[cfg(target_arch = "x86")]
        unsafe {
            core::arch::x86::_mm_prefetch(_slot as *const i8, core::arch::x86::_MM_HINT_T0);
        }

        #[cfg(all(feature = "unstable", nightly, target_arch = "aarch64"))]
        unsafe {
            core::arch::aarch64::_prefetch::<
                { core::arch::aarch64::_PREFETCH_READ },
                { core::arch::aarch64::_PREFETCH_LOCALITY0 },
            >(_slot as *const i8);
        }
    }
}

543impl<T> Iterator for Consumer<T> {
544    type Item = T;
545
546    /// Pops the next value from the queue. This is equivalent to calling `pop()`.
547    ///
548    /// # Example
549    /// ```
550    /// use fq::FastQueue;
551    /// let (mut producer, mut consumer) = FastQueue::new(4);
552    /// producer.push(42).unwrap();
553    /// producer.push(42).unwrap();
554    /// producer.push(42).unwrap();
555    /// while let Some(value) = consumer.next() {
556    ///     assert_eq!(value, 42);
557    /// }
558    /// ```
559    #[inline(always)]
560    fn next(&mut self) -> Option<Self::Item> {
561        self.pop()
562    }
563
564    /// Provides a size hint (may be stale)
565    #[inline(always)]
566    fn size_hint(&self) -> (usize, Option<usize>) {
567        // (lower bound, upper bound)
568        (self.len(), None)
569    }
570}
571
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::thread;

    /// A single value goes in and comes back out; the queue is then empty.
    #[test]
    fn test_basic_push_pop() {
        let (mut tx, mut rx) = FastQueue::<usize>::new(2);

        assert!(tx.push(42).is_ok());
        assert_eq!(rx.pop(), Some(42));
        assert_eq!(rx.pop(), None);
    }

    /// The queue holds exactly its capacity; freeing a slot admits one more.
    #[test]
    fn test_capacity() {
        let (mut tx, mut rx) = FastQueue::<usize>::new(4);

        for v in 1..=4 {
            assert!(tx.push(v).is_ok());
        }
        assert!(tx.push(5).is_err()); // Full

        assert_eq!(rx.pop(), Some(1));
        assert!(tx.push(5).is_ok()); // Space available now

        for v in 2..=5 {
            assert_eq!(rx.pop(), Some(v));
        }
    }

    /// Values sent from one thread arrive in order on another.
    #[test]
    fn test_concurrent() {
        const COUNT: usize = 1_000_000;
        let (mut tx, mut rx) = FastQueue::<usize>::new(1024);

        let finished = Arc::new(AtomicBool::new(false));
        let finished_flag = Arc::clone(&finished);

        // Producer thread: spin until each push succeeds.
        let sender = thread::spawn(move || {
            for i in 0..COUNT {
                while tx.push(i).is_err() {
                    std::hint::spin_loop();
                }
            }
            finished_flag.store(true, Ordering::Release);
        });

        // Consumer thread: values must arrive strictly in order.
        let receiver = thread::spawn(move || {
            let mut received = 0;
            while received < COUNT {
                match rx.pop() {
                    Some(v) => {
                        assert_eq!(v, received);
                        received += 1;
                    }
                    None if finished.load(Ordering::Acquire) && rx.is_empty() => break,
                    None => std::hint::spin_loop(),
                }
            }
            assert_eq!(received, COUNT);
        });

        sender.join().unwrap();
        receiver.join().unwrap();
    }

    /// FIFO order is preserved when the indices wrap past the buffer end.
    #[test]
    fn test_wraparound() {
        let (mut tx, mut rx) = FastQueue::<usize>::new(4);

        // Fill the queue completely.
        for i in 0..4 {
            assert!(tx.push(i).is_ok());
        }

        // Drain half, then refill so the write index wraps around.
        for i in 0..2 {
            assert_eq!(rx.pop(), Some(i));
        }
        for i in 4..6 {
            assert!(tx.push(i).is_ok());
        }

        // Everything still comes out in FIFO order.
        for i in 2..6 {
            assert_eq!(rx.pop(), Some(i));
        }
        assert!(rx.pop().is_none());
    }
}