Crate reactio

Source
Expand description

§Portable Reactor pattern in Rust.

Supported platforms: Linux, Windows

ReactIO is a Rust library that implements event-driven Reactor pattern in single-threaded and multiple-threaded environment. Users implement a Reactor (as least implement on_inbound_message) and add it to a ReactRuntime. Each ReactRuntime instance runs in a dedicated thread. It polls all events for all managed Reactors. There’are 2 kinds of events:

  • Socket events. We only register socket READ events initially. MsgReader & MsgSender are provided for Reactor to send/receive messages. MsgReader reads from buffer and dispatches messages. MsgSender sends or register SEND event for resend when seeing WOULDBLOCK.
  • Commands. Through mpsc channel, reactors could send user defined commands to each other.

Key technologies:

  • IO events (epoll, iocp): handles IO only when there are event.
  • MsgSender helps socket read (messages are dispatched when full messages are received); MsgSender handles socket write (it queues unsent messages and auto resends).
  • No mutex or lock. Just send commands to a mpsc channel owned by ReactRuntime.
  • Deferred commands are executed in a defered time.

§Example

§Non-threaded polling model.

See example in reactor.rs.

use reactio;
use reactio::logerr;
use std::io::Write;

/// SimpleIoReactor implements `Reactor` and calls user handlers on events.
pub fn test_io_reactor() {
    type AppData = (); // no application data for reactor.
    let app_data = ();
    let addr = "127.0.0.1:12355";
    let recv_buffer_min_size = 1024;
    let mut runtime = reactio::SimpleIoRuntime::new();

    let on_server_sock_msg =
        |buf: &mut [u8], ctx: &mut reactio::SimpleIoReactorContext<'_>, _: &mut AppData| {
            ctx.send_or_que(buf)?; // echo back message.
            Ok(buf.len()) // return number of bytes having been consumed.
        };

    let on_server_connected =
        |ctx: &mut reactio::SimpleIoReactorContext<'_>, listenerid, _: &mut AppData| {
            ctx.cmd_sender
                .send_close(listenerid, reactio::Deferred::Immediate, |_| {})?; // close parent listerner.
            Ok(()) // accept current connection.
        };

    let on_new_connection = move |_childid| {
        // create a new Reactor for the new connection.
        Some(reactio::SimpleIoReactor::<AppData>::new_boxed(
            app_data,
            Some(Box::new(on_server_connected)), // on_connected
            None,                                // on_closed
            on_server_sock_msg,                  // on_sock_msg
        ))
    };

    //-- server
    runtime
        .get_cmd_sender()
        .send_listen(
            addr,
            reactio::SimpleIoListener::new(recv_buffer_min_size, on_new_connection),
            reactio::Deferred::Immediate,
            |_| {}, // OnCommandCompletion
        )
        .unwrap();
    // wait for server ready.
    let timer = reactio::utils::Timer::new_millis(1000);
    while runtime.count_reactors() < 1 {
        if timer.expired() {
            logerr!("ERROR: timeout waiting for listener start!");
            break;
        }
        runtime.process_events();
    }
    //-- client
    let on_client_connected =
        move |ctx: &mut reactio::SimpleIoReactorContext<'_>, _, _: &mut AppData| {
            // client sends initial msg.
            let mut auto_sender = ctx.acquire_send(); // send on drop
            auto_sender.write_fmt(format_args!("test ")).unwrap();
            auto_sender.write_fmt(format_args!("msgsend")).unwrap();
            assert_eq!(auto_sender.count_written(), 12);
            assert_eq!(auto_sender.get_written(), b"test msgsend");
            // auto_sender.send(None).unwrap(); // this line can be omitted to let it auto send on drop.
            // ctx.send_or_que("Hello".as_bytes())?; // rather than using auto_sender, we call ctx.send_or_que
            Ok(()) // accept connection
        };
    let on_client_sock_msg =
        |_buf: &mut [u8], _ctx: &mut reactio::SimpleIoReactorContext<'_>, _: &mut AppData| {
            Err("Client disconnect on recv response.".to_owned())
        };

    runtime
        .get_cmd_sender()
        .send_connect(
            addr,
            recv_buffer_min_size,
            reactio::SimpleIoReactor::new(
                app_data,
                Some(Box::new(on_client_connected)), // on_connected
                None,                                // on_closed
                on_client_sock_msg,                  // on_sock_msg
            ),
            reactio::Deferred::Immediate,
            |_| {}, // OnCommandCompletion
        )
        .unwrap();
    // In non-threaded environment, process_events until there're no reactors, no events, no deferred events.
    let timer = reactio::utils::Timer::new_millis(1000);
    while runtime.process_events() {
        if timer.expired() {
            logerr!("ERROR: timeout waiting for tests to complete!");
            break;
        }
    }
    assert_eq!(runtime.count_reactors(), 0);
    assert_eq!(runtime.count_deferred_queue(), 0);
}

