signal_msg/lib.rs
1#![cfg(unix)]
2//! Handle UNIX process signals with a shared channel.
3//!
4//! This crate provides a correct, ergonomic approach to UNIX signal handling.
5//! The self-pipe trick ensures that the actual signal handler only calls
6//! `write(2)` — the only async-signal-safe operation needed. A background
7//! thread named `signal-msg` performs all non-trivial work. Multiple
8//! independent subscribers are supported via [`Signals::subscribe`].
9//!
10//! Signal handlers are installed automatically when [`Signals::new`] returns,
11//! so it is impossible to forget to activate them.
12//!
13//! # Example
14//!
15//! ```no_run
16//! use signal_msg::Signals;
17//!
18//! let signals = Signals::new().expect("failed to create signal handler");
19//! for sig in signals.subscribe() {
20//! println!("received: {}", sig);
21//! if sig.is_terminating() { break; }
22//! }
23//! ```
24
25use std::os::unix::io::RawFd;
26use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
27use std::sync::{mpsc, Arc, Mutex};
28
/// Process-wide copy of the self-pipe's write-end file descriptor.
///
/// Holds `-1` while no pipe is published; [`pipe_handler`] does nothing in
/// that state. `Signals::try_init` stores the descriptor here and
/// `SignalsInner::drop` resets it to `-1`. The descriptor itself is written
/// to only by [`pipe_handler`].
static WRITE_FD: AtomicI32 = AtomicI32::new(-1);

/// Guards against creating multiple [`Signals`] instances simultaneously.
///
/// Flipped to `true` by [`Signals::new`] with a compare-and-exchange and
/// reset to `false` when setup fails or the last handle is dropped.
static INITIALIZED: AtomicBool = AtomicBool::new(false);

