#![warn(missing_docs)]
use std::future::Future;
use std::sync::{Arc, Mutex};
mod shutdown_complete;
pub use shutdown_complete::ShutdownComplete;
mod shutdown_signal;
pub use shutdown_signal::ShutdownSignal;
mod wrap_cancel;
use waker_list::WakerList;
pub use wrap_cancel::WrapCancel;
mod wrap_trigger_shutdown;
pub use wrap_trigger_shutdown::WrapTriggerShutdown;
mod wrap_delay_shutdown;
pub use wrap_delay_shutdown::WrapDelayShutdown;
mod waker_list;
/// Handle used to trigger a shutdown, query its state and hand out shutdown related tokens.
///
/// The state is stored behind an `Arc<Mutex<..>>`, so a clone of the manager
/// refers to the same shared state as the original.
///
/// The type parameter `T` is the type of the shutdown reason that is recorded
/// when a shutdown is triggered.
#[derive(Clone)]
pub struct ShutdownManager<T: Clone> {
/// Shared, mutex protected shutdown state.
inner: Arc<Mutex<ShutdownManagerInner<T>>>,
}
impl<T: Clone> ShutdownManager<T> {
    // Every method that inspects or modifies the shared state locks the internal mutex
    // and unwraps the result, so those methods panic if the mutex has been poisoned.

    /// Create a new shutdown manager.
    ///
    /// The new manager has no shutdown reason recorded and no outstanding delay tokens.
    #[inline]
    pub fn new() -> Self {
        let state = ShutdownManagerInner::new();
        Self {
            inner: Arc::new(Mutex::new(state)),
        }
    }

    /// Check if the shutdown has been triggered.
    ///
    /// Returns `true` as soon as a shutdown reason has been recorded.
    #[inline]
    pub fn is_shutdown_triggered(&self) -> bool {
        let state = self.inner.lock().unwrap();
        state.shutdown_reason.is_some()
    }

    /// Check if the shutdown has completed.
    ///
    /// The shutdown counts as completed when a shutdown reason has been recorded
    /// and there are no outstanding delay tokens.
    #[inline]
    pub fn is_shutdown_completed(&self) -> bool {
        let state = self.inner.lock().unwrap();
        state.delay_tokens == 0 && state.shutdown_reason.is_some()
    }

    /// Get a clone of the shutdown reason.
    ///
    /// Returns `None` if no shutdown has been triggered yet.
    #[inline]
    pub fn shutdown_reason(&self) -> Option<T> {
        let state = self.inner.lock().unwrap();
        state.shutdown_reason.clone()
    }

    /// Create a [`ShutdownSignal`] that shares the state of this manager.
    ///
    /// NOTE(review): going by the name, the returned value is meant to be awaited
    /// until the shutdown is triggered; see the `shutdown_signal` module for the details.
    #[inline]
    pub fn wait_shutdown_triggered(&self) -> ShutdownSignal<T> {
        let inner = Arc::clone(&self.inner);
        ShutdownSignal {
            inner,
            waker_token: None,
        }
    }

    /// Create a [`ShutdownComplete`] that shares the state of this manager.
    ///
    /// NOTE(review): going by the name, the returned value is meant to be awaited
    /// until the shutdown is complete; see the `shutdown_complete` module for the details.
    #[inline]
    pub fn wait_shutdown_complete(&self) -> ShutdownComplete<T> {
        let inner = Arc::clone(&self.inner);
        ShutdownComplete {
            inner,
            waker_token: None,
        }
    }

    /// Trigger the shutdown with the given reason.
    ///
    /// # Errors
    ///
    /// Returns [`ShutdownAlreadyStarted`] if a shutdown reason was already recorded.
    /// The error carries both the original reason and the reason that was ignored.
    #[inline]
    pub fn trigger_shutdown(&self, reason: T) -> Result<(), ShutdownAlreadyStarted<T>> {
        let mut state = self.inner.lock().unwrap();
        state.shutdown(reason)
    }

    /// Wrap a future using a fresh [`ShutdownSignal`] of this manager.
    ///
    /// This is a shorthand for `self.wait_shutdown_triggered().wrap_cancel(future)`.
    #[inline]
    pub fn wrap_cancel<F: Future>(&self, future: F) -> WrapCancel<T, F> {
        let signal = self.wait_shutdown_triggered();
        signal.wrap_cancel(future)
    }

    /// Wrap a future together with a new [`TriggerShutdownToken`].
    ///
    /// This is a shorthand for `self.trigger_shutdown_token(shutdown_reason).wrap_future(future)`.
    #[inline]
    pub fn wrap_trigger_shutdown<F: Future>(&self, shutdown_reason: T, future: F) -> WrapTriggerShutdown<T, F> {
        let token = self.trigger_shutdown_token(shutdown_reason);
        token.wrap_future(future)
    }

