Skip to main content

osal_rs/freertos/
queue.rs

1/***************************************************************************
2 *
3 * osal-rs
4 * Copyright (C) 2023/2026 Antonio Salsi <passy.linux@zresa.it>
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 *
18 ***************************************************************************/
19
20//! Queue-based inter-thread communication for FreeRTOS.
21//!
22//! This module provides FIFO queue primitives for safe message passing between threads
23//! and interrupt service routines. Supports both byte-based and typed queues.
24
25use core::ffi::c_void;
26use core::fmt::{Debug, Display};
27use core::marker::PhantomData;
28use core::ops::Deref;
29
30use alloc::vec::Vec;
31
32use super::ffi::{QueueHandle, pdFALSE, vQueueDelete, xQueueCreateCountingSemaphore, xQueueReceive, xQueueReceiveFromISR};
33use super::types::{BaseType, UBaseType, TickType};
34use super::system::System;
35use crate::traits::{ToTick, QueueFn, SystemFn, QueueStreamedFn, BytesHasLen};
36#[cfg(not(feature = "serde"))]
37use crate::traits::{Serialize, Deserialize};
38
39#[cfg(feature = "serde")]
40use osal_rs_serde::{Serialize, Deserialize, to_bytes};
41
42pub trait StructSerde : Serialize + BytesHasLen + Deserialize {}
43
44use crate::utils::{Result, Error};
45use crate::{xQueueSendToBack, xQueueSendToBackFromISR};
46
47
48/// A FIFO queue for byte-based message passing.
49///
50/// Provides a thread-safe queue implementation for sending and receiving
51/// raw byte slices between threads. Supports both blocking and ISR-safe operations.
52///
53/// # Examples
54///
55/// ## Basic queue usage
56///
57/// ```ignore
58/// use osal_rs::os::{Queue, QueueFn};
59/// use core::time::Duration;
60/// 
61/// // Create a queue with 10 slots, each 32 bytes
62/// let queue = Queue::new(10, 32).unwrap();
63/// 
64/// // Send data
65/// let data = [1u8, 2, 3, 4];
66/// queue.post_with_to_tick(&data, Duration::from_millis(100)).unwrap();
67/// 
68/// // Receive data
69/// let mut buffer = [0u8; 4];
70/// queue.fetch_with_to_tick(&mut buffer, Duration::from_millis(100)).unwrap();
71/// assert_eq!(buffer, [1, 2, 3, 4]);
72/// ```
73///
74/// ## Producer-consumer pattern
75///
76/// ```ignore
77/// use osal_rs::os::{Queue, QueueFn, Thread};
78/// use alloc::sync::Arc;
79/// use core::time::Duration;
80/// 
81/// let queue = Arc::new(Queue::new(5, 4).unwrap());
82/// let queue_clone = queue.clone();
83/// 
84/// // Consumer thread
85/// let consumer = Thread::new("consumer", 2048, 5, move || {
86///     let mut buffer = [0u8; 4];
87///     loop {
88///         if queue_clone.fetch(&mut buffer, 1000).is_ok() {
89///             println!("Received: {:?}", buffer);
90///         }
91///     }
92/// }).unwrap();
93/// 
94/// consumer.start().unwrap();
95/// 
96/// // Producer
97/// let data = [0xAA, 0xBB, 0xCC, 0xDD];
98/// queue.post(&data, 1000).unwrap();
99/// ```
100pub struct Queue (QueueHandle);
101
102unsafe impl Send for Queue {}
103unsafe impl Sync for Queue {}
104
105impl Queue {
106    /// Creates a new queue.
107    ///
108    /// # Parameters
109    ///
110    /// * `size` - Maximum number of messages the queue can hold
111    /// * `message_size` - Size in bytes of each message
112    ///
113    /// # Returns
114    ///
115    /// * `Ok(Self)` - Successfully created queue
116    /// * `Err(Error)` - Creation failed (insufficient memory, etc.)
117    ///
118    /// # Examples
119    ///
120    /// ```ignore
121    /// use osal_rs::os::{Queue, QueueFn};
122    /// 
123    /// // Queue for 5 messages of 16 bytes each
124    /// let queue = Queue::new(5, 16).unwrap();
125    /// ```
126    pub fn new (size: UBaseType, message_size: UBaseType) -> Result<Self> {
127        let handle = unsafe { xQueueCreateCountingSemaphore(size, message_size) };
128        if handle.is_null() {
129            Err(Error::OutOfMemory)
130        } else {
131            Ok(Self (handle))
132        }
133    }
134
135    #[inline]
136    pub fn fetch_with_to_tick(&self, buffer: &mut [u8], time: impl ToTick) -> Result<()> {
137        self.fetch(buffer, time.to_ticks())
138    }
139
140    #[inline]
141    pub fn post_with_to_tick(&self, item: &[u8], time: impl ToTick) -> Result<()> {
142        self.post(item, time.to_ticks())
143    }
144}
145
146impl QueueFn for Queue {
147
148    fn fetch(&self, buffer: &mut [u8], time: TickType) -> Result<()> {
149        let ret = unsafe {
150            xQueueReceive(
151                self.0,
152                buffer.as_mut_ptr() as *mut c_void,
153                time,
154            )
155        };
156        if ret == 0 {
157            Err(Error::Timeout)
158        } else {
159            Ok(())
160        }
161    }
162
163    fn fetch_from_isr(&self, buffer: &mut [u8]) -> Result<()> {
164
165        let mut task_woken_by_receive: BaseType = pdFALSE;
166
167        let ret = unsafe {
168            xQueueReceiveFromISR(
169                self.0,
170                buffer.as_mut_ptr() as *mut c_void,
171                &mut task_woken_by_receive
172            )
173        };
174        if ret == 0 {
175            Err(Error::Timeout)
176        } else {
177
178            System::yield_from_isr(task_woken_by_receive);
179            
180            Ok(())
181        }
182    }
183
184    fn post(&self, item: &[u8], time: TickType) -> Result<()> {
185        let ret = xQueueSendToBack!(
186                            self.0,
187                            item.as_ptr() as *const c_void,
188                            time
189                        );
190        
191        if ret == 0 {
192            Err(Error::Timeout)
193        } else {
194            Ok(())
195        }
196    }
197
198    fn post_from_isr(&self, item: &[u8]) -> Result<()> {
199
200        let mut task_woken_by_receive: BaseType = pdFALSE;
201
202        let ret = xQueueSendToBackFromISR!(
203                            self.0,
204                            item.as_ptr() as *const c_void,
205                            &mut task_woken_by_receive
206                        );
207        
208        if ret == 0 {
209            Err(Error::Timeout)
210        } else {
211            System::yield_from_isr(task_woken_by_receive);
212
213            Ok(())
214        }
215    }
216
217    fn delete(&mut self) {
218        unsafe {
219            vQueueDelete(self.0);
220            self.0 = core::ptr::null_mut();
221        }
222    }
223}
224
225impl Drop for Queue {
226    fn drop(&mut self) {
227        if self.0.is_null() {
228            return;
229        }
230        self.delete();
231    }
232}
233
234impl Deref for Queue {
235    type Target = QueueHandle;
236
237    fn deref(&self) -> &Self::Target {
238        &self.0
239    }
240}
241
242impl Debug for Queue {
243    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
244        f.debug_struct("Queue")
245            .field("handle", &self.0)
246            .finish()
247    }
248}
249
250impl Display for Queue {
251    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
252        write!(f, "Queue {{ handle: {:?} }}", self.0)
253    }
254}
255
256/// A type-safe FIFO queue for message passing.
257///
258/// Unlike [`Queue`], which works with raw byte slices, `QueueStreamed` provides
259/// a type-safe interface for sending and receiving structured data. The type must
260/// implement serialization traits.
261///
262/// # Type Parameters
263///
264/// * `T` - The message type. Must implement `ToBytes`, `BytesHasLen`, and `FromBytes`
265///
266/// # Examples
267///
268/// ## Basic typed queue usage
269///
270/// ```ignore
271/// use osal_rs::os::{QueueStreamed, QueueStreamedFn};
272/// use core::time::Duration;
273/// 
274/// #[derive(Debug, Clone, Copy)]
275/// struct Message {
276///     id: u32,
277///     value: i16,
278/// }
279/// 
280/// // Assuming Message implements the required traits
281/// let queue: QueueStreamed<Message> = QueueStreamed::new(10, size_of::<Message>()).unwrap();
282/// 
283/// // Send a message
284/// let msg = Message { id: 1, value: 42 };
285/// queue.post_with_to_tick(&msg, Duration::from_millis(100)).unwrap();
286/// 
287/// // Receive a message
288/// let mut received = Message { id: 0, value: 0 };
289/// queue.fetch_with_to_tick(&mut received, Duration::from_millis(100)).unwrap();
290/// assert_eq!(received.id, 1);
291/// assert_eq!(received.value, 42);
292/// ```
293///
294/// ## Command queue pattern
295///
296/// ```ignore
297/// use osal_rs::os::{QueueStreamed, Thread};
298/// use alloc::sync::Arc;
299/// 
300/// enum Command {
301///     Start,
302///     Stop,
303///     SetValue(u32),
304/// }
305/// 
306/// let cmd_queue = Arc::new(QueueStreamed::<Command>::new(10, 8).unwrap());
307/// let queue_clone = cmd_queue.clone();
308/// 
309/// let handler = Thread::new("handler", 2048, 5, move || {
310///     loop {
311///         let mut cmd = Command::Stop;
312///         if queue_clone.fetch(&mut cmd, 1000).is_ok() {
313///             match cmd {
314///                 Command::Start => { /* start operation */ },
315///                 Command::Stop => { /* stop operation */ },
316///                 Command::SetValue(val) => { /* set value */ },
317///             }
318///         }
319///     }
320/// }).unwrap();
321/// ```
322pub struct QueueStreamed<T: StructSerde> (Queue, PhantomData<T>);
323
324unsafe impl<T: StructSerde> Send for QueueStreamed<T> {}
325unsafe impl<T: StructSerde> Sync for QueueStreamed<T> {}
326
327impl<T> QueueStreamed<T> 
328where 
329    T: StructSerde {
330    /// Creates a new type-safe queue.
331    ///
332    /// # Parameters
333    ///
334    /// * `size` - Maximum number of messages
335    /// * `message_size` - Size of each message (typically `size_of::<T>()`)
336    ///
337    /// # Returns
338    ///
339    /// * `Ok(Self)` - Successfully created queue
340    /// * `Err(Error)` - Creation failed
341    #[inline]
342    pub fn new (size: UBaseType, message_size: UBaseType) -> Result<Self> {
343        Ok(Self (Queue::new(size, message_size)?, PhantomData))
344    }
345
346    #[inline]
347    fn fetch_with_to_tick(&self, buffer: &mut T, time: impl ToTick) -> Result<()> {
348        self.fetch(buffer, time.to_ticks())
349    }
350
351    #[inline]
352    fn post_with_to_tick(&self, item: &T, time: impl ToTick) -> Result<()> {
353        self.post(item, time.to_ticks())
354    }
355}
356
357#[cfg(not(feature = "serde"))]
358impl<T> QueueStreamedFn<T> for QueueStreamed<T> 
359where 
360    T: StructSerde {
361
362    fn fetch(&self, buffer: &mut T, time: TickType) -> Result<()> {
363        let mut buf_bytes = Vec::with_capacity(buffer.len());         
364
365        if let Ok(()) = self.0.fetch(&mut buf_bytes, time) {
366            *buffer = T::from_bytes(&buf_bytes)?;
367            Ok(())
368        } else {
369            Err(Error::Timeout)
370        }
371    }
372
373    fn fetch_from_isr(&self, buffer: &mut T) -> Result<()> {
374        let mut buf_bytes = Vec::with_capacity(buffer.len());      
375
376        if let Ok(()) = self.0.fetch_from_isr(&mut buf_bytes) {
377            *buffer = T::from_bytes(&buf_bytes)?;
378            Ok(())
379        } else {
380            Err(Error::Timeout)
381        }
382    }
383
384    #[inline]
385    fn post(&self, item: &T, time: TickType) -> Result<()> {
386        self.0.post(&item.to_bytes(), time)
387    }
388
389    #[inline]
390    fn post_from_isr(&self, item: &T) -> Result<()> {
391        self.0.post_from_isr(&item.to_bytes())
392    }
393
394    #[inline]
395    fn delete(&mut self) {
396        self.0.delete()
397    }
398}
399
#[cfg(feature = "serde")]
impl<T> QueueStreamedFn<T> for QueueStreamed<T>
where
    T: StructSerde,
{
    /// Receives one message and deserializes it into `buffer`, waiting up to
    /// `time` ticks.
    ///
    /// # Errors
    ///
    /// * `Error::Timeout` - the underlying [`Queue::fetch`] failed
    /// * `Error::Unhandled("Deserialization error")` - the received bytes
    ///   could not be decoded
    fn fetch(&self, buffer: &mut T, time: TickType) -> Result<()> {
        // The receive buffer must have a real length of `buffer.len()` bytes;
        // `Vec::with_capacity` would leave the length at 0.
        let mut buf_bytes: Vec<u8> = Vec::new();
        buf_bytes.resize(buffer.len(), 0);

        if self.0.fetch(&mut buf_bytes, time).is_err() {
            return Err(Error::Timeout);
        }

        // Decode the received bytes. Serializing `buffer` with `to_bytes`
        // here would overwrite the received data and leave `buffer` untouched.
        // NOTE(review): assumes `osal_rs_serde::from_bytes(&[u8]) -> Result<T, _>` - confirm.
        *buffer = osal_rs_serde::from_bytes(&buf_bytes[..])
            .map_err(|_| Error::Unhandled("Deserialization error"))?;

        Ok(())
    }

    /// Receives one message from an interrupt handler and deserializes it
    /// into `buffer`.
    ///
    /// # Errors
    ///
    /// * `Error::Timeout` - the underlying [`Queue::fetch_from_isr`] failed
    /// * `Error::Unhandled("Deserialization error")` - the received bytes
    ///   could not be decoded
    fn fetch_from_isr(&self, buffer: &mut T) -> Result<()> {
        // NOTE(review): this allocates a temporary `Vec` on an ISR path -
        // confirm the allocator in use may be called from interrupts.
        let mut buf_bytes: Vec<u8> = Vec::new();
        buf_bytes.resize(buffer.len(), 0);

        if self.0.fetch_from_isr(&mut buf_bytes).is_err() {
            return Err(Error::Timeout);
        }

        // NOTE(review): assumes `osal_rs_serde::from_bytes(&[u8]) -> Result<T, _>` - confirm.
        *buffer = osal_rs_serde::from_bytes(&buf_bytes[..])
            .map_err(|_| Error::Unhandled("Deserialization error"))?;

        Ok(())
    }

    /// Serializes `item` and posts the bytes, waiting up to `time` ticks.
    ///
    /// # Errors
    ///
    /// * `Error::Unhandled("Serialization error")` - `to_bytes` failed
    /// * any error returned by [`Queue::post`]
    fn post(&self, item: &T, time: TickType) -> Result<()> {
        // Give the serializer `item.len()` writable bytes; a Vec created with
        // `with_capacity` has length 0.
        // NOTE(review): assumes `to_bytes` writes into the provided slice - confirm.
        let mut buf_bytes: Vec<u8> = Vec::new();
        buf_bytes.resize(item.len(), 0);

        to_bytes(item, &mut buf_bytes).map_err(|_| Error::Unhandled("Serialization error"))?;

        self.0.post(&buf_bytes, time)
    }

    /// Serializes `item` and posts the bytes from an interrupt handler.
    ///
    /// # Errors
    ///
    /// * `Error::Unhandled("Serialization error")` - `to_bytes` failed
    /// * any error returned by [`Queue::post_from_isr`]
    fn post_from_isr(&self, item: &T) -> Result<()> {
        // NOTE(review): this allocates a temporary `Vec` on an ISR path -
        // confirm the allocator in use may be called from interrupts.
        let mut buf_bytes: Vec<u8> = Vec::new();
        buf_bytes.resize(item.len(), 0);

        to_bytes(item, &mut buf_bytes).map_err(|_| Error::Unhandled("Serialization error"))?;

        self.0.post_from_isr(&buf_bytes)
    }

    /// Deletes the wrapped [`Queue`].
    #[inline]
    fn delete(&mut self) {
        self.0.delete()
    }
}
453
454impl<T> Deref for QueueStreamed<T> 
455where 
456    T: StructSerde {
457    type Target = QueueHandle;
458
459    fn deref(&self) -> &Self::Target {
460        &self.0.0
461    }   
462}
463
464impl<T> Debug for QueueStreamed<T> 
465where 
466    T: StructSerde {
467    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
468        f.debug_struct("QueueStreamed")
469            .field("handle", &self.0.0)
470            .finish()
471    }
472}
473
474impl<T> Display for QueueStreamed<T> 
475where 
476    T: StructSerde {
477    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
478        write!(f, "QueueStreamed {{ handle: {:?} }}", self.0.0)
479    }
480}
481