osal_rs/freertos/
queue.rs

1/***************************************************************************
2 *
3 * osal-rs
4 * Copyright (C) 2023/2026 Antonio Salsi <passy.linux@zresa.it>
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 *
18 ***************************************************************************/
19
//! Queue-based inter-thread communication for FreeRTOS.
//!
//! This module provides FIFO queue primitives for passing messages between threads
//! and interrupt service routines. Both raw byte queues and typed queues are supported.
24
25use core::ffi::c_void;
26use core::fmt::{Debug, Display};
27use core::marker::PhantomData;
28use core::ops::Deref;
29
30use alloc::vec;
31
32use super::ffi::{QueueHandle, pdFALSE, vQueueDelete, xQueueCreateCountingSemaphore, xQueueReceive, xQueueReceiveFromISR};
33use super::types::{BaseType, UBaseType, TickType};
34use super::system::System;
35use crate::traits::{ToTick, QueueFn, SystemFn, QueueStreamedFn, ToBytes, BytesHasLen, FromBytes};
36use crate::utils::{Result, Error};
37use crate::{xQueueSendToBack, xQueueSendToBackFromISR};
38
39
40/// A FIFO queue for byte-based message passing.
41///
42/// Provides a thread-safe queue implementation for sending and receiving
43/// raw byte slices between threads. Supports both blocking and ISR-safe operations.
44///
45/// # Examples
46///
47/// ## Basic queue usage
48///
49/// ```ignore
50/// use osal_rs::os::{Queue, QueueFn};
51/// use core::time::Duration;
52/// 
53/// // Create a queue with 10 slots, each 32 bytes
54/// let queue = Queue::new(10, 32).unwrap();
55/// 
56/// // Send data
57/// let data = [1u8, 2, 3, 4];
58/// queue.post_with_to_tick(&data, Duration::from_millis(100)).unwrap();
59/// 
60/// // Receive data
61/// let mut buffer = [0u8; 4];
62/// queue.fetch_with_to_tick(&mut buffer, Duration::from_millis(100)).unwrap();
63/// assert_eq!(buffer, [1, 2, 3, 4]);
64/// ```
65///
66/// ## Producer-consumer pattern
67///
68/// ```ignore
69/// use osal_rs::os::{Queue, QueueFn, Thread};
70/// use alloc::sync::Arc;
71/// use core::time::Duration;
72/// 
73/// let queue = Arc::new(Queue::new(5, 4).unwrap());
74/// let queue_clone = queue.clone();
75/// 
76/// // Consumer thread
77/// let consumer = Thread::new("consumer", 2048, 5, move || {
78///     let mut buffer = [0u8; 4];
79///     loop {
80///         if queue_clone.fetch(&mut buffer, 1000).is_ok() {
81///             println!("Received: {:?}", buffer);
82///         }
83///     }
84/// }).unwrap();
85/// 
86/// consumer.start().unwrap();
87/// 
88/// // Producer
89/// let data = [0xAA, 0xBB, 0xCC, 0xDD];
90/// queue.post(&data, 1000).unwrap();
91/// ```
92pub struct Queue (QueueHandle);
93
94unsafe impl Send for Queue {}
95unsafe impl Sync for Queue {}
96
97impl Queue {
98    #[inline]
99    pub fn fetch_with_to_tick(&self, buffer: &mut [u8], time: impl ToTick) -> Result<()> {
100        self.fetch(buffer, time.to_ticks())
101    }
102
103    #[inline]
104    pub fn post_with_to_tick(&self, item: &[u8], time: impl ToTick) -> Result<()> {
105        self.post(item, time.to_ticks())
106    }
107}
108
109impl QueueFn for Queue {
110    fn new (size: UBaseType, message_size: UBaseType) -> Result<Self> {
111        let handle = unsafe { xQueueCreateCountingSemaphore(size, message_size) };
112        if handle.is_null() {
113            Err(Error::OutOfMemory)
114        } else {
115            Ok(Self (handle))
116        }
117    }
118
119    fn fetch(&self, buffer: &mut [u8], time: TickType) -> Result<()> {
120        let ret = unsafe {
121            xQueueReceive(
122                self.0,
123                buffer.as_mut_ptr() as *mut c_void,
124                time,
125            )
126        };
127        if ret == 0 {
128            Err(Error::Timeout)
129        } else {
130            Ok(())
131        }
132    }
133
134    fn fetch_from_isr(&self, buffer: &mut [u8]) -> Result<()> {
135
136        let mut task_woken_by_receive: BaseType = pdFALSE;
137
138        let ret = unsafe {
139            xQueueReceiveFromISR(
140                self.0,
141                buffer.as_mut_ptr() as *mut c_void,
142                &mut task_woken_by_receive
143            )
144        };
145        if ret == 0 {
146            Err(Error::Timeout)
147        } else {
148
149            System::yield_from_isr(task_woken_by_receive);
150            
151            Ok(())
152        }
153    }
154
155    fn post(&self, item: &[u8], time: TickType) -> Result<()> {
156        let ret = xQueueSendToBack!(
157                            self.0,
158                            item.as_ptr() as *const c_void,
159                            time
160                        );
161        
162        if ret == 0 {
163            Err(Error::Timeout)
164        } else {
165            Ok(())
166        }
167    }
168
169    fn post_from_isr(&self, item: &[u8]) -> Result<()> {
170
171        let mut task_woken_by_receive: BaseType = pdFALSE;
172
173        let ret = xQueueSendToBackFromISR!(
174                            self.0,
175                            item.as_ptr() as *const c_void,
176                            &mut task_woken_by_receive
177                        );
178        
179        if ret == 0 {
180            Err(Error::Timeout)
181        } else {
182            System::yield_from_isr(task_woken_by_receive);
183
184            Ok(())
185        }
186    }
187
188    fn delete(&mut self) {
189        unsafe {
190            vQueueDelete(self.0);
191            self.0 = core::ptr::null_mut();
192        }
193    }
194}
195
196impl Drop for Queue {
197    fn drop(&mut self) {
198        if self.0.is_null() {
199            return;
200        }
201        self.delete();
202    }
203}
204
205impl Deref for Queue {
206    type Target = QueueHandle;
207
208    fn deref(&self) -> &Self::Target {
209        &self.0
210    }
211}
212
213impl Debug for Queue {
214    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
215        f.debug_struct("Queue")
216            .field("handle", &self.0)
217            .finish()
218    }
219}
220
221impl Display for Queue {
222    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
223        write!(f, "Queue {{ handle: {:?} }}", self.0)
224    }
225}
226
227/// A type-safe FIFO queue for message passing.
228///
229/// Unlike [`Queue`], which works with raw byte slices, `QueueStreamed` provides
230/// a type-safe interface for sending and receiving structured data. The type must
231/// implement serialization traits.
232///
233/// # Type Parameters
234///
235/// * `T` - The message type. Must implement `ToBytes`, `BytesHasLen`, and `FromBytes`
236///
237/// # Examples
238///
239/// ## Basic typed queue usage
240///
241/// ```ignore
242/// use osal_rs::os::{QueueStreamed, QueueStreamedFn};
243/// use core::time::Duration;
244/// 
245/// #[derive(Debug, Clone, Copy)]
246/// struct Message {
247///     id: u32,
248///     value: i16,
249/// }
250/// 
251/// // Assuming Message implements the required traits
252/// let queue: QueueStreamed<Message> = QueueStreamed::new(10, size_of::<Message>()).unwrap();
253/// 
254/// // Send a message
255/// let msg = Message { id: 1, value: 42 };
256/// queue.post_with_to_tick(&msg, Duration::from_millis(100)).unwrap();
257/// 
258/// // Receive a message
259/// let mut received = Message { id: 0, value: 0 };
260/// queue.fetch_with_to_tick(&mut received, Duration::from_millis(100)).unwrap();
261/// assert_eq!(received.id, 1);
262/// assert_eq!(received.value, 42);
263/// ```
264///
265/// ## Command queue pattern
266///
267/// ```ignore
268/// use osal_rs::os::{QueueStreamed, Thread};
269/// use alloc::sync::Arc;
270/// 
271/// enum Command {
272///     Start,
273///     Stop,
274///     SetValue(u32),
275/// }
276/// 
277/// let cmd_queue = Arc::new(QueueStreamed::<Command>::new(10, 8).unwrap());
278/// let queue_clone = cmd_queue.clone();
279/// 
280/// let handler = Thread::new("handler", 2048, 5, move || {
281///     loop {
282///         let mut cmd = Command::Stop;
283///         if queue_clone.fetch(&mut cmd, 1000).is_ok() {
284///             match cmd {
285///                 Command::Start => { /* start operation */ },
286///                 Command::Stop => { /* stop operation */ },
287///                 Command::SetValue(val) => { /* set value */ },
288///             }
289///         }
290///     }
291/// }).unwrap();
292/// ```
293pub struct QueueStreamed<T: ToBytes + BytesHasLen + FromBytes> (Queue, PhantomData<T>);
294
295unsafe impl<T: ToBytes + BytesHasLen + FromBytes> Send for QueueStreamed<T> {}
296unsafe impl<T: ToBytes + BytesHasLen + FromBytes> Sync for QueueStreamed<T> {}
297
298impl<T> QueueStreamed<T> 
299where 
300    T: ToBytes + BytesHasLen + FromBytes {
301    #[inline]
302    fn fetch_with_to_tick(&self, buffer: &mut T, time: impl ToTick) -> Result<()> {
303        self.fetch(buffer, time.to_ticks())
304    }
305
306    #[inline]
307    fn post_with_to_tick(&self, item: &T, time: impl ToTick) -> Result<()> {
308        self.post(item, time.to_ticks())
309    }
310}
311
312impl<T> QueueStreamedFn<T> for QueueStreamed<T> 
313where 
314    T: ToBytes + BytesHasLen + FromBytes {
315
316    #[inline]
317    fn new (size: UBaseType, message_size: UBaseType) -> Result<Self> {
318        Ok(Self (Queue::new(size, message_size)?, PhantomData))
319    }
320
321    fn fetch(&self, buffer: &mut T, time: TickType) -> Result<()> {
322        let mut buf_bytes = vec![0u8; buffer.len()];        
323
324        if let Ok(()) = self.0.fetch(&mut buf_bytes, time) {
325            *buffer = T::from_bytes(&buf_bytes)?;
326            Ok(())
327        } else {
328            Err(Error::Timeout)
329        }
330    }
331
332    fn fetch_from_isr(&self, buffer: &mut T) -> Result<()> {
333        let mut buf_bytes = vec![0u8; buffer.len()];        
334
335        if let Ok(()) = self.0.fetch_from_isr(&mut buf_bytes) {
336            *buffer = T::from_bytes(&buf_bytes)?;
337            Ok(())
338        } else {
339            Err(Error::Timeout)
340        }
341    }
342
343    #[inline]
344    fn post(&self, item: &T, time: TickType) -> Result<()> {
345        self.0.post(&item.to_bytes(), time)
346    }
347
348    #[inline]
349    fn post_from_isr(&self, item: &T) -> Result<()> {
350        self.0.post_from_isr(&item.to_bytes())
351    }
352
353    #[inline]
354    fn delete(&mut self) {
355        self.0.delete()
356    }
357}
358
359impl<T> Deref for QueueStreamed<T> 
360where 
361    T: ToBytes + BytesHasLen + FromBytes {
362    type Target = QueueHandle;
363
364    fn deref(&self) -> &Self::Target {
365        &self.0.0
366    }   
367}
368
369impl<T> Debug for QueueStreamed<T> 
370where 
371    T: ToBytes + BytesHasLen + FromBytes {
372    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
373        f.debug_struct("QueueStreamed")
374            .field("handle", &self.0.0)
375            .finish()
376    }
377}
378
379impl<T> Display for QueueStreamed<T> 
380where 
381    T: ToBytes + BytesHasLen + FromBytes {
382    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
383        write!(f, "QueueStreamed {{ handle: {:?} }}", self.0.0)
384    }
385}