Skip to main content

tun_rs/platform/unix/
interrupt.rs

1/*!
2# Interruptible I/O Module
3
4This module provides support for interruptible I/O operations on Unix platforms.
5
6## Overview
7
8Interruptible I/O allows you to cancel blocking read/write operations on a TUN/TAP device
9from another thread. This is useful for graceful shutdown, implementing timeouts, or
10responding to signals.
11
12## Availability
13
14This module is only available when the `interruptible` feature flag is enabled:
15
16```toml
17[dependencies]
18tun-rs = { version = "2", features = ["interruptible"] }
19```
20
21## How It Works
22
23The implementation uses a pipe-based signaling mechanism:
24- An `InterruptEvent` creates a pipe internally
25- When triggered, it writes to the pipe
26- I/O operations use `poll()` (or `select()` on macOS) to wait on both the device fd and the pipe
27- If the pipe becomes readable, the I/O operation returns with `ErrorKind::Interrupted`
28
29## Usage
30
31```no_run
32# #[cfg(all(unix, feature = "interruptible"))]
33# {
34use tun_rs::{DeviceBuilder, InterruptEvent};
35use std::sync::Arc;
36use std::thread;
37
38let dev = DeviceBuilder::new()
39    .ipv4("10.0.0.1", 24, None)
40    .build_sync()?;
41
42let event = Arc::new(InterruptEvent::new()?);
43let event_clone = event.clone();
44
45// Spawn a thread that will read from the device
46let handle = thread::spawn(move || {
47    let mut buf = vec![0u8; 1500];
48    match dev.recv_intr(&mut buf, &event_clone) {
49        Ok(n) => println!("Received {} bytes", n),
50        Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
51            println!("Read was interrupted");
52        }
53        Err(e) => eprintln!("Error: {}", e),
54    }
55});
56
57// From the main thread, trigger the interrupt
58thread::sleep(std::time::Duration::from_secs(1));
59event.trigger()?;
60
61handle.join().unwrap();
62# }
63# Ok::<(), std::io::Error>(())
64```
65
66## Performance Considerations
67
68- Interruptible I/O has slightly more overhead than regular I/O due to the additional poll() fd
69- The pipe is created once and reused across all operations
70- Non-blocking mode is set on the pipe fds to prevent deadlocks
71
72## Platform Support
73
74- **Linux**: Uses `poll()` with two file descriptors
75- **macOS**: Uses `select()` with fd_set
76- **FreeBSD/OpenBSD/NetBSD**: Uses `poll()` like Linux
77- **Windows**: Not supported (would need IOCP or overlapped I/O)
78
79## Thread Safety
80
81`InterruptEvent` is thread-safe and can be shared across threads using `Arc`.
82*/
83
84use crate::platform::unix::Fd;
85use std::io;
86use std::io::{IoSlice, IoSliceMut};
87use std::os::fd::AsRawFd;
88use std::sync::Mutex;
89
90impl Fd {
91    pub(crate) fn read_interruptible(
92        &self,
93        buf: &mut [u8],
94        event: &InterruptEvent,
95        timeout: Option<std::time::Duration>,
96    ) -> io::Result<usize> {
97        loop {
98            self.wait_readable_interruptible(event, timeout)?;
99            return match self.read(buf) {
100                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
101                    continue;
102                }
103                rs => rs,
104            };
105        }
106    }
107    pub(crate) fn readv_interruptible(
108        &self,
109        bufs: &mut [IoSliceMut<'_>],
110        event: &InterruptEvent,
111        timeout: Option<std::time::Duration>,
112    ) -> io::Result<usize> {
113        loop {
114            self.wait_readable_interruptible(event, timeout)?;
115            return match self.readv(bufs) {
116                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
117                    continue;
118                }
119
120                rs => rs,
121            };
122        }
123    }
124    pub(crate) fn write_interruptible(
125        &self,
126        buf: &[u8],
127        event: &InterruptEvent,
128    ) -> io::Result<usize> {
129        loop {
130            self.wait_writable_interruptible(event)?;
131            return match self.write(buf) {
132                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
133                    continue;
134                }
135                rs => rs,
136            };
137        }
138    }
139    pub fn writev_interruptible(
140        &self,
141        bufs: &[IoSlice<'_>],
142        event: &InterruptEvent,
143    ) -> io::Result<usize> {
144        loop {
145            self.wait_writable_interruptible(event)?;
146            return match self.writev(bufs) {
147                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
148                    continue;
149                }
150                rs => rs,
151            };
152        }
153    }
154    pub fn wait_readable_interruptible(
155        &self,
156        interrupted_event: &InterruptEvent,
157        timeout: Option<std::time::Duration>,
158    ) -> io::Result<()> {
159        let fd = self.as_raw_fd() as libc::c_int;
160        let event_fd = interrupted_event.as_event_fd();
161
162        let mut fds = [
163            libc::pollfd {
164                fd,
165                events: libc::POLLIN,
166                revents: 0,
167            },
168            libc::pollfd {
169                fd: event_fd,
170                events: libc::POLLIN,
171                revents: 0,
172            },
173        ];
174
175        let result = unsafe {
176            libc::poll(
177                fds.as_mut_ptr(),
178                fds.len() as libc::nfds_t,
179                timeout
180                    .map(|t| t.as_millis().min(i32::MAX as _) as _)
181                    .unwrap_or(-1),
182            )
183        };
184
185        if result == -1 {
186            return Err(io::Error::last_os_error());
187        }
188        if result == 0 {
189            return Err(io::Error::from(io::ErrorKind::TimedOut));
190        }
191        if fds[0].revents & libc::POLLIN != 0 {
192            return Ok(());
193        }
194
195        if fds[1].revents & libc::POLLIN != 0 {
196            return Err(io::Error::new(
197                io::ErrorKind::Interrupted,
198                "trigger interrupt",
199            ));
200        }
201
202        Err(io::Error::other("fd error"))
203    }
204    pub fn wait_writable_interruptible(
205        &self,
206        interrupted_event: &InterruptEvent,
207    ) -> io::Result<()> {
208        let fd = self.as_raw_fd() as libc::c_int;
209        let event_fd = interrupted_event.as_event_fd();
210
211        let mut fds = [
212            libc::pollfd {
213                fd,
214                events: libc::POLLOUT,
215                revents: 0,
216            },
217            libc::pollfd {
218                fd: event_fd,
219                events: libc::POLLIN,
220                revents: 0,
221            },
222        ];
223
224        let result = unsafe { libc::poll(fds.as_mut_ptr(), fds.len() as libc::nfds_t, -1) };
225
226        if result == -1 {
227            return Err(io::Error::last_os_error());
228        }
229        if fds[0].revents & libc::POLLOUT != 0 {
230            return Ok(());
231        }
232
233        if fds[1].revents & libc::POLLIN != 0 {
234            return Err(io::Error::new(
235                io::ErrorKind::Interrupted,
236                "trigger interrupt",
237            ));
238        }
239
240        Err(io::Error::other("fd error"))
241    }
242}
243
#[cfg(target_os = "macos")]
impl Fd {
    /// Blocks in `select()` until the descriptor is writable, the optional
    /// `interrupt_event` is triggered, or `timeout` elapses (`None` waits indefinitely).
    ///
    /// # Errors
    ///
    /// - `ErrorKind::InvalidInput` if a descriptor is outside the range `select()` can handle.
    /// - `ErrorKind::Interrupted` when `interrupt_event` has been triggered.
    /// - `ErrorKind::TimedOut` when `timeout` elapses.
    /// - The OS error reported by `select()`.
    pub fn wait_writable(
        &self,
        interrupt_event: Option<&InterruptEvent>,
        timeout: Option<std::time::Duration>,
    ) -> io::Result<()> {
        let fd = self.as_raw_fd();
        Self::ensure_selectable(fd)?;
        // SAFETY: an all-zero `fd_set` is a valid, empty set.
        let readfds: libc::fd_set = unsafe { std::mem::zeroed() };
        // SAFETY: as above.
        let mut writefds: libc::fd_set = unsafe { std::mem::zeroed() };
        // SAFETY: `fd` was checked to be within `0..FD_SETSIZE`.
        unsafe {
            libc::FD_SET(fd, &mut writefds);
        }
        self.wait(readfds, Some(writefds), interrupt_event, timeout)
    }

