ReactIO

Portable Reactor pattern in Rust.
Supported platforms: Linux, Windows Only 64-bit platforms are supported
ReactIO is a Rust library that implements event-driven Reactor pattern in single-threaded and multiple-threaded environment.
Each ReactRuntime
instance runs in a dedicated thread. It polls all events for managed Reactors. There'are 2 kinds of events:
- socket events. We only register socket READ events initially. MsgReader & MsgSender are provided for Reactor to send/receive messages.
- commands. Through mpsc channel, reactors could send user defined commands to each other.
When processing events, Reactor doesn't need any mutex to protect resources.
Examples
Single-threaded ReactRuntime
See example in reactor.rs.
pub fn test_reactors_cmd() {
let addr = "127.0.0.1:12355";
let recv_buffer_min_size = 1024;
let mut runtime = ReactRuntime::new();
let cmd_sender = runtime.get_cmd_sender();
cmd_sender
.send_listen(
addr,
recv_buffer_min_size,
DefaultTcpListenerHandler::<example::MyReactor>::new(recv_buffer_min_size, ServerParam {
name: "server".to_owned(),
latency_batch: 1000,
}),
Deferred::Immediate,
|_| {},
)
.unwrap();
cmd_sender
.send_connect(
addr,
recv_buffer_min_size,
example::MyReactor::new_client("client".to_owned(), 2, 1000),
Deferred::Immediate,
|_| {},
)
.unwrap();
while runtime.process_events() {}
assert_eq!(runtime.count_reactors(), 0);
}
Multi-threaded Reactors - Each thread runs an ReactRuntime
See example in threaded_reactors.rs.
pub fn test_threaded_reactors() {
let addr = "127.0.0.1:12355";
let recv_buffer_min_size = 1024;
let stopcounter = Arc::new(AtomicI32::new(0)); let mgr = ThreadedReactorMgr::<String>::new(2); let (threadid0, threadid1) = (0, 1);
let (amgr, astopcounter) = (Arc::clone(&mgr), Arc::clone(&stopcounter));
mgr.get_cmd_sender(threadid0)
.unwrap()
.send_listen(
addr,
create_tcp_listener(
recv_buffer_min_size,
ThreadedServerParam {
runtimeid: threadid0,
reactormgr: Arc::clone(&mgr),
stopcounter: Arc::clone(&stopcounter),
name: "server".to_owned(),
latency_batch: 1000,
}),
Deferred::Immediate,
move |res| {
if let CommandCompletion::Error(_) = res {
logmsg!("[ERROR] Failed to listen exit!");
return;
}
amgr.get_cmd_sender(threadid1)
.unwrap()
.send_connect(
addr,
recv_buffer_min_size,
example::MyThreadedReactor::new_client(
"myclient".to_owned(),
threadid1,
Arc::clone(&amgr),
5,
1000,
Arc::clone(&astopcounter),
),
Deferred::Immediate,
|res| {
if let CommandCompletion::Error(_) = res {
logmsg!("[ERROR] Failed connect!");
}
},
)
.unwrap();
},
)
.unwrap();
let start = std::time::SystemTime::now();
while stopcounter.load(atomic::Ordering::Relaxed) != 2 {
std::thread::sleep(std::time::Duration::from_millis(10));
std::thread::yield_now();
if start.elapsed().unwrap() >= std::time::Duration::from_millis(2000) {
logmsg!("ERROR: timeout waiting for reactors to complete");
break;
}
}
}
License
Licensed under either of
at your option.
Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be
dual licensed as above, without any additional terms or conditions.