Skip to main content

signal_msg/
lib.rs

1#![cfg(unix)]
2//! Handle UNIX process signals with a shared channel.
3//!
4//! This crate provides a correct, ergonomic approach to UNIX signal handling.
5//! The self-pipe trick ensures that the actual signal handler only calls
6//! `write(2)` — the only async-signal-safe operation needed. A background
7//! thread named `signal-msg` performs all non-trivial work. Multiple
8//! independent subscribers are supported via [`Signals::subscribe`].
9//!
10//! Signal handlers are installed automatically when [`Signals::new`] returns,
11//! so it is impossible to forget to activate them.
12//!
13//! # Example
14//!
15//! ```no_run
16//! use signal_msg::Signals;
17//!
18//! let signals = Signals::new().expect("failed to create signal handler");
19//! let receiver = signals.subscribe();
20//! println!("Waiting for a signal...");
21//! match receiver.listen() {
22//!     Ok(sig) => println!("received: {}", sig),
23//!     Err(e)  => eprintln!("channel error: {}", e),
24//! }
25//! ```
26
27use std::os::unix::io::RawFd;
28use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
29use std::sync::{mpsc, Arc, Mutex};
30
/// Write end of the self-pipe, shared with the signal handler.
///
/// Holds `-1` while no pipe is published. [`pipe_handler`] reads it on every
/// signal delivery and writes to the descriptor only when it is non-negative.
static WRITE_FD: AtomicI32 = AtomicI32::new(-1);

/// Singleton flag: `true` while a [`Signals`] instance exists (or is being
/// constructed), so that a second [`Signals::new`] call can be rejected.
static INITIALIZED: AtomicBool = AtomicBool::new(false);
37
38const HANDLED_SIGNALS: &[libc::c_int] = &[
39    libc::SIGHUP,
40    libc::SIGINT,
41    libc::SIGILL,
42    libc::SIGABRT,
43    libc::SIGFPE,
44    libc::SIGPIPE,
45    libc::SIGALRM,
46    libc::SIGTERM,
47];
48
49/// OS-level signal handler. Must only call async-signal-safe functions.
50/// Writes the signal number as a single byte into the self-pipe.
51extern "C" fn pipe_handler(sig: libc::c_int) {
52    let fd = WRITE_FD.load(Ordering::Relaxed);
53    if fd >= 0 {
54        // Signal numbers fit in a u8 (POSIX signals are 1–31).
55        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
56        let byte = sig as u8;
57        // SAFETY: `fd` is a valid open file descriptor (the write end of the
58        // self-pipe, published by Signals::new before any signal handler is
59        // registered). `byte` is a pointer to a live stack variable of the
60        // correct size. `libc::write` is async-signal-safe per POSIX.2017 §2.4.3.
61        unsafe {
62            libc::write(fd, std::ptr::addr_of!(byte).cast::<libc::c_void>(), 1);
63        }
64    }
65}
66
67/// Installs `pipe_handler` for one signal number via `sigaction(2)`.
68///
69/// # Safety
70///
71/// `signum` must be a valid, catchable POSIX signal number. `pipe_handler` must
72/// remain a valid function pointer for the lifetime of the process (it is a
73/// `static extern "C" fn`, so this is always true). `pipe_handler` only calls
74/// `write(2)`, which is async-signal-safe per POSIX.2017 §2.4.3.
75unsafe fn install_handler(signum: libc::c_int) {
76    let mut sa: libc::sigaction = std::mem::zeroed();
77    sa.sa_sigaction = pipe_handler as *const () as libc::sighandler_t;
78    sa.sa_flags = libc::SA_RESTART;
79    libc::sigaction(signum, &sa, std::ptr::null_mut());
80}
81
/// One of the UNIX signals that a [`Signals`] channel can deliver.
///
/// Only the eight signals listed here are forwarded. Other signals —
/// for example `SIGKILL`, `SIGQUIT` and `SIGSEGV` — have no variant and are
/// never reported through this crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Signal {
    /// `SIGHUP`: the terminal hung up or the controlling process died.
    Hup,
    /// `SIGINT`: interactive interrupt, typically Ctrl-C.
    Int,
    /// `SIGILL`: illegal CPU instruction.
    Ill,
    /// `SIGABRT`: process abort.
    Abrt,
    /// `SIGFPE`: floating-point exception.
    Fpe,
    /// `SIGPIPE`: write to a broken pipe.
    Pipe,
    /// `SIGALRM`: an alarm-clock timer expired.
    Alrm,
    /// `SIGTERM`: polite request to terminate.
    Term,
}
106
107impl Signal {
108    fn from_raw(n: u8) -> Option<Self> {
109        match libc::c_int::from(n) {
110            libc::SIGHUP => Some(Signal::Hup),
111            libc::SIGINT => Some(Signal::Int),
112            libc::SIGILL => Some(Signal::Ill),
113            libc::SIGABRT => Some(Signal::Abrt),
114            libc::SIGFPE => Some(Signal::Fpe),
115            libc::SIGPIPE => Some(Signal::Pipe),
116            libc::SIGALRM => Some(Signal::Alrm),
117            libc::SIGTERM => Some(Signal::Term),
118            _ => None,
119        }
120    }
121}
122
123impl std::fmt::Display for Signal {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        let name = match self {
126            Signal::Hup => "SIGHUP",
127            Signal::Int => "SIGINT",
128            Signal::Ill => "SIGILL",
129            Signal::Abrt => "SIGABRT",
130            Signal::Fpe => "SIGFPE",
131            Signal::Pipe => "SIGPIPE",
132            Signal::Alrm => "SIGALRM",
133            Signal::Term => "SIGTERM",
134        };
135        f.write_str(name)
136    }
137}
138
/// An error produced by signal channel operations.
///
/// `SignalError` implements `Clone`, `PartialEq` and `Eq` by hand because
/// [`io::Error`][std::io::Error] provides none of them. Two
/// [`OsError`][SignalError::OsError] values compare equal when their
/// [`io::ErrorKind`][std::io::ErrorKind]s match. Cloning an `OsError` keeps
/// the raw OS error code when the wrapped error has one (so the OS message
/// survives the clone) and otherwise keeps only the error kind.
#[derive(Debug)]
pub enum SignalError {
    /// The channel is disconnected (the paired [`Signals`] handle was dropped).
    Disconnected,
    /// [`Signals::new`] was called while another `Signals` instance is active.
    AlreadyInitialized,
    /// An OS-level operation failed during signal channel setup.
    OsError(std::io::Error),
}

