osal_rs/freertos/queue.rs
1/***************************************************************************
2 *
3 * osal-rs
4 * Copyright (C) 2026 Antonio Salsi <passy.linux@zresa.it>
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 *
18 ***************************************************************************/
19
20//! Queue-based inter-thread communication for FreeRTOS.
21//!
22//! This module provides FIFO queue primitives for safe message passing between threads
23//! and interrupt service routines. Supports both byte-based and typed queues.
24
25use core::ffi::c_void;
26use core::fmt::{Debug, Display};
27use core::marker::PhantomData;
28use core::ops::Deref;
29
30use alloc::vec::Vec;
31
32use super::ffi::{QueueHandle, pdFALSE, vQueueDelete, xQueueCreateCountingSemaphore, xQueueReceive, xQueueReceiveFromISR};
33use super::types::{BaseType, UBaseType, TickType};
34use super::system::System;
35use crate::traits::{ToTick, QueueFn, SystemFn, QueueStreamedFn, BytesHasLen};
36#[cfg(not(feature = "serde"))]
37use crate::traits::{Serialize, Deserialize};
38
39#[cfg(feature = "serde")]
40use osal_rs_serde::{Serialize, Deserialize, to_bytes};
41
/// Marker trait bundling the bounds required of [`QueueStreamed`] message types.
///
/// A type used as `T` in `QueueStreamed<T>` must implement `Serialize`,
/// `Deserialize` and `BytesHasLen`. Which `Serialize`/`Deserialize` traits are
/// meant depends on the `serde` feature: the crate-local ones when it is
/// disabled, the `osal_rs_serde` ones when it is enabled.
///
/// No blanket implementation is visible in this module, so message types
/// presumably opt in with an explicit `impl StructSerde for MyType {}`.
pub trait StructSerde : Serialize + BytesHasLen + Deserialize {}
43
44use crate::utils::{Result, Error};
45
46
/// A FIFO queue that carries fixed-size messages as raw bytes.
///
/// `Queue` is a thin wrapper around a FreeRTOS `QueueHandle`. Messages are
/// exchanged as byte slices through the [`QueueFn`] methods (`post`, `fetch`
/// and their `*_from_isr` variants). The handle is released by `delete()`, or
/// automatically when the value is dropped.
///
/// # Buffer sizes
///
/// The send/receive methods hand only the slice's *pointer* to FreeRTOS; the
/// slice length is never checked in this module.
/// NOTE(review): FreeRTOS is expected to copy exactly `message_size` bytes per
/// message, so every buffer passed to `post`/`fetch` should be at least that
/// long — confirm, and consider enforcing it.
///
/// # Examples
///
/// ## Basic queue usage
///
/// ```ignore
/// use osal_rs::os::{Queue, QueueFn};
/// use core::time::Duration;
///
/// // Create a queue with 10 slots, each 4 bytes
/// let queue = Queue::new(10, 4).unwrap();
///
/// // Send data (buffer length matches the slot size)
/// let data = [1u8, 2, 3, 4];
/// queue.post_with_to_tick(&data, Duration::from_millis(100)).unwrap();
///
/// // Receive data
/// let mut buffer = [0u8; 4];
/// queue.fetch_with_to_tick(&mut buffer, Duration::from_millis(100)).unwrap();
/// assert_eq!(buffer, [1, 2, 3, 4]);
/// ```
///
/// ## Producer-consumer pattern
///
/// ```ignore
/// use osal_rs::os::{Queue, QueueFn, Thread};
/// use alloc::sync::Arc;
/// use core::time::Duration;
///
/// let queue = Arc::new(Queue::new(5, 4).unwrap());
/// let queue_clone = queue.clone();
///
/// // Consumer thread
/// let consumer = Thread::new("consumer", 2048, 5, move || {
///     let mut buffer = [0u8; 4];
///     loop {
///         if queue_clone.fetch(&mut buffer, 1000).is_ok() {
///             println!("Received: {:?}", buffer);
///         }
///     }
/// }).unwrap();
///
/// consumer.start().unwrap();
///
/// // Producer
/// let data = [0xAA, 0xBB, 0xCC, 0xDD];
/// queue.post(&data, 1000).unwrap();
/// ```
pub struct Queue (QueueHandle);

