1use std::{
2 cmp,
3 sync::{atomic, Arc, Mutex},
4 time::Instant,
5};
6
7use heapless::{binary_heap::Min, BinaryHeap};
8use linux_futex::{Futex, Private};
9
10const FUTEX_PARKED: i32 = -1;
11const FUTEX_EMPTY: i32 = 0;
12const FUTEX_NOTIFIED: i32 = 1;
13
14pub struct FutexQueue<T, const N: usize> {
17 queue: Mutex<BinaryHeap<Item<T>, Min, N>>,
20 reader_state: Futex<Private>,
22}
23
24impl<T, const N: usize> FutexQueue<T, N> {
25 pub fn new() -> (Sender<T, N>, Receiver<T, N>) {
26 let inner = Arc::new(FutexQueue {
27 queue: Mutex::new(BinaryHeap::default()),
28 reader_state: Futex::new(FUTEX_EMPTY),
29 });
30
31 (
32 Sender {
33 inner: inner.clone(),
34 },
35 Receiver { inner },
36 )
37 }
38}
39
40pub struct Sender<T, const N: usize> {
42 inner: Arc<FutexQueue<T, N>>,
43}
44
45pub struct Receiver<T, const N: usize> {
47 inner: Arc<FutexQueue<T, N>>,
48}
49
50impl<T, const N: usize> Sender<T, N> {
51 pub fn send(&self, item: T) -> Result<(), T> {
54 let res = self.inner.queue.lock().unwrap().push(Item::Immediate(item));
55
56 match res {
57 Ok(()) => {
58 if self
59 .inner
60 .reader_state
61 .value
62 .swap(FUTEX_NOTIFIED, atomic::Ordering::Release)
63 == FUTEX_PARKED
64 {
65 self.inner.reader_state.wake(1);
67 }
68
69 Ok(())
70 }
71 Err(item) => Err(item.into_value()),
72 }
73 }
74
75 pub fn send_scheduled(&self, item: T, instant: Instant) -> Result<(), T> {
78 let (res, reload_timer) = {
80 let mut queue = self.inner.queue.lock().unwrap();
81 let reload_timer = queue
85 .peek()
86 .map(|i| i.instant())
87 .flatten()
88 .map(|i| instant < i)
89 .unwrap_or(true);
90 let res = queue.push(Item::Scheduled(item, instant));
91 (res, reload_timer)
92 };
93
94 match res {
95 Ok(()) => {
96 if reload_timer
97 && self
98 .inner
99 .reader_state
100 .value
101 .swap(FUTEX_NOTIFIED, atomic::Ordering::Release)
102 == FUTEX_PARKED
103 {
104 self.inner.reader_state.wake(1);
105 }
106
107 Ok(())
108 }
109 Err(item) => Err(item.into_value()),
110 }
111 }
112}
113
114impl<T, const N: usize> Receiver<T, N> {
115 pub fn try_recv(&mut self) -> Result<Item<T>, Option<Instant>> {
119 let mut queue = self.inner.queue.lock().unwrap();
120
121 match queue.peek() {
122 Some(item) => {
123 match item {
124 Item::Immediate(_) => Ok(queue.pop().unwrap()),
126 Item::Scheduled(_, instant) => {
127 if *instant <= Instant::now() {
128 Ok(queue.pop().unwrap())
130 } else {
131 Err(Some(*instant))
133 }
134 }
135 }
136 }
137 None => Err(None),
139 }
140 }
141
142 pub fn recv(&mut self) -> Item<T> {
145 loop {
146 let next_instant = match self.try_recv() {
147 Ok(item) => return item,
148 Err(next_instant) => next_instant,
149 };
150
151 if self
154 .inner
155 .reader_state
156 .value
157 .fetch_sub(1, atomic::Ordering::Acquire)
158 == FUTEX_NOTIFIED
159 {
160 continue;
161 }
162
163 if let Some(instant) = next_instant {
164 self.inner
165 .reader_state
166 .wait_bitset_until(FUTEX_PARKED, u32::MAX, instant)
167 .ok();
168 } else {
169 self.inner.reader_state.wait(FUTEX_PARKED).ok();
170 }
171
172 self.inner
174 .reader_state
175 .value
176 .store(FUTEX_EMPTY, atomic::Ordering::Release);
177 }
178 }
179}
180
181pub enum Item<T> {
183 Immediate(T),
185 Scheduled(T, Instant),
187}
188
189impl<T> Item<T> {
190 pub fn value(&self) -> &T {
192 match self {
193 Item::Immediate(i) | Item::Scheduled(i, _) => i,
194 }
195 }
196
197 pub fn into_value(self) -> T {
199 match self {
200 Item::Immediate(i) | Item::Scheduled(i, _) => i,
201 }
202 }
203
204 pub fn instant(&self) -> Option<Instant> {
207 match self {
208 Item::Immediate(_) => None,
209 Item::Scheduled(_, instant) => Some(*instant),
210 }
211 }
212}
213
214impl<T> Eq for Item<T> {}
215
216impl<T> PartialEq for Item<T> {
217 fn eq(&self, other: &Self) -> bool {
218 match (self, other) {
219 (Item::Immediate(_), Item::Immediate(_)) => true,
220 (Item::Immediate(_), Item::Scheduled(_, _)) => false,
221 (Item::Scheduled(_, _), Item::Immediate(_)) => false,
222 (Item::Scheduled(_, i1), Item::Scheduled(_, i2)) => i1 == i2,
223 }
224 }
225}
226
227impl<T> Ord for Item<T> {
228 fn cmp(&self, other: &Self) -> cmp::Ordering {
229 self.partial_cmp(other).unwrap_or(cmp::Ordering::Less)
230 }
231}
232
233impl<T> PartialOrd for Item<T> {
235 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
236 match (self, other) {
237 (Item::Immediate(_), Item::Immediate(_)) => None,
238 (Item::Immediate(_), Item::Scheduled(_, _)) => Some(cmp::Ordering::Less),
239 (Item::Scheduled(_, _), Item::Immediate(_)) => Some(cmp::Ordering::Greater),
240 (Item::Scheduled(_, i1), Item::Scheduled(_, i2)) => Some(i1.cmp(i2)),
241 }
242 }
243}