use core::{
cell::RefCell,
future::{poll_fn, Future},
ops::{Deref, Range},
task::Poll,
};
use embassy_sync::blocking_mutex;
use esp_hal::asynch::AtomicWaker;
use portable_atomic::{AtomicU8, AtomicUsize, Ordering};
use crate::DefaultRawMutex;
/// Outcome reported for a TX slot.
///
/// A value of this type is handed to [`TxSlotStateSignal::signal`] and later
/// returned from [`TxSlotStateSignal::wait`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxSlotStatus {
    /// The slot reported completion.
    Done,
    /// The slot reported a timeout.
    Timeout,
    /// The slot reported a collision.
    Collision,
}
/// Single-value signal carrying the outcome of a TX slot to an async waiter.
///
/// The outcome is encoded in one atomic byte; `0` means that nothing has been
/// signalled (or that the last outcome was already consumed).
pub struct TxSlotStateSignal {
    /// Encoded [`TxSlotStatus`], or [`Self::PENDING_OR_INACTIVE`].
    state: AtomicU8,
    /// Waker of the task currently polling [`Self::wait`].
    waker: AtomicWaker,
}
impl TxSlotStateSignal {
    const PENDING_OR_INACTIVE: u8 = 0;
    const DONE: u8 = 1;
    const TIMEOUT: u8 = 2;
    const COLLISION: u8 = 3;

    /// Creates a signal with no outcome stored.
    pub const fn new() -> Self {
        Self {
            state: AtomicU8::new(Self::PENDING_OR_INACTIVE),
            waker: AtomicWaker::new(),
        }
    }

    /// Discards any outcome that was signalled but not yet consumed.
    pub fn reset(&self) {
        self.state
            .store(Self::PENDING_OR_INACTIVE, Ordering::Relaxed);
    }

    /// Stores `slot_status` and wakes the task registered by [`Self::wait`].
    ///
    /// A previously stored, unconsumed outcome is overwritten.
    pub fn signal(&self, slot_status: TxSlotStatus) {
        let encoded = match slot_status {
            TxSlotStatus::Done => Self::DONE,
            TxSlotStatus::Timeout => Self::TIMEOUT,
            TxSlotStatus::Collision => Self::COLLISION,
        };
        // `Release` pairs with the `Acquire` in `wait`, so everything written
        // before signalling is visible to the task that observes the outcome.
        self.state.store(encoded, Ordering::Release);
        self.waker.wake();
    }

    /// Returns a future that resolves with the next signalled outcome.
    ///
    /// Resolving consumes the outcome: the state is put back to
    /// "nothing signalled" in the same atomic step that reads it.
    pub fn wait(&self) -> impl Future<Output = TxSlotStatus> + use<'_> {
        poll_fn(|cx| {
            // Register *before* looking at the state. If `signal` runs after
            // the registration it wakes this task; if it ran before, the swap
            // below already sees the outcome. Checking first and registering
            // afterwards would leave a window in which the wake-up is lost.
            self.waker.register(cx.waker());
            // Read and clear atomically, so an outcome stored concurrently
            // can never be wiped by a separate reset.
            match self.state.swap(Self::PENDING_OR_INACTIVE, Ordering::Acquire) {
                Self::PENDING_OR_INACTIVE => Poll::Pending,
                Self::DONE => Poll::Ready(TxSlotStatus::Done),
                Self::TIMEOUT => Poll::Ready(TxSlotStatus::Timeout),
                Self::COLLISION => Poll::Ready(TxSlotStatus::Collision),
                // `signal` and `reset` are the only writers and store nothing else.
                _ => unreachable!(),
            }
        })
    }
}
/// Counting signal: every [`SignalQueue::put`] allows exactly one
/// [`SignalQueue::next`] to complete.
pub struct SignalQueue {
    /// Waker of the task currently polling [`Self::next`].
    waker: AtomicWaker,
    /// Number of signals that were put but not yet taken.
    queued_signals: AtomicUsize,
}
impl SignalQueue {
    /// Creates an empty queue.
    pub const fn new() -> Self {
        Self {
            waker: AtomicWaker::new(),
            queued_signals: AtomicUsize::new(0),
        }
    }

    /// Queues one signal and wakes the task registered by [`Self::next`].
    pub fn put(&self) {
        self.queued_signals.fetch_add(1, Ordering::Release);
        self.waker.wake();
    }