impl Clone for SignalError {
    fn clone(&self) -> Self {
        match self {
            Self::Disconnected => Self::Disconnected,
            Self::AlreadyInitialized => Self::AlreadyInitialized,
            Self::OsError(e) => Self::OsError(match e.raw_os_error() {
                // Rebuilding from the errno value preserves both the kind and
                // the OS-provided message.
                Some(code) => std::io::Error::from_raw_os_error(code),
                // No errno is attached: the kind is all that can be carried
                // over, any custom payload is lost.
                None => std::io::Error::from(e.kind()),
            }),
        }
    }
}

impl PartialEq for SignalError {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::Disconnected, Self::Disconnected)
            | (Self::AlreadyInitialized, Self::AlreadyInitialized) => true,
            // `io::Error` is not comparable; its kind is the stable part.
            (Self::OsError(a), Self::OsError(b)) => a.kind() == b.kind(),
            _ => false,
        }
    }
}

impl Eq for SignalError {}

impl std::fmt::Display for SignalError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SignalError::Disconnected => f.write_str("signal channel disconnected"),
            SignalError::AlreadyInitialized => {
                f.write_str("a Signals instance is already active")
            }
            SignalError::OsError(e) => write!(f, "OS error during signal channel setup: {e}"),
        }
    }
}

