Skip to main content

signal_msg/
lib.rs

1#![cfg(unix)]
2//! Handle UNIX process signals with a shared channel.
3//!
4//! This crate provides a correct, ergonomic approach to UNIX signal handling.
5//! The self-pipe trick ensures that the actual signal handler only calls
6//! `write(2)` — the only async-signal-safe operation needed. A background
7//! thread named `signal-msg` performs all non-trivial work. Multiple
8//! independent subscribers are supported via [`Signals::subscribe`].
9//!
10//! Signal handlers are installed automatically when [`Signals::new`] returns,
11//! so it is impossible to forget to activate them.
12//!
13//! # Example
14//!
15//! ```no_run
16//! use signal_msg::Signals;
17//!
18//! let signals = Signals::new().expect("failed to create signal handler");
19//! for sig in signals.subscribe() {
20//!     println!("received: {}", sig);
21//!     if sig.is_terminating() { break; }
22//! }
23//! ```
24
25use std::os::unix::io::RawFd;
26use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
27use std::sync::{mpsc, Arc, Mutex};
28
29/// Global write end of the self-pipe. Set by [`Signals::new`], cleared on drop.
30/// Written to only by [`pipe_handler`], which is async-signal-safe.
31static WRITE_FD: AtomicI32 = AtomicI32::new(-1);
32
33/// Guards against creating multiple [`Signals`] instances simultaneously.
34static INITIALIZED: AtomicBool = AtomicBool::new(false);
35
36const HANDLED_SIGNALS: &[libc::c_int] = &[
37    libc::SIGHUP,
38    libc::SIGINT,
39    libc::SIGILL,
40    libc::SIGABRT,
41    libc::SIGFPE,
42    libc::SIGPIPE,
43    libc::SIGALRM,
44    libc::SIGTERM,
45    libc::SIGUSR1,
46    libc::SIGUSR2,
47    libc::SIGWINCH,
48    libc::SIGCONT,
49    libc::SIGURG,
50];
51
52/// OS-level signal handler. Must only call async-signal-safe functions.
53/// Writes the signal number as a single byte into the self-pipe.
54extern "C" fn pipe_handler(sig: libc::c_int) {
55    let fd = WRITE_FD.load(Ordering::Relaxed);
56    if fd >= 0 {
57        // Signal numbers fit in a u8 (POSIX signals are 1–31).
58        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
59        let byte = sig as u8;
60        // SAFETY: `fd` is a valid open file descriptor (the write end of the
61        // self-pipe, published by Signals::new before any signal handler is
62        // registered). `byte` is a pointer to a live stack variable of the
63        // correct size. `libc::write` is async-signal-safe per POSIX.2017 §2.4.3.
64        unsafe {
65            libc::write(fd, std::ptr::addr_of!(byte).cast::<libc::c_void>(), 1);
66        }
67    }
68}
69
70/// Installs `pipe_handler` for one signal number via `sigaction(2)`.
71///
72/// # Safety
73///
74/// `signum` must be a valid, catchable POSIX signal number. `pipe_handler` must
75/// remain a valid function pointer for the lifetime of the process (it is a
76/// `static extern "C" fn`, so this is always true). `pipe_handler` only calls
77/// `write(2)`, which is async-signal-safe per POSIX.2017 §2.4.3.
78unsafe fn install_handler(signum: libc::c_int) {
79    let mut sa: libc::sigaction = std::mem::zeroed();
80    sa.sa_sigaction = pipe_handler as *const () as libc::sighandler_t;
81    sa.sa_flags = libc::SA_RESTART;
82    libc::sigaction(signum, &sa, std::ptr::null_mut());
83}
84
85/// A UNIX signal that can be received through a [`Signals`] channel.
86///
87/// Signals that cannot be caught (`SIGKILL`), that generate core dumps by
88/// default (`SIGQUIT`), or that indicate unrecoverable program faults
89/// (`SIGSEGV`) are intentionally excluded.
90#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
91pub enum Signal {
92    /// `SIGHUP` — terminal hang-up or controlling process died.
93    Hup,
94    /// `SIGINT` — interactive interrupt (typically Ctrl-C).
95    Int,
96    /// `SIGILL` — illegal CPU instruction.
97    Ill,
98    /// `SIGABRT` — process abort.
99    Abrt,
100    /// `SIGFPE` — floating-point exception.
101    Fpe,
102    /// `SIGPIPE` — write to a broken pipe.
103    Pipe,
104    /// `SIGALRM` — alarm-clock timer expired.
105    Alrm,
106    /// `SIGTERM` — polite termination request.
107    Term,
108    /// `SIGUSR1` — user-defined signal 1.
109    Usr1,
110    /// `SIGUSR2` — user-defined signal 2.
111    Usr2,
112    /// `SIGWINCH` — terminal window-size change.
113    Winch,
114    /// `SIGCONT` — continue a stopped process.
115    Cont,
116    /// `SIGURG` — urgent data available on a socket.
117    Urg,
118}
119
120impl Signal {
121    /// Returns `true` if this signal's conventional action is to terminate
122    /// the process.
123    ///
124    /// The terminating signals are: [`Int`][Signal::Int], [`Term`][Signal::Term],
125    /// [`Ill`][Signal::Ill], [`Abrt`][Signal::Abrt], and [`Fpe`][Signal::Fpe].
126    ///
127    /// All other signals — [`Hup`][Signal::Hup], [`Pipe`][Signal::Pipe],
128    /// [`Alrm`][Signal::Alrm], [`Usr1`][Signal::Usr1], [`Usr2`][Signal::Usr2],
129    /// [`Winch`][Signal::Winch], [`Cont`][Signal::Cont], [`Urg`][Signal::Urg]
130    /// — are non-terminating and may be handled without exiting.
131    ///
132    /// # Examples
133    ///
134    /// ```
135    /// use signal_msg::Signal;
136    ///
137    /// assert!(Signal::Int.is_terminating());
138    /// assert!(Signal::Term.is_terminating());
139    /// assert!(!Signal::Usr1.is_terminating());
140    /// assert!(!Signal::Winch.is_terminating());
141    /// ```
142    pub fn is_terminating(&self) -> bool {
143        matches!(
144            self,
145            Signal::Int | Signal::Term | Signal::Ill | Signal::Abrt | Signal::Fpe
146        )
147    }
148
149    fn from_raw(n: u8) -> Option<Self> {
150        match libc::c_int::from(n) {
151            libc::SIGHUP => Some(Signal::Hup),
152            libc::SIGINT => Some(Signal::Int),
153            libc::SIGILL => Some(Signal::Ill),
154            libc::SIGABRT => Some(Signal::Abrt),
155            libc::SIGFPE => Some(Signal::Fpe),
156            libc::SIGPIPE => Some(Signal::Pipe),
157            libc::SIGALRM => Some(Signal::Alrm),
158            libc::SIGTERM => Some(Signal::Term),
159            libc::SIGUSR1 => Some(Signal::Usr1),
160            libc::SIGUSR2 => Some(Signal::Usr2),
161            libc::SIGWINCH => Some(Signal::Winch),
162            libc::SIGCONT => Some(Signal::Cont),
163            libc::SIGURG => Some(Signal::Urg),
164            _ => None,
165        }
166    }
167}
168
169impl std::fmt::Display for Signal {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        let name = match self {
172            Signal::Hup => "SIGHUP",
173            Signal::Int => "SIGINT",
174            Signal::Ill => "SIGILL",
175            Signal::Abrt => "SIGABRT",
176            Signal::Fpe => "SIGFPE",
177            Signal::Pipe => "SIGPIPE",
178            Signal::Alrm => "SIGALRM",
179            Signal::Term => "SIGTERM",
180            Signal::Usr1 => "SIGUSR1",
181            Signal::Usr2 => "SIGUSR2",
182            Signal::Winch => "SIGWINCH",
183            Signal::Cont => "SIGCONT",
184            Signal::Urg => "SIGURG",
185        };
186        f.write_str(name)
187    }
188}
189
190/// An error produced by signal channel operations.
191///
192/// `SignalError` implements `Clone` and `PartialEq`; for the
193/// [`OsError`][SignalError::OsError] variant only the
194/// [`io::ErrorKind`][std::io::ErrorKind] is compared and cloned, since
195/// [`io::Error`][std::io::Error] itself is neither `Clone` nor `PartialEq`.
196#[derive(Debug)]
197pub enum SignalError {
198    /// The channel is disconnected (the paired [`Signals`] handle was dropped).
199    Disconnected,
200    /// [`Signals::new`] was called while another `Signals` instance is active.
201    AlreadyInitialized,
202    /// An OS-level operation failed during signal channel setup.
203    OsError(std::io::Error),
204}
205
206impl Clone for SignalError {
207    fn clone(&self) -> Self {
208        match self {
209            Self::Disconnected => Self::Disconnected,
210            Self::AlreadyInitialized => Self::AlreadyInitialized,
211            // Preserve the error kind; the OS message is lost on clone.
212            Self::OsError(e) => Self::OsError(std::io::Error::from(e.kind())),
213        }
214    }
215}
216
217impl PartialEq for SignalError {
218    fn eq(&self, other: &Self) -> bool {
219        match (self, other) {
220            (Self::Disconnected, Self::Disconnected) => true,
221            (Self::AlreadyInitialized, Self::AlreadyInitialized) => true,
222            (Self::OsError(a), Self::OsError(b)) => a.kind() == b.kind(),
223            _ => false,
224        }
225    }
226}
227
228impl Eq for SignalError {}
229
230impl std::fmt::Display for SignalError {
231    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232        match self {
233            SignalError::Disconnected => f.write_str("signal channel disconnected"),
234            SignalError::AlreadyInitialized => f.write_str("a Signals instance is already active"),
235            SignalError::OsError(e) => write!(f, "OS error during signal channel setup: {e}"),
236        }
237    }
238}
239
240impl std::error::Error for SignalError {
241    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
242        match self {
243            SignalError::OsError(e) => Some(e),
244            _ => None,
245        }
246    }
247}
248
249type Senders = Arc<Mutex<Vec<mpsc::Sender<Signal>>>>;
250
251#[derive(Debug)]
252struct SignalsInner {
253    write_fd: RawFd,
254    senders: Senders,
255}
256
257impl Drop for SignalsInner {
258    fn drop(&mut self) {
259        // Closing write_fd causes the background thread's read() to return 0
260        // (EOF), signalling it to exit cleanly.
261        // SAFETY: `write_fd` is a valid open file descriptor owned exclusively
262        // by this struct. No other code closes it while SignalsInner is alive.
263        unsafe { libc::close(self.write_fd) };
264        WRITE_FD.store(-1, Ordering::Relaxed);
265        INITIALIZED.store(false, Ordering::Relaxed);
266    }
267}
268
269/// A handle for subscribing to OS signals delivered through a shared channel.
270///
271/// `Signals` is cheaply cloneable (backed by an [`Arc`]). Only one `Signals`
272/// instance may be active at a time per process; calling [`Signals::new`] while
273/// another instance exists returns [`SignalError::AlreadyInitialized`].
274///
275/// OS-level signal handlers are installed automatically by [`Signals::new`],
276/// so signals are delivered from the moment the value is returned. Call
277/// [`subscribe`][Signals::subscribe] to obtain a [`Receiver`].
278///
279/// Dropping the last clone releases all resources including the background
280/// thread.
281///
282/// # Example
283///
284/// ```no_run
285/// use signal_msg::Signals;
286///
287/// let signals = Signals::new().expect("failed to create signal handler");
288/// for sig in signals.subscribe() {
289///     println!("received: {}", sig);
290///     if sig.is_terminating() { break; }
291/// }
292/// ```
293#[derive(Clone, Debug)]
294pub struct Signals(Arc<SignalsInner>);
295
296impl Signals {
297    /// Creates a new signal channel and installs OS-level signal handlers.
298    ///
299    /// Allocates a self-pipe, spawns a background dispatch thread named
300    /// `signal-msg`, and registers handlers for all supported signals. Call
301    /// [`subscribe`][Signals::subscribe] to obtain a [`Receiver`].
302    ///
303    /// # Examples
304    ///
305    /// ```no_run
306    /// let signals = signal_msg::Signals::new().expect("signal setup failed");
307    /// let receiver = signals.subscribe();
308    /// ```
309    ///
310    /// # Errors
311    ///
312    /// Returns [`SignalError::AlreadyInitialized`] if another `Signals` instance
313    /// is already active. Returns [`SignalError::OsError`] if an OS-level
314    /// operation fails (pipe creation, `fcntl`, or thread spawn).
315    pub fn new() -> Result<Self, SignalError> {
316        if INITIALIZED
317            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
318            .is_err()
319        {
320            return Err(SignalError::AlreadyInitialized);
321        }
322        Self::try_init().map_err(|e| {
323            INITIALIZED.store(false, Ordering::Relaxed);
324            e
325        })
326    }
327
328    fn try_init() -> Result<Self, SignalError> {
329        let mut fds = [0i32; 2];
330        // SAFETY: `fds` is a valid mutable pointer to a 2-element `c_int`
331        // array, satisfying the requirements of `pipe(2)`.
332        if unsafe { libc::pipe(fds.as_mut_ptr()) } != 0 {
333            return Err(SignalError::OsError(std::io::Error::last_os_error()));
334        }
335        let read_fd = fds[0];
336        let write_fd = fds[1];
337
338        // SAFETY: `write_fd` and `read_fd` are valid open file descriptors
339        // just created by `pipe(2)` above.
340        unsafe {
341            let fl = libc::fcntl(write_fd, libc::F_GETFL);
342            if fl == -1
343                || libc::fcntl(write_fd, libc::F_SETFL, fl | libc::O_NONBLOCK) == -1
344                || libc::fcntl(write_fd, libc::F_SETFD, libc::FD_CLOEXEC) == -1
345                || libc::fcntl(read_fd, libc::F_SETFD, libc::FD_CLOEXEC) == -1
346            {
347                let e = std::io::Error::last_os_error();
348                libc::close(write_fd);
349                libc::close(read_fd);
350                return Err(SignalError::OsError(e));
351            }
352        }
353
354        // Publish write_fd before the thread starts so pipe_handler can use it.
355        WRITE_FD.store(write_fd, Ordering::Relaxed);
356
357        let senders: Senders = Arc::new(Mutex::new(Vec::new()));
358        let thread_senders = Arc::clone(&senders);
359
360        // Background thread: reads signal bytes from the pipe and fans them out
361        // to all registered senders. Exits when the write end is closed (EOF).
362        std::thread::Builder::new()
363            .name("signal-msg".into())
364            .spawn(move || {
365                let mut buf = [0u8; 64];
366                loop {
367                    // SAFETY: `read_fd` is a valid open file descriptor owned
368                    // exclusively by this thread. `buf` is valid for 64 bytes.
369                    let n =
370                        unsafe { libc::read(read_fd, buf.as_mut_ptr().cast::<libc::c_void>(), 64) };
371                    if n < 0 {
372                        if std::io::Error::last_os_error().kind() == std::io::ErrorKind::Interrupted
373                        {
374                            continue; // EINTR — retry the read
375                        }
376                        break; // unrecoverable OS error
377                    }
378                    if n == 0 {
379                        break; // EOF — write end was closed (Signals dropped)
380                    }
381                    #[allow(clippy::cast_sign_loss)]
382                    let received = &buf[..n as usize];
383                    let mut locked = thread_senders.lock().unwrap_or_else(|p| p.into_inner());
384                    for &byte in received {
385                        if let Some(sig) = Signal::from_raw(byte) {
386                            locked.retain(|s| s.send(sig).is_ok());
387                        }
388                    }
389                }
390                // SAFETY: `read_fd` is exclusively owned by this thread and
391                // has not been closed by any other code.
392                unsafe { libc::close(read_fd) };
393            })
394            .map_err(|e| {
395                // Thread spawn failed; clean up fds before propagating the error.
396                // SAFETY: `write_fd` and `read_fd` are valid open file descriptors.
397                unsafe {
398                    libc::close(write_fd);
399                    libc::close(read_fd);
400                }
401                WRITE_FD.store(-1, Ordering::Relaxed);
402                SignalError::OsError(e)
403            })?;
404
405        // Install OS-level handlers for all supported signals. This is done
406        // after the thread is running so no signals can be lost.
407        for &signum in HANDLED_SIGNALS {
408            // SAFETY: `signum` is a valid catchable signal number from
409            // `HANDLED_SIGNALS` and `pipe_handler` is async-signal-safe.
410            unsafe { install_handler(signum) };
411        }
412
413        Ok(Signals(Arc::new(SignalsInner { write_fd, senders })))
414    }
415
416    /// Returns a new [`Receiver`] that will receive all subsequent signals.
417    ///
418    /// Multiple independent receivers can be created from the same `Signals`
419    /// handle; each receives its own copy of every delivered signal.
420    ///
421    /// # Examples
422    ///
423    /// ```no_run
424    /// use signal_msg::Signals;
425    ///
426    /// let signals = Signals::new().expect("signal setup failed");
427    /// let r1 = signals.subscribe();
428    /// let r2 = signals.subscribe();
429    /// // r1 and r2 each receive independent copies of every signal.
430    /// # let _ = (r1, r2);
431    /// ```
432    #[must_use]
433    pub fn subscribe(&self) -> Receiver {
434        let (tx, rx) = mpsc::channel();
435        self.0
436            .senders
437            .lock()
438            .unwrap_or_else(|p| p.into_inner())
439            .push(tx);
440        Receiver(rx)
441    }
442}
443
444/// Receives UNIX signals forwarded through a [`Signals`] channel.
445///
446/// Obtained via [`Signals::subscribe`]. Use [`listen`][Receiver::listen] to
447/// block until a signal arrives, or [`try_listen`][Receiver::try_listen] to
448/// poll without blocking. `Receiver` also implements [`Iterator`], yielding
449/// each signal in turn until the backing [`Signals`] handle is dropped.
450#[derive(Debug)]
451pub struct Receiver(mpsc::Receiver<Signal>);
452
453impl Receiver {
454    /// Blocks until a signal is received and returns it.
455    ///
456    /// # Examples
457    ///
458    /// ```no_run
459    /// use signal_msg::Signals;
460    ///
461    /// let signals = Signals::new().expect("signal setup failed");
462    /// let receiver = signals.subscribe();
463    /// match receiver.listen() {
464    ///     Ok(sig) => println!("received: {}", sig),
465    ///     Err(e)  => eprintln!("channel closed: {}", e),
466    /// }
467    /// ```
468    ///
469    /// # Errors
470    ///
471    /// Returns [`SignalError::Disconnected`] if the backing [`Signals`] handle
472    /// has been dropped and no further signals can arrive.
473    pub fn listen(&self) -> Result<Signal, SignalError> {
474        self.0.recv().map_err(|_| SignalError::Disconnected)
475    }
476
477    /// Returns the next signal if one is immediately available, or `None` if
478    /// the channel is currently empty.
479    ///
480    /// Unlike [`listen`][Receiver::listen], this method never blocks.
481    ///
482    /// # Examples
483    ///
484    /// ```no_run
485    /// use signal_msg::Signals;
486    ///
487    /// let signals = Signals::new().expect("signal setup failed");
488    /// let receiver = signals.subscribe();
489    /// match receiver.try_listen() {
490    ///     Ok(Some(sig)) => println!("got signal: {}", sig),
491    ///     Ok(None)      => println!("no signal pending"),
492    ///     Err(e)        => eprintln!("channel closed: {}", e),
493    /// }
494    /// ```
495    ///
496    /// # Errors
497    ///
498    /// Returns [`SignalError::Disconnected`] if the backing [`Signals`] handle
499    /// has been dropped and no further signals can arrive.
500    pub fn try_listen(&self) -> Result<Option<Signal>, SignalError> {
501        match self.0.try_recv() {
502            Ok(sig) => Ok(Some(sig)),
503            Err(mpsc::TryRecvError::Empty) => Ok(None),
504            Err(mpsc::TryRecvError::Disconnected) => Err(SignalError::Disconnected),
505        }
506    }
507}
508
509/// Yields each arriving [`Signal`] in turn, blocking between deliveries.
510///
511/// The iterator ends naturally when the backing [`Signals`] handle is dropped.
512/// This enables idiomatic `for` loops and the full iterator combinator toolkit.
513///
514/// # Examples
515///
516/// ```no_run
517/// use signal_msg::Signals;
518///
519/// let signals = Signals::new().expect("signal setup failed");
520/// for sig in signals.subscribe() {
521///     println!("received: {}", sig);
522///     if sig.is_terminating() { break; }
523/// }
524/// ```
525impl Iterator for Receiver {
526    type Item = Signal;
527
528    fn next(&mut self) -> Option<Self::Item> {
529        self.0.recv().ok()
530    }
531}
532
533#[cfg(test)]
534mod tests {
535    use super::*;
536
537    // --- Signal::is_terminating ---
538
539    #[test]
540    fn test_is_terminating_exit_signals() {
541        assert!(Signal::Int.is_terminating());
542        assert!(Signal::Term.is_terminating());
543        assert!(Signal::Ill.is_terminating());
544        assert!(Signal::Abrt.is_terminating());
545        assert!(Signal::Fpe.is_terminating());
546    }
547
548    #[test]
549    fn test_is_terminating_non_exit_signals() {
550        assert!(!Signal::Hup.is_terminating());
551        assert!(!Signal::Pipe.is_terminating());
552        assert!(!Signal::Alrm.is_terminating());
553        assert!(!Signal::Usr1.is_terminating());
554        assert!(!Signal::Usr2.is_terminating());
555        assert!(!Signal::Winch.is_terminating());
556        assert!(!Signal::Cont.is_terminating());
557        assert!(!Signal::Urg.is_terminating());
558    }
559
560    // --- Signal::Display ---
561
562    #[test]
563    fn test_display_new_signals() {
564        assert_eq!(Signal::Usr1.to_string(), "SIGUSR1");
565        assert_eq!(Signal::Usr2.to_string(), "SIGUSR2");
566        assert_eq!(Signal::Winch.to_string(), "SIGWINCH");
567        assert_eq!(Signal::Cont.to_string(), "SIGCONT");
568        assert_eq!(Signal::Urg.to_string(), "SIGURG");
569    }
570
571    #[test]
572    fn test_display_existing_signals() {
573        assert_eq!(Signal::Hup.to_string(), "SIGHUP");
574        assert_eq!(Signal::Int.to_string(), "SIGINT");
575        assert_eq!(Signal::Ill.to_string(), "SIGILL");
576        assert_eq!(Signal::Abrt.to_string(), "SIGABRT");
577        assert_eq!(Signal::Fpe.to_string(), "SIGFPE");
578        assert_eq!(Signal::Pipe.to_string(), "SIGPIPE");
579        assert_eq!(Signal::Alrm.to_string(), "SIGALRM");
580        assert_eq!(Signal::Term.to_string(), "SIGTERM");
581    }
582
583    // --- Signal::from_raw ---
584
585    #[test]
586    fn test_from_raw_new_signals() {
587        #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
588        {
589            assert_eq!(Signal::from_raw(libc::SIGUSR1 as u8), Some(Signal::Usr1));
590            assert_eq!(Signal::from_raw(libc::SIGUSR2 as u8), Some(Signal::Usr2));
591            assert_eq!(Signal::from_raw(libc::SIGWINCH as u8), Some(Signal::Winch));
592            assert_eq!(Signal::from_raw(libc::SIGCONT as u8), Some(Signal::Cont));
593            assert_eq!(Signal::from_raw(libc::SIGURG as u8), Some(Signal::Urg));
594        }
595    }
596
597    #[test]
598    fn test_from_raw_unknown_returns_none() {
599        assert_eq!(Signal::from_raw(0), None);
600        assert_eq!(Signal::from_raw(255), None);
601    }
602
603    // --- Iterator for Receiver ---
604
605    #[test]
606    fn test_receiver_iterator_yields_signals() {
607        let (tx, rx) = mpsc::channel();
608        let receiver = Receiver(rx);
609
610        tx.send(Signal::Usr1).unwrap();
611        tx.send(Signal::Winch).unwrap();
612        tx.send(Signal::Int).unwrap();
613        drop(tx);
614
615        let collected: Vec<Signal> = receiver.collect();
616        assert_eq!(collected, vec![Signal::Usr1, Signal::Winch, Signal::Int]);
617    }
618
619    #[test]
620    fn test_receiver_iterator_ends_on_disconnect() {
621        let (tx, rx) = mpsc::channel();
622        let mut receiver = Receiver(rx);
623
624        tx.send(Signal::Cont).unwrap();
625        drop(tx);
626
627        assert_eq!(receiver.next(), Some(Signal::Cont));
628        assert_eq!(receiver.next(), None);
629    }
630
631    #[test]
632    fn test_receiver_iterator_with_take_while() {
633        let (tx, rx) = mpsc::channel();
634        let receiver = Receiver(rx);
635
636        tx.send(Signal::Usr1).unwrap();
637        tx.send(Signal::Winch).unwrap();
638        tx.send(Signal::Int).unwrap(); // terminating — take_while stops before this
639        tx.send(Signal::Usr2).unwrap();
640        drop(tx);
641
642        let non_term: Vec<Signal> = receiver.take_while(|s| !s.is_terminating()).collect();
643        assert_eq!(non_term, vec![Signal::Usr1, Signal::Winch]);
644    }
645}