    /// Drops all queued signals.
    pub fn reset(&self) {
        self.queued_signals.store(0, Ordering::Relaxed);
    }

    /// Waits until at least one signal is queued and takes it.
    pub async fn next(&self) {
        poll_fn(|cx| {
            // Register first, so that a `put` racing with this poll either
            // wakes this task or is already visible in the counter below.
            self.waker.register(cx.waker());
            // Decrement in a single atomic read-modify-write. A separate
            // load followed by a store would overwrite (and thereby lose) a
            // `put` that lands between the two.
            let taken = self.queued_signals.fetch_update(
                Ordering::Acquire,
                Ordering::Relaxed,
                |queued| queued.checked_sub(1),
            );
            if taken.is_ok() {
                Poll::Ready(())
            } else {
                // Counter was zero: nothing to take yet.
                Poll::Pending
            }
        })
        .await
    }
}
/// Exclusive claim on one TX slot, handed out by [`TxSlotQueue::wait_for_slot`].
///
/// Dereferences to the slot index. Dropping the value marks the slot as free
/// again and wakes the task waiting in [`TxSlotQueue::wait_for_slot`].
pub(crate) struct BorrowedTxSlot<'a> {
    /// Free-slot bitmask of the queue this slot was borrowed from.
    state: &'a blocking_mutex::Mutex<DefaultRawMutex, RefCell<u8>>,
    /// Waker of the owning queue, woken once the slot has been returned.
    waker: &'a AtomicWaker,
    /// Index of the borrowed slot (bit position in the mask).
    slot: usize,
}
impl Deref for BorrowedTxSlot<'_> {
    type Target = usize;

    /// Gives access to the index of the borrowed slot.
    fn deref(&self) -> &Self::Target {
        &self.slot
    }
}
impl Drop for BorrowedTxSlot<'_> {
    /// Returns the slot to the queue and notifies the waiting task.
    fn drop(&mut self) {
        self.state.lock(|rc| {
            *rc.borrow_mut() |= 1 << self.slot;
        });
        trace!("Slot {} is now free again.", self.slot);
        // Without this wake-up a task that found every slot busy would never
        // be polled again. Waking after the bit is set guarantees the re-poll
        // observes the free slot.
        self.waker.wake();
    }
}
/// Hands out TX slots to async tasks, one owner per slot at a time.
///
/// Availability is tracked in a `u8` bitmask: bit `i` set means slot `i` is free.
// NOTE(review): a single `AtomicWaker` presumably tracks only one waiting
// task — confirm whether several tasks may wait for a slot concurrently.
pub(crate) struct TxSlotQueue {
    /// Bitmask of currently free slots.
    state: blocking_mutex::Mutex<DefaultRawMutex, RefCell<u8>>,
    /// Waker of the task waiting for a slot to become free.
    waker: AtomicWaker,
}
impl TxSlotQueue {
    /// Creates a queue in which every slot index in `slots` is initially free.
    ///
    /// # Panics
    ///
    /// Panics if `slots.end` is greater than 5.
    pub fn new(slots: Range<usize>) -> Self {
        assert!(slots.end <= 5);
        let free_mask = slots.fold(0u8, |acc, bit_index| acc | (1 << bit_index));
        Self {
            state: blocking_mutex::Mutex::new(RefCell::new(free_mask)),
            waker: AtomicWaker::new(),
        }
    }

    /// Waits until a slot is free and borrows the lowest-numbered one.
    ///
    /// The slot stays reserved until the returned [`BorrowedTxSlot`] is dropped.
    pub async fn wait_for_slot(&self) -> BorrowedTxSlot<'_> {
        let slot = poll_fn(|cx| {
            self.state.lock(|rc| {
                let mut state = rc.borrow_mut();
                let trailing_zeros = state.trailing_zeros();
                if trailing_zeros == 8 {
                    // No bit set: every slot is taken. The waker is registered
                    // while the lock is held, so a concurrent drop either set
                    // its bit before this check or wakes this task afterwards.
                    self.waker.register(cx.waker());
                    Poll::Pending
                } else {
                    // Claim the slot by clearing its bit.
                    *state &= !(1 << trailing_zeros);
                    Poll::Ready(trailing_zeros as usize)
                }
            })
        })
        .await;
        BorrowedTxSlot {
            state: &self.state,
            waker: &self.waker,
            slot,
        }
    }
}