    /// Blocks in `select()` until the descriptor is readable, the optional
    /// `interrupt_event` is triggered, or `timeout` elapses (`None` waits indefinitely).
    ///
    /// # Errors
    ///
    /// Same as [`wait_writable`](Self::wait_writable).
    pub fn wait_readable(
        &self,
        interrupt_event: Option<&InterruptEvent>,
        timeout: Option<std::time::Duration>,
    ) -> io::Result<()> {
        let fd = self.as_raw_fd();
        Self::ensure_selectable(fd)?;
        // SAFETY: an all-zero `fd_set` is a valid, empty set.
        let mut readfds: libc::fd_set = unsafe { std::mem::zeroed() };
        // SAFETY: `fd` was checked to be within `0..FD_SETSIZE`.
        unsafe {
            libc::FD_SET(fd, &mut readfds);
        }
        self.wait(readfds, None, interrupt_event, timeout)
    }

    /// Rejects descriptors that do not fit in an `fd_set`.
    ///
    /// `FD_SET`/`FD_ISSET` index a fixed-size bitmap; using them with a descriptor
    /// outside `0..FD_SETSIZE` is undefined behaviour.
    fn ensure_selectable(fd: libc::c_int) -> io::Result<()> {
        if fd < 0 || fd as u64 >= libc::FD_SETSIZE as u64 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "file descriptor out of range for select()",
            ));
        }
        Ok(())
    }

    /// Runs `select()` on the prepared sets, adding the interrupt pipe to the read set
    /// when an event is supplied.
    ///
    /// A triggered event is reported as `ErrorKind::Interrupted` even if the device
    /// descriptor is ready as well.
    fn wait(
        &self,
        mut readfds: libc::fd_set,
        writefds: Option<libc::fd_set>,
        interrupt_event: Option<&InterruptEvent>,
        timeout: Option<std::time::Duration>,
    ) -> io::Result<()> {
        let mut nfds = self.as_raw_fd();

        if let Some(interrupt_event) = interrupt_event {
            let event_fd = interrupt_event.as_event_fd();
            Self::ensure_selectable(event_fd)?;
            // SAFETY: `event_fd` was checked to be within `0..FD_SETSIZE`.
            unsafe {
                libc::FD_SET(event_fd, &mut readfds);
            }
            nfds = nfds.max(event_fd);
        }

        // A fixed deadline lets retries after EINTR wait only for the time that is left.
        let deadline = timeout.and_then(|t| std::time::Instant::now().checked_add(t));

        loop {
            // `select()` overwrites its sets, so every attempt works on fresh copies.
            let mut rfds = readfds;
            let mut wfds = writefds;
            // SAFETY: an all-zero `fd_set` is a valid, empty set.
            let mut efds: libc::fd_set = unsafe { std::mem::zeroed() };

            let mut tv = libc::timeval {
                tv_sec: 0,
                tv_usec: 0,
            };
            let tv_ptr = match timeout {
                Some(total) => {
                    let remaining = deadline.map_or(total, |d| {
                        d.saturating_duration_since(std::time::Instant::now())
                    });
                    // Round up to whole microseconds so the wait is never cut short.
                    let mut secs = remaining.as_secs();
                    let mut micros = (remaining.subsec_nanos() + 999) / 1_000;
                    if micros >= 1_000_000 {
                        secs = secs.saturating_add(1);
                        micros = 0;
                    }
                    tv.tv_sec = secs.min(libc::time_t::MAX as u64) as libc::time_t;
                    tv.tv_usec = micros as libc::suseconds_t;
                    &mut tv as *mut libc::timeval
                }
                None => std::ptr::null_mut(),
            };

            // SAFETY: all set pointers refer to live locals (or are null for the optional
            // write set), `tv_ptr` is null or points at `tv`, and `nfds + 1` cannot
            // overflow because every descriptor is below `FD_SETSIZE`.
            let result = unsafe {
                libc::select(
                    nfds + 1,
                    &mut rfds,
                    wfds.as_mut()
                        .map_or(std::ptr::null_mut(), |p| p as *mut libc::fd_set),
                    &mut efds,
                    tv_ptr,
                )
            };
            if result < 0 {
                let err = io::Error::last_os_error();
                // EINTR means a signal arrived, not that the event fired. Retry so that
                // `ErrorKind::Interrupted` is reserved for a triggered `InterruptEvent`.
                if err.kind() == io::ErrorKind::Interrupted {
                    continue;
                }
                return Err(err);
            }
            if result == 0 {
                return Err(io::Error::from(io::ErrorKind::TimedOut));
            }
            if let Some(cancel_event) = interrupt_event {
                // SAFETY: the event descriptor was range-checked above.
                if unsafe { libc::FD_ISSET(cancel_event.as_event_fd(), &rfds) } {
                    return Err(io::Error::new(
                        io::ErrorKind::Interrupted,
                        "trigger interrupt",
                    ));
                }
            }
            return Ok(());
        }
    }
}
333/// Event object for interrupting blocking I/O operations.
334///
335/// `InterruptEvent` provides a mechanism to cancel blocking read/write operations
336/// from another thread. It uses a pipe-based signaling mechanism internally.
337///
338/// # Thread Safety
339///
340/// This type is thread-safe and can be shared across threads using `Arc<InterruptEvent>`.
341///
342/// # Examples
343///
344/// Basic usage with interruptible read:
345///
346/// ```no_run
347/// # #[cfg(all(unix, feature = "interruptible"))]
348/// # {
349/// use std::sync::Arc;
350/// use std::thread;
351/// use std::time::Duration;
352/// use tun_rs::{DeviceBuilder, InterruptEvent};
353///
354/// let device = DeviceBuilder::new()
355///     .ipv4("10.0.0.1", 24, None)
356///     .build_sync()?;
357///
358/// let event = Arc::new(InterruptEvent::new()?);
359/// let event_clone = event.clone();
360///
361/// let reader = thread::spawn(move || {
362///     let mut buf = vec![0u8; 1500];
363///     device.recv_intr(&mut buf, &event_clone)
364/// });
365///
366/// // Give the reader time to start blocking
367/// thread::sleep(Duration::from_millis(100));
368///
369/// // Trigger the interrupt
370/// event.trigger()?;
371///
372/// match reader.join().unwrap() {
373///     Ok(n) => println!("Read {} bytes", n),
374///     Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
375///         println!("Successfully interrupted!");
376///     }
377///     Err(e) => eprintln!("Error: {}", e),
378/// }
379/// # }
380/// # Ok::<(), std::io::Error>(())
381/// ```
382///
383/// Using with a timeout:
384///
385/// ```no_run
386/// # #[cfg(all(unix, feature = "interruptible"))]
387/// # {
388/// use std::time::Duration;
389/// use tun_rs::{DeviceBuilder, InterruptEvent};
390///
391/// let device = DeviceBuilder::new()
392///     .ipv4("10.0.0.1", 24, None)
393///     .build_sync()?;
394///
395/// let event = InterruptEvent::new()?;
396/// let mut buf = vec![0u8; 1500];
397///
398/// // Will return an error if no data arrives within 5 seconds
399/// match device.recv_intr_timeout(&mut buf, &event, Some(Duration::from_secs(5))) {
400///     Ok(n) => println!("Read {} bytes", n),
401///     Err(e) if e.kind() == std::io::ErrorKind::TimedOut => {
402///         println!("Timed out waiting for data");
403///     }
404///     Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
405///         println!("Operation was interrupted");
406///     }
407///     Err(e) => eprintln!("Error: {}", e),
408/// }
409/// # }
410/// # Ok::<(), std::io::Error>(())
411/// ```
412///
413/// # Implementation Details
414///
415/// - Uses a Unix pipe for signaling
416/// - Both read and write ends are set to non-blocking mode
417/// - State is protected by a mutex to prevent race conditions
418/// - Once triggered, the event remains triggered until reset
419pub struct InterruptEvent {
420    state: Mutex<i32>,
421    read_fd: Fd,
422    write_fd: Fd,
423}
424impl InterruptEvent {
425    /// Creates a new `InterruptEvent`.
426    ///
427    /// This allocates a Unix pipe for signaling and sets both ends to non-blocking mode.
428    ///
429    /// # Errors
430    ///
431    /// Returns an error if:
432    /// - The pipe creation fails (e.g., out of file descriptors)
433    /// - Setting non-blocking mode fails
434    ///
435    /// # Example
436    ///
437    /// ```no_run
438    /// # #[cfg(all(unix, feature = "interruptible"))]
439    /// # {
440    /// use tun_rs::InterruptEvent;
441    ///
442    /// let event = InterruptEvent::new()?;
443    /// # }
444    /// # Ok::<(), std::io::Error>(())
445    /// ```
446    pub fn new() -> io::Result<Self> {
447        let mut fds: [libc::c_int; 2] = [0; 2];
448
449        unsafe {
450            if libc::pipe(fds.as_mut_ptr()) == -1 {
451                return Err(io::Error::last_os_error());
452            }
453            let read_fd = Fd::new_unchecked(fds[0]);
454            let write_fd = Fd::new_unchecked(fds[1]);
455            write_fd.set_nonblocking(true)?;
456            read_fd.set_nonblocking(true)?;
457            Ok(Self {
458                state: Mutex::new(0),
459                read_fd,
460                write_fd,
461            })
462        }
463    }
464
465    /// Triggers the interrupt event with value 1.
466    ///
467    /// This will cause any blocking I/O operations waiting on this event to return
468    /// with `ErrorKind::Interrupted`.
469    ///
470    /// Calling `trigger()` multiple times before `reset()` has no additional effect -
471    /// the event remains triggered.
472    ///
473    /// # Errors
474    ///
475    /// Returns an error if writing to the internal pipe fails, though this is rare
476    /// in practice (pipe write errors are usually `WouldBlock`, which is handled).
477    ///
478    /// # Example
479    ///
480    /// ```no_run
481    /// # #[cfg(all(unix, feature = "interruptible"))]
482    /// # {
483    /// use std::sync::Arc;
484    /// use std::thread;
485    /// use tun_rs::{DeviceBuilder, InterruptEvent};
486    ///
487    /// let event = Arc::new(InterruptEvent::new()?);
488    /// let event_clone = event.clone();
489    ///
490    /// thread::spawn(move || {
491    ///     // ... blocking I/O with event_clone ...
492    /// });
493    ///
494    /// // Interrupt the operation
495    /// event.trigger()?;
496    /// # }
497    /// # Ok::<(), std::io::Error>(())
498    /// ```
499    pub fn trigger(&self) -> io::Result<()> {
500        self.trigger_value(1)
501    }
502
503    /// Triggers the interrupt event with a specific value.
504    ///
505    /// Similar to [`trigger()`](Self::trigger), but allows specifying a custom value
506    /// that can be retrieved with [`value()`](Self::value).
507    ///
508    /// # Arguments
509    ///
510    /// * `val` - The value to store (must be non-zero)
511    ///
512    /// # Errors
513    ///
514    /// - Returns `ErrorKind::InvalidInput` if `val` is 0
515    /// - Returns an error if writing to the pipe fails
516    ///
517    /// # Example
518    ///
519    /// ```no_run
520    /// # #[cfg(all(unix, feature = "interruptible"))]
521    /// # {
522    /// use tun_rs::InterruptEvent;
523    ///
524    /// let event = InterruptEvent::new()?;
525    ///
526    /// // Trigger with a custom value (e.g., signal number)
527    /// event.trigger_value(15)?; // SIGTERM
528    ///
529    /// // Later, check what value was used
530    /// assert_eq!(event.value(), 15);
531    /// # }
532    /// # Ok::<(), std::io::Error>(())
533    /// ```
534    pub fn trigger_value(&self, val: i32) -> io::Result<()> {
535        if val == 0 {
536            return Err(io::Error::new(
537                io::ErrorKind::InvalidInput,
538                "value cannot be 0",
539            ));
540        }
541        let mut guard = self.state.lock().unwrap();
542        if *guard != 0 {
543            return Ok(());
544        }
545        *guard = val;
546        let buf: [u8; 8] = 1u64.to_ne_bytes();
547        let res = unsafe {
548            libc::write(
549                self.write_fd.as_raw_fd(),
550                buf.as_ptr() as *const _,
551                buf.len(),
552            )
553        };
554        if res == -1 {
555            let e = io::Error::last_os_error();
556            if e.kind() == io::ErrorKind::WouldBlock {
557                return Ok(());
558            }
559            Err(e)
560        } else {
561            Ok(())
562        }
563    }
564    /// Checks if the event has been triggered.
565    ///
566    /// Returns `true` if the event is currently in the triggered state (value is non-zero).
567    ///
568    /// # Example
569    ///
570    /// ```no_run
571    /// # #[cfg(all(unix, feature = "interruptible"))]
572    /// # {
573    /// use tun_rs::InterruptEvent;
574    ///
575    /// let event = InterruptEvent::new()?;
576    /// assert!(!event.is_trigger());
577    ///
578    /// event.trigger()?;
579    /// assert!(event.is_trigger());
580    ///
581    /// event.reset()?;
582    /// assert!(!event.is_trigger());
583    /// # }
584    /// # Ok::<(), std::io::Error>(())
585    /// ```
586    pub fn is_trigger(&self) -> bool {
587        *self.state.lock().unwrap() != 0
588    }
589
590    /// Returns the current trigger value.
591    ///
592    /// Returns 0 if the event is not triggered, or the value passed to
593    /// [`trigger_value()`](Self::trigger_value) if triggered.
594    ///
595    /// # Example
596    ///
597    /// ```no_run
598    /// # #[cfg(all(unix, feature = "interruptible"))]
599    /// # {
600    /// use tun_rs::InterruptEvent;
601    ///
602    /// let event = InterruptEvent::new()?;
603    /// assert_eq!(event.value(), 0);
604    ///
605    /// event.trigger_value(42)?;
606    /// assert_eq!(event.value(), 42);
607    /// # }
608    /// # Ok::<(), std::io::Error>(())
609    /// ```
610    pub fn value(&self) -> i32 {
611        *self.state.lock().unwrap()
612    }
613
614    /// Resets the event to the non-triggered state.
615    ///
616    /// This drains any pending data from the internal pipe and sets the state back to 0.
617    /// After calling `reset()`, the event can be triggered again.
618    ///
619    /// # Errors
620    ///
621    /// Returns an error if reading from the pipe fails (other than `WouldBlock`).
622    ///
623    /// # Example
624    ///
625    /// ```no_run
626    /// # #[cfg(all(unix, feature = "interruptible"))]
627    /// # {
628    /// use tun_rs::InterruptEvent;
629    ///
630    /// let event = InterruptEvent::new()?;
631    ///
632    /// event.trigger()?;
633    /// assert!(event.is_trigger());
634    ///
635    /// event.reset()?;
636    /// assert!(!event.is_trigger());
637    ///
638    /// // Can trigger again after reset
639    /// event.trigger()?;
640    /// assert!(event.is_trigger());
641    /// # }
642    /// # Ok::<(), std::io::Error>(())
643    /// ```
644    pub fn reset(&self) -> io::Result<()> {
645        let mut buf = [0; 8];
646        let mut guard = self.state.lock().unwrap();
647        *guard = 0;
648        loop {
649            unsafe {
650                let res = libc::read(
651                    self.read_fd.as_raw_fd(),
652                    buf.as_mut_ptr() as *mut _,
653                    buf.len(),
654                );
655                if res == -1 {
656                    let error = io::Error::last_os_error();
657                    return if error.kind() == io::ErrorKind::WouldBlock {
658                        Ok(())
659                    } else {
660                        Err(error)
661                    };
662                }
663            }
664        }
665    }
666    fn as_event_fd(&self) -> libc::c_int {
667        self.read_fd.as_raw_fd()
668    }
669}