Crate reactio

§Portable Reactor pattern in Rust.

Supported platforms: Linux, Windows

ReactIO is a Rust library that implements the event-driven Reactor pattern for single-threaded and multi-threaded environments. Users implement the Reactor trait for their own types and add the resulting reactors to a ReactRuntime. Each ReactRuntime instance runs in a dedicated thread and polls all events for the Reactors it manages. There are two kinds of events:

  • Socket events. Only socket READ events are registered initially. MsgReader and MsgSender are provided so that a Reactor can receive and send messages.
  • Commands. Reactors can send user-defined commands to each other through an mpsc channel.

When processing events, a Reactor doesn’t need any mutex to protect resources, because each ReactRuntime handles the events of its Reactors on one dedicated thread.
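
The idea behind mutex-free event processing can be illustrated with a minimal sketch that uses only the standard library, not the reactio API. The names Command, run_counter and demo_command_loop are hypothetical. One thread owns the state and other threads influence it only by sending commands over an mpsc channel:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical command type for illustration; reactio commands are user-defined.
enum Command {
    Add(i64),
    Stop,
}

// Single-threaded event loop that owns `total` exclusively.
// Other threads can only change it by sending commands.
fn run_counter(rx: mpsc::Receiver<Command>) -> i64 {
    let mut total = 0; // plain local state, no Mutex required
    for cmd in rx {
        match cmd {
            Command::Add(n) => total += n,
            Command::Stop => break,
        }
    }
    total
}

// Runs the loop on a dedicated thread and feeds it from two producer threads.
fn demo_command_loop() -> i64 {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || run_counter(rx));
    let producers: Vec<_> = (1..=2)
        .map(|id| {
            let tx = tx.clone();
            thread::spawn(move || {
                for _ in 0..100 {
                    tx.send(Command::Add(id)).unwrap();
                }
            })
        })
        .collect();
    for p in producers {
        p.join().unwrap();
    }
    // All producers have finished, so Stop is the last command in the queue.
    tx.send(Command::Stop).unwrap();
    worker.join().unwrap()
}
```

Because only the worker thread ever touches `total`, the producers need a cloned sender and nothing else.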

§Examples

§Non-threaded polling model.

See the example in reactor.rs.

    use reactio::{ReactRuntime, Deferred, DefaultTcpListenerHandler};
    use reactio::example;
    pub fn test_reactors_cmd() {
        let addr = "127.0.0.1:12355";
        let mut runtime = ReactRuntime::new();
        let cmd_sender = runtime.get_cmd_sender();
        cmd_sender
            .send_listen(
                addr,
                DefaultTcpListenerHandler::<example::MyReactor>::new(example::ServerParam {
                    name: "server".to_owned(),
                    latency_batch: 1000,
                }),
                Deferred::Immediate,
                |_| {},
            )
            .unwrap();
        cmd_sender
            .send_connect(
                addr,
                example::MyReactor::new_client("client".to_owned(), 2, 1000),
                Deferred::Immediate,
                |_| {},
            )
            .unwrap();
        // In a single-threaded environment, call process_events until there are no reactors, events, or deferred events left.
        while runtime.process_events() {}
        assert_eq!(runtime.count_reactors(), 0);
    }

§Multi-threaded model.

    use std::sync::{Arc, atomic};
    use reactio::logmsg;
    use reactio::{CommandCompletion, Deferred, DefaultTcpListenerHandler, threaded_reactors::ThreadedReactorMgr};
    use reactio::threaded_reactors::example::{self, ThreadedServerParam, create_tcp_listener};

    pub fn test_threaded_reactors() {
        let addr = "127.0.0.1:12355";
        let stopcounter = Arc::new(atomic::AtomicI32::new(0)); // each Reactor increases it when exiting.
        let mgr = ThreadedReactorMgr::<String>::new(2); // 2 threads
        let (threadid0, threadid1) = (0, 1);

        // cloned Arc are passed to threads.
        let (amgr, astopcounter) = (Arc::clone(&mgr), Arc::clone(&stopcounter));

        mgr.get_cmd_sender(threadid0)
            .unwrap()
            .send_listen(
                addr,
                create_tcp_listener(ThreadedServerParam {
                    runtimeid: threadid0,
                    reactormgr: Arc::clone(&mgr),
                    stopcounter: Arc::clone(&stopcounter),
                    name: "server".to_owned(),
                    latency_batch: 1000,
                }),
                Deferred::Immediate,
                //  when listen socket is ready, send another command to connect from another thread.
                move |res| {
                    if let CommandCompletion::Error(_) = res {
                        logmsg!("[ERROR] Failed to listen exit!");
                        return;
                    }
                    amgr.get_cmd_sender(threadid1)
                        .unwrap()
                        .send_connect(
                            addr,
                            example::MyThreadedReactor::new_client(
                                "myclient".to_owned(),
                                threadid1,
                                Arc::clone(&amgr),
                                5,
                                1000,
                                Arc::clone(&astopcounter),
                            ),
                            Deferred::Immediate,
                            |res| {
                                if let CommandCompletion::Error(_) = res {
                                    logmsg!("[ERROR] Failed connect!");
                                }
                            },
                        )
                        .unwrap();
                },
            )
            .unwrap();

        // wait for 2 reactors exit
        let start = std::time::SystemTime::now();
        while stopcounter.load(atomic::Ordering::Relaxed) != 2 {
            std::thread::sleep(std::time::Duration::from_millis(10));
            std::thread::yield_now();
            if start.elapsed().unwrap() >= std::time::Duration::from_millis(2000) {
                logmsg!("ERROR: timeout waiting for reactors to complete");
                break;
            }
        }
    }

Modules§

Macros§

Structs§

  • CmdSender is owned by a ReactRuntime. Users send commands to a reactor with a specific ReactorID.
  • DispatchContext contains all info that could be used to dispatch command/message to reactors.
  • MsgReader is a per-socket helper for reading socket messages. It automatically handles partial and multiple messages in the receive buffer.
    On a Readable event, call MsgReader::try_read() to read messages from the socket into a recv_buffer; it calls dispatcher::on_inbound_message to dispatch each message.
  • MsgSender is a per-socket object that tries to send a message on a non-blocking socket. If sending fails due to WOULDBLOCK, the unsent bytes are saved and a Write interest is registered with the poller, so that the remaining data is sent on the next Writable event.
  • Used to save the pending Send action.
  • ReactRuntime manages and owns Reactors, which receive and send socket data or commands. A ReactRuntime has a command queue, a deferred command queue and a collection of reactors. Each reactor is assigned a ReactorID when it is added to the ReactRuntime. Users send commands to a reactor with a specific ReactorID that belongs to the ReactRuntime. A command can be executed immediately or deferred for a time. For example, when a reactor closes, it can send a command to the ReactRuntime to reconnect in the future.
  • ReactorReableContext is a helper for a reactor to send/receive socket messages or send commands.
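
The handling of partial and multiple messages described for MsgReader can be sketched as follows. This is not the reactio implementation: the FrameBuffer type and the 2-byte big-endian length prefix are assumptions made for illustration, since the wire format used by the crate is not specified here.

```rust
// Hypothetical framing (an assumption, not reactio's format):
// each message is a 2-byte big-endian length followed by that many payload bytes.
struct FrameBuffer {
    buf: Vec<u8>,
}

impl FrameBuffer {
    fn new() -> Self {
        FrameBuffer { buf: Vec::new() }
    }

    // Appends newly received bytes and returns every complete message.
    // Incomplete trailing bytes stay buffered until the next call.
    fn feed(&mut self, data: &[u8]) -> Vec<Vec<u8>> {
        self.buf.extend_from_slice(data);
        let mut messages = Vec::new();
        let mut pos = 0;
        while self.buf.len() - pos >= 2 {
            let len = u16::from_be_bytes([self.buf[pos], self.buf[pos + 1]]) as usize;
            if self.buf.len() - pos - 2 < len {
                break; // partial message: wait for more bytes
            }
            messages.push(self.buf[pos + 2..pos + 2 + len].to_vec());
            pos += 2 + len;
        }
        // Drop the bytes that were consumed as complete messages.
        self.buf.drain(..pos);
        messages
    }
}
```

A single read may therefore yield zero, one or several messages, and a message split across two reads is delivered once its last byte arrives.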

Enums§

  • CommandCompletion is used as an argument of command completion callback.
  • Deferred indicates whether a command is executed immediately or at a deferred time.
  • MessageResult is returned by on_inbound_message to indicate result.
  • SendOrQueResult is the result of MsgSender::send_or_que.
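
The send-or-queue behavior described for MsgSender can be sketched with the standard library alone. All names below (SendOutcome, QueuedSender, LimitedSink) are hypothetical and do not mirror the reactio types; LimitedSink is a test double standing in for a non-blocking socket, and registering Write interest with a poller is left out.

```rust
use std::io::{self, ErrorKind, Write};

// Hypothetical result type, loosely modeled on the description of SendOrQueResult.
#[derive(Debug, PartialEq)]
enum SendOutcome {
    Complete, // every byte was written
    Queued,   // some bytes were saved for a later writable event
}

struct QueuedSender {
    pending: Vec<u8>,
}

impl QueuedSender {
    fn new() -> Self {
        QueuedSender { pending: Vec::new() }
    }

    // Writes as much as possible; on WouldBlock the unsent tail is saved.
    fn send_or_queue<W: Write>(&mut self, sock: &mut W, msg: &[u8]) -> io::Result<SendOutcome> {
        // Preserve ordering: new data goes behind data that is already pending.
        if !self.pending.is_empty() {
            self.pending.extend_from_slice(msg);
            return Ok(SendOutcome::Queued);
        }
        let mut sent = 0;
        while sent < msg.len() {
            match sock.write(&msg[sent..]) {
                Ok(0) => return Err(ErrorKind::WriteZero.into()),
                Ok(n) => sent += n,
                Err(e) if e.kind() == ErrorKind::WouldBlock => {
                    self.pending.extend_from_slice(&msg[sent..]);
                    return Ok(SendOutcome::Queued);
                }
                Err(e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            }
        }
        Ok(SendOutcome::Complete)
    }

    // Intended to be called on a writable event: retries the saved bytes.
    fn flush_pending<W: Write>(&mut self, sock: &mut W) -> io::Result<SendOutcome> {
        let data = std::mem::take(&mut self.pending);
        self.send_or_queue(sock, &data)
    }
}

// Test double: accepts `capacity` bytes, then reports WouldBlock.
struct LimitedSink {
    capacity: usize,
    written: Vec<u8>,
}

impl Write for LimitedSink {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if self.capacity == 0 {
            return Err(ErrorKind::WouldBlock.into());
        }
        let n = buf.len().min(self.capacity);
        self.written.extend_from_slice(&buf[..n]);
        self.capacity -= n;
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

In a real event loop, a Queued result is the point at which Write interest would be registered, and flush_pending would run when the poller reports the socket as writable.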

Constants§

Traits§

  • NewServerReactor is used by TcpListenerHandler to create a new reactor when a new socket is accepted.
  • A Reactor is assigned a unique ReactorID when it is added to a ReactRuntime, and is able to receive socket messages (via reader) and commands. process_events of a ReactRuntime instance should be called periodically in a dedicated thread. Besides socket communication, sending commands is the only thread-safe way to communicate with a Reactor. A Reactor can send socket messages (via sender), or send commands (via cmd_sender) to another Reactor with a specific ReactorID. A Reactor is destroyed when its socket is closed.
  • TcpListenerHandler handles incoming connections on a listening socket. Similar to a Reactor, it’s destroyed when the listening socket is closed.
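
The relationship between a runtime, reactor IDs and reactor lifetime can be pictured with a simplified sketch. It does not use the reactio traits: MiniReactor, MiniRuntime, Outcome and CountDown are hypothetical names, and a reactor returning Outcome::Close stands in for its socket being closed.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a reactor identifier.
type ReactorId = u64;

// Hypothetical handler result: keep the reactor or drop it.
enum Outcome {
    Continue,
    Close,
}

trait MiniReactor {
    fn on_message(&mut self, msg: &[u8]) -> Outcome;
}

struct MiniRuntime {
    next_id: ReactorId,
    reactors: HashMap<ReactorId, Box<dyn MiniReactor>>,
}

impl MiniRuntime {
    fn new() -> Self {
        MiniRuntime { next_id: 0, reactors: HashMap::new() }
    }

    // Takes ownership of the reactor and assigns it a unique id.
    fn add(&mut self, reactor: Box<dyn MiniReactor>) -> ReactorId {
        let id = self.next_id;
        self.next_id += 1;
        self.reactors.insert(id, reactor);
        id
    }

    // Delivers a message to one reactor; returns false if the id is unknown.
    // A reactor that answers Close is removed and dropped.
    fn dispatch(&mut self, id: ReactorId, msg: &[u8]) -> bool {
        let outcome = match self.reactors.get_mut(&id) {
            Some(reactor) => reactor.on_message(msg),
            None => return false,
        };
        if let Outcome::Close = outcome {
            self.reactors.remove(&id);
        }
        true
    }

    fn count_reactors(&self) -> usize {
        self.reactors.len()
    }
}

// Example reactor that closes itself after handling `remaining` messages.
struct CountDown {
    remaining: u32,
}

impl MiniReactor for CountDown {
    fn on_message(&mut self, _msg: &[u8]) -> Outcome {
        self.remaining = self.remaining.saturating_sub(1);
        if self.remaining == 0 {
            Outcome::Close
        } else {
            Outcome::Continue
        }
    }
}
```

Since the runtime owns every reactor and is driven from a single thread, addressing a reactor by id is enough and no shared references to it are handed out.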