/// Signal numbers for which `Signals::try_init` installs [`pipe_handler`].
///
/// Each entry has a matching arm in `Signal::from_raw`, so every byte the
/// handler writes for these signals decodes to a [`Signal`] variant.
const HANDLED_SIGNALS: &[libc::c_int] = &[
    libc::SIGHUP,
    libc::SIGINT,
    libc::SIGILL,
    libc::SIGABRT,
    libc::SIGFPE,
    libc::SIGPIPE,
    libc::SIGALRM,
    libc::SIGTERM,
    libc::SIGUSR1,
    libc::SIGUSR2,
    libc::SIGWINCH,
    libc::SIGCONT,
    libc::SIGURG,
];
51
52/// OS-level signal handler. Must only call async-signal-safe functions.
53/// Writes the signal number as a single byte into the self-pipe.
54extern "C" fn pipe_handler(sig: libc::c_int) {
55 let fd = WRITE_FD.load(Ordering::Relaxed);
56 if fd >= 0 {
57 // Signal numbers fit in a u8 (POSIX signals are 1–31).
58 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
59 let byte = sig as u8;
60 // SAFETY: `fd` is a valid open file descriptor (the write end of the
61 // self-pipe, published by Signals::new before any signal handler is
62 // registered). `byte` is a pointer to a live stack variable of the
63 // correct size. `libc::write` is async-signal-safe per POSIX.2017 §2.4.3.
64 unsafe {
65 libc::write(fd, std::ptr::addr_of!(byte).cast::<libc::c_void>(), 1);
66 }
67 }
68}
69
70/// Installs `pipe_handler` for one signal number via `sigaction(2)`.
71///
72/// # Safety
73///
74/// `signum` must be a valid, catchable POSIX signal number. `pipe_handler` must
75/// remain a valid function pointer for the lifetime of the process (it is a
76/// `static extern "C" fn`, so this is always true). `pipe_handler` only calls
77/// `write(2)`, which is async-signal-safe per POSIX.2017 §2.4.3.
78unsafe fn install_handler(signum: libc::c_int) {
79 let mut sa: libc::sigaction = std::mem::zeroed();
80 sa.sa_sigaction = pipe_handler as *const () as libc::sighandler_t;
81 sa.sa_flags = libc::SA_RESTART;
82 libc::sigaction(signum, &sa, std::ptr::null_mut());
83}
84
/// One of the UNIX signals that a [`Signals`] channel can deliver.
///
/// The set is deliberately limited: signals that cannot be caught
/// (`SIGKILL`), that generate core dumps by default (`SIGQUIT`), or that
/// indicate unrecoverable program faults (`SIGSEGV`) have no variant here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Signal {
    /// `SIGHUP` — the terminal hung up or the controlling process died.
    Hup,
    /// `SIGINT` — interactive interrupt, typically Ctrl-C.
    Int,
    /// `SIGILL` — an illegal CPU instruction was executed.
    Ill,
    /// `SIGABRT` — the process was asked to abort.
    Abrt,
    /// `SIGFPE` — floating-point exception.
    Fpe,
    /// `SIGPIPE` — a write was made to a broken pipe.
    Pipe,
    /// `SIGALRM` — an alarm-clock timer expired.
    Alrm,
    /// `SIGTERM` — polite request to terminate.
    Term,
    /// `SIGUSR1` — first user-defined signal.
    Usr1,
    /// `SIGUSR2` — second user-defined signal.
    Usr2,
    /// `SIGWINCH` — the terminal window changed size.
    Winch,
    /// `SIGCONT` — a stopped process was continued.
    Cont,
    /// `SIGURG` — urgent data is available on a socket.
    Urg,
}
119
120impl Signal {
121 /// Returns `true` if this signal's conventional action is to terminate
122 /// the process.
123 ///
124 /// The terminating signals are: [`Int`][Signal::Int], [`Term`][Signal::Term],
125 /// [`Ill`][Signal::Ill], [`Abrt`][Signal::Abrt], and [`Fpe`][Signal::Fpe].
126 ///
127 /// All other signals — [`Hup`][Signal::Hup], [`Pipe`][Signal::Pipe],
128 /// [`Alrm`][Signal::Alrm], [`Usr1`][Signal::Usr1], [`Usr2`][Signal::Usr2],
129 /// [`Winch`][Signal::Winch], [`Cont`][Signal::Cont], [`Urg`][Signal::Urg]
130 /// — are non-terminating and may be handled without exiting.
131 ///
132 /// # Examples
133 ///
134 /// ```
135 /// use signal_msg::Signal;
136 ///
137 /// assert!(Signal::Int.is_terminating());
138 /// assert!(Signal::Term.is_terminating());
139 /// assert!(!Signal::Usr1.is_terminating());
140 /// assert!(!Signal::Winch.is_terminating());
141 /// ```
142 pub fn is_terminating(&self) -> bool {
143 matches!(
144 self,
145 Signal::Int | Signal::Term | Signal::Ill | Signal::Abrt | Signal::Fpe
146 )
147 }
148
149 fn from_raw(n: u8) -> Option<Self> {
150 match libc::c_int::from(n) {
151 libc::SIGHUP => Some(Signal::Hup),
152 libc::SIGINT => Some(Signal::Int),
153 libc::SIGILL => Some(Signal::Ill),
154 libc::SIGABRT => Some(Signal::Abrt),
155 libc::SIGFPE => Some(Signal::Fpe),
156 libc::SIGPIPE => Some(Signal::Pipe),
157 libc::SIGALRM => Some(Signal::Alrm),
158 libc::SIGTERM => Some(Signal::Term),
159 libc::SIGUSR1 => Some(Signal::Usr1),
160 libc::SIGUSR2 => Some(Signal::Usr2),
161 libc::SIGWINCH => Some(Signal::Winch),
162 libc::SIGCONT => Some(Signal::Cont),
163 libc::SIGURG => Some(Signal::Urg),
164 _ => None,
165 }
166 }
167}
168
169impl std::fmt::Display for Signal {
170 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 let name = match self {
172 Signal::Hup => "SIGHUP",
173 Signal::Int => "SIGINT",
174 Signal::Ill => "SIGILL",
175 Signal::Abrt => "SIGABRT",
176 Signal::Fpe => "SIGFPE",
177 Signal::Pipe => "SIGPIPE",
178 Signal::Alrm => "SIGALRM",
179 Signal::Term => "SIGTERM",
180 Signal::Usr1 => "SIGUSR1",
181 Signal::Usr2 => "SIGUSR2",
182 Signal::Winch => "SIGWINCH",
183 Signal::Cont => "SIGCONT",
184 Signal::Urg => "SIGURG",
185 };
186 f.write_str(name)
187 }
188}
189
/// An error produced by signal channel operations.
///
/// `SignalError` implements `Clone`, `PartialEq`, and `Eq` by hand because
/// [`io::Error`][std::io::Error] itself is neither `Clone` nor `PartialEq`:
///
/// * Two [`OsError`][SignalError::OsError] values compare equal when their
///   [`io::ErrorKind`][std::io::ErrorKind]s are equal.
/// * Cloning an [`OsError`][SignalError::OsError] preserves the raw OS error
///   code (and therefore the OS message) when the inner error carries one;
///   otherwise only the kind survives and any custom payload is lost.
#[derive(Debug)]
pub enum SignalError {
    /// The channel is disconnected (the paired [`Signals`] handle was dropped).
    Disconnected,
    /// [`Signals::new`] was called while another `Signals` instance is active.
    AlreadyInitialized,
    /// An OS-level operation failed during signal channel setup.
    OsError(std::io::Error),
}

