lock_free/
queue_faa.rs

//! FAA Queue: Fetch-And-Add based queue.
//! Simpler and often faster than CAS-based queues for bounded scenarios.
3
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::cell::UnsafeCell;
6use std::mem::MaybeUninit;
7use std::hint;
8
/// Assumed cache-line size in bytes; hot fields are spaced this far apart.
const CACHE_LINE: usize = 128;
/// Number of extra re-checks `try_enqueue` / `try_dequeue` make before giving up.
const PATIENCE: usize = 10;
/// Filler bytes that, placed after an `AtomicUsize`, complete one `CACHE_LINE`.
const COUNTER_PAD: usize = CACHE_LINE - std::mem::size_of::<AtomicUsize>();

/// One cell of the ring buffer.
///
/// `turn` encodes who may touch `data`. For a ticket `pos` that maps to this
/// slot (`pos & mask == index`):
/// * `turn == pos`      - the slot is free and the producer holding `pos` may write;
/// * `turn == pos + 1`  - the slot holds an item and the consumer holding `pos` may read.
///
/// After a read the consumer stores `pos + capacity`, which is the "free"
/// value for the next lap.
#[repr(align(128))]
struct Slot<T> {
    turn: AtomicUsize,
    data: UnsafeCell<MaybeUninit<T>>,
}

// SAFETY: `data` is only accessed by the single thread that currently owns the
// slot according to `turn`, and ownership is handed over with Release/Acquire
// on `turn`, so sharing `&Slot<T>` across threads is sound when `T: Send`.
unsafe impl<T: Send> Sync for Slot<T> {}

/// Bounded multi-producer / multi-consumer queue over a power-of-two ring of
/// slots, each guarded by its own `turn` counter.
///
/// `repr(C)` fixes the field order so the padding really keeps `head` and
/// `tail` on different cache lines; without it the compiler may reorder the
/// fields and place both counters next to each other.
#[repr(C, align(128))]
pub struct FAAQueue<T> {
    /// Ticket of the next item to dequeue.
    head: AtomicUsize,
    _pad1: [u8; COUNTER_PAD],
    /// Ticket of the next item to enqueue.
    tail: AtomicUsize,
    _pad2: [u8; COUNTER_PAD],
    buffer: Vec<Slot<T>>,
    /// `capacity - 1`; maps a ticket to a slot index.
    mask: usize,
}

impl<T> FAAQueue<T> {
    /// Creates an empty queue with room for `capacity` items.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is not a power of two or is smaller than 2. With a
    /// single slot the "holds an item" turn value (`pos + 1`) equals the
    /// "free for the next lap" value (`pos + capacity`), so a full slot could
    /// not be told apart from an empty one.
    pub fn new(capacity: usize) -> Self {
        assert!(
            capacity.is_power_of_two(),
            "capacity must be a power of two"
        );
        assert!(capacity >= 2, "capacity must be at least 2");

        // Slot `i` starts out free for ticket `i`.
        let buffer = (0..capacity)
            .map(|index| Slot {
                turn: AtomicUsize::new(index),
                data: UnsafeCell::new(MaybeUninit::uninit()),
            })
            .collect();

        FAAQueue {
            head: AtomicUsize::new(0),
            _pad1: [0; COUNTER_PAD],
            tail: AtomicUsize::new(0),
            _pad2: [0; COUNTER_PAD],
            buffer,
            mask: capacity - 1,
        }
    }

    /// Writes `item` into the slot for ticket `pos` and marks it readable.
    ///
    /// # Safety
    ///
    /// The caller must own ticket `pos` for writing: it observed
    /// `slot.turn == pos` and then advanced `tail` from `pos` itself.
    #[inline]
    unsafe fn publish(&self, pos: usize, item: T) {
        let slot = &self.buffer[pos & self.mask];
        // SAFETY: per the contract above no other thread touches `data` until
        // the Release store below hands the slot to a consumer.
        unsafe {
            (*slot.data.get()).write(item);
        }
        slot.turn.store(pos.wrapping_add(1), Ordering::Release);
    }

    /// Moves the item out of the slot for ticket `pos` and frees the slot for
    /// the producer of the next lap.
    ///
    /// # Safety
    ///
    /// The caller must own ticket `pos` for reading: it observed
    /// `slot.turn == pos + 1` and then advanced `head` from `pos` itself.
    #[inline]
    unsafe fn take(&self, pos: usize) -> T {
        let slot = &self.buffer[pos & self.mask];
        // SAFETY: `turn == pos + 1` was published by the producer after it
        // initialised `data`, and the caller is the only consumer of `pos`.
        let item = unsafe { (*slot.data.get()).assume_init_read() };
        slot.turn
            .store(pos.wrapping_add(self.buffer.len()), Ordering::Release);
        item
    }

