wakerizer 0.2.0

Helpers for resources that may have multiple concurrent wakers.
Documentation
//! _wakerizer_ is intended to be used to keep track of multiple `Future`s
//! waiting for a single (shared) resource.
//!
//! It can assist in developing behaviors vaguely similar to `Condvar`'s
//! `notify_all()`.
//!
//! # Usage
//! A resource that may be waited on creates and stores a [`Wakers`] object.
//!
//! Each time a `Future` is created that will be waiting for the resource, the
//! resource's `Wakers` spawns a [`Waiter`], which is stored with the `Future`.
//!
//! Just before the `Future`'s `poll()` function returns `Poll::Pending`, it
//! calls [`Waiter::prime()`] to indicate that it is actively waiting.
//!
//! Whenever the resource is ready, it can signal waiting futures using
//! [`Wakers::wake_all()`].

use std::{
  sync::Arc,
  task::{Context, Waker}
};

use parking_lot::Mutex;

use rustc_hash::FxHashMap;

/// State shared between a `Wakers` and the `Waiter`s spawned from it.
struct Inner {
  /// When `true`, dropping a `Wakers` handle wakes all registered wakers.
  wake_on_drop: bool,
  /// Wakers of primed waiters, keyed by the id assigned when priming.
  wakers: FxHashMap<u32, Waker>,
  /// Most recently issued id; advanced (with wrap-around) when priming.
  idgen: u32
}


/// A set of wakers that can be used to wake up pending futures.
///
/// Cloning a `Wakers` yields another handle to the same underlying set; the
/// state lives behind a shared, mutex-protected `Arc`.
#[repr(transparent)]
#[derive(Clone)]
pub struct Wakers(Arc<Mutex<Inner>>);

impl Wakers {
  #[must_use]
  pub fn new() -> Self {
    Self::default()
  }

  /// Make `Wakers` wake all registered [`Waiter`]s when it's dropped.
  pub fn wake_on_drop(&self) {
    let mut inner = self.0.lock();
    inner.wake_on_drop = true;
  }

  /// Wake all waiting tasks.
  pub fn wake_all(&self) {
    self
      .0
      .lock()
      .wakers
      .drain()
      .for_each(|(_, waker)| waker.wake());
  }

  /// Allocate a new, unprimed, [`Waiter`].
  ///
  /// Call `Waiter::prime()` to "activate" the `Waiter`.
  ///
  /// The [`Wakers::waiter_ctx()`] can be called to create a primed `Waiter`
  /// immediately.
  #[must_use]
  pub fn waiter(&self) -> Waiter {
    Waiter {
      sh: Arc::clone(&self.0),
      id: None
    }
  }

  /// Allocate a new [`Waiter`]
  ///
  /// This is called just before a poll method is about to return
  /// `Poll::Pending`.  The returned [`Waiter`] should be stored within the
  /// object that implements `Future`.
  pub fn waiter_ctx(&self, ctx: &mut Context<'_>) -> Waiter {
    let mut waiter = self.waiter();
    waiter.prime(ctx);
    waiter
  }
}

impl Default for Wakers {
  /// Build an empty waker set: no registered wakers, id counter at zero and
  /// wake-on-drop switched off.
  fn default() -> Self {
    let state = Mutex::new(Inner {
      idgen: 0,
      wakers: FxHashMap::default(),
      wake_on_drop: false
    });
    Self(Arc::new(state))
  }
}

impl Drop for Wakers {
  fn drop(&mut self) {
    let mut inner = self.0.lock();
    if inner.wake_on_drop {
      inner.wakers.drain().for_each(|(_, waker)| waker.wake());
    }
  }
}


/// Representation of a waker in waiting state.
///
/// An instance is created either up front with `Wakers::waiter()` (and primed
/// later), or with `Wakers::waiter_ctx()` in a `Future`'s `poll()` method
/// just before it returns `Poll::Pending`.  The object should be stored in the
/// same object that implements `Future`.
///
/// This ensures that the waker is automatically removed from the collection of
/// wakers when the `Waiter` is dropped.
pub struct Waiter {
  /// Shared state of the `Wakers` this waiter was spawned from.
  sh: Arc<Mutex<Inner>>,
  /// Key of this waiter's entry in the shared waker map; `None` while the
  /// waiter has not been primed.
  id: Option<u32>
}

impl Waiter {
  /// Prime this waiter for waiting for a Waker.
  ///
  /// Registers a clone of `ctx`'s waker in the shared set under a fresh id.
  /// If this waiter was primed before, its previous registration is replaced,
  /// so a waiter never owns more than one entry in the set.
  ///
  /// This function is typically called just before returning `Poll::Pending`.
  pub fn prime(&mut self, ctx: &mut Context<'_>) {
    // Clone before locking to keep the critical section short.
    let waker = ctx.waker().clone();

    let mut g = self.sh.lock();

    // A future may be polled (and thus primed) many times.  Without removing
    // the previous entry each call would leave a stale waker behind that
    // `Drop` can no longer find, because only the latest id is remembered.
    let stale = self.id.take().and_then(|old| g.wakers.remove(&old));

    // Advance the counter until it lands on an id that is not in use; the
    // counter wraps around, so a long-lived entry may still hold a low id.
    let id = loop {
      g.idgen = g.idgen.wrapping_add(1);
      if !g.wakers.contains_key(&g.idgen) {
        break g.idgen;
      }
    };
    g.wakers.insert(id, waker);
    self.id = Some(id);

    // Unlock before the superseded waker is dropped so that its destructor
    // never runs while the shared mutex is held.
    drop(g);
    drop(stale);
  }
}

impl Drop for Waiter {
  /// Remove this waiter's registration, if any, from the shared set.
  fn drop(&mut self) {
    // A waiter that was never primed has nothing registered, so there is no
    // reason to contend for the shared lock.
    if let Some(id) = self.id.take() {
      // The guard is a temporary of this statement, so the lock is released
      // before `removed` (and with it the waker) is dropped below.
      let removed = self.sh.lock().wakers.remove(&id);
      drop(removed);
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :