killswitch 0.4.2

Killswitch used to broadcast a shutdown request.
Documentation
//! The killswitch waiter half implementation

use std::{
  future::Future,
  num::NonZeroUsize,
  pin::Pin,
  sync::atomic::Ordering,
  sync::Arc,
  task::{Context, Poll}
};

use super::Shared;


/// Receiver object for [`super::KillTrig`] killswitches.
///
/// Call [`KillWait::wait()`] to obtain a future that waits for the kill
/// switch to trigger.  Cloning a `KillWait` produces another handle that
/// refers to the same shared state.
pub struct KillWait {
  /// Reference-counted handle to the state shared with the other kill switch
  /// handles and with the futures returned by [`KillWait::wait()`].
  pub(super) ctx: Arc<Shared>
}

impl KillWait {
  /// Create a future that waits for the kill switch to be triggered.
  ///
  /// The returned [`KillWaitFuture`] holds its own reference to the shared
  /// kill switch state and starts out without a registered waker; it is
  /// assigned a registration identifier the first time it is polled and has
  /// to wait.
  pub fn wait(&self) -> KillWaitFuture {
    let ctx = Arc::clone(&self.ctx);
    KillWaitFuture { ctx, id: None }
  }
}

impl Clone for KillWait {
  fn clone(&self) -> KillWait {
    KillWait {
      ctx: Arc::clone(&self.ctx)
    }
  }
}

impl Drop for KillWait {
  fn drop(&mut self) {
    let mut state = self.ctx.state.lock();
    // This is a safe check as long as it is done within a lock.
    //
    // If there are two strong references and there's a waker set, then it is
    // assumed that the trigger if waiting (if there's a waker set in the
    // context), and which accounts for one of the strong referenced.  The
    // other strong reference is this KillWait object.
    if Arc::<super::Shared>::strong_count(&self.ctx) == 2
      && state.waker.is_some()
    {
      if let Some(waker) = state.waker.take() {
        waker.wake();
      }
    }
  }
}


/// Future returned by [`KillWait::wait()`].
///
/// Resolves to `()` once the shared `triggered` flag is observed to be set.
pub struct KillWaitFuture {
  /// Reference-counted handle to the shared kill switch state.
  ctx: Arc<Shared>,
  /// Key under which this future's waker is stored in the shared `waiting`
  /// map.  `None` until the future has been polled and had to register a
  /// waker; used on drop to remove that entry again.
  id: Option<NonZeroUsize>
}

impl Future for KillWaitFuture {
  type Output = ();

  /// Poll the kill switch.
  ///
  /// Returns `Poll::Ready(())` once the shared `triggered` flag is set.
  /// Otherwise the waker of the current task is stored in the shared
  /// `waiting` map -- under a freshly allocated identifier on the first
  /// pending poll, and under that same identifier on every later one -- and
  /// `Poll::Pending` is returned.
  ///
  /// # Panics
  ///
  /// Panics if the shared identifier generator hands out `0`, which it is
  /// documented never to do.
  fn poll(
    mut self: Pin<&mut Self>,
    ctx: &mut Context<'_>
  ) -> Poll<Self::Output> {
    // Lock-free fast path for the already-triggered case.
    if self.ctx.triggered.load(Ordering::SeqCst) {
      return Poll::Ready(());
    }

    // The guard is confined to this block so that `self` can be mutated
    // afterwards.
    let key = {
      let mut state = self.ctx.state.lock();

      // Re-check the flag now that the lock is held.  Without this a trigger
      // that fires between the check above and the registration below could
      // leave the waker registered after the waiters were notified, and this
      // future would never be woken.
      // NOTE(review): this closes the window provided the trigger sets
      // `triggered` no later than while it holds this lock -- the trigger
      // side is not visible from here; confirm.
      if self.ctx.triggered.load(Ordering::SeqCst) {
        return Poll::Ready(());
      }

      let key = match self.id {
        // Already registered: keep using the same slot.
        Some(id) => id.get(),
        // First pending poll: find a unique (unused) identifier.
        None => loop {
          // .id() will never return 0.
          let candidate = self.ctx.id();
          if !state.waiting.contains_key(&candidate) {
            break candidate;
          }
        }
      };

      // Always store the waker from the most recent poll.  A future may be
      // polled from a different task than last time, and only the latest
      // waker is guaranteed to reach the task that is actually waiting.
      state.waiting.insert(key, ctx.waker().clone());

      key
    };

    if self.id.is_none() {
      // Checked construction: a zero identifier is a broken invariant in the
      // generator and should fail loudly rather than be undefined behavior.
      self.id = Some(
        NonZeroUsize::new(key)
          .expect("kill switch waiter identifier must be non-zero")
      );
    }

    Poll::Pending
  }
}


/// Cleanup for futures that are dropped while still registered: if this
/// future stored a waker in the shared `waiting` map, that entry is removed
/// so it does not outlive the future.
impl Drop for KillWaitFuture {
  fn drop(&mut self) {
    // A future that never registered a waker has nothing to clean up, and
    // does not need to touch the lock at all.
    let key = match self.id {
      Some(id) => id.get(),
      None => return
    };

    let mut guard = self.ctx.state.lock();
    guard.waiting.remove(&key);
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :