use core::cell::UnsafeCell;
use core::future::Future;
use core::ops::{Deref, DerefMut};
use core::pin::pin;
use embassy_futures::select::{select, Either};
use embassy_sync::blocking_mutex::raw::RawMutex;
use embassy_time::{Duration, Timer};
use crate::utils::init::{init, Init, InitDefault, UnsafeCellInit};
use crate::utils::sync::blocking::raw::MatterRawMutex;
use crate::utils::sync::Signal;
pub trait Buffers<T>
where
T: ?Sized,
{
type Buffer<'a>: DerefMut<Target = T>
where
Self: 'a;
async fn get(&self) -> Option<Self::Buffer<'_>>;
fn get_immediate(&self) -> Option<Self::Buffer<'_>>;
}
impl<B, T> Buffers<T> for &B
where
B: Buffers<T>,
T: ?Sized,
{
type Buffer<'a>
= B::Buffer<'a>
where
Self: 'a;
fn get(&self) -> impl Future<Output = Option<Self::Buffer<'_>>> {
(*self).get()
}
fn get_immediate(&self) -> Option<Self::Buffer<'_>> {
(*self).get_immediate()
}
}
pub const DEFAULT_BUFFER_POOL_SIZE: usize = 10;
pub struct PooledBuffers<T, const N: usize = DEFAULT_BUFFER_POOL_SIZE, M = MatterRawMutex> {
available: Signal<[bool; N], M>, pool: UnsafeCell<crate::utils::storage::Vec<T, N>>,
wait_timeout_ms: u32,
}
impl<T, const N: usize, M> PooledBuffers<T, N, M>
where
M: RawMutex,
{
#[inline(always)]
pub const fn new() -> Self {
Self::new_with_timeout(0)
}
#[inline(always)]
pub const fn new_with_timeout(wait_timeout_ms: u32) -> Self {
Self {
available: Signal::new([true; N]),
pool: UnsafeCell::new(crate::utils::storage::Vec::new()),
wait_timeout_ms,
}
}
pub fn init() -> impl Init<Self> {
Self::init_with_timeout(0)
}
pub fn init_with_timeout(wait_timeout_ms: u32) -> impl Init<Self> {
init!(Self {
available: Signal::new([true; N]),
pool <- UnsafeCell::init(crate::utils::storage::Vec::init()),
wait_timeout_ms,
})
}
fn init_buffers(pool: &UnsafeCell<crate::utils::storage::Vec<T, N>>)
where
T: InitDefault,
{
let buffers = unwrap!(unsafe { pool.get().as_mut() });
while buffers.len() < N {
unwrap!(buffers.push_init_unchecked(T::init_default()));
}
}
}
impl<T, const N: usize, M> Default for PooledBuffers<T, N, M>
where
M: RawMutex,
{
fn default() -> Self {
Self::new()
}
}
unsafe impl<T, const N: usize, M> Send for PooledBuffers<T, N, M>
where
T: Send,
M: RawMutex + Send,
{
}
unsafe impl<T, const N: usize, M> Sync for PooledBuffers<T, N, M>
where
T: Send,
M: RawMutex + Send + Sync,
{
}
impl<T, const N: usize, M> Buffers<T> for PooledBuffers<T, N, M>
where
T: InitDefault,
M: RawMutex,
{
type Buffer<'b>
= PooledBuffer<'b, T, N, M>
where
Self: 'b;
async fn get(&self) -> Option<Self::Buffer<'_>> {
if self.wait_timeout_ms > 0 {
let mut wait = pin!(self.available.wait(|available| {
Self::init_buffers(&self.pool);
if let Some(index) = available.iter().position(|a| *a) {
available[index] = false;
Some(index)
} else {
None
}
}));
let mut timeout = pin!(Timer::after(Duration::from_millis(
self.wait_timeout_ms as u64
)));
let result = select(&mut wait, &mut timeout).await;
match result {
Either::First(index) => {
let buffer = &mut unwrap!(unsafe { self.pool.get().as_mut() })[index];
Some(PooledBuffer {
index,
buffer,
access: self,
})
}
Either::Second(()) => None,
}
} else {
self.get_immediate()
}
}
fn get_immediate(&self) -> Option<Self::Buffer<'_>> {
let index = self.available.modify(|available| {
Self::init_buffers(&self.pool);
if let Some(index) = available.iter().position(|a| *a) {
available[index] = false;
(false, Some(index))
} else {
(false, None)
}
});
index.map(|index| {
let buffers = unwrap!(unsafe { self.pool.get().as_mut() });
let buffer = &mut buffers[index];
PooledBuffer {
index,
buffer,
access: self,
}
})
}
}
pub struct PooledBuffer<'a, T, const N: usize, M = MatterRawMutex>
where
M: RawMutex,
{
index: usize,
buffer: &'a mut T,
access: &'a PooledBuffers<T, N, M>,
}
impl<T, const N: usize, M> Drop for PooledBuffer<'_, T, N, M>
where
M: RawMutex,
{
fn drop(&mut self) {
self.access.available.modify(|available| {
available[self.index] = true;
(true, ())
});
}
}
impl<T, const N: usize, M> Deref for PooledBuffer<'_, T, N, M>
where
M: RawMutex,
{
type Target = T;
fn deref(&self) -> &Self::Target {
self.buffer.deref()
}
}
impl<T, const N: usize, M> DerefMut for PooledBuffer<'_, T, N, M>
where
M: RawMutex,
{
fn deref_mut(&mut self) -> &mut Self::Target {
self.buffer.deref_mut()
}
}