use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll, Waker};
/// Cloneable handle used to request shutdown and to query whether shutdown
/// has been requested.
///
/// Clones share the same flag and the same task-waker slot because both live
/// behind `Arc`s.
#[derive(Clone)]
pub struct ShutdownHandle {
/// Shared shutdown flag; set to `true` by `trigger` and read by `is_shutdown`.
flag: Arc<AtomicBool>,
/// Optional mio waker that `trigger` also fires; `None` until `set_mio_waker`
/// is called.
mio_waker: Option<Arc<mio::Waker>>,
/// Single shared slot holding the waker of the task most recently parked on
/// a `ShutdownSignal`; `trigger` takes the waker out of the slot and wakes it.
pub(crate) task_waker: Arc<std::sync::Mutex<Option<Waker>>>,
}
impl ShutdownHandle {
    /// Creates a handle with the shutdown flag cleared, no mio waker attached,
    /// and an empty task-waker slot.
    pub(crate) fn new() -> Self {
        Self {
            flag: Arc::new(AtomicBool::new(false)),
            mio_waker: None,
            task_waker: Arc::new(std::sync::Mutex::new(None)),
        }
    }

    /// Attaches the mio waker that [`trigger`](Self::trigger) fires in
    /// addition to the task waker.
    ///
    /// Only this handle is updated; clones made earlier keep whatever waker
    /// they had when they were cloned.
    pub(crate) fn set_mio_waker(&mut self, waker: Arc<mio::Waker>) {
        self.mio_waker = Some(waker);
    }

    /// Requests shutdown: sets the flag, wakes the task parked on a
    /// `ShutdownSignal` (if any), then fires the mio waker (if attached).
    ///
    /// Calling this more than once is harmless; the flag simply stays set.
    pub fn trigger(&self) {
        // Publish the flag first so a task woken below observes it on re-poll.
        self.flag.store(true, Ordering::Release);

        // Take the waker out while holding the lock, but let the guard drop at
        // the end of this statement so the wake-up itself runs unlocked. Waking
        // under the lock would deadlock if the waker polls the signal
        // synchronously (poll locks the same mutex), and a panicking waker
        // would poison the lock. A poisoned lock is recovered rather than
        // ignored: the slot is a plain `Option` that is always valid, and
        // skipping the wake-up would leave the waiting task parked forever.
        let parked = self
            .task_waker
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .take();
        if let Some(w) = parked {
            w.wake();
        }

        if let Some(w) = &self.mio_waker {
            // Best effort: a failed wake is deliberately ignored, the flag is
            // already set.
            let _ = w.wake();
        }
    }

    /// Returns `true` once shutdown has been requested.
    pub fn is_shutdown(&self) -> bool {
        self.flag.load(Ordering::Acquire)
    }

    /// Returns another owning reference to the shared shutdown flag.
    ///
    /// Despite the name this is an `Arc` clone, not a raw pointer.
    pub(crate) fn flag_ptr(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.flag)
    }

    /// Creates a future that resolves once shutdown has been requested.
    ///
    /// NOTE(review): the returned signal stores a raw, non-owning pointer to
    /// the flag (`Arc::as_ptr`) and dereferences it when polled. It must not
    /// be polled after every owner of the flag (this handle, its clones, and
    /// any `Arc` obtained via `flag_ptr`) has been dropped. The type system
    /// does not enforce this.
    pub fn signal(&self) -> ShutdownSignal {
        ShutdownSignal {
            flag: Arc::as_ptr(&self.flag),
            task_waker: self.task_waker.clone(),
        }
    }
}
/// Future that completes once the shutdown flag it points at has been set.
///
/// NOTE(review): `flag` is a raw, non-owning pointer that is dereferenced on
/// every poll, so the pointed-to `AtomicBool` must outlive the signal; nothing
/// in this type enforces that.
pub struct ShutdownSignal {
/// Raw pointer to the shutdown flag; holds no reference count of its own.
pub(crate) flag: *const AtomicBool,
/// Shared slot where `poll` stores the current task's waker. The slot holds
/// one waker, so the most recent poller overwrites any earlier one.
pub(crate) task_waker: Arc<std::sync::Mutex<Option<Waker>>>,
}
impl ShutdownSignal {
    /// Builds a signal from the shutdown pointers published by
    /// `crate::context::current_shutdown_ptrs`.
    ///
    /// # Panics
    ///
    /// Panics if the flag pointer is null (called outside
    /// `Runtime::block_on`), or if the waker-slot pointer is null while the
    /// flag pointer is not.
    #[must_use]
    pub fn current() -> ShutdownSignal {
        let (flag, slot_ptr) = crate::context::current_shutdown_ptrs();

        assert!(
            !flag.is_null(),
            "ShutdownSignal::current() called outside Runtime::block_on"
        );
        assert!(
            !slot_ptr.is_null(),
            "ShutdownSignal::current(): waker_ptr null while flag non-null (runtime install bug)"
        );

        // SAFETY: `slot_ptr` was checked to be non-null just above.
        // NOTE(review): this also assumes the context keeps the pointed-to
        // `Arc` alive and valid for the duration of this call; confirm in
        // `crate::context`.
        let shared_slot = unsafe { &*slot_ptr };

        ShutdownSignal {
            flag,
            task_waker: Arc::clone(shared_slot),
        }
    }
}
impl Future for ShutdownSignal {
    type Output = ();