impl Clone for SignalError {
    fn clone(&self) -> Self {
        match self {
            Self::Disconnected => Self::Disconnected,
            Self::AlreadyInitialized => Self::AlreadyInitialized,
            Self::OsError(e) => Self::OsError(match e.raw_os_error() {
                // An errno-backed error can be rebuilt exactly, keeping the
                // OS message and the same kind.
                Some(code) => std::io::Error::from_raw_os_error(code),
                // Custom or simple errors: only the kind can be carried over.
                None => std::io::Error::from(e.kind()),
            }),
        }
    }
}

impl PartialEq for SignalError {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::Disconnected, Self::Disconnected) => true,
            (Self::AlreadyInitialized, Self::AlreadyInitialized) => true,
            // `io::Error` has no `PartialEq`; its kind is the comparable part.
            (Self::OsError(a), Self::OsError(b)) => a.kind() == b.kind(),
            _ => false,
        }
    }
}

impl Eq for SignalError {}

impl std::fmt::Display for SignalError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SignalError::Disconnected => f.write_str("signal channel disconnected"),
            SignalError::AlreadyInitialized => f.write_str("a Signals instance is already active"),
            SignalError::OsError(e) => write!(f, "OS error during signal channel setup: {e}"),
        }
    }
}

impl std::error::Error for SignalError {
    /// Exposes the wrapped [`io::Error`][std::io::Error] for the
    /// [`OsError`][SignalError::OsError] variant; other variants have no source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            SignalError::OsError(e) => Some(e),
            _ => None,
        }
    }
}
248
249type Senders = Arc<Mutex<Vec<mpsc::Sender<Signal>>>>;
250
251#[derive(Debug)]
252struct SignalsInner {
253 write_fd: RawFd,
254 senders: Senders,
255}
256
257impl Drop for SignalsInner {
258 fn drop(&mut self) {
259 // Closing write_fd causes the background thread's read() to return 0
260 // (EOF), signalling it to exit cleanly.
261 // SAFETY: `write_fd` is a valid open file descriptor owned exclusively
262 // by this struct. No other code closes it while SignalsInner is alive.
263 unsafe { libc::close(self.write_fd) };
264 WRITE_FD.store(-1, Ordering::Relaxed);
265 INITIALIZED.store(false, Ordering::Relaxed);
266 }
267}
268
/// A handle for subscribing to OS signals delivered through a shared channel.
///
/// `Signals` is cheaply cloneable (backed by an [`Arc`]). Only one `Signals`
/// instance may be active at a time per process; calling [`Signals::new`] while
/// another instance exists returns [`SignalError::AlreadyInitialized`].
///
/// OS-level signal handlers are installed automatically by [`Signals::new`],
/// so signals are delivered from the moment the value is returned. Call
/// [`subscribe`][Signals::subscribe] to obtain a [`Receiver`].
///
/// Dropping the last clone closes the self-pipe's write end, which lets the
/// background thread exit, and frees the single-instance slot so that
/// [`Signals::new`] may be called again. The OS-level handlers are not
/// uninstalled on drop: handled signals that arrive afterwards are discarded
/// by the handler instead of triggering their previous disposition.
///
/// # Example
///
/// ```no_run
/// use signal_msg::Signals;
///
/// let signals = Signals::new().expect("failed to create signal handler");
/// for sig in signals.subscribe() {
///     println!("received: {}", sig);
///     if sig.is_terminating() { break; }
/// }
/// ```
#[derive(Clone, Debug)]
pub struct Signals(Arc<SignalsInner>);
295
296impl Signals {
297 /// Creates a new signal channel and installs OS-level signal handlers.
298 ///
299 /// Allocates a self-pipe, spawns a background dispatch thread named
300 /// `signal-msg`, and registers handlers for all supported signals. Call
301 /// [`subscribe`][Signals::subscribe] to obtain a [`Receiver`].
302 ///
303 /// # Examples
304 ///
305 /// ```no_run
306 /// let signals = signal_msg::Signals::new().expect("signal setup failed");
307 /// let receiver = signals.subscribe();
308 /// ```
309 ///
310 /// # Errors
311 ///
312 /// Returns [`SignalError::AlreadyInitialized`] if another `Signals` instance
313 /// is already active. Returns [`SignalError::OsError`] if an OS-level
314 /// operation fails (pipe creation, `fcntl`, or thread spawn).
315 pub fn new() -> Result<Self, SignalError> {
316 if INITIALIZED
317 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
318 .is_err()
319 {
320 return Err(SignalError::AlreadyInitialized);
321 }
322 Self::try_init().map_err(|e| {
323 INITIALIZED.store(false, Ordering::Relaxed);
324 e
325 })
326 }
327
328 fn try_init() -> Result<Self, SignalError> {
329 let mut fds = [0i32; 2];
330 // SAFETY: `fds` is a valid mutable pointer to a 2-element `c_int`
331 // array, satisfying the requirements of `pipe(2)`.
332 if unsafe { libc::pipe(fds.as_mut_ptr()) } != 0 {
333 return Err(SignalError::OsError(std::io::Error::last_os_error()));
334 }
335 let read_fd = fds[0];
336 let write_fd = fds[1];
337
338 // SAFETY: `write_fd` and `read_fd` are valid open file descriptors
339 // just created by `pipe(2)` above.
340 unsafe {
341 let fl = libc::fcntl(write_fd, libc::F_GETFL);
342 if fl == -1
343 || libc::fcntl(write_fd, libc::F_SETFL, fl | libc::O_NONBLOCK) == -1
344 || libc::fcntl(write_fd, libc::F_SETFD, libc::FD_CLOEXEC) == -1
345 || libc::fcntl(read_fd, libc::F_SETFD, libc::FD_CLOEXEC) == -1
346 {
347 let e = std::io::Error::last_os_error();
348 libc::close(write_fd);
349 libc::close(read_fd);
350 return Err(SignalError::OsError(e));
351 }
352 }
353
354 // Publish write_fd before the thread starts so pipe_handler can use it.
355 WRITE_FD.store(write_fd, Ordering::Relaxed);
356
357 let senders: Senders = Arc::new(Mutex::new(Vec::new()));
358 let thread_senders = Arc::clone(&senders);
359
360 // Background thread: reads signal bytes from the pipe and fans them out
361 // to all registered senders. Exits when the write end is closed (EOF).
362 std::thread::Builder::new()
363 .name("signal-msg".into())
364 .spawn(move || {
365 let mut buf = [0u8; 64];
366 loop {
367 // SAFETY: `read_fd` is a valid open file descriptor owned
368 // exclusively by this thread. `buf` is valid for 64 bytes.
369 let n =
370 unsafe { libc::read(read_fd, buf.as_mut_ptr().cast::<libc::c_void>(), 64) };
371 if n < 0 {
372 if std::io::Error::last_os_error().kind() == std::io::ErrorKind::Interrupted
373 {
374 continue; // EINTR — retry the read
375 }
376 break; // unrecoverable OS error
377 }
378 if n == 0 {
379 break; // EOF — write end was closed (Signals dropped)
380 }
381 #[allow(clippy::cast_sign_loss)]
382 let received = &buf[..n as usize];
383 let mut locked = thread_senders.lock().unwrap_or_else(|p| p.into_inner());
384 for &byte in received {
385 if let Some(sig) = Signal::from_raw(byte) {
386 locked.retain(|s| s.send(sig).is_ok());
387 }
388 }
389 }
390 // SAFETY: `read_fd` is exclusively owned by this thread and
391 // has not been closed by any other code.
392 unsafe { libc::close(read_fd) };
393 })
394 .map_err(|e| {
395 // Thread spawn failed; clean up fds before propagating the error.
396 // SAFETY: `write_fd` and `read_fd` are valid open file descriptors.
397 unsafe {
398 libc::close(write_fd);
399 libc::close(read_fd);
400 }
401 WRITE_FD.store(-1, Ordering::Relaxed);
402 SignalError::OsError(e)
403 })?;
404
405 // Install OS-level handlers for all supported signals. This is done
406 // after the thread is running so no signals can be lost.
407 for &signum in HANDLED_SIGNALS {
408 // SAFETY: `signum` is a valid catchable signal number from
409 // `HANDLED_SIGNALS` and `pipe_handler` is async-signal-safe.
410 unsafe { install_handler(signum) };
411 }
412
413 Ok(Signals(Arc::new(SignalsInner { write_fd, senders })))
414 }
415
416 /// Returns a new [`Receiver`] that will receive all subsequent signals.
417 ///
418 /// Multiple independent receivers can be created from the same `Signals`
419 /// handle; each receives its own copy of every delivered signal.
420 ///
421 /// # Examples
422 ///
423 /// ```no_run
424 /// use signal_msg::Signals;
425 ///
426 /// let signals = Signals::new().expect("signal setup failed");
427 /// let r1 = signals.subscribe();
428 /// let r2 = signals.subscribe();
429 /// // r1 and r2 each receive independent copies of every signal.
430 /// # let _ = (r1, r2);
431 /// ```
432 #[must_use]
433 pub fn subscribe(&self) -> Receiver {
434 let (tx, rx) = mpsc::channel();
435 self.0
436 .senders
437 .lock()
438 .unwrap_or_else(|p| p.into_inner())
439 .push(tx);
440 Receiver(rx)
441 }
442}
443
/// Receives UNIX signals forwarded through a [`Signals`] channel.
///
/// Obtained via [`Signals::subscribe`]. Use [`listen`][Receiver::listen] to
/// block until a signal arrives, or [`try_listen`][Receiver::try_listen] to
/// poll without blocking. `Receiver` also implements [`Iterator`], yielding
/// each signal in turn until the backing [`Signals`] handle is dropped.
///
/// Internally this is a thin wrapper around the receiving half of a
/// [`std::sync::mpsc`] channel carrying [`Signal`] values.
#[derive(Debug)]
pub struct Receiver(mpsc::Receiver<Signal>);
452
453impl Receiver {
454 /// Blocks until a signal is received and returns it.
455 ///
456 /// # Examples
457 ///
458 /// ```no_run
459 /// use signal_msg::Signals;
460 ///
461 /// let signals = Signals::new().expect("signal setup failed");
462 /// let receiver = signals.subscribe();
463 /// match receiver.listen() {
464 /// Ok(sig) => println!("received: {}", sig),
465 /// Err(e) => eprintln!("channel closed: {}", e),
466 /// }
467 /// ```
468 ///
469 /// # Errors
470 ///
471 /// Returns [`SignalError::Disconnected`] if the backing [`Signals`] handle
472 /// has been dropped and no further signals can arrive.
473 pub fn listen(&self) -> Result<Signal, SignalError> {
474 self.0.recv().map_err(|_| SignalError::Disconnected)
475 }
476
477 /// Returns the next signal if one is immediately available, or `None` if
478 /// the channel is currently empty.
479 ///
480 /// Unlike [`listen`][Receiver::listen], this method never blocks.
481 ///
482 /// # Examples
483 ///
484 /// ```no_run
485 /// use signal_msg::Signals;
486 ///
487 /// let signals = Signals::new().expect("signal setup failed");
488 /// let receiver = signals.subscribe();
489 /// match receiver.try_listen() {
490 /// Ok(Some(sig)) => println!("got signal: {}", sig),
491 /// Ok(None) => println!("no signal pending"),
492 /// Err(e) => eprintln!("channel closed: {}", e),
493 /// }
494 /// ```
495 ///
496 /// # Errors
497 ///
498 /// Returns [`SignalError::Disconnected`] if the backing [`Signals`] handle
499 /// has been dropped and no further signals can arrive.
500 pub fn try_listen(&self) -> Result<Option<Signal>, SignalError> {
501 match self.0.try_recv() {
502 Ok(sig) => Ok(Some(sig)),
503 Err(mpsc::TryRecvError::Empty) => Ok(None),
504 Err(mpsc::TryRecvError::Disconnected) => Err(SignalError::Disconnected),
505 }
506 }
507}
508
509/// Yields each arriving [`Signal`] in turn, blocking between deliveries.
510///
511/// The iterator ends naturally when the backing [`Signals`] handle is dropped.
512/// This enables idiomatic `for` loops and the full iterator combinator toolkit.
513///
514/// # Examples
515///
516/// ```no_run
517/// use signal_msg::Signals;
518///
519/// let signals = Signals::new().expect("signal setup failed");
520/// for sig in signals.subscribe() {
521/// println!("received: {}", sig);
522/// if sig.is_terminating() { break; }
523/// }
524/// ```
525impl Iterator for Receiver {
526 type Item = Signal;
527
528 fn next(&mut self) -> Option<Self::Item> {
529 self.0.recv().ok()
530 }
531}
532
/// Unit tests for the pure parts of the crate. Nothing here installs real
/// signal handlers; `Receiver` is exercised over a hand-made channel.
#[cfg(test)]
mod tests {
    use super::*;

