gil/spsc/
mod.rs

1//! Single-producer single-consumer (SPSC) queue.
2//!
3//! This is the fastest queue variant, as it requires no atomic synchronization for the data buffer itself,
4//! only for the head and tail indices. It is inspired by the `ProducerConsumerQueue` in Facebook's Folly library.
5//!
6//! # Performance
7//!
8//! **Improvements over original inspiration:**
9//! - **Single Allocation:** The queue metadata (head/tail indices) and the data buffer are allocated
10//!   in a single contiguous memory block. This reduces cache misses by keeping related data close in memory.
11//! - **False Sharing Prevention:** The head and tail indices are explicitly padded to separate cache lines
12//!   to prevent false sharing between the producer and consumer cores.
13//!
14//! # When to use
15//!
16//! Use this queue for 1-to-1 thread communication. It offers the best possible throughput and latency.
17//!
18//! # Reference
19//!
20//! * [Facebook Folly ProducerConsumerQueue](https://github.com/facebook/folly/blob/main/folly/ProducerConsumerQueue.h)
21
22use core::num::NonZeroUsize;
23
24pub(crate) use self::queue::QueuePtr;
25pub(crate) mod shards;
26pub use self::{receiver::Receiver, sender::Sender};
27
28mod queue;
29mod receiver;
30mod sender;
31
32/// Creates a new single-producer single-consumer (SPSC) queue.
33///
34/// See the [module-level documentation](self) for more details on performance and usage.
35///
36/// # Arguments
37///
38/// * `capacity` - The capacity of the queue.
39///
40/// # Returns
41///
42/// A tuple containing the [`Sender`] and [`Receiver`] handles.
43///
44/// # Examples
45///
46/// ```
47/// use core::num::NonZeroUsize;
48/// use gil::spsc::channel;
49///
50/// let (tx, rx) = channel::<usize>(NonZeroUsize::new(1024).unwrap());
51/// ```
52pub fn channel<T>(capacity: NonZeroUsize) -> (Sender<T>, Receiver<T>) {
53    let queue = queue::QueuePtr::with_size(capacity);
54    (Sender::new(queue.clone()), Receiver::new(queue))
55}
56
#[cfg(all(test, not(feature = "loom")))]
mod test {
    use std::num::NonZeroUsize;

    use super::*;
    use crate::thread;

    /// Streams an increasing sequence through the blocking `send`/`recv` API from a producer
    /// thread and checks that the consumer observes the values in the order they were sent.
    #[test]
    fn test_valid_sends() {
        const COUNTS: NonZeroUsize = NonZeroUsize::new(4096).unwrap();
        // Eight times the capacity passed to `channel`.
        const TOTAL: usize = COUNTS.get() << 3;

        let (mut tx, mut rx) = channel::<usize>(COUNTS);

        thread::spawn(move || {
            for i in 0..TOTAL {
                tx.send(i);
            }
        });

        for i in 0..TOTAL {
            assert_eq!(rx.recv(), i);
        }
    }

    /// Exercises the non-blocking `try_send`/`try_recv` API on a queue created with capacity 4:
    /// an empty queue yields `None`, four sends succeed, a fifth is rejected, the four values
    /// come back in FIFO order, and the drained queue accepts four sends again.
    #[test]
    fn test_valid_try_sends() {
        let (mut tx, mut rx) = channel::<usize>(NonZeroUsize::new(4).unwrap());

        // Nothing has been sent yet, so every receive attempt must come back empty.
        for _ in 0..4 {
            assert!(rx.try_recv().is_none());
        }

        for i in 0..4 {
            tx.try_send(i).unwrap();
        }
        // The queue already holds four items; one more must be refused.
        assert!(tx.try_send(5).is_err());

        for i in 0..4 {
            assert_eq!(rx.try_recv(), Some(i));
        }
        assert!(rx.try_recv().is_none());

        // After draining, the queue must accept a full batch again.
        for i in 0..4 {
            tx.try_send(i).unwrap();
        }
    }

    /// Sends values with `send_async` from a helper thread (each send driven by its own
    /// `block_on`) and awaits them in order with `recv_async` on the test thread.
    #[cfg(feature = "async")]
    #[test]
    fn test_async_send() {
        futures::executor::block_on(async {
            const COUNTS: NonZeroUsize = NonZeroUsize::new(4096).unwrap();
            // Twice the capacity passed to `channel`.
            const TOTAL: usize = COUNTS.get() << 1;

            let (mut tx, mut rx) = channel::<usize>(COUNTS);

            thread::spawn(move || {
                for i in 0..TOTAL {
                    futures::executor::block_on(tx.send_async(i));
                }
                drop(tx);
            });

            for i in 0..TOTAL {
                assert_eq!(rx.recv_async().await, i);
            }
        });
    }

