spacetoken 0.1.0

Space allocator that is decoupled from actual storage.
Documentation
//! [`Space`] can be used to track how much has been reserved of a (limited)
//! amount of space.
//!
//! The idea is that the `Space` is decoupled from the actual storage space it
//! is meant to represent.  (It can refer to persistent disk storage,
//! in-process memory, or any other sized storage).
//!
//! # Usage
//! An application creates a `Space` object, passing into it the amount of
//! total space it can hold.
//!
//! Once the `Space` object has been created (representing the total amount of
//! space), the application can reserve space from this pool using:
//! - [`Space::reserve_blocking()`] will block the calling thread and wait for
//!   the requested amount of space to become available.
//! - [`Space::reserve_async()`] is similar to  `Space::reserve_blocking`, but
//!   is intended for `async` contexts.
//! - [`Space::try_reserve`] will immediately fail if the reservation request
//!   can not be fulfilled.
//! - [`Space::force_reserve`] will always immediately succeed (this will allow
//!   the used storage space to overflow).
//!
//! All of the reservation functions return a [`SpaceToken`] instance, which
//! represents the allocated space.  This is returned to the `Space` context
//! when the `SpaceToken` is dropped.

mod err;

use std::{
  future::Future,
  pin::Pin,
  sync::Arc,
  task::{Context, Poll, Waker}
};

use parking_lot::{Condvar, Mutex};

use hashbrown::HashMap;

pub use err::Error;


struct ReadWrite {
  max: u64,
  used: u64,
  idgen: usize,
  wakers: HashMap<usize, Waker>
}

impl ReadWrite {
  fn new(max: u64) -> Self {
    Self {
      max,
      used: 0,
      idgen: 0,
      wakers: HashMap::new()
    }
  }
}


struct Shared {
  rw: Mutex<ReadWrite>,
  signal: Condvar
}

impl Shared {
  pub fn new(max: u64) -> Self {
    Self {
      rw: Mutex::new(ReadWrite::new(max)),
      signal: Condvar::new()
    }
  }
}


/// A representation of a limited storage space that can be reserved.
pub struct Space {
  sh: Arc<Shared>
}

impl Space {
  /// Create a new space allocation tracker with an initial amount of total
  /// unallocated space of `size` bytes.
  #[must_use]
  pub fn new(size: u64) -> Self {
    Self {
      sh: Arc::new(Shared::new(size))
    }
  }

  /// Return how much space is remaining for reservation.
  ///
  /// Returns `0` if the `Space` is full or over-committed.
  #[must_use]
  pub fn remaining(&self) -> u64 {
    let rw = self.sh.rw.lock();
    if rw.used > rw.max {
      return 0;
    }
    rw.max - rw.used
  }

  /// Return the maximum space limit.
  #[must_use]
  pub fn size(&self) -> u64 {
    self.sh.rw.lock().max
  }

  /// Set new maximum size limit.
  ///
  /// Setting a new maximum size that is smaller than the current total size is
  /// allowed; it will just cause new reservations to fail until enough tokens
  /// has been released.
  pub fn set_limit(&self, size: u64) {
    let mut rw = self.sh.rw.lock();
    rw.max = size;
  }

  /// Reserve space, block and wait for space to become available if there's no
  /// room.
  ///
  /// # Errors
  /// Returns [`Error::OutOfBounds`] is the size requested is larger than the
  /// total storage space.
  pub fn reserve_blocking(&self, size: u64) -> Result<SpaceToken, Error> {
    let mut rw = self.sh.rw.lock();
    loop {
      if size > rw.max {
        // The requested size is greater than the max size
        break Err(Error::OutOfBounds);
      }

      // If overflowed or the request size can not currently be fulfilled, then
      // wait for some token to release its allocation.
      if rw.used >= rw.max || rw.max - rw.used < size {
        self.sh.signal.wait(&mut rw);
        continue;
      }

      rw.used += size;

      drop(rw);

      break Ok(SpaceToken {
        sh: Arc::clone(&self.sh),
        space: size
      });
    }
  }

  /// Reserve space, block and wait (in an `async` context) for space to become
  /// available if there's no room.
  #[must_use]
  pub fn reserve_async(&self, size: u64) -> ReserveFuture {
    ReserveFuture {
      want_space: size,
      sh: Arc::clone(&self.sh),
      waker_id: None
    }
  }

  /// Reserve space.  Returns immediately.  Returns `Ok(None)` if the space is
  /// not currently available to fulfill the request.
  ///
  /// # Errors
  /// Returns [`Error::OutOfBounds`] if the requested size is larger than the
  /// total alloctable space.
  pub fn try_reserve(&self, size: u64) -> Result<Option<SpaceToken>, Error> {
    let mut rw = self.sh.rw.lock();
    if size > rw.max {
      // The requested size is greater than the max size
      return Err(Error::OutOfBounds);
    }

    // If overflowed or the request size can not currently be fulfilled, then
    // wait for some token to release its allocation.
    if rw.used >= rw.max || rw.max - rw.used < size {
      return Ok(None);
    }

    rw.used += size;

    drop(rw);

    Ok(Some(SpaceToken {
      sh: Arc::clone(&self.sh),
      space: size
    }))
  }

  /// Forcibly reserve space.
  ///
  /// This method will always succeed immediately.  By design it can
  /// allocate more space than the limit set by the `Space` object.
  #[must_use]
  pub fn force_reserve(&self, size: u64) -> SpaceToken {
    let mut rw = self.sh.rw.lock();
    rw.used += size;

    drop(rw);

    SpaceToken {
      sh: Arc::clone(&self.sh),
      space: size
    }
  }
}


pub struct ReserveFuture {
  want_space: u64,
  sh: Arc<Shared>,
  waker_id: Option<usize>
}

impl Future for ReserveFuture {
  type Output = Result<SpaceToken, Error>;
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    let mut rw = self.sh.rw.lock();

    if self.want_space > rw.max {
      return Poll::Ready(Err(Error::OutOfBounds));
    }

    if rw.used >= rw.max || rw.max - rw.used < self.want_space {
      let id = loop {
        rw.idgen = rw.idgen.wrapping_add(1);
        if !rw.wakers.contains_key(&rw.idgen) {
          break rw.idgen;
        }
      };
      rw.wakers.insert(id, ctx.waker().clone());

      drop(rw);

      self.waker_id = Some(id);

      return Poll::Pending;
    }

    rw.used += self.want_space;

    drop(rw);

    Poll::Ready(Ok(SpaceToken {
      sh: Arc::clone(&self.sh),
      space: self.want_space
    }))
  }
}

impl Drop for ReserveFuture {
  fn drop(&mut self) {
    let mut rw = self.sh.rw.lock();
    if let Some(id) = self.waker_id {
      rw.wakers.remove(&id);
    }
  }
}


/// Representation of space reserved from a [`Space`] context.
pub struct SpaceToken {
  sh: Arc<Shared>,

  /// The amount of space reserved by this token.
  space: u64
}

impl SpaceToken {
  #[must_use]
  pub const fn size(&self) -> u64 {
    self.space
  }
}

impl Drop for SpaceToken {
  fn drop(&mut self) {
    let mut rw = self.sh.rw.lock();

    // Return space
    rw.used -= self.space;

    // Wake up all threads waiting for space to become available
    self.sh.signal.notify_all();

    // Wake up all tasks that are waiting for space
    for (_, waker) in rw.wakers.drain() {
      waker.wake();
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :