use crate::ringbuf::{Consumer as RingBufConsumer, Producer as RingBufProducer, RingBuf};
use crate::shared_singleton::SharedSingleton;
/// Errors reported by the shared pool and by its producer/consumer halves.
///
/// The type is a plain field-less enum, so it is cheap to copy and can be
/// compared directly (e.g. in `assert_eq!` or `matches!`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SharedPoolError {
    /// No free payload index could be taken from the return ring.
    PoolFull,
    /// The allocation ring had no writable slot, or committing to it failed.
    AllocBufFull,
    /// The return ring had no writable slot, or committing to it failed.
    ReturnBufFull,
    /// Popping from the allocation ring failed.
    AllocBufEmpty,
    /// The payload attached to the staged message could not be read at commit time.
    PayloadNotConsumerOwned,
    /// At least one of the requested ring-buffer halves had already been split off.
    AlreadySplit,
}
/// Index of a payload slot in a pool of `N` entries.
///
/// The wrapped value is only usable as an array index when it is `< N`; any
/// other value is out of range and converts to `Err(())`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PoolIndex<const N: usize>(u32);

impl<const N: usize> TryFrom<PoolIndex<N>> for usize {
    type Error = ();

    /// Converts the index into a `usize` that is guaranteed to be `< N`.
    ///
    /// # Errors
    /// Returns `Err(())` when the stored value is not a valid index into a
    /// pool of `N` entries.
    fn try_from(value: PoolIndex<N>) -> Result<Self, Self::Error> {
        // Compare in `usize` rather than casting `N` down to `u32`: that cast
        // silently truncates when `N` exceeds `u32::MAX`, which would make
        // valid indices look out of range.
        match usize::try_from(value.0) {
            Ok(idx) if idx < N => Ok(idx),
            _ => Err(()),
        }
    }
}

impl<const N: usize> PoolIndex<N> {
    /// Returns `true` when the index refers to one of the `N` pool slots.
    ///
    /// Uses the same check as the `TryFrom` conversion so the two can never
    /// disagree.
    pub fn is_valid(&self) -> bool {
        usize::try_from(*self).is_ok()
    }
}
/// Implemented by ring-buffer message types that carry a [`PoolIndex`] referring to a
/// payload slot in a pool of `N` entries.
pub trait HasPoolIdx<const N: usize> {
/// Returns the pool index currently stored in the message.
fn get_pool_idx(&self) -> PoolIndex<N>;
/// Stores `pindex` in the message.
fn set_pool_idx(&mut self, pindex: PoolIndex<N>);
}
/// Producing half of a shared pool.
///
/// It writes messages into the allocation ring and takes free payload
/// indices from the return ring.
pub struct Producer<'a, T, Q, const N: usize, const M: usize>
where
    Q: HasPoolIdx<N>,
{
    /// Write side of the ring that carries staged/committed messages.
    pub alloc_prod: RingBufProducer<'a, Q, M>,
    /// Read side of the ring from which free payload indices are taken.
    pub return_cons: RingBufConsumer<'a, Q, M>,
    /// The payload slots that pool indices refer to.
    pool_ref: &'a [SharedSingleton<T>; N],
}
impl<'a, T, Q: HasPoolIdx<N>, const N: usize, const M: usize> Producer<'a, T, Q, N, M> {
    /// Assembles a producer from the write side of the allocation ring, the
    /// read side of the return ring and the payload slots they refer to.
    pub const fn new(
        alloc_prod: RingBufProducer<'a, Q, M>,
        return_cons: RingBufConsumer<'a, Q, M>,
        pool_ref: &'a [SharedSingleton<T>; N],
    ) -> Self {
        Producer {
            alloc_prod,
            return_cons,
            pool_ref,
        }
    }

    /// Pops the next free payload index off the return ring.
    ///
    /// Returns the out-of-range marker `PoolIndex(N)` when the return ring
    /// has nothing to read.
    ///
    /// # Panics
    /// Panics if the front entry of the return ring carries an out-of-range
    /// index, if the slot it names is not vacant, or if popping the entry
    /// fails.
    fn take_pool_item(&mut self) -> PoolIndex<N> {
        if let Some(item) = self.return_cons.reader_front() {
            let payload_idx = usize::try_from(item.get_pool_idx()).unwrap();
            assert!(self.pool_ref[payload_idx].is_vacant());
            assert!(self.return_cons.pop().is_ok());
            return PoolIndex(payload_idx as u32);
        }
        PoolIndex(N as u32)
    }

