signal_msg/lib.rs
1#![cfg(unix)]
//! Handle UNIX process signals with a shared channel.
//!
//! This crate provides a correct, ergonomic approach to UNIX signal handling.
//! It uses the self-pipe trick: the actual signal handler only calls
//! `write(2)`, the one async-signal-safe operation it needs, and a background
//! thread named `signal-msg` performs all non-trivial work. Multiple
//! independent subscribers are supported via [`Signals::subscribe`].
//!
//! Signal handlers are installed automatically by [`Signals::new`] before it
//! returns, so it is impossible to forget to activate them.
//!
//! # Example
//!
//! ```no_run
//! use signal_msg::Signals;
//!
//! let signals = Signals::new().expect("failed to create signal handler");
//! let receiver = signals.subscribe();
//! println!("Waiting for a signal...");
//! match receiver.listen() {
//!     Ok(sig) => println!("received: {}", sig),
//!     Err(e) => eprintln!("channel error: {}", e),
//! }
//! ```
25//! ```
26
27use std::os::unix::io::RawFd;
28use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
29use std::sync::{mpsc, Arc, Mutex};
30
/// Process-wide copy of the self-pipe's write descriptor, or `-1` when no
/// pipe is published.
///
/// [`Signals::new`] stores the descriptor here and it is reset to `-1` when
/// the last handle is dropped. [`pipe_handler`] only reads this value; it is
/// the sole writer to the descriptor itself.
static WRITE_FD: AtomicI32 = AtomicI32::new(-1);

/// `true` while a [`Signals`] instance exists (or is being constructed), so
/// that a second concurrent instance can be rejected.
static INITIALIZED: AtomicBool = AtomicBool::new(false);
37
38const HANDLED_SIGNALS: &[libc::c_int] = &[
39 libc::SIGHUP,
40 libc::SIGINT,
41 libc::SIGILL,
42 libc::SIGABRT,
43 libc::SIGFPE,
44 libc::SIGPIPE,
45 libc::SIGALRM,
46 libc::SIGTERM,
47];
48
49/// OS-level signal handler. Must only call async-signal-safe functions.
50/// Writes the signal number as a single byte into the self-pipe.
51extern "C" fn pipe_handler(sig: libc::c_int) {
52 let fd = WRITE_FD.load(Ordering::Relaxed);
53 if fd >= 0 {
54 // Signal numbers fit in a u8 (POSIX signals are 1–31).
55 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
56 let byte = sig as u8;
57 // SAFETY: `fd` is a valid open file descriptor (the write end of the
58 // self-pipe, published by Signals::new before any signal handler is
59 // registered). `byte` is a pointer to a live stack variable of the
60 // correct size. `libc::write` is async-signal-safe per POSIX.2017 §2.4.3.
61 unsafe {
62 libc::write(fd, std::ptr::addr_of!(byte).cast::<libc::c_void>(), 1);
63 }
64 }
65}
66
67/// Installs `pipe_handler` for one signal number via `sigaction(2)`.
68///
69/// # Safety
70///
71/// `signum` must be a valid, catchable POSIX signal number. `pipe_handler` must
72/// remain a valid function pointer for the lifetime of the process (it is a
73/// `static extern "C" fn`, so this is always true). `pipe_handler` only calls
74/// `write(2)`, which is async-signal-safe per POSIX.2017 §2.4.3.
75unsafe fn install_handler(signum: libc::c_int) {
76 let mut sa: libc::sigaction = std::mem::zeroed();
77 sa.sa_sigaction = pipe_handler as *const () as libc::sighandler_t;
78 sa.sa_flags = libc::SA_RESTART;
79 libc::sigaction(signum, &sa, std::ptr::null_mut());
80}
81
/// One of the UNIX signals this crate can deliver through a [`Signals`]
/// channel.
///
/// `SIGKILL` (cannot be caught), `SIGQUIT` (dumps core by default) and
/// `SIGSEGV` (unrecoverable program fault) are intentionally not represented.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Signal {
    /// `SIGHUP` — terminal hang-up or controlling process died.
    Hup,
    /// `SIGINT` — interactive interrupt (typically Ctrl-C).
    Int,
    /// `SIGILL` — illegal CPU instruction.
    Ill,
    /// `SIGABRT` — process abort.
    Abrt,
    /// `SIGFPE` — floating-point exception.
    Fpe,
    /// `SIGPIPE` — write to a broken pipe.
    Pipe,
    /// `SIGALRM` — alarm-clock timer expired.
    Alrm,
    /// `SIGTERM` — polite termination request.
    Term,
}
106
107impl Signal {
108 fn from_raw(n: u8) -> Option<Self> {
109 match libc::c_int::from(n) {
110 libc::SIGHUP => Some(Signal::Hup),
111 libc::SIGINT => Some(Signal::Int),
112 libc::SIGILL => Some(Signal::Ill),
113 libc::SIGABRT => Some(Signal::Abrt),
114 libc::SIGFPE => Some(Signal::Fpe),
115 libc::SIGPIPE => Some(Signal::Pipe),
116 libc::SIGALRM => Some(Signal::Alrm),
117 libc::SIGTERM => Some(Signal::Term),
118 _ => None,
119 }
120 }
121}
122
123impl std::fmt::Display for Signal {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 let name = match self {
126 Signal::Hup => "SIGHUP",
127 Signal::Int => "SIGINT",
128 Signal::Ill => "SIGILL",
129 Signal::Abrt => "SIGABRT",
130 Signal::Fpe => "SIGFPE",
131 Signal::Pipe => "SIGPIPE",
132 Signal::Alrm => "SIGALRM",
133 Signal::Term => "SIGTERM",
134 };
135 f.write_str(name)
136 }
137}
138
/// An error produced by signal channel operations.
///
/// `SignalError` implements `Clone`, `PartialEq` and `Eq` even though the
/// wrapped [`io::Error`][std::io::Error] implements none of them:
///
/// * **Equality** of two [`OsError`][SignalError::OsError] values compares
///   only their [`io::ErrorKind`][std::io::ErrorKind].
/// * **Cloning** an [`OsError`][SignalError::OsError] rebuilds the inner
///   error from its raw OS error code when it has one (preserving the code
///   and the OS message), and otherwise from its kind alone.
#[derive(Debug)]
pub enum SignalError {
    /// The channel is disconnected (the paired [`Signals`] handle was dropped).
    Disconnected,
    /// [`Signals::new`] was called while another `Signals` instance is active.
    AlreadyInitialized,
    /// An OS-level operation failed during signal channel setup.
    OsError(std::io::Error),
}

