Expand description
§Portable Reactor pattern in Rust.
Supported platforms: Linux, Windows
ReactIO is a Rust library that implements event-driven Reactor pattern in single-threaded and multiple-threaded environment.
Users implement a Reactor
(as least implement on_inbound_message
) and add it to a ReactRuntime
.
Each ReactRuntime
instance runs in a dedicated thread. It polls all events for all managed Reactors. There’are 2 kinds of events:
- socket events. We only register socket READ events initially. MsgReader & MsgSender are provided for Reactor to send/receive messages.
- commands. Through mpsc channel, reactors could send user defined commands to each other.
Key technologies:
- IO events (epoll, iocp): handles IO only when there are event.
- MsgSender helps socket read (messages are dispatched when full messages are received); MsgSender handles socket write (it queues unsent messages and auto resends).
- No mutex or lock. Just send commands to a mpsc channel owned by
ReactRuntime
. - Deferred commands are executed in a defered time.
§Example
§Non-threaded polling model.
See example in reactor.rs.
use reactio;
/// SimpleIoReactor implements `Reactor` and calls user-given handlers on events.
pub fn test_io_reactor() {
let addr = "127.0.0.1:12355";
let recv_buffer_min_size = 1024;
let mut runtime = reactio::SimpleIoRuntime::new();
let max_echos = 1;
let mut count_echos = 0;
let on_sock_msg = move |buf: &mut [u8], ctx: &mut reactio::SimpleIoReactorContext<'_>| {
if count_echos >= max_echos {
return Err(format!("Reached max echo: {max_echos}")); // close socket
}
ctx.send_msg(buf)?; // echo back message.
count_echos += 1;
Ok(buf.len()) // return number of bytes having been consumed.
};
let on_server_connected = |ctx: &mut reactio::SimpleIoReactorContext<'_>, listenerid| {
ctx.cmd_sender
.send_close(listenerid, reactio::Deferred::Immediate, |_| {})?; // close parent listerner.
Ok(()) // accept current connection.
};
let on_new_connection = move |_childid| {
// create a new Reactor for the new connection.
Some(reactio::SimpleIoReactor::new(
Some(Box::new(on_server_connected)), // on_connected
None, // on_closed
on_sock_msg, // on_sock_msg
))
};
let on_client_connected = |ctx: &mut reactio::SimpleIoReactorContext<'_>, _| {
ctx.send_msg("Hello".as_bytes())?; // client sends initial msg.
Ok(()) // accept connection
};
//-- server
runtime
.get_cmd_sender()
.send_listen(
addr,
reactio::SimpleIoListener::new(recv_buffer_min_size, on_new_connection),
reactio::Deferred::Immediate,
|_| {},
)
.unwrap();
// wait for server ready.
let timer = reactio::utils::Timer::new_millis(1000);
while runtime.count_reactors() < 1 {
if timer.expired() {
assert!(false, "ERROR: timeout waiting for listener start!");
break;
}
runtime.process_events();
}
//-- client
runtime
.get_cmd_sender()
.send_connect(
addr,
recv_buffer_min_size,
reactio::SimpleIoReactor::new(
Some(Box::new(on_client_connected)), // on_connected
None, // on_closed
on_sock_msg, // on_sock_msg
),
reactio::Deferred::Immediate,
|_| {},
)
.unwrap();
// In non-threaded environment, process_events until there're no reactors, no events, no deferred events.
let timer = reactio::utils::Timer::new_millis(1000 * 20);
while runtime.process_events() {
if timer.expired() {
assert!(false, "ERROR: timeout waiting for tests to complete!");
break;
}
}
assert_eq!(runtime.count_reactors(), 0);
assert_eq!(runtime.count_deferred_queue(), 0);
}
See examples folder for more examples about echo_client/echo_server (test TCP latencies), PingpongReactor, PingpongReactor.
Modules§
Macros§
- log only in debug mode.
Structs§
- CmdSender is owned by a ReactRuntime. Users send commands to a reactor with specific ReactorID.
CommandReactor
doesn’t have associated sockets, so only process commands by calling cmd_handler.DispatchContext
contains all info that could be used to dispatch command/message to reactors.MsgReader
is a per-socket helper to read socket messages. It auto handles partial/multiple messages in recv buffer.
On Readable event, call MsgReader::try_read_fast_dispatch/try_read_fast_read to read messages from a sock into a recv_buffer and calls dispatcher::on_inbound_message to dispatch message.- MsgSender is a per-socket object. It tries sending msg on a non-blocking socket. if sending fails due to WOULDBLOCK, the unsent bytes are saved and register a Write insterest in poller, so that the remaining data will be scheduled to send on next Writeable event.
- Used to save the pending Send action.
- ReactRuntime manages & owns Reactors which receive/send socket data or command. A ReactRuntime has a command queue, deferred command queue and a collection of reactors. Each reactor is assigned a ReactorID when adding to ReactRuntime. Users send command to a reactor with a specific ReactorID that belongs to the ReactRuntime. The command could be immediate or deferred for a time. E.g, on close of a reactor, it could send a command to the ReactRuntime to reconnect in future.
ReactorReableContext
is a helper for a reactor to send/recv socket message, or send command.SimpleIoListener
implements TcpListenerHandler. When receiving new connection, it creates SimpleIoReactor using user specified reactor_creator.SimpleIoReactor
doesn’t haveUserCommand
. User supplies callback functions to handle inbound socket messages and on_connected/on_close events. On each readable socket event, the MsgReader reads all data and call on_sock_msg_handler to dispatch message.
Enums§
CommandCompletion
is used as an argument of command completion callback.Deferred
is used to indicate a command to be executed immidately or in a deferred time.MessageResult
is returned byon_inbound_message
to indicate result.SendOrQueResult
is the result ofMsgSender::send_or_que
orDispatchContext::send_msg
.
Constants§
Traits§
- NewServerReactor is used by TcpListenerHandler to create a new reactor when accept a new socket.
- A
Reactor
is assigned a unique ReactorID when adding to a ReactRuntime, and is able to receive socket messsages (via reader) and commands.process_events
of a ReactRuntime instance should be periodically called in a dedicated thread. Besides socket communication, Sending command is the only thread-safe way to communicate with a Reactor. A Reactor could send socket messages (via sender), or send commands (via cmd_sender) to another Reactor with specific ReactorID. A Reactor is destroyed when the socket is closed. TcpListenerHandler
handles incoming connections on a listening socket. Similar toReactor
, it’s destroyed when listening socket is closed.