    /// Stages a message that carries no payload.
    ///
    /// The message's pool index is set to the out-of-range marker `N`.
    /// Returns `None` when the allocation ring has no writable slot.
    pub fn stage(&mut self) -> Option<&mut Q> {
        let item = self.alloc_prod.writer_front()?;
        item.set_pool_idx(PoolIndex::<N>(N as u32));
        Some(item)
    }

    /// Stages a message together with a free payload slot.
    ///
    /// On success the message's pool index is set to the slot that was taken
    /// and both the message and the slot are returned.
    ///
    /// # Errors
    /// * [`SharedPoolError::PoolFull`] when the return ring offers no free
    ///   payload index.
    /// * [`SharedPoolError::AllocBufFull`] when the allocation ring has no
    ///   writable slot.
    ///
    /// Neither error consumes a payload index.
    pub fn stage_with_payload(&mut self) -> Result<(&mut Q, &SharedSingleton<T>), SharedPoolError> {
        // Verify both preconditions BEFORE popping from the return ring.
        // Popping first and then discovering that the allocation ring is full
        // would drop the index on the floor and leak that pool slot for good.
        if self.return_cons.reader_front().is_none() {
            return Err(SharedPoolError::PoolFull);
        }
        if self.alloc_prod.writer_front().is_none() {
            return Err(SharedPoolError::AllocBufFull);
        }

        let idx = usize::try_from(self.take_pool_item()).map_err(|_| SharedPoolError::PoolFull)?;
        let payload = &self.pool_ref[idx];
        match self.alloc_prod.writer_front() {
            Some(item) => {
                item.set_pool_idx(PoolIndex::<N>(idx as u32));
                Ok((item, payload))
            }
            // Kept for completeness; the check above already saw a writable slot.
            None => Err(SharedPoolError::AllocBufFull),
        }
    }

    /// Commits the currently staged message to the allocation ring.
    ///
    /// If the staged message refers to a payload slot, the slot must be
    /// readable (`try_read()` returns `Some`) before the message is committed.
    ///
    /// # Errors
    /// * [`SharedPoolError::PayloadNotConsumerOwned`] when the referenced
    ///   payload slot cannot be read.
    /// * [`SharedPoolError::AllocBufFull`] when the ring-buffer commit fails.
    pub fn commit(&mut self) -> Result<(), SharedPoolError> {
        if let Some(item) = self.alloc_prod.writer_front() {
            // Messages staged without a payload carry an out-of-range index
            // and skip the readability check.
            if let Ok(idx) = usize::try_from(item.get_pool_idx()) {
                if self.pool_ref[idx].try_read().is_none() {
                    return Err(SharedPoolError::PayloadNotConsumerOwned);
                }
            }
        }
        self.alloc_prod
            .commit()
            .map_err(|_| SharedPoolError::AllocBufFull)
    }
}
/// Consuming half of a shared pool.
///
/// It reads messages from the allocation ring and hands payload indices back
/// through the return ring.
pub struct Consumer<'a, T, Q, const N: usize, const M: usize>
where
    Q: HasPoolIdx<N>,
{
    /// Read side of the ring that carries committed messages.
    pub alloc_cons: RingBufConsumer<'a, Q, M>,
    /// Write side of the ring through which payload indices are returned.
    pub return_prod: RingBufProducer<'a, Q, M>,
    /// The payload slots that pool indices refer to.
    pool_ref: &'a [SharedSingleton<T>; N],
}
impl<'a, T, Q: HasPoolIdx<N>, const N: usize, const M: usize> Consumer<'a, T, Q, N, M> {
    /// Looks at the front message of the allocation ring without removing it.
    ///
    /// The second element is the payload slot named by the message's pool
    /// index, or `None` when that index is out of range. Both elements are
    /// `None` when there is no message to read.
    pub fn peek_with_payload(&self) -> (Option<&Q>, Option<&SharedSingleton<T>>) {
        let front = self.alloc_cons.reader_front();
        let payload = front
            .and_then(|message| usize::try_from(message.get_pool_idx()).ok())
            .map(|idx| &self.pool_ref[idx]);
        (front, payload)
    }

    /// Looks at the front message of the allocation ring without removing it.
    pub fn peek(&self) -> Option<&Q> {
        self.alloc_cons.reader_front()
    }

    /// Returns the payload slot named by `pidx`, or `None` when `pidx` is
    /// out of range.
    pub fn read_pool_item(&self, pidx: PoolIndex<N>) -> Option<&SharedSingleton<T>> {
        usize::try_from(pidx).ok().map(|idx| &self.pool_ref[idx])
    }

    /// Removes the front message from the allocation ring.
    ///
    /// # Errors
    /// [`SharedPoolError::AllocBufEmpty`] when the ring-buffer pop fails.
    pub fn pop(&mut self) -> Result<(), SharedPoolError> {
        let popped = self.alloc_cons.pop();
        popped.map_err(|_| SharedPoolError::AllocBufEmpty)
    }