    /// Resolves with `()` once the shutdown flag is set; otherwise stores the
    /// current task's waker in the shared slot and returns `Poll::Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // SAFETY: NOTE(review) — relies on whoever built this signal keeping
        // the pointed-to `AtomicBool` alive while the signal is polled; the
        // raw pointer itself carries no ownership.
        let flag = unsafe { &*self.flag };

        // Fast path: shutdown already requested, no need to touch the lock.
        if flag.load(Ordering::Acquire) {
            return Poll::Ready(());
        }

        {
            // Recover a poisoned lock instead of skipping registration: the
            // slot is a plain `Option` that is always valid, and returning
            // `Pending` without a registered waker would park this task
            // forever.
            let mut slot = self
                .task_waker
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);

            // Skip the clone when the stored waker already targets this task.
            let already_registered =
                matches!(slot.as_ref(), Some(w) if w.will_wake(cx.waker()));
            if !already_registered {
                *slot = Some(cx.waker().clone());
            }
        }

        // Re-check after registering: a trigger that ran between the first
        // load and the registration found no waker to take, so this second
        // load is the only way to observe it.
        if flag.load(Ordering::Acquire) {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}
/// Registers SIGTERM and SIGINT handlers that set `flag` and then wake
/// `mio_waker`.
///
/// For each signal the flag action is registered before the waker action.
///
/// # Panics
///
/// Panics if any of the four registrations fails.
pub fn install_signal_handlers(flag: &Arc<AtomicBool>, mio_waker: &Arc<mio::Waker>) {
    use signal_hook::consts::{SIGINT, SIGTERM};

    // Each handler closure owns its own reference to the mio waker.
    let term_waker = Arc::clone(mio_waker);
    let int_waker = Arc::clone(mio_waker);

    signal_hook::flag::register(SIGTERM, Arc::clone(flag))
        .expect("failed to register SIGTERM handler");
    signal_hook::flag::register(SIGINT, Arc::clone(flag))
        .expect("failed to register SIGINT handler");

    let wake_on_term = move || {
        let _ = term_waker.wake();
    };
    let wake_on_int = move || {
        let _ = int_waker.wake();
    };

    // SAFETY: NOTE(review) — `low_level::register` runs the closure inside the
    // signal handler, so it must be async-signal-safe. The closure only calls
    // `mio::Waker::wake` and discards the result; confirm that call is
    // signal-safe on every supported platform.
    unsafe { signal_hook::low_level::register(SIGTERM, wake_on_term) }
        .expect("failed to register SIGTERM waker");
    // SAFETY: same reasoning as the SIGTERM registration above.
    unsafe { signal_hook::low_level::register(SIGINT, wake_on_int) }
        .expect("failed to register SIGINT waker");
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Runtime, spawn_boxed};
    use nexus_rt::WorldBuilder;
    use std::cell::Cell;
    use std::rc::Rc;
    use std::task::{RawWaker, RawWakerVTable};

    /// Builds a waker whose clone/wake/wake_by_ref/drop entries all do
    /// nothing, for polling a future by hand.
    fn noop_waker() -> Waker {
        static NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(
            |data| RawWaker::new(data, &NOOP_VTABLE),
            |_| {},
            |_| {},
            |_| {},
        );
        // SAFETY: no vtable entry dereferences the data pointer, so a null
        // pointer is acceptable.
        unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &NOOP_VTABLE)) }
    }

    /// `trigger` flips the flag reported by `is_shutdown`.
    #[test]
    fn shutdown_handle_trigger() {
        let handle = ShutdownHandle::new();
        assert!(!handle.is_shutdown());
        handle.trigger();
        assert!(handle.is_shutdown());
    }

    /// A signal obtained from the handle completes after a spawned task
    /// triggers shutdown.
    #[test]
    fn shutdown_signal_resolves_after_trigger() {
        let builder = WorldBuilder::new();
        let mut world = builder.build();
        let mut rt = Runtime::new(&mut world);
        let handle = rt.shutdown_handle();

        let finished = Rc::new(Cell::new(false));
        let finished_in_task = Rc::clone(&finished);
        let trigger_handle = handle.clone();

        rt.block_on(async move {
            spawn_boxed(async move {
                crate::context::sleep(std::time::Duration::from_millis(50)).await;
                trigger_handle.trigger();
            });
            handle.signal().await;
            finished_in_task.set(true);
        });

        assert!(finished.get());
    }

    /// `ShutdownSignal::current` refuses to run without an installed runtime.
    #[test]
    #[should_panic(expected = "called outside Runtime::block_on")]
    fn shutdown_signal_current_panics_outside_runtime() {
        let _ = ShutdownSignal::current();
    }

    /// A signal obtained via `ShutdownSignal::current` completes after a
    /// spawned task triggers shutdown.
    #[test]
    fn shutdown_signal_current_resolves_after_trigger() {
        let builder = WorldBuilder::new();
        let mut world = builder.build();
        let mut rt = Runtime::new(&mut world);
        let handle = rt.shutdown_handle();

        let finished = Rc::new(Cell::new(false));
        let finished_in_task = Rc::clone(&finished);
        let trigger_handle = handle.clone();

        rt.block_on(async move {
            spawn_boxed(async move {
                crate::context::sleep(std::time::Duration::from_millis(50)).await;
                trigger_handle.trigger();
            });
            ShutdownSignal::current().await;
            finished_in_task.set(true);
        });

        assert!(finished.get());
    }

    /// Re-polling with a different waker replaces the stored one, so the most
    /// recent waker is the one `trigger` fires.
    #[test]
    fn shutdown_signal_waker_updates_on_repoll() {
        // Vtable for a waker whose data pointer refers to a `Cell<bool>` that
        // is set when the waker fires.
        fn clone_tracking(data: *const ()) -> RawWaker {
            RawWaker::new(data, &TRACKING_VTABLE)
        }
        fn mark_woken(data: *const ()) {
            // SAFETY: `data` points at the `woke` cell below, which outlives
            // every waker built from it in this test.
            unsafe { (*data.cast::<Cell<bool>>()).set(true) }
        }
        fn ignore_drop(_: *const ()) {}
        static TRACKING_VTABLE: RawWakerVTable =
            RawWakerVTable::new(clone_tracking, mark_woken, mark_woken, ignore_drop);

        let handle = ShutdownHandle::new();
        let mut signal = Box::pin(handle.signal());

        // First poll registers a waker that does nothing.
        let first_waker = noop_waker();
        let mut first_cx = Context::from_waker(&first_waker);
        assert_eq!(signal.as_mut().poll(&mut first_cx), Poll::Pending);

        // Second poll registers the tracking waker in its place.
        let woke = Cell::new(false);
        let woke_ptr = &woke as *const Cell<bool> as *const ();
        // SAFETY: the vtable functions only read `woke` through the pointer,
        // and `woke` stays alive until the end of this test.
        let tracking = unsafe { Waker::from_raw(RawWaker::new(woke_ptr, &TRACKING_VTABLE)) };
        let mut second_cx = Context::from_waker(&tracking);
        assert_eq!(signal.as_mut().poll(&mut second_cx), Poll::Pending);

        handle.trigger();
        assert!(woke.get(), "latest waker must fire on trigger");
    }

    /// A signal created after `trigger` is ready on its first poll.
    #[test]
    fn shutdown_signal_already_triggered() {
        let handle = ShutdownHandle::new();
        handle.trigger();

        let mut signal = Box::pin(handle.signal());
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        assert_eq!(signal.as_mut().poll(&mut cx), Poll::Ready(()));
    }
}