    // --- Signal::is_terminating ---

    #[test]
    fn test_is_terminating_exit_signals() {
        assert!(Signal::Int.is_terminating());
        assert!(Signal::Term.is_terminating());
        assert!(Signal::Ill.is_terminating());
        assert!(Signal::Abrt.is_terminating());
        assert!(Signal::Fpe.is_terminating());
    }

    #[test]
    fn test_is_terminating_non_exit_signals() {
        assert!(!Signal::Hup.is_terminating());
        assert!(!Signal::Pipe.is_terminating());
        assert!(!Signal::Alrm.is_terminating());
        assert!(!Signal::Usr1.is_terminating());
        assert!(!Signal::Usr2.is_terminating());
        assert!(!Signal::Winch.is_terminating());
        assert!(!Signal::Cont.is_terminating());
        assert!(!Signal::Urg.is_terminating());
    }

    // --- Signal::Display ---

    #[test]
    fn test_display_new_signals() {
        assert_eq!(Signal::Usr1.to_string(), "SIGUSR1");
        assert_eq!(Signal::Usr2.to_string(), "SIGUSR2");
        assert_eq!(Signal::Winch.to_string(), "SIGWINCH");
        assert_eq!(Signal::Cont.to_string(), "SIGCONT");
        assert_eq!(Signal::Urg.to_string(), "SIGURG");
    }

