osal_rs/freertos/
queue.rs1use core::ffi::c_void;
26use core::fmt::{Debug, Display};
27use core::marker::PhantomData;
28use core::ops::Deref;
29
30use alloc::vec::Vec;
31
32use super::ffi::{QueueHandle, pdFALSE, vQueueDelete, xQueueCreateCountingSemaphore, xQueueReceive, xQueueReceiveFromISR};
33use super::types::{BaseType, UBaseType, TickType};
34use super::system::System;
35use crate::traits::{ToTick, QueueFn, SystemFn, QueueStreamedFn, BytesHasLen};
36#[cfg(not(feature = "serde"))]
37use crate::traits::{Serialize, Deserialize};
38
39#[cfg(feature = "serde")]
40use osal_rs_serde::{Serialize, Deserialize, to_bytes};
41
42pub trait StructSerde : Serialize + BytesHasLen + Deserialize {}
43
44use crate::utils::{Result, Error};
45use crate::{xQueueSendToBack, xQueueSendToBackFromISR};
46
47
48pub struct Queue (QueueHandle);
101
102unsafe impl Send for Queue {}
103unsafe impl Sync for Queue {}
104
105impl Queue {
106 pub fn new (size: UBaseType, message_size: UBaseType) -> Result<Self> {
127 let handle = unsafe { xQueueCreateCountingSemaphore(size, message_size) };
128 if handle.is_null() {
129 Err(Error::OutOfMemory)
130 } else {
131 Ok(Self (handle))
132 }
133 }
134
135 #[inline]
136 pub fn fetch_with_to_tick(&self, buffer: &mut [u8], time: impl ToTick) -> Result<()> {
137 self.fetch(buffer, time.to_ticks())
138 }
139
140 #[inline]
141 pub fn post_with_to_tick(&self, item: &[u8], time: impl ToTick) -> Result<()> {
142 self.post(item, time.to_ticks())
143 }
144}
145
146impl QueueFn for Queue {
147
148 fn fetch(&self, buffer: &mut [u8], time: TickType) -> Result<()> {
149 let ret = unsafe {
150 xQueueReceive(
151 self.0,
152 buffer.as_mut_ptr() as *mut c_void,
153 time,
154 )
155 };
156 if ret == 0 {
157 Err(Error::Timeout)
158 } else {
159 Ok(())
160 }
161 }
162
163 fn fetch_from_isr(&self, buffer: &mut [u8]) -> Result<()> {
164
165 let mut task_woken_by_receive: BaseType = pdFALSE;
166
167 let ret = unsafe {
168 xQueueReceiveFromISR(
169 self.0,
170 buffer.as_mut_ptr() as *mut c_void,
171 &mut task_woken_by_receive
172 )
173 };
174 if ret == 0 {
175 Err(Error::Timeout)
176 } else {
177
178 System::yield_from_isr(task_woken_by_receive);
179
180 Ok(())
181 }
182 }
183
184 fn post(&self, item: &[u8], time: TickType) -> Result<()> {
185 let ret = xQueueSendToBack!(
186 self.0,
187 item.as_ptr() as *const c_void,
188 time
189 );
190
191 if ret == 0 {
192 Err(Error::Timeout)
193 } else {
194 Ok(())
195 }
196 }
197
198 fn post_from_isr(&self, item: &[u8]) -> Result<()> {
199
200 let mut task_woken_by_receive: BaseType = pdFALSE;
201
202 let ret = xQueueSendToBackFromISR!(
203 self.0,
204 item.as_ptr() as *const c_void,
205 &mut task_woken_by_receive
206 );
207
208 if ret == 0 {
209 Err(Error::Timeout)
210 } else {
211 System::yield_from_isr(task_woken_by_receive);
212
213 Ok(())
214 }
215 }
216
217 fn delete(&mut self) {
218 unsafe {
219 vQueueDelete(self.0);
220 self.0 = core::ptr::null_mut();
221 }
222 }
223}
224
225impl Drop for Queue {
226 fn drop(&mut self) {
227 if self.0.is_null() {
228 return;
229 }
230 self.delete();
231 }
232}
233
234impl Deref for Queue {
235 type Target = QueueHandle;
236
237 fn deref(&self) -> &Self::Target {
238 &self.0
239 }
240}
241
242impl Debug for Queue {
243 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
244 f.debug_struct("Queue")
245 .field("handle", &self.0)
246 .finish()
247 }
248}
249
250impl Display for Queue {
251 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
252 write!(f, "Queue {{ handle: {:?} }}", self.0)
253 }
254}
255
256pub struct QueueStreamed<T: StructSerde> (Queue, PhantomData<T>);
323
324unsafe impl<T: StructSerde> Send for QueueStreamed<T> {}
325unsafe impl<T: StructSerde> Sync for QueueStreamed<T> {}
326
327impl<T> QueueStreamed<T>
328where
329 T: StructSerde {
330 #[inline]
342 pub fn new (size: UBaseType, message_size: UBaseType) -> Result<Self> {
343 Ok(Self (Queue::new(size, message_size)?, PhantomData))
344 }
345
346 #[inline]
347 fn fetch_with_to_tick(&self, buffer: &mut T, time: impl ToTick) -> Result<()> {
348 self.fetch(buffer, time.to_ticks())
349 }
350
351 #[inline]
352 fn post_with_to_tick(&self, item: &T, time: impl ToTick) -> Result<()> {
353 self.post(item, time.to_ticks())
354 }
355}
356
357#[cfg(not(feature = "serde"))]
358impl<T> QueueStreamedFn<T> for QueueStreamed<T>
359where
360 T: StructSerde {
361
362 fn fetch(&self, buffer: &mut T, time: TickType) -> Result<()> {
363 let mut buf_bytes = Vec::with_capacity(buffer.len());
364
365 if let Ok(()) = self.0.fetch(&mut buf_bytes, time) {
366 *buffer = T::from_bytes(&buf_bytes)?;
367 Ok(())
368 } else {
369 Err(Error::Timeout)
370 }
371 }
372
373 fn fetch_from_isr(&self, buffer: &mut T) -> Result<()> {
374 let mut buf_bytes = Vec::with_capacity(buffer.len());
375
376 if let Ok(()) = self.0.fetch_from_isr(&mut buf_bytes) {
377 *buffer = T::from_bytes(&buf_bytes)?;
378 Ok(())
379 } else {
380 Err(Error::Timeout)
381 }
382 }
383
384 #[inline]
385 fn post(&self, item: &T, time: TickType) -> Result<()> {
386 self.0.post(&item.to_bytes(), time)
387 }
388
389 #[inline]
390 fn post_from_isr(&self, item: &T) -> Result<()> {
391 self.0.post_from_isr(&item.to_bytes())
392 }
393
394 #[inline]
395 fn delete(&mut self) {
396 self.0.delete()
397 }
398}
399
400#[cfg(feature = "serde")]
401impl<T> QueueStreamedFn<T> for QueueStreamed<T>
402where
403 T: StructSerde {
404
405 fn fetch(&self, buffer: &mut T, time: TickType) -> Result<()> {
406 let mut buf_bytes = Vec::with_capacity(buffer.len());
407
408 if let Ok(()) = self.0.fetch(&mut buf_bytes, time) {
409
410 to_bytes(buffer, &mut buf_bytes).map_err(|_| Error::Unhandled("Deserializiation error"))?;
411
412 Ok(())
413 } else {
414 Err(Error::Timeout)
415 }
416 }
417
418 fn fetch_from_isr(&self, buffer: &mut T) -> Result<()> {
419 let mut buf_bytes = Vec::with_capacity(buffer.len());
420
421 if let Ok(()) = self.0.fetch_from_isr(&mut buf_bytes) {
422 to_bytes(buffer, &mut buf_bytes).map_err(|_| Error::Unhandled("Deserializiation error"))?;
423 Ok(())
424 } else {
425 Err(Error::Timeout)
426 }
427 }
428
429 fn post(&self, item: &T, time: TickType) -> Result<()> {
430
431
432 let mut buf_bytes = Vec::with_capacity(item.len());
433
434 to_bytes(item, &mut buf_bytes).map_err(|_| Error::Unhandled("Deserializiation error"))?;
435
436 self.0.post(&buf_bytes, time)
437 }
438
439 fn post_from_isr(&self, item: &T) -> Result<()> {
440
441 let mut buf_bytes = Vec::with_capacity(item.len());
442
443 to_bytes(item, &mut buf_bytes).map_err(|_| Error::Unhandled("Deserializiation error"))?;
444
445 self.0.post_from_isr(&buf_bytes)
446 }
447
448 #[inline]
449 fn delete(&mut self) {
450 self.0.delete()
451 }
452}
453
454impl<T> Deref for QueueStreamed<T>
455where
456 T: StructSerde {
457 type Target = QueueHandle;
458
459 fn deref(&self) -> &Self::Target {
460 &self.0.0
461 }
462}
463
464impl<T> Debug for QueueStreamed<T>
465where
466 T: StructSerde {
467 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
468 f.debug_struct("QueueStreamed")
469 .field("handle", &self.0.0)
470 .finish()
471 }
472}
473
474impl<T> Display for QueueStreamed<T>
475where
476 T: StructSerde {
477 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
478 write!(f, "QueueStreamed {{ handle: {:?} }}", self.0.0)
479 }
480}
481