use alloc::sync::Arc;
use core::ffi::CStr;
use core::future::poll_fn;
use core::marker::PhantomData;
use core::mem::{ManuallyDrop, MaybeUninit};
use core::task::Poll;
use atomic_waker::AtomicWaker;
use veecle_freertos_sys::bindings::{
QueueHandle_t, StackType_t, UBaseType_t, pdTRUE, shim_xQueueCreate, shim_xQueueReceive,
shim_xQueueSendToBack, uxQueueMessagesWaiting, uxQueueSpacesAvailable, vQueueDelete,
};
use crate::isr::InterruptContext;
use crate::units::Duration;
use crate::{FreeRtosError, Task, TaskPriority};
/// A typed handle to a FreeRTOS queue whose items are values of type `T`.
///
/// The struct stores only the raw queue handle; the item type is tracked through
/// [`PhantomData`], so no value of `T` is stored inline.
#[derive(Debug)]
pub struct Queue<T> {
/// Raw handle that is passed to the FreeRTOS queue functions.
handle: QueueHandle_t,
/// Zero-sized marker recording the item type of the queue.
item_type: PhantomData<T>,
}
// SAFETY: NOTE(review) — these impls are unconditional in `T`. Both constructors in this file
// (`Queue::new` and `Queue::from_raw_handle`) live in an impl block that requires `T: Send`.
// Presumably the FreeRTOS queue functions may be called on the same handle from several tasks
// at once — confirm against the FreeRTOS documentation.
unsafe impl<T> Send for Queue<T> {}
unsafe impl<T> Sync for Queue<T> {}
// `Queue` holds a raw handle and a `PhantomData` only, so it never contains a `T` inline.
impl<T> Unpin for Queue<T> {}
impl<T> Queue<T>
where
    T: Send + Sized + 'static,
{
    /// Creates a queue with room for `max_size` items, each `size_of::<T>()` bytes large.
    ///
    /// # Errors
    ///
    /// Returns [`FreeRtosError::OutOfMemory`] when the create call yields a null handle.
    pub fn new(max_size: UBaseType_t) -> Result<Queue<T>, FreeRtosError> {
        // SAFETY: NOTE(review) — assumes the create call has no preconditions beyond the two
        // size arguments; confirm against the binding's documentation.
        let handle = unsafe { shim_xQueueCreate(max_size, size_of::<T>() as UBaseType_t) };

        if handle.is_null() {
            Err(FreeRtosError::OutOfMemory)
        } else {
            Ok(Self {
                handle,
                item_type: PhantomData,
            })
        }
    }

    /// Wraps an existing raw queue handle without creating a new queue.
    ///
    /// # Safety
    ///
    /// The methods of this type pass `handle` straight to the FreeRTOS queue functions and move
    /// items in and out as raw bytes of `T`. The caller must therefore supply a handle to a
    /// live queue whose item size matches `size_of::<T>()` and whose stored bytes are valid
    /// values of `T`.
    #[inline]
    pub unsafe fn from_raw_handle(handle: QueueHandle_t) -> Self {
        let item_type = PhantomData;
        Self { handle, item_type }
    }

    /// Returns the raw queue handle wrapped by this value.
    #[inline]
    pub fn raw_handle(&self) -> QueueHandle_t {
        self.handle
    }

    /// Appends `item` to the back of the queue, passing `max_wait.ticks()` as the wait time.
    ///
    /// # Errors
    ///
    /// Hands `item` back to the caller when the send call does not report `pdTRUE`.
    pub fn send(&self, item: T, max_wait: Duration) -> Result<(), T> {
        // On success the queue owns the bytes of the item, so its destructor must not run here.
        let pending = ManuallyDrop::new(item);

        // SAFETY: NOTE(review) — relies on `self.handle` referring to a live queue with item
        // size `size_of::<T>()`, as established by `new` or promised to `from_raw_handle`. The
        // pointer refers to `pending`, which outlives the call.
        let status = unsafe {
            shim_xQueueSendToBack(self.handle, (&raw const *pending).cast(), max_wait.ticks())
        };

        if status != pdTRUE() {
            // The queue did not take the item: give ownership back to the caller.
            return Err(ManuallyDrop::into_inner(pending));
        }
        Ok(())
    }

    /// Appends `item` to the back of the queue using the `FromISR` variant of the send call.
    ///
    /// The task field of `context` is handed to the call as its third argument.
    ///
    /// # Errors
    ///
    /// Hands `item` back to the caller when the send call does not report `pdTRUE`.
    pub fn send_from_isr(&self, context: &mut InterruptContext, item: T) -> Result<(), T> {
        // On success the queue owns the bytes of the item, so its destructor must not run here.
        let pending = ManuallyDrop::new(item);

        // SAFETY: NOTE(review) — same handle and pointer reasoning as in `send`; additionally
        // assumes `get_task_field_mut` yields a pointer valid for the duration of the call.
        let status = unsafe {
            veecle_freertos_sys::bindings::shim_xQueueSendToBackFromISR(
                self.handle,
                (&raw const *pending).cast(),
                context.get_task_field_mut(),
            )
        };

        if status != pdTRUE() {
            // The queue did not take the item: give ownership back to the caller.
            return Err(ManuallyDrop::into_inner(pending));
        }
        Ok(())
    }

    /// Takes one item out of the queue, passing `max_wait.ticks()` as the wait time.
    ///
    /// # Errors
    ///
    /// Returns [`FreeRtosError::QueueReceiveTimeout`] when the receive call does not report
    /// `pdTRUE`.
    pub fn receive(&self, max_wait: Duration) -> Result<T, FreeRtosError> {
        let mut slot = MaybeUninit::<T>::uninit();

        // SAFETY: NOTE(review) — relies on `self.handle` referring to a live queue with item
        // size `size_of::<T>()`; `slot` provides exactly that many writable bytes.
        let status = unsafe {
            shim_xQueueReceive(self.handle, slot.as_mut_ptr().cast(), max_wait.ticks())
        };

        if status != pdTRUE() {
            return Err(FreeRtosError::QueueReceiveTimeout);
        }

        // SAFETY: NOTE(review) — assumes a `pdTRUE` result means the call filled `slot` with
        // the bytes of an item previously moved in by `send` or `send_from_isr`.
        Ok(unsafe { slot.assume_init() })
    }

    /// Returns the value reported by `uxQueueMessagesWaiting` for this queue.
    pub fn messages_waiting(&self) -> UBaseType_t {
        // SAFETY: NOTE(review) — relies on `self.handle` referring to a live queue.
        unsafe { uxQueueMessagesWaiting(self.handle) }
    }

    /// Returns the value reported by `uxQueueSpacesAvailable` for this queue.
    pub fn spaces_available(&self) -> UBaseType_t {
        // SAFETY: NOTE(review) — relies on `self.handle` referring to a live queue.
        unsafe { uxQueueSpacesAvailable(self.handle) }
    }
}
/// Cloning copies the raw handle only, so the clone refers to the same underlying queue.
impl<T> Clone for Queue<T> {
    fn clone(&self) -> Self {
        let Self { handle, item_type } = self;
        Self {
            handle: *handle,
            item_type: *item_type,
        }
    }
}
/// State shared between an [`AsyncQueueSender`] and an [`AsyncQueueReceiver`].
#[derive(Debug)]
struct AsyncQueue<T> {
/// Waker of a pending async send; woken by the receiver after it took an item out.
send_waker: AtomicWaker,
/// Waker of a pending async receive; woken by the sender after it put an item in.
receive_waker: AtomicWaker,
/// The underlying queue that carries the items.
queue: Queue<T>,
}
impl<T> AsyncQueue<T>
where
    T: Send + Sized + 'static,
{
    /// Creates the shared state around a fresh queue with room for `length` items.
    ///
    /// # Errors
    ///
    /// Propagates the error of [`Queue::new`].
    pub fn new(length: UBaseType_t) -> Result<Self, FreeRtosError> {
        let queue = Queue::new(length)?;
        Ok(Self {
            send_waker: AtomicWaker::default(),
            receive_waker: AtomicWaker::default(),
            queue,
        })
    }

    /// Returns the number of messages reported as waiting by the underlying queue.
    #[inline]
    pub fn messages_waiting(&self) -> UBaseType_t {
        let Self { queue, .. } = self;
        queue.messages_waiting()
    }
}
impl<T> Drop for AsyncQueue<T> {
fn drop(&mut self) {
unsafe {
vQueueDelete(self.queue.handle);
}
}
}
/// Sending half of the channel created by [`channel`].
///
/// Wraps a reference-counted pointer to the state shared with the receiving half.
#[derive(Debug)]
pub struct AsyncQueueSender<T>(Arc<AsyncQueue<T>>);
impl<T> AsyncQueueSender<T>
where
    T: Send + Sized + 'static,
{
    /// Returns the number of messages reported as waiting by the underlying queue.
    #[inline]
    pub fn messages_waiting(&self) -> UBaseType_t {
        self.0.messages_waiting()
    }

    /// Sends `item` through the blocking queue API and wakes a pending async receiver on
    /// success.
    ///
    /// # Errors
    ///
    /// Hands `item` back when the underlying [`Queue::send`] fails.
    #[inline]
    pub fn send_blocking(&mut self, item: T, max_wait: Duration) -> Result<(), T> {
        let shared = &*self.0;
        shared.queue.send(item, max_wait)?;
        shared.receive_waker.wake();
        Ok(())
    }

    /// Sends `item` through the ISR queue API and wakes a pending async receiver on success.
    ///
    /// # Errors
    ///
    /// Hands `item` back when the underlying [`Queue::send_from_isr`] fails.
    #[inline]
    pub fn send_from_isr(&mut self, context: &mut InterruptContext, item: T) -> Result<(), T> {
        let shared = &*self.0;
        shared.queue.send_from_isr(context, item)?;
        shared.receive_waker.wake();
        Ok(())
    }

    /// Completes once the queue reports at least one free space.
    async fn poll_ready(&mut self) {
        let shared = &*self.0;
        poll_fn(|cx| {
            // The waker is stored before the free space is checked, so a receiver that frees a
            // slot after the check still finds a waker to wake.
            shared.send_waker.register(cx.waker());
            match shared.queue.spaces_available() {
                0 => Poll::Pending,
                _ => Poll::Ready(()),
            }
        })
        .await;
    }

    /// Waits until the queue has a free space, sends `item`, and wakes a pending async
    /// receiver.
    ///
    /// # Panics
    ///
    /// Panics if the send fails although the queue reported a free space.
    pub async fn send(&mut self, item: T) {
        self.poll_ready().await;

        let rejected = self.0.queue.send(item, Duration::zero()).is_err();
        if rejected {
            unreachable!("sending failed unexpectedly");
        }

        self.0.receive_waker.wake();
    }
}
/// Receiving half of the channel created by [`channel`].
///
/// Wraps a reference-counted pointer to the state shared with the sending half.
#[derive(Debug)]
pub struct AsyncQueueReceiver<T>(Arc<AsyncQueue<T>>);
impl<T> AsyncQueueReceiver<T>
where
    T: Send + Sized + 'static,
{
    /// Returns the number of messages reported as waiting by the underlying queue.
    #[inline]
    pub fn messages_waiting(&self) -> UBaseType_t {
        self.0.messages_waiting()
    }

    /// Receives an item through the blocking queue API and wakes a pending async sender on
    /// success.
    ///
    /// # Errors
    ///
    /// Propagates the error of the underlying [`Queue::receive`].
    pub fn receive_blocking(&mut self, max_wait: Duration) -> Result<T, FreeRtosError> {
        let result = self.0.queue.receive(max_wait);
        if result.is_ok() {
            self.0.send_waker.wake();
        }
        result
    }

    /// Waits until an item is available, takes it out of the queue, and wakes a pending async
    /// sender.
    pub async fn receive(&mut self) -> T {
        poll_fn(|cx| {
            // Register the waker BEFORE looking at the queue. If the queue were checked first,
            // a send landing between the failed check and the registration would call `wake()`
            // while no waker is stored; that wake-up would be lost and this future could stay
            // pending although an item is available.
            self.0.receive_waker.register(cx.waker());

            match self.0.queue.receive(Duration::zero()) {
                Ok(item) => {
                    // A slot was freed: let a sender waiting for space make progress.
                    self.0.send_waker.wake();
                    Poll::Ready(item)
                }
                Err(_) => Poll::Pending,
            }
        })
        .await
    }
}
/// Creates a connected sender/receiver pair backed by one queue with room for `max_size`
/// items.
///
/// # Errors
///
/// Propagates the error of `AsyncQueue::new` when the queue cannot be created.
pub fn channel<T>(
    max_size: UBaseType_t,
) -> Result<(AsyncQueueSender<T>, AsyncQueueReceiver<T>), FreeRtosError>
where
    T: Send + Sized + 'static,
{
    let shared = Arc::new(AsyncQueue::new(max_size)?);
    Ok((
        AsyncQueueSender(Arc::clone(&shared)),
        AsyncQueueReceiver(shared),
    ))
}
/// Builder for a task that forwards items from a blocking [`Queue`] into an async channel.
#[derive(Debug)]
pub struct BlockingToAsyncQueueTaskBuilder<T> {
/// Name given to the forwarding task.
name: &'static CStr,
/// Blocking queue the forwarding task receives from.
queue: Queue<T>,
/// Priority given to the forwarding task.
priority: TaskPriority,
/// Size passed to `channel` when the async channel is created.
capacity: UBaseType_t,
/// Stack size given to the forwarding task.
stack_size: StackType_t,
}
impl<T> BlockingToAsyncQueueTaskBuilder<T>
where
    T: Send + Sized + 'static,
{
    /// Starts a builder for a task named `name` that forwards items from `queue` into an async
    /// channel of size `capacity`.
    ///
    /// The priority defaults to `TaskPriority(1)` and the stack size to 256 plus twice
    /// `size_of::<T>()`.
    pub fn new(name: &'static CStr, queue: Queue<T>, capacity: UBaseType_t) -> Self {
        const BASE_STACK_SIZE: StackType_t = 256;

        // Extra room for two values of `T` on top of the base size.
        let stack_size = BASE_STACK_SIZE + (size_of::<T>() as StackType_t) * 2;

        Self {
            name,
            queue,
            priority: TaskPriority(1),
            capacity,
            stack_size,
        }
    }

    /// Replaces the priority of the forwarding task.
    pub fn priority(self, priority: TaskPriority) -> Self {
        Self { priority, ..self }
    }

    /// Replaces the stack size of the forwarding task.
    pub fn stack_size(self, stack_size: StackType_t) -> Self {
        Self { stack_size, ..self }
    }

    /// Creates the async channel, starts the forwarding task, and returns the receiving half.
    ///
    /// # Errors
    ///
    /// Propagates the error of `channel` or of starting the task.
    pub fn create(self) -> Result<AsyncQueueReceiver<T>, FreeRtosError> {
        let Self {
            name,
            queue,
            priority,
            capacity,
            stack_size,
        } = self;

        let (mut sender, receiver) = channel(capacity)?;

        Task::new()
            .name(name)
            .stack_size(stack_size)
            .priority(priority)
            .start(move |_| {
                loop {
                    let timeout = Duration::max();

                    // A failed receive is simply retried on the next iteration.
                    let Ok(mut item) = queue.receive(timeout) else {
                        continue;
                    };

                    // A failed send hands the item back; retry until it is accepted.
                    while let Err(returned) = sender.send_blocking(item, timeout) {
                        item = returned;
                    }
                }
            })?;

        Ok(receiver)
    }
}
/// Builder for a task that forwards items from an async channel into a blocking [`Queue`].
#[derive(Debug)]
pub struct AsyncToBlockingQueueTaskBuilder<T> {
/// Name given to the forwarding task.
name: &'static CStr,
/// Blocking queue the forwarding task sends to.
queue: Queue<T>,
/// Priority given to the forwarding task.
priority: TaskPriority,
/// Size passed to `channel` when the async channel is created.
capacity: UBaseType_t,
/// Stack size given to the forwarding task.
stack_size: StackType_t,
}
impl<T> AsyncToBlockingQueueTaskBuilder<T>
where
    T: Send + Sized + 'static,
{
    /// Starts a builder for a task named `name` that forwards items from an async channel of
    /// size `capacity` into `queue`.
    ///
    /// The priority defaults to `TaskPriority(1)` and the stack size to 256 plus twice
    /// `size_of::<T>()`.
    pub fn new(name: &'static CStr, queue: Queue<T>, capacity: UBaseType_t) -> Self {
        const BASE_STACK_SIZE: StackType_t = 256;

        // Extra room for two values of `T` on top of the base size.
        let stack_size = BASE_STACK_SIZE + (size_of::<T>() as StackType_t) * 2;

        Self {
            name,
            queue,
            priority: TaskPriority(1),
            capacity,
            stack_size,
        }
    }

    /// Replaces the priority of the forwarding task.
    pub fn priority(self, priority: TaskPriority) -> Self {
        Self { priority, ..self }
    }

    /// Replaces the stack size of the forwarding task.
    pub fn stack_size(self, stack_size: StackType_t) -> Self {
        Self { stack_size, ..self }
    }

    /// Creates the async channel, starts the forwarding task, and returns the sending half.
    ///
    /// # Errors
    ///
    /// Propagates the error of `channel` or of starting the task.
    pub fn create(self) -> Result<AsyncQueueSender<T>, FreeRtosError> {
        let Self {
            name,
            queue,
            priority,
            capacity,
            stack_size,
        } = self;

        let (sender, mut receiver) = channel(capacity)?;

        Task::new()
            .name(name)
            .stack_size(stack_size)
            .priority(priority)
            .start(move |_| {
                loop {
                    let timeout = Duration::max();

                    // A failed receive is simply retried on the next iteration.
                    let Ok(mut item) = receiver.receive_blocking(timeout) else {
                        continue;
                    };

                    // A failed send hands the item back; retry until it is accepted.
                    while let Err(returned) = queue.send(item, timeout) {
                        item = returned;
                    }
                }
            })?;

        Ok(sender)
    }
}