impl Clone for SignalError {
    fn clone(&self) -> Self {
        match self {
            Self::Disconnected => Self::Disconnected,
            Self::AlreadyInitialized => Self::AlreadyInitialized,
            Self::OsError(e) => {
                // Prefer the raw errno: it round-trips both the kind and the
                // OS-provided message. Errors without an OS code can only be
                // reproduced by kind, so any custom payload is lost.
                let copy = match e.raw_os_error() {
                    Some(code) => std::io::Error::from_raw_os_error(code),
                    None => std::io::Error::from(e.kind()),
                };
                Self::OsError(copy)
            }
        }
    }
}

impl PartialEq for SignalError {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::Disconnected, Self::Disconnected)
            | (Self::AlreadyInitialized, Self::AlreadyInitialized) => true,
            // `io::Error` is not comparable; its kind is the best stable proxy.
            (Self::OsError(a), Self::OsError(b)) => a.kind() == b.kind(),
            _ => false,
        }
    }
}

impl Eq for SignalError {}

impl std::fmt::Display for SignalError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Disconnected => f.write_str("signal channel disconnected"),
            Self::AlreadyInitialized => f.write_str("a Signals instance is already active"),
            Self::OsError(e) => write!(f, "OS error during signal channel setup: {e}"),
        }
    }
}

