Expand description
§Portable Reactor pattern in Rust.
Supported platforms: Linux, Windows
ReactIO is a Rust library that implements event-driven Reactor pattern in single-threaded and multiple-threaded environment.
Users must implement their own trait Reactors and add reactors to a ReactRuntime.
Each ReactRuntime
instance runs in a dedicated thread. It polls all events for all managed Reactors. There’are 2 kinds of events:
- socket events. We only register socket READ events initially. MsgReader & MsgSender are provided for Reactor to send/receive messages.
- commands. Through mpsc channel, reactors could send user defined commands to each other.
When processing events, Reactor doesn’t need any mutex to protect resources.
§Examples
§Non-threaded polling model.
See example in reactor.rs.
use reactio::{ReactRuntime, Deferred, DefaultTcpListenerHandler};
use reactio::example;
pub fn test_reactors_cmd() {
let addr = "127.0.0.1:12355";
let mut runtime = ReactRuntime::new();
let cmd_sender = runtime.get_cmd_sender();
cmd_sender
.send_listen(
addr,
DefaultTcpListenerHandler::<example::MyReactor>::new(example::ServerParam {
name: "server".to_owned(),
latency_batch: 1000,
}),
Deferred::Immediate,
|_| {},
)
.unwrap();
cmd_sender
.send_connect(
addr,
example::MyReactor::new_client("client".to_owned(), 2, 1000),
Deferred::Immediate,
|_| {},
)
.unwrap();
// In single threaded environment, process_events until there're no reactors, no events, no deferred events.
while runtime.process_events() {}
assert_eq!(runtime.count_reactors(), 0);
}
§Multi-threaded model.
use std::{sync::{Arc, atomic}};
use reactio::logmsg;
use reactio::{CommandCompletion, Deferred, DefaultTcpListenerHandler, threaded_reactors::ThreadedReactorMgr};
use reactio::threaded_reactors::example::{self, ThreadedServerParam, create_tcp_listener};
pub fn test_threaded_reactors() {
let addr = "127.0.0.1:12355";
let stopcounter = Arc::new(atomic::AtomicI32::new(0)); // each Reactor increases it when exiting.
let mgr = ThreadedReactorMgr::<String>::new(2); // 2 threads
let (threadid0, threadid1) = (0, 1);
// cloned Arc are passed to threads.
let (amgr, astopcounter) = (Arc::clone(&mgr), Arc::clone(&stopcounter));
mgr.get_cmd_sender(threadid0)
.unwrap()
.send_listen(
addr,
create_tcp_listener(ThreadedServerParam {
runtimeid: threadid0,
reactormgr: Arc::clone(&mgr),
stopcounter: Arc::clone(&stopcounter),
name: "server".to_owned(),
latency_batch: 1000,
}),
Deferred::Immediate,
// when listen socket is ready, send another command to connect from another thread.
move |res| {
if let CommandCompletion::Error(_) = res {
logmsg!("[ERROR] Failed to listen exit!");
return;
}
amgr.get_cmd_sender(threadid1)
.unwrap()
.send_connect(
addr,
example::MyThreadedReactor::new_client(
"myclient".to_owned(),
threadid1,
Arc::clone(&amgr),
5,
1000,
Arc::clone(&astopcounter),
),
Deferred::Immediate,
|res| {
if let CommandCompletion::Error(_) = res {
logmsg!("[ERROR] Failed connect!");
}
},
)
.unwrap();
},
)
.unwrap();
// wait for 2 reactors exit
let start = std::time::SystemTime::now();
while stopcounter.load(atomic::Ordering::Relaxed) != 2 {
std::thread::sleep(std::time::Duration::from_millis(10));
std::thread::yield_now();
if start.elapsed().unwrap() >= std::time::Duration::from_millis(2000) {
logmsg!("ERROR: timeout waiting for reactors to complete");
break;
}
}
}
Modules§
Macros§
- log only in debug mode.
Structs§
- CmdSender is owned by a ReactRuntime. Users send commands to a reactor with specific ReactorID.
DispatchContext
contains all info that could be used to dispatch command/message to reactors.MsgReader
is a per-socket helper to read socket messages. It auto handles partial/multiple messages in recv buffer.
On Readable event, call MsgReader::try_read() to read messages from a sock into a recv_buffer and calls dispatcher::on_inbound_message to dispatch message.- MsgSender is a per-socket object. It tries sending msg on a non-blocking socket. if sending fails due to WOULDBLOCK, the unsent bytes are saved and register a Write insterest in poller, so that the remaining data will be scheduled to send on next Writeable event.
- Used to save the pending Send action.
- ReactRuntime manages & owns Reactors which receive/send socket data or command. A ReactRuntime has a command queue, deferred command queue and a collection of reactors. Each reactor is assigned a ReactorID when adding to ReactRuntime. Users send command to a reactor with a specific ReactorID that belongs to the ReactRuntime. The command could be immediate or deferred for a time. E.g, on close of a reactor, it could send a command to the ReactRuntime to reconnect in future.
ReactorReableContext
is a helper for a reactor to send/recv socket message, or send command.
Enums§
CommandCompletion
is used as an argument of command completion callback.Deferred
is used to indicate a command to be executed immidately or in a deferred time.MessageResult
is returned byon_inbound_message
to indicate result.SendOrQueResult
is the result ofMsgSender::send_or_que
.
Constants§
Traits§
- NewServerReactor is used by TcpListenerHandler to create a new reactor when accept a new socket.
- A
Reactor
is assigned a unique ReactorID when adding to a ReactRuntime, and is able to receive socket messsages (via reader) and commands.process_events
of a ReactRuntime instance should be periodically called in a dedicated thread. Besides socket communication, Sending command is the only thread-safe way to communicate with a Reactor. A Reactor could send socket messages (via sender), or send commands (via cmd_sender) to another Reactor with specific ReactorID. A Reactor is destroyed when the socket is closed. TcpListenerHandler
handles incoming connections on a listening socket. Similar toReactor
, it’s destroyed when listening socket is closed.