tun_rs/platform/unix/interrupt.rs
1/*!
2# Interruptible I/O Module
3
4This module provides support for interruptible I/O operations on Unix platforms.
5
6## Overview
7
8Interruptible I/O allows you to cancel blocking read/write operations on a TUN/TAP device
9from another thread. This is useful for graceful shutdown, implementing timeouts, or
10responding to signals.
11
12## Availability
13
14This module is only available when the `interruptible` feature flag is enabled:
15
16```toml
17[dependencies]
18tun-rs = { version = "2", features = ["interruptible"] }
19```
20
21## How It Works
22
23The implementation uses a pipe-based signaling mechanism:
24- An `InterruptEvent` creates a pipe internally
25- When triggered, it writes to the pipe
26- I/O operations use `poll()` (or `select()` on macOS) to wait on both the device fd and the pipe
27- If the pipe becomes readable, the I/O operation returns with `ErrorKind::Interrupted`
28
29## Usage
30
31```no_run
32# #[cfg(all(unix, feature = "interruptible"))]
33# {
34use tun_rs::{DeviceBuilder, InterruptEvent};
35use std::sync::Arc;
36use std::thread;
37
38let dev = DeviceBuilder::new()
39 .ipv4("10.0.0.1", 24, None)
40 .build_sync()?;
41
42let event = Arc::new(InterruptEvent::new()?);
43let event_clone = event.clone();
44
45// Spawn a thread that will read from the device
46let handle = thread::spawn(move || {
47 let mut buf = vec![0u8; 1500];
48 match dev.recv_intr(&mut buf, &event_clone) {
49 Ok(n) => println!("Received {} bytes", n),
50 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
51 println!("Read was interrupted");
52 }
53 Err(e) => eprintln!("Error: {}", e),
54 }
55});
56
57// From the main thread, trigger the interrupt
58thread::sleep(std::time::Duration::from_secs(1));
59event.trigger()?;
60
61handle.join().unwrap();
62# }
63# Ok::<(), std::io::Error>(())
64```
65
66## Performance Considerations
67
68- Interruptible I/O has slightly more overhead than regular I/O due to the additional poll() fd
69- The pipe is created once and reused across all operations
70- Non-blocking mode is set on the pipe fds to prevent deadlocks
71
72## Platform Support
73
74- **Linux**: Uses `poll()` with two file descriptors
75- **macOS**: Uses `select()` with fd_set
76- **FreeBSD/OpenBSD/NetBSD**: Uses `poll()` like Linux
77- **Windows**: Not supported (would need IOCP or overlapped I/O)
78
79## Thread Safety
80
81`InterruptEvent` is thread-safe and can be shared across threads using `Arc`.
82*/
83
84use crate::platform::unix::Fd;
85use std::io;
86use std::io::{IoSlice, IoSliceMut};
87use std::os::fd::AsRawFd;
88use std::sync::Mutex;
89
90impl Fd {
91 pub(crate) fn read_interruptible(
92 &self,
93 buf: &mut [u8],
94 event: &InterruptEvent,
95 timeout: Option<std::time::Duration>,
96 ) -> io::Result<usize> {
97 loop {
98 self.wait_readable_interruptible(event, timeout)?;
99 return match self.read(buf) {
100 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
101 continue;
102 }
103 rs => rs,
104 };
105 }
106 }
107 pub(crate) fn readv_interruptible(
108 &self,
109 bufs: &mut [IoSliceMut<'_>],
110 event: &InterruptEvent,
111 timeout: Option<std::time::Duration>,
112 ) -> io::Result<usize> {
113 loop {
114 self.wait_readable_interruptible(event, timeout)?;
115 return match self.readv(bufs) {
116 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
117 continue;
118 }
119
120 rs => rs,
121 };
122 }
123 }
124 pub(crate) fn write_interruptible(
125 &self,
126 buf: &[u8],
127 event: &InterruptEvent,
128 ) -> io::Result<usize> {
129 loop {
130 self.wait_writable_interruptible(event)?;
131 return match self.write(buf) {
132 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
133 continue;
134 }
135 rs => rs,
136 };
137 }
138 }
139 pub fn writev_interruptible(
140 &self,
141 bufs: &[IoSlice<'_>],
142 event: &InterruptEvent,
143 ) -> io::Result<usize> {
144 loop {
145 self.wait_writable_interruptible(event)?;
146 return match self.writev(bufs) {
147 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
148 continue;
149 }
150 rs => rs,
151 };
152 }
153 }
154 pub fn wait_readable_interruptible(
155 &self,
156 interrupted_event: &InterruptEvent,
157 timeout: Option<std::time::Duration>,
158 ) -> io::Result<()> {
159 let fd = self.as_raw_fd() as libc::c_int;
160 let event_fd = interrupted_event.as_event_fd();
161
162 let mut fds = [
163 libc::pollfd {
164 fd,
165 events: libc::POLLIN,
166 revents: 0,
167 },
168 libc::pollfd {
169 fd: event_fd,
170 events: libc::POLLIN,
171 revents: 0,
172 },
173 ];
174
175 let result = unsafe {
176 libc::poll(
177 fds.as_mut_ptr(),
178 fds.len() as libc::nfds_t,
179 timeout
180 .map(|t| t.as_millis().min(i32::MAX as _) as _)
181 .unwrap_or(-1),
182 )
183 };
184
185 if result == -1 {
186 return Err(io::Error::last_os_error());
187 }
188 if result == 0 {
189 return Err(io::Error::from(io::ErrorKind::TimedOut));
190 }
191 if fds[0].revents & libc::POLLIN != 0 {
192 return Ok(());
193 }
194
195 if fds[1].revents & libc::POLLIN != 0 {
196 return Err(io::Error::new(
197 io::ErrorKind::Interrupted,
198 "trigger interrupt",
199 ));
200 }
201
202 Err(io::Error::other("fd error"))
203 }
204 pub fn wait_writable_interruptible(
205 &self,
206 interrupted_event: &InterruptEvent,
207 ) -> io::Result<()> {
208 let fd = self.as_raw_fd() as libc::c_int;
209 let event_fd = interrupted_event.as_event_fd();
210
211 let mut fds = [
212 libc::pollfd {
213 fd,
214 events: libc::POLLOUT,
215 revents: 0,
216 },
217 libc::pollfd {
218 fd: event_fd,
219 events: libc::POLLIN,
220 revents: 0,
221 },
222 ];
223
224 let result = unsafe { libc::poll(fds.as_mut_ptr(), fds.len() as libc::nfds_t, -1) };
225
226 if result == -1 {
227 return Err(io::Error::last_os_error());
228 }
229 if fds[0].revents & libc::POLLOUT != 0 {
230 return Ok(());
231 }
232
233 if fds[1].revents & libc::POLLIN != 0 {
234 return Err(io::Error::new(
235 io::ErrorKind::Interrupted,
236 "trigger interrupt",
237 ));
238 }
239
240 Err(io::Error::other("fd error"))
241 }
242}
243
244#[cfg(target_os = "macos")]
245impl Fd {
246 pub fn wait_writable(
247 &self,
248 interrupt_event: Option<&InterruptEvent>,
249 timeout: Option<std::time::Duration>,
250 ) -> io::Result<()> {
251 let readfds: libc::fd_set = unsafe { std::mem::zeroed() };
252 let mut writefds: libc::fd_set = unsafe { std::mem::zeroed() };
253 let fd = self.as_raw_fd();
254 unsafe {
255 libc::FD_SET(fd, &mut writefds);
256 }
257 self.wait(readfds, Some(writefds), interrupt_event, timeout)
258 }
259 pub fn wait_readable(
260 &self,
261 interrupt_event: Option<&InterruptEvent>,
262 timeout: Option<std::time::Duration>,
263 ) -> io::Result<()> {
264 let mut readfds: libc::fd_set = unsafe { std::mem::zeroed() };
265 let fd = self.as_raw_fd();
266 unsafe {
267 libc::FD_SET(fd, &mut readfds);
268 }
269 self.wait(readfds, None, interrupt_event, timeout)
270 }
271 fn wait(
272 &self,
273 mut readfds: libc::fd_set,
274 mut writefds: Option<libc::fd_set>,
275 interrupt_event: Option<&InterruptEvent>,
276 timeout: Option<std::time::Duration>,
277 ) -> io::Result<()> {
278 let fd = self.as_raw_fd();
279 let mut errorfds: libc::fd_set = unsafe { std::mem::zeroed() };
280 let mut nfds = fd;
281
282 if let Some(interrupt_event) = interrupt_event {
283 unsafe {
284 libc::FD_SET(interrupt_event.as_event_fd(), &mut readfds);
285 }
286 nfds = nfds.max(interrupt_event.as_event_fd());
287 }
288 let mut tv = libc::timeval {
289 tv_sec: 0,
290 tv_usec: 0,
291 };
292 let tv_ptr = if let Some(timeout) = timeout {
293 let secs = timeout.as_secs().min(libc::time_t::MAX as u64) as libc::time_t;
294 let usecs = (timeout.subsec_micros()) as libc::suseconds_t;
295 tv.tv_sec = secs;
296 tv.tv_usec = usecs;
297 &mut tv as *mut libc::timeval
298 } else {
299 std::ptr::null_mut()
300 };
301
302 let result = unsafe {
303 libc::select(
304 nfds + 1,
305 &mut readfds,
306 writefds
307 .as_mut()
308 .map(|p| p as *mut _)
309 .unwrap_or_else(std::ptr::null_mut),
310 &mut errorfds,
311 tv_ptr,
312 )
313 };
314 if result < 0 {
315 return Err(io::Error::last_os_error());
316 }
317 if result == 0 {
318 return Err(io::Error::from(io::ErrorKind::TimedOut));
319 }
320 unsafe {
321 if let Some(cancel_event) = interrupt_event {
322 if libc::FD_ISSET(cancel_event.as_event_fd(), &readfds) {
323 return Err(io::Error::new(
324 io::ErrorKind::Interrupted,
325 "trigger interrupt",
326 ));
327 }
328 }
329 }
330 Ok(())
331 }
332}
333/// Event object for interrupting blocking I/O operations.
334///
335/// `InterruptEvent` provides a mechanism to cancel blocking read/write operations
336/// from another thread. It uses a pipe-based signaling mechanism internally.
337///
338/// # Thread Safety
339///
340/// This type is thread-safe and can be shared across threads using `Arc<InterruptEvent>`.
341///
342/// # Examples
343///
344/// Basic usage with interruptible read:
345///
346/// ```no_run
347/// # #[cfg(all(unix, feature = "interruptible"))]
348/// # {
349/// use std::sync::Arc;
350/// use std::thread;
351/// use std::time::Duration;
352/// use tun_rs::{DeviceBuilder, InterruptEvent};
353///
354/// let device = DeviceBuilder::new()
355/// .ipv4("10.0.0.1", 24, None)
356/// .build_sync()?;
357///
358/// let event = Arc::new(InterruptEvent::new()?);
359/// let event_clone = event.clone();
360///
361/// let reader = thread::spawn(move || {
362/// let mut buf = vec![0u8; 1500];
363/// device.recv_intr(&mut buf, &event_clone)
364/// });
365///
366/// // Give the reader time to start blocking
367/// thread::sleep(Duration::from_millis(100));
368///
369/// // Trigger the interrupt
370/// event.trigger()?;
371///
372/// match reader.join().unwrap() {
373/// Ok(n) => println!("Read {} bytes", n),
374/// Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
375/// println!("Successfully interrupted!");
376/// }
377/// Err(e) => eprintln!("Error: {}", e),
378/// }
379/// # }
380/// # Ok::<(), std::io::Error>(())
381/// ```
382///
383/// Using with a timeout:
384///
385/// ```no_run
386/// # #[cfg(all(unix, feature = "interruptible"))]
387/// # {
388/// use std::time::Duration;
389/// use tun_rs::{DeviceBuilder, InterruptEvent};
390///
391/// let device = DeviceBuilder::new()
392/// .ipv4("10.0.0.1", 24, None)
393/// .build_sync()?;
394///
395/// let event = InterruptEvent::new()?;
396/// let mut buf = vec![0u8; 1500];
397///
398/// // Will return an error if no data arrives within 5 seconds
399/// match device.recv_intr_timeout(&mut buf, &event, Some(Duration::from_secs(5))) {
400/// Ok(n) => println!("Read {} bytes", n),
401/// Err(e) if e.kind() == std::io::ErrorKind::TimedOut => {
402/// println!("Timed out waiting for data");
403/// }
404/// Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
405/// println!("Operation was interrupted");
406/// }
407/// Err(e) => eprintln!("Error: {}", e),
408/// }
409/// # }
410/// # Ok::<(), std::io::Error>(())
411/// ```
412///
413/// # Implementation Details
414///
415/// - Uses a Unix pipe for signaling
416/// - Both read and write ends are set to non-blocking mode
417/// - State is protected by a mutex to prevent race conditions
418/// - Once triggered, the event remains triggered until reset
419pub struct InterruptEvent {
420 state: Mutex<i32>,
421 read_fd: Fd,
422 write_fd: Fd,
423}
424impl InterruptEvent {
425 /// Creates a new `InterruptEvent`.
426 ///
427 /// This allocates a Unix pipe for signaling and sets both ends to non-blocking mode.
428 ///
429 /// # Errors
430 ///
431 /// Returns an error if:
432 /// - The pipe creation fails (e.g., out of file descriptors)
433 /// - Setting non-blocking mode fails
434 ///
435 /// # Example
436 ///
437 /// ```no_run
438 /// # #[cfg(all(unix, feature = "interruptible"))]
439 /// # {
440 /// use tun_rs::InterruptEvent;
441 ///
442 /// let event = InterruptEvent::new()?;
443 /// # }
444 /// # Ok::<(), std::io::Error>(())
445 /// ```
446 pub fn new() -> io::Result<Self> {
447 let mut fds: [libc::c_int; 2] = [0; 2];
448
449 unsafe {
450 if libc::pipe(fds.as_mut_ptr()) == -1 {
451 return Err(io::Error::last_os_error());
452 }
453 let read_fd = Fd::new_unchecked(fds[0]);
454 let write_fd = Fd::new_unchecked(fds[1]);
455 write_fd.set_nonblocking(true)?;
456 read_fd.set_nonblocking(true)?;
457 Ok(Self {
458 state: Mutex::new(0),
459 read_fd,
460 write_fd,
461 })
462 }
463 }
464
465 /// Triggers the interrupt event with value 1.
466 ///
467 /// This will cause any blocking I/O operations waiting on this event to return
468 /// with `ErrorKind::Interrupted`.
469 ///
470 /// Calling `trigger()` multiple times before `reset()` has no additional effect -
471 /// the event remains triggered.
472 ///
473 /// # Errors
474 ///
475 /// Returns an error if writing to the internal pipe fails, though this is rare
476 /// in practice (pipe write errors are usually `WouldBlock`, which is handled).
477 ///
478 /// # Example
479 ///
480 /// ```no_run
481 /// # #[cfg(all(unix, feature = "interruptible"))]
482 /// # {
483 /// use std::sync::Arc;
484 /// use std::thread;
485 /// use tun_rs::{DeviceBuilder, InterruptEvent};
486 ///
487 /// let event = Arc::new(InterruptEvent::new()?);
488 /// let event_clone = event.clone();
489 ///
490 /// thread::spawn(move || {
491 /// // ... blocking I/O with event_clone ...
492 /// });
493 ///
494 /// // Interrupt the operation
495 /// event.trigger()?;
496 /// # }
497 /// # Ok::<(), std::io::Error>(())
498 /// ```
499 pub fn trigger(&self) -> io::Result<()> {
500 self.trigger_value(1)
501 }
502
503 /// Triggers the interrupt event with a specific value.
504 ///
505 /// Similar to [`trigger()`](Self::trigger), but allows specifying a custom value
506 /// that can be retrieved with [`value()`](Self::value).
507 ///
508 /// # Arguments
509 ///
510 /// * `val` - The value to store (must be non-zero)
511 ///
512 /// # Errors
513 ///
514 /// - Returns `ErrorKind::InvalidInput` if `val` is 0
515 /// - Returns an error if writing to the pipe fails
516 ///
517 /// # Example
518 ///
519 /// ```no_run
520 /// # #[cfg(all(unix, feature = "interruptible"))]
521 /// # {
522 /// use tun_rs::InterruptEvent;
523 ///
524 /// let event = InterruptEvent::new()?;
525 ///
526 /// // Trigger with a custom value (e.g., signal number)
527 /// event.trigger_value(15)?; // SIGTERM
528 ///
529 /// // Later, check what value was used
530 /// assert_eq!(event.value(), 15);
531 /// # }
532 /// # Ok::<(), std::io::Error>(())
533 /// ```
534 pub fn trigger_value(&self, val: i32) -> io::Result<()> {
535 if val == 0 {
536 return Err(io::Error::new(
537 io::ErrorKind::InvalidInput,
538 "value cannot be 0",
539 ));
540 }
541 let mut guard = self.state.lock().unwrap();
542 if *guard != 0 {
543 return Ok(());
544 }
545 *guard = val;
546 let buf: [u8; 8] = 1u64.to_ne_bytes();
547 let res = unsafe {
548 libc::write(
549 self.write_fd.as_raw_fd(),
550 buf.as_ptr() as *const _,
551 buf.len(),
552 )
553 };
554 if res == -1 {
555 let e = io::Error::last_os_error();
556 if e.kind() == io::ErrorKind::WouldBlock {
557 return Ok(());
558 }
559 Err(e)
560 } else {
561 Ok(())
562 }
563 }
564 /// Checks if the event has been triggered.
565 ///
566 /// Returns `true` if the event is currently in the triggered state (value is non-zero).
567 ///
568 /// # Example
569 ///
570 /// ```no_run
571 /// # #[cfg(all(unix, feature = "interruptible"))]
572 /// # {
573 /// use tun_rs::InterruptEvent;
574 ///
575 /// let event = InterruptEvent::new()?;
576 /// assert!(!event.is_trigger());
577 ///
578 /// event.trigger()?;
579 /// assert!(event.is_trigger());
580 ///
581 /// event.reset()?;
582 /// assert!(!event.is_trigger());
583 /// # }
584 /// # Ok::<(), std::io::Error>(())
585 /// ```
586 pub fn is_trigger(&self) -> bool {
587 *self.state.lock().unwrap() != 0
588 }
589
590 /// Returns the current trigger value.
591 ///
592 /// Returns 0 if the event is not triggered, or the value passed to
593 /// [`trigger_value()`](Self::trigger_value) if triggered.
594 ///
595 /// # Example
596 ///
597 /// ```no_run
598 /// # #[cfg(all(unix, feature = "interruptible"))]
599 /// # {
600 /// use tun_rs::InterruptEvent;
601 ///
602 /// let event = InterruptEvent::new()?;
603 /// assert_eq!(event.value(), 0);
604 ///
605 /// event.trigger_value(42)?;
606 /// assert_eq!(event.value(), 42);
607 /// # }
608 /// # Ok::<(), std::io::Error>(())
609 /// ```
610 pub fn value(&self) -> i32 {
611 *self.state.lock().unwrap()
612 }
613
614 /// Resets the event to the non-triggered state.
615 ///
616 /// This drains any pending data from the internal pipe and sets the state back to 0.
617 /// After calling `reset()`, the event can be triggered again.
618 ///
619 /// # Errors
620 ///
621 /// Returns an error if reading from the pipe fails (other than `WouldBlock`).
622 ///
623 /// # Example
624 ///
625 /// ```no_run
626 /// # #[cfg(all(unix, feature = "interruptible"))]
627 /// # {
628 /// use tun_rs::InterruptEvent;
629 ///
630 /// let event = InterruptEvent::new()?;
631 ///
632 /// event.trigger()?;
633 /// assert!(event.is_trigger());
634 ///
635 /// event.reset()?;
636 /// assert!(!event.is_trigger());
637 ///
638 /// // Can trigger again after reset
639 /// event.trigger()?;
640 /// assert!(event.is_trigger());
641 /// # }
642 /// # Ok::<(), std::io::Error>(())
643 /// ```
644 pub fn reset(&self) -> io::Result<()> {
645 let mut buf = [0; 8];
646 let mut guard = self.state.lock().unwrap();
647 *guard = 0;
648 loop {
649 unsafe {
650 let res = libc::read(
651 self.read_fd.as_raw_fd(),
652 buf.as_mut_ptr() as *mut _,
653 buf.len(),
654 );
655 if res == -1 {
656 let error = io::Error::last_os_error();
657 return if error.kind() == io::ErrorKind::WouldBlock {
658 Ok(())
659 } else {
660 Err(error)
661 };
662 }
663 }
664 }
665 }
666 fn as_event_fd(&self) -> libc::c_int {
667 self.read_fd.as_raw_fd()
668 }
669}