    /// Wrap a future together with a new [`DelayShutdownToken`].
    ///
    /// # Errors
    ///
    /// Returns [`ShutdownAlreadyCompleted`] if no delay token could be created,
    /// see [`Self::delay_shutdown_token`].
    #[inline]
    pub fn wrap_delay_shutdown<F: Future>(&self, future: F) -> Result<WrapDelayShutdown<T, F>, ShutdownAlreadyCompleted<T>> {
        self.delay_shutdown_token().map(|token| token.wrap_future(future))
    }

    /// Create a token that delays completion of the shutdown for as long as it exists.
    ///
    /// Creating the token increments the count of outstanding delay tokens.
    /// Dropping the token decrements it again.
    ///
    /// # Errors
    ///
    /// Returns [`ShutdownAlreadyCompleted`] with a clone of the shutdown reason
    /// if the shutdown was triggered and there are no outstanding delay tokens.
    #[inline]
    pub fn delay_shutdown_token(&self) -> Result<DelayShutdownToken<T>, ShutdownAlreadyCompleted<T>> {
        let mut state = self.inner.lock().unwrap();
        // Only refuse when the shutdown is triggered AND nothing is delaying it any more.
        if let (0, Some(reason)) = (state.delay_tokens, &state.shutdown_reason) {
            return Err(ShutdownAlreadyCompleted::new(reason.clone()));
        }
        state.increase_delay_count();
        Ok(DelayShutdownToken {
            inner: Arc::clone(&self.inner),
        })
    }

    /// Create a token that triggers the shutdown with `shutdown_reason` when it is dropped.
    ///
    /// The reason is stored in a slot shared by all clones of the token.
    #[inline]
    pub fn trigger_shutdown_token(&self, shutdown_reason: T) -> TriggerShutdownToken<T> {
        let shutdown_reason = Arc::new(Mutex::new(Some(shutdown_reason)));
        let inner = Arc::clone(&self.inner);
        TriggerShutdownToken {
            shutdown_reason,
            inner,
        }
    }
}
impl<T: Clone> Default for ShutdownManager<T> {
#[inline]
fn default() -> Self {
Self::new()
}
}
/// Token that counts as one outstanding delay on the shutdown manager it was created from.
///
/// Cloning the token increments the delay count of the shared state,
/// dropping a token decrements it.
pub struct DelayShutdownToken<T: Clone> {
/// Shared, mutex protected shutdown state of the originating manager.
inner: Arc<Mutex<ShutdownManagerInner<T>>>,
}
impl<T: Clone> DelayShutdownToken<T> {
    /// Bundle this token with a future in a [`WrapDelayShutdown`].
    ///
    /// The token is moved into the wrapper, so the delay it represents
    /// lasts for as long as the wrapper keeps the token around.
    #[inline]
    pub fn wrap_future<F: Future>(self, future: F) -> WrapDelayShutdown<T, F> {
        let delay_token = Some(self);
        WrapDelayShutdown { delay_token, future }
    }
}
impl<T: Clone> Clone for DelayShutdownToken<T> {
#[inline]
fn clone(&self) -> Self {
self.inner.lock().unwrap().increase_delay_count();
DelayShutdownToken {
inner: self.inner.clone(),
}
}
}
impl<T: Clone> Drop for DelayShutdownToken<T> {
    /// Release the delay held by this token.
    ///
    /// Decrements the delay count of the shared state.
    /// When the count reaches zero the shutdown-complete wakers are woken.
    /// Panics if the internal mutex is poisoned.
    #[inline]
    fn drop(&mut self) {
        let mut state = self.inner.lock().unwrap();
        state.decrease_delay_count();
    }
}
/// Token that triggers a shutdown when it is dropped.
///
/// Clones of a token share the same shutdown reason slot.
/// The first clone that is dropped takes the reason out of the slot and triggers the shutdown with it,
/// later drops find the slot empty and do nothing.
#[derive(Clone)]
pub struct TriggerShutdownToken<T: Clone> {
/// Shutdown reason to use, shared between all clones; `None` once it has been taken.
shutdown_reason: Arc<Mutex<Option<T>>>,
/// Shared, mutex protected shutdown state of the originating manager.
inner: Arc<Mutex<ShutdownManagerInner<T>>>,
}
impl<T: Clone> TriggerShutdownToken<T> {
    /// Bundle this token with a future in a [`WrapTriggerShutdown`].
    ///
    /// The token is moved into the wrapper, so the shutdown is triggered
    /// when the wrapper drops the token.
    #[inline]
    pub fn wrap_future<F: Future>(self, future: F) -> WrapTriggerShutdown<T, F> {
        let trigger_shutdown_token = Some(self);
        WrapTriggerShutdown {
            trigger_shutdown_token,
            future,
        }
    }

