Struct Reactor 

pub struct Reactor { /* private fields */ }

A lightweight epoll reactor using edge-triggered monitoring (EPOLLET).

§Edge-Triggered Contract

Because this reactor uses EPOLLET, all handlers MUST drain their respective read or write sources until they receive an EAGAIN / EWOULDBLOCK error (represented as Ok(None) in the Fd helpers).

Failure to drain a source will result in missing future readiness events for that file descriptor until it is re-registered or another event occurs.
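
A minimal sketch of the drain loop this contract asks for, using only the standard library. The name `read_nonblocking` is a hypothetical stand-in for the Fd read helpers (their signatures are not shown on this page); it is assumed to map EAGAIN / EWOULDBLOCK to Ok(None) in the same way.

```rust
use std::io::{self, ErrorKind, Read};

/// Hypothetical stand-in for an `Fd` read helper:
/// returns `Ok(None)` when the source would block (EAGAIN / EWOULDBLOCK).
fn read_nonblocking(src: &mut impl Read, buf: &mut [u8]) -> io::Result<Option<usize>> {
    loop {
        match src.read(buf) {
            Ok(n) => return Ok(Some(n)),
            Err(e) if e.kind() == ErrorKind::WouldBlock => return Ok(None),
            // Retry if a signal interrupted the read.
            Err(e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
}

/// Reads until the source reports "would block" or end-of-file,
/// which is what an edge-triggered handler has to do before returning.
fn drain(src: &mut impl Read) -> io::Result<Vec<u8>> {
    let mut out = Vec::new();
    // Deliberately small buffer so that draining takes several reads.
    let mut buf = [0u8; 4];
    loop {
        match read_nonblocking(src, &mut buf)? {
            Some(0) => break, // end-of-file: the peer closed the source
            Some(n) => out.extend_from_slice(&buf[..n]),
            None => break, // fully drained; the next edge will wake us again
        }
    }
    Ok(out)
}
```

Called on a non-blocking socket with buffered data, `drain` keeps reading in 4-byte chunks and only returns once the helper yields `Ok(None)`. Stopping after the first successful read would leave data behind with no further readiness event to announce it.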

§Fork Safety

The Reactor owns an epoll descriptor that is opened close-on-exec (O_CLOEXEC). If a child process calls exec after fork, the descriptor is closed in the child, so the reactor and all of its registrations are lost there. If the child continues without exec, it shares the same epoll instance as the parent, which is generally unsafe and requires careful coordination.

§Example

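let mut reactor = Reactor::new()?;
let token = reactor.add(&fd, true, false)?; // readable = true, writable = false

let mut events = Vec::new();
loop {
    // wait() appends to the buffer, so drop events from the previous iteration.
    events.clear();
    reactor.wait(&mut events, 64, -1)?; // up to 64 events, block indefinitely
    for ev in &events {
        if ev.token == token {
            // Drain fd until the read helper returns Ok(None).
        }
    }
}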

Implementations§

impl Reactor

pub fn new() -> Result<Self, CoreError>

Create a new epoll reactor.

§Errors
  • EMFILE: Process limit on open file descriptors hit.
  • ENFILE: System-wide limit on open files hit.
  • ENOMEM: Insufficient kernel memory.
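
The three errors above all signal resource exhaustion rather than a programming mistake, so a caller may want to treat them differently (for example, by closing idle descriptors and retrying). The sketch below assumes the raw errno can be recovered from CoreError as an `i32`; this page does not show how, so the helper name and its input are hypothetical. The constants are the Linux errno values.

```rust
// Linux errno values, hard-coded so the sketch needs no external crate.
const ENOMEM: i32 = 12;
const ENFILE: i32 = 23;
const EMFILE: i32 = 24;

/// Hypothetical helper: true when the errno means the process or the
/// system ran out of a resource, as opposed to an invalid argument.
fn is_resource_exhaustion(errno: i32) -> bool {
    matches!(errno, EMFILE | ENFILE | ENOMEM)
}

/// Short human-readable description for the errors listed above.
fn describe(errno: i32) -> &'static str {
    match errno {
        EMFILE => "process limit on open file descriptors hit",
        ENFILE => "system-wide limit on open files hit",
        ENOMEM => "insufficient kernel memory",
        _ => "other error",
    }
}
```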

Example from the repository (examples/watch_temp.rs):

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut reactor = Reactor::new()?;
    let (inotify_fd, inotify_token) = reactor.setup_inotify()?;

    let temp_dir = std::env::temp_dir();
    let temp_dir_str = temp_dir.to_string_lossy().into_owned();

    println!("Watching {} for modifications...", temp_dir_str);
    add_watch(&inotify_fd, &temp_dir_str, MODIFY_MASK)?;

    let mut events = Vec::new();
    // Monitor for 10 seconds
    let start = std::time::Instant::now();
    while start.elapsed() < Duration::from_secs(10) {
        let n = reactor.wait(&mut events, 64, 1000)?;
        if n > 0 {
            for ev in &events {
                if ev.token == inotify_token {
                    let in_events = read_events(&inotify_fd)?;
                    for ie in in_events {
                        if let Some(name) = ie.name {
                            println!("File modified: {}", String::from_utf8_lossy(&name));
                        } else {
                            println!("Directory modified (wd={})", ie.wd);
                        }
                    }
                }
            }
        }
    }

    println!("Finished watching.");
    Ok(())
}


pub fn setup_inotify(&mut self) -> Result<(Fd, Token), CoreError>

Initialize inotify and add it to the reactor.

§Errors
  • EMFILE: Process limit on open file descriptors hit.
  • ENFILE: System-wide limit on open files hit.
  • ENOMEM: Insufficient kernel memory.
  • EPERM: Permission denied to create inotify instance.

pub fn setup_signalfd(&mut self) -> Result<Token, CoreError>

Initialize signalfd for SIGCHLD and add it to the reactor.

The calling thread's previous signal mask is restored when the reactor is dropped.

§Errors
  • EBADF: The provided file descriptor is invalid.
  • EINVAL: Signal mask is invalid or already set up.
  • EMFILE: Process limit on open file descriptors hit.

pub fn drain_signalfd(&self) -> Result<(), CoreError>

Drain the internal signalfd buffer.


pub fn add(&mut self, fd: &Fd, readable: bool, writable: bool) -> Result<Token, CoreError>

Register a file descriptor with the reactor.

This assigns a new unique token for the descriptor and enables edge-triggered monitoring.
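
Because every registration gets its own token, events can be routed with a simple lookup table. The sketch below uses stand-in definitions for `Token` and `Event` (the real ones are not shown on this page; only the `token` field is taken from the example above), and `route` is a hypothetical helper.

```rust
use std::collections::HashMap;

// Stand-ins for the crate's types, for illustration only.
type Token = u64;
struct Event {
    token: Token,
}

/// Returns the labels of the registrations that have events pending,
/// in the order the events were reported. Unknown tokens are skipped.
fn route<'a>(events: &[Event], handlers: &HashMap<Token, &'a str>) -> Vec<&'a str> {
    events
        .iter()
        .filter_map(|ev| handlers.get(&ev.token).copied())
        .collect()
}
```

In a real event loop the map values would more likely be handler objects or enum variants than labels; the lookup by token stays the same.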


pub fn add_priority(&mut self, fd: &Fd) -> Result<Token, CoreError>

Register a file descriptor for priority readiness (EPOLLPRI).


pub fn add_with_flags(&mut self, fd: &Fd, flags: u32) -> Result<Token, CoreError>

Register a file descriptor with custom epoll flags.

This allows registration with flags like EPOLLONESHOT or explicit control over EPOLLET.

§Example
let mut reactor = Reactor::new().unwrap();
let fd = Fd::eventfd(0).unwrap();
reactor.add_with_flags(&fd, (libc::EPOLLIN | libc::EPOLLONESHOT) as u32).unwrap();
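
The flags argument is a plain bitmask, so it can be assembled from boolean choices. The sketch below mirrors the relevant bit values from the Linux epoll header as local constants so that it compiles without the libc crate; `interest_flags` is a hypothetical helper, not part of Reactor, and how `add` builds its mask internally is an assumption.

```rust
// Bit values from Linux <sys/epoll.h>, mirrored locally for a self-contained sketch.
const EPOLLIN: u32 = 0x001;
const EPOLLOUT: u32 = 0x004;
const EPOLLONESHOT: u32 = 1 << 30;
const EPOLLET: u32 = 1 << 31;

/// Builds an epoll interest mask from individual choices.
fn interest_flags(readable: bool, writable: bool, edge: bool, oneshot: bool) -> u32 {
    let mut flags = 0;
    if readable {
        flags |= EPOLLIN;
    }
    if writable {
        flags |= EPOLLOUT;
    }
    if edge {
        flags |= EPOLLET;
    }
    if oneshot {
        flags |= EPOLLONESHOT;
    }
    flags
}
```

For instance, `interest_flags(true, false, false, true)` yields the same bits as `EPOLLIN | EPOLLONESHOT` in the example above: a level-triggered, one-shot read registration.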

pub fn del(&self, fd: &Fd) -> Result<(), CoreError>

Remove a file descriptor from the reactor.


pub fn wait(&mut self, buffer: &mut Vec<Event>, max_events: usize, timeout: i32) -> Result<usize, CoreError>

Wait for events.

This function blocks until at least one event is ready or the timeout expires. Ready events are appended to the buffer.

§Timeout Contract
  • -1: Block indefinitely until an event occurs or a signal interrupts.
  • 0: Return immediately, even if no events are ready.
  • > 0: Wait for up to the specified number of milliseconds.

Returns the number of events received.
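
Callers that work with `std::time::Duration` need to convert to this millisecond convention. The helper below is a hypothetical sketch (not part of Reactor): `None` maps to -1, and durations are rounded up to whole milliseconds so that a short non-zero wait does not silently turn into a non-blocking poll.

```rust
use std::time::Duration;

/// Converts an optional duration into the `timeout` argument described above.
fn timeout_ms(timeout: Option<Duration>) -> i32 {
    match timeout {
        // No deadline: block until an event or a signal arrives.
        None => -1,
        Some(d) => {
            // Round up to the next whole millisecond.
            let ms = (d.as_nanos() + 999_999) / 1_000_000;
            // Clamp values that do not fit into an i32.
            ms.min(i32::MAX as u128) as i32
        }
    }
}
```

A call such as `reactor.wait(&mut events, 64, timeout_ms(Some(Duration::from_secs(1))))` would then wait for up to 1000 milliseconds.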

Trait Implementations§

impl Drop for Reactor

fn drop(&mut self)

Executes the destructor for this type.

fn pin_drop(self: Pin<&mut Self>)

🔬This is a nightly-only experimental API. (pin_ergonomics)
Executes the destructor for this type; unlike Drop::drop, it requires self to be pinned.
