osal_rs/freertos/
queue.rs

1/***************************************************************************
2 *
3 * osal-rs
4 * Copyright (C) 2023/2026 Antonio Salsi <passy.linux@zresa.it>
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 *
18 ***************************************************************************/
19
20use core::ffi::c_void;
21use core::fmt::{Debug, Display};
22use core::marker::PhantomData;
23use core::ops::Deref;
24
25use alloc::vec;
26
27use super::ffi::{QueueHandle, pdFALSE, vQueueDelete, xQueueCreateCountingSemaphore, xQueueReceive, xQueueReceiveFromISR};
28use super::types::{BaseType, UBaseType, TickType};
29use super::system::System;
30use crate::traits::{ToTick, QueueFn, SystemFn, QueueStreamedFn, ToBytes, BytesHasLen, FromBytes};
31use crate::utils::{Result, Error};
32use crate::{xQueueSendToBack, xQueueSendToBackFromISR};
33
34
35pub struct Queue (QueueHandle);
36
37unsafe impl Send for Queue {}
38unsafe impl Sync for Queue {}
39
40impl Queue {
41    #[inline]
42    pub fn fetch_with_to_tick(&self, buffer: &mut [u8], time: impl ToTick) -> Result<()> {
43        self.fetch(buffer, time.to_ticks())
44    }
45
46    #[inline]
47    pub fn post_with_to_tick(&self, item: &[u8], time: impl ToTick) -> Result<()> {
48        self.post(item, time.to_ticks())
49    }
50}
51
52impl QueueFn for Queue {
53    fn new (size: UBaseType, message_size: super::types::UBaseType) -> Result<Self> {
54        let handle = unsafe { xQueueCreateCountingSemaphore(size, message_size) };
55        if handle.is_null() {
56            Err(Error::OutOfMemory)
57        } else {
58            Ok(Self (handle))
59        }
60    }
61
62    fn fetch(&self, buffer: &mut [u8], time: TickType) -> Result<()> {
63        let ret = unsafe {
64            xQueueReceive(
65                self.0,
66                buffer.as_mut_ptr() as *mut c_void,
67                time,
68            )
69        };
70        if ret == 0 {
71            Err(Error::Timeout)
72        } else {
73            Ok(())
74        }
75    }
76
77    fn fetch_from_isr(&self, buffer: &mut [u8]) -> Result<()> {
78
79        let mut task_woken_by_receive: BaseType = pdFALSE;
80
81        let ret = unsafe {
82            xQueueReceiveFromISR(
83                self.0,
84                buffer.as_mut_ptr() as *mut c_void,
85                &mut task_woken_by_receive
86            )
87        };
88        if ret == 0 {
89            Err(Error::Timeout)
90        } else {
91
92            System::yield_from_isr(task_woken_by_receive);
93            
94            Ok(())
95        }
96    }
97
98    fn post(&self, item: &[u8], time: TickType) -> Result<()> {
99        let ret = xQueueSendToBack!(
100                            self.0,
101                            item.as_ptr() as *const c_void,
102                            time
103                        );
104        
105        if ret == 0 {
106            Err(Error::Timeout)
107        } else {
108            Ok(())
109        }
110    }
111
112    fn post_from_isr(&self, item: &[u8]) -> Result<()> {
113
114        let mut task_woken_by_receive: BaseType = pdFALSE;
115
116        let ret = xQueueSendToBackFromISR!(
117                            self.0,
118                            item.as_ptr() as *const c_void,
119                            &mut task_woken_by_receive
120                        );
121        
122        if ret == 0 {
123            Err(Error::Timeout)
124        } else {
125            System::yield_from_isr(task_woken_by_receive);
126
127            Ok(())
128        }
129    }
130
131    fn delete(&mut self) {
132        unsafe {
133            vQueueDelete(self.0);
134            self.0 = core::ptr::null_mut();
135        }
136    }
137}
138
139impl Drop for Queue {
140    fn drop(&mut self) {
141        if self.0.is_null() {
142            return;
143        }
144        self.delete();
145    }
146}
147
148impl Deref for Queue {
149    type Target = QueueHandle;
150
151    fn deref(&self) -> &Self::Target {
152        &self.0
153    }
154}
155
156impl Debug for Queue {
157    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
158        f.debug_struct("Queue")
159            .field("handle", &self.0)
160            .finish()
161    }
162}
163
164impl Display for Queue {
165    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
166        write!(f, "Queue {{ handle: {:?} }}", self.0)
167    }
168}
169
170pub struct QueueStreamed<T: ToBytes + BytesHasLen + FromBytes> (Queue, PhantomData<T>);
171
172unsafe impl<T: ToBytes + BytesHasLen + FromBytes> Send for QueueStreamed<T> {}
173unsafe impl<T: ToBytes + BytesHasLen + FromBytes> Sync for QueueStreamed<T> {}
174
175impl<T> QueueStreamed<T> 
176where 
177    T: ToBytes + BytesHasLen + FromBytes {
178    #[inline]
179    fn fetch_with_to_tick(&self, buffer: &mut T, time: impl ToTick) -> Result<()> {
180        self.fetch(buffer, time.to_ticks())
181    }
182
183    #[inline]
184    fn post_with_to_tick(&self, item: &T, time: impl ToTick) -> Result<()> {
185        self.post(item, time.to_ticks())
186    }
187}
188
189impl<T> QueueStreamedFn<T> for QueueStreamed<T> 
190where 
191    T: ToBytes + BytesHasLen + FromBytes {
192
193    #[inline]
194    fn new (size: UBaseType, message_size: UBaseType) -> Result<Self> {
195        Ok(Self (Queue::new(size, message_size)?, PhantomData))
196    }
197
198    fn fetch(&self, buffer: &mut T, time: TickType) -> Result<()> {
199        let mut buf_bytes = vec![0u8; buffer.len()];        
200
201        if let Ok(()) = self.0.fetch(&mut buf_bytes, time) {
202            *buffer = T::from_bytes(&buf_bytes)?;
203            Ok(())
204        } else {
205            Err(Error::Timeout)
206        }
207    }
208
209    fn fetch_from_isr(&self, buffer: &mut T) -> Result<()> {
210        let mut buf_bytes = vec![0u8; buffer.len()];        
211
212        if let Ok(()) = self.0.fetch_from_isr(&mut buf_bytes) {
213            *buffer = T::from_bytes(&buf_bytes)?;
214            Ok(())
215        } else {
216            Err(Error::Timeout)
217        }
218    }
219
220    #[inline]
221    fn post(&self, item: &T, time: TickType) -> Result<()> {
222        self.0.post(&item.to_bytes(), time)
223    }
224
225    #[inline]
226    fn post_from_isr(&self, item: &T) -> Result<()> {
227        self.0.post_from_isr(&item.to_bytes())
228    }
229
230    #[inline]
231    fn delete(&mut self) {
232        self.0.delete()
233    }
234}
235
236impl<T> Deref for QueueStreamed<T> 
237where 
238    T: ToBytes + BytesHasLen + FromBytes {
239    type Target = QueueHandle;
240
241    fn deref(&self) -> &Self::Target {
242        &self.0.0
243    }   
244}
245
246impl<T> Debug for QueueStreamed<T> 
247where 
248    T: ToBytes + BytesHasLen + FromBytes {
249    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
250        f.debug_struct("QueueStreamed")
251            .field("handle", &self.0.0)
252            .finish()
253    }
254}
255
256impl<T> Display for QueueStreamed<T> 
257where 
258    T: ToBytes + BytesHasLen + FromBytes {
259    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
260        write!(f, "QueueStreamed {{ handle: {:?} }}", self.0.0)
261    }
262}