// futex_queue/lib.rs

1use std::{
2    cmp,
3    sync::{atomic, Arc, Mutex},
4    time::Instant,
5};
6
7use heapless::{binary_heap::Min, BinaryHeap};
8use linux_futex::{Futex, Private};
9
// States stored in `FutexQueue::reader_state`, the futex word shared by senders and the receiver.
// The values are consecutive so that the receiver's `fetch_sub(1)` steps NOTIFIED => EMPTY => PARKED.

/// The receiver found nothing to return and is blocked (or about to block) on the futex.
const FUTEX_PARKED: i32 = -1;
/// Initial and reset state: no pending notification and the receiver is not parked.
const FUTEX_EMPTY: i32 = 0;
/// Written by a sender after a push so the receiver re-checks the queue instead of blocking.
const FUTEX_NOTIFIED: i32 = 1;
13
/// A fixed size MPSC queue with timer capability based on Linux futex. Suitable for real-time applications.
/// Size N must be a power of 2.
///
/// The state is shared behind an `Arc` by the [`Sender`] and [`Receiver`] halves
/// returned from [`FutexQueue::new`].
pub struct FutexQueue<T, const N: usize> {
    // Ideally this would be a lock-free priority queue, but it's a complicated beast.
    // All critical sections are as short as possible, so hopefully this mutex spins for a few cycles without a syscall.
    // Min-heap of capacity N ordered by `Item`'s `Ord`: immediate items first, then scheduled items by instant.
    queue: Mutex<BinaryHeap<Item<T>, Min, N>>,
    // Futex word used to notify the receiver; holds FUTEX_PARKED, FUTEX_EMPTY or FUTEX_NOTIFIED.
    reader_state: Futex<Private>,
}
23
24impl<T, const N: usize> FutexQueue<T, N> {
25    pub fn new() -> (Sender<T, N>, Receiver<T, N>) {
26        let inner = Arc::new(FutexQueue {
27            queue: Mutex::new(BinaryHeap::default()),
28            reader_state: Futex::new(FUTEX_EMPTY),
29        });
30
31        (
32            Sender {
33                inner: inner.clone(),
34            },
35            Receiver { inner },
36        )
37    }
38}
39
/// Sender half of the queue. Safe to share between threads.
///
/// Created by [`FutexQueue::new`]. Both `send` and `send_scheduled` take `&self`.
pub struct Sender<T, const N: usize> {
    // Queue state shared with the `Receiver`.
    inner: Arc<FutexQueue<T, N>>,
}
44
/// Receiver half of the queue.
///
/// Created by [`FutexQueue::new`]. `try_recv` and `recv` take `&mut self`, so a given
/// receiver can only be driven by one caller at a time.
pub struct Receiver<T, const N: usize> {
    // Queue state shared with the `Sender`.
    inner: Arc<FutexQueue<T, N>>,
}
49
50impl<T, const N: usize> Sender<T, N> {
51    /// Sends an item into the queue.
52    /// The receive order of sent items is not guaranteed.
53    pub fn send(&self, item: T) -> Result<(), T> {
54        let res = self.inner.queue.lock().unwrap().push(Item::Immediate(item));
55
56        match res {
57            Ok(()) => {
58                if self
59                    .inner
60                    .reader_state
61                    .value
62                    .swap(FUTEX_NOTIFIED, atomic::Ordering::Release)
63                    == FUTEX_PARKED
64                {
65                    // Wake up receiver thread because it was parked
66                    self.inner.reader_state.wake(1);
67                }
68
69                Ok(())
70            }
71            Err(item) => Err(item.into_value()),
72        }
73    }
74
75    /// Puts item into a queue to be received at a specified instant.
76    /// Receive order is earliest deadline first (after all immediate items).
77    pub fn send_scheduled(&self, item: T, instant: Instant) -> Result<(), T> {
78        // Keep critical section small
79        let (res, reload_timer) = {
80            let mut queue = self.inner.queue.lock().unwrap();
81            // Reload timer if new instant is the earliest in the queue or if there are no scheduled items.
82            // Note that this also evaluates to true if there is an immediate item at the front of the queue,
83            // but this edge case is rare and should not cause major performance issues.
84            let reload_timer = queue
85                .peek()
86                .map(|i| i.instant())
87                .flatten()
88                .map(|i| instant < i)
89                .unwrap_or(true);
90            let res = queue.push(Item::Scheduled(item, instant));
91            (res, reload_timer)
92        };
93
94        match res {
95            Ok(()) => {
96                if reload_timer
97                    && self
98                        .inner
99                        .reader_state
100                        .value
101                        .swap(FUTEX_NOTIFIED, atomic::Ordering::Release)
102                        == FUTEX_PARKED
103                {
104                    self.inner.reader_state.wake(1);
105                }
106
107                Ok(())
108            }
109            Err(item) => Err(item.into_value()),
110        }
111    }
112}
113
114impl<T, const N: usize> Receiver<T, N> {
115    /// Tries to receive from the queue without blocking.
116    /// Immediate items are returned first, then scheduled items in the order of earliest deadline first.
117    /// Error contains an optional Instant of the earliest (not ready) deadline in the queue.
118    pub fn try_recv(&mut self) -> Result<Item<T>, Option<Instant>> {
119        let mut queue = self.inner.queue.lock().unwrap();
120
121        match queue.peek() {
122            Some(item) => {
123                match item {
124                    // Immediate items are sorted at the beginning of the queue
125                    Item::Immediate(_) => Ok(queue.pop().unwrap()),
126                    Item::Scheduled(_, instant) => {
127                        if *instant <= Instant::now() {
128                            // Scheduled item is ready
129                            Ok(queue.pop().unwrap())
130                        } else {
131                            // Queue is not empty, but none of the scheduled items are ready
132                            Err(Some(*instant))
133                        }
134                    }
135                }
136            }
137            // Queue is empty and there are no scheduled items
138            None => Err(None),
139        }
140    }
141
142    /// Tries to receive from the queue and blocks the current thread if queue is empty.
143    /// Immediate items are returned first, then scheduled items in the order of earliest deadline first.
144    pub fn recv(&mut self) -> Item<T> {
145        loop {
146            let next_instant = match self.try_recv() {
147                Ok(item) => return item,
148                Err(next_instant) => next_instant,
149            };
150
151            // Check if anything new was queued while running to prevent expensive futex syscall
152            // Change NOTIFIED=>EMPTY or EMPTY=>PARKED, and continue in the first case
153            if self
154                .inner
155                .reader_state
156                .value
157                .fetch_sub(1, atomic::Ordering::Acquire)
158                == FUTEX_NOTIFIED
159            {
160                continue;
161            }
162
163            if let Some(instant) = next_instant {
164                self.inner
165                    .reader_state
166                    .wait_bitset_until(FUTEX_PARKED, u32::MAX, instant)
167                    .ok();
168            } else {
169                self.inner.reader_state.wait(FUTEX_PARKED).ok();
170            }
171
172            // Reset state
173            self.inner
174                .reader_state
175                .value
176                .store(FUTEX_EMPTY, atomic::Ordering::Release);
177        }
178    }
179}
180
/// Represents queued item.
///
/// The variant records how the item was sent: `Sender::send` pushes `Immediate`,
/// `Sender::send_scheduled` pushes `Scheduled`.
pub enum Item<T> {
    /// Item queued to be received immediately.
    Immediate(T),
    /// Item queued to be received at a specified instant.
    /// The second field is the instant at which the item becomes ready.
    Scheduled(T, Instant),
}
188
189impl<T> Item<T> {
190    /// Returns reference to the item value.
191    pub fn value(&self) -> &T {
192        match self {
193            Item::Immediate(i) | Item::Scheduled(i, _) => i,
194        }
195    }
196
197    /// Consumes item to unwrap the contained value.
198    pub fn into_value(self) -> T {
199        match self {
200            Item::Immediate(i) | Item::Scheduled(i, _) => i,
201        }
202    }
203
204    /// Returns the scheduled instant of the item.
205    /// Returns None if the item was immediate.
206    pub fn instant(&self) -> Option<Instant> {
207        match self {
208            Item::Immediate(_) => None,
209            Item::Scheduled(_, instant) => Some(*instant),
210        }
211    }
212}
213
// Marker impl: `Item`'s `PartialEq` compares only the variant kind and the scheduled
// instant (never the payload), which is a full equivalence relation. `Ord` requires `Eq`.
impl<T> Eq for Item<T> {}
215
216impl<T> PartialEq for Item<T> {
217    fn eq(&self, other: &Self) -> bool {
218        match (self, other) {
219            (Item::Immediate(_), Item::Immediate(_)) => true,
220            (Item::Immediate(_), Item::Scheduled(_, _)) => false,
221            (Item::Scheduled(_, _), Item::Immediate(_)) => false,
222            (Item::Scheduled(_, i1), Item::Scheduled(_, i2)) => i1 == i2,
223        }
224    }
225}
226
227impl<T> Ord for Item<T> {
228    fn cmp(&self, other: &Self) -> cmp::Ordering {
229        self.partial_cmp(other).unwrap_or(cmp::Ordering::Less)
230    }
231}
232
233// Item::Immediate first, then Item::Scheduled sorted by instant
234impl<T> PartialOrd for Item<T> {
235    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
236        match (self, other) {
237            (Item::Immediate(_), Item::Immediate(_)) => None,
238            (Item::Immediate(_), Item::Scheduled(_, _)) => Some(cmp::Ordering::Less),
239            (Item::Scheduled(_, _), Item::Immediate(_)) => Some(cmp::Ordering::Greater),
240            (Item::Scheduled(_, i1), Item::Scheduled(_, i2)) => Some(i1.cmp(i2)),
241        }
242    }
243}