// SAFETY (NOTE(review) — confirm): the wrapped handle is only ever passed to
// FreeRTOS queue functions, which are presumed safe to call from several tasks
// concurrently. Nothing in this module adds synchronization of its own.
unsafe impl Send for Queue {}
unsafe impl Sync for Queue {}
103
104impl Queue {
105 /// Creates a new queue.
106 ///
107 /// # Parameters
108 ///
109 /// * `size` - Maximum number of messages the queue can hold
110 /// * `message_size` - Size in bytes of each message
111 ///
112 /// # Returns
113 ///
114 /// * `Ok(Self)` - Successfully created queue
115 /// * `Err(Error)` - Creation failed (insufficient memory, etc.)
116 ///
117 /// # Examples
118 ///
119 /// ```ignore
120 /// use osal_rs::os::{Queue, QueueFn};
121 ///
122 /// // Queue for 5 messages of 16 bytes each
123 /// let queue = Queue::new(5, 16).unwrap();
124 /// ```
125 pub fn new (size: UBaseType, message_size: UBaseType) -> Result<Self> {
126 let handle = unsafe { xQueueCreateCountingSemaphore(size, message_size) };
127 if handle.is_null() {
128 Err(Error::OutOfMemory)
129 } else {
130 Ok(Self (handle))
131 }
132 }
133
134 /// Receives data from the queue with a convertible timeout.
135 ///
136 /// This is a convenience method that accepts any type implementing `ToTick`
137 /// (like `Duration`) and converts it to ticks before calling `fetch()`.
138 ///
139 /// # Arguments
140 ///
141 /// * `buffer` - Mutable slice to receive data into
142 /// * `time` - Timeout value (e.g., `Duration::from_millis(100)`)
143 ///
144 /// # Returns
145 ///
146 /// * `Ok(())` - Data successfully received
147 /// * `Err(Error::Timeout)` - No data available within timeout
148 ///
149 /// # Examples
150 ///
151 /// ```ignore
152 /// use osal_rs::os::{Queue, QueueFn};
153 /// use core::time::Duration;
154 ///
155 /// let queue = Queue::new(5, 16).unwrap();
156 /// let mut buffer = [0u8; 16];
157 /// queue.fetch_with_to_tick(&mut buffer, Duration::from_millis(100))?;
158 /// ```
159 #[inline]
160 pub fn fetch_with_to_tick(&self, buffer: &mut [u8], time: impl ToTick) -> Result<()> {
161 self.fetch(buffer, time.to_ticks())
162 }
163
164 /// Sends data to the queue with a convertible timeout.
165 ///
166 /// This is a convenience method that accepts any type implementing `ToTick`
167 /// (like `Duration`) and converts it to ticks before calling `post()`.
168 ///
169 /// # Arguments
170 ///
171 /// * `item` - Slice of data to send
172 /// * `time` - Timeout value (e.g., `Duration::from_millis(100)`)
173 ///
174 /// # Returns
175 ///
176 /// * `Ok(())` - Data successfully sent
177 /// * `Err(Error::Timeout)` - Queue full, could not send within timeout
178 ///
179 /// # Examples
180 ///
181 /// ```ignore
182 /// use osal_rs::os::{Queue, QueueFn};
183 /// use core::time::Duration;
184 ///
185 /// let queue = Queue::new(5, 16).unwrap();
186 /// let data = [1u8, 2, 3, 4];
187 /// queue.post_with_to_tick(&data, Duration::from_millis(100))?;
188 /// ```
189 #[inline]
190 pub fn post_with_to_tick(&self, item: &[u8], time: impl ToTick) -> Result<()> {
191 self.post(item, time.to_ticks())
192 }
193}
194
195impl QueueFn for Queue {
196
197 /// Receives data from the queue, blocking until data is available or timeout.
198 ///
199 /// This function blocks the calling thread until data is available or the
200 /// specified timeout expires.
201 ///
202 /// # Arguments
203 ///
204 /// * `buffer` - Mutable byte slice to receive data into
205 /// * `time` - Timeout in system ticks (0 = no wait, MAX = wait forever)
206 ///
207 /// # Returns
208 ///
209 /// * `Ok(())` - Data successfully received into buffer
210 /// * `Err(Error::Timeout)` - No data available within timeout period
211 ///
212 /// # Examples
213 ///
214 /// ```ignore
215 /// use osal_rs::os::{Queue, QueueFn};
216 ///
217 /// let queue = Queue::new(5, 16).unwrap();
218 /// let mut buffer = [0u8; 16];
219 ///
220 /// // Wait up to 1000 ticks for data
221 /// match queue.fetch(&mut buffer, 1000) {
222 /// Ok(()) => println!("Received data: {:?}", buffer),
223 /// Err(_) => println!("Timeout"),
224 /// }
225 /// ```
226 fn fetch(&self, buffer: &mut [u8], time: TickType) -> Result<()> {
227 let ret = unsafe {
228 xQueueReceive(
229 self.0,
230 buffer.as_mut_ptr() as *mut c_void,
231 time,
232 )
233 };
234 if ret == 0 {
235 Err(Error::Timeout)
236 } else {
237 Ok(())
238 }
239 }
240
241 /// Receives data from the queue in an interrupt service routine (ISR).
242 ///
243 /// This is the ISR-safe version of `fetch()`. It does not block and will
244 /// trigger a context switch if a higher priority task is woken.
245 ///
246 /// # Arguments
247 ///
248 /// * `buffer` - Mutable byte slice to receive data into
249 ///
250 /// # Returns
251 ///
252 /// * `Ok(())` - Data successfully received
253 /// * `Err(Error::Timeout)` - Queue is empty
254 ///
255 /// # Safety
256 ///
257 /// Must only be called from ISR context.
258 ///
259 /// # Examples
260 ///
261 /// ```ignore
262 /// // In interrupt handler
263 /// use osal_rs::os::{Queue, QueueFn};
264 ///
265 /// fn irq_handler(queue: &Queue) {
266 /// let mut buffer = [0u8; 16];
267 /// if queue.fetch_from_isr(&mut buffer).is_ok() {
268 /// // Process received data
269 /// }
270 /// }
271 /// ```
272 fn fetch_from_isr(&self, buffer: &mut [u8]) -> Result<()> {
273
274 let mut task_woken_by_receive: BaseType = pdFALSE;
275
276 let ret = unsafe {
277 xQueueReceiveFromISR(
278 self.0,
279 buffer.as_mut_ptr() as *mut c_void,
280 &mut task_woken_by_receive
281 )
282 };
283 if ret == 0 {
284 Err(Error::Timeout)
285 } else {
286
287 System::yield_from_isr(task_woken_by_receive);
288
289 Ok(())
290 }
291 }
292
293 /// Sends data to the back of the queue, blocking until space is available.
294 ///
295 /// This function blocks the calling thread until space becomes available
296 /// or the timeout expires.
297 ///
298 /// # Arguments
299 ///
300 /// * `item` - Byte slice to send
301 /// * `time` - Timeout in system ticks (0 = no wait, MAX = wait forever)
302 ///
303 /// # Returns
304 ///
305 /// * `Ok(())` - Data successfully sent
306 /// * `Err(Error::Timeout)` - Queue full, could not send within timeout
307 ///
308 /// # Examples
309 ///
310 /// ```ignore
311 /// use osal_rs::os::{Queue, QueueFn};
312 ///
313 /// let queue = Queue::new(5, 16).unwrap();
314 /// let data = [0xAA, 0xBB, 0xCC, 0xDD];
315 ///
316 /// // Wait up to 1000 ticks to send
317 /// queue.post(&data, 1000)?;
318 /// ```
319 fn post(&self, item: &[u8], time: TickType) -> Result<()> {
320 let ret = xQueueSendToBack!(
321 self.0,
322 item.as_ptr() as *const c_void,
323 time
324 );
325
326 if ret == 0 {
327 Err(Error::Timeout)
328 } else {
329 Ok(())
330 }
331 }
332
333 /// Sends data to the queue from an interrupt service routine (ISR).
334 ///
335 /// This is the ISR-safe version of `post()`. It does not block and will
336 /// trigger a context switch if a higher priority task is woken.
337 ///
338 /// # Arguments
339 ///
340 /// * `item` - Byte slice to send
341 ///
342 /// # Returns
343 ///
344 /// * `Ok(())` - Data successfully sent
345 /// * `Err(Error::Timeout)` - Queue is full
346 ///
347 /// # Safety
348 ///
349 /// Must only be called from ISR context.
350 ///
351 /// # Examples
352 ///
353 /// ```ignore
354 /// // In interrupt handler
355 /// use osal_rs::os::{Queue, QueueFn};
356 ///
357 /// fn irq_handler(queue: &Queue) {
358 /// let data = [0x01, 0x02, 0x03];
359 /// queue.post_from_isr(&data).ok();
360 /// }
361 /// ```
362 fn post_from_isr(&self, item: &[u8]) -> Result<()> {
363
364 let mut task_woken_by_receive: BaseType = pdFALSE;
365
366 let ret = xQueueSendToBackFromISR!(
367 self.0,
368 item.as_ptr() as *const c_void,
369 &mut task_woken_by_receive
370 );
371
372 if ret == 0 {
373 Err(Error::Timeout)
374 } else {
375 System::yield_from_isr(task_woken_by_receive);
376
377 Ok(())
378 }
379 }
380
381 /// Deletes the queue and frees its resources.
382 ///
383 /// This function destroys the queue and releases any memory allocated for it.
384 /// After calling this, the queue should not be used. The handle is set to null.
385 ///
386 /// # Safety
387 ///
388 /// Ensure no threads are waiting on this queue before deleting it.
389 ///
390 /// # Examples
391 ///
392 /// ```ignore
393 /// use osal_rs::os::{Queue, QueueFn};
394 ///
395 /// let mut queue = Queue::new(5, 16).unwrap();
396 /// // Use the queue...
397 /// queue.delete();
398 /// ```
399 fn delete(&mut self) {
400 unsafe {
401 vQueueDelete(self.0);
402 self.0 = core::ptr::null_mut();
403 }
404 }
405}
406
407/// Automatically deletes the queue when it goes out of scope.
408///
409/// This ensures proper cleanup of FreeRTOS resources.
410impl Drop for Queue {
411 fn drop(&mut self) {
412 if self.0.is_null() {
413 return;
414 }
415 self.delete();
416 }
417}
418
419/// Allows dereferencing to the underlying FreeRTOS queue handle.
420impl Deref for Queue {
421 type Target = QueueHandle;
422
423 fn deref(&self) -> &Self::Target {
424 &self.0
425 }
426}
427
428/// Formats the queue for debugging purposes.
429impl Debug for Queue {
430 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
431 f.debug_struct("Queue")
432 .field("handle", &self.0)
433 .finish()
434 }
435}
436
437/// Formats the queue for display purposes.
438impl Display for Queue {
439 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
440 write!(f, "Queue {{ handle: {:?} }}", self.0)
441 }
442}
443
/// A typed FIFO queue for message passing.
///
/// Unlike [`Queue`], which works with raw byte slices, `QueueStreamed` sends
/// and receives values of a single message type `T`. It wraps a [`Queue`] and
/// converts each message to and from bytes on the way in and out.
///
/// # Type Parameters
///
/// * `T` - The message type. Must implement [`StructSerde`], i.e. `Serialize`,
///   `Deserialize` and `BytesHasLen`.
///
/// # Examples
///
/// ## Basic typed queue usage
///
/// ```ignore
/// use osal_rs::os::{QueueStreamed, QueueStreamedFn};
/// use core::time::Duration;
///
/// #[derive(Debug, Clone, Copy)]
/// struct Message {
///     id: u32,
///     value: i16,
/// }
///
/// // Assuming Message implements the required traits
/// let queue: QueueStreamed<Message> = QueueStreamed::new(10, size_of::<Message>()).unwrap();
///
/// // Send a message
/// let msg = Message { id: 1, value: 42 };
/// queue.post_with_to_tick(&msg, Duration::from_millis(100)).unwrap();
///
/// // Receive a message
/// let mut received = Message { id: 0, value: 0 };
/// queue.fetch_with_to_tick(&mut received, Duration::from_millis(100)).unwrap();
/// assert_eq!(received.id, 1);
/// assert_eq!(received.value, 42);
/// ```
///
/// ## Command queue pattern
///
/// ```ignore
/// use osal_rs::os::{QueueStreamed, Thread};
/// use alloc::sync::Arc;
///
/// enum Command {
///     Start,
///     Stop,
///     SetValue(u32),
/// }
///
/// let cmd_queue = Arc::new(QueueStreamed::<Command>::new(10, 8).unwrap());
/// let queue_clone = cmd_queue.clone();
///
/// let handler = Thread::new("handler", 2048, 5, move || {
///     loop {
///         let mut cmd = Command::Stop;
///         if queue_clone.fetch(&mut cmd, 1000).is_ok() {
///             match cmd {
///                 Command::Start => { /* start operation */ },
///                 Command::Stop => { /* stop operation */ },
///                 Command::SetValue(val) => { /* set value */ },
///             }
///         }
///     }
/// }).unwrap();
/// ```
pub struct QueueStreamed<T: StructSerde> (Queue, PhantomData<T>);