    /// Hands the payload slot `pidx` back through the return ring.
    ///
    /// # Errors
    /// [`SharedPoolError::ReturnBufFull`] when the return ring has no
    /// writable slot or the commit fails.
    ///
    /// # Panics
    /// When a writable slot is available, panics if `pidx` is out of range or
    /// if the slot it names is not vacant.
    pub fn return_payload(&mut self, pidx: PoolIndex<N>) -> Result<(), SharedPoolError> {
        match self.return_prod.writer_front() {
            None => Err(SharedPoolError::ReturnBufFull),
            Some(slot) => {
                assert!(pidx.is_valid());
                assert!(self.pool_ref[pidx.0 as usize].is_vacant());
                slot.set_pool_idx(pidx);
                let committed = self.return_prod.commit();
                committed.map_err(|_| SharedPoolError::ReturnBufFull)
            }
        }
    }
}
/// Backing storage for a producer/consumer pair: two ring buffers of `M`
/// message slots each and `N` payload slots.
pub struct SharedPool<T, Q, const N: usize, const M: usize>
where
    Q: HasPoolIdx<N>,
{
    /// Ring carrying messages from the producer to the consumer.
    alloc_rbuf: RingBuf<Q, M>,
    /// Ring carrying payload indices from the consumer back to the producer.
    return_rbuf: RingBuf<Q, M>,
    /// The payload slots themselves.
    pool: [SharedSingleton<T>; N],
}
// SAFETY: NOTE(review) — this impl carries no bounds on `T` or `Q`, so it declares the pool
// shareable between threads for every payload and message type. Presumably soundness rests on
// the split API handing out at most one producer and one consumer, and on `RingBuf` /
// `SharedSingleton` synchronising access internally — confirm against those types, and
// consider requiring `T: Send` and `Q: Send`, since values written through one half are read
// through the other.
unsafe impl<T, Q: HasPoolIdx<N>, const N: usize, const M: usize> Sync for SharedPool<T, Q, N, M> {}
impl<T, Q: HasPoolIdx<N>, const N: usize, const M: usize> SharedPool<T, Q, N, M> {
    /// Compile-time guard: the ring buffers must have at least as many slots
    /// as the pool has payloads. Only evaluated where it is referenced
    /// (see [`SharedPool::new`]).
    const OK: () = assert!(M >= N, "Ringbuf capacity (M) must be >= Pool Capacity (N)");

    /// Creates a pool with fresh ring buffers and `N` payload slots, each
    /// initialised from `SharedSingleton::INIT_0`.
    ///
    /// Referencing `OK` here forces the `M >= N` assertion to be evaluated
    /// for this instantiation.
    // The unit binding below exists only to reference `OK`.
    #[allow(clippy::let_unit_value)]
    pub const fn new() -> Self {
        let _: () = SharedPool::<T, Q, N, M>::OK;
        SharedPool {
            alloc_rbuf: RingBuf::new(),
            return_rbuf: RingBuf::new(),
            pool: [SharedSingleton::INIT_0; N],
        }
    }

