osal_rs/freertos/
queue.rs1use core::ffi::c_void;
21use core::fmt::{Debug, Display};
22use core::marker::PhantomData;
23use core::ops::Deref;
24
25use alloc::vec;
26
27use super::ffi::{QueueHandle, pdFALSE, vQueueDelete, xQueueCreateCountingSemaphore, xQueueReceive, xQueueReceiveFromISR};
28use super::types::{BaseType, UBaseType, TickType};
29use super::system::System;
30use crate::traits::{ToTick, QueueFn, SystemFn, QueueStreamedFn, ToBytes, BytesHasLen, FromBytes};
31use crate::utils::{Result, Error};
32use crate::{xQueueSendToBack, xQueueSendToBackFromISR};
33
34
35pub struct Queue (QueueHandle);
36
37unsafe impl Send for Queue {}
38unsafe impl Sync for Queue {}
39
40impl Queue {
41 #[inline]
42 pub fn fetch_with_to_tick(&self, buffer: &mut [u8], time: impl ToTick) -> Result<()> {
43 self.fetch(buffer, time.to_ticks())
44 }
45
46 #[inline]
47 pub fn post_with_to_tick(&self, item: &[u8], time: impl ToTick) -> Result<()> {
48 self.post(item, time.to_ticks())
49 }
50}
51
52impl QueueFn for Queue {
53 fn new (size: UBaseType, message_size: super::types::UBaseType) -> Result<Self> {
54 let handle = unsafe { xQueueCreateCountingSemaphore(size, message_size) };
55 if handle.is_null() {
56 Err(Error::OutOfMemory)
57 } else {
58 Ok(Self (handle))
59 }
60 }
61
62 fn fetch(&self, buffer: &mut [u8], time: TickType) -> Result<()> {
63 let ret = unsafe {
64 xQueueReceive(
65 self.0,
66 buffer.as_mut_ptr() as *mut c_void,
67 time,
68 )
69 };
70 if ret == 0 {
71 Err(Error::Timeout)
72 } else {
73 Ok(())
74 }
75 }
76
77 fn fetch_from_isr(&self, buffer: &mut [u8]) -> Result<()> {
78
79 let mut task_woken_by_receive: BaseType = pdFALSE;
80
81 let ret = unsafe {
82 xQueueReceiveFromISR(
83 self.0,
84 buffer.as_mut_ptr() as *mut c_void,
85 &mut task_woken_by_receive
86 )
87 };
88 if ret == 0 {
89 Err(Error::Timeout)
90 } else {
91
92 System::yield_from_isr(task_woken_by_receive);
93
94 Ok(())
95 }
96 }
97
98 fn post(&self, item: &[u8], time: TickType) -> Result<()> {
99 let ret = xQueueSendToBack!(
100 self.0,
101 item.as_ptr() as *const c_void,
102 time
103 );
104
105 if ret == 0 {
106 Err(Error::Timeout)
107 } else {
108 Ok(())
109 }
110 }
111
112 fn post_from_isr(&self, item: &[u8]) -> Result<()> {
113
114 let mut task_woken_by_receive: BaseType = pdFALSE;
115
116 let ret = xQueueSendToBackFromISR!(
117 self.0,
118 item.as_ptr() as *const c_void,
119 &mut task_woken_by_receive
120 );
121
122 if ret == 0 {
123 Err(Error::Timeout)
124 } else {
125 System::yield_from_isr(task_woken_by_receive);
126
127 Ok(())
128 }
129 }
130
131 fn delete(&mut self) {
132 unsafe {
133 vQueueDelete(self.0);
134 self.0 = core::ptr::null_mut();
135 }
136 }
137}
138
139impl Drop for Queue {
140 fn drop(&mut self) {
141 if self.0.is_null() {
142 return;
143 }
144 self.delete();
145 }
146}
147
148impl Deref for Queue {
149 type Target = QueueHandle;
150
151 fn deref(&self) -> &Self::Target {
152 &self.0
153 }
154}
155
156impl Debug for Queue {
157 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
158 f.debug_struct("Queue")
159 .field("handle", &self.0)
160 .finish()
161 }
162}
163
164impl Display for Queue {
165 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
166 write!(f, "Queue {{ handle: {:?} }}", self.0)
167 }
168}
169
170pub struct QueueStreamed<T: ToBytes + BytesHasLen + FromBytes> (Queue, PhantomData<T>);
171
172unsafe impl<T: ToBytes + BytesHasLen + FromBytes> Send for QueueStreamed<T> {}
173unsafe impl<T: ToBytes + BytesHasLen + FromBytes> Sync for QueueStreamed<T> {}
174
175impl<T> QueueStreamed<T>
176where
177 T: ToBytes + BytesHasLen + FromBytes {
178 #[inline]
179 fn fetch_with_to_tick(&self, buffer: &mut T, time: impl ToTick) -> Result<()> {
180 self.fetch(buffer, time.to_ticks())
181 }
182
183 #[inline]
184 fn post_with_to_tick(&self, item: &T, time: impl ToTick) -> Result<()> {
185 self.post(item, time.to_ticks())
186 }
187}
188
189impl<T> QueueStreamedFn<T> for QueueStreamed<T>
190where
191 T: ToBytes + BytesHasLen + FromBytes {
192
193 #[inline]
194 fn new (size: UBaseType, message_size: UBaseType) -> Result<Self> {
195 Ok(Self (Queue::new(size, message_size)?, PhantomData))
196 }
197
198 fn fetch(&self, buffer: &mut T, time: TickType) -> Result<()> {
199 let mut buf_bytes = vec![0u8; buffer.len()];
200
201 if let Ok(()) = self.0.fetch(&mut buf_bytes, time) {
202 *buffer = T::from_bytes(&buf_bytes)?;
203 Ok(())
204 } else {
205 Err(Error::Timeout)
206 }
207 }
208
209 fn fetch_from_isr(&self, buffer: &mut T) -> Result<()> {
210 let mut buf_bytes = vec![0u8; buffer.len()];
211
212 if let Ok(()) = self.0.fetch_from_isr(&mut buf_bytes) {
213 *buffer = T::from_bytes(&buf_bytes)?;
214 Ok(())
215 } else {
216 Err(Error::Timeout)
217 }
218 }
219
220 #[inline]
221 fn post(&self, item: &T, time: TickType) -> Result<()> {
222 self.0.post(&item.to_bytes(), time)
223 }
224
225 #[inline]
226 fn post_from_isr(&self, item: &T) -> Result<()> {
227 self.0.post_from_isr(&item.to_bytes())
228 }
229
230 #[inline]
231 fn delete(&mut self) {
232 self.0.delete()
233 }
234}
235
236impl<T> Deref for QueueStreamed<T>
237where
238 T: ToBytes + BytesHasLen + FromBytes {
239 type Target = QueueHandle;
240
241 fn deref(&self) -> &Self::Target {
242 &self.0.0
243 }
244}
245
246impl<T> Debug for QueueStreamed<T>
247where
248 T: ToBytes + BytesHasLen + FromBytes {
249 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
250 f.debug_struct("QueueStreamed")
251 .field("handle", &self.0.0)
252 .finish()
253 }
254}
255
256impl<T> Display for QueueStreamed<T>
257where
258 T: ToBytes + BytesHasLen + FromBytes {
259 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
260 write!(f, "QueueStreamed {{ handle: {:?} }}", self.0.0)
261 }
262}