osal_rs/freertos/queue.rs
1/***************************************************************************
2 *
3 * osal-rs
4 * Copyright (C) 2023/2026 Antonio Salsi <passy.linux@zresa.it>
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 *
18 ***************************************************************************/
19
20//! Queue-based inter-thread communication for FreeRTOS.
21//!
22//! This module provides FIFO queue primitives for safe message passing between threads
23//! and interrupt service routines. Supports both byte-based and typed queues.
24
25use core::ffi::c_void;
26use core::fmt::{Debug, Display};
27use core::marker::PhantomData;
28use core::ops::Deref;
29
30use alloc::vec;
31
32use super::ffi::{QueueHandle, pdFALSE, vQueueDelete, xQueueCreateCountingSemaphore, xQueueReceive, xQueueReceiveFromISR};
33use super::types::{BaseType, UBaseType, TickType};
34use super::system::System;
35use crate::traits::{ToTick, QueueFn, SystemFn, QueueStreamedFn, ToBytes, BytesHasLen, FromBytes};
36use crate::utils::{Result, Error};
37use crate::{xQueueSendToBack, xQueueSendToBackFromISR};
38
39
40/// A FIFO queue for byte-based message passing.
41///
42/// Provides a thread-safe queue implementation for sending and receiving
43/// raw byte slices between threads. Supports both blocking and ISR-safe operations.
44///
45/// # Examples
46///
47/// ## Basic queue usage
48///
49/// ```ignore
50/// use osal_rs::os::{Queue, QueueFn};
51/// use core::time::Duration;
52///
/// // Create a queue with 10 slots, each 4 bytes (the item size must match
/// // the length of the buffers passed to `post`/`fetch`)
/// let queue = Queue::new(10, 4).unwrap();
55///
56/// // Send data
57/// let data = [1u8, 2, 3, 4];
58/// queue.post_with_to_tick(&data, Duration::from_millis(100)).unwrap();
59///
60/// // Receive data
61/// let mut buffer = [0u8; 4];
62/// queue.fetch_with_to_tick(&mut buffer, Duration::from_millis(100)).unwrap();
63/// assert_eq!(buffer, [1, 2, 3, 4]);
64/// ```
65///
66/// ## Producer-consumer pattern
67///
68/// ```ignore
69/// use osal_rs::os::{Queue, QueueFn, Thread};
70/// use alloc::sync::Arc;
71/// use core::time::Duration;
72///
73/// let queue = Arc::new(Queue::new(5, 4).unwrap());
74/// let queue_clone = queue.clone();
75///
76/// // Consumer thread
77/// let consumer = Thread::new("consumer", 2048, 5, move || {
78/// let mut buffer = [0u8; 4];
79/// loop {
80/// if queue_clone.fetch(&mut buffer, 1000).is_ok() {
81/// println!("Received: {:?}", buffer);
82/// }
83/// }
84/// }).unwrap();
85///
86/// consumer.start().unwrap();
87///
88/// // Producer
89/// let data = [0xAA, 0xBB, 0xCC, 0xDD];
90/// queue.post(&data, 1000).unwrap();
91/// ```
92pub struct Queue (QueueHandle);
93
94unsafe impl Send for Queue {}
95unsafe impl Sync for Queue {}
96
97impl Queue {
98 #[inline]
99 pub fn fetch_with_to_tick(&self, buffer: &mut [u8], time: impl ToTick) -> Result<()> {
100 self.fetch(buffer, time.to_ticks())
101 }
102
103 #[inline]
104 pub fn post_with_to_tick(&self, item: &[u8], time: impl ToTick) -> Result<()> {
105 self.post(item, time.to_ticks())
106 }
107}
108
109impl QueueFn for Queue {
110 fn new (size: UBaseType, message_size: super::types::UBaseType) -> Result<Self> {
111 let handle = unsafe { xQueueCreateCountingSemaphore(size, message_size) };
112 if handle.is_null() {
113 Err(Error::OutOfMemory)
114 } else {
115 Ok(Self (handle))
116 }
117 }
118
119 fn fetch(&self, buffer: &mut [u8], time: TickType) -> Result<()> {
120 let ret = unsafe {
121 xQueueReceive(
122 self.0,
123 buffer.as_mut_ptr() as *mut c_void,
124 time,
125 )
126 };
127 if ret == 0 {
128 Err(Error::Timeout)
129 } else {
130 Ok(())
131 }
132 }
133
134 fn fetch_from_isr(&self, buffer: &mut [u8]) -> Result<()> {
135
136 let mut task_woken_by_receive: BaseType = pdFALSE;
137
138 let ret = unsafe {
139 xQueueReceiveFromISR(
140 self.0,
141 buffer.as_mut_ptr() as *mut c_void,
142 &mut task_woken_by_receive
143 )
144 };
145 if ret == 0 {
146 Err(Error::Timeout)
147 } else {
148
149 System::yield_from_isr(task_woken_by_receive);
150
151 Ok(())
152 }
153 }
154
155 fn post(&self, item: &[u8], time: TickType) -> Result<()> {
156 let ret = xQueueSendToBack!(
157 self.0,
158 item.as_ptr() as *const c_void,
159 time
160 );
161
162 if ret == 0 {
163 Err(Error::Timeout)
164 } else {
165 Ok(())
166 }
167 }
168
169 fn post_from_isr(&self, item: &[u8]) -> Result<()> {
170
171 let mut task_woken_by_receive: BaseType = pdFALSE;
172
173 let ret = xQueueSendToBackFromISR!(
174 self.0,
175 item.as_ptr() as *const c_void,
176 &mut task_woken_by_receive
177 );
178
179 if ret == 0 {
180 Err(Error::Timeout)
181 } else {
182 System::yield_from_isr(task_woken_by_receive);
183
184 Ok(())
185 }
186 }
187
188 fn delete(&mut self) {
189 unsafe {
190 vQueueDelete(self.0);
191 self.0 = core::ptr::null_mut();
192 }
193 }
194}
195
196impl Drop for Queue {
197 fn drop(&mut self) {
198 if self.0.is_null() {
199 return;
200 }
201 self.delete();
202 }
203}
204
205impl Deref for Queue {
206 type Target = QueueHandle;
207
208 fn deref(&self) -> &Self::Target {
209 &self.0
210 }
211}
212
213impl Debug for Queue {
214 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
215 f.debug_struct("Queue")
216 .field("handle", &self.0)
217 .finish()
218 }
219}
220
221impl Display for Queue {
222 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
223 write!(f, "Queue {{ handle: {:?} }}", self.0)
224 }
225}
226
227/// A type-safe FIFO queue for message passing.
228///
229/// Unlike [`Queue`], which works with raw byte slices, `QueueStreamed` provides
230/// a type-safe interface for sending and receiving structured data. The type must
231/// implement serialization traits.
232///
233/// # Type Parameters
234///
235/// * `T` - The message type. Must implement `ToBytes`, `BytesHasLen`, and `FromBytes`
236///
237/// # Examples
238///
239/// ## Basic typed queue usage
240///
241/// ```ignore
242/// use osal_rs::os::{QueueStreamed, QueueStreamedFn};
243/// use core::time::Duration;
244///
245/// #[derive(Debug, Clone, Copy)]
246/// struct Message {
247/// id: u32,
248/// value: i16,
249/// }
250///
251/// // Assuming Message implements the required traits
252/// let queue: QueueStreamed<Message> = QueueStreamed::new(10, size_of::<Message>()).unwrap();
253///
254/// // Send a message
255/// let msg = Message { id: 1, value: 42 };
256/// queue.post_with_to_tick(&msg, Duration::from_millis(100)).unwrap();
257///
258/// // Receive a message
259/// let mut received = Message { id: 0, value: 0 };
260/// queue.fetch_with_to_tick(&mut received, Duration::from_millis(100)).unwrap();
261/// assert_eq!(received.id, 1);
262/// assert_eq!(received.value, 42);
263/// ```
264///
265/// ## Command queue pattern
266///
267/// ```ignore
268/// use osal_rs::os::{QueueStreamed, Thread};
269/// use alloc::sync::Arc;
270///
271/// enum Command {
272/// Start,
273/// Stop,
274/// SetValue(u32),
275/// }
276///
277/// let cmd_queue = Arc::new(QueueStreamed::<Command>::new(10, 8).unwrap());
278/// let queue_clone = cmd_queue.clone();
279///
280/// let handler = Thread::new("handler", 2048, 5, move || {
281/// loop {
282/// let mut cmd = Command::Stop;
283/// if queue_clone.fetch(&mut cmd, 1000).is_ok() {
284/// match cmd {
285/// Command::Start => { /* start operation */ },
286/// Command::Stop => { /* stop operation */ },
287/// Command::SetValue(val) => { /* set value */ },
288/// }
289/// }
290/// }
291/// }).unwrap();
292/// ```
293pub struct QueueStreamed<T: ToBytes + BytesHasLen + FromBytes> (Queue, PhantomData<T>);
294
295unsafe impl<T: ToBytes + BytesHasLen + FromBytes> Send for QueueStreamed<T> {}
296unsafe impl<T: ToBytes + BytesHasLen + FromBytes> Sync for QueueStreamed<T> {}
297
298impl<T> QueueStreamed<T>
299where
300 T: ToBytes + BytesHasLen + FromBytes {
301 #[inline]
302 fn fetch_with_to_tick(&self, buffer: &mut T, time: impl ToTick) -> Result<()> {
303 self.fetch(buffer, time.to_ticks())
304 }
305
306 #[inline]
307 fn post_with_to_tick(&self, item: &T, time: impl ToTick) -> Result<()> {
308 self.post(item, time.to_ticks())
309 }
310}
311
312impl<T> QueueStreamedFn<T> for QueueStreamed<T>
313where
314 T: ToBytes + BytesHasLen + FromBytes {
315
316 #[inline]
317 fn new (size: UBaseType, message_size: UBaseType) -> Result<Self> {
318 Ok(Self (Queue::new(size, message_size)?, PhantomData))
319 }
320
321 fn fetch(&self, buffer: &mut T, time: TickType) -> Result<()> {
322 let mut buf_bytes = vec![0u8; buffer.len()];
323
324 if let Ok(()) = self.0.fetch(&mut buf_bytes, time) {
325 *buffer = T::from_bytes(&buf_bytes)?;
326 Ok(())
327 } else {
328 Err(Error::Timeout)
329 }
330 }
331
332 fn fetch_from_isr(&self, buffer: &mut T) -> Result<()> {
333 let mut buf_bytes = vec![0u8; buffer.len()];
334
335 if let Ok(()) = self.0.fetch_from_isr(&mut buf_bytes) {
336 *buffer = T::from_bytes(&buf_bytes)?;
337 Ok(())
338 } else {
339 Err(Error::Timeout)
340 }
341 }
342
343 #[inline]
344 fn post(&self, item: &T, time: TickType) -> Result<()> {
345 self.0.post(&item.to_bytes(), time)
346 }
347
348 #[inline]
349 fn post_from_isr(&self, item: &T) -> Result<()> {
350 self.0.post_from_isr(&item.to_bytes())
351 }
352
353 #[inline]
354 fn delete(&mut self) {
355 self.0.delete()
356 }
357}
358
359impl<T> Deref for QueueStreamed<T>
360where
361 T: ToBytes + BytesHasLen + FromBytes {
362 type Target = QueueHandle;
363
364 fn deref(&self) -> &Self::Target {
365 &self.0.0
366 }
367}
368
369impl<T> Debug for QueueStreamed<T>
370where
371 T: ToBytes + BytesHasLen + FromBytes {
372 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
373 f.debug_struct("QueueStreamed")
374 .field("handle", &self.0.0)
375 .finish()
376 }
377}
378
379impl<T> Display for QueueStreamed<T>
380where
381 T: ToBytes + BytesHasLen + FromBytes {
382 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
383 write!(f, "QueueStreamed {{ handle: {:?} }}", self.0.0)
384 }
385}