Expand description
§Portable Reactor pattern in Rust.
Supported platforms: Linux, Windows
ReactIO is a Rust library that implements event-driven Reactor pattern in single-threaded and multiple-threaded environment.
Users implement a Reactor
(as least implement on_inbound_message
) and add it to a ReactRuntime
.
Each ReactRuntime
instance runs in a dedicated thread. It polls all events for all managed Reactors. There’are 2 kinds of events:
- Socket events. We only register socket READ events initially. MsgReader & MsgSender are provided for Reactor to send/receive messages. MsgReader reads from buffer and dispatches messages. MsgSender sends or register SEND event for resend when seeing WOULDBLOCK.
- Commands. Through mpsc channel, reactors could send user defined commands to each other.
Key technologies:
- IO events (epoll, iocp): handles IO only when there are event.
- MsgSender helps socket read (messages are dispatched when full messages are received); MsgSender handles socket write (it queues unsent messages and auto resends).
- No mutex or lock. Just send commands to a mpsc channel owned by
ReactRuntime
. - Deferred commands are executed in a defered time.
§Example
§Non-threaded polling model.
See example in reactor.rs.
use reactio;
use reactio::logerr;
use std::io::Write;
/// SimpleIoReactor implements `Reactor` and calls user handlers on events.
pub fn test_io_reactor() {
type AppData = (); // no application data for reactor.
let app_data = ();
let addr = "127.0.0.1:12355";
let recv_buffer_min_size = 1024;
let mut runtime = reactio::SimpleIoRuntime::new();
let on_server_sock_msg =
|buf: &mut [u8], ctx: &mut reactio::SimpleIoReactorContext<'_>, _: &mut AppData| {
ctx.send_or_que(buf)?; // echo back message.
Ok(buf.len()) // return number of bytes having been consumed.
};
let on_server_connected =
|ctx: &mut reactio::SimpleIoReactorContext<'_>, listenerid, _: &mut AppData| {
ctx.cmd_sender
.send_close(listenerid, reactio::Deferred::Immediate, |_| {})?; // close parent listerner.
Ok(()) // accept current connection.
};
let on_new_connection = move |_childid| {
// create a new Reactor for the new connection.
Some(reactio::SimpleIoReactor::<AppData>::new_boxed(
app_data,
Some(Box::new(on_server_connected)), // on_connected
None, // on_closed
on_server_sock_msg, // on_sock_msg
))
};
//-- server
runtime
.get_cmd_sender()
.send_listen(
addr,
reactio::SimpleIoListener::new(recv_buffer_min_size, on_new_connection),
reactio::Deferred::Immediate,
|_| {}, // OnCommandCompletion
)
.unwrap();
// wait for server ready.
let timer = reactio::utils::Timer::new_millis(1000);
while runtime.count_reactors() < 1 {
if timer.expired() {
logerr!("ERROR: timeout waiting for listener start!");
break;
}
runtime.process_events();
}
//-- client
let on_client_connected =
move |ctx: &mut reactio::SimpleIoReactorContext<'_>, _, _: &mut AppData| {
// client sends initial msg.
let mut auto_sender = ctx.acquire_send(); // send on drop
auto_sender.write_fmt(format_args!("test ")).unwrap();
auto_sender.write_fmt(format_args!("msgsend")).unwrap();
assert_eq!(auto_sender.count_written(), 12);
assert_eq!(auto_sender.get_written(), b"test msgsend");
// auto_sender.send(None).unwrap(); // this line can be omitted to let it auto send on drop.
// ctx.send_or_que("Hello".as_bytes())?; // rather than using auto_sender, we call ctx.send_or_que
Ok(()) // accept connection
};
let on_client_sock_msg =
|_buf: &mut [u8], _ctx: &mut reactio::SimpleIoReactorContext<'_>, _: &mut AppData| {
Err("Client disconnect on recv response.".to_owned())
};
runtime
.get_cmd_sender()
.send_connect(
addr,
recv_buffer_min_size,
reactio::SimpleIoReactor::new(
app_data,
Some(Box::new(on_client_connected)), // on_connected
None, // on_closed
on_client_sock_msg, // on_sock_msg
),
reactio::Deferred::Immediate,
|_| {}, // OnCommandCompletion
)
.unwrap();
// In non-threaded environment, process_events until there're no reactors, no events, no deferred events.
let timer = reactio::utils::Timer::new_millis(1000);
while runtime.process_events() {
if timer.expired() {
logerr!("ERROR: timeout waiting for tests to complete!");
break;
}
}
assert_eq!(runtime.count_reactors(), 0);
assert_eq!(runtime.count_deferred_queue(), 0);
}
More examples are echo_client/echo_server (for TCP latency test), PingpongReactor, ThreadedPingpongReactor.
Modules§
Macros§
- log only in debug mode.
Structs§
- CmdSender is owned by a ReactRuntime. Users send commands to a reactor with specific ReactorID.
DispatchContext
contains all info that could be used to dispatch command/message to reactors.MsgReader
is a per-socket helper to read socket messages. It auto handles partial/multiple messages in recv buffer.
On Readable event, call MsgReader::try_read_fast_dispatch/try_read_fast_read to read messages from a sock into a recv_buffer and calls dispatcher::on_inbound_message to dispatch message.- MsgSender is a per-socket object. It tries sending msg on a non-blocking socket. if sending fails due to WOULDBLOCK, the unsent bytes are saved and register a Write insterest in poller, so that the remaining data will be scheduled to send on next Writeable event.
- Used to save the pending Send action.
- ReactRuntime manages & owns Reactors which receive/send socket data or command. A ReactRuntime has a command queue, deferred command queue and a collection of reactors. Each reactor is assigned a ReactorID when adding to ReactRuntime. Users send command to a reactor with a specific ReactorID that belongs to the ReactRuntime. The command could be immediate or deferred for a time. E.g, on close of a reactor, it could send a command to the ReactRuntime to reconnect in future.
ReactorReableContext
is a helper for a reactor to send/recv socket message, or send command.SimpleIoListener
implements TcpListenerHandler. When receiving new connection, it creates SimpleIoReactor using user specified reactor_creator.SimpleIoReactor
doesn’t haveUserCommand
. User supplies callback functions to handle inbound socket messages and on_connected/on_close events. On each readable socket event, the MsgReader reads all data and call on_sock_msg_handler to dispatch message.- A SimpleIoService instance serves multiple sockets, which diffs from SimpleIoReactor/SimpleIoListener that serves a socket per instance. See
test_io_service
Enums§
Deferred
is used to indicate a command to be executed immidately or in a deferred time.MessageResult
is returned byon_inbound_message
to indicate result.SendOrQueResult
is the result ofMsgSender::send_or_que
orDispatchContext::send_or_que
.
Constants§
Traits§
- NewServerReactor is used by TcpListenerHandler to create a new reactor when accept a new socket.
- A
Reactor
is assigned a unique ReactorID when adding to a ReactRuntime, and is able to receive socket messsages (via reader) and commands.process_events
of a ReactRuntime instance should be periodically called in a dedicated thread. Besides socket communication, Sending command is the only thread-safe way to communicate with a Reactor. A Reactor could send socket messages (via sender), or send commands (via cmd_sender) to another Reactor with specific ReactorID. A Reactor is destroyed when the socket is closed. TcpListenerHandler
handles incoming connections on a listening socket. Similar toReactor
, it’s destroyed when listening socket is closed.
Type Aliases§
CommandCompletion
is used as an argument of command completion callback.