use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::{Mutex, OnceLock};
use std::task::{Context, Poll, Waker};
use crate::platform::sys::{create_pipe, Interest};
use crate::reactor::with_reactor;
/// The kinds of process signal that can be awaited through `signal`.
///
/// Each variant corresponds to one signal number that this module installs a
/// handler for and recognises when draining the signal pipe.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SignalKind {
/// Delivered for `SIGINT`.
Interrupt,
/// Delivered for `SIGTERM`.
Terminate,
}
/// Reactor token under which the read end of the signal pipe is registered
/// (see `init_signal_state`).
// NOTE(review): `usize::MAX - 1` is presumably chosen to stay clear of tokens
// handed out elsewhere in the reactor — confirm against the reactor's allocator.
pub(crate) const SIGNAL_TOKEN: usize = usize::MAX - 1;
/// Bookkeeping shared between the reactor callback (`on_signal_readable`) and
/// `Signal` futures; always accessed through the `SIGNAL_STATE` mutex.
struct SignalState {
/// Write end of the pipe returned by `create_pipe`; `-1` in the placeholder
/// state that `ensure_init` stores when initialisation fails.
write_fd: i32,
/// Read end of the pipe, registered with the reactor and drained by
/// `on_signal_readable`; `-1` in the placeholder state.
read_fd: i32,
/// Wakers of futures currently waiting, paired with the kind they wait for.
waiters: Vec<(SignalKind, Waker)>,
/// Signals that have been read from the pipe but not yet consumed by a
/// `Signal` future; each entry is consumed by exactly one successful poll.
pending: Vec<SignalKind>,
}
/// Process-wide signal state, populated on first use by `ensure_init`.
/// Remains unset until `signal` has been called at least once.
static SIGNAL_STATE: OnceLock<Mutex<SignalState>> = OnceLock::new();
/// Initialises the process-wide signal state on first use.
///
/// The first caller runs `init_signal_state`; later callers reuse its outcome.
///
/// # Errors
///
/// Returns the underlying I/O error to the caller whose attempt failed. Because
/// `OnceLock::get_or_init` runs its closure only once, a failed attempt leaves a
/// placeholder state (descriptors set to `-1`) behind. Every other caller —
/// later ones and any that raced with the failing attempt — detects that
/// placeholder and receives a generic error instead of a false `Ok(())`, so no
/// `Signal` future is ever created on top of an unusable pipe.
fn ensure_init() -> io::Result<()> {
    let mut init_err: Option<io::Error> = None;

    let state_lock = SIGNAL_STATE.get_or_init(|| {
        let state = init_signal_state().unwrap_or_else(|e| {
            init_err = Some(e);
            // `get_or_init` must produce a value, so store an inert placeholder
            // that is recognisable by its invalid descriptors.
            SignalState {
                write_fd: -1,
                read_fd: -1,
                waiters: Vec::new(),
                pending: Vec::new(),
            }
        });
        Mutex::new(state)
    });

    if let Some(e) = init_err {
        return Err(e);
    }

    // Only a plain integer is read here, so a poisoned lock is still safe to
    // inspect; recover the guard instead of panicking.
    let usable = state_lock
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .read_fd
        >= 0;

    if usable {
        Ok(())
    } else {
        Err(io::Error::new(
            io::ErrorKind::Other,
            "signal handling failed to initialize in an earlier attempt",
        ))
    }
}
/// Builds the signal state: creates the pipe, registers its read end with the
/// reactor under `SIGNAL_TOKEN`, and installs the handler for `SIGINT` and
/// `SIGTERM` (in that order).
///
/// # Errors
///
/// Returns the first error reported by pipe creation, reactor registration or
/// handler installation.
// NOTE(review): on a failure after `create_pipe` succeeded, the two pipe
// descriptors are not closed. This runs at most once per process, so the leak
// is bounded, but worth revisiting if cleanup hooks become available.
fn init_signal_state() -> io::Result<SignalState> {
    let (read_fd, write_fd) = create_pipe()?;

    with_reactor(|reactor| reactor.register(read_fd, SIGNAL_TOKEN, Interest::READABLE))?;

    for signum in [libc::SIGINT, libc::SIGTERM] {
        install_handler(signum, write_fd)?;
    }

    Ok(SignalState {
        waiters: Vec::new(),
        pending: Vec::new(),
        read_fd,
        write_fd,
    })
}
/// Publishes `write_fd` for the handler and installs `signal_handler` as the
/// disposition for `signum`.
///
/// The handler is installed with an empty signal mask and `SA_RESTART`; the
/// previous disposition is not retrieved.
///
/// # Errors
///
/// Returns the OS error if `sigaction` reports failure.
fn install_handler(signum: libc::c_int, write_fd: i32) -> io::Result<()> {
    // Publish the descriptor before the handler can possibly run for `signum`.
    SIGNAL_WRITE_FD.store(write_fd, std::sync::atomic::Ordering::Relaxed);

    // SAFETY: `libc::sigaction` is a plain C struct; the all-zero bit pattern
    // is a valid starting value that is filled in below.
    let mut action: libc::sigaction = unsafe { std::mem::zeroed() };
    action.sa_sigaction = signal_handler as *const () as libc::sighandler_t;
    action.sa_flags = libc::SA_RESTART;
    // SAFETY: `action.sa_mask` is a valid, exclusively borrowed `sigset_t`.
    unsafe { libc::sigemptyset(&mut action.sa_mask) };

    // SAFETY: `action` is fully initialised and outlives the call; a null
    // `oldact` pointer is permitted by `sigaction`.
    match unsafe { libc::sigaction(signum, &action, std::ptr::null_mut()) } {
        -1 => Err(io::Error::last_os_error()),
        _ => Ok(()),
    }
}
/// Write end of the signal pipe as seen by `signal_handler`.
///
/// Starts at `-1`, which the handler treats as "nothing to write to";
/// `install_handler` stores the real descriptor before installing the handler.
static SIGNAL_WRITE_FD: std::sync::atomic::AtomicI32 = std::sync::atomic::AtomicI32::new(-1);
/// Signal handler: forwards the signal number, truncated to a single byte, into
/// the signal pipe.
///
/// Does nothing while `SIGNAL_WRITE_FD` still holds `-1`. The result of `write`
/// is deliberately ignored — there is nothing useful to do about a failure
/// from inside a handler.
// NOTE(review): `write` may modify `errno`, and this handler does not save and
// restore it around the call — consider doing so if interrupted code is found
// to observe a clobbered `errno`.
extern "C" fn signal_handler(signum: libc::c_int) {
    let pipe_fd = SIGNAL_WRITE_FD.load(std::sync::atomic::Ordering::Relaxed);
    if pipe_fd != -1 {
        let byte = signum as u8;
        let byte_ptr: *const u8 = &byte;
        // SAFETY: `byte_ptr` points at one readable byte that lives for the
        // duration of the call.
        unsafe { libc::write(pipe_fd, byte_ptr.cast::<libc::c_void>(), 1) };
    }
}
#[allow(dead_code)] pub(crate) fn on_signal_readable() {
let state_lock = match SIGNAL_STATE.get() {
Some(s) => s,
None => return,
};
let mut state = state_lock.lock().unwrap();
let mut buf = [0u8; 64];
let mut received: Vec<SignalKind> = Vec::new();
loop {
let n = unsafe {
libc::read(
state.read_fd,
buf.as_mut_ptr() as *mut libc::c_void,
buf.len(),
)
};
if n <= 0 {
break; }
for &sigbyte in &buf[..n as usize] {
let kind = match sigbyte as libc::c_int {
libc::SIGINT => Some(SignalKind::Interrupt),
libc::SIGTERM => Some(SignalKind::Terminate),
_ => None, };
if let Some(k) = kind {
received.push(k);
}
}
}
for kind in received {
state.pending.push(kind);
}
let pending_snapshot = state.pending.clone();
let mut wakers: Vec<Waker> = Vec::new();
state.waiters.retain(|(kind, waker)| {
if pending_snapshot.contains(kind) {
wakers.push(waker.clone());
false } else {
true }
});
drop(state);
for w in wakers {
w.wake();
}
}
/// Future that resolves once a signal of its kind has been observed.
///
/// Created by `signal`; each resolution consumes one pending occurrence of the
/// signal.
pub struct Signal {
/// The signal kind this future waits for (and yields on completion).
kind: SignalKind,
/// Set once the future has resolved; later polls return `Ready` again
/// without consuming another pending signal.
done: bool,
}
impl Future for Signal {
    type Output = SignalKind;

