veecle_freertos_integration/queue.rs
1use alloc::sync::Arc;
2use core::ffi::CStr;
3use core::future::poll_fn;
4use core::marker::PhantomData;
5use core::mem::{ManuallyDrop, MaybeUninit};
6use core::task::Poll;
7
8use atomic_waker::AtomicWaker;
9use veecle_freertos_sys::bindings::{
10 QueueHandle_t, StackType_t, UBaseType_t, pdTRUE, shim_xQueueCreate, shim_xQueueReceive,
11 shim_xQueueSendToBack, uxQueueMessagesWaiting, uxQueueSpacesAvailable, vQueueDelete,
12};
13
14use crate::isr::InterruptContext;
15use crate::units::Duration;
16use crate::{FreeRtosError, Task, TaskPriority};
17
18/// A blocking queue with a finite size. For an asynchronous queue, see [`AsyncQueueSender`] and
19/// [`AsyncQueueReceiver`].
20///
21/// The items are owned by the queue and move ownership when sending.
22///
23/// Dropping a [`Queue`] does *not* destroy the underlying FreeRTOS queue.
24///
25/// ## Usage in FFIs
26///
27/// The implementation works with raw memory representations. This means
28/// that the type `T` layout must be understandable by the receiver. This
29/// is usually the case for types that are `Send` and `Sized` in Rust.
30///
31/// If communication with "C" is expected, users `must` ensure the types are
32/// C-compatible. This can be achieved by annotating them with the `#[repr(C)]`
33/// attribute.
34#[derive(Debug)]
35pub struct Queue<T> {
36 handle: QueueHandle_t,
37 item_type: PhantomData<T>,
38}
39
40// SAFETY: The queue struct only contains a pointer to the FreeRTOS resource so it is always Send.
41unsafe impl<T> Send for Queue<T> {}
42
43// SAFETY: The queue struct only contains a pointer to the FreeRTOS resource so it is always Sync.
44unsafe impl<T> Sync for Queue<T> {}
45
46impl<T> Unpin for Queue<T> {}
47
48impl<T> Queue<T>
49where
50 T: Send + Sized + 'static,
51{
52 /// Creates a new `Queue` with item type `T` via dynamic memory allocation.
53 pub fn new(max_size: UBaseType_t) -> Result<Queue<T>, FreeRtosError> {
54 let item_size = size_of::<T>();
55
56 // SAFETY:
57 // The binding for `shim_xQueueCreate` requires that `configSUPPORT_DYNAMIC_ALLOCATION` is enabled in the
58 // FreeRTOS configuration file. Not having the dynamic allocation enabled generates a compilation error.
59 // The NULL result from `shim_xQueueCreate` is captured and converted into a Rust error.
60 let handle = unsafe { shim_xQueueCreate(max_size, item_size as UBaseType_t) };
61
62 if handle.is_null() {
63 return Err(FreeRtosError::OutOfMemory);
64 }
65
66 Ok(Queue {
67 handle,
68 item_type: PhantomData,
69 })
70 }
71
72 /// Creates a `Queue` from a raw queue handle.
73 ///
74 /// # Safety
75 ///
76 /// `handle` must be a valid FreeRTOS regular queue handle (not semaphore or mutex).
77 /// The queue item type `T` must match the `handle`'s item type.
78 /// The queue handle must stay valid until the `Queue` and all its clones are dropped.
79 #[inline]
80 pub unsafe fn from_raw_handle(handle: QueueHandle_t) -> Self {
81 Self {
82 handle,
83 item_type: PhantomData,
84 }
85 }
86
87 /// Returns the raw queue handle, a pointer to the queue.
88 #[inline]
89 pub fn raw_handle(&self) -> QueueHandle_t {
90 self.handle
91 }
92
93 /// Sends an item to the end of the queue. Waits for the queue to have empty space for it.
94 pub fn send(&self, item: T, max_wait: Duration) -> Result<(), T> {
95 let item = ManuallyDrop::new(item);
96 // SAFETY:
97 // Our handle is always a valid undeleted queue handle.
98 // The queue takes ownership of the value pointed to by `pvItemToQueue` on success.
99 // To avoid double-dropping, the `item` is wrapped in `ManuallyDrop`.
100 if unsafe {
101 shim_xQueueSendToBack(self.handle, (&raw const *item).cast(), max_wait.ticks())
102 } == pdTRUE()
103 {
104 Ok(())
105 } else {
106 Err(ManuallyDrop::into_inner(item))
107 }
108 }
109
110 /// Sends an item to the end of the queue, from an interrupt.
111 pub fn send_from_isr(&self, context: &mut InterruptContext, item: T) -> Result<(), T> {
112 let item = ManuallyDrop::new(item);
113 // SAFETY:
114 // The queue, and therefore its handle, are created during the construction of Self, ensuring the argument
115 // `xQueue` is correct. The value pointed by `pvItemToQueue` is owned by the current function, ensuring
116 // it exists while `shim_xQueueSendToBackFromISR` is executed.
117 // To avoid double-dropping, the `item` is wrapped in `ManuallyDrop`.
118 if unsafe {
119 veecle_freertos_sys::bindings::shim_xQueueSendToBackFromISR(
120 self.handle,
121 (&raw const *item).cast(),
122 context.get_task_field_mut(),
123 )
124 } == pdTRUE()
125 {
126 Ok(())
127 } else {
128 Err(ManuallyDrop::into_inner(item))
129 }
130 }
131
132 /// Waits for an item to be available on the queue.
133 pub fn receive(&self, max_wait: Duration) -> Result<T, FreeRtosError> {
134 let mut buffer = MaybeUninit::<T>::uninit();
135
136 // SAFETY:
137 // The queue, and therefore its handle, are created during the construction of Self, ensuring the argument
138 // `xQueue` is correct. The buffer is created right before this call, ensuring its pointer to be valid.
139 if unsafe { shim_xQueueReceive(self.handle, buffer.as_mut_ptr().cast(), max_wait.ticks()) }
140 == pdTRUE()
141 {
142 // SAFETY:
143 // It is ensured by `xQueueReceive` that pdTRUE is returned if, and only if, a value has been copied into
144 // the buffer, allowing us to assume it has been initialized.
145 Ok(unsafe { buffer.assume_init() })
146 } else {
147 Err(FreeRtosError::QueueReceiveTimeout)
148 }
149 }
150
151 /// Returns the number of messages waiting in the queue.
152 pub fn messages_waiting(&self) -> UBaseType_t {
153 // SAFETY:
154 // The queue, and therefore its handle, are created during the construction of Self, ensuring the argument
155 // `xQueue` is correct.
156 unsafe { uxQueueMessagesWaiting(self.handle) }
157 }
158
159 /// Returns the number of spaces available in the queue.
160 pub fn spaces_available(&self) -> UBaseType_t {
161 // SAFETY:
162 // The queue, and therefore its handle, are created during the construction of Self, ensuring the argument
163 // `xQueue` is correct.
164 unsafe { uxQueueSpacesAvailable(self.handle) }
165 }
166}
167
168impl<T> Clone for Queue<T> {
169 fn clone(&self) -> Self {
170 Self {
171 handle: self.handle,
172 item_type: self.item_type,
173 }
174 }
175}
176
177/// An asynchronous queue with a finite size. For a purely blocking queue, see [`Queue`].
178///
179/// The items are owned by the queue and move ownership when sending.
180///
181/// ## Usage in FFIs
182///
183/// The implementation works with raw memory representations. This means
184/// that the type `T` layout must be understandable by the receiver. This
185/// is usually the case for types that are `Send` and `Sized` in Rust.
186///
187/// If communication with "C" is expected, users `must` ensure the types are
188/// C-compatible. This can be achieved by annotating them with the `#[repr(C)]`
189/// attribute.
190#[derive(Debug)]
191struct AsyncQueue<T> {
192 send_waker: AtomicWaker,
193 receive_waker: AtomicWaker,
194 queue: Queue<T>,
195}
196
197impl<T> AsyncQueue<T>
198where
199 T: Send + Sized + 'static,
200{
201 /// Creates a new `AsyncQueue` capable of holding `length` items of type `T` via dynamic memory allocation.
202 pub fn new(length: UBaseType_t) -> Result<Self, FreeRtosError> {
203 Ok(AsyncQueue {
204 send_waker: AtomicWaker::default(),
205 receive_waker: AtomicWaker::default(),
206 queue: Queue::new(length)?,
207 })
208 }
209
210 /// Returns the number of messages waiting in the queue.
211 #[inline]
212 pub fn messages_waiting(&self) -> UBaseType_t {
213 self.queue.messages_waiting()
214 }
215}
216
217impl<T> Drop for AsyncQueue<T> {
218 fn drop(&mut self) {
219 // SAFETY:
220 // The queue, and therefore its handle, are created during the construction of Self, ensuring the argument
221 // `xQueue` is correct.
222 unsafe {
223 vQueueDelete(self.queue.handle);
224 }
225 }
226}
227
228/// An asynchronous queue sender. Can be used to send data to an [`AsyncQueueReceiver`]. Use [`channel`] to create.
229///
230/// For a purely blocking queue, see [`Queue`].
231///
232/// The items are owned by the queue and move ownership when sending.
233#[derive(Debug)]
234pub struct AsyncQueueSender<T>(Arc<AsyncQueue<T>>);
235
236impl<T> AsyncQueueSender<T>
237where
238 T: Send + Sized + 'static,
239{
240 /// Returns the number of messages waiting in the queue.
241 #[inline]
242 pub fn messages_waiting(&self) -> UBaseType_t {
243 self.0.messages_waiting()
244 }
245
246 /// Sends an item to the end of the queue.
247 ///
248 /// Waits for the queue to have empty space for up to `max_wait`. If `max_wait` is 0 and the queue is full,
249 /// this function returns immediately.
250 #[inline]
251 pub fn send_blocking(&mut self, item: T, max_wait: Duration) -> Result<(), T> {
252 let result = self.0.queue.send(item, max_wait);
253
254 if result.is_ok() {
255 self.0.receive_waker.wake();
256 }
257
258 result
259 }
260
261 /// Sends an item to the end of the queue, from an interrupt.
262 #[inline]
263 pub fn send_from_isr(&mut self, context: &mut InterruptContext, item: T) -> Result<(), T> {
264 let result = self.0.queue.send_from_isr(context, item);
265
266 if result.is_ok() {
267 self.0.receive_waker.wake();
268 }
269
270 result
271 }
272
273 /// Resolves when at least one space is available in the queue.
274 async fn poll_ready(&mut self) {
275 poll_fn(|cx| {
276 self.0.send_waker.register(cx.waker());
277
278 let result = self.0.queue.spaces_available();
279
280 if result == 0 {
281 Poll::Pending
282 } else {
283 Poll::Ready(())
284 }
285 })
286 .await;
287 }
288
289 /// Asynchronous version of [`send_blocking`](Self::send_blocking).
290 ///
291 /// This function stays pending until the queue has space for the item.
292 pub async fn send(&mut self, item: T) {
293 self.poll_ready().await;
294
295 // `T` doesn't implement `Debug`, so we cannot `expect()`.
296 if self.0.queue.send(item, Duration::zero()).is_err() {
297 // `poll_ready` resolving guarantees a free slot in the queue, so `send` will never fail.
298 unreachable!("sending failed unexpectedly");
299 };
300
301 self.0.receive_waker.wake();
302 }
303}
304
305/// An asynchronous queue receiver. Can be used to receive data from an [`AsyncQueueSender`]. Use [`channel`] to create.
306///
307/// For a purely blocking queue, see [`Queue`].
308#[derive(Debug)]
309pub struct AsyncQueueReceiver<T>(Arc<AsyncQueue<T>>);
310
311impl<T> AsyncQueueReceiver<T>
312where
313 T: Send + Sized + 'static,
314{
315 /// Returns the number of messages waiting in the queue.
316 #[inline]
317 pub fn messages_waiting(&self) -> UBaseType_t {
318 self.0.messages_waiting()
319 }
320
321 /// Waits for an item to be available on the queue.
322 ///
323 /// Returns an item if available and an error if no item is available after `max_wait`.
324 pub fn receive_blocking(&mut self, max_wait: Duration) -> Result<T, FreeRtosError> {
325 let result = self.0.queue.receive(max_wait);
326
327 if result.is_ok() {
328 self.0.send_waker.wake();
329 }
330
331 result
332 }
333
334 /// Asynchronous version of [`receive_blocking`](Self::receive_blocking).
335 ///
336 /// This function stays pending until the queue has received an item.
337 pub async fn receive(&mut self) -> T {
338 poll_fn(|cx| {
339 let result = self.0.queue.receive(Duration::zero());
340
341 if let Ok(item) = result {
342 self.0.send_waker.wake();
343 Poll::Ready(item)
344 } else {
345 self.0.receive_waker.register(cx.waker());
346 Poll::Pending
347 }
348 })
349 .await
350 }
351}
352
353/// Creates a [`AsyncQueueSender`] [`AsyncQueueReceiver`] pair.
354pub fn channel<T>(
355 max_size: UBaseType_t,
356) -> Result<(AsyncQueueSender<T>, AsyncQueueReceiver<T>), FreeRtosError>
357where
358 T: Send + Sized + 'static,
359{
360 let queue = Arc::new(AsyncQueue::new(max_size)?);
361 let sender = AsyncQueueSender(queue.clone());
362 let receiver = AsyncQueueReceiver(queue);
363
364 Ok((sender, receiver))
365}
366
367/// Builder for a task that can receive items from a blocking [`Queue`] and send them to an
368/// asynchronous queue.
369#[derive(Debug)]
370pub struct BlockingToAsyncQueueTaskBuilder<T> {
371 name: &'static CStr,
372 queue: Queue<T>,
373 priority: TaskPriority,
374 capacity: UBaseType_t,
375 stack_size: StackType_t,
376}
377
378impl<T> BlockingToAsyncQueueTaskBuilder<T>
379where
380 T: Send + Sized + 'static,
381{
382 /// Creates a new queue bridge task builder.
383 pub fn new(name: &'static CStr, queue: Queue<T>, capacity: UBaseType_t) -> Self {
384 // This value was determined by trial and error and has worked consistently during tests. It is *not*
385 // derived from anything and might need to change with future versions of Rust or the crate.
386 const BASE_STACK_SIZE: StackType_t = 256;
387
388 // The FreeRTOS task requires memory for two instances of T to handle resending on failure.
389 let data_size = size_of::<T>() as StackType_t * 2;
390
391 Self {
392 name,
393 queue,
394 capacity,
395 priority: TaskPriority(1),
396 stack_size: BASE_STACK_SIZE + data_size,
397 }
398 }
399
400 /// Sets the priority of the FreeRTOS task.
401 pub fn priority(mut self, priority: TaskPriority) -> Self {
402 self.priority = priority;
403 self
404 }
405
406 /// Sets the stack size of the FreeRTOS task.
407 pub fn stack_size(mut self, stack_size: StackType_t) -> Self {
408 self.stack_size = stack_size;
409 self
410 }
411
412 /// Creates the task and returns a receiver to receive items from the blocking queue in an asynchronous manner.
413 pub fn create(self) -> Result<AsyncQueueReceiver<T>, FreeRtosError> {
414 let (mut sender, receiver) = channel(self.capacity)?;
415
416 Task::new()
417 .name(self.name)
418 .stack_size(self.stack_size)
419 .priority(self.priority)
420 .start(move |_| {
421 loop {
422 // Any non-zero delay behaves the same because after a timeout it will try again until the operation
423 // succeeds. The longer the delay the better, since we don't want to waste
424 // resources starting the same operation over and over, so we use the maximum
425 // allowed timeout.
426 let duration = Duration::max();
427
428 if let Ok(mut data) = self.queue.receive(duration) {
429 while let Err(saved_data) = sender.send_blocking(data, duration) {
430 data = saved_data;
431 }
432 }
433 }
434 })?;
435
436 Ok(receiver)
437 }
438}
439
440/// Builder for a task that can receive items from an asynchronous queue and send them to a
441/// blocking [`Queue`].
442#[derive(Debug)]
443pub struct AsyncToBlockingQueueTaskBuilder<T> {
444 name: &'static CStr,
445 queue: Queue<T>,
446 priority: TaskPriority,
447 capacity: UBaseType_t,
448 stack_size: StackType_t,
449}
450
451impl<T> AsyncToBlockingQueueTaskBuilder<T>
452where
453 T: Send + Sized + 'static,
454{
455 /// Creates a new queue bridge task builder.
456 pub fn new(name: &'static CStr, queue: Queue<T>, capacity: UBaseType_t) -> Self {
457 // This value was determined by trial and error and has worked consistently during tests. It is *not*
458 // derived from anything and might need to change with future versions of Rust or the crate.
459 const BASE_STACK_SIZE: StackType_t = 256;
460
461 // The FreeRTOS task requires memory for two instances of T to handle resending on failure.
462 let data_size = size_of::<T>() as StackType_t * 2;
463
464 Self {
465 name,
466 queue,
467 priority: TaskPriority(1),
468 capacity,
469 stack_size: BASE_STACK_SIZE + data_size,
470 }
471 }
472
473 /// Sets the priority of the FreeRTOS task.
474 pub fn priority(mut self, priority: TaskPriority) -> Self {
475 self.priority = priority;
476 self
477 }
478
479 /// Sets the stack size of the FreeRTOS task.
480 pub fn stack_size(mut self, stack_size: StackType_t) -> Self {
481 self.stack_size = stack_size;
482 self
483 }
484
485 /// Creates the task and returns a sender to send items to the blocking queue in an asynchronous manner.
486 pub fn create(self) -> Result<AsyncQueueSender<T>, FreeRtosError> {
487 let (sender, mut receiver) = channel(self.capacity)?;
488
489 Task::new()
490 .name(self.name)
491 .stack_size(self.stack_size)
492 .priority(self.priority)
493 .start(move |_| {
494 loop {
495 // Any non-zero delay behaves the same because after a timeout it will try again until the operation
496 // succeeds. The longer the delay the better, since we don't want to waste
497 // resources starting the same operation over and over, so we use the maximum
498 // allowed timeout.
499 let duration = Duration::max();
500
501 if let Ok(mut data) = receiver.receive_blocking(duration) {
502 while let Err(saved_data) = self.queue.send(data, duration) {
503 data = saved_data;
504 }
505 }
506 }
507 })?;
508
509 Ok(sender)
510 }
511}