fd_reactor/
lib.rs

1//! A reactor for handling file descriptors in a background thread.
2//!
3//! ## Implementation Notes
4//!
//! - The reactor's background thread is spawned the first time the reactor handle is fetched.
6//! - Each file descriptor registers an interest to listen for.
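//! - On registering or unregistering a file descriptor, a byte is written to a pipe to interrupt
//!   the poll operation, so that the reactor rebuilds its poll set from the updated registrations.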
8
9use once_cell::sync::Lazy;
10use std::{
11    collections::HashMap,
12    fs::File,
13    io::{self, Read, Write},
14    os::unix::io::{AsRawFd, FromRawFd, RawFd},
15    sync::{
16        atomic::{AtomicBool, Ordering},
17        Arc, Mutex,
18    },
19    task::Waker,
20};
21
22type ReactorFds = Arc<Mutex<HashMap<RawFd, (Interest, Arc<AtomicBool>, Waker)>>>;
23
24bitflags::bitflags! {
25    /// Events that should be listened for on a given file descriptor.
26    pub struct Interest: i16 {
27        /// Listen for read events
28        const READ = libc::POLLIN;
29
30        /// Listen for write events
31        const WRITE = libc::POLLOUT;
32
33        /// Listen for both read and write events
34        const BOTH = libc::POLLIN | libc::POLLOUT;
35    }
36}
37
38/// A handle to the reactor, for registering and unregistering file descriptors.
39pub struct Handle {
40    /// A set of file descriptors which are currently registered on the reactor.
41    fds: ReactorFds,
42
43    /// The write end of the pipe, for interrupting the poll operation.
44    interrupt: File,
45}
46
47impl Handle {
48    /// Register a new file descriptor onto the reactor.
49    pub fn register(
50        &self,
51        fd: RawFd,
52        interest: Interest,
53        completed: Arc<AtomicBool>,
54        waker: Waker,
55    ) {
56        let mut lock = self.fds.lock().unwrap();
57        lock.insert(fd, (interest, completed, waker));
58        let _ = self.interrupt.try_clone().unwrap().write_all(b"0");
59    }
60
61    /// Unregister the given file descriptor from the reactor.
62    pub fn unregister(&self, fd: RawFd) {
63        let mut lock = self.fds.lock().unwrap();
64        lock.remove(&fd);
65        let _ = self.interrupt.try_clone().unwrap().write_all(b"0");
66    }
67}
68
69/// Fetches the handle to the reactor which is running in a background thread.
70pub static REACTOR: Lazy<Handle> = Lazy::new(|| {
71    // Create a pipe to use as an interruption mechanism.
72    let (mut reader, writer) = create_pipe();
73
74    let fds: ReactorFds = Arc::default();
75    let fds_ = fds.clone();
76
77    std::thread::spawn(move || {
78        let fds = fds_;
79        let mut pollers = Vec::new();
80        let mut buffer = [0u8; 1];
81
82        pollers.push(libc::pollfd {
83            fd: reader.as_raw_fd(),
84            events: libc::POLLIN,
85            revents: 0,
86        });
87
88        loop {
89            let returned = unsafe {
90                let pollers: &mut [libc::pollfd] = &mut pollers;
91                libc::poll(
92                    pollers as *mut _ as *mut libc::pollfd,
93                    pollers.len() as u64,
94                    -1,
95                )
96            };
97
98            if returned == -1 {
99                panic!(
100                    "fatal error in process reactor: {}",
101                    io::Error::last_os_error()
102                );
103            } else if returned < 1 {
104                continue;
105            }
106
107            let lock = fds.lock().unwrap();
108            if pollers[0].revents == libc::POLLIN {
109                let _ = reader.read(&mut buffer);
110            } else {
111                pollers[1..]
112                    .iter()
113                    .filter(|event| event.revents != 0)
114                    .for_each(|event| {
115                        if let Some(value) = lock.get(&event.fd) {
116                            if value
117                                .0
118                                .contains(Interest::from_bits_truncate(event.revents))
119                            {
120                                value.1.store(true, Ordering::SeqCst);
121                                value.2.wake_by_ref();
122                            }
123                        }
124                    })
125            }
126
127            pollers.clear();
128
129            pollers.push(libc::pollfd {
130                fd: reader.as_raw_fd(),
131                events: libc::POLLIN,
132                revents: 0,
133            });
134
135            for (&fd, &(interest, _, _)) in lock.iter() {
136                pollers.push(libc::pollfd {
137                    fd,
138                    events: interest.bits(),
139                    revents: 0,
140                });
141            }
142        }
143    });
144
145    Handle {
146        fds,
147        interrupt: writer,
148    }
149});
150
151fn create_pipe() -> (File, File) {
152    let mut fds = [0; 2];
153    unsafe { libc::pipe(&mut fds as *mut _ as *mut libc::c_int) };
154    let reader = unsafe { File::from_raw_fd(fds[0]) };
155    let writer = unsafe { File::from_raw_fd(fds[1]) };
156    (reader, writer)
157}