    /// Moves `TOTAL_ITEMS` values through the batch API: the producer fills `write_buffer` slots
    /// and publishes them with `commit`; the consumer checks `read_buffer` contents and releases
    /// them with `advance`.
    #[test]
    fn test_batched_send_recv() {
        const CAPACITY: NonZeroUsize = NonZeroUsize::new(1024).unwrap();
        const TOTAL_ITEMS: usize = 1024 << 4;
        let (mut tx, mut rx) = channel::<usize>(CAPACITY);

        thread::spawn(move || {
            let mut sent = 0;
            while sent < TOTAL_ITEMS {
                let buffer = tx.write_buffer();
                // Never write past the end of the test sequence.
                let batch_size = buffer.len().min(TOTAL_ITEMS - sent);
                for (offset, slot) in buffer.iter_mut().take(batch_size).enumerate() {
                    slot.write(sent + offset);
                }
                // SAFETY: exactly the first `batch_size` slots of the write buffer were
                // initialized by the loop above, and `batch_size <= buffer.len()`.
                unsafe { tx.commit(batch_size) };
                sent += batch_size;
            }
        });

        let mut received = 0;
        let mut expected = 0;

        while received < TOTAL_ITEMS {
            let buffer = rx.read_buffer();
            if buffer.is_empty() {
                // Nothing readable yet; poll again.
                continue;
            }
            for &value in buffer.iter() {
                assert_eq!(value, expected);
                expected += 1;
            }
            let count = buffer.len();
            // SAFETY: `count` is the length of the slice `read_buffer` just returned, and every
            // element of it has been read above.
            unsafe { rx.advance(count) };
            received += count;
        }

        assert_eq!(received, TOTAL_ITEMS);
    }

    /// Items that were sent but never received must be dropped once both handles are gone:
    /// the drop counter has to reach exactly the number of items sent.
    #[test]
    fn test_drop_remaining_elements() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);

        /// Zero-sized marker that bumps `DROP_COUNT` each time one is dropped.
        struct DropCounter;

        impl Drop for DropCounter {
            fn drop(&mut self) {
                DROP_COUNT.fetch_add(1, Ordering::SeqCst);
            }
        }

        DROP_COUNT.store(0, Ordering::SeqCst);

        {
            let (mut tx, rx) = channel::<DropCounter>(NonZeroUsize::new(16).unwrap());

            // Send 5 items but don't receive them
            for _ in 0..5 {
                tx.send(DropCounter);
            }

            // Drop both ends - remaining items should be dropped
            drop(tx);
            drop(rx);
        }

        // All 5 items should have been dropped
        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 5);
    }
}
194
#[cfg(all(test, feature = "loom"))]
mod loom_test {
    use core::num::NonZeroUsize;

    use super::*;
    use crate::thread;

    /// Model-checks the blocking API: three values pushed with `send` through a queue created
    /// with capacity 2 must be returned by `recv` in the same order.
    #[test]
    fn basic_loom() {
        const ITEMS: usize = 3;

        loom::model(|| {
            let capacity = NonZeroUsize::new(2).unwrap();
            let (mut tx, mut rx) = channel::<usize>(capacity);

            thread::spawn(move || {
                for value in 0..ITEMS {
                    tx.send(value);
                }
            });

            for expected in 0..ITEMS {
                let got = rx.recv();
                assert_eq!(got, expected);
            }
        })
    }

    /// Model-checks the non-blocking API: both sides retry `try_send` / `try_recv`, yielding to
    /// the loom scheduler after every attempt, until three values have been transferred in order.
    #[test]
    fn try_ops_loom() {
        const ITEMS: usize = 3;

        loom::model(|| {
            let capacity = NonZeroUsize::new(2).unwrap();
            let (mut tx, mut rx) = channel::<usize>(capacity);

            thread::spawn(move || {
                let mut next = 0;
                while next < ITEMS {
                    // Only move on to the following value once this one was accepted.
                    if tx.try_send(next).is_ok() {
                        next += 1;
                    }
                    loom::thread::yield_now();
                }
            });

            let mut expected = 0;
            while expected < ITEMS {
                if let Some(got) = rx.try_recv() {
                    assert_eq!(got, expected);
                    expected += 1;
                }
                loom::thread::yield_now();
            }
        })
    }

    /// Model-checks the batch API: the producer fills `write_buffer` slots and publishes them with
    /// `commit`, while the consumer validates `read_buffer` contents and releases them with
    /// `advance`. Each side yields to the loom scheduler after every pass.
    #[test]
    fn batched_ops_loom() {
        loom::model(|| {
            let capacity = NonZeroUsize::new(2).unwrap();
            let (mut tx, mut rx) = channel::<usize>(capacity);
            let total = 3;

            thread::spawn(move || {
                let mut sent = 0;
                while sent < total {
                    let slots = tx.write_buffer();
                    if !slots.is_empty() {
                        // Cap the batch so the producer never writes more than `total` values.
                        let batch = slots.len().min(total - sent);
                        for (slot, value) in slots.iter_mut().take(batch).zip(sent..) {
                            slot.write(value);
                        }
                        // SAFETY: exactly the first `batch` slots were initialized just above,
                        // and `batch <= slots.len()`.
                        unsafe { tx.commit(batch) };
                        sent += batch;
                    }
                    loom::thread::yield_now();
                }
            });

            let mut received = 0;
            while received < total {
                let ready = rx.read_buffer();
                let available = ready.len();
                if available > 0 {
                    for (value, expected) in ready.iter().zip(received..) {
                        assert_eq!(*value, expected);
                    }
                    // SAFETY: `available` is the length of the slice `read_buffer` just
                    // returned, and every element of it has been read above.
                    unsafe { rx.advance(available) };
                    received += available;
                }
                loom::thread::yield_now();
            }
        })
    }
}