#![cfg_attr(not(feature = "std"), no_std)]
extern crate alloc;
use core::
{
fmt,
mem,
marker::PhantomData,
ops::{Deref, DerefMut},
ptr::{self, NonNull},
sync::atomic::{AtomicU64, Ordering},
task::Poll
};
use alloc::{boxed::Box, collections::VecDeque, vec, vec::Vec};
use crossbeam_utils::Backoff;
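/// Fallible counterpart of [`Clone`]: returns an error instead of spinning
/// or panicking when a new handle cannot be obtained.
///
/// A minimal usage sketch; the crate name `rw_buffer` is an assumption:
/// ```ignore
/// use rw_buffer::{RwBuffers, TryClone};
///
/// let mut pool = RwBuffers::new_unbounded(4096, 1);
/// let buf = pool.allocate().unwrap();
/// // Unlike buf.clone(), this cannot park the thread or panic under contention.
/// let extra = buf.try_clone().unwrap();
/// drop(extra);
/// ```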
pub trait TryClone: Sized
{
type Error;
fn try_clone(&self) -> Result<Self, Self::Error>;
}
pub trait LocalAsyncDrop: Send + Sync + 'static
{
fn async_drop(&mut self) -> impl core::future::Future<Output = ()>;
}
pub trait LocalAsyncClone: Send + Sync + 'static
{
fn async_clone(&self) -> impl core::future::Future<Output = Self>;
}
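/// Drives a value's [`LocalAsyncDrop::async_drop`] to completion and then
/// drops it.
///
/// A minimal sketch; `pollster::block_on` stands in for any executor and is
/// an assumption, not a dependency of this crate:
/// ```ignore
/// let r = pollster::block_on(buf.read_async()).unwrap();
/// // Release the read reference without blocking the current thread.
/// pollster::block_on(async_drop(r));
/// ```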
pub async
fn async_drop<LAD: LocalAsyncDrop>(mut lad: LAD)
{
lad.async_drop().await;
drop(lad);
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RwBufferError
{
TooManyRead,
TooManyBase,
ReadTryAgainLater,
WriteTryAgainLater,
BaseTryAgainLater,
OutOfBuffers,
DowngradeFailed,
InvalidArguments,
Busy,
}
impl fmt::Display for RwBufferError
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
{
match self
{
Self::TooManyRead =>
write!(f, "TooManyRead: read soft limit reached"),
Self::TooManyBase =>
write!(f, "TooManyBase: base soft limit reached"),
Self::ReadTryAgainLater =>
write!(f, "ReadTryAgainLater: shared access not available, try again later"),
Self::WriteTryAgainLater =>
write!(f, "WriteTryAgainLater: exclusive access not available, try again later"),
Self::BaseTryAgainLater =>
write!(f, "BaseTryAgainLater: failed to obtain a clone in reasonable time"),
Self::OutOfBuffers =>
write!(f, "OutOfBuffers: no more free buffers are left"),
Self::DowngradeFailed =>
write!(f, "DowngradeFailed: cannot downgrade exclusive to shared, race condition"),
Self::InvalidArguments =>
write!(f, "InvalidArguments: arguments are not valid"),
Self::Busy =>
write!(f, "Busy: RwBuffer is busy and cannot be acquired"),
}
}
}
pub type RwBufferRes<T> = Result<T, RwBufferError>;
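/// Shared read guard over an [`RwBuffer`]'s bytes. Cloning bumps the atomic
/// read count; dropping releases it again, freeing the inner allocation when
/// the last handle of any kind disappears.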
#[derive(Debug)]
pub struct RBuffer
{
inner: NonNull<RwBufferInner>,
a_dropped: bool,
}
// SAFETY: all shared state behind the pointer is coordinated through the
// atomic flags word, so handles may move and be shared across threads.
unsafe impl Send for RBuffer {}
unsafe impl Sync for RBuffer {}
impl RwBufType for RBuffer {}
impl Eq for RBuffer {}
impl PartialEq for RBuffer
{
fn eq(&self, other: &Self) -> bool
{
return self.inner == other.inner;
}
}
impl RBuffer
{
#[inline]
fn new(inner: NonNull<RwBufferInner>) -> Self
{
return Self{ inner, a_dropped: false };
}
#[cfg(test)]
fn get_flags(&self) -> RwBufferFlags<Self>
{
let inner = unsafe { self.inner.as_ref() };
let flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
return flags;
}
pub
fn as_slice(&self) -> &[u8]
{
let inner = unsafe { self.inner.as_ref() };
return inner.buf.as_ref().unwrap().as_slice();
}
pub
fn try_inner(mut self) -> Result<Vec<u8>, Self>
{
let inner = unsafe { self.inner.as_ref() };
let current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
if current_flags.try_inner_check() == true
{
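// read == 1, write == false, base == 0: this guard is the only live handle,
// so taking the buffer out cannot race with any other thread.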
let inner = unsafe { self.inner.as_mut() };
let buf = inner.buf.take().unwrap();
drop(self);
return Ok(buf);
}
return Err(self);
}
fn inner(&self) -> &RwBufferInner
{
return unsafe { self.inner.as_ref() };
}
}
impl Deref for RBuffer
{
type Target = Vec<u8>;
fn deref(&self) -> &Vec<u8>
{
let inner = self.inner();
return inner.buf.as_ref().unwrap();
}
}
impl LocalAsyncClone for RBuffer
{
fn async_clone(&self) -> impl core::future::Future<Output = Self>
{
return
core::future::poll_fn(
|cx|
{
let inner = self.inner();
let current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
let mut new_flags = current_flags.clone();
new_flags.read().unwrap();
let res =
inner
.flags
.compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
if let Ok(_) = res
{
return Poll::Ready( Self{ inner: self.inner, a_dropped: self.a_dropped } );
}
// Without re-waking, a lost CAS race would leave this future stuck forever.
cx.waker().wake_by_ref();
return Poll::Pending;
}
);
}
}
impl Clone for RBuffer
{
fn clone(&self) -> Self
{
let inner = self.inner();
let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
let mut new_flags = current_flags.clone();
new_flags.read().unwrap();
let backoff = Backoff::new();
let mut parked = false;
loop
{
let res =
inner
.flags
.compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
if let Ok(_) = res
{
return Self{ inner: self.inner, a_dropped: self.a_dropped };
}
current_flags = res.err().unwrap().into();
new_flags = current_flags.clone();
new_flags.read().unwrap();
if backoff.is_completed() == false
{
backoff.snooze();
}
else
{
if parked == false
{
// Backoff is exhausted: yield the timeslice once before the final retry.
#[cfg(feature = "std")]
std::thread::park_timeout(std::time::Duration::from_millis(1));
#[cfg(not(feature = "std"))]
backoff.snooze();
parked = true;
}
else
{
panic!("cannot obtain a clone of RBuffer!");
}
}
}
}
}
impl TryClone for RBuffer
{
type Error = RwBufferError;
fn try_clone(&self) -> Result<Self, Self::Error>
{
let inner = self.inner();
let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
let mut new_flags = current_flags.clone();
new_flags.read()?;
let backoff = Backoff::new();
loop
{
let res =
inner
.flags
.compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
if let Ok(_) = res
{
return Ok(Self{ inner: self.inner, a_dropped: self.a_dropped });
}
current_flags = res.err().unwrap().into();
new_flags = current_flags.clone();
new_flags.read()?;
if backoff.is_completed() == false
{
backoff.snooze();
}
else
{
break;
}
}
return Err(RwBufferError::ReadTryAgainLater);
}
}
impl LocalAsyncDrop for RBuffer
{
fn async_drop(&mut self) -> impl core::future::Future<Output = ()>
{
// Disarm the synchronous Drop impl; the future below releases the count.
self.a_dropped = true;
return
core::future::poll_fn(
|cx|
{
let inner = self.inner();
let current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
let mut new_flags = current_flags.clone();
new_flags.unread();
let res =
inner
.flags
.compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
if let Ok(flags) = res.map(|v| <u64 as Into<RwBufferFlags<Self>>>::into(v))
{
if flags.is_drop_inplace() == true
{
unsafe { ptr::drop_in_place(self.inner.as_ptr()) };
}
return Poll::Ready(());
}
cx.waker().wake_by_ref();
return Poll::Pending;
}
);
}
}
impl Drop for RBuffer
{
fn drop(&mut self)
{
if self.a_dropped == true
{
return;
}
let inner = self.inner();
let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
let mut new_flags = current_flags.clone();
new_flags.unread();
let backoff = Backoff::new();
for _ in 0..1000
{
let res =
inner
.flags
.compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
if let Ok(flags) = res.map(|v| <u64 as Into<RwBufferFlags<Self>>>::into(v))
{
if flags.is_drop_inplace() == true
{
unsafe { ptr::drop_in_place(self.inner.as_ptr()) };
}
return;
}
current_flags = res.err().unwrap().into();
new_flags = current_flags.clone();
new_flags.unread();
backoff.snooze();
}
panic!("assertion trap: RBuffer::drop can not drop RBuffer in reasonable time!");
}
}
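/// Exclusive write guard over an [`RwBuffer`]'s bytes. At most one can exist
/// at a time, and only while no read guards are live.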
#[derive(Debug, PartialEq, Eq)]
pub struct WBuffer
{
buf: NonNull<RwBufferInner>,
downgraded: bool,
}
// SAFETY: see RBuffer; exclusivity is guaranteed by the atomic write flag.
unsafe impl Send for WBuffer{}
unsafe impl Sync for WBuffer{}
impl RwBufType for WBuffer{}
impl WBuffer
{
#[inline]
fn new(inner: NonNull<RwBufferInner>) -> Self
{
return Self{ buf: inner, downgraded: false };
}
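/// Atomically trades the exclusive guard for a shared [`RBuffer`], admitting
/// readers without a window in which the buffer is unlocked.
///
/// A minimal sketch; the crate name `rw_buffer` is an assumption:
/// ```ignore
/// let mut w = buf.write().unwrap();
/// w.fill(0xFF);
/// let r = w.downgrade().expect("no contention on a private buffer");
/// assert!(r.as_slice().iter().all(|b| *b == 0xFF));
/// ```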
pub
fn downgrade(mut self) -> Result<RBuffer, Self>
{
let inner = unsafe { self.buf.as_ref() };
let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
let mut new_flags = current_flags.clone();
new_flags.downgrade();
let backoff = Backoff::new();
while backoff.is_completed() == false
{
let res =
inner
.flags
.compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
if let Ok(_) = res
{
self.downgraded = true;
return Ok(RBuffer::new(self.buf.clone()));
}
current_flags = res.err().unwrap().into();
new_flags = current_flags.clone();
new_flags.downgrade();
backoff.snooze();
}
return Err(self);
}
pub
fn as_slice(&self) -> &[u8]
{
let inner = unsafe { self.buf.as_ref() };
return inner.buf.as_ref().unwrap().as_slice();
}
}
impl Deref for WBuffer
{
type Target = Vec<u8>;
fn deref(&self) -> &Vec<u8>
{
let inner = unsafe { self.buf.as_ref() };
return inner.buf.as_ref().unwrap();
}
}
impl DerefMut for WBuffer
{
fn deref_mut(&mut self) -> &mut Vec<u8>
{
let inner = unsafe { self.buf.as_mut() };
return inner.buf.as_mut().unwrap();
}
}
impl Drop for WBuffer
{
fn drop(&mut self)
{
if self.downgraded == true
{
return;
}
let inner = unsafe { self.buf.as_ref() };
let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
let mut new_flags = current_flags.clone();
new_flags.unwrite();
let backoff = Backoff::new();
for _ in 0..1000
{
let res =
inner
.flags
.compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
if let Ok(flags) = res.map(|v| <u64 as Into<RwBufferFlags<Self>>>::into(v))
{
if flags.is_drop_inplace() == true
{
unsafe { ptr::drop_in_place(self.buf.as_ptr()) };
}
return;
}
current_flags = res.err().unwrap().into();
new_flags = current_flags.clone();
new_flags.unwrite();
backoff.snooze();
}
panic!("assertion trap: WBuffer::drop can not drop RBuffer in reasonable time!");
}
}
impl LocalAsyncDrop for WBuffer
{
fn async_drop(&mut self) -> impl core::future::Future<Output = ()>
{
// Reuse the downgraded flag to disarm the synchronous Drop impl.
self.downgraded = true;
return
core::future::poll_fn(
move |cx|
{
let inner = unsafe { self.buf.as_ref() };
let current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
let mut new_flags = current_flags.clone();
new_flags.unwrite();
let res =
inner
.flags
.compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
if let Ok(flags) = res.map(|v| <u64 as Into<RwBufferFlags<Self>>>::into(v))
{
if flags.is_drop_inplace() == true
{
unsafe { ptr::drop_in_place(self.buf.as_ptr()) };
}
return Poll::Ready(());
}
cx.waker().wake_by_ref();
return Poll::Pending;
}
);
}
}
trait RwBufType {}
// Packed into a single u64 so the whole state can be updated with one atomic
// CAS. repr(C) pins the field order (read: bytes 0..4, base: 4..6, write: 6,
// unused0: 7), giving the u64 conversions below a defined layout.
#[repr(C, align(8))]
#[derive(Debug, PartialEq, Eq)]
struct RwBufferFlags<TP>
{
read: u32,
base: u16,
write: bool,
unused0: u8,
_p: PhantomData<TP>,
}
impl<TP: RwBufType> From<u64> for RwBufferFlags<TP>
{
fn from(value: u64) -> Self
{
// SAFETY: every u64 fed in here was produced by the reverse conversion,
// so the bool byte is always a valid 0 or 1.
return unsafe { mem::transmute(value) };
}
}
impl<TP: RwBufType> From<RwBufferFlags<TP>> for u64
{
fn from(value: RwBufferFlags<TP>) -> Self
{
// SAFETY: repr(C, align(8)) makes the struct exactly 8 bytes with a
// defined layout; see the size assertion below.
return unsafe { mem::transmute(value) };
}
}
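// Compile-time guard: the transmutes above are only sound while the flags
// struct is exactly the size of a u64.
const _: () = assert!(mem::size_of::<RwBufferFlags<RwBuffer>>() == 8);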
impl<TP: RwBufType> Default for RwBufferFlags<TP>
{
fn default() -> RwBufferFlags<TP>
{
return
Self
{
read: 0,
write: false,
base: 1,
unused0: 0,
_p: PhantomData
};
}
}
impl<TP: RwBufType> Copy for RwBufferFlags<TP> {}
impl<TP: RwBufType> Clone for RwBufferFlags<TP>
{
fn clone(&self) -> Self
{
return *self;
}
}
impl RwBufferFlags<WBuffer>
{
#[inline]
fn write(&mut self) -> RwBufferRes<()>
{
if self.read == 0
{
self.write = true;
return Ok(());
}
else
{
return Err(RwBufferError::WriteTryAgainLater);
}
}
#[inline]
fn downgrade(&mut self)
{
self.write = false;
self.read += 1;
}
#[inline]
fn unwrite(&mut self)
{
self.write = false;
}
}
impl RwBufferFlags<RBuffer>
{
#[inline]
fn try_inner_check(&self) -> bool
{
return self.read == 1 && self.write == false && self.base == 0;
}
#[inline]
fn unread(&mut self)
{
self.read -= 1;
}
#[inline]
fn read(&mut self) -> RwBufferRes<()>
{
if self.write == false
{
self.read += 1;
if self.read <= Self::MAX_READ_REFS
{
return Ok(());
}
return Err(RwBufferError::TooManyRead);
}
return Err(RwBufferError::ReadTryAgainLater);
}
}
impl RwBufferFlags<RwBuffer>
{
#[inline]
fn make_pre_unused() -> Self
{
return Self{ read: 0, write: false, base: 1, unused0: 0, _p: PhantomData };
}
#[inline]
fn read(&mut self) -> RwBufferRes<()>
{
if self.write == false
{
self.read += 1;
if self.read <= Self::MAX_READ_REFS
{
return Ok(());
}
return Err(RwBufferError::TooManyRead);
}
return Err(RwBufferError::ReadTryAgainLater);
}
#[inline]
fn write(&mut self) -> RwBufferRes<()>
{
if self.read == 0
{
self.write = true;
return Ok(());
}
else
{
return Err(RwBufferError::WriteTryAgainLater);
}
}
#[inline]
fn base(&mut self) -> RwBufferRes<()>
{
self.base += 1;
if self.base <= Self::MAX_BASE_REFS
{
return Ok(());
}
return Err(RwBufferError::TooManyBase);
}
#[inline]
fn unbase(&mut self) -> bool
{
self.base -= 1;
return self.base != 0;
}
}
impl<TP: RwBufType> RwBufferFlags<TP>
{
pub const MAX_READ_REFS: u32 = u32::MAX - 2;
pub const MAX_BASE_REFS: u16 = u16::MAX - 2;
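// "free": no readers, no writer, and exactly the pool's own base handle left.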
#[inline]
fn is_free(&self) -> bool
{
return self.write == false && self.read == 0 && self.base == 1;
}
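// "drop in place": every handle, including the pool's, is gone, so whoever
// observes this state owns the inner box and must free it.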
#[inline]
fn is_drop_inplace(&self) -> bool
{
return self.read == 0 && self.write == false && self.base == 0;
}
}
#[derive(Debug)]
pub struct RwBufferInner
{
flags: AtomicU64,
buf: Option<Vec<u8>>,
}
impl RwBufferInner
{
fn new(buf_size: usize) -> Self
{
return
Self
{
flags:
AtomicU64::new(RwBufferFlags::<RwBuffer>::default().into()),
buf:
Some(vec![0_u8; buf_size])
};
}
}
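/// Owning handle to a reference-counted, heap-allocated byte buffer whose
/// reader/writer state lives in a single atomic u64 of flags.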
#[derive(Debug, PartialEq, Eq)]
pub struct RwBuffer(NonNull<RwBufferInner>);
// SAFETY: see RBuffer; the inner state is guarded by the atomic flags word.
unsafe impl Send for RwBuffer {}
unsafe impl Sync for RwBuffer {}
impl RwBufType for RwBuffer {}
impl RwBuffer
{
#[inline]
fn new(buf_size: usize) -> Self
{
let status = Box::new(RwBufferInner::new(buf_size));
return Self(Box::leak(status).into());
}
#[inline]
fn inner(&self) -> &RwBufferInner
{
return unsafe { self.0.as_ref() };
}
#[inline]
pub
fn is_free(&self) -> bool
{
let inner = self.inner();
let flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
return flags.is_free();
}
#[inline]
pub(crate)
fn acquire_if_free(&self) -> RwBufferRes<Self>
{
let inner = self.inner();
let current_flags: RwBufferFlags<Self> = RwBufferFlags::make_pre_unused();
let mut new_flags = current_flags.clone();
new_flags.base()?;
let res =
inner
.flags
// The strong CAS avoids spurious Busy results on this single attempt.
.compare_exchange(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
if let Ok(_) = res
{
return Ok(Self(self.0.clone()));
}
return Err(RwBufferError::Busy);
}
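/// Takes the exclusive write guard, spinning briefly on contention before
/// giving up with `WriteTryAgainLater`.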
pub
fn write(&self) -> RwBufferRes<WBuffer>
{
let inner = self.inner();
let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
let mut new_flags = current_flags.clone();
new_flags.write()?;
let backoff = Backoff::new();
while backoff.is_completed() == false
{
let res =
inner
.flags
.compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
if let Ok(_) = res
{
return Ok(WBuffer::new(self.0.clone()));
}
current_flags = res.err().unwrap().into();
new_flags = current_flags.clone();
new_flags.write()?;
backoff.snooze();
}
return Err(RwBufferError::WriteTryAgainLater);
}
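/// Async variant of [`write`](Self::write): re-wakes and yields instead of
/// spinning while another guard is live.
///
/// A minimal sketch; `pollster::block_on` stands in for any executor and is
/// an assumption, not a dependency of this crate:
/// ```ignore
/// let mut w = pollster::block_on(buf.write_async()).unwrap();
/// w[0] = 1;
/// ```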
pub async
fn write_async(&self) -> RwBufferRes<WBuffer>
{
return
core::future::poll_fn(
|cx|
{
let inner = self.inner();
let current_flags: RwBufferFlags<WBuffer> = inner.flags.load(Ordering::SeqCst).into();
let mut new_flags = current_flags.clone();
if let Err(e) = new_flags.write()
{
return Poll::Ready(Err(e));
}
let res =
inner
.flags
.compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
if let Ok(_) = res
{
return Poll::Ready( Ok( WBuffer::new(self.0.clone()) ) );
}
cx.waker().wake_by_ref();
return Poll::Pending;
}
)
.await;
}
pub
fn read(&self) -> RwBufferRes<RBuffer>
{
let inner = self.inner();
let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
let mut new_flags = current_flags.clone();
new_flags.read()?;
let backoff = Backoff::new();
while backoff.is_completed() == false
{
let res =
inner
.flags
.compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
if let Ok(_) = res
{
return Ok(RBuffer::new(self.0.clone()));
}
current_flags = res.err().unwrap().into();
new_flags = current_flags.clone();
new_flags.read()?;
backoff.snooze();
}
return Err(RwBufferError::ReadTryAgainLater);
}
pub async
fn read_async(&self) -> RwBufferRes<RBuffer>
{
return
core::future::poll_fn(
|cx|
{
let inner = self.inner();
let current_flags: RwBufferFlags<RBuffer> = inner.flags.load(Ordering::SeqCst).into();
let mut new_flags = current_flags.clone();
match new_flags.read()
{
Ok(_) => {},
Err(RwBufferError::TooManyRead) =>
return Poll::Ready(Err(RwBufferError::TooManyRead)),
Err(RwBufferError::ReadTryAgainLater) =>
{
cx.waker().wake_by_ref();
return Poll::Pending;
},
Err(e) =>
panic!("assertion trap: unknown error {} in Future for AsyncRBuffer", e)
}
let res =
inner
.flags
.compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
if let Ok(_) = res
{
return Poll::Ready( Ok( RBuffer::new(self.0.clone()) ) );
}
cx.waker().wake_by_ref();
return Poll::Pending;
}
)
.await;
}
#[cfg(test)]
fn get_flags(&self) -> RwBufferFlags<Self>
{
let inner = self.inner();
let flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Acquire).into();
return flags;
}
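// Cheap clone for a buffer that is not shared yet: allocate() calls this
// right after RwBuffer::new, before any other handle exists, so the plain
// load/store pair cannot race.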
fn clone_single(&self) -> RwBufferRes<Self>
{
let inner = self.inner();
let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
current_flags.base()?;
inner.flags.store(current_flags.into(), Ordering::Relaxed);
return Ok(Self(self.0));
}
}
impl Clone for RwBuffer
{
fn clone(&self) -> Self
{
let inner = self.inner();
let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
let mut new_flags = current_flags.clone();
new_flags.base().unwrap();
let backoff = Backoff::new();
let mut parked = false;
loop
{
let res =
inner
.flags
.compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
if let Ok(_) = res
{
return Self(self.0);
}
current_flags = res.err().unwrap().into();
new_flags = current_flags.clone();
new_flags.base().unwrap();
if backoff.is_completed() == false
{
backoff.snooze();
}
else
{
if parked == false
{
// Backoff is exhausted: yield the timeslice once before the final retry.
#[cfg(feature = "std")]
std::thread::park_timeout(std::time::Duration::from_millis(1));
#[cfg(not(feature = "std"))]
backoff.snooze();
parked = true;
}
else
{
panic!("cannot obtain a clone of RwBuffer!");
}
}
}
}
}
impl TryClone for RwBuffer
{
type Error = RwBufferError;
fn try_clone(&self) -> Result<Self, Self::Error>
{
let inner = self.inner();
let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::SeqCst).into();
let mut new_flags = current_flags.clone();
new_flags.base()?;
let backoff = Backoff::new();
while backoff.is_completed() == false
{
let res =
inner
.flags
.compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
if let Ok(_) = res
{
return Ok(Self(self.0));
}
current_flags = res.err().unwrap().into();
new_flags = current_flags.clone();
new_flags.base()?;
backoff.snooze();
}
return Err(RwBufferError::BaseTryAgainLater);
}
}
impl Drop for RwBuffer
{
fn drop(&mut self)
{
let inner = self.inner();
let mut current_flags: RwBufferFlags<Self> = inner.flags.load(Ordering::Relaxed).into();
let mut new_flags = current_flags.clone();
new_flags.unbase();
let backoff = Backoff::new();
for _ in 0..1000
{
let res =
inner
.flags
.compare_exchange_weak(current_flags.into(), new_flags.into(), Ordering::SeqCst, Ordering::Acquire);
if let Ok(flags) = res.map(|v| <u64 as Into<RwBufferFlags<Self>>>::into(v))
{
if flags.is_drop_inplace() == true
{
unsafe { ptr::drop_in_place(self.0.as_ptr()) };
}
return;
}
current_flags = res.err().unwrap().into();
new_flags = current_flags.clone();
new_flags.unbase();
backoff.snooze();
}
panic!("assertion trap: RwBuffer::drop can not drop RwBuffer in reasonable time!");
}
}
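/// Pool of equally sized [`RwBuffer`]s that reuses freed buffers before
/// allocating new ones.
///
/// A minimal end-to-end sketch; the crate name `rw_buffer` is an assumption:
/// ```ignore
/// use rw_buffer::RwBuffers;
///
/// let mut pool = RwBuffers::new(4096, 2, 8).unwrap();
/// let buf = pool.allocate().unwrap();
/// {
/// let mut w = buf.write().unwrap();
/// w[0] = 42;
/// } // the write guard is released here
/// let r = buf.read().unwrap();
/// assert_eq!(r.as_slice()[0], 42);
/// drop(r);
/// drop(buf); // the buffer returns to the pool and can be handed out again
/// ```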
#[derive(Debug)]
pub struct RwBuffers
{
buf_len: usize,
bufs_cnt_lim: usize,
buffs: VecDeque<RwBuffer>
}
impl RwBuffers
{
pub
fn new(buf_len: usize, pre_init_cnt: usize, bufs_cnt_lim: usize) -> RwBufferRes<Self>
{
// A zero bufs_cnt_lim is the "unbounded" sentinel used by new_unbounded(),
// so a bounded pool must be created with a positive limit.
if buf_len == 0 || bufs_cnt_lim == 0 || pre_init_cnt > bufs_cnt_lim
{
return Err(RwBufferError::InvalidArguments);
}
let mut buffs = VecDeque::with_capacity(bufs_cnt_lim);
for _ in 0..pre_init_cnt
{
buffs.push_back(RwBuffer::new(buf_len));
}
return Ok(
Self
{
buf_len,
bufs_cnt_lim,
buffs,
}
);
}
pub
fn new_unbounded(buf_len: usize, pre_init_cnt: usize) -> Self
{
let mut buffs = VecDeque::with_capacity(pre_init_cnt);
for _ in 0..pre_init_cnt
{
buffs.push_back(RwBuffer::new(buf_len));
}
return
Self
{
buf_len,
bufs_cnt_lim: 0,
buffs,
};
}
pub
fn allocate(&mut self) -> RwBufferRes<RwBuffer>
{
for buf in self.buffs.iter()
{
if let Ok(rwbuf) = buf.acquire_if_free()
{
return Ok(rwbuf);
}
}
if self.bufs_cnt_lim == 0 || self.buffs.len() < self.bufs_cnt_lim
{
let buf = RwBuffer::new(self.buf_len);
let c_buf = buf.clone_single()?;
self.buffs.push_back(buf);
return Ok(c_buf);
}
return Err(RwBufferError::OutOfBuffers);
}
pub
fn allocate_in_place(&mut self) -> RwBuffer
{
let mut idx = Option::None;
for (i, item) in self.buffs.iter().enumerate()
{
// acquire_if_free returns a temporary extra handle; dropping it here
// restores base == 1 before the buffer is moved out of the pool below.
if let Ok(_) = item.acquire_if_free()
{
idx = Some(i);
break;
}
}
return
idx
.map_or(
RwBuffer::new(self.buf_len),
|f| self.buffs.remove(f).unwrap()
);
}
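/// Drops up to `cnt` currently free buffers from the pool and returns how
/// many were actually dropped.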
pub
fn compact(&mut self, mut cnt: usize) -> usize
{
let p_cnt = cnt;
self
.buffs
.retain(
|buf|
{
// Stop once the requested number of buffers has been dropped, instead of
// underflowing cnt when more free buffers exist than were asked for.
if cnt > 0 && buf.is_free() == true
{
cnt -= 1;
return false;
}
return true;
}
);
return p_cnt - cnt;
}
#[cfg(test)]
fn get_flags_by_index(&self, index: usize) -> Option<RwBufferFlags<RwBuffer>>
{
return Some(self.buffs.get(index)?.get_flags());
}
}
#[cfg(feature = "std")]
#[cfg(test)]
mod tests;