    /// Splits off the producing half: the write side of the allocation ring
    /// plus the read side of the return ring.
    ///
    /// # Errors
    /// [`SharedPoolError::AlreadySplit`] when either of those two ring halves
    /// is already taken.
    pub fn split_prod(&self) -> Result<Producer<'_, T, Q, N, M>, SharedPoolError> {
        if self.alloc_rbuf.has_split_prod() || self.return_rbuf.has_split_cons() {
            return Err(SharedPoolError::AlreadySplit);
        }
        // Both halves were just observed to be available.
        let alloc_p = self.alloc_rbuf.split_prod().unwrap();
        let ret_c = self.return_rbuf.split_cons().unwrap();
        Ok(Producer {
            alloc_prod: alloc_p,
            return_cons: ret_c,
            pool_ref: &self.pool,
        })
    }

    /// Splits off the consuming half: the read side of the allocation ring
    /// plus the write side of the return ring.
    ///
    /// Before returning, every pool index `0..N` is committed to the return
    /// ring so that all payload slots are initially available to the producer.
    ///
    /// # Errors
    /// [`SharedPoolError::AlreadySplit`] when either of those two ring halves
    /// is already taken.
    ///
    /// # Panics
    /// Panics if the return ring cannot accept all `N` indices.
    pub fn split_cons(&self) -> Result<Consumer<'_, T, Q, N, M>, SharedPoolError> {
        if self.alloc_rbuf.has_split_cons() || self.return_rbuf.has_split_prod() {
            return Err(SharedPoolError::AlreadySplit);
        }
        // Both halves were just observed to be available.
        let alloc_c = self.alloc_rbuf.split_cons().unwrap();
        let mut ret_p = self.return_rbuf.split_prod().unwrap();
        for i in 0..N {
            let item = ret_p.writer_front().unwrap();
            item.set_pool_idx(PoolIndex(i as u32));
            ret_p.commit().unwrap();
        }
        Ok(Consumer {
            alloc_cons: alloc_c,
            return_prod: ret_p,
            pool_ref: &self.pool,
        })
    }

    /// Splits off both halves at once.
    ///
    /// All four ring-buffer halves are checked up front and the two splits
    /// run one after the other, so a failing call does not take (and then
    /// drop) one half or pre-fill the return ring as a side effect.
    ///
    /// # Errors
    /// [`SharedPoolError::AlreadySplit`] when any ring-buffer half is already
    /// taken.
    pub fn split(&self) -> Result<(Producer<'_, T, Q, N, M>, Consumer<'_, T, Q, N, M>), SharedPoolError> {
        let any_half_taken = self.alloc_rbuf.has_split_prod()
            || self.alloc_rbuf.has_split_cons()
            || self.return_rbuf.has_split_prod()
            || self.return_rbuf.has_split_cons();
        if any_half_taken {
            return Err(SharedPoolError::AlreadySplit);
        }
        let prod = self.split_prod()?;
        let cons = self.split_cons()?;
        Ok((prod, cons))
    }

    /// Returns the number of entries currently held in the return ring,
    /// i.e. payload indices waiting to be taken by the producer.
    pub fn num_free(&self) -> u32 {
        self.return_rbuf.len()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Number of payload slots in the test pool.
    const POOL_DEPTH: usize = 16;
    /// Number of message slots in each ring buffer of the test pool.
    const RING_DEPTH: usize = 32;

    /// Test message: an id plus the index of its payload slot.
    pub struct Message {
        id: u32,
        payload: PoolIndex<POOL_DEPTH>,
    }

    impl HasPoolIdx<POOL_DEPTH> for Message {
        fn get_pool_idx(&self) -> PoolIndex<POOL_DEPTH> {
            self.payload
        }

        fn set_pool_idx(&mut self, pindex: PoolIndex<POOL_DEPTH>) {
            self.payload = pindex
        }
    }

    /// Test payload stored in the pool slots.
    pub struct Payload {
        value: u32,
    }

    // Sized from the named constants so the pool can never drift out of sync
    // with the `HasPoolIdx<POOL_DEPTH>` impl on `Message`.
    static SHARED_POOL: SharedPool<Payload, Message, POOL_DEPTH, RING_DEPTH> = SharedPool {
        alloc_rbuf: RingBuf::INIT_0,
        return_rbuf: RingBuf::INIT_0,
        pool: [SharedSingleton::<Payload>::INIT_0; POOL_DEPTH],
    };

    /// Sends two messages with payloads through the pool, returning each
    /// payload slot after it has been read.
    #[test]
    fn test_basic() {
        let (mut producer, mut consumer) = match SHARED_POOL.split() {
            Ok(halves) => halves,
            Err(_) => panic!("first split failed!"),
        };

        // Round 1: read the payload while the message is still at the front,
        // return the slot, then pop the message.
        let (message, payload) = producer.stage_with_payload().unwrap();
        message.id = 41;
        let raw = payload.try_write().unwrap();
        raw.value = 42;
        payload.write_done().unwrap();
        assert!(producer.commit().is_ok());

        assert!(consumer.peek_with_payload().0.is_some());
        let (recvd, payload) = consumer.peek_with_payload();
        assert_eq!(recvd.unwrap().id, 41);
        assert_eq!(payload.unwrap().try_read().unwrap().value, 42);
        assert!(payload.unwrap().read_done().is_ok());
        assert!(consumer.return_payload(recvd.unwrap().get_pool_idx()).is_ok());
        assert!(consumer.pop().is_ok());

        // Round 2: pop the message first and look the payload up by index
        // afterwards.
        let (message, payload) = producer.stage_with_payload().unwrap();
        message.id = 43;
        let raw = payload.try_write().unwrap();
        raw.value = 44;
        payload.write_done().unwrap();
        assert!(producer.commit().is_ok());

        let recvd = consumer.peek().unwrap();
        let payload_idx = recvd.get_pool_idx();
        assert!(consumer.pop().is_ok());
        let payload = consumer.read_pool_item(payload_idx).unwrap();
        assert_eq!(payload.try_read().unwrap().value, 44);
        assert!(payload.read_done().is_ok());
        assert!(consumer.return_payload(payload_idx).is_ok());
    }
}