Crate reactio

§Portable Reactor pattern in Rust.

Supported platforms: Linux, Windows

ReactIO is a Rust library that implements the event-driven Reactor pattern in single-threaded and multi-threaded environments. Users implement a Reactor (at least on_inbound_message) and add it to a ReactRuntime. Each ReactRuntime instance runs in a dedicated thread and polls all events for all the Reactors it manages. There are 2 kinds of events:

  • socket events. Only socket READ events are registered initially. MsgReader & MsgSender are provided for a Reactor to receive/send messages.
  • commands. Through an mpsc channel, reactors can send user-defined commands to each other.
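The command path can be pictured with the standard library alone. The following is a minimal sketch, not reactio's implementation: `Command`, `run_loop` and `demo_commands` are hypothetical names, and the loop thread stands in for a ReactRuntime that owns its state and receives commands over `std::sync::mpsc`.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical user-defined command; not a reactio type.
#[derive(Debug, PartialEq)]
enum Command {
    Add(i64),
    Stop,
}

/// Single-threaded loop that owns its state and drains commands in arrival order.
fn run_loop(rx: mpsc::Receiver<Command>) -> i64 {
    let mut total = 0; // touched by this thread only, so no lock is needed
    for cmd in rx {
        match cmd {
            Command::Add(n) => total += n,
            Command::Stop => break,
        }
    }
    total
}

/// Spawns the loop thread, sends commands from two senders, and returns the final state.
fn demo_commands() -> i64 {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || run_loop(rx));

    // A second producer thread uses a cloned sender.
    let tx2 = tx.clone();
    let producer = thread::spawn(move || {
        for n in 1..=3 {
            tx2.send(Command::Add(n)).unwrap();
        }
    });
    producer.join().unwrap();

    tx.send(Command::Add(10)).unwrap();
    tx.send(Command::Stop).unwrap();
    worker.join().unwrap()
}
```

Calling `demo_commands()` from a `main` function yields the sum of all `Add` values. The point of the design is that only the loop thread mutates `total`; other threads communicate solely by sending commands.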

Key technologies:

  • IO events (epoll, iocp): IO is handled only when there are events.
  • MsgReader handles socket read (messages are dispatched when full messages are received); MsgSender handles socket write (it queues unsent messages and resends them automatically).
  • No mutex or lock. Just send commands to an mpsc channel owned by the ReactRuntime.
  • Deferred commands are executed at a deferred time.
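As an illustration of the deferred-command idea, here is a minimal sketch of a queue ordered by due time. It is an assumption about one way to build such a queue, not reactio's internal data structure; `DeferredQueue`, `push`, `pop_due` and `demo_deferred` are hypothetical names, and commands are plain strings for brevity.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::time::{Duration, Instant};

/// Hypothetical deferred queue: entries are ordered by due time, then insertion order.
struct DeferredQueue {
    heap: BinaryHeap<Reverse<(Instant, u64, String)>>,
    next_seq: u64,
}

impl DeferredQueue {
    fn new() -> Self {
        Self { heap: BinaryHeap::new(), next_seq: 0 }
    }

    /// Schedules `cmd` to become runnable at `due`.
    fn push(&mut self, due: Instant, cmd: &str) {
        self.heap.push(Reverse((due, self.next_seq, cmd.to_string())));
        self.next_seq += 1;
    }

    /// Pops every command whose due time is at or before `now`, earliest first.
    fn pop_due(&mut self, now: Instant) -> Vec<String> {
        let mut ready = Vec::new();
        while let Some(Reverse((due, _, _))) = self.heap.peek() {
            if *due > now {
                break; // the earliest remaining command is not due yet
            }
            if let Some(Reverse((_, _, cmd))) = self.heap.pop() {
                ready.push(cmd);
            }
        }
        ready
    }

    fn len(&self) -> usize {
        self.heap.len()
    }
}

/// Schedules three commands and polls the queue at two points in time.
fn demo_deferred() -> (Vec<String>, Vec<String>, usize) {
    let start = Instant::now();
    let mut q = DeferredQueue::new();
    q.push(start + Duration::from_millis(50), "reconnect");
    q.push(start, "close");
    q.push(start + Duration::from_millis(10), "ping");

    let first = q.pop_due(start + Duration::from_millis(10));
    let second = q.pop_due(start + Duration::from_millis(100));
    (first, second, q.len())
}
```

An event loop would call something like `pop_due(Instant::now())` on each iteration and execute whatever comes back; passing `now` explicitly keeps the sketch deterministic.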

§Example

§Non-threaded polling model.

See example in reactor.rs.

use reactio;

/// SimpleIoReactor implements `Reactor` and calls user-given handlers on events.
pub fn test_io_reactor() {
    let addr = "127.0.0.1:12355";
    let recv_buffer_min_size = 1024;
    let mut runtime = reactio::SimpleIoRuntime::new();
    let max_echos = 1;
    let mut count_echos = 0;

    let on_sock_msg = move |buf: &mut [u8], ctx: &mut reactio::SimpleIoReactorContext<'_>| {
        if count_echos >= max_echos {
            return Err(format!("Reached max echo: {max_echos}")); // close socket
        }
        ctx.send_msg(buf)?; // echo back message.
        count_echos += 1;
        Ok(buf.len()) // return number of bytes having been consumed.
    };

    let on_server_connected = |ctx: &mut reactio::SimpleIoReactorContext<'_>, listenerid| {
        ctx.cmd_sender
            .send_close(listenerid, reactio::Deferred::Immediate, |_| {})?; // close parent listener.
        Ok(()) // accept current connection.
    };

    let on_new_connection = move |_childid| {
        // create a new Reactor for the new connection.
        Some(reactio::SimpleIoReactor::new(
            Some(Box::new(on_server_connected)), // on_connected
            None,                                // on_closed
            on_sock_msg,                         // on_sock_msg
        ))
    };

    let on_client_connected = |ctx: &mut reactio::SimpleIoReactorContext<'_>, _| {
        ctx.send_msg("Hello".as_bytes())?; // client sends initial msg.
        Ok(()) // accept connection
    };

    //-- server
    runtime
        .get_cmd_sender()
        .send_listen(
            addr,
            reactio::SimpleIoListener::new(recv_buffer_min_size, on_new_connection),
            reactio::Deferred::Immediate,
            |_| {},
        )
        .unwrap();
    // wait for server ready.
    let timer = reactio::utils::Timer::new_millis(1000);
    while runtime.count_reactors() < 1 {
        if timer.expired() {
            assert!(false, "ERROR: timeout waiting for listener start!");
            break;
        }
        runtime.process_events();
    }
    //-- client
    runtime
        .get_cmd_sender()
        .send_connect(
            addr,
            recv_buffer_min_size,
            reactio::SimpleIoReactor::new(
                Some(Box::new(on_client_connected)), // on_connected
                None,                                // on_closed
                on_sock_msg,                         // on_sock_msg
            ),
            reactio::Deferred::Immediate,
            |_| {},
        )
        .unwrap();
    // In a non-threaded environment, call process_events until there are no reactors, no events, and no deferred events left.
    let timer = reactio::utils::Timer::new_millis(1000 * 20);
    while runtime.process_events() {
        if timer.expired() {
            assert!(false, "ERROR: timeout waiting for tests to complete!");
            break;
        }
    }
    assert_eq!(runtime.count_reactors(), 0);
    assert_eq!(runtime.count_deferred_queue(), 0);
}

See the examples folder for more examples: echo_client/echo_server (to test TCP latencies) and PingpongReactor.

Structs§

  • CmdSender is owned by a ReactRuntime. Users send commands to a reactor with specific ReactorID.
  • CommandReactor doesn’t have associated sockets, so it only processes commands by calling cmd_handler.
  • DispatchContext contains all info that could be used to dispatch command/message to reactors.
  • MsgReader is a per-socket helper to read socket messages. It auto handles partial/multiple messages in recv buffer.
    On a Readable event, call MsgReader::try_read_fast_dispatch/try_read_fast_read to read messages from a socket into a recv_buffer; it then calls dispatcher::on_inbound_message to dispatch each message.
  • MsgSender is a per-socket object. It tries to send a message on a non-blocking socket. If sending fails due to WOULDBLOCK, the unsent bytes are saved and a Write interest is registered in the poller, so that the remaining data is scheduled to be sent on the next Writable event.
  • Used to save the pending Send action.
  • ReactRuntime manages & owns Reactors, which receive/send socket data or commands. A ReactRuntime has a command queue, a deferred command queue and a collection of reactors. Each reactor is assigned a ReactorID when it is added to the ReactRuntime. Users send a command to a reactor with a specific ReactorID that belongs to the ReactRuntime. The command can be immediate or deferred for a time. E.g., when a reactor is closed, it could send a command to the ReactRuntime to reconnect in the future.
  • ReactorReableContext is a helper for a reactor to send/recv socket message, or send command.
  • SimpleIoListener implements TcpListenerHandler. When it receives a new connection, it creates a SimpleIoReactor using the user-specified reactor_creator.
  • SimpleIoReactor doesn’t have a UserCommand. The user supplies callback functions to handle inbound socket messages and on_connected/on_close events. On each readable socket event, the MsgReader reads all data and calls on_sock_msg_handler to dispatch the message.
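The partial/multiple-message handling described for MsgReader can be sketched with a plain byte buffer. This is an illustration only, not reactio's code: the wire format (a 2-byte big-endian length prefix) and the names `frame_len`, `FrameReader`, `feed` and `demo_framing` are assumptions made for the example.

```rust
/// Hypothetical framing: 2-byte big-endian length prefix followed by the payload.
/// Returns the size of the first complete frame in `buf`, or None if more bytes are needed.
fn frame_len(buf: &[u8]) -> Option<usize> {
    if buf.len() < 2 {
        return None;
    }
    let body = u16::from_be_bytes([buf[0], buf[1]]) as usize;
    let total = 2 + body;
    if buf.len() >= total { Some(total) } else { None }
}

/// Accumulates received bytes and dispatches every complete frame.
struct FrameReader {
    buf: Vec<u8>,
}

impl FrameReader {
    fn new() -> Self {
        Self { buf: Vec::new() }
    }

    /// Appends `chunk` (one socket read) and calls `on_msg` with each complete payload.
    /// Bytes of a trailing partial frame stay buffered for the next call.
    fn feed(&mut self, chunk: &[u8], mut on_msg: impl FnMut(&[u8])) {
        self.buf.extend_from_slice(chunk);
        let mut consumed = 0;
        while let Some(n) = frame_len(&self.buf[consumed..]) {
            on_msg(&self.buf[consumed + 2..consumed + n]);
            consumed += n;
        }
        self.buf.drain(..consumed);
    }
}

/// Feeds three reads that split frames at awkward boundaries and collects the payloads.
fn demo_framing() -> Vec<String> {
    let mut reader = FrameReader::new();
    let mut seen = Vec::new();
    // Frames "hi", "yo" and "!" arrive split across three reads.
    let reads: [&[u8]; 3] = [&[0, 2, b'h'], &[b'i', 0, 2, b'y', b'o', 0], &[1, b'!']];
    for chunk in reads {
        reader.feed(chunk, |msg| seen.push(String::from_utf8_lossy(msg).into_owned()));
    }
    seen
}
```

The first read holds only part of a frame, so nothing is dispatched; the second read completes one frame, contains a whole second frame, and starts a third. The handler therefore sees each message exactly once, regardless of how the bytes were chunked.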

Enums§

  • CommandCompletion is used as an argument of command completion callback.
  • Deferred is used to indicate whether a command is to be executed immediately or at a deferred time.
  • MessageResult is returned by on_inbound_message to indicate result.
  • SendOrQueResult is the result of MsgSender::send_or_que or DispatchContext::send_msg.
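The send-or-queue behaviour behind a result such as SendOrQueResult can be sketched against `std::io::Write`. Everything below is hypothetical and only illustrates the technique: `SendOutcome` is not reactio's enum, `MockSocket` is a test double that reports `WouldBlock` once its byte budget is used up, and `PendingSender` is a stand-in for a per-socket sender.

```rust
use std::collections::VecDeque;
use std::io::{self, ErrorKind, Write};

/// Hypothetical outcome type; reactio's real SendOrQueResult may differ.
#[derive(Debug, PartialEq)]
enum SendOutcome {
    Complete,
    Queued,
}

/// Test double for a non-blocking socket: accepts `budget` bytes, then reports WouldBlock.
struct MockSocket {
    budget: usize,
    written: Vec<u8>,
}

impl Write for MockSocket {
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
        if self.budget == 0 {
            return Err(ErrorKind::WouldBlock.into());
        }
        let n = data.len().min(self.budget);
        self.written.extend_from_slice(&data[..n]);
        self.budget -= n;
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Keeps unsent bytes in order until the socket can take them.
struct PendingSender {
    pending: VecDeque<u8>,
}

impl PendingSender {
    fn new() -> Self {
        Self { pending: VecDeque::new() }
    }

    /// Queues `msg` behind any older unsent bytes, then writes as much as possible.
    fn send_or_queue(&mut self, sock: &mut impl Write, msg: &[u8]) -> io::Result<SendOutcome> {
        self.pending.extend(msg);
        self.flush_pending(sock)
    }

    /// Retries the queued bytes; intended to be called again on a writable event.
    fn flush_pending(&mut self, sock: &mut impl Write) -> io::Result<SendOutcome> {
        while !self.pending.is_empty() {
            let (front, _) = self.pending.as_slices();
            match sock.write(front) {
                Ok(0) => return Err(ErrorKind::WriteZero.into()),
                Ok(n) => {
                    self.pending.drain(..n);
                }
                Err(e) if e.kind() == ErrorKind::WouldBlock => return Ok(SendOutcome::Queued),
                Err(e) => return Err(e),
            }
        }
        Ok(SendOutcome::Complete)
    }
}

/// Sends "hello" through a socket that first accepts only 3 bytes, then becomes writable again.
fn demo_send() -> (SendOutcome, SendOutcome, Vec<u8>) {
    let mut sock = MockSocket { budget: 3, written: Vec::new() };
    let mut sender = PendingSender::new();
    let first = sender.send_or_queue(&mut sock, b"hello").unwrap();
    sock.budget = 10; // simulate a later writable event
    let second = sender.flush_pending(&mut sock).unwrap();
    (first, second, sock.written)
}
```

Appending new bytes behind the pending queue before writing preserves message order on the wire, which matters when a second message is sent while an earlier one is still partially unsent.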

Traits§

  • NewServerReactor is used by TcpListenerHandler to create a new reactor when it accepts a new socket.
  • A Reactor is assigned a unique ReactorID when it is added to a ReactRuntime, and is able to receive socket messages (via reader) and commands. process_events of a ReactRuntime instance should be called periodically in a dedicated thread. Besides socket communication, sending a command is the only thread-safe way to communicate with a Reactor. A Reactor can send socket messages (via sender), or send commands (via cmd_sender) to another Reactor with a specific ReactorID. A Reactor is destroyed when the socket is closed.
  • TcpListenerHandler handles incoming connections on a listening socket. Similar to a Reactor, it’s destroyed when the listening socket is closed.

Type Aliases§