reactio/
lib.rs

1//! # Portable Reactor pattern in Rust.
2//!
3//! Supported platforms: Linux, Windows
4//!
5//! ReactIO is a Rust library that implements event-driven Reactor pattern in single-threaded and multiple-threaded environment.
6//! Users implement a `Reactor` (as least implement `on_inbound_message`) and add it to a `ReactRuntime`.
7//! Each `ReactRuntime` instance runs in a dedicated thread. It polls all events for all managed Reactors. There'are 2 kinds of events:
8//! - Socket events. We only register socket READ events initially. MsgReader & MsgSender are provided for Reactor to send/receive messages.
9//!   MsgReader reads from buffer and dispatches messages. MsgSender sends or register SEND event for resend when seeing WOULDBLOCK.
10//! - Commands. Through mpsc channel, reactors could send user defined commands to each other.
11//!
12//!
13//! Key technologies:
14//! - IO events (epoll, iocp): handles IO only when there are event.
15//! - MsgSender helps socket read (messages are dispatched when full messages are received); MsgSender handles socket write (it queues unsent messages and auto resends).
16//! - No mutex or lock. Just send commands to a mpsc channel owned by `ReactRuntime`.
17//! - Deferred commands are executed in a defered time.
18//!
19//!
20//! ## Example
21//!
22//! ### Non-threaded polling model.
23//!
24//! See example in reactor.rs.
25//! ```rust,no_run
26//! use reactio;
27//! use reactio::logerr;
28//! use std::io::Write;
29//!
30//! /// SimpleIoReactor implements `Reactor` and calls user handlers on events.
31//! pub fn test_io_reactor() {
32//!     type AppData = (); // no application data for reactor.
33//!     let app_data = ();
34//!     let addr = "127.0.0.1:12355";
35//!     let recv_buffer_min_size = 1024;
36//!     let mut runtime = reactio::SimpleIoRuntime::new();
37//!
38//!     let on_server_sock_msg =
39//!         |buf: &mut [u8], ctx: &mut reactio::SimpleIoReactorContext<'_>, _: &mut AppData| {
40//!             ctx.send_or_que(buf)?; // echo back message.
41//!             Ok(buf.len()) // return number of bytes having been consumed.
42//!         };
43//!
44//!     let on_server_connected =
45//!         |ctx: &mut reactio::SimpleIoReactorContext<'_>, listenerid, _: &mut AppData| {
46//!             ctx.cmd_sender
47//!                 .send_close(listenerid, reactio::Deferred::Immediate, |_| {})?; // close parent listerner.
48//!             Ok(()) // accept current connection.
49//!         };
50//!
51//!     let on_new_connection = move |_childid| {
52//!         // create a new Reactor for the new connection.
53//!         Some(reactio::SimpleIoReactor::<AppData>::new_boxed(
54//!             app_data,
55//!             Some(Box::new(on_server_connected)), // on_connected
56//!             None,                                // on_closed
57//!             on_server_sock_msg,                  // on_sock_msg
58//!         ))
59//!     };
60//!
61//!     //-- server
62//!     runtime
63//!         .get_cmd_sender()
64//!         .send_listen(
65//!             addr,
66//!             reactio::SimpleIoListener::new(recv_buffer_min_size, on_new_connection),
67//!             reactio::Deferred::Immediate,
68//!             |_| {}, // OnCommandCompletion
69//!         )
70//!         .unwrap();
71//!     // wait for server ready.
72//!     let timer = reactio::utils::Timer::new_millis(1000);
73//!     while runtime.count_reactors() < 1 {
74//!         if timer.expired() {
75//!             logerr!("ERROR: timeout waiting for listener start!");
76//!             break;
77//!         }
78//!         runtime.process_events();
79//!     }
80//!     //-- client
81//!     let on_client_connected =
82//!         move |ctx: &mut reactio::SimpleIoReactorContext<'_>, _, _: &mut AppData| {
83//!             // client sends initial msg.
84//!             let mut auto_sender = ctx.acquire_send(); // send on drop
85//!             auto_sender.write_fmt(format_args!("test ")).unwrap();
86//!             auto_sender.write_fmt(format_args!("msgsend")).unwrap();
87//!             assert_eq!(auto_sender.count_written(), 12);
88//!             assert_eq!(auto_sender.get_written(), b"test msgsend");
89//!             // auto_sender.send(None).unwrap(); // this line can be omitted to let it auto send on drop.
90//!             // ctx.send_or_que("Hello".as_bytes())?; // rather than using auto_sender, we call ctx.send_or_que
91//!             Ok(()) // accept connection
92//!         };
93//!     let on_client_sock_msg =
94//!         |_buf: &mut [u8], _ctx: &mut reactio::SimpleIoReactorContext<'_>, _: &mut AppData| {
95//!             Err("Client disconnect on recv response.".to_owned())
96//!         };
97//!
98//!     runtime
99//!         .get_cmd_sender()
100//!         .send_connect(
101//!             addr,
102//!             recv_buffer_min_size,
103//!             reactio::SimpleIoReactor::new(
104//!                 app_data,
105//!                 Some(Box::new(on_client_connected)), // on_connected
106//!                 None,                                // on_closed
107//!                 on_client_sock_msg,                  // on_sock_msg
108//!             ),
109//!             reactio::Deferred::Immediate,
110//!             |_| {}, // OnCommandCompletion
111//!         )
112//!         .unwrap();
113//!     // In non-threaded environment, process_events until there're no reactors, no events, no deferred events.
114//!     let timer = reactio::utils::Timer::new_millis(1000);
115//!     while runtime.process_events() {
116//!         if timer.expired() {
117//!             logerr!("ERROR: timeout waiting for tests to complete!");
118//!             break;
119//!         }
120//!     }
121//!     assert_eq!(runtime.count_reactors(), 0);
122//!     assert_eq!(runtime.count_deferred_queue(), 0);
123//! }
124//! ```
125//!
126//! More examples are echo_client/echo_server (for TCP latency test), PingpongReactor, ThreadedPingpongReactor.
127//!
128
129#![allow(dead_code)]
130
131pub mod flat_storage;
132mod reactor;
133pub use reactor::*;
134pub mod threaded_reactors;
135pub mod utils;