mod err;
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll, Waker}
};
use parking_lot::{Condvar, Mutex};
use hashbrown::HashMap;
pub use err::Error;
/// Mutable bookkeeping for a space tracker; kept behind the mutex in `Shared`.
struct ReadWrite {
/// Current capacity limit that reservations are checked against.
max: u64,
/// Sum of the sizes of all reservations that have been handed out and not
/// yet released.  May exceed `max` (forced reservations, lowered limit).
used: u64,
/// Most recently issued waker id; advanced with wrapping arithmetic each time
/// a pending future registers a waker.
idgen: usize,
/// Wakers of futures that are waiting for space, keyed by their waker id.
wakers: HashMap<usize, Waker>
}
impl ReadWrite {
    /// Builds the initial bookkeeping state: a limit of `max`, nothing
    /// reserved, no waker ids issued and no wakers registered.
    fn new(max: u64) -> Self {
        let wakers = HashMap::new();
        Self {
            max,
            used: 0,
            idgen: 0,
            wakers,
        }
    }
}
/// State shared (through an `Arc`) between a `Space`, its pending futures and
/// every outstanding `SpaceToken`.
struct Shared {
/// The bookkeeping data, guarded by a mutex.
rw: Mutex<ReadWrite>,
/// Condition variable that blocking reservers wait on until space is released.
signal: Condvar
}
impl Shared {
    /// Creates the shared state for a tracker whose capacity limit is `max`.
    pub fn new(max: u64) -> Self {
        let rw = Mutex::new(ReadWrite::new(max));
        let signal = Condvar::new();
        Self { rw, signal }
    }
}
/// Tracks how much of a limited capacity is currently reserved.
///
/// Reservations are handed out as `SpaceToken` values.
pub struct Space {
/// Shared bookkeeping; also referenced by every token and pending future.
sh: Arc<Shared>
}
impl Space {
    /// Creates a tracker with a capacity limit of `size` units and nothing
    /// reserved.
    #[must_use]
    pub fn new(size: u64) -> Self {
        Self {
            sh: Arc::new(Shared::new(size)),
        }
    }

    /// Returns how many units are currently unreserved.
    ///
    /// Yields `0` when usage exceeds the limit, which can happen after
    /// [`Space::force_reserve`] or after the limit was lowered with
    /// [`Space::set_limit`].
    #[must_use]
    pub fn remaining(&self) -> u64 {
        let rw = self.sh.rw.lock();
        rw.max.saturating_sub(rw.used)
    }

    /// Returns the current capacity limit.
    #[must_use]
    pub fn size(&self) -> u64 {
        self.sh.rw.lock().max
    }

    /// Changes the capacity limit to `size`.
    ///
    /// Existing reservations are left untouched, even if they now exceed the
    /// new limit.  Every waiter (blocking and async) is woken so it can
    /// re-evaluate its request against the new limit: a raised limit may let
    /// it succeed, a lowered one may turn it into [`Error::OutOfBounds`].
    pub fn set_limit(&self, size: u64) {
        let mut rw = self.sh.rw.lock();
        rw.max = size;
        self.sh.signal.notify_all();
        let wakers: Vec<_> = rw.wakers.drain().map(|(_, waker)| waker).collect();
        drop(rw);

        // Wake outside the lock: a waker may run arbitrary code, including
        // code that needs this (non-reentrant) mutex.
        for waker in wakers {
            waker.wake();
        }
    }

    /// Reserves `size` units, blocking the calling thread until enough space
    /// is free.
    ///
    /// # Errors
    /// Returns [`Error::OutOfBounds`] if `size` is larger than the limit.  The
    /// check is repeated after every wake-up, so it also triggers if the limit
    /// is lowered below `size` while waiting.
    pub fn reserve_blocking(&self, size: u64) -> Result<SpaceToken, Error> {
        let mut rw = self.sh.rw.lock();
        loop {
            if size > rw.max {
                return Err(Error::OutOfBounds);
            }
            if rw.used < rw.max && rw.max - rw.used >= size {
                break;
            }
            self.sh.signal.wait(&mut rw);
        }
        rw.used += size;
        drop(rw);

        Ok(SpaceToken {
            sh: Arc::clone(&self.sh),
            space: size,
        })
    }

    /// Returns a future that resolves to a reservation of `size` units once
    /// enough space is free, or to [`Error::OutOfBounds`] if `size` exceeds
    /// the limit.
    #[must_use]
    pub fn reserve_async(&self, size: u64) -> ReserveFuture {
        ReserveFuture {
            want_space: size,
            sh: Arc::clone(&self.sh),
            waker_id: None,
        }
    }