More examples are echo_client/echo_server (for TCP latency test), PingpongReactor, ThreadedPingpongReactor.

Modules§

Macros§

Structs§

  • CmdSender is owned by a ReactRuntime. Users send commands to a reactor with specific ReactorID.
  • DispatchContext contains all info that could be used to dispatch command/message to reactors.
  • MsgReader is a per-socket helper to read socket messages. It auto handles partial/multiple messages in recv buffer.
    On Readable event, call MsgReader::try_read_fast_dispatch/try_read_fast_read to read messages from a sock into a recv_buffer and calls dispatcher::on_inbound_message to dispatch message.
  • MsgSender is a per-socket object. It tries sending msg on a non-blocking socket. if sending fails due to WOULDBLOCK, the unsent bytes are saved and register a Write insterest in poller, so that the remaining data will be scheduled to send on next Writeable event.
  • Used to save the pending Send action.
  • ReactRuntime manages & owns Reactors which receive/send socket data or command. A ReactRuntime has a command queue, deferred command queue and a collection of reactors. Each reactor is assigned a ReactorID when adding to ReactRuntime. Users send command to a reactor with a specific ReactorID that belongs to the ReactRuntime. The command could be immediate or deferred for a time. E.g, on close of a reactor, it could send a command to the ReactRuntime to reconnect in future.
  • ReactorReableContext is a helper for a reactor to send/recv socket message, or send command.
  • SimpleIoListener implements TcpListenerHandler. When receiving new connection, it creates SimpleIoReactor using user specified reactor_creator.
  • SimpleIoReactor doesn’t have UserCommand. User supplies callback functions to handle inbound socket messages and on_connected/on_close events. On each readable socket event, the MsgReader reads all data and call on_sock_msg_handler to dispatch message.
  • A SimpleIoService instance serves multiple sockets, which diffs from SimpleIoReactor/SimpleIoListener that serves a socket per instance. See test_io_service

Enums§

  • Deferred is used to indicate a command to be executed immidately or in a deferred time.
  • MessageResult is returned by on_inbound_message to indicate result.
  • SendOrQueResult is the result of MsgSender::send_or_que or DispatchContext::send_or_que.

Constants§

Traits§

  • NewServerReactor is used by TcpListenerHandler to create a new reactor when accept a new socket.
  • A Reactor is assigned a unique ReactorID when adding to a ReactRuntime, and is able to receive socket messsages (via reader) and commands. process_events of a ReactRuntime instance should be periodically called in a dedicated thread. Besides socket communication, Sending command is the only thread-safe way to communicate with a Reactor. A Reactor could send socket messages (via sender), or send commands (via cmd_sender) to another Reactor with specific ReactorID. A Reactor is destroyed when the socket is closed.
  • TcpListenerHandler handles incoming connections on a listening socket. Similar to Reactor, it’s destroyed when listening socket is closed.

Type Aliases§