impl std::error::Error for SignalError {
    /// Exposes the wrapped [`io::Error`][std::io::Error] for `OsError`; the
    /// other variants have no underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            SignalError::OsError(e) => Some(e),
            SignalError::Disconnected | SignalError::AlreadyInitialized => None,
        }
    }
}
199
200type Senders = Arc<Mutex<Vec<mpsc::Sender<Signal>>>>;
201
202#[derive(Debug)]
203struct SignalsInner {
204    write_fd: RawFd,
205    senders: Senders,
206}
207
208impl Drop for SignalsInner {
209    fn drop(&mut self) {
210        // Closing write_fd causes the background thread's read() to return 0
211        // (EOF), signalling it to exit cleanly.
212        // SAFETY: `write_fd` is a valid open file descriptor owned exclusively
213        // by this struct. No other code closes it while SignalsInner is alive.
214        unsafe { libc::close(self.write_fd) };
215        WRITE_FD.store(-1, Ordering::Relaxed);
216        INITIALIZED.store(false, Ordering::Relaxed);
217    }
218}
219
220/// A handle for subscribing to OS signals delivered through a shared channel.
221///
222/// `Signals` is cheaply cloneable (backed by an [`Arc`]). Only one `Signals`
223/// instance may be active at a time per process; calling [`Signals::new`] while
224/// another instance exists returns [`SignalError::AlreadyInitialized`].
225///
226/// OS-level signal handlers are installed automatically by [`Signals::new`],
227/// so signals are delivered from the moment the value is returned. Call
228/// [`subscribe`][Signals::subscribe] to obtain a [`Receiver`].
229///
230/// Dropping the last clone releases all resources including the background
231/// thread.
232///
233/// # Example
234///
235/// ```no_run
236/// use signal_msg::Signals;
237///
238/// let signals = Signals::new().expect("failed to create signal handler");
239/// let receiver = signals.subscribe();
240/// match receiver.listen() {
241///     Ok(sig) => println!("received: {}", sig),
242///     Err(e)  => eprintln!("error: {}", e),
243/// }
244/// ```
245#[derive(Clone, Debug)]
246pub struct Signals(Arc<SignalsInner>);
247
248impl Signals {
249    /// Creates a new signal channel and installs OS-level signal handlers.
250    ///
251    /// Allocates a self-pipe, spawns a background dispatch thread named
252    /// `signal-msg`, and registers handlers for all supported signals. Call
253    /// [`subscribe`][Signals::subscribe] to obtain a [`Receiver`].
254    ///
255    /// # Examples
256    ///
257    /// ```no_run
258    /// let signals = signal_msg::Signals::new().expect("signal setup failed");
259    /// let receiver = signals.subscribe();
260    /// ```
261    ///
262    /// # Errors
263    ///
264    /// Returns [`SignalError::AlreadyInitialized`] if another `Signals` instance
265    /// is already active. Returns [`SignalError::OsError`] if an OS-level
266    /// operation fails (pipe creation, `fcntl`, or thread spawn).
267    pub fn new() -> Result<Self, SignalError> {
268        if INITIALIZED
269            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
270            .is_err()
271        {
272            return Err(SignalError::AlreadyInitialized);
273        }
274        Self::try_init().map_err(|e| {
275            INITIALIZED.store(false, Ordering::Relaxed);
276            e
277        })
278    }
279
280    fn try_init() -> Result<Self, SignalError> {
281        let mut fds = [0i32; 2];
282        // SAFETY: `fds` is a valid mutable pointer to a 2-element `c_int`
283        // array, satisfying the requirements of `pipe(2)`.
284        if unsafe { libc::pipe(fds.as_mut_ptr()) } != 0 {
285            return Err(SignalError::OsError(std::io::Error::last_os_error()));
286        }
287        let read_fd = fds[0];
288        let write_fd = fds[1];
289
290        // SAFETY: `write_fd` and `read_fd` are valid open file descriptors
291        // just created by `pipe(2)` above.
292        unsafe {
293            let fl = libc::fcntl(write_fd, libc::F_GETFL);
294            if fl == -1
295                || libc::fcntl(write_fd, libc::F_SETFL, fl | libc::O_NONBLOCK) == -1
296                || libc::fcntl(write_fd, libc::F_SETFD, libc::FD_CLOEXEC) == -1
297                || libc::fcntl(read_fd, libc::F_SETFD, libc::FD_CLOEXEC) == -1
298            {
299                let e = std::io::Error::last_os_error();
300                libc::close(write_fd);
301                libc::close(read_fd);
302                return Err(SignalError::OsError(e));
303            }
304        }
305
306        // Publish write_fd before the thread starts so pipe_handler can use it.
307        WRITE_FD.store(write_fd, Ordering::Relaxed);
308
309        let senders: Senders = Arc::new(Mutex::new(Vec::new()));
310        let thread_senders = Arc::clone(&senders);
311
312        // Background thread: reads signal bytes from the pipe and fans them out
313        // to all registered senders. Exits when the write end is closed (EOF).
314        std::thread::Builder::new()
315            .name("signal-msg".into())
316            .spawn(move || {
317                let mut buf = [0u8; 64];
318                loop {
319                    // SAFETY: `read_fd` is a valid open file descriptor owned
320                    // exclusively by this thread. `buf` is valid for 64 bytes.
321                    let n = unsafe {
322                        libc::read(read_fd, buf.as_mut_ptr().cast::<libc::c_void>(), 64)
323                    };
324                    if n < 0 {
325                        if std::io::Error::last_os_error().kind()
326                            == std::io::ErrorKind::Interrupted
327                        {
328                            continue; // EINTR — retry the read
329                        }
330                        break; // unrecoverable OS error
331                    }
332                    if n == 0 {
333                        break; // EOF — write end was closed (Signals dropped)
334                    }
335                    #[allow(clippy::cast_sign_loss)]
336                    let received = &buf[..n as usize];
337                    let mut locked = thread_senders.lock().unwrap_or_else(|p| p.into_inner());
338                    for &byte in received {
339                        if let Some(sig) = Signal::from_raw(byte) {
340                            locked.retain(|s| s.send(sig).is_ok());
341                        }
342                    }
343                }
344                // SAFETY: `read_fd` is exclusively owned by this thread and
345                // has not been closed by any other code.
346                unsafe { libc::close(read_fd) };
347            })
348            .map_err(|e| {
349                // Thread spawn failed; clean up fds before propagating the error.
350                // SAFETY: `write_fd` and `read_fd` are valid open file descriptors.
351                unsafe {
352                    libc::close(write_fd);
353                    libc::close(read_fd);
354                }
355                WRITE_FD.store(-1, Ordering::Relaxed);
356                SignalError::OsError(e)
357            })?;
358
359        // Install OS-level handlers for all supported signals. This is done
360        // after the thread is running so no signals can be lost.
361        for &signum in HANDLED_SIGNALS {
362            // SAFETY: `signum` is a valid catchable signal number from
363            // `HANDLED_SIGNALS` and `pipe_handler` is async-signal-safe.
364            unsafe { install_handler(signum) };
365        }
366
367        Ok(Signals(Arc::new(SignalsInner { write_fd, senders })))
368    }
369
370    /// Returns a new [`Receiver`] that will receive all subsequent signals.
371    ///
372    /// Multiple independent receivers can be created from the same `Signals`
373    /// handle; each receives its own copy of every delivered signal.
374    ///
375    /// # Examples
376    ///
377    /// ```no_run
378    /// use signal_msg::Signals;
379    ///
380    /// let signals = Signals::new().expect("signal setup failed");
381    /// let r1 = signals.subscribe();
382    /// let r2 = signals.subscribe();
383    /// // r1 and r2 each receive independent copies of every signal.
384    /// # let _ = (r1, r2);
385    /// ```
386    #[must_use]
387    pub fn subscribe(&self) -> Receiver {
388        let (tx, rx) = mpsc::channel();
389        self.0.senders.lock().unwrap_or_else(|p| p.into_inner()).push(tx);
390        Receiver(rx)
391    }
392}
393
394/// Receives UNIX signals forwarded through a [`Signals`] channel.
395///
396/// Obtained via [`Signals::subscribe`]. Use [`listen`][Receiver::listen] to
397/// block until a signal arrives, or [`try_listen`][Receiver::try_listen] to
398/// poll without blocking.
399#[derive(Debug)]
400pub struct Receiver(mpsc::Receiver<Signal>);
401
402impl Receiver {
403    /// Blocks until a signal is received and returns it.
404    ///
405    /// # Examples
406    ///
407    /// ```no_run
408    /// use signal_msg::Signals;
409    ///
410    /// let signals = Signals::new().expect("signal setup failed");
411    /// let receiver = signals.subscribe();
412    /// match receiver.listen() {
413    ///     Ok(sig) => println!("received: {}", sig),
414    ///     Err(e)  => eprintln!("channel closed: {}", e),
415    /// }
416    /// ```
417    ///
418    /// # Errors
419    ///
420    /// Returns [`SignalError::Disconnected`] if the backing [`Signals`] handle
421    /// has been dropped and no further signals can arrive.
422    pub fn listen(&self) -> Result<Signal, SignalError> {
423        self.0.recv().map_err(|_| SignalError::Disconnected)
424    }
425
426    /// Returns the next signal if one is immediately available, or `None` if
427    /// the channel is currently empty.
428    ///
429    /// Unlike [`listen`][Receiver::listen], this method never blocks.
430    ///
431    /// # Examples
432    ///
433    /// ```no_run
434    /// use signal_msg::Signals;
435    ///
436    /// let signals = Signals::new().expect("signal setup failed");
437    /// let receiver = signals.subscribe();
438    /// match receiver.try_listen() {
439    ///     Ok(Some(sig)) => println!("got signal: {}", sig),
440    ///     Ok(None)      => println!("no signal pending"),
441    ///     Err(e)        => eprintln!("channel closed: {}", e),
442    /// }
443    /// ```
444    ///
445    /// # Errors
446    ///
447    /// Returns [`SignalError::Disconnected`] if the backing [`Signals`] handle
448    /// has been dropped and no further signals can arrive.
449    pub fn try_listen(&self) -> Result<Option<Signal>, SignalError> {
450        match self.0.try_recv() {
451            Ok(sig) => Ok(Some(sig)),
452            Err(mpsc::TryRecvError::Empty) => Ok(None),
453            Err(mpsc::TryRecvError::Disconnected) => Err(SignalError::Disconnected),
454        }
455    }
456}