    /// Resolves with the awaited kind once one pending occurrence of it can be
    /// consumed; otherwise records the task's waker and stays pending.
    ///
    /// After completion, further polls keep returning `Ready` with the same
    /// kind.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let wanted = self.kind;
        if self.done {
            return Poll::Ready(wanted);
        }

        // NOTE(review): this arm registers no waker. It looks unreachable for
        // futures built through `signal`, which initialises the state first.
        let Some(lock) = SIGNAL_STATE.get() else {
            return Poll::Pending;
        };
        let mut guard = lock.lock().unwrap();

        // Consume exactly one pending occurrence of the awaited kind, if any.
        if let Some(idx) = guard.pending.iter().position(|&k| k == wanted) {
            guard.pending.remove(idx);
            self.done = true;
            return Poll::Ready(wanted);
        }

        // Avoid piling up duplicates: reuse an entry that already wakes the
        // same task for the same kind, otherwise append a new one.
        let waker = cx.waker().clone();
        let slot = guard
            .waiters
            .iter_mut()
            .find(|(k, w)| *k == wanted && w.will_wake(&waker));
        match slot {
            Some((_, registered)) => *registered = waker,
            None => guard.waiters.push((wanted, waker)),
        }
        Poll::Pending
    }
}
/// Creates a future that resolves when a signal of the given `kind` is
/// observed.
///
/// The signal machinery is initialised on first use.
///
/// # Errors
///
/// Propagates the error from `ensure_init` when that initialisation fails.
pub fn signal(kind: SignalKind) -> io::Result<Signal> {
    ensure_init().map(|()| Signal { kind, done: false })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `SignalKind` compares by variant.
    #[test]
    fn signal_kind_equality() {
        assert_eq!(SignalKind::Interrupt, SignalKind::Interrupt);
        assert_ne!(SignalKind::Interrupt, SignalKind::Terminate);
    }

    /// Creating a signal future initialises the machinery without error.
    #[test]
    fn signal_future_creation_succeeds() {
        let _sig = signal(SignalKind::Interrupt).expect("signal init failed");
    }

    /// After initialisation the handler has a real descriptor to write to.
    #[test]
    fn signal_write_fd_set_after_init() {
        let _sig = signal(SignalKind::Terminate).expect("init");
        assert!(
            SIGNAL_WRITE_FD.load(std::sync::atomic::Ordering::Relaxed) >= 0,
            "write_fd not set after init"
        );
    }

    /// After initialisation the shared state holds valid pipe descriptors, and
    /// the write end matches the one published to the signal handler.
    #[test]
    fn self_pipe_read_fd_registered() {
        let _sig = signal(SignalKind::Interrupt).expect("init");
        let state = SIGNAL_STATE
            .get()
            .expect("signal() must have initialised the state")
            .lock()
            .unwrap();
        assert!(state.read_fd >= 0, "read_fd not set after init");
        assert_eq!(
            state.write_fd,
            SIGNAL_WRITE_FD.load(std::sync::atomic::Ordering::Relaxed),
            "handler and state disagree on the pipe's write end"
        );
    }

    // Placeholder for a manual end-to-end delivery check; intentionally not a
    // `#[test]`, so it never runs automatically.
    #[allow(dead_code)]
    fn _manual_signal_delivery_test() {}
}