ReactIO

Low-latency Event-driven Non-blocking Reactor pattern in Rust.
Supported platforms: Linux, Windows and x86_64, arm64. (Other platforms are not tested.)
Only 64-bit platforms are supported
ReactIO impements a low-latency event-driven Reactor pattern in non-threaded and multiple-threaded environment.
Users implement a Reactor
(as least implement on_inbound_message
) and add it to a ReactRuntime
.
Each ReactRuntime
instance runs in a dedicated thread. It polls all events for managed Reactors. There'are 2 kinds of events:
- Socket events. We only register socket READ events initially. MsgReader & MsgSender are provided for Reactor to send/receive messages.
- Commands. Through mpsc channel, reactors could send user defined commands to each other.
There's another set of structs - SimpleIoReactor, SimpleIoListener, CommandReactor - when people only want to supply event functions. See below examples.
Key technologies:
- IO events (epoll, iocp): handles IO only when there are event.
- MsgSender helps socket read (messages are dispatched when full messages are received); MsgSender handles socket write (it queues unsent messages and auto resends).
- No/minimum mutexes or locks. Just send commands to a mpsc channel owned by a
ReactRuntime
.
- Deferred commands are executed in a defered time.
- No heap allocated objects on receiving events (it diffs from async frameworks which create Futures on heap).
When processing events, Reactor doesn't need any mutex to protect resources.
Examples
None-threaded ReactRuntime
Example 1: Define a struct MyReactor to implement Reactor.
More tests are in tests and examples.
pub fn test_ping_pong_reactor() {
let addr = "127.0.0.1:12355";
let recv_buffer_min_size = 1024;
let mut runtime = ReactRuntime::new();
let cmd_sender = runtime.get_cmd_sender();
cmd_sender
.send_listen(
addr,
DefaultTcpListenerHandler::<PingpongReactor>::new(
recv_buffer_min_size,
ServerParam {
name: "server".to_owned(), latency_batch: 1000, },
),
Deferred::Immediate,
|_| {}, )
.unwrap();
cmd_sender
.send_connect(
addr,
recv_buffer_min_size,
PingpongReactor::new_client("client".to_owned(), 2, 1000),
Deferred::Immediate,
|_| {}, )
.unwrap();
let timer = utils::Timer::new_millis(1000);
while runtime.process_events() {
if timer.expired() {
assert!(false, "ERROR: timeout waiting for tests to complete!");
break;
}
}
assert_eq!(runtime.count_reactors(), 0);
assert_eq!(runtime.count_deferred_queue(), 0);
}
Example 2: User supplies callback functions to Reactor instance to handle socket messages.
pub fn test_io_reactor() {
let addr = "127.0.0.1:12355";
let recv_buffer_min_size = 1024;
let mut runtime = reactio::SimpleIoRuntime::new();
let max_echos = 1;
let mut count_echos = 0;
let on_sock_msg = move |buf: &mut [u8], ctx: &mut reactio::SimpleIoReactorContext<'_>| {
if count_echos >= max_echos {
return Err(format!("Reached max echo: {max_echos}")); }
ctx.send_msg(ctx.sock, buf)?; count_echos += 1;
Ok(buf.len()) };
let on_server_connected = |ctx: &mut reactio::SimpleIoReactorContext<'_>, listenerid| {
ctx.cmd_sender
.send_close(listenerid, reactio::Deferred::Immediate, |_| {})?; Ok(()) };
let on_new_connection = move |_childid| {
Some(reactio::SimpleIoReactor::new_boxed(
Some(Box::new(on_server_connected)), None, on_sock_msg, ))
};
let on_client_connected = |ctx: &mut reactio::SimpleIoReactorContext<'_>, _| {
let mut auto_sender = ctx.acquire_send(); auto_sender.write_fmt(format_args!("test ")).unwrap();
auto_sender.write_fmt(format_args!("msgsend")).unwrap();
assert_eq!(auto_sender.count_written(), 12);
Ok(()) };
runtime
.get_cmd_sender()
.send_listen(
addr,
reactio::SimpleIoListener::new(recv_buffer_min_size, on_new_connection),
reactio::Deferred::Immediate,
|_| {}, )
.unwrap();
let timer = reactio::utils::Timer::new_millis(1000);
while runtime.count_reactors() < 1 {
if timer.expired() {
assert!(false, "ERROR: timeout waiting for listener start!");
break;
}
runtime.process_events();
}
runtime
.get_cmd_sender()
.send_connect(
addr,
recv_buffer_min_size,
reactio::SimpleIoReactor::new(
Some(Box::new(on_client_connected)), None, on_sock_msg, ),
reactio::Deferred::Immediate,
|_| {}, )
.unwrap();
let timer = reactio::utils::Timer::new_millis(1000);
while runtime.process_events() {
if timer.expired() {
assert!(false, "ERROR: timeout waiting for tests to complete!");
break;
}
}
assert_eq!(runtime.count_reactors(), 0);
assert_eq!(runtime.count_deferred_queue(), 0);
}
Multi-threaded Reactor Example - Each thread runs an ReactRuntime
See example in threaded_pingpong.rs
.
pub fn test_threaded_pingpong() {
let addr = "127.0.0.1:12355";
let recv_buffer_min_size = 1024;
let stopcounter = Arc::new(AtomicI32::new(0)); let mgr = ThreadedReactorMgr::<String>::new(2); let (threadid0, threadid1) = (0, 1);
let (amgr, astopcounter) = (Arc::clone(&mgr), Arc::clone(&stopcounter));
mgr.get_cmd_sender(threadid0)
.unwrap()
.send_listen(
addr,
create_tcp_listener(
recv_buffer_min_size,
ThreadedServerParam {
runtimeid: threadid0,
reactormgr: Arc::clone(&mgr),
stopcounter: Arc::clone(&stopcounter),
name: "server".to_owned(),
latency_batch: 1000,
},
),
Deferred::Immediate,
move |res| {
if let CommandCompletion::Error(_) = res {
logmsg!("[ERROR] Failed to listen exit!");
return;
}
amgr.get_cmd_sender(threadid1)
.unwrap()
.send_connect(
addr,
recv_buffer_min_size,
ThreadedPingpongReactor::new_client(
"myclient".to_owned(),
threadid1,
Arc::clone(&amgr),
5,
1000,
Arc::clone(&astopcounter),
),
Deferred::Immediate,
|res| {
if let CommandCompletion::Error(_) = res {
logmsg!("[ERROR] Failed connect!");
}
},
)
.unwrap();
},
)
.unwrap();
let timer = utils::Timer::new_millis(1000);
while stopcounter.load(atomic::Ordering::Relaxed) != 2 {
timer.sleep_or_expire(10);
std::thread::yield_now();
if timer.expired() {
assert!(false, "ERROR: timeout waiting for reactors to complete");
break;
}
}
}
License
Licensed under either of
at your option.
Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be
dual licensed as above, without any additional terms or conditions.