    #[test]
    fn test_display_existing_signals() {
        assert_eq!(Signal::Hup.to_string(), "SIGHUP");
        assert_eq!(Signal::Int.to_string(), "SIGINT");
        assert_eq!(Signal::Ill.to_string(), "SIGILL");
        assert_eq!(Signal::Abrt.to_string(), "SIGABRT");
        assert_eq!(Signal::Fpe.to_string(), "SIGFPE");
        assert_eq!(Signal::Pipe.to_string(), "SIGPIPE");
        assert_eq!(Signal::Alrm.to_string(), "SIGALRM");
        assert_eq!(Signal::Term.to_string(), "SIGTERM");
    }

    // --- Signal::from_raw ---

    #[test]
    fn test_from_raw_new_signals() {
        #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
        {
            assert_eq!(Signal::from_raw(libc::SIGUSR1 as u8), Some(Signal::Usr1));
            assert_eq!(Signal::from_raw(libc::SIGUSR2 as u8), Some(Signal::Usr2));
            assert_eq!(Signal::from_raw(libc::SIGWINCH as u8), Some(Signal::Winch));
            assert_eq!(Signal::from_raw(libc::SIGCONT as u8), Some(Signal::Cont));
            assert_eq!(Signal::from_raw(libc::SIGURG as u8), Some(Signal::Urg));
        }
    }