    /// Appends `item` to the queue.
    ///
    /// Returns `true` on success. Returns `false` when the queue is full; in
    /// that case `item` is dropped. Retries without bound while other threads
    /// are racing for the same ticket.
    #[inline]
    pub fn enqueue(&self, item: T) -> bool {
        let capacity = self.buffer.len();
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            let turn = self.buffer[tail & self.mask].turn.load(Ordering::Acquire);

            if turn == tail {
                // Slot is free for this ticket; try to claim the ticket.
                match self.tail.compare_exchange_weak(
                    tail,
                    tail.wrapping_add(1),
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // SAFETY: the slot was ready for `tail` and we won the CAS.
                        unsafe { self.publish(tail, item) };
                        return true;
                    }
                    Err(actual) => tail = actual,
                }
            } else if turn.wrapping_add(capacity) == tail.wrapping_add(1) {
                // The slot still holds the unread item from one lap ago.
                return false;
            } else {
                // Our view of `tail` is stale (or a neighbour is mid-operation).
                tail = self.tail.load(Ordering::Relaxed);
            }

            hint::spin_loop();
        }
    }

    /// Removes and returns the oldest item, or `None` when the queue is empty.
    ///
    /// Retries without bound while other threads are racing for the same ticket.
    #[inline]
    pub fn dequeue(&self) -> Option<T> {
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            let turn = self.buffer[head & self.mask].turn.load(Ordering::Acquire);

            if turn == head.wrapping_add(1) {
                // Slot holds the item for this ticket; try to claim the ticket.
                match self.head.compare_exchange_weak(
                    head,
                    head.wrapping_add(1),
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // SAFETY: the slot was readable for `head` and we won the CAS.
                        return Some(unsafe { self.take(head) });
                    }
                    Err(actual) => head = actual,
                }
            } else if turn == head {
                // Nothing has been published for this ticket yet.
                return None;
            } else {
                head = self.head.load(Ordering::Relaxed);
            }

            hint::spin_loop();
        }
    }

    /// Tries to append `item`, giving up after a bounded number of attempts.
    ///
    /// The slot is re-checked up to `PATIENCE` extra times (so a briefly full
    /// queue or a lost race can resolve). On failure the item is handed back
    /// in `Err` and the queue is left untouched.
    ///
    /// A ticket is only claimed (via compare-and-swap on `tail`) once its slot
    /// is known to be free. Taking the ticket first and abandoning it on
    /// timeout would leave a hole that no producer ever fills.
    #[inline]
    pub fn try_enqueue(&self, item: T) -> Result<(), T> {
        let mut tail = self.tail.load(Ordering::Relaxed);

        for _ in 0..=PATIENCE {
            let turn = self.buffer[tail & self.mask].turn.load(Ordering::Acquire);

            if turn == tail {
                // Strong CAS: a spurious failure must not eat into the patience budget.
                match self.tail.compare_exchange(
                    tail,
                    tail.wrapping_add(1),
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // SAFETY: the slot was ready for `tail` and we won the CAS.
                        unsafe { self.publish(tail, item) };
                        return Ok(());
                    }
                    Err(actual) => tail = actual,
                }
            } else {
                tail = self.tail.load(Ordering::Relaxed);
            }

            hint::spin_loop();
        }

        Err(item)
    }

    /// Tries to remove the oldest item, giving up after a bounded number of
    /// attempts.
    ///
    /// Returns `None` if no item could be claimed within `PATIENCE` extra
    /// re-checks. A failed attempt leaves the queue untouched: the ticket is
    /// only claimed (via compare-and-swap on `head`) once its slot is known to
    /// hold an item.
    #[inline]
    pub fn try_dequeue(&self) -> Option<T> {
        let mut head = self.head.load(Ordering::Relaxed);

        for _ in 0..=PATIENCE {
            let turn = self.buffer[head & self.mask].turn.load(Ordering::Acquire);

            if turn == head.wrapping_add(1) {
                match self.head.compare_exchange(
                    head,
                    head.wrapping_add(1),
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // SAFETY: the slot was readable for `head` and we won the CAS.
                        return Some(unsafe { self.take(head) });
                    }
                    Err(actual) => head = actual,
                }
            } else {
                head = self.head.load(Ordering::Relaxed);
            }

            hint::spin_loop();
        }

        None
    }
}

impl<T> Drop for FAAQueue<T> {
    /// Drops every item that is still queued; `MaybeUninit` would otherwise
    /// leak them.
    fn drop(&mut self) {
        if !std::mem::needs_drop::<T>() {
            return;
        }

        let mask = self.mask;
        for (index, slot) in self.buffer.iter_mut().enumerate() {
            // A slot holds an item exactly when `turn == pos + 1` for some
            // ticket `pos` with `pos & mask == index`. Free slots have
            // `turn & mask == index`, which differs because capacity >= 2.
            let occupied = *slot.turn.get_mut() & mask == (index + 1) & mask;
            if occupied {
                // SAFETY: `&mut self` rules out concurrent access, and the
                // turn value shows the slot was initialised and not yet read.
                unsafe {
                    slot.data.get_mut().assume_init_drop();
                }
            }
        }
    }
}

// SAFETY: the queue owns its items and only hands them to another thread by
// value, with slot hand-over synchronised through `Slot::turn`, so `T: Send`
// is sufficient for both moving and sharing the queue.
unsafe impl<T: Send> Send for FAAQueue<T> {}
unsafe impl<T: Send> Sync for FAAQueue<T> {}