osal_rs/freertos/queue.rs
1/***************************************************************************
2 *
3 * osal-rs
4 * Copyright (C) 2026 Antonio Salsi <passy.linux@zresa.it>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, see <https://www.gnu.org/licenses/>.
18 *
19 ***************************************************************************/
20
21//! Queue-based inter-thread communication for FreeRTOS.
22//!
23//! This module provides FIFO queue primitives for safe message passing between threads
24//! and interrupt service routines. Supports both byte-based and typed queues.
25
26use core::ffi::c_void;
27use core::fmt::{Debug, Display};
28use core::marker::PhantomData;
29use core::ops::Deref;
30
31use alloc::vec::Vec;
32
33use super::ffi::{QueueHandle, pdFALSE, vQueueDelete, xQueueGenericCreate, xQueueReceive, xQueueReceiveFromISR};
34use super::types::{BaseType, UBaseType, TickType};
35use super::system::System;
36use crate::traits::{ToTick, QueueFn, SystemFn, QueueStreamedFn, BytesHasLen};
37#[cfg(not(feature = "serde"))]
38use crate::traits::{Serialize, Deserialize};
39
40#[cfg(feature = "serde")]
41use osal_rs_serde::{Serialize, Deserialize, to_bytes};
42
/// Marker trait bundling the bounds required of a [`QueueStreamed`] message type.
///
/// It declares no methods of its own: it only requires `Serialize`,
/// `BytesHasLen` and `Deserialize` as supertraits, so that the typed queue can
/// name a single bound. Which concrete `Serialize`/`Deserialize` traits are
/// meant depends on the `serde` feature (see the imports above).
pub trait StructSerde : Serialize + BytesHasLen + Deserialize {}
44
45use crate::utils::{Result, Error};
46
47
/// A FIFO queue for byte-based message passing.
///
/// Thin wrapper around a FreeRTOS queue handle. Messages are raw byte slices;
/// they can be exchanged with a timeout (`fetch` / `post`) or through the
/// non-blocking `fetch_from_isr` / `post_from_isr` variants. The underlying
/// FreeRTOS queue is deleted when the value is dropped.
///
/// NOTE(review): `fetch` and `post` hand only the slice *pointer* to FreeRTOS,
/// never its length. Presumably FreeRTOS copies exactly `message_size` bytes
/// per item, so every buffer passed to this type should be at least
/// `message_size` bytes long — confirm against the FreeRTOS queue API docs.
///
/// # Examples
///
/// ## Basic queue usage
///
/// ```ignore
/// use osal_rs::os::{Queue, QueueFn};
/// use core::time::Duration;
///
/// // Create a queue with 10 slots, each 4 bytes
/// let queue = Queue::new(10, 4).unwrap();
///
/// // Send data (exactly one 4-byte message)
/// let data = [1u8, 2, 3, 4];
/// queue.post_with_to_tick(&data, Duration::from_millis(100)).unwrap();
///
/// // Receive data into a buffer of the same size
/// let mut buffer = [0u8; 4];
/// queue.fetch_with_to_tick(&mut buffer, Duration::from_millis(100)).unwrap();
/// assert_eq!(buffer, [1, 2, 3, 4]);
/// ```
///
/// ## Producer-consumer pattern
///
/// ```ignore
/// use osal_rs::os::{Queue, QueueFn, Thread};
/// use alloc::sync::Arc;
/// use core::time::Duration;
///
/// let queue = Arc::new(Queue::new(5, 4).unwrap());
/// let queue_clone = queue.clone();
///
/// // Consumer thread
/// let consumer = Thread::new("consumer", 2048, 5, move || {
///     let mut buffer = [0u8; 4];
///     loop {
///         if queue_clone.fetch(&mut buffer, 1000).is_ok() {
///             println!("Received: {:?}", buffer);
///         }
///     }
/// }).unwrap();
///
/// consumer.start().unwrap();
///
/// // Producer
/// let data = [0xAA, 0xBB, 0xCC, 0xDD];
/// queue.post(&data, 1000).unwrap();
/// ```
pub struct Queue (QueueHandle);
101
// NOTE(review): these impls assert that the raw FreeRTOS handle may be moved to
// and shared between threads. Presumably this relies on the FreeRTOS queue API
// being safe to call concurrently from several tasks and ISRs — confirm.
unsafe impl Send for Queue {}
unsafe impl Sync for Queue {}
104
105impl Queue {
106 /// Creates a new queue.
107 ///
108 /// # Parameters
109 ///
110 /// * `size` - Maximum number of messages the queue can hold
111 /// * `message_size` - Size in bytes of each message
112 ///
113 /// # Returns
114 ///
115 /// * `Ok(Self)` - Successfully created queue
116 /// * `Err(Error)` - Creation failed (insufficient memory, etc.)
117 ///
118 /// # Examples
119 ///
120 /// ```ignore
121 /// use osal_rs::os::{Queue, QueueFn};
122 ///
123 /// // Queue for 5 messages of 16 bytes each
124 /// let queue = Queue::new(5, 16).unwrap();
125 /// ```
126 pub fn new (size: UBaseType, message_size: UBaseType) -> Result<Self> {
127 const QUEUE_TYPE_BASE: u8 = 0; // queueQUEUE_TYPE_BASE
128 let handle = unsafe { xQueueGenericCreate(size, message_size, QUEUE_TYPE_BASE) };
129 if handle.is_null() {
130 Err(Error::OutOfMemory)
131 } else {
132 Ok(Self (handle))
133 }
134 }
135
136 /// Receives data from the queue with a convertible timeout.
137 ///
138 /// This is a convenience method that accepts any type implementing `ToTick`
139 /// (like `Duration`) and converts it to ticks before calling `fetch()`.
140 ///
141 /// # Arguments
142 ///
143 /// * `buffer` - Mutable slice to receive data into
144 /// * `time` - Timeout value (e.g., `Duration::from_millis(100)`)
145 ///
146 /// # Returns
147 ///
148 /// * `Ok(())` - Data successfully received
149 /// * `Err(Error::Timeout)` - No data available within timeout
150 ///
151 /// # Examples
152 ///
153 /// ```ignore
154 /// use osal_rs::os::{Queue, QueueFn};
155 /// use core::time::Duration;
156 ///
157 /// let queue = Queue::new(5, 16).unwrap();
158 /// let mut buffer = [0u8; 16];
159 /// queue.fetch_with_to_tick(&mut buffer, Duration::from_millis(100))?;
160 /// ```
161 #[inline]
162 pub fn fetch_with_to_tick(&self, buffer: &mut [u8], time: impl ToTick) -> Result<()> {
163 self.fetch(buffer, time.to_ticks())
164 }
165
166 /// Sends data to the queue with a convertible timeout.
167 ///
168 /// This is a convenience method that accepts any type implementing `ToTick`
169 /// (like `Duration`) and converts it to ticks before calling `post()`.
170 ///
171 /// # Arguments
172 ///
173 /// * `item` - Slice of data to send
174 /// * `time` - Timeout value (e.g., `Duration::from_millis(100)`)
175 ///
176 /// # Returns
177 ///
178 /// * `Ok(())` - Data successfully sent
179 /// * `Err(Error::Timeout)` - Queue full, could not send within timeout
180 ///
181 /// # Examples
182 ///
183 /// ```ignore
184 /// use osal_rs::os::{Queue, QueueFn};
185 /// use core::time::Duration;
186 ///
187 /// let queue = Queue::new(5, 16).unwrap();
188 /// let data = [1u8, 2, 3, 4];
189 /// queue.post_with_to_tick(&data, Duration::from_millis(100))?;
190 /// ```
191 #[inline]
192 pub fn post_with_to_tick(&self, item: &[u8], time: impl ToTick) -> Result<()> {
193 self.post(item, time.to_ticks())
194 }
195}
196
197impl QueueFn for Queue {
198
199 /// Receives data from the queue, blocking until data is available or timeout.
200 ///
201 /// This function blocks the calling thread until data is available or the
202 /// specified timeout expires.
203 ///
204 /// # Arguments
205 ///
206 /// * `buffer` - Mutable byte slice to receive data into
207 /// * `time` - Timeout in system ticks (0 = no wait, MAX = wait forever)
208 ///
209 /// # Returns
210 ///
211 /// * `Ok(())` - Data successfully received into buffer
212 /// * `Err(Error::Timeout)` - No data available within timeout period
213 ///
214 /// # Examples
215 ///
216 /// ```ignore
217 /// use osal_rs::os::{Queue, QueueFn};
218 ///
219 /// let queue = Queue::new(5, 16).unwrap();
220 /// let mut buffer = [0u8; 16];
221 ///
222 /// // Wait up to 1000 ticks for data
223 /// match queue.fetch(&mut buffer, 1000) {
224 /// Ok(()) => println!("Received data: {:?}", buffer),
225 /// Err(_) => println!("Timeout"),
226 /// }
227 /// ```
228 fn fetch(&self, buffer: &mut [u8], time: TickType) -> Result<()> {
229 let ret = unsafe {
230 xQueueReceive(
231 self.0,
232 buffer.as_mut_ptr() as *mut c_void,
233 time,
234 )
235 };
236 if ret == 0 {
237 Err(Error::Timeout)
238 } else {
239 Ok(())
240 }
241 }
242
243 /// Receives data from the queue in an interrupt service routine (ISR).
244 ///
245 /// This is the ISR-safe version of `fetch()`. It does not block and will
246 /// trigger a context switch if a higher priority task is woken.
247 ///
248 /// # Arguments
249 ///
250 /// * `buffer` - Mutable byte slice to receive data into
251 ///
252 /// # Returns
253 ///
254 /// * `Ok(())` - Data successfully received
255 /// * `Err(Error::Timeout)` - Queue is empty
256 ///
257 /// # Safety
258 ///
259 /// Must only be called from ISR context.
260 ///
261 /// # Examples
262 ///
263 /// ```ignore
264 /// // In interrupt handler
265 /// use osal_rs::os::{Queue, QueueFn};
266 ///
267 /// fn irq_handler(queue: &Queue) {
268 /// let mut buffer = [0u8; 16];
269 /// if queue.fetch_from_isr(&mut buffer).is_ok() {
270 /// // Process received data
271 /// }
272 /// }
273 /// ```
274 fn fetch_from_isr(&self, buffer: &mut [u8]) -> Result<()> {
275
276 let mut task_woken_by_receive: BaseType = pdFALSE;
277
278 let ret = unsafe {
279 xQueueReceiveFromISR(
280 self.0,
281 buffer.as_mut_ptr() as *mut c_void,
282 &mut task_woken_by_receive
283 )
284 };
285 if ret == 0 {
286 Err(Error::Timeout)
287 } else {
288
289 System::yield_from_isr(task_woken_by_receive);
290
291 Ok(())
292 }
293 }
294
295 /// Sends data to the back of the queue, blocking until space is available.
296 ///
297 /// This function blocks the calling thread until space becomes available
298 /// or the timeout expires.
299 ///
300 /// # Arguments
301 ///
302 /// * `item` - Byte slice to send
303 /// * `time` - Timeout in system ticks (0 = no wait, MAX = wait forever)
304 ///
305 /// # Returns
306 ///
307 /// * `Ok(())` - Data successfully sent
308 /// * `Err(Error::Timeout)` - Queue full, could not send within timeout
309 ///
310 /// # Examples
311 ///
312 /// ```ignore
313 /// use osal_rs::os::{Queue, QueueFn};
314 ///
315 /// let queue = Queue::new(5, 16).unwrap();
316 /// let data = [0xAA, 0xBB, 0xCC, 0xDD];
317 ///
318 /// // Wait up to 1000 ticks to send
319 /// queue.post(&data, 1000)?;
320 /// ```
321 fn post(&self, item: &[u8], time: TickType) -> Result<()> {
322 let ret = xQueueSendToBack!(
323 self.0,
324 item.as_ptr() as *const c_void,
325 time
326 );
327
328 if ret == 0 {
329 Err(Error::Timeout)
330 } else {
331 Ok(())
332 }
333 }
334
335 /// Sends data to the queue from an interrupt service routine (ISR).
336 ///
337 /// This is the ISR-safe version of `post()`. It does not block and will
338 /// trigger a context switch if a higher priority task is woken.
339 ///
340 /// # Arguments
341 ///
342 /// * `item` - Byte slice to send
343 ///
344 /// # Returns
345 ///
346 /// * `Ok(())` - Data successfully sent
347 /// * `Err(Error::Timeout)` - Queue is full
348 ///
349 /// # Safety
350 ///
351 /// Must only be called from ISR context.
352 ///
353 /// # Examples
354 ///
355 /// ```ignore
356 /// // In interrupt handler
357 /// use osal_rs::os::{Queue, QueueFn};
358 ///
359 /// fn irq_handler(queue: &Queue) {
360 /// let data = [0x01, 0x02, 0x03];
361 /// queue.post_from_isr(&data).ok();
362 /// }
363 /// ```
364 fn post_from_isr(&self, item: &[u8]) -> Result<()> {
365
366 let mut task_woken_by_receive: BaseType = pdFALSE;
367
368 let ret = xQueueSendToBackFromISR!(
369 self.0,
370 item.as_ptr() as *const c_void,
371 &mut task_woken_by_receive
372 );
373
374 if ret == 0 {
375 Err(Error::Timeout)
376 } else {
377 System::yield_from_isr(task_woken_by_receive);
378
379 Ok(())
380 }
381 }
382
383 /// Deletes the queue and frees its resources.
384 ///
385 /// This function destroys the queue and releases any memory allocated for it.
386 /// After calling this, the queue should not be used. The handle is set to null.
387 ///
388 /// # Safety
389 ///
390 /// Ensure no threads are waiting on this queue before deleting it.
391 ///
392 /// # Examples
393 ///
394 /// ```ignore
395 /// use osal_rs::os::{Queue, QueueFn};
396 ///
397 /// let mut queue = Queue::new(5, 16).unwrap();
398 /// // Use the queue...
399 /// queue.delete();
400 /// ```
401 fn delete(&mut self) {
402 unsafe {
403 vQueueDelete(self.0);
404 self.0 = core::ptr::null_mut();
405 }
406 }
407}
408
409/// Automatically deletes the queue when it goes out of scope.
410///
411/// This ensures proper cleanup of FreeRTOS resources.
412impl Drop for Queue {
413 fn drop(&mut self) {
414 if self.0.is_null() {
415 return;
416 }
417 self.delete();
418 }
419}
420
421/// Allows dereferencing to the underlying FreeRTOS queue handle.
422impl Deref for Queue {
423 type Target = QueueHandle;
424
425 fn deref(&self) -> &Self::Target {
426 &self.0
427 }
428}
429
430/// Formats the queue for debugging purposes.
431impl Debug for Queue {
432 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
433 f.debug_struct("Queue")
434 .field("handle", &self.0)
435 .finish()
436 }
437}
438
439/// Formats the queue for display purposes.
440impl Display for Queue {
441 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
442 write!(f, "Queue {{ handle: {:?} }}", self.0)
443 }
444}
445
/// A type-safe FIFO queue for message passing.
///
/// Unlike [`Queue`], which works with raw byte slices, `QueueStreamed` sends
/// and receives values of a single message type `T`. Internally it wraps a
/// byte [`Queue`]; `T` itself is only recorded through `PhantomData`, and
/// messages are converted to and from bytes on every send/receive.
///
/// # Type Parameters
///
/// * `T` - The message type. Must implement [`StructSerde`], i.e.
///   `Serialize`, `BytesHasLen` and `Deserialize`.
///
/// # Examples
///
/// ## Basic typed queue usage
///
/// ```ignore
/// use osal_rs::os::{QueueStreamed, QueueStreamedFn};
///
/// #[derive(Debug, Clone, Copy)]
/// struct Message {
///     id: u32,
///     value: i16,
/// }
///
/// // Assuming Message implements the required traits
/// let queue: QueueStreamed<Message> = QueueStreamed::new(10, size_of::<Message>() as _).unwrap();
///
/// // Send a message (timeout in ticks)
/// let msg = Message { id: 1, value: 42 };
/// queue.post(&msg, 100).unwrap();
///
/// // Receive a message (timeout in ticks)
/// let mut received = Message { id: 0, value: 0 };
/// queue.fetch(&mut received, 100).unwrap();
/// assert_eq!(received.id, 1);
/// assert_eq!(received.value, 42);
/// ```
///
/// ## Command queue pattern
///
/// ```ignore
/// use osal_rs::os::{QueueStreamed, QueueStreamedFn, Thread};
/// use alloc::sync::Arc;
///
/// enum Command {
///     Start,
///     Stop,
///     SetValue(u32),
/// }
///
/// let cmd_queue = Arc::new(QueueStreamed::<Command>::new(10, 8).unwrap());
/// let queue_clone = cmd_queue.clone();
///
/// let handler = Thread::new("handler", 2048, 5, move || {
///     loop {
///         let mut cmd = Command::Stop;
///         if queue_clone.fetch(&mut cmd, 1000).is_ok() {
///             match cmd {
///                 Command::Start => { /* start operation */ },
///                 Command::Stop => { /* stop operation */ },
///                 Command::SetValue(val) => { /* set value */ },
///             }
///         }
///     }
/// }).unwrap();
/// ```
pub struct QueueStreamed<T: StructSerde> (Queue, PhantomData<T>);
513
// NOTE(review): these impls make the typed queue `Send`/`Sync` for every `T`,
// including message types that are not themselves `Send`. The struct holds only
// a byte `Queue` and a `PhantomData<T>`, so no `T` value is stored; presumably
// that is why this is considered sound — confirm.
unsafe impl<T: StructSerde> Send for QueueStreamed<T> {}
unsafe impl<T: StructSerde> Sync for QueueStreamed<T> {}
516
517impl<T> QueueStreamed<T>
518where
519 T: StructSerde {
520 /// Creates a new type-safe queue.
521 ///
522 /// # Parameters
523 ///
524 /// * `size` - Maximum number of messages
525 /// * `message_size` - Size of each message (typically `size_of::<T>()`)
526 ///
527 /// # Returns
528 ///
529 /// * `Ok(Self)` - Successfully created queue
530 /// * `Err(Error)` - Creation failed
531 #[inline]
532 pub fn new (size: UBaseType, message_size: UBaseType) -> Result<Self> {
533 Ok(Self (Queue::new(size, message_size)?, PhantomData))
534 }
535
536 /// Receives a typed message with a convertible timeout.
537 ///
538 /// This is a convenience method that accepts any type implementing `ToTick`.
539 ///
540 /// # Arguments
541 ///
542 /// * `buffer` - Mutable reference to receive the message into
543 /// * `time` - Timeout value (e.g., `Duration::from_millis(100)`)
544 ///
545 /// # Returns
546 ///
547 /// * `Ok(())` - Message successfully received and deserialized
548 /// * `Err(Error)` - Timeout or deserialization error
549 ///
550 /// # Examples
551 ///
552 /// ```ignore
553 /// use osal_rs::os::QueueStreamed;
554 /// use core::time::Duration;
555 ///
556 /// let queue: QueueStreamed<MyMessage> = QueueStreamed::new(5, size_of::<MyMessage>()).unwrap();
557 /// let mut msg = MyMessage::default();
558 /// queue.fetch_with_to_tick(&mut msg, Duration::from_millis(100))?;
559 /// ```
560 #[inline]
561 fn fetch_with_to_tick(&self, buffer: &mut T, time: impl ToTick) -> Result<()> {
562 self.fetch(buffer, time.to_ticks())
563 }
564
565 /// Sends a typed message with a convertible timeout.
566 ///
567 /// This is a convenience method that accepts any type implementing `ToTick`.
568 ///
569 /// # Arguments
570 ///
571 /// * `item` - Reference to the message to send
572 /// * `time` - Timeout value (e.g., `Duration::from_millis(100)`)
573 ///
574 /// # Returns
575 ///
576 /// * `Ok(())` - Message successfully serialized and sent
577 /// * `Err(Error)` - Timeout or serialization error
578 ///
579 /// # Examples
580 ///
581 /// ```ignore
582 /// use osal_rs::os::QueueStreamed;
583 /// use core::time::Duration;
584 ///
585 /// let queue: QueueStreamed<MyMessage> = QueueStreamed::new(5, size_of::<MyMessage>()).unwrap();
586 /// let msg = MyMessage { id: 1, value: 42 };
587 /// queue.post_with_to_tick(&msg, Duration::from_millis(100))?;
588 /// ```
589 #[inline]
590 fn post_with_to_tick(&self, item: &T, time: impl ToTick) -> Result<()> {
591 self.post(item, time.to_ticks())
592 }
593}
594
595#[cfg(not(feature = "serde"))]
596impl<T> QueueStreamedFn<T> for QueueStreamed<T>
597where
598 T: StructSerde {
599
600 /// Receives a typed message from the queue (without serde feature).
601 ///
602 /// Deserializes the message from bytes using the custom serialization traits.
603 ///
604 /// # Arguments
605 ///
606 /// * `buffer` - Mutable reference to receive the deserialized message
607 /// * `time` - Timeout in system ticks
608 ///
609 /// # Returns
610 ///
611 /// * `Ok(())` - Message successfully received and deserialized
612 /// * `Err(Error::Timeout)` - Queue empty or timeout
613 /// * `Err(Error)` - Deserialization error
614 fn fetch(&self, buffer: &mut T, time: TickType) -> Result<()> {
615 let mut buf_bytes = Vec::with_capacity(buffer.len());
616
617 if let Ok(()) = self.0.fetch(&mut buf_bytes, time) {
618 *buffer = T::from_bytes(&buf_bytes)?;
619 Ok(())
620 } else {
621 Err(Error::Timeout)
622 }
623 }
624
625 /// Receives a typed message from ISR context (without serde feature).
626 ///
627 /// ISR-safe version that does not block. Deserializes the message from bytes.
628 ///
629 /// # Arguments
630 ///
631 /// * `buffer` - Mutable reference to receive the deserialized message
632 ///
633 /// # Returns
634 ///
635 /// * `Ok(())` - Message successfully received and deserialized
636 /// * `Err(Error::Timeout)` - Queue is empty
637 /// * `Err(Error)` - Deserialization error
638 ///
639 /// # Safety
640 ///
641 /// Must only be called from ISR context.
642 fn fetch_from_isr(&self, buffer: &mut T) -> Result<()> {
643 let mut buf_bytes = Vec::with_capacity(buffer.len());
644
645 if let Ok(()) = self.0.fetch_from_isr(&mut buf_bytes) {
646 *buffer = T::from_bytes(&buf_bytes)?;
647 Ok(())
648 } else {
649 Err(Error::Timeout)
650 }
651 }
652
653 /// Sends a typed message to the queue (without serde feature).
654 ///
655 /// Serializes the message to bytes using the custom serialization traits.
656 ///
657 /// # Arguments
658 ///
659 /// * `item` - Reference to the message to send
660 /// * `time` - Timeout in system ticks
661 ///
662 /// # Returns
663 ///
664 /// * `Ok(())` - Message successfully serialized and sent
665 /// * `Err(Error::Timeout)` - Queue full
666 /// * `Err(Error)` - Serialization error
667 #[inline]
668 fn post(&self, item: &T, time: TickType) -> Result<()> {
669 self.0.post(&item.to_bytes(), time)
670 }
671
672 /// Sends a typed message from ISR context (without serde feature).
673 ///
674 /// ISR-safe version that does not block. Serializes the message to bytes.
675 ///
676 /// # Arguments
677 ///
678 /// * `item` - Reference to the message to send
679 ///
680 /// # Returns
681 ///
682 /// * `Ok(())` - Message successfully serialized and sent
683 /// * `Err(Error::Timeout)` - Queue is full
684 /// * `Err(Error)` - Serialization error
685 ///
686 /// # Safety
687 ///
688 /// Must only be called from ISR context.
689 #[inline]
690 fn post_from_isr(&self, item: &T) -> Result<()> {
691 self.0.post_from_isr(&item.to_bytes())
692 }
693
694 /// Deletes the typed queue.
695 ///
696 /// Delegates to the underlying byte queue's delete method.
697 #[inline]
698 fn delete(&mut self) {
699 self.0.delete()
700 }
701}
702
#[cfg(feature = "serde")]
impl<T> QueueStreamedFn<T> for QueueStreamed<T>
where
    T: StructSerde {

    /// Receives a typed message from the queue (with serde feature).
    ///
    /// A zero-filled scratch buffer of `buffer.len()` bytes is handed to the
    /// byte queue; the received bytes are then decoded into `*buffer`.
    ///
    /// NOTE(review): decoding uses `osal_rs_serde::from_bytes`, assumed to be
    /// the counterpart of the imported `to_bytes` — confirm against the
    /// `osal_rs_serde` API.
    ///
    /// # Arguments
    ///
    /// * `buffer` - Mutable reference to receive the deserialized message
    /// * `time` - Timeout in system ticks
    ///
    /// # Returns
    ///
    /// * `Ok(())` - Message successfully received and deserialized
    /// * `Err(Error::Timeout)` - Queue empty or timeout
    /// * `Err(Error::Unhandled)` - Deserialization error
    fn fetch(&self, buffer: &mut T, time: TickType) -> Result<()> {
        let mut buf_bytes: Vec<u8> = Vec::new();
        buf_bytes.resize(buffer.len(), 0);

        self.0.fetch(&mut buf_bytes, time)?;

        // Message text kept byte-for-byte (including its spelling) for compatibility.
        *buffer = osal_rs_serde::from_bytes(&buf_bytes)
            .map_err(|_| Error::Unhandled("Deserializiation error"))?;
        Ok(())
    }

    /// Receives a typed message from ISR context (with serde feature).
    ///
    /// Non-blocking counterpart of `fetch()`.
    ///
    /// NOTE(review): this allocates a `Vec` on every call; whether the
    /// allocator in use may be called from an ISR should be confirmed.
    ///
    /// # Arguments
    ///
    /// * `buffer` - Mutable reference to receive the deserialized message
    ///
    /// # Returns
    ///
    /// * `Ok(())` - Message successfully received and deserialized
    /// * `Err(Error::Timeout)` - Queue is empty
    /// * `Err(Error::Unhandled)` - Deserialization error
    ///
    /// # Safety
    ///
    /// Must only be called from ISR context.
    fn fetch_from_isr(&self, buffer: &mut T) -> Result<()> {
        let mut buf_bytes: Vec<u8> = Vec::new();
        buf_bytes.resize(buffer.len(), 0);

        self.0.fetch_from_isr(&mut buf_bytes)?;

        // Message text kept byte-for-byte (including its spelling) for compatibility.
        *buffer = osal_rs_serde::from_bytes(&buf_bytes)
            .map_err(|_| Error::Unhandled("Deserializiation error"))?;
        Ok(())
    }

    /// Sends a typed message to the queue (with serde feature).
    ///
    /// The message is serialized with `to_bytes` into a scratch buffer of
    /// `item.len()` bytes, which is then posted to the byte queue.
    ///
    /// NOTE(review): assumes `to_bytes` writes into the slice it is given
    /// (hence the buffer is given a real length up front) — confirm against
    /// the `osal_rs_serde` API.
    ///
    /// # Arguments
    ///
    /// * `item` - Reference to the message to send
    /// * `time` - Timeout in system ticks
    ///
    /// # Returns
    ///
    /// * `Ok(())` - Message successfully serialized and sent
    /// * `Err(Error::Timeout)` - Queue full
    /// * `Err(Error::Unhandled)` - Serialization error
    fn post(&self, item: &T, time: TickType) -> Result<()> {
        let mut buf_bytes: Vec<u8> = Vec::new();
        buf_bytes.resize(item.len(), 0);

        to_bytes(item, &mut buf_bytes).map_err(|_| Error::Unhandled("Serialization error"))?;

        self.0.post(&buf_bytes, time)
    }

    /// Sends a typed message from ISR context (with serde feature).
    ///
    /// Non-blocking counterpart of `post()`.
    ///
    /// NOTE(review): this allocates a `Vec` on every call; whether the
    /// allocator in use may be called from an ISR should be confirmed.
    ///
    /// # Arguments
    ///
    /// * `item` - Reference to the message to send
    ///
    /// # Returns
    ///
    /// * `Ok(())` - Message successfully serialized and sent
    /// * `Err(Error::Timeout)` - Queue is full
    /// * `Err(Error::Unhandled)` - Serialization error
    ///
    /// # Safety
    ///
    /// Must only be called from ISR context.
    fn post_from_isr(&self, item: &T) -> Result<()> {
        let mut buf_bytes: Vec<u8> = Vec::new();
        buf_bytes.resize(item.len(), 0);

        to_bytes(item, &mut buf_bytes).map_err(|_| Error::Unhandled("Serialization error"))?;

        self.0.post_from_isr(&buf_bytes)
    }

    /// Deletes the typed queue (serde version).
    ///
    /// Delegates to the underlying byte queue's delete method.
    #[inline]
    fn delete(&mut self) {
        self.0.delete()
    }
}
821
822/// Allows dereferencing to the underlying FreeRTOS queue handle.
823impl<T> Deref for QueueStreamed<T>
824where
825 T: StructSerde {
826 type Target = QueueHandle;
827
828 fn deref(&self) -> &Self::Target {
829 &self.0.0
830 }
831}
832
833/// Formats the typed queue for debugging purposes.
834impl<T> Debug for QueueStreamed<T>
835where
836 T: StructSerde {
837 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
838 f.debug_struct("QueueStreamed")
839 .field("handle", &self.0.0)
840 .finish()
841 }
842}
843
844/// Formats the typed queue for display purposes.
845impl<T> Display for QueueStreamed<T>
846where
847 T: StructSerde {
848 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
849 write!(f, "QueueStreamed {{ handle: {:?} }}", self.0.0)
850 }
851}
852