reactio/lib.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
//! # Portable Reactor pattern in Rust.
//!
//! Supported platforms: Linux, Windows
//!
//! ReactIO is a Rust library that implements event-driven Reactor pattern in single-threaded and multiple-threaded environment.
//! Users implement a `Reactor` (as least implement `on_inbound_message`) and add it to a `ReactRuntime`.
//! Each `ReactRuntime` instance runs in a dedicated thread. It polls all events for all managed Reactors. There'are 2 kinds of events:
//! - Socket events. We only register socket READ events initially. MsgReader & MsgSender are provided for Reactor to send/receive messages.
//! MsgReader reads from buffer and dispatches messages. MsgSender sends or register SEND event for resend when seeing WOULDBLOCK.
//! - Commands. Through mpsc channel, reactors could send user defined commands to each other.
//!
//!
//! Key technologies:
//! - IO events (epoll, iocp): handles IO only when there are event.
//! - MsgSender helps socket read (messages are dispatched when full messages are received); MsgSender handles socket write (it queues unsent messages and auto resends).
//! - No mutex or lock. Just send commands to a mpsc channel owned by `ReactRuntime`.
//! - Deferred commands are executed in a defered time.
//!
//!
//! ## Example
//!
//! ### Non-threaded polling model.
//!
//! See example in reactor.rs.
//! ```rust,no_run
//! use reactio;
//! use std::io::Write;
//!
//! /// SimpleIoReactor implements `Reactor` and calls user-given handlers on events.
//! pub fn test_io_reactor() {
//! let addr = "127.0.0.1:12355";
//! let recv_buffer_min_size = 1024;
//! let mut runtime = reactio::SimpleIoRuntime::new();
//! let max_echos = 1;
//! let mut count_echos = 0;
//!
//! let on_sock_msg = move |buf: &mut [u8], ctx: &mut reactio::SimpleIoReactorContext<'_>| {
//! if count_echos >= max_echos {
//! return Err(format!("Reached max echo: {max_echos}")); // close socket
//! }
//! ctx.send_msg(buf)?; // echo back message.
//! count_echos += 1;
//! Ok(buf.len()) // return number of bytes having been consumed.
//! };
//!
//! let on_server_connected = |ctx: &mut reactio::SimpleIoReactorContext<'_>, listenerid| {
//! ctx.cmd_sender
//! .send_close(listenerid, reactio::Deferred::Immediate, |_| {})?; // close parent listerner.
//! Ok(()) // accept current connection.
//! };
//!
//! let on_new_connection = move |_childid| {
//! // create a new Reactor for the new connection.
//! Some(reactio::SimpleIoReactor::new_boxed(
//! Some(Box::new(on_server_connected)), // on_connected
//! None, // on_closed
//! on_sock_msg, // on_sock_msg
//! ))
//! };
//!
//! let on_client_connected = |ctx: &mut reactio::SimpleIoReactorContext<'_>, _| {
//! // client sends initial msg.
//! let mut auto_sender = ctx.acquire_send(); // send on drop
//! auto_sender.write_fmt(format_args!("test ")).unwrap();
//! auto_sender.write_fmt(format_args!("msgsend")).unwrap();
//! assert_eq!(auto_sender.count_written(), 12);
//! // auto_sender.send(None).unwrap(); // this line can be omitted to let it auto send on drop.
//! // ctx.send_msg("Hello".as_bytes())?; // rather than using auto_sender, we call ctx to send_msg
//! Ok(()) // accept connection
//! };
//!
//! //-- server
//! runtime
//! .get_cmd_sender()
//! .send_listen(
//! addr,
//! reactio::SimpleIoListener::new(recv_buffer_min_size, on_new_connection),
//! reactio::Deferred::Immediate,
//! |_| {}, //OnCommandCompletion
//! )
//! .unwrap();
//! // wait for server ready.
//! let timer = reactio::utils::Timer::new_millis(1000);
//! while runtime.count_reactors() < 1 {
//! if timer.expired() {
//! assert!(false, "ERROR: timeout waiting for listener start!");
//! break;
//! }
//! runtime.process_events();
//! }
//! //-- client
//! runtime
//! .get_cmd_sender()
//! .send_connect(
//! addr,
//! recv_buffer_min_size,
//! reactio::SimpleIoReactor::new(
//! Some(Box::new(on_client_connected)), // on_connected
//! None, // on_closed
//! on_sock_msg, // on_sock_msg
//! ),
//! reactio::Deferred::Immediate,
//! |_| {}, // OnCommandCompletion
//! )
//! .unwrap();
//! // In non-threaded environment, process_events until there're no reactors, no events, no deferred events.
//! let timer = reactio::utils::Timer::new_millis(1000);
//! while runtime.process_events() {
//! if timer.expired() {
//! assert!(false, "ERROR: timeout waiting for tests to complete!");
//! break;
//! }
//! }
//! assert_eq!(runtime.count_reactors(), 0);
//! assert_eq!(runtime.count_deferred_queue(), 0);
//! }
//! ```
//!
//! More examples are echo_client/echo_server (for TCP latency test), PingpongReactor, ThreadedPingpongReactor.
//!
#![allow(dead_code)]
pub mod flat_storage;
mod reactor;
pub use reactor::*;
pub mod threaded_reactors;
pub mod utils;