    #[test]
    fn test_from_raw_existing_signals() {
        #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
        {
            assert_eq!(Signal::from_raw(libc::SIGHUP as u8), Some(Signal::Hup));
            assert_eq!(Signal::from_raw(libc::SIGINT as u8), Some(Signal::Int));
            assert_eq!(Signal::from_raw(libc::SIGILL as u8), Some(Signal::Ill));
            assert_eq!(Signal::from_raw(libc::SIGABRT as u8), Some(Signal::Abrt));
            assert_eq!(Signal::from_raw(libc::SIGFPE as u8), Some(Signal::Fpe));
            assert_eq!(Signal::from_raw(libc::SIGPIPE as u8), Some(Signal::Pipe));
            assert_eq!(Signal::from_raw(libc::SIGALRM as u8), Some(Signal::Alrm));
            assert_eq!(Signal::from_raw(libc::SIGTERM as u8), Some(Signal::Term));
        }
    }

    // Every signal a handler is installed for must decode to a variant;
    // otherwise the dispatch thread would silently drop it.
    #[test]
    fn test_from_raw_covers_every_handled_signal() {
        for &signum in HANDLED_SIGNALS {
            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
            let byte = signum as u8;
            assert_eq!(libc::c_int::from(byte), signum, "signal {signum} does not fit in a byte");
            assert!(
                Signal::from_raw(byte).is_some(),
                "signal {signum} has no Signal variant"
            );
        }
    }