    /// Attempts to reserve `size` units without waiting.
    ///
    /// Returns `Ok(None)` when there is currently not enough free space.
    ///
    /// # Errors
    /// Returns [`Error::OutOfBounds`] if `size` is larger than the limit.
    pub fn try_reserve(&self, size: u64) -> Result<Option<SpaceToken>, Error> {
        let mut rw = self.sh.rw.lock();
        if size > rw.max {
            return Err(Error::OutOfBounds);
        }
        if rw.used >= rw.max || rw.max - rw.used < size {
            return Ok(None);
        }
        rw.used += size;
        drop(rw);

        Ok(Some(SpaceToken {
            sh: Arc::clone(&self.sh),
            space: size,
        }))
    }

    /// Reserves `size` units unconditionally, without checking the limit.
    ///
    /// This may push usage past the limit; until enough tokens have been
    /// dropped, [`Space::remaining`] reports `0` and other reservations wait.
    #[must_use]
    pub fn force_reserve(&self, size: u64) -> SpaceToken {
        let mut rw = self.sh.rw.lock();
        rw.used += size;
        drop(rw);

        SpaceToken {
            sh: Arc::clone(&self.sh),
            space: size,
        }
    }
}
/// Future returned by `Space::reserve_async`; resolves to a `SpaceToken` once
/// the requested amount of space could be reserved.
pub struct ReserveFuture {
/// Number of units this future is trying to reserve.
want_space: u64,
/// Shared bookkeeping of the tracker the reservation is made against.
sh: Arc<Shared>,
/// Id under which this future's waker was registered, once it has returned
/// `Pending` at least once.
waker_id: Option<usize>
}
impl Future for ReserveFuture {
    type Output = Result<SpaceToken, Error>;

    /// Tries to reserve the requested space.
    ///
    /// Resolves to `Err(Error::OutOfBounds)` if the request exceeds the
    /// current limit, to `Ok(SpaceToken)` if the space could be reserved, and
    /// otherwise registers the task's waker and returns `Pending`.
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        // Every field is `Unpin`, so a plain `&mut` is available; it lets the
        // lock guard (borrowing `sh`) coexist with updates to `waker_id`.
        let this = self.get_mut();
        let mut rw = this.sh.rw.lock();

        if this.want_space > rw.max {
            // Completing: make sure no registration of ours is left behind.
            if let Some(id) = this.waker_id.take() {
                rw.wakers.remove(&id);
            }
            return Poll::Ready(Err(Error::OutOfBounds));
        }

        if rw.used >= rw.max || rw.max - rw.used < this.want_space {
            let id = match this.waker_id {
                // Re-polled before being woken: refresh the existing entry
                // instead of leaving a stale waker behind under the old id.
                Some(id) if rw.wakers.contains_key(&id) => id,
                // First registration, or the previous one was drained by a
                // wake-up: pick an id that is not currently in use.
                _ => loop {
                    rw.idgen = rw.idgen.wrapping_add(1);
                    if !rw.wakers.contains_key(&rw.idgen) {
                        break rw.idgen;
                    }
                },
            };
            rw.wakers.insert(id, ctx.waker().clone());
            drop(rw);
            this.waker_id = Some(id);
            return Poll::Pending;
        }

        // Completing: make sure no registration of ours is left behind.
        if let Some(id) = this.waker_id.take() {
            rw.wakers.remove(&id);
        }
        rw.used += this.want_space;
        drop(rw);

        Poll::Ready(Ok(SpaceToken {
            sh: Arc::clone(&this.sh),
            space: this.want_space,
        }))
    }
}
impl Drop for ReserveFuture {
    /// Unregisters this future's waker (if one was registered) so that a
    /// cancelled future does not leave an entry behind in the waker map.
    fn drop(&mut self) {
        let mut state = self.sh.rw.lock();
        if let Some(key) = self.waker_id.take() {
            state.wakers.remove(&key);
        }
    }
}
/// A reservation of space; the reserved amount is given back to the tracker
/// when the token is dropped.
pub struct SpaceToken {
/// Shared bookkeeping of the tracker the reservation was taken from.
sh: Arc<Shared>,
/// Number of units held by this reservation.
space: u64
}
impl SpaceToken {
    /// Returns the number of units held by this reservation.
    #[must_use]
    pub const fn size(&self) -> u64 {
        let Self { space, .. } = self;
        *space
    }
}
impl Drop for SpaceToken {
    /// Returns the reserved units to the tracker and wakes every waiter
    /// (blocking threads and pending futures) so they can retry.
    fn drop(&mut self) {
        let mut rw = self.sh.rw.lock();
        rw.used -= self.space;
        self.sh.signal.notify_all();
        let wakers: Vec<_> = rw.wakers.drain().map(|(_, waker)| waker).collect();
        drop(rw);

        // Wake only after the lock is released: `Waker::wake` may run
        // arbitrary code (e.g. poll the future inline or drop another token),
        // which would deadlock on this non-reentrant mutex.
        for waker in wakers {
            waker.wake();
        }
    }
}