veecle_freertos_integration/
queue.rs

1use alloc::sync::Arc;
2use core::ffi::CStr;
3use core::future::poll_fn;
4use core::marker::PhantomData;
5use core::mem::{ManuallyDrop, MaybeUninit};
6use core::task::Poll;
7
8use atomic_waker::AtomicWaker;
9use veecle_freertos_sys::bindings::{
10    QueueHandle_t, StackType_t, UBaseType_t, pdTRUE, shim_xQueueCreate, shim_xQueueReceive,
11    shim_xQueueSendToBack, uxQueueMessagesWaiting, uxQueueSpacesAvailable, vQueueDelete,
12};
13
14use crate::isr::InterruptContext;
15use crate::units::Duration;
16use crate::{FreeRtosError, Task, TaskPriority};
17
18/// A blocking queue with a finite size. For an asynchronous queue, see [`AsyncQueueSender`] and
19/// [`AsyncQueueReceiver`].
20///
21/// The items are owned by the queue and move ownership when sending.
22///
23/// Dropping a [`Queue`] does *not* destroy the underlying FreeRTOS queue.
24///
25/// ## Usage in FFIs
26///
27/// The implementation works with raw memory representations. This means
28/// that the type `T` layout must be understandable by the receiver. This
29/// is usually the case for types that are `Send` and `Sized` in Rust.
30///
31/// If communication with "C" is expected, users `must` ensure the types are
32/// C-compatible. This can be achieved by annotating them with the `#[repr(C)]`
33/// attribute.
34#[derive(Debug)]
35pub struct Queue<T> {
36    handle: QueueHandle_t,
37    item_type: PhantomData<T>,
38}
39
40// SAFETY: The queue struct only contains a pointer to the FreeRTOS resource so it is always Send.
41unsafe impl<T> Send for Queue<T> {}
42
43// SAFETY: The queue struct only contains a pointer to the FreeRTOS resource so it is always Sync.
44unsafe impl<T> Sync for Queue<T> {}
45
46impl<T> Unpin for Queue<T> {}
47
48impl<T> Queue<T>
49where
50    T: Send + Sized + 'static,
51{
52    /// Creates a new `Queue` with item type `T` via dynamic memory allocation.
53    pub fn new(max_size: UBaseType_t) -> Result<Queue<T>, FreeRtosError> {
54        let item_size = size_of::<T>();
55
56        // SAFETY:
57        // The binding for `shim_xQueueCreate` requires that `configSUPPORT_DYNAMIC_ALLOCATION` is enabled in the
58        // FreeRTOS configuration file. Not having the dynamic allocation enabled generates a compilation error.
59        // The NULL result from `shim_xQueueCreate` is captured and converted into a Rust error.
60        let handle = unsafe { shim_xQueueCreate(max_size, item_size as UBaseType_t) };
61
62        if handle.is_null() {
63            return Err(FreeRtosError::OutOfMemory);
64        }
65
66        Ok(Queue {
67            handle,
68            item_type: PhantomData,
69        })
70    }
71
72    /// Creates a `Queue` from a raw queue handle.
73    ///
74    /// # Safety
75    ///
76    /// `handle` must be a valid FreeRTOS regular queue handle (not semaphore or mutex).
77    /// The queue item type `T` must match the `handle`'s item type.
78    /// The queue handle must stay valid until the `Queue` and all its clones are dropped.
79    #[inline]
80    pub unsafe fn from_raw_handle(handle: QueueHandle_t) -> Self {
81        Self {
82            handle,
83            item_type: PhantomData,
84        }
85    }
86
87    /// Returns the raw queue handle, a pointer to the queue.
88    #[inline]
89    pub fn raw_handle(&self) -> QueueHandle_t {
90        self.handle
91    }
92
93    /// Sends an item to the end of the queue. Waits for the queue to have empty space for it.
94    pub fn send(&self, item: T, max_wait: Duration) -> Result<(), T> {
95        let item = ManuallyDrop::new(item);
96        // SAFETY:
97        // Our handle is always a valid undeleted queue handle.
98        // The queue takes ownership of the value pointed to by `pvItemToQueue` on success.
99        // To avoid double-dropping, the `item` is wrapped in `ManuallyDrop`.
100        if unsafe {
101            shim_xQueueSendToBack(self.handle, (&raw const *item).cast(), max_wait.ticks())
102        } == pdTRUE()
103        {
104            Ok(())
105        } else {
106            Err(ManuallyDrop::into_inner(item))
107        }
108    }
109
110    /// Sends an item to the end of the queue, from an interrupt.
111    pub fn send_from_isr(&self, context: &mut InterruptContext, item: T) -> Result<(), T> {
112        let item = ManuallyDrop::new(item);
113        // SAFETY:
114        // The queue, and therefore its handle, are created during the construction of Self, ensuring the argument
115        // `xQueue` is correct. The value pointed by `pvItemToQueue` is owned by the current function, ensuring
116        // it exists while `shim_xQueueSendToBackFromISR` is executed.
117        // To avoid double-dropping, the `item` is wrapped in `ManuallyDrop`.
118        if unsafe {
119            veecle_freertos_sys::bindings::shim_xQueueSendToBackFromISR(
120                self.handle,
121                (&raw const *item).cast(),
122                context.get_task_field_mut(),
123            )
124        } == pdTRUE()
125        {
126            Ok(())
127        } else {
128            Err(ManuallyDrop::into_inner(item))
129        }
130    }
131
132    /// Waits for an item to be available on the queue.
133    pub fn receive(&self, max_wait: Duration) -> Result<T, FreeRtosError> {
134        let mut buffer = MaybeUninit::<T>::uninit();
135
136        // SAFETY:
137        // The queue, and therefore its handle, are created during the construction of Self, ensuring the argument
138        // `xQueue` is correct. The buffer is created right before this call, ensuring its pointer to be valid.
139        if unsafe { shim_xQueueReceive(self.handle, buffer.as_mut_ptr().cast(), max_wait.ticks()) }
140            == pdTRUE()
141        {
142            // SAFETY:
143            // It is ensured by `xQueueReceive` that pdTRUE is returned if, and only if, a value has been copied into
144            // the buffer, allowing us to assume it has been initialized.
145            Ok(unsafe { buffer.assume_init() })
146        } else {
147            Err(FreeRtosError::QueueReceiveTimeout)
148        }
149    }
150
151    /// Returns the number of messages waiting in the queue.
152    pub fn messages_waiting(&self) -> UBaseType_t {
153        // SAFETY:
154        // The queue, and therefore its handle, are created during the construction of Self, ensuring the argument
155        // `xQueue` is correct.
156        unsafe { uxQueueMessagesWaiting(self.handle) }
157    }
158
159    /// Returns the number of spaces available in the queue.
160    pub fn spaces_available(&self) -> UBaseType_t {
161        // SAFETY:
162        // The queue, and therefore its handle, are created during the construction of Self, ensuring the argument
163        // `xQueue` is correct.
164        unsafe { uxQueueSpacesAvailable(self.handle) }
165    }
166}
167
168impl<T> Clone for Queue<T> {
169    fn clone(&self) -> Self {
170        Self {
171            handle: self.handle,
172            item_type: self.item_type,
173        }
174    }
175}
176
177/// An asynchronous queue with a finite size. For a purely blocking queue, see [`Queue`].
178///
179/// The items are owned by the queue and move ownership when sending.
180///
181/// ## Usage in FFIs
182///
183/// The implementation works with raw memory representations. This means
184/// that the type `T` layout must be understandable by the receiver. This
185/// is usually the case for types that are `Send` and `Sized` in Rust.
186///
187/// If communication with "C" is expected, users `must` ensure the types are
188/// C-compatible. This can be achieved by annotating them with the `#[repr(C)]`
189/// attribute.
190#[derive(Debug)]
191struct AsyncQueue<T> {
192    send_waker: AtomicWaker,
193    receive_waker: AtomicWaker,
194    queue: Queue<T>,
195}
196
197impl<T> AsyncQueue<T>
198where
199    T: Send + Sized + 'static,
200{
201    /// Creates a new `AsyncQueue` capable of holding `length` items of type `T` via dynamic memory allocation.
202    pub fn new(length: UBaseType_t) -> Result<Self, FreeRtosError> {
203        Ok(AsyncQueue {
204            send_waker: AtomicWaker::default(),
205            receive_waker: AtomicWaker::default(),
206            queue: Queue::new(length)?,
207        })
208    }
209
210    /// Returns the number of messages waiting in the queue.
211    #[inline]
212    pub fn messages_waiting(&self) -> UBaseType_t {
213        self.queue.messages_waiting()
214    }
215}
216
217impl<T> Drop for AsyncQueue<T> {
218    fn drop(&mut self) {
219        // SAFETY:
220        // The queue, and therefore its handle, are created during the construction of Self, ensuring the argument
221        // `xQueue` is correct.
222        unsafe {
223            vQueueDelete(self.queue.handle);
224        }
225    }
226}
227
228/// An asynchronous queue sender. Can be used to send data to an [`AsyncQueueReceiver`]. Use [`channel`] to create.
229///
230/// For a purely blocking queue, see [`Queue`].
231///
232/// The items are owned by the queue and move ownership when sending.
233#[derive(Debug)]
234pub struct AsyncQueueSender<T>(Arc<AsyncQueue<T>>);
235
236impl<T> AsyncQueueSender<T>
237where
238    T: Send + Sized + 'static,
239{
240    /// Returns the number of messages waiting in the queue.
241    #[inline]
242    pub fn messages_waiting(&self) -> UBaseType_t {
243        self.0.messages_waiting()
244    }
245
246    /// Sends an item to the end of the queue.
247    ///
248    /// Waits for the queue to have empty space for up to `max_wait`. If `max_wait` is 0 and the queue is full,
249    /// this function returns immediately.
250    #[inline]
251    pub fn send_blocking(&mut self, item: T, max_wait: Duration) -> Result<(), T> {
252        let result = self.0.queue.send(item, max_wait);
253
254        if result.is_ok() {
255            self.0.receive_waker.wake();
256        }
257
258        result
259    }
260
261    /// Sends an item to the end of the queue, from an interrupt.
262    #[inline]
263    pub fn send_from_isr(&mut self, context: &mut InterruptContext, item: T) -> Result<(), T> {
264        let result = self.0.queue.send_from_isr(context, item);
265
266        if result.is_ok() {
267            self.0.receive_waker.wake();
268        }
269
270        result
271    }
272
273    /// Resolves when at least one space is available in the queue.
274    async fn poll_ready(&mut self) {
275        poll_fn(|cx| {
276            self.0.send_waker.register(cx.waker());
277
278            let result = self.0.queue.spaces_available();
279
280            if result == 0 {
281                Poll::Pending
282            } else {
283                Poll::Ready(())
284            }
285        })
286        .await;
287    }
288
289    /// Asynchronous version of [`send_blocking`](Self::send_blocking).
290    ///
291    /// This function stays pending until the queue has space for the item.
292    pub async fn send(&mut self, item: T) {
293        self.poll_ready().await;
294
295        // `T` doesn't implement `Debug`, so we cannot `expect()`.
296        if self.0.queue.send(item, Duration::zero()).is_err() {
297            // `poll_ready` resolving guarantees a free slot in the queue, so `send` will never fail.
298            unreachable!("sending failed unexpectedly");
299        };
300
301        self.0.receive_waker.wake();
302    }
303}
304
305/// An asynchronous queue receiver. Can be used to receive data from an [`AsyncQueueSender`]. Use [`channel`] to create.
306///
307/// For a purely blocking queue, see [`Queue`].
308#[derive(Debug)]
309pub struct AsyncQueueReceiver<T>(Arc<AsyncQueue<T>>);
310
311impl<T> AsyncQueueReceiver<T>
312where
313    T: Send + Sized + 'static,
314{
315    /// Returns the number of messages waiting in the queue.
316    #[inline]
317    pub fn messages_waiting(&self) -> UBaseType_t {
318        self.0.messages_waiting()
319    }
320
321    /// Waits for an item to be available on the queue.
322    ///
323    /// Returns an item if available and an error if no item is available after `max_wait`.
324    pub fn receive_blocking(&mut self, max_wait: Duration) -> Result<T, FreeRtosError> {
325        let result = self.0.queue.receive(max_wait);
326
327        if result.is_ok() {
328            self.0.send_waker.wake();
329        }
330
331        result
332    }
333
334    /// Asynchronous version of [`receive_blocking`](Self::receive_blocking).
335    ///
336    /// This function stays pending until the queue has received an item.
337    pub async fn receive(&mut self) -> T {
338        poll_fn(|cx| {
339            let result = self.0.queue.receive(Duration::zero());
340
341            if let Ok(item) = result {
342                self.0.send_waker.wake();
343                Poll::Ready(item)
344            } else {
345                self.0.receive_waker.register(cx.waker());
346                Poll::Pending
347            }
348        })
349        .await
350    }
351}
352
353/// Creates a [`AsyncQueueSender`] [`AsyncQueueReceiver`] pair.
354pub fn channel<T>(
355    max_size: UBaseType_t,
356) -> Result<(AsyncQueueSender<T>, AsyncQueueReceiver<T>), FreeRtosError>
357where
358    T: Send + Sized + 'static,
359{
360    let queue = Arc::new(AsyncQueue::new(max_size)?);
361    let sender = AsyncQueueSender(queue.clone());
362    let receiver = AsyncQueueReceiver(queue);
363
364    Ok((sender, receiver))
365}
366
367/// Builder for a task that can receive items from a blocking [`Queue`] and send them to an
368/// asynchronous queue.
369#[derive(Debug)]
370pub struct BlockingToAsyncQueueTaskBuilder<T> {
371    name: &'static CStr,
372    queue: Queue<T>,
373    priority: TaskPriority,
374    capacity: UBaseType_t,
375    stack_size: StackType_t,
376}
377
378impl<T> BlockingToAsyncQueueTaskBuilder<T>
379where
380    T: Send + Sized + 'static,
381{
382    /// Creates a new queue bridge task builder.
383    pub fn new(name: &'static CStr, queue: Queue<T>, capacity: UBaseType_t) -> Self {
384        // This value was determined by trial and error and has worked consistently during tests. It is *not*
385        // derived from anything and might need to change with future versions of Rust or the crate.
386        const BASE_STACK_SIZE: StackType_t = 256;
387
388        // The FreeRTOS task requires memory for two instances of T to handle resending on failure.
389        let data_size = size_of::<T>() as StackType_t * 2;
390
391        Self {
392            name,
393            queue,
394            capacity,
395            priority: TaskPriority(1),
396            stack_size: BASE_STACK_SIZE + data_size,
397        }
398    }
399
400    /// Sets the priority of the FreeRTOS task.
401    pub fn priority(mut self, priority: TaskPriority) -> Self {
402        self.priority = priority;
403        self
404    }
405
406    /// Sets the stack size of the FreeRTOS task.
407    pub fn stack_size(mut self, stack_size: StackType_t) -> Self {
408        self.stack_size = stack_size;
409        self
410    }
411
412    /// Creates the task and returns a receiver to receive items from the blocking queue in an asynchronous manner.
413    pub fn create(self) -> Result<AsyncQueueReceiver<T>, FreeRtosError> {
414        let (mut sender, receiver) = channel(self.capacity)?;
415
416        Task::new()
417            .name(self.name)
418            .stack_size(self.stack_size)
419            .priority(self.priority)
420            .start(move |_| {
421                loop {
422                    // Any non-zero delay behaves the same because after a timeout it will try again until the operation
423                    // succeeds. The longer the delay the better, since we don't want to waste
424                    // resources starting the same operation over and over, so we use the maximum
425                    // allowed timeout.
426                    let duration = Duration::max();
427
428                    if let Ok(mut data) = self.queue.receive(duration) {
429                        while let Err(saved_data) = sender.send_blocking(data, duration) {
430                            data = saved_data;
431                        }
432                    }
433                }
434            })?;
435
436        Ok(receiver)
437    }
438}
439
440/// Builder for a task that can receive items from an asynchronous queue and send them to a
441/// blocking [`Queue`].
442#[derive(Debug)]
443pub struct AsyncToBlockingQueueTaskBuilder<T> {
444    name: &'static CStr,
445    queue: Queue<T>,
446    priority: TaskPriority,
447    capacity: UBaseType_t,
448    stack_size: StackType_t,
449}
450
451impl<T> AsyncToBlockingQueueTaskBuilder<T>
452where
453    T: Send + Sized + 'static,
454{
455    /// Creates a new queue bridge task builder.
456    pub fn new(name: &'static CStr, queue: Queue<T>, capacity: UBaseType_t) -> Self {
457        // This value was determined by trial and error and has worked consistently during tests. It is *not*
458        // derived from anything and might need to change with future versions of Rust or the crate.
459        const BASE_STACK_SIZE: StackType_t = 256;
460
461        // The FreeRTOS task requires memory for two instances of T to handle resending on failure.
462        let data_size = size_of::<T>() as StackType_t * 2;
463
464        Self {
465            name,
466            queue,
467            priority: TaskPriority(1),
468            capacity,
469            stack_size: BASE_STACK_SIZE + data_size,
470        }
471    }
472
473    /// Sets the priority of the FreeRTOS task.
474    pub fn priority(mut self, priority: TaskPriority) -> Self {
475        self.priority = priority;
476        self
477    }
478
479    /// Sets the stack size of the FreeRTOS task.
480    pub fn stack_size(mut self, stack_size: StackType_t) -> Self {
481        self.stack_size = stack_size;
482        self
483    }
484
485    /// Creates the task and returns a sender to send items to the blocking queue in an asynchronous manner.
486    pub fn create(self) -> Result<AsyncQueueSender<T>, FreeRtosError> {
487        let (sender, mut receiver) = channel(self.capacity)?;
488
489        Task::new()
490            .name(self.name)
491            .stack_size(self.stack_size)
492            .priority(self.priority)
493            .start(move |_| {
494                loop {
495                    // Any non-zero delay behaves the same because after a timeout it will try again until the operation
496                    // succeeds. The longer the delay the better, since we don't want to waste
497                    // resources starting the same operation over and over, so we use the maximum
498                    // allowed timeout.
499                    let duration = Duration::max();
500
501                    if let Ok(mut data) = receiver.receive_blocking(duration) {
502                        while let Err(saved_data) = self.queue.send(data, duration) {
503                            data = saved_data;
504                        }
505                    }
506                }
507            })?;
508
509        Ok(sender)
510    }
511}