reactio/lib.rs
1//! # Portable Reactor pattern in Rust.
2//!
3//! Supported platforms: Linux, Windows
4//!
5//! ReactIO is a Rust library that implements the event-driven Reactor pattern in single-threaded and multi-threaded environments.
6//! Users implement a `Reactor` (at least implement `on_inbound_message`) and add it to a `ReactRuntime`.
7//! Each `ReactRuntime` instance runs in a dedicated thread. It polls all events for all managed Reactors. There are 2 kinds of events:
8//! - Socket events. We only register socket READ events initially. MsgReader & MsgSender are provided for Reactor to send/receive messages.
9//! MsgReader reads from the buffer and dispatches messages. MsgSender sends, or registers a SEND event for resend when seeing WOULDBLOCK.
10//! - Commands. Through mpsc channel, reactors could send user defined commands to each other.
11//!
12//!
13//! Key technologies:
14//! - IO events (epoll, iocp): handles IO only when there are events.
15//! - MsgReader helps socket read (messages are dispatched when full messages are received); MsgSender handles socket write (it queues unsent messages and auto resends).
16//! - No mutex or lock. Just send commands to a mpsc channel owned by `ReactRuntime`.
17//! - Deferred commands are executed at a deferred time.
18//!
19//!
20//! ## Example
21//!
22//! ### Non-threaded polling model.
23//!
24//! See example in reactor.rs.
25//! ```rust,no_run
26//! use reactio;
27//! use reactio::logerr;
28//! use std::io::Write;
29//!
30//! /// SimpleIoReactor implements `Reactor` and calls user handlers on events.
31//! pub fn test_io_reactor() {
32//! type AppData = (); // no application data for reactor.
33//! let app_data = ();
34//! let addr = "127.0.0.1:12355";
35//! let recv_buffer_min_size = 1024;
36//! let mut runtime = reactio::SimpleIoRuntime::new();
37//!
38//! let on_server_sock_msg =
39//! |buf: &mut [u8], ctx: &mut reactio::SimpleIoReactorContext<'_>, _: &mut AppData| {
40//! ctx.send_or_que(buf)?; // echo back message.
41//! Ok(buf.len()) // return number of bytes having been consumed.
42//! };
43//!
44//! let on_server_connected =
45//! |ctx: &mut reactio::SimpleIoReactorContext<'_>, listenerid, _: &mut AppData| {
46//! ctx.cmd_sender
47//! .send_close(listenerid, reactio::Deferred::Immediate, |_| {})?; // close parent listener.
48//! Ok(()) // accept current connection.
49//! };
50//!
51//! let on_new_connection = move |_childid| {
52//! // create a new Reactor for the new connection.
53//! Some(reactio::SimpleIoReactor::<AppData>::new_boxed(
54//! app_data,
55//! Some(Box::new(on_server_connected)), // on_connected
56//! None, // on_closed
57//! on_server_sock_msg, // on_sock_msg
58//! ))
59//! };
60//!
61//! //-- server
62//! runtime
63//! .get_cmd_sender()
64//! .send_listen(
65//! addr,
66//! reactio::SimpleIoListener::new(recv_buffer_min_size, on_new_connection),
67//! reactio::Deferred::Immediate,
68//! |_| {}, // OnCommandCompletion
69//! )
70//! .unwrap();
71//! // wait for server ready.
72//! let timer = reactio::utils::Timer::new_millis(1000);
73//! while runtime.count_reactors() < 1 {
74//! if timer.expired() {
75//! logerr!("ERROR: timeout waiting for listener start!");
76//! break;
77//! }
78//! runtime.process_events();
79//! }
80//! //-- client
81//! let on_client_connected =
82//! move |ctx: &mut reactio::SimpleIoReactorContext<'_>, _, _: &mut AppData| {
83//! // client sends initial msg.
84//! let mut auto_sender = ctx.acquire_send(); // send on drop
85//! auto_sender.write_fmt(format_args!("test ")).unwrap();
86//! auto_sender.write_fmt(format_args!("msgsend")).unwrap();
87//! assert_eq!(auto_sender.count_written(), 12);
88//! assert_eq!(auto_sender.get_written(), b"test msgsend");
89//! // auto_sender.send(None).unwrap(); // this line can be omitted to let it auto send on drop.
90//! // ctx.send_or_que("Hello".as_bytes())?; // rather than using auto_sender, we call ctx.send_or_que
91//! Ok(()) // accept connection
92//! };
93//! let on_client_sock_msg =
94//! |_buf: &mut [u8], _ctx: &mut reactio::SimpleIoReactorContext<'_>, _: &mut AppData| {
95//! Err("Client disconnect on recv response.".to_owned())
96//! };
97//!
98//! runtime
99//! .get_cmd_sender()
100//! .send_connect(
101//! addr,
102//! recv_buffer_min_size,
103//! reactio::SimpleIoReactor::new(
104//! app_data,
105//! Some(Box::new(on_client_connected)), // on_connected
106//! None, // on_closed
107//! on_client_sock_msg, // on_sock_msg
108//! ),
109//! reactio::Deferred::Immediate,
110//! |_| {}, // OnCommandCompletion
111//! )
112//! .unwrap();
113//! // In a non-threaded environment, call process_events until there are no reactors, no events, and no deferred events.
114//! let timer = reactio::utils::Timer::new_millis(1000);
115//! while runtime.process_events() {
116//! if timer.expired() {
117//! logerr!("ERROR: timeout waiting for tests to complete!");
118//! break;
119//! }
120//! }
121//! assert_eq!(runtime.count_reactors(), 0);
122//! assert_eq!(runtime.count_deferred_queue(), 0);
123//! }
124//! ```
125//!
126//! More examples are echo_client/echo_server (for TCP latency test), PingpongReactor, ThreadedPingpongReactor.
127//!
128
129#![allow(dead_code)]
130
131pub mod flat_storage;
132mod reactor;
133pub use reactor::*;
134pub mod threaded_reactors;
135pub mod utils;