// SAFETY (NOTE(review) — confirm): no `T` value is stored, only
// `PhantomData<T>`, so the wrapper holds nothing beyond the inner `Queue`,
// which is itself declared `Send + Sync`.
unsafe impl<T: StructSerde> Send for QueueStreamed<T> {}
unsafe impl<T: StructSerde> Sync for QueueStreamed<T> {}
514
515impl<T> QueueStreamed<T>
516where
517 T: StructSerde {
518 /// Creates a new type-safe queue.
519 ///
520 /// # Parameters
521 ///
522 /// * `size` - Maximum number of messages
523 /// * `message_size` - Size of each message (typically `size_of::<T>()`)
524 ///
525 /// # Returns
526 ///
527 /// * `Ok(Self)` - Successfully created queue
528 /// * `Err(Error)` - Creation failed
529 #[inline]
530 pub fn new (size: UBaseType, message_size: UBaseType) -> Result<Self> {
531 Ok(Self (Queue::new(size, message_size)?, PhantomData))
532 }
533
534 /// Receives a typed message with a convertible timeout.
535 ///
536 /// This is a convenience method that accepts any type implementing `ToTick`.
537 ///
538 /// # Arguments
539 ///
540 /// * `buffer` - Mutable reference to receive the message into
541 /// * `time` - Timeout value (e.g., `Duration::from_millis(100)`)
542 ///
543 /// # Returns
544 ///
545 /// * `Ok(())` - Message successfully received and deserialized
546 /// * `Err(Error)` - Timeout or deserialization error
547 ///
548 /// # Examples
549 ///
550 /// ```ignore
551 /// use osal_rs::os::QueueStreamed;
552 /// use core::time::Duration;
553 ///
554 /// let queue: QueueStreamed<MyMessage> = QueueStreamed::new(5, size_of::<MyMessage>()).unwrap();
555 /// let mut msg = MyMessage::default();
556 /// queue.fetch_with_to_tick(&mut msg, Duration::from_millis(100))?;
557 /// ```
558 #[inline]
559 fn fetch_with_to_tick(&self, buffer: &mut T, time: impl ToTick) -> Result<()> {
560 self.fetch(buffer, time.to_ticks())
561 }
562
563 /// Sends a typed message with a convertible timeout.
564 ///
565 /// This is a convenience method that accepts any type implementing `ToTick`.
566 ///
567 /// # Arguments
568 ///
569 /// * `item` - Reference to the message to send
570 /// * `time` - Timeout value (e.g., `Duration::from_millis(100)`)
571 ///
572 /// # Returns
573 ///
574 /// * `Ok(())` - Message successfully serialized and sent
575 /// * `Err(Error)` - Timeout or serialization error
576 ///
577 /// # Examples
578 ///
579 /// ```ignore
580 /// use osal_rs::os::QueueStreamed;
581 /// use core::time::Duration;
582 ///
583 /// let queue: QueueStreamed<MyMessage> = QueueStreamed::new(5, size_of::<MyMessage>()).unwrap();
584 /// let msg = MyMessage { id: 1, value: 42 };
585 /// queue.post_with_to_tick(&msg, Duration::from_millis(100))?;
586 /// ```
587 #[inline]
588 fn post_with_to_tick(&self, item: &T, time: impl ToTick) -> Result<()> {
589 self.post(item, time.to_ticks())
590 }
591}
592
593#[cfg(not(feature = "serde"))]
594impl<T> QueueStreamedFn<T> for QueueStreamed<T>
595where
596 T: StructSerde {
597
598 /// Receives a typed message from the queue (without serde feature).
599 ///
600 /// Deserializes the message from bytes using the custom serialization traits.
601 ///
602 /// # Arguments
603 ///
604 /// * `buffer` - Mutable reference to receive the deserialized message
605 /// * `time` - Timeout in system ticks
606 ///
607 /// # Returns
608 ///
609 /// * `Ok(())` - Message successfully received and deserialized
610 /// * `Err(Error::Timeout)` - Queue empty or timeout
611 /// * `Err(Error)` - Deserialization error
612 fn fetch(&self, buffer: &mut T, time: TickType) -> Result<()> {
613 let mut buf_bytes = Vec::with_capacity(buffer.len());
614
615 if let Ok(()) = self.0.fetch(&mut buf_bytes, time) {
616 *buffer = T::from_bytes(&buf_bytes)?;
617 Ok(())
618 } else {
619 Err(Error::Timeout)
620 }
621 }
622
623 /// Receives a typed message from ISR context (without serde feature).
624 ///
625 /// ISR-safe version that does not block. Deserializes the message from bytes.
626 ///
627 /// # Arguments
628 ///
629 /// * `buffer` - Mutable reference to receive the deserialized message
630 ///
631 /// # Returns
632 ///
633 /// * `Ok(())` - Message successfully received and deserialized
634 /// * `Err(Error::Timeout)` - Queue is empty
635 /// * `Err(Error)` - Deserialization error
636 ///
637 /// # Safety
638 ///
639 /// Must only be called from ISR context.
640 fn fetch_from_isr(&self, buffer: &mut T) -> Result<()> {
641 let mut buf_bytes = Vec::with_capacity(buffer.len());
642
643 if let Ok(()) = self.0.fetch_from_isr(&mut buf_bytes) {
644 *buffer = T::from_bytes(&buf_bytes)?;
645 Ok(())
646 } else {
647 Err(Error::Timeout)
648 }
649 }
650
651 /// Sends a typed message to the queue (without serde feature).
652 ///
653 /// Serializes the message to bytes using the custom serialization traits.
654 ///
655 /// # Arguments
656 ///
657 /// * `item` - Reference to the message to send
658 /// * `time` - Timeout in system ticks
659 ///
660 /// # Returns
661 ///
662 /// * `Ok(())` - Message successfully serialized and sent
663 /// * `Err(Error::Timeout)` - Queue full
664 /// * `Err(Error)` - Serialization error
665 #[inline]
666 fn post(&self, item: &T, time: TickType) -> Result<()> {
667 self.0.post(&item.to_bytes(), time)
668 }
669
670 /// Sends a typed message from ISR context (without serde feature).
671 ///
672 /// ISR-safe version that does not block. Serializes the message to bytes.
673 ///
674 /// # Arguments
675 ///
676 /// * `item` - Reference to the message to send
677 ///
678 /// # Returns
679 ///
680 /// * `Ok(())` - Message successfully serialized and sent
681 /// * `Err(Error::Timeout)` - Queue is full
682 /// * `Err(Error)` - Serialization error
683 ///
684 /// # Safety
685 ///
686 /// Must only be called from ISR context.
687 #[inline]
688 fn post_from_isr(&self, item: &T) -> Result<()> {
689 self.0.post_from_isr(&item.to_bytes())
690 }
691
692 /// Deletes the typed queue.
693 ///
694 /// Delegates to the underlying byte queue's delete method.
695 #[inline]
696 fn delete(&mut self) {
697 self.0.delete()
698 }
699}
700
#[cfg(feature = "serde")]
impl<T> QueueStreamedFn<T> for QueueStreamed<T>
where
    T: StructSerde {

    /// Receives a typed message from the queue (with serde feature).
    ///
    /// Receives the raw bytes into a scratch buffer of `buffer.len()` bytes and
    /// deserializes them into `buffer`.
    ///
    /// NOTE(review): assumes `osal_rs_serde::from_bytes` decodes a value from a
    /// byte slice — confirm against the crate's documentation.
    ///
    /// # Arguments
    ///
    /// * `buffer` - Mutable reference to receive the deserialized message
    /// * `time` - Timeout in system ticks
    ///
    /// # Returns
    ///
    /// * `Ok(())` - Message successfully received and deserialized
    /// * `Err(Error::Timeout)` - Queue empty or timeout
    /// * `Err(Error::Unhandled)` - Deserialization error
    fn fetch(&self, buffer: &mut T, time: TickType) -> Result<()> {
        // The scratch buffer must have real length, not just capacity:
        // `Vec::with_capacity` yields an empty slice.
        let mut buf_bytes = alloc::vec![0u8; buffer.len()];

        self.0.fetch(&mut buf_bytes, time)?;

        // Decode the received bytes. The previous code called `to_bytes` here,
        // which serialized the caller's stale value and never updated it.
        // The error text is kept verbatim for anything matching on it.
        *buffer = osal_rs_serde::from_bytes::<T>(&buf_bytes)
            .map_err(|_| Error::Unhandled("Deserializiation error"))?;

        Ok(())
    }

    /// Receives a typed message from ISR context (with serde feature).
    ///
    /// ISR counterpart of `fetch()`; takes no timeout.
    ///
    /// NOTE(review): this allocates a scratch `Vec` inside the ISR — confirm
    /// that the global allocator may be used from interrupt context.
    ///
    /// # Arguments
    ///
    /// * `buffer` - Mutable reference to receive the deserialized message
    ///
    /// # Returns
    ///
    /// * `Ok(())` - Message successfully received and deserialized
    /// * `Err(Error::Timeout)` - Queue is empty
    /// * `Err(Error::Unhandled)` - Deserialization error
    ///
    /// # Safety
    ///
    /// Must only be called from ISR context.
    fn fetch_from_isr(&self, buffer: &mut T) -> Result<()> {
        // Zero-filled for the same reason as in `fetch()`.
        let mut buf_bytes = alloc::vec![0u8; buffer.len()];

        self.0.fetch_from_isr(&mut buf_bytes)?;

        *buffer = osal_rs_serde::from_bytes::<T>(&buf_bytes)
            .map_err(|_| Error::Unhandled("Deserializiation error"))?;

        Ok(())
    }

    /// Sends a typed message to the queue (with serde feature).
    ///
    /// Serializes the message into a scratch buffer of `item.len()` bytes and
    /// posts that buffer to the underlying byte queue.
    ///
    /// NOTE(review): assumes `to_bytes` writes into the provided slice (rather
    /// than appending to a `Vec`) — confirm against the `osal_rs_serde` docs.
    ///
    /// # Arguments
    ///
    /// * `item` - Reference to the message to send
    /// * `time` - Timeout in system ticks
    ///
    /// # Returns
    ///
    /// * `Ok(())` - Message successfully serialized and sent
    /// * `Err(Error::Timeout)` - Queue full
    /// * `Err(Error::Unhandled)` - Serialization error
    fn post(&self, item: &T, time: TickType) -> Result<()> {
        // Zero-filled so the serializer has `item.len()` writable bytes; a
        // capacity-only `Vec` would present an empty slice.
        let mut buf_bytes = alloc::vec![0u8; item.len()];

        to_bytes(item, &mut buf_bytes).map_err(|_| Error::Unhandled("Serialization error"))?;

        self.0.post(&buf_bytes, time)
    }

    /// Sends a typed message from ISR context (with serde feature).
    ///
    /// ISR counterpart of `post()`; takes no timeout.
    ///
    /// NOTE(review): this allocates a scratch `Vec` inside the ISR — confirm
    /// that the global allocator may be used from interrupt context.
    ///
    /// # Arguments
    ///
    /// * `item` - Reference to the message to send
    ///
    /// # Returns
    ///
    /// * `Ok(())` - Message successfully serialized and sent
    /// * `Err(Error::Timeout)` - Queue is full
    /// * `Err(Error::Unhandled)` - Serialization error
    ///
    /// # Safety
    ///
    /// Must only be called from ISR context.
    fn post_from_isr(&self, item: &T) -> Result<()> {
        // Zero-filled for the same reason as in `post()`.
        let mut buf_bytes = alloc::vec![0u8; item.len()];

        to_bytes(item, &mut buf_bytes).map_err(|_| Error::Unhandled("Serialization error"))?;

        self.0.post_from_isr(&buf_bytes)
    }

    /// Deletes the typed queue (serde version).
    ///
    /// Delegates to the underlying byte queue's delete method.
    #[inline]
    fn delete(&mut self) {
        self.0.delete()
    }
}
819
820/// Allows dereferencing to the underlying FreeRTOS queue handle.
821impl<T> Deref for QueueStreamed<T>
822where
823 T: StructSerde {
824 type Target = QueueHandle;
825
826 fn deref(&self) -> &Self::Target {
827 &self.0.0
828 }
829}
830
831/// Formats the typed queue for debugging purposes.
832impl<T> Debug for QueueStreamed<T>
833where
834 T: StructSerde {
835 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
836 f.debug_struct("QueueStreamed")
837 .field("handle", &self.0.0)
838 .finish()
839 }
840}
841
842/// Formats the typed queue for display purposes.
843impl<T> Display for QueueStreamed<T>
844where
845 T: StructSerde {
846 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
847 write!(f, "QueueStreamed {{ handle: {:?} }}", self.0.0)
848 }
849}
850