impl std::error::Error for SignalError {
    /// Exposes the underlying [`io::Error`][std::io::Error] for the
    /// [`OsError`][SignalError::OsError] variant; other variants have no
    /// source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::OsError(e) => Some(e),
            Self::Disconnected | Self::AlreadyInitialized => None,
        }
    }
}
199
200type Senders = Arc<Mutex<Vec<mpsc::Sender<Signal>>>>;
201
202#[derive(Debug)]
203struct SignalsInner {
204 write_fd: RawFd,
205 senders: Senders,
206}
207
208impl Drop for SignalsInner {
209 fn drop(&mut self) {
210 // Closing write_fd causes the background thread's read() to return 0
211 // (EOF), signalling it to exit cleanly.
212 // SAFETY: `write_fd` is a valid open file descriptor owned exclusively
213 // by this struct. No other code closes it while SignalsInner is alive.
214 unsafe { libc::close(self.write_fd) };
215 WRITE_FD.store(-1, Ordering::Relaxed);
216 INITIALIZED.store(false, Ordering::Relaxed);
217 }
218}
219
220/// A handle for subscribing to OS signals delivered through a shared channel.
221///
222/// `Signals` is cheaply cloneable (backed by an [`Arc`]). Only one `Signals`
223/// instance may be active at a time per process; calling [`Signals::new`] while
224/// another instance exists returns [`SignalError::AlreadyInitialized`].
225///
226/// OS-level signal handlers are installed automatically by [`Signals::new`],
227/// so signals are delivered from the moment the value is returned. Call
228/// [`subscribe`][Signals::subscribe] to obtain a [`Receiver`].
229///
230/// Dropping the last clone releases all resources including the background
231/// thread.
232///
233/// # Example
234///
235/// ```no_run
236/// use signal_msg::Signals;
237///
238/// let signals = Signals::new().expect("failed to create signal handler");
239/// let receiver = signals.subscribe();
240/// match receiver.listen() {
241/// Ok(sig) => println!("received: {}", sig),
242/// Err(e) => eprintln!("error: {}", e),
243/// }
244/// ```
245#[derive(Clone, Debug)]
246pub struct Signals(Arc<SignalsInner>);
247
248impl Signals {
249 /// Creates a new signal channel and installs OS-level signal handlers.
250 ///
251 /// Allocates a self-pipe, spawns a background dispatch thread named
252 /// `signal-msg`, and registers handlers for all supported signals. Call
253 /// [`subscribe`][Signals::subscribe] to obtain a [`Receiver`].
254 ///
255 /// # Examples
256 ///
257 /// ```no_run
258 /// let signals = signal_msg::Signals::new().expect("signal setup failed");
259 /// let receiver = signals.subscribe();
260 /// ```
261 ///
262 /// # Errors
263 ///
264 /// Returns [`SignalError::AlreadyInitialized`] if another `Signals` instance
265 /// is already active. Returns [`SignalError::OsError`] if an OS-level
266 /// operation fails (pipe creation, `fcntl`, or thread spawn).
267 pub fn new() -> Result<Self, SignalError> {
268 if INITIALIZED
269 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
270 .is_err()
271 {
272 return Err(SignalError::AlreadyInitialized);
273 }
274 Self::try_init().map_err(|e| {
275 INITIALIZED.store(false, Ordering::Relaxed);
276 e
277 })
278 }
279
280 fn try_init() -> Result<Self, SignalError> {
281 let mut fds = [0i32; 2];
282 // SAFETY: `fds` is a valid mutable pointer to a 2-element `c_int`
283 // array, satisfying the requirements of `pipe(2)`.
284 if unsafe { libc::pipe(fds.as_mut_ptr()) } != 0 {
285 return Err(SignalError::OsError(std::io::Error::last_os_error()));
286 }
287 let read_fd = fds[0];
288 let write_fd = fds[1];
289
290 // SAFETY: `write_fd` and `read_fd` are valid open file descriptors
291 // just created by `pipe(2)` above.
292 unsafe {
293 let fl = libc::fcntl(write_fd, libc::F_GETFL);
294 if fl == -1
295 || libc::fcntl(write_fd, libc::F_SETFL, fl | libc::O_NONBLOCK) == -1
296 || libc::fcntl(write_fd, libc::F_SETFD, libc::FD_CLOEXEC) == -1
297 || libc::fcntl(read_fd, libc::F_SETFD, libc::FD_CLOEXEC) == -1
298 {
299 let e = std::io::Error::last_os_error();
300 libc::close(write_fd);
301 libc::close(read_fd);
302 return Err(SignalError::OsError(e));
303 }
304 }
305
306 // Publish write_fd before the thread starts so pipe_handler can use it.
307 WRITE_FD.store(write_fd, Ordering::Relaxed);
308
309 let senders: Senders = Arc::new(Mutex::new(Vec::new()));
310 let thread_senders = Arc::clone(&senders);
311
312 // Background thread: reads signal bytes from the pipe and fans them out
313 // to all registered senders. Exits when the write end is closed (EOF).
314 std::thread::Builder::new()
315 .name("signal-msg".into())
316 .spawn(move || {
317 let mut buf = [0u8; 64];
318 loop {
319 // SAFETY: `read_fd` is a valid open file descriptor owned
320 // exclusively by this thread. `buf` is valid for 64 bytes.
321 let n = unsafe {
322 libc::read(read_fd, buf.as_mut_ptr().cast::<libc::c_void>(), 64)
323 };
324 if n < 0 {
325 if std::io::Error::last_os_error().kind()
326 == std::io::ErrorKind::Interrupted
327 {
328 continue; // EINTR — retry the read
329 }
330 break; // unrecoverable OS error
331 }
332 if n == 0 {
333 break; // EOF — write end was closed (Signals dropped)
334 }
335 #[allow(clippy::cast_sign_loss)]
336 let received = &buf[..n as usize];
337 let mut locked = thread_senders.lock().unwrap_or_else(|p| p.into_inner());
338 for &byte in received {
339 if let Some(sig) = Signal::from_raw(byte) {
340 locked.retain(|s| s.send(sig).is_ok());
341 }
342 }
343 }
344 // SAFETY: `read_fd` is exclusively owned by this thread and
345 // has not been closed by any other code.
346 unsafe { libc::close(read_fd) };
347 })
348 .map_err(|e| {
349 // Thread spawn failed; clean up fds before propagating the error.
350 // SAFETY: `write_fd` and `read_fd` are valid open file descriptors.
351 unsafe {
352 libc::close(write_fd);
353 libc::close(read_fd);
354 }
355 WRITE_FD.store(-1, Ordering::Relaxed);
356 SignalError::OsError(e)
357 })?;
358
359 // Install OS-level handlers for all supported signals. This is done
360 // after the thread is running so no signals can be lost.
361 for &signum in HANDLED_SIGNALS {
362 // SAFETY: `signum` is a valid catchable signal number from
363 // `HANDLED_SIGNALS` and `pipe_handler` is async-signal-safe.
364 unsafe { install_handler(signum) };
365 }
366
367 Ok(Signals(Arc::new(SignalsInner { write_fd, senders })))
368 }
369
370 /// Returns a new [`Receiver`] that will receive all subsequent signals.
371 ///
372 /// Multiple independent receivers can be created from the same `Signals`
373 /// handle; each receives its own copy of every delivered signal.
374 ///
375 /// # Examples
376 ///
377 /// ```no_run
378 /// use signal_msg::Signals;
379 ///
380 /// let signals = Signals::new().expect("signal setup failed");
381 /// let r1 = signals.subscribe();
382 /// let r2 = signals.subscribe();
383 /// // r1 and r2 each receive independent copies of every signal.
384 /// # let _ = (r1, r2);
385 /// ```
386 #[must_use]
387 pub fn subscribe(&self) -> Receiver {
388 let (tx, rx) = mpsc::channel();
389 self.0.senders.lock().unwrap_or_else(|p| p.into_inner()).push(tx);
390 Receiver(rx)
391 }
392}
393
394/// Receives UNIX signals forwarded through a [`Signals`] channel.
395///
396/// Obtained via [`Signals::subscribe`]. Use [`listen`][Receiver::listen] to
397/// block until a signal arrives, or [`try_listen`][Receiver::try_listen] to
398/// poll without blocking.
399#[derive(Debug)]
400pub struct Receiver(mpsc::Receiver<Signal>);
401
402impl Receiver {
403 /// Blocks until a signal is received and returns it.
404 ///
405 /// # Examples
406 ///
407 /// ```no_run
408 /// use signal_msg::Signals;
409 ///
410 /// let signals = Signals::new().expect("signal setup failed");
411 /// let receiver = signals.subscribe();
412 /// match receiver.listen() {
413 /// Ok(sig) => println!("received: {}", sig),
414 /// Err(e) => eprintln!("channel closed: {}", e),
415 /// }
416 /// ```
417 ///
418 /// # Errors
419 ///
420 /// Returns [`SignalError::Disconnected`] if the backing [`Signals`] handle
421 /// has been dropped and no further signals can arrive.
422 pub fn listen(&self) -> Result<Signal, SignalError> {
423 self.0.recv().map_err(|_| SignalError::Disconnected)
424 }
425
426 /// Returns the next signal if one is immediately available, or `None` if
427 /// the channel is currently empty.
428 ///
429 /// Unlike [`listen`][Receiver::listen], this method never blocks.
430 ///
431 /// # Examples
432 ///
433 /// ```no_run
434 /// use signal_msg::Signals;
435 ///
436 /// let signals = Signals::new().expect("signal setup failed");
437 /// let receiver = signals.subscribe();
438 /// match receiver.try_listen() {
439 /// Ok(Some(sig)) => println!("got signal: {}", sig),
440 /// Ok(None) => println!("no signal pending"),
441 /// Err(e) => eprintln!("channel closed: {}", e),
442 /// }
443 /// ```
444 ///
445 /// # Errors
446 ///
447 /// Returns [`SignalError::Disconnected`] if the backing [`Signals`] handle
448 /// has been dropped and no further signals can arrive.
449 pub fn try_listen(&self) -> Result<Option<Signal>, SignalError> {
450 match self.0.try_recv() {
451 Ok(sig) => Ok(Some(sig)),
452 Err(mpsc::TryRecvError::Empty) => Ok(None),
453 Err(mpsc::TryRecvError::Disconnected) => Err(SignalError::Disconnected),
454 }
455 }
456}