pub struct Reactor { /* private fields */ }
Expand description
A lightweight epoll reactor using edge-triggered monitoring (EPOLLET).
§Edge-Triggered Contract
Because this reactor uses EPOLLET, all handlers MUST drain their respective
read or write sources until they receive an EAGAIN / EWOULDBLOCK error
(represented as Ok(None) in the Fd helpers).
Failure to drain a source means no further readiness events are delivered for that file descriptor until it is re-registered or a new readiness edge occurs (for example, more data arrives).
§Fork Safety
The Reactor owns an epoll descriptor which is O_CLOEXEC. After an
exec call in a child process, the reactor and all its registrations are
lost. If the child continues without exec, it shares the same epoll
instance, which is generally unsafe and requires careful coordination.
§Example
let mut reactor = Reactor::new()?;
let token = reactor.add(&fd, true, false)?;
let mut events = Vec::new();
loop {
    reactor.wait(&mut events, 64, -1)?;
    for ev in &events {
        if ev.token == token {
            // Drain fd...
        }
    }
}
Implementations§
Source§impl Reactor
impl Reactor
Sourcepub fn new() -> Result<Self, CoreError>
pub fn new() -> Result<Self, CoreError>
Create a new epoll reactor.
§Errors
- EMFILE: Process limit on open file descriptors hit.
- ENFILE: System-wide limit on open files hit.
- ENOMEM: Insufficient kernel memory.
Examples found in repository?
5fn main() -> Result<(), Box<dyn std::error::Error>> {
6 let mut reactor = Reactor::new()?;
7 let (inotify_fd, inotify_token) = reactor.setup_inotify()?;
8
9 let temp_dir = std::env::temp_dir();
10 let temp_dir_str = temp_dir.to_string_lossy().into_owned();
11
12 println!("Watching {} for modifications...", temp_dir_str);
13 add_watch(&inotify_fd, &temp_dir_str, MODIFY_MASK)?;
14
15 let mut events = Vec::new();
16 // Monitor for 10 seconds
17 let start = std::time::Instant::now();
18 while start.elapsed() < Duration::from_secs(10) {
19 let n = reactor.wait(&mut events, 64, 1000)?;
20 if n > 0 {
21 for ev in &events {
22 if ev.token == inotify_token {
23 let in_events = read_events(&inotify_fd)?;
24 for ie in in_events {
25 if let Some(name) = ie.name {
26 println!("File modified: {}", String::from_utf8_lossy(&name));
27 } else {
28 println!("Directory modified (wd={})", ie.wd);
29 }
30 }
31 }
32 }
33 }
34 }
35
36 println!("Finished watching.");
37 Ok(())
38}
Source
pub fn setup_inotify(&mut self) -> Result<(Fd, Token), CoreError>
pub fn setup_inotify(&mut self) -> Result<(Fd, Token), CoreError>
Initialize inotify and add it to the reactor.
§Errors
- EMFILE: Process limit on open file descriptors hit.
- ENFILE: System-wide limit on open files hit.
- ENOMEM: Insufficient kernel memory.
- EPERM: Permission denied to create inotify instance.
Examples found in repository?
5fn main() -> Result<(), Box<dyn std::error::Error>> {
6 let mut reactor = Reactor::new()?;
7 let (inotify_fd, inotify_token) = reactor.setup_inotify()?;
8
9 let temp_dir = std::env::temp_dir();
10 let temp_dir_str = temp_dir.to_string_lossy().into_owned();
11
12 println!("Watching {} for modifications...", temp_dir_str);
13 add_watch(&inotify_fd, &temp_dir_str, MODIFY_MASK)?;
14
15 let mut events = Vec::new();
16 // Monitor for 10 seconds
17 let start = std::time::Instant::now();
18 while start.elapsed() < Duration::from_secs(10) {
19 let n = reactor.wait(&mut events, 64, 1000)?;
20 if n > 0 {
21 for ev in &events {
22 if ev.token == inotify_token {
23 let in_events = read_events(&inotify_fd)?;
24 for ie in in_events {
25 if let Some(name) = ie.name {
26 println!("File modified: {}", String::from_utf8_lossy(&name));
27 } else {
28 println!("Directory modified (wd={})", ie.wd);
29 }
30 }
31 }
32 }
33 }
34 }
35
36 println!("Finished watching.");
37 Ok(())
38}
Source
pub fn setup_signalfd(&mut self) -> Result<Token, CoreError>
pub fn setup_signalfd(&mut self) -> Result<Token, CoreError>
Initialize signalfd for SIGCHLD and add it to the reactor.
The calling thread's previous signal mask is restored when the reactor is dropped.
§Errors
- EBADF: The provided file descriptor is invalid.
- EINVAL: Signal mask is invalid or already set up.
- EMFILE: Process limit on open file descriptors hit.
Sourcepub fn drain_signalfd(&self) -> Result<(), CoreError>
pub fn drain_signalfd(&self) -> Result<(), CoreError>
Drain the internal signalfd buffer.
Sourcepub fn add(
&mut self,
fd: &Fd,
readable: bool,
writable: bool,
) -> Result<Token, CoreError>
pub fn add( &mut self, fd: &Fd, readable: bool, writable: bool, ) -> Result<Token, CoreError>
Register a file descriptor with the reactor.
This assigns a new unique token for the descriptor and enables edge-triggered monitoring.
Sourcepub fn add_priority(&mut self, fd: &Fd) -> Result<Token, CoreError>
pub fn add_priority(&mut self, fd: &Fd) -> Result<Token, CoreError>
Register a file descriptor for priority readiness (EPOLLPRI).
Sourcepub fn add_with_flags(
&mut self,
fd: &Fd,
flags: u32,
) -> Result<Token, CoreError>
pub fn add_with_flags( &mut self, fd: &Fd, flags: u32, ) -> Result<Token, CoreError>
Register a file descriptor with custom epoll flags.
This allows registration with flags like EPOLLONESHOT or explicit
control over EPOLLET.
§Example
let mut reactor = Reactor::new().unwrap();
let fd = Fd::eventfd(0).unwrap();
reactor.add_with_flags(&fd, (libc::EPOLLIN | libc::EPOLLONESHOT) as u32).unwrap();
Source
pub fn wait(
&mut self,
buffer: &mut Vec<Event>,
max_events: usize,
timeout: i32,
) -> Result<usize, CoreError>
pub fn wait( &mut self, buffer: &mut Vec<Event>, max_events: usize, timeout: i32, ) -> Result<usize, CoreError>
Wait for events.
This function blocks until at least one event is ready or the timeout
expires. Ready events are appended to the buffer.
§Timeout Contract
- -1: Block indefinitely until an event occurs or a signal interrupts.
- 0: Return immediately, even if no events are ready.
- > 0: Wait for up to the specified number of milliseconds.
Returns the number of events received.
Examples found in repository?
5fn main() -> Result<(), Box<dyn std::error::Error>> {
6 let mut reactor = Reactor::new()?;
7 let (inotify_fd, inotify_token) = reactor.setup_inotify()?;
8
9 let temp_dir = std::env::temp_dir();
10 let temp_dir_str = temp_dir.to_string_lossy().into_owned();
11
12 println!("Watching {} for modifications...", temp_dir_str);
13 add_watch(&inotify_fd, &temp_dir_str, MODIFY_MASK)?;
14
15 let mut events = Vec::new();
16 // Monitor for 10 seconds
17 let start = std::time::Instant::now();
18 while start.elapsed() < Duration::from_secs(10) {
19 let n = reactor.wait(&mut events, 64, 1000)?;
20 if n > 0 {
21 for ev in &events {
22 if ev.token == inotify_token {
23 let in_events = read_events(&inotify_fd)?;
24 for ie in in_events {
25 if let Some(name) = ie.name {
26 println!("File modified: {}", String::from_utf8_lossy(&name));
27 } else {
28 println!("Directory modified (wd={})", ie.wd);
29 }
30 }
31 }
32 }
33 }
34 }
35
36 println!("Finished watching.");
37 Ok(())
38}