1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
//! # Portable Reactor pattern in Rust.
//!
//! Supported platforms: Linux, Windows
//!
//! ReactIO is a Rust library that implements the event-driven Reactor pattern in single-threaded and multi-threaded environments.
//! Users implement a `Reactor` (at least implement `on_inbound_message`) and add it to a `ReactRuntime`.
//! Each `ReactRuntime` instance runs in a dedicated thread. It polls all events for all managed Reactors. There are 2 kinds of events:
//! - Socket events. We only register socket READ events initially. MsgReader & MsgSender are provided for Reactor to send/receive messages.
//! MsgReader reads from buffer and dispatches messages. MsgSender sends, or registers a SEND event to resend later when it sees WOULDBLOCK.
//! - Commands. Through mpsc channel, reactors could send user defined commands to each other.
//!
//!
//! Key technologies:
//! - IO events (epoll, iocp): handles IO only when there are events.
//! - MsgReader helps socket read (messages are dispatched when full messages are received); MsgSender handles socket write (it queues unsent messages and auto resends).
//! - No mutex or lock. Just send commands to a mpsc channel owned by `ReactRuntime`.
//! - Deferred commands are executed at a deferred time.
//!
//!
//! ## Example
//!
//! ### Non-threaded polling model.
//!
//! See example in reactor.rs.
//! ```rust,no_run
//! use reactio;
//! use reactio::logerr;
//! use std::io::Write;
//!
//! /// SimpleIoReactor implements `Reactor` and calls user handlers on events.
//! pub fn test_io_reactor() {
//! type AppData = (); // no application data for reactor.
//! let app_data = ();
//! let addr = "127.0.0.1:12355";
//! let recv_buffer_min_size = 1024;
//! let mut runtime = reactio::SimpleIoRuntime::new();
//!
//! let on_server_sock_msg =
//! |buf: &mut [u8], ctx: &mut reactio::SimpleIoReactorContext<'_>, _: &mut AppData| {
//! ctx.send_or_que(buf)?; // echo back message.
//! Ok(buf.len()) // return number of bytes having been consumed.
//! };
//!
//! let on_server_connected =
//! |ctx: &mut reactio::SimpleIoReactorContext<'_>, listenerid, _: &mut AppData| {
//! ctx.cmd_sender
//! .send_close(listenerid, reactio::Deferred::Immediate, |_| {})?; // close parent listener.
//! Ok(()) // accept current connection.
//! };
//!
//! let on_new_connection = move |_childid| {
//! // create a new Reactor for the new connection.
//! Some(reactio::SimpleIoReactor::<AppData>::new_boxed(
//! app_data,
//! Some(Box::new(on_server_connected)), // on_connected
//! None, // on_closed
//! on_server_sock_msg, // on_sock_msg
//! ))
//! };
//!
//! //-- server
//! runtime
//! .get_cmd_sender()
//! .send_listen(
//! addr,
//! reactio::SimpleIoListener::new(recv_buffer_min_size, on_new_connection),
//! reactio::Deferred::Immediate,
//! |_| {}, // OnCommandCompletion
//! )
//! .unwrap();
//! // wait for server ready.
//! let timer = reactio::utils::Timer::new_millis(1000);
//! while runtime.count_reactors() < 1 {
//! if timer.expired() {
//! logerr!("ERROR: timeout waiting for listener start!");
//! break;
//! }
//! runtime.process_events();
//! }
//! //-- client
//! let on_client_connected =
//! move |ctx: &mut reactio::SimpleIoReactorContext<'_>, _, _: &mut AppData| {
//! // client sends initial msg.
//! let mut auto_sender = ctx.acquire_send(); // send on drop
//! auto_sender.write_fmt(format_args!("test ")).unwrap();
//! auto_sender.write_fmt(format_args!("msgsend")).unwrap();
//! assert_eq!(auto_sender.count_written(), 12);
//! assert_eq!(auto_sender.get_written(), b"test msgsend");
//! // auto_sender.send(None).unwrap(); // this line can be omitted to let it auto send on drop.
//! // ctx.send_or_que("Hello".as_bytes())?; // rather than using auto_sender, we call ctx.send_or_que
//! Ok(()) // accept connection
//! };
//! let on_client_sock_msg =
//! |_buf: &mut [u8], _ctx: &mut reactio::SimpleIoReactorContext<'_>, _: &mut AppData| {
//! Err("Client disconnect on recv response.".to_owned())
//! };
//!
//! runtime
//! .get_cmd_sender()
//! .send_connect(
//! addr,
//! recv_buffer_min_size,
//! reactio::SimpleIoReactor::new(
//! app_data,
//! Some(Box::new(on_client_connected)), // on_connected
//! None, // on_closed
//! on_client_sock_msg, // on_sock_msg
//! ),
//! reactio::Deferred::Immediate,
//! |_| {}, // OnCommandCompletion
//! )
//! .unwrap();
//! // In a non-threaded environment, process_events until there are no reactors, no events, no deferred events.
//! let timer = reactio::utils::Timer::new_millis(1000);
//! while runtime.process_events() {
//! if timer.expired() {
//! logerr!("ERROR: timeout waiting for tests to complete!");
//! break;
//! }
//! }
//! assert_eq!(runtime.count_reactors(), 0);
//! assert_eq!(runtime.count_deferred_queue(), 0);
//! }
//! ```
//!
//! More examples are echo_client/echo_server (for TCP latency test), PingpongReactor, ThreadedPingpongReactor.
//!
pub use *;