Skip to main content

Reactor

Struct Reactor 

Source
pub struct Reactor { /* private fields */ }
Expand description

A lightweight epoll reactor using edge-triggered monitoring (EPOLLET).

§Edge-Triggered Contract

Because this reactor uses EPOLLET, all handlers MUST drain their respective read or write sources until they receive an EAGAIN / EWOULDBLOCK error (represented as Ok(None) in the Fd helpers).

If a handler does not fully drain a source, future readiness events for that file descriptor will be missed until it is re-registered or another event occurs.

§Example

let mut reactor = Reactor::new()?;
let token = reactor.add(&fd, true, false)?;

let mut events = Vec::new();
loop {
    reactor.wait(&mut events, 64, -1)?;
    for ev in &events {
        if ev.token == token {
            // Drain fd...
        }
    }
}

Implementations§

Source§

impl Reactor

Source

pub fn new() -> Result<Self, CoreError>

Create a new epoll reactor.

§Errors

Returns CoreError if epoll_create1 fails.

Examples found in repository?
examples/watch_temp.rs (line 6)
5 fn main() -> Result<(), Box<dyn std::error::Error>> {
6    let mut reactor = Reactor::new()?;
7    let (inotify_fd, inotify_token) = reactor.setup_inotify()?;
8
9    let temp_dir = std::env::temp_dir();
10    let temp_dir_str = temp_dir.to_string_lossy().into_owned();
11
12    println!("Watching {} for modifications...", temp_dir_str);
13    add_watch(&inotify_fd, &temp_dir_str, MODIFY_MASK)?;
14
15    let mut events = Vec::new();
16    // Monitor for 10 seconds
17    let start = std::time::Instant::now();
18    while start.elapsed() < Duration::from_secs(10) {
19        let n = reactor.wait(&mut events, 64, 1000)?;
20        if n > 0 {
21            for ev in &events {
22                if ev.token == inotify_token {
23                    let in_events = read_events(&inotify_fd)?;
24                    for ie in in_events {
25                        if let Some(name) = ie.name {
26                            println!("File modified: {}", String::from_utf8_lossy(&name));
27                        } else {
28                            println!("Directory modified (wd={})", ie.wd);
29                        }
30                    }
31                }
32            }
33        }
34    }
35
36    println!("Finished watching.");
37    Ok(())
38 }
Source

pub fn setup_inotify(&mut self) -> Result<(Fd, Token), CoreError>

Initialize inotify and add it to the reactor.

§Errors

Returns CoreError if inotify_init1 or epoll_ctl fails.

Examples found in repository?
examples/watch_temp.rs (line 7)
5 fn main() -> Result<(), Box<dyn std::error::Error>> {
6    let mut reactor = Reactor::new()?;
7    let (inotify_fd, inotify_token) = reactor.setup_inotify()?;
8
9    let temp_dir = std::env::temp_dir();
10    let temp_dir_str = temp_dir.to_string_lossy().into_owned();
11
12    println!("Watching {} for modifications...", temp_dir_str);
13    add_watch(&inotify_fd, &temp_dir_str, MODIFY_MASK)?;
14
15    let mut events = Vec::new();
16    // Monitor for 10 seconds
17    let start = std::time::Instant::now();
18    while start.elapsed() < Duration::from_secs(10) {
19        let n = reactor.wait(&mut events, 64, 1000)?;
20        if n > 0 {
21            for ev in &events {
22                if ev.token == inotify_token {
23                    let in_events = read_events(&inotify_fd)?;
24                    for ie in in_events {
25                        if let Some(name) = ie.name {
26                            println!("File modified: {}", String::from_utf8_lossy(&name));
27                        } else {
28                            println!("Directory modified (wd={})", ie.wd);
29                        }
30                    }
31                }
32            }
33        }
34    }
35
36    println!("Finished watching.");
37    Ok(())
38 }
Source

pub fn setup_signalfd(&mut self) -> Result<Token, CoreError>

Initialize signalfd for SIGCHLD and add it to the reactor.

§Errors

Returns CoreError if pthread_sigmask, signalfd, or epoll_ctl fails.

The calling thread's previous signal mask is restored when the reactor is dropped.

Source

pub fn drain_signalfd(&self) -> Result<(), CoreError>

Drain the internal signalfd buffer.

Source

pub fn add( &mut self, fd: &Fd, readable: bool, writable: bool, ) -> Result<Token, CoreError>

Register a file descriptor with the reactor.

This assigns a new unique token for the descriptor and enables edge-triggered monitoring.

Source

pub fn add_priority(&mut self, fd: &Fd) -> Result<Token, CoreError>

Register a file descriptor for priority readiness (EPOLLPRI).

Source

pub fn del(&self, fd: &Fd) -> Result<(), CoreError>

Remove a file descriptor from the reactor.

Source

pub fn wait( &mut self, buffer: &mut Vec<Event>, max_events: usize, timeout: i32, ) -> Result<usize, CoreError>

Wait for events.

This function blocks until at least one event is ready or the timeout expires. Ready events are appended to the buffer.

Returns the number of events received.

Examples found in repository?
examples/watch_temp.rs (line 19)
5 fn main() -> Result<(), Box<dyn std::error::Error>> {
6    let mut reactor = Reactor::new()?;
7    let (inotify_fd, inotify_token) = reactor.setup_inotify()?;
8
9    let temp_dir = std::env::temp_dir();
10    let temp_dir_str = temp_dir.to_string_lossy().into_owned();
11
12    println!("Watching {} for modifications...", temp_dir_str);
13    add_watch(&inotify_fd, &temp_dir_str, MODIFY_MASK)?;
14
15    let mut events = Vec::new();
16    // Monitor for 10 seconds
17    let start = std::time::Instant::now();
18    while start.elapsed() < Duration::from_secs(10) {
19        let n = reactor.wait(&mut events, 64, 1000)?;
20        if n > 0 {
21            for ev in &events {
22                if ev.token == inotify_token {
23                    let in_events = read_events(&inotify_fd)?;
24                    for ie in in_events {
25                        if let Some(name) = ie.name {
26                            println!("File modified: {}", String::from_utf8_lossy(&name));
27                        } else {
28                            println!("Directory modified (wd={})", ie.wd);
29                        }
30                    }
31                }
32            }
33        }
34    }
35
36    println!("Finished watching.");
37    Ok(())
38 }

Trait Implementations§

Source§

impl Drop for Reactor

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more
Source§

fn pin_drop(self: Pin<&mut Self>)

🔬This is a nightly-only experimental API. (pin_ergonomics)
Executes the destructor for this type; unlike Drop::drop, it requires self to be pinned. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.