    #[test]
    fn test_from_raw_unknown_returns_none() {
        assert_eq!(Signal::from_raw(0), None);
        assert_eq!(Signal::from_raw(255), None);
    }

    // --- SignalError ---

    #[test]
    fn test_signal_error_compares_os_errors_by_kind() {
        let plain = SignalError::OsError(std::io::Error::from(std::io::ErrorKind::NotFound));
        let custom =
            SignalError::OsError(std::io::Error::new(std::io::ErrorKind::NotFound, "gone"));
        assert_eq!(plain, custom);
        assert_eq!(plain.clone(), plain);
        assert_ne!(plain, SignalError::Disconnected);
        assert_ne!(SignalError::Disconnected, SignalError::AlreadyInitialized);
    }

    // --- Receiver::listen / Receiver::try_listen ---

    #[test]
    fn test_listen_and_try_listen() {
        let (tx, rx) = mpsc::channel();
        let receiver = Receiver(rx);

        assert_eq!(receiver.try_listen(), Ok(None));

        tx.send(Signal::Usr1).unwrap();
        assert_eq!(receiver.listen(), Ok(Signal::Usr1));

        tx.send(Signal::Int).unwrap();
        assert_eq!(receiver.try_listen(), Ok(Some(Signal::Int)));

        drop(tx);
        assert_eq!(receiver.try_listen(), Err(SignalError::Disconnected));
        assert_eq!(receiver.listen(), Err(SignalError::Disconnected));
    }

    // --- Iterator for Receiver ---

    #[test]
    fn test_receiver_iterator_yields_signals() {
        let (tx, rx) = mpsc::channel();
        let receiver = Receiver(rx);

        tx.send(Signal::Usr1).unwrap();
        tx.send(Signal::Winch).unwrap();
        tx.send(Signal::Int).unwrap();
        drop(tx);

        let collected: Vec<Signal> = receiver.collect();
        assert_eq!(collected, vec![Signal::Usr1, Signal::Winch, Signal::Int]);
    }

    #[test]
    fn test_receiver_iterator_ends_on_disconnect() {
        let (tx, rx) = mpsc::channel();
        let mut receiver = Receiver(rx);

        tx.send(Signal::Cont).unwrap();
        drop(tx);

        assert_eq!(receiver.next(), Some(Signal::Cont));
        assert_eq!(receiver.next(), None);
    }

    #[test]
    fn test_receiver_iterator_with_take_while() {
        let (tx, rx) = mpsc::channel();
        let receiver = Receiver(rx);

        tx.send(Signal::Usr1).unwrap();
        tx.send(Signal::Winch).unwrap();
        tx.send(Signal::Int).unwrap(); // terminating — take_while stops before this
        tx.send(Signal::Usr2).unwrap();
        drop(tx);

        let non_term: Vec<Signal> = receiver.take_while(|s| !s.is_terminating()).collect();
        assert_eq!(non_term, vec![Signal::Usr1, Signal::Winch]);
    }
}
645}