    /// Discard the token without triggering a shutdown.
    ///
    /// This calls [`std::mem::forget()`] on the token, so its destructor never runs.
    /// Other clones of the token are not affected and still trigger the shutdown when dropped.
    ///
    /// Note that skipping the destructor also means that the reference counts
    /// of the two `Arc` handles held by this token are never decremented.
    #[inline]
    pub fn forget(self) {
        std::mem::forget(self);
    }
}
impl<T: Clone> Drop for TriggerShutdownToken<T> {
    /// Trigger the shutdown if the shared reason has not been used yet.
    ///
    /// If the shutdown was already triggered by something else, the reason is silently discarded.
    /// Panics if either of the two mutexes is poisoned.
    #[inline]
    fn drop(&mut self) {
        // Lock order: the manager state first, then the shared reason slot.
        let mut state = self.inner.lock().unwrap();
        // The guard on the reason slot is released at the end of this statement.
        let pending = self.shutdown_reason.lock().unwrap().take();
        if let Some(reason) = pending {
            // An `Err` only means the shutdown was already started, which is fine here.
            let _ = state.shutdown(reason);
        }
    }
}
/// Shutdown state shared between a manager, its tokens and its futures.
struct ShutdownManagerInner<T> {
/// The reason the shutdown was triggered with, or `None` if it has not been triggered.
shutdown_reason: Option<T>,
/// Number of outstanding delay tokens.
delay_tokens: usize,
/// Wakers that are woken when the shutdown is triggered.
on_shutdown: WakerList,
/// Wakers that are woken when the delay count drops to zero,
/// or when the shutdown is triggered while the delay count is zero.
on_shutdown_complete: WakerList,
}
impl<T: Clone> ShutdownManagerInner<T> {
    /// Create the initial state: no shutdown reason, no delay tokens and empty waker lists.
    fn new() -> Self {
        let on_shutdown_complete = WakerList::new();
        let on_shutdown = WakerList::new();
        Self {
            shutdown_reason: None,
            delay_tokens: 0,
            on_shutdown,
            on_shutdown_complete,
        }
    }

    /// Register one more outstanding delay token.
    fn increase_delay_count(&mut self) {
        self.delay_tokens += 1;
    }

    /// Unregister one delay token.
    ///
    /// When the last token is gone the shutdown-complete wakers are woken,
    /// regardless of whether a shutdown reason has been recorded.
    fn decrease_delay_count(&mut self) {
        self.delay_tokens -= 1;
        let no_tokens_left = self.delay_tokens == 0;
        if no_tokens_left {
            self.notify_shutdown_complete();
        }
    }

    /// Record the shutdown reason and wake everything waiting for the shutdown.
    ///
    /// The shutdown-complete wakers are woken as well if there are no outstanding delay tokens.
    ///
    /// # Errors
    ///
    /// If a reason was already recorded nothing is changed and a [`ShutdownAlreadyStarted`]
    /// is returned with a clone of the original reason and the ignored `reason`.
    fn shutdown(&mut self, reason: T) -> Result<(), ShutdownAlreadyStarted<T>> {
        if let Some(original_reason) = &self.shutdown_reason {
            return Err(ShutdownAlreadyStarted::new(original_reason.clone(), reason));
        }

        self.shutdown_reason = Some(reason);
        self.on_shutdown.wake_all();
        if self.delay_tokens == 0 {
            self.notify_shutdown_complete();
        }
        Ok(())
    }

    /// Wake all wakers registered for shutdown completion.
    fn notify_shutdown_complete(&mut self) {
        self.on_shutdown_complete.wake_all();
    }
}
/// Error returned when a shutdown is triggered while a shutdown reason was already recorded.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ShutdownAlreadyStarted<T> {
    /// The reason of the shutdown that was already started.
    pub shutdown_reason: T,

    /// The reason that was passed to the failed trigger attempt and has been ignored.
    pub ignored_reason: T,
}

impl<T> ShutdownAlreadyStarted<T> {
    /// Create the error from the original shutdown reason and the reason that was ignored.
    pub(crate) const fn new(shutdown_reason: T, ignored_reason: T) -> Self {
        Self {
            shutdown_reason,
            ignored_reason,
        }
    }
}

impl<T: std::fmt::Debug> std::error::Error for ShutdownAlreadyStarted<T> {}

impl<T> std::fmt::Display for ShutdownAlreadyStarted<T> {
    /// Write a fixed, human readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // This error is about triggering a shutdown twice, not about delaying
        // shutdown completion (that is what `ShutdownAlreadyCompleted` reports).
        f.write_str("shutdown has already started, can not trigger shutdown again")
    }
}
/// Error returned when a delay token is requested after the shutdown has already completed.
///
/// The shutdown has completed when it was triggered and there are no outstanding delay tokens.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ShutdownAlreadyCompleted<T> {
    /// The reason of the shutdown that has already completed.
    pub shutdown_reason: T,
}

impl<T> ShutdownAlreadyCompleted<T> {
    /// Create the error from the reason of the completed shutdown.
    pub(crate) const fn new(shutdown_reason: T) -> Self {
        Self { shutdown_reason }
    }
}

impl<T: std::fmt::Debug> std::error::Error for ShutdownAlreadyCompleted<T> {}

impl<T> std::fmt::Display for ShutdownAlreadyCompleted<T> {
    /// Write a fixed, human readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("shutdown has already completed, can not delay shutdown completion")
    }
}