reactio/
reactor.rs

1use crate::dbglog;
2use crate::flat_storage::FlatStorage;
3use crate::utils;
4use crate::{logerr, logtrace};
5use core::panic;
6use polling::{Event, Events, PollMode, Poller};
7use std::io::{ErrorKind, Read, Write};
8use std::time::Duration;
9use std::{marker::PhantomData, net::TcpStream};
10
11//====================================================================================
12//            Reactor
13//====================================================================================
14
15/// A `Reactor` is assigned a unique ReactorID when adding to a ReactRuntime, and is able to receive socket messsages (via reader) and commands.
16/// `process_events` of a ReactRuntime instance should be periodically called in a dedicated thread.
17/// Besides socket communication, Sending command is the only thread-safe way to communicate with a Reactor.
18/// A Reactor could send socket messages (via sender), or send commands (via cmd_sender) to another Reactor with specific ReactorID.
19/// A Reactor is destroyed when the socket is closed.
20pub trait Reactor {
21    type UserCommand;
22
23    /// ReactRuntime calls it when connection is established.
24    /// * `ctx`  - The context the used for reactor to send socket message or command.
25    ///   * **Note that ctx.cmd_sener can only send command to a reactor that belongs to the same ReactRuntime.**
26    /// * `listener` - The listener ID when the reactor is created by a listener socket; otherwise, it's INVALID_REACTOR_ID.
27    /// * return err to close socket.
28    fn on_connected(
29        &mut self,
30        _ctx: &mut DispatchContext<Self::UserCommand>,
31        _listener: ReactorID,
32    ) -> Result<()> {
33        Ok(()) // accept the connection by default.
34    }
35
36    /// It's called by in on_readable() when either decoded_msg_size==0 (meaning message size is unknown) or decoded_msg_size <= buf.len() (meaning a full message is read).
37    ///
38    /// * `buf`  - The buffer containing all the received bytes
39    /// * `new_bytes` - Number of new received bytes, which are also bytes that are not processed. If previously on_inbound_message returned DropMsgSize. new_bytes is the remaining bytes.
40    /// * `decoded_msg_size` -  the decoded message size which is return value of previous call of on_inbound_message. 0 means message having not been decoded.
41    /// * return ExpectMsgSize(msgsize) to indicate more read until full msgsize is read then call next dispatch. msgsize==0 indicates msg size is unknown. DropMsgSize(msgsize) to indiciate message is processed already. framework can drop the message after call. then msgsize will be 0 again.
42    /// * **Note that when calling on_inbound_message(decoded_msg_size) -> ExpectMsgSize(expect_msg_size), if expect_msg_size!=0, it should be always > msg_size.**
43    fn on_inbound_message(
44        &mut self,
45        buf: &mut [u8],
46        new_bytes: usize,
47        decoded_msg_size: usize,
48        ctx: &mut DispatchContext<Self::UserCommand>,
49    ) -> Result<MessageResult>;
50
51    /// ReactRuntime calls it when there's a readable event. `ctx.reader` could be used to read message. See `MsgReader` for usage.
52    /// This is a default implementation which uses MsgReader to read all messages then call on_inbound_message to dispatch (default `try_read_fast_read`).
53    /// User may override this function to implement other strategies (e.g. `try_read_fast_dispatch``).
54    /// * return Err to close socket.
55    fn on_readable(&mut self, ctx: &mut ReactorReableContext<Self::UserCommand>) -> Result<()> {
56        ctx.reader.try_read_fast_read(
57            &mut DispatchContext {
58                reactorid: ctx.reactorid,
59                sock: ctx.sock,
60                sender: ctx.sender,
61                cmd_sender: ctx.cmd_sender,
62            },
63            &mut |buf, new_bytes, decoded_msg_size, ctx| {
64                self.on_inbound_message(buf, new_bytes, decoded_msg_size, ctx)
65            },
66        )
67    }
68
69    /// ReactRuntime calls it when receiving a command. If no user command is used (e.g. `type UserCommand=();`), user may not need to override it.
70    /// return Err to close the socket.
71    fn on_command(
72        &mut self,
73        _cmd: Self::UserCommand,
74        ctx: &mut DispatchContext<Self::UserCommand>,
75    ) -> Result<()> {
76        panic!("Please impl on_command for reactorid: {}", ctx.reactorid);
77    }
78
79    /// ReactRuntime calls it when the reactor is removed from poller and before closing the socket.
80    /// The Reactor will be destroyed after this call.
81    fn on_close(&mut self, _reactorid: ReactorID, _cmd_sender: &CmdSender<Self::UserCommand>) {
82        // noops by defaut
83    }
84}
85
86pub type Result<T> = std::result::Result<T, String>;
87
88/// `DispatchContext` contains all info that could be used to dispatch command/message to reactors.
89pub struct DispatchContext<'a, UserCommand> {
90    pub reactorid: ReactorID,
91    pub sock: &'a mut std::net::TcpStream,
92    pub sender: &'a mut MsgSender, // socker sender
93    pub cmd_sender: &'a CmdSender<UserCommand>,
94}
95impl<'a, UserCommand> DispatchContext<'a, UserCommand> {
96    fn from(data: &'a mut SockData, cmd_sender: &'a CmdSender<UserCommand>) -> Self {
97        Self {
98            reactorid: data.reactorid,
99            sock: &mut data.sock,
100            sender: &mut data.sender,
101            cmd_sender,
102        }
103    }
104    /// try send until Err, WOULDBLOCK or Complete. No internal buffer is used.
105    /// return number of bytes having sent if not Err.
106    pub fn send_no_que(&mut self, msg: &[u8]) -> std::io::Result<usize> {
107        MsgSender::try_send_all(self.sock, msg)
108    }
109    pub fn send_or_que(&mut self, msg: &[u8]) -> Result<SendOrQueResult> {
110        self.sender.send_or_que(self.sock, msg, None)
111    }
112    /// write and call send_or_que to send.
113    pub fn acquire_send(&mut self) -> AutoSendBuffer<'_> {
114        let old_buf_size = self.sender.buf.len();
115        AutoSendBuffer {
116            sender: self.sender,
117            sock: self.sock,
118            old_buf_size,
119        }
120    }
121}
122
123/// `MessageResult` is returned by `on_inbound_message` to indicate result.
124pub enum MessageResult {
125    /// Having received a partial message. Expecting more, indicating decoded/expected message size. If it's non-zero, `on_inbound_message`` will not be called until full message is read;
126    ///     if it's 0, meaning the message size is unknown, `on_inbound_message` will be called everytime when there are any bytes read.
127    /// * **Note that when calling on_inbound_message(decoded_msg_size) -> ExpectMsgSize(expect_msg_size), if expect_msg_size!=0, it should be always > msg_size.**
128    ExpectMsgSize(usize),
129    /// Full message has been processed, indicating bytes to drop. Next call `on_inbound_message` with argument decoded_msgsize=0 and all rest bytes will be treated as unprocessed.
130    DropMsgSize(usize),
131}
132
133/// `Deferred` is used to indicate a command to be executed immidately or in a deferred time.
134pub enum Deferred {
135    Immediate,
136    UtilTime(std::time::SystemTime),
137}
138/// `CommandCompletion` is used as an argument of command completion callback.
139pub type CommandCompletion = Result<ReactorID>;
140
141/// CmdSender is owned by a ReactRuntime. Users send commands to a reactor with specific ReactorID.
142/// * **Note that CmdSender can only send command to a reactor that belongs to the same ReactRuntime.**
143pub struct CmdSender<UserCommand>(std::sync::mpsc::Sender<CmdData<UserCommand>>);
144/// `CmdSender` is a `Send` so that is can be passed through threads.
145unsafe impl<UserCommand> Send for CmdSender<UserCommand> {}
146
147impl<UserCommand> Clone for CmdSender<UserCommand> {
148    fn clone(&self) -> Self {
149        Self(self.0.clone())
150    }
151}
152impl<UserCommand> CmdSender<UserCommand> {
153    /// Send a command to create a socket to connect to remote IP:Port. The reactor will receive socket messages once connected.
154    /// # Arguments
155    /// * `remote_addr` -  Remote address in format IP:Port.
156    /// * `reactor`     -  The reactor to be add to ReactRuntime to handle the socket messages.
157    /// * `deferred`    -  Indicate the command to be executed immediately or in a deferred time.
158    /// * `completion`  -  Callback to indicate if the command has been executed or failed (e.g. reactorid doesn't exist).
159    pub fn send_connect<AReactor: Reactor<UserCommand = UserCommand> + 'static>(
160        &self,
161        remote_addr: &str,
162        recv_buffer_min_size: usize,
163        reactor: AReactor,
164        deferred: Deferred,
165        completion: impl FnOnce(CommandCompletion) + 'static,
166    ) -> Result<()> {
167        self.send_cmd(
168            INVALID_REACTOR_ID,
169            SysCommand::NewConnect(
170                Box::new(reactor),
171                remote_addr.to_owned(),
172                recv_buffer_min_size,
173            ),
174            deferred,
175            completion,
176        )
177    }
178    /// Send a command to create a listen socket at IP:Port. The reactor will listen on the socket.
179    pub fn send_listen<AReactor: TcpListenerHandler<UserCommand = UserCommand> + 'static>(
180        &self,
181        local_addr: &str,
182        reactor: AReactor,
183        deferred: Deferred,
184        completion: impl FnOnce(CommandCompletion) + 'static,
185    ) -> Result<()> {
186        self.send_cmd(
187            INVALID_REACTOR_ID,
188            SysCommand::NewListen(Box::new(reactor), local_addr.to_owned()),
189            deferred,
190            completion,
191        )
192    }
193
194    /// Send a command to close a reactor and it's socket.
195    pub fn send_close(
196        &self,
197        reactorid: ReactorID,
198        deferred: Deferred,
199        completion: impl FnOnce(CommandCompletion) + 'static,
200    ) -> Result<()> {
201        self.send_cmd(reactorid, SysCommand::CloseSocket, deferred, completion)
202    }
203
204    /// Send a UserCommand to a reactor with specified `reactorid`.
205    /// The existance of reactorid is not check in this function.
206    /// When `process_events` is called and the deferred time becomes current,
207    /// `reactorid` is checked before passing the cmd to reactor.
208    pub fn send_user_cmd(
209        &self,
210        reactorid: ReactorID,
211        cmd: UserCommand,
212        deferred: Deferred,
213        completion: impl FnOnce(CommandCompletion) + 'static,
214    ) -> Result<()> {
215        self.send_cmd(reactorid, SysCommand::UserCmd(cmd), deferred, completion)
216    }
217
218    fn send_cmd(
219        &self,
220        reactorid: ReactorID,
221        cmd: SysCommand<UserCommand>,
222        deferred: Deferred,
223        completion: impl FnOnce(CommandCompletion) + 'static,
224    ) -> Result<()> {
225        // check NewConnect/NewListen when reactor == INVALID.
226        match &cmd {
227            SysCommand::NewListen(_, _) | SysCommand::NewConnect(_, _, _) => {
228                if reactorid != INVALID_REACTOR_ID {
229                    return Err(
230                        "reactorid msut be INVALID_REACTOR_ID if NewConnect/NewListen".to_owned(),
231                    );
232                }
233            }
234            SysCommand::UserCmd(_) => {
235                if reactorid == INVALID_REACTOR_ID {
236                    return Err("UserCmd must has a valid reactorid.".to_owned());
237                }
238                // the reactor id must exist, which is checked when runtime processes the command.
239            }
240            _ => {}
241        }
242        if self
243            .0
244            .send(CmdData::<UserCommand> {
245                reactorid,
246                cmd,
247                deferred,
248                completion: Box::new(completion),
249            })
250            .is_err()
251        {
252            return Err("Failed to send. Receiver disconnected.".to_owned());
253        }
254        Ok(())
255    }
256}
257
258/// `ReactorReableContext` is a helper for a reactor to send/recv socket message, or send command.
259pub struct ReactorReableContext<'a, UserCommand> {
260    pub reactorid: ReactorID,                   // current reactorid.
261    pub sock: &'a mut std::net::TcpStream,      // associated socket.
262    pub sender: &'a mut MsgSender,              // helper to send socket message.
263    pub reader: &'a mut MsgReader,              // helper to read socket message.
264    pub cmd_sender: &'a CmdSender<UserCommand>, // helper to send command.
265}
266/// `TcpListenerHandler` handles incoming connections on a listening socket.
267/// Similar to `Reactor`, it's destroyed when listening socket is closed.
268pub trait TcpListenerHandler {
269    type UserCommand;
270
271    /// called when the listen socket starts listeing.
272    fn on_start_listen(
273        &mut self,
274        _reactorid: ReactorID,
275        _cmd_sender: &CmdSender<Self::UserCommand>,
276    ) {
277    }
278
279    //// return (Reactor, recv_buffer_min_size) or None to close the new connection.
280    fn on_new_connection(
281        &mut self,
282        sock: &mut std::net::TcpListener,
283        new_sock: &mut std::net::TcpStream,
284    ) -> Option<NewStreamConnection<Self::UserCommand>>;
285
286    fn on_close_listen(
287        &mut self,
288        _reactorid: ReactorID,
289        _cmd_sender: &CmdSender<Self::UserCommand>,
290    ) {
291    }
292}
293pub struct NewStreamConnection<UserCommand> {
294    pub reactor: Box<dyn Reactor<UserCommand = UserCommand>>,
295    pub recv_buffer_min_size: usize,
296}
297
298/// ReactRuntime manages & owns Reactors which receive/send socket data or command.
299/// A ReactRuntime has a command queue, deferred command queue and a collection of reactors.
300/// Each reactor is assigned a ReactorID when adding to ReactRuntime. Users send command to a reactor
301/// with a specific ReactorID that belongs to the ReactRuntime. The command could be immediate or deferred for a time.
302/// E.g, on close of a reactor, it could send a command to the ReactRuntime to reconnect in future.
303///
304/// Communication between ReactRuntimes are via sending command also, which is the only thread-safe way.
305/// * **Note that `process_events` of a ReactRuntime instance should be periodically called in a dedicated thread.**
306pub struct ReactRuntime<UserCommand> {
307    mgr: ReactorMgr<UserCommand>,
308    deferred_data: FlatStorage<CmdData<UserCommand>>,
309    deferred_heap: Vec<DeferredKey>, // min heap of (scheduled_time_nanos, Cmd_index_in_deferred_data)
310    sock_events: Events, // decoupled events and connections to avoid double mutable refererence.
311    accum_sock_events: usize, // all events counting since last reset.
312    accum_commands: usize, // commands received since last reset.
313}
314
315#[derive(Copy, Clone)]
316struct DeferredKey {
317    millis: i64,
318    data: usize,
319}
320impl DeferredKey {
321    fn get_key(&self) -> i64 {
322        self.millis
323    }
324}
325
326// push the last element to a min heap. sift up
327fn min_heap_push(v: &mut [DeferredKey]) {
328    let mut k = v.len() - 1; // last element
329    if k == 0 {
330        return;
331    }
332    let mut parent = (k - 1) / 2;
333    while k > 0 && v[k].get_key() < v[parent].get_key() {
334        v.swap(k, parent);
335        k = parent;
336        parent = (k - 1) / 2;
337    }
338}
339// pop the first element to end. sift down.
340fn min_heap_pop(v: &mut [DeferredKey]) {
341    let mut k = 0;
342    let value = v[0];
343    while k < v.len() - 1 {
344        let (l, r) = ((k + 1) * 2 - 1, (k + 1) * 2);
345        let min = if r < v.len() - 1 {
346            if v[l].get_key() < v[r].get_key() {
347                l
348            } else {
349                r
350            }
351        } else if l < v.len() - 1 {
352            l
353        } else {
354            break;
355        };
356        v.swap(min, k);
357        k = min;
358    }
359    v[v.len() - 1] = value;
360}
361
362// Make a separate struct ReactorMgr because when interating TcpConnectionMgr::events, sessions must be mutable in process_events.
363struct ReactorMgr<UserCommand> {
364    socket_handlers: FlatStorage<TcpSocketHandler<UserCommand>>,
365    poller: Poller,
366    count_streams: usize, // TcpStreams only (excluding TcpListener)
367    cmd_recv: std::sync::mpsc::Receiver<CmdData<UserCommand>>,
368    cmd_sender: CmdSender<UserCommand>,
369}
370
371enum TcpSocketHandler<UserCommand> {
372    ListenerType(
373        ReactorID,
374        std::net::TcpListener,
375        Box<dyn TcpListenerHandler<UserCommand = UserCommand>>,
376    ), // <sock, handler, key_in_flat_storage>
377    StreamType(SockData, Box<dyn Reactor<UserCommand = UserCommand>>),
378}
379struct SockData {
380    pub reactorid: ReactorID,
381    pub sock: std::net::TcpStream,
382    pub sender: MsgSender,
383    pub reader: MsgReader,
384    interested_writable: bool,
385}
386impl<'a, UserCommand> ReactorReableContext<'a, UserCommand> {
387    fn from(data: &'a mut SockData, cmd_sender: &'a CmdSender<UserCommand>) -> Self {
388        Self {
389            reactorid: data.reactorid,
390            sock: &mut data.sock,
391            sender: &mut data.sender,
392            reader: &mut data.reader,
393            cmd_sender,
394        }
395    }
396}
397
398#[cfg(target_pointer_width = "64")]
399type HalfUsize = u32;
400#[cfg(target_pointer_width = "32")]
401type HalfUSize = u16;
402
403#[derive(Debug, Clone, Copy, PartialEq, Eq)]
404pub struct ReactorID {
405    sockslot: HalfUsize, // must be half of usize
406    ver: HalfUsize,      // must be half of usize
407}
408
409pub const INVALID_REACTOR_ID: ReactorID = ReactorID {
410    sockslot: HalfUsize::MAX,
411    ver: HalfUsize::MAX,
412};
413
414impl ReactorID {
415    /// convert to epoll event key
416    pub fn to_usize(&self) -> usize {
417        let halfbits = std::mem::size_of::<usize>() * 8 / 2;
418        ((self.ver as usize) << halfbits) | (self.sockslot as usize)
419    }
420    /// convert from epoll event key
421    pub fn from_usize(val: usize) -> Self {
422        let halfbits = std::mem::size_of::<usize>() * 8 / 2;
423        Self {
424            sockslot: val as HalfUsize,
425            ver: (val >> halfbits) as HalfUsize,
426        }
427    }
428}
429impl std::fmt::Display for ReactorID {
430    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
431        write!(f, "{}:{}", self.sockslot, self.ver)
432    }
433}
434
435impl<UserCommand> ReactorMgr<UserCommand> {
436    fn new() -> Self {
437        let (cmd_sender, cmd_recv) = std::sync::mpsc::channel::<CmdData<UserCommand>>();
438        Self {
439            socket_handlers: FlatStorage::new(),
440            poller: Poller::new().unwrap(),
441            count_streams: 0,
442            cmd_sender: CmdSender(cmd_sender),
443            cmd_recv,
444        }
445    }
446    /// \return number of all managed socks.
447    fn len(&self) -> usize {
448        self.socket_handlers.len()
449    }
450    fn add_stream(
451        &mut self,
452        recv_buffer_min_size: usize,
453        sock: std::net::TcpStream,
454        handler: Box<dyn Reactor<UserCommand = UserCommand>>,
455    ) -> ReactorID {
456        let key = self.socket_handlers.add(TcpSocketHandler::StreamType(
457            SockData {
458                reactorid: INVALID_REACTOR_ID,
459                sock,
460                sender: MsgSender::new(),
461                reader: MsgReader::new(recv_buffer_min_size),
462                interested_writable: false,
463            },
464            handler,
465        ));
466        let reactorid = ReactorID {
467            sockslot: key as HalfUsize,
468            ver: self.socket_handlers.len() as HalfUsize,
469        };
470        self.count_streams += 1;
471        if let TcpSocketHandler::StreamType(sockdata, ref mut _handler) =
472            self.socket_handlers.get_mut(key).unwrap()
473        {
474            sockdata.reactorid = reactorid;
475            unsafe {
476                self.poller
477                    .add_with_mode(
478                        &sockdata.sock,
479                        Event::readable(reactorid.to_usize()),
480                        PollMode::Level,
481                    )
482                    .unwrap();
483            }
484            logtrace!(
485                "Added TcpStream reactorid: {}, sock: {:?}",
486                sockdata.reactorid,
487                sockdata.sock
488            );
489            return sockdata.reactorid;
490        }
491        panic!("ERROR! Failed to get new added sockdata!");
492    }
493
494    fn add_listener(
495        &mut self,
496        sock: std::net::TcpListener,
497        handler: Box<dyn TcpListenerHandler<UserCommand = UserCommand>>,
498    ) -> ReactorID {
499        let key = self.socket_handlers.add(TcpSocketHandler::ListenerType(
500            INVALID_REACTOR_ID,
501            sock,
502            handler,
503        ));
504        let reactorid = ReactorID {
505            sockslot: key as HalfUsize,
506            ver: self.socket_handlers.len() as HalfUsize,
507        };
508        if let TcpSocketHandler::ListenerType(areactorid, ref sock, _) =
509            self.socket_handlers.get_mut(key).unwrap()
510        {
511            *areactorid = reactorid;
512            // must read exaustively.
513            unsafe {
514                self.poller
515                    .add_with_mode(sock, Event::readable(reactorid.to_usize()), PollMode::Level)
516                    .unwrap();
517            }
518            logtrace!(
519                "Added TcpListener reactorid: {}, sock: {:?}",
520                reactorid,
521                sock
522            );
523        }
524        reactorid
525    }
526
527    /// Close & remove socket/reactor.
528    /// * return true if key exists; false when key doesn't exist or it's in process of polling.
529    fn close_reactor(&mut self, reactorid: ReactorID) -> bool {
530        if let Some(sockhandler) = self.socket_handlers.remove(reactorid.sockslot as usize) {
531            match sockhandler {
532                TcpSocketHandler::StreamType(sockdata, mut reactor) => {
533                    debug_assert_eq!(reactorid, sockdata.reactorid);
534                    logtrace!(
535                        "removing reactorid: {}, sock: {:?}, pending_read_bytes: {}, pending_send_bytes: {}",
536                        reactorid,
537                        sockdata.sock,
538                        sockdata.reader.bytes_in_buffer(),
539                        sockdata.sender.buf.len()
540                    );
541                    self.count_streams -= 1;
542                    self.poller.delete(&sockdata.sock).unwrap();
543                    (reactor).on_close(reactorid, &self.cmd_sender);
544                }
545                TcpSocketHandler::ListenerType(areactorid, sock, mut reactor) => {
546                    debug_assert_eq!(reactorid, areactorid);
547                    logtrace!("removing reactorid: {}, sock: {:?}", reactorid, sock);
548                    self.poller.delete(&sock).unwrap();
549                    (reactor).on_close_listen(reactorid, &self.cmd_sender);
550                }
551            }
552            return true;
553        }
554        false
555    }
556
557    /// * local_addr - ip:port. e.g. "127.0.0.1:8000"
558    fn start_listen(
559        &mut self,
560        local_addr: &str,
561        handler: Box<dyn TcpListenerHandler<UserCommand = UserCommand>>,
562    ) -> std::io::Result<ReactorID> {
563        let socket = std::net::TcpListener::bind(local_addr)?;
564        socket.set_nonblocking(true)?;
565        let reactorid = self.add_listener(socket, handler);
566        if let TcpSocketHandler::ListenerType(_, _, ref mut handler) = self
567            .socket_handlers
568            .get_mut(reactorid.sockslot as usize)
569            .unwrap()
570        {
571            handler.on_start_listen(reactorid, &self.cmd_sender);
572            return std::io::Result::Ok(reactorid);
573        }
574        std::io::Result::Ok(INVALID_REACTOR_ID)
575    }
576
577    fn start_connect(
578        &mut self,
579        remote_addr: &str,
580        recv_buffer_min_size: usize,
581        handler: Box<dyn Reactor<UserCommand = UserCommand>>,
582    ) -> std::io::Result<ReactorID> {
583        let socket = TcpStream::connect(remote_addr)?;
584        socket.set_nonblocking(true)?; // FIXME: use blocking socket and each nonblocking read and blocking write.
585        let reactorid = self.add_stream(recv_buffer_min_size, socket, handler);
586        if let TcpSocketHandler::StreamType(ref mut sockdata, ref mut handler) = self
587            .socket_handlers
588            .get_mut(reactorid.sockslot as usize)
589            .unwrap()
590        {
591            if handler
592                .on_connected(
593                    &mut DispatchContext::from(sockdata, &self.cmd_sender),
594                    INVALID_REACTOR_ID,
595                )
596                .is_ok()
597            {
598                return std::io::Result::Ok(reactorid);
599            }
600        }
601        self.close_reactor(reactorid);
602        std::io::Result::Ok(INVALID_REACTOR_ID)
603    }
604}
605
606impl<UserCommand> Default for ReactRuntime<UserCommand> {
607    fn default() -> Self {
608        Self::new()
609    }
610}
611impl<UserCommand> ReactRuntime<UserCommand> {
612    pub fn new() -> Self {
613        Self {
614            mgr: ReactorMgr::new(),
615            deferred_data: FlatStorage::new(),
616            deferred_heap: Vec::new(),
617            sock_events: Events::new(),
618            accum_sock_events: 0,
619            accum_commands: 0,
620        }
621    }
622    /// `process_events` should be periodically called to process socket messages, commands and deferred commands.
623    /// - return true if the runtime has any of reactors, commands or deferred commands;
624    /// - return false when there's no reactor/socket or command, then this runtime could be destroyed.
625    pub fn process_events(&mut self) -> bool {
626        self.process_events_with(1, 32)
627    }
628    pub fn process_events_with(&mut self, sock_timeout_millis: u64, max_commands: usize) -> bool {
629        let sock_events = self.process_sock_events(sock_timeout_millis);
630        self.accum_sock_events += sock_events;
631        self.process_deferred_queue();
632        let cmds = self.process_command_queue(max_commands);
633        self.accum_commands += cmds;
634        sock_events > 0 || cmds > 0 || !self.deferred_heap.is_empty() || self.mgr.len() > 0
635    }
636    /// Count listeners, Reactors
637    pub fn count_reactors(&self) -> usize {
638        self.mgr.len()
639    }
640    /// return number if deferred commands in deferred queue which saves deferred commands.
641    /// **Note: there's also an internal command queue that is used to receive commands**
642    pub fn count_deferred_queue(&self) -> usize {
643        self.deferred_data.len()
644    }
645    /// return number of streams (excluding listeners)
646    pub fn count_streams(&self) -> usize {
647        self.mgr.count_streams
648    }
649    /// return total number of received socket events.
650    pub fn count_sock_events(&self) -> usize {
651        self.accum_sock_events
652    }
653    /// return total number of received commands.
654    pub fn count_received_commands(&self) -> usize {
655        self.accum_commands
656    }
657    /// Get the `CmdSender` managed by this ReactRuntime. It's used to send command to reactors in this ReactRuntime.
658    pub fn get_cmd_sender(&self) -> &CmdSender<UserCommand> {
659        &self.mgr.cmd_sender
660    }
661
662    /// return number of socket events processed.
663    pub fn process_sock_events(&mut self, timeout_millis: u64) -> usize {
664        self.sock_events.clear();
665        self.mgr
666            .poller
667            .wait(
668                &mut self.sock_events,
669                Some(Duration::from_millis(timeout_millis)),
670            )
671            .unwrap(); // None duration means forever
672
673        for ev in self.sock_events.iter() {
674            let mut removesock = false;
675            let current_reactorid = ReactorID::from_usize(ev.key);
676            let mut new_connection_to_add = None;
677            if let Some(sockhandler) = self
678                .mgr
679                .socket_handlers
680                .get_mut(current_reactorid.sockslot as usize)
681            {
682                match sockhandler {
683                    TcpSocketHandler::ListenerType(reactorid, sock, handler) => {
684                        debug_assert_eq!(current_reactorid, *reactorid);
685                        if ev.readable {
686                            let (mut newsock, _) = sock.accept().unwrap();
687                            if let Some(new_stream_connection) =
688                                handler.on_new_connection(sock, &mut newsock)
689                            {
690                                newsock.set_nonblocking(true).unwrap();
691                                new_connection_to_add = Some((
692                                    newsock,
693                                    new_stream_connection.reactor,
694                                    new_stream_connection.recv_buffer_min_size,
695                                ));
696                            }
697                            // else newsock will auto destroy
698                        }
699                        if ev.writable {
700                            logerr!("writable listener sock!");
701                            removesock = true;
702                        }
703                    }
704                    TcpSocketHandler::StreamType(ref mut ctx, ref mut handler) => {
705                        debug_assert_eq!(current_reactorid, ctx.reactorid);
706                        if ev.writable {
707                            if !ctx.interested_writable {
708                                dbglog!("WARN: unsolicited writable sock: {:?}", ctx.sock);
709                            }
710                            ctx.interested_writable = true; // in case unsolicited event.
711                            if !ctx.sender.buf.is_empty() {
712                                if let Err(err) = ctx.sender.send_queued(&mut ctx.sock) {
713                                    logtrace!("{err}  send_queued failed.");
714                                    removesock = true;
715                                }
716                            }
717                        }
718                        if ev.readable {
719                            if let Err(err) = handler.on_readable(&mut ReactorReableContext::from(
720                                ctx,
721                                &self.mgr.cmd_sender,
722                            )) {
723                                if !err.is_empty() {
724                                    logtrace!("on_readable requested close current_reactorid: {current_reactorid}, sock: {:?}. Reason: {}", ctx.sock, err);
725                                }
726                                removesock = true;
727                            }
728                        }
729                        if ctx.sender.close_or_error {
730                            removesock = true;
731                        }
732                        // add or remove write interest
733                        if !removesock {
734                            if !ctx.interested_writable && !ctx.sender.buf.is_empty() {
735                                self.mgr
736                                    .poller
737                                    .modify_with_mode(
738                                        &ctx.sock,
739                                        Event::all(ev.key),
740                                        PollMode::Level,
741                                    )
742                                    .unwrap();
743                                ctx.interested_writable = true;
744                            } else if ctx.interested_writable && ctx.sender.buf.is_empty() {
745                                self.mgr
746                                    .poller
747                                    .modify_with_mode(
748                                        &ctx.sock,
749                                        Event::readable(ev.key),
750                                        PollMode::Level,
751                                    )
752                                    .unwrap();
753                                ctx.interested_writable = false;
754                            }
755                        }
756                    }
757                }
758            } else {
759                dbglog!("[ERROR] socket key has been removed {}!", current_reactorid);
760                continue;
761            }
762
763            if let Some((newsock, newhandler, recv_buffer_min_size)) = new_connection_to_add {
764                let newreactorid_to_close = {
765                    let newreactorid =
766                        self.mgr
767                            .add_stream(recv_buffer_min_size, newsock, newhandler);
768                    if let TcpSocketHandler::StreamType(ref mut newsockdata, ref mut newhandler) =
769                        self.mgr
770                            .socket_handlers
771                            .get_mut(newreactorid.sockslot as usize)
772                            .unwrap()
773                    {
774                        match newhandler.on_connected(
775                            &mut DispatchContext::from(newsockdata, &self.mgr.cmd_sender),
776                            current_reactorid,
777                        ) {
778                            Ok(_) => INVALID_REACTOR_ID, // accept it, don't close it.
779                            Err(err) => {
780                                if !err.is_empty() {
781                                    logtrace!("Reject new connection for listener_reactorid: {}. Reason: {}", current_reactorid, err);
782                                }
783                                newsockdata.reactorid // close it.
784                            }
785                        }
786                    } else {
787                        panic!("Failed to find new added stream!");
788                    }
789                };
790                if newreactorid_to_close != INVALID_REACTOR_ID {
791                    self.mgr.close_reactor(newreactorid_to_close);
792                }
793
794                continue;
795            }
796
797            if removesock {
798                self.mgr.close_reactor(current_reactorid);
799                continue; // ignore error events.
800            }
801            if ev.is_err().unwrap_or(false) {
802                logerr!("WARN: socket error key: {}", current_reactorid);
803                removesock = true;
804            }
805            if ev.is_interrupt() {
806                logerr!("WARN: socket interrupt key: {}", current_reactorid);
807                removesock = true;
808            }
809            if removesock {
810                self.mgr.close_reactor(current_reactorid);
811            }
812        }
813
814        self.sock_events.len()
815    }
816
817    /// return number of command procesed
818    pub fn process_command_queue(&mut self, max_commands: usize) -> usize {
819        let mut count_cmd = 0usize;
820        // max number of commands to process for each call of process_command_queue. So to have chances to process socket/deferred events.
821        for _ in 0..max_commands {
822            let cmddata: CmdData<UserCommand> = match self.mgr.cmd_recv.try_recv() {
823                Err(err) => {
824                    if err == std::sync::mpsc::TryRecvError::Empty {
825                        return count_cmd;
826                    } else {
827                        panic!("std::sync::mpsc::TryRecvError::Disconnected is not possible. Because both cmd_sender & cmd_recv are saved.");
828                    }
829                }
830                Ok(data) => data,
831            };
832            count_cmd += 1;
833
834            match cmddata.deferred {
835                Deferred::Immediate => {}
836                Deferred::UtilTime(time) => {
837                    let millis = time
838                        .duration_since(std::time::SystemTime::UNIX_EPOCH)
839                        .unwrap()
840                        .as_millis() as i64;
841                    if !ReactRuntime::<UserCommand>::is_deferred_current(millis) {
842                        // if beyond half millis tolerance
843                        let key = self.deferred_data.add(cmddata);
844                        self.deferred_heap.push(DeferredKey { millis, data: key });
845                        min_heap_push(&mut self.deferred_heap);
846                        continue; // continue loop recv
847                    }
848                }
849            }
850            self.execute_immediate_cmd(cmddata);
851        } // loop
852        count_cmd
853    }
854
855    /// return timeout/executed commands
856    pub fn process_deferred_queue(&mut self) -> usize {
857        let mut cmds = 0;
858        while !self.deferred_heap.is_empty()
859            && ReactRuntime::<UserCommand>::is_deferred_current(self.deferred_heap[0].millis)
860        {
861            let key = self.deferred_heap[0].data;
862            min_heap_pop(&mut self.deferred_heap);
863            self.deferred_heap.pop();
864            cmds += 1;
865            if let Some(cmddata) = self.deferred_data.remove(key) {
866                self.execute_immediate_cmd(cmddata);
867            } else {
868                panic!("No deferred CommandData with key: {}", key);
869            }
870        }
871        cmds
872    }
873
874    //----------------------------- private -----------------------------------------------
875
876    fn is_deferred_current(millis: i64) -> bool {
877        let now_nanos = utils::now_nanos();
878        millis * 1000000 + 5 * 100000 <= now_nanos
879    }
880
881    fn execute_immediate_cmd(&mut self, cmddata: CmdData<UserCommand>) {
882        let mut reactorid_to_close: ReactorID = INVALID_REACTOR_ID;
883        match cmddata.cmd {
884            SysCommand::NewConnect(reactor, remote_addr, recv_buffer_min_size) => {
885                match self
886                    .mgr
887                    .start_connect(&remote_addr, recv_buffer_min_size, reactor)
888                {
889                    Err(err) => {
890                        let errmsg =
891                            format!("Failed to connect to {}. Error: {}", remote_addr, err);
892                        (cmddata.completion)(Err(errmsg));
893                    }
894                    Ok(key) => {
895                        (cmddata.completion)(Ok(key));
896                    }
897                }
898            }
899            SysCommand::NewListen(reactor, local_addr) => {
900                match self.mgr.start_listen(&local_addr, reactor) {
901                    Err(err) => {
902                        let errmsg = format!("Failed to listen on {}. Error: {}", local_addr, err);
903                        (cmddata.completion)(Err(errmsg));
904                    }
905                    Ok(key) => {
906                        (cmddata.completion)(Ok(key));
907                    }
908                }
909            }
910            SysCommand::CloseSocket => {
911                if self.mgr.close_reactor(cmddata.reactorid) {
912                    (cmddata.completion)(Ok(cmddata.reactorid));
913                } else {
914                    (cmddata.completion)(Err(format!(
915                        "Failed to remove non existing socket with reactorid: {}",
916                        cmddata.reactorid
917                    )));
918                }
919            }
920            SysCommand::UserCmd(usercmd) => {
921                if cmddata.reactorid == INVALID_REACTOR_ID {
922                    panic!("UserCommand must be executed on a reactor!");
923                } else if let Some(handler) = self
924                    .mgr
925                    .socket_handlers
926                    .get_mut(cmddata.reactorid.sockslot as usize)
927                {
928                    match handler {
929                        TcpSocketHandler::ListenerType(reactorid, _, _) => {
930                            (cmddata.completion)(Err(format!(
931                                    "Listener cannot receive user command. cmd reactorid: {}, reactorid: {}",
932                                    cmddata.reactorid, *reactorid
933                                )));
934                        }
935                        TcpSocketHandler::StreamType(ctx, reactor) => {
936                            if cmddata.reactorid != ctx.reactorid {
937                                (cmddata.completion)(Err(format!(
938                                        "Failed to execute user command with wrong cmd reactorid: {}, found: {}",
939                                        cmddata.reactorid , ctx.reactorid
940                                    )));
941                            } else {
942                                let res = (reactor).on_command(
943                                    usercmd,
944                                    &mut DispatchContext {
945                                        reactorid: cmddata.reactorid,
946                                        sock: &mut ctx.sock,
947                                        sender: &mut ctx.sender,
948                                        cmd_sender: &self.mgr.cmd_sender,
949                                    },
950                                );
951                                (cmddata.completion)(Ok(cmddata.reactorid));
952                                if let Err(err) = res {
953                                    logtrace!(
954                                        "on_command requested closing reactorid: {}. {}",
955                                        cmddata.reactorid,
956                                        err
957                                    );
958                                    reactorid_to_close = cmddata.reactorid;
959                                }
960                            }
961                        }
962                    }
963                } else {
964                    (cmddata.completion)(Err(format!(
965                        "Failed to execute user command on non existing socket with reactorid: {}",
966                        cmddata.reactorid
967                    )));
968                }
969            }
970        } // match cmd
971
972        if reactorid_to_close != INVALID_REACTOR_ID {
973            self.mgr.close_reactor(reactorid_to_close);
974        }
975    }
976}
977
978//====================================================================================
979//            SysCommand to Reactor
980//====================================================================================
981
982enum SysCommand<UserCommand> {
983    //-- system commands are processed by Runtime, Reactor will not receive them.
984    NewConnect(
985        Box<dyn Reactor<UserCommand = UserCommand>>,
986        String, // connect to remote IP:Port
987        usize,  // min_recev_buffer_size
988    ),
989    NewListen(
990        Box<dyn TcpListenerHandler<UserCommand = UserCommand>>,
991        String, // connect to remote IP:Port
992    ), // listen on IP:Port
993    CloseSocket,
994    UserCmd(UserCommand),
995}
996
997struct CmdData<UserCommand> {
998    reactorid: ReactorID,
999    cmd: SysCommand<UserCommand>,
1000    deferred: Deferred,
1001    completion: Box<dyn FnOnce(CommandCompletion)>,
1002}
1003unsafe impl<UserCommand> Send for CmdData<UserCommand> {}
1004
1005//====================================================================================
1006//            MsgSender
1007//====================================================================================
1008
1009/// MsgSender is a per-socket object. It tries sending msg on a non-blocking socket. if sending fails due to WOULDBLOCK,
1010/// the unsent bytes are saved and register a Write insterest in poller, so that
1011/// the remaining data will be scheduled to send on next Writeable event.
1012///
1013/// If a reactor should choose either MsgSender or socket to send messages.
1014/// Mixed using of both may cause out-of-order messages.
1015pub struct MsgSender {
1016    pub buf: Vec<u8>,
1017    pub pending: FlatStorage<PendingSend>, // Each PendingSend represents a send_or_que action.
1018    first_pending_id: usize, // the id in flat_storage, usize::MAX is invalid. pop from front.
1019    last_pending_id: usize,  // the id in flat_storage, usize::MAX is invalid. push to back.
1020    pub bytes_sent: usize,   // total bytes having been sent. buf[0] is bytes_sent+1 byte to send.
1021    close_or_error: bool,
1022}
1023/// Used to save the pending Send action.
1024pub struct PendingSend {
1025    next_id: usize, // the id in flat_storage. LinkNode of PendingSend saved in MsgSender::pending.
1026    startpos: usize, // the first byte of message to sent in buf,
1027    msgsize: usize,
1028    completion: Box<dyn FnOnce()>, // notify write completion.
1029}
1030
1031/// `SendOrQueResult` is the result of `MsgSender::send_or_que` or `DispatchContext::send_or_que`.
1032#[derive(PartialEq, Eq)]
1033pub enum SendOrQueResult {
1034    /// No message in queue
1035    Complete,
1036    /// message in queue
1037    InQueue,
1038}
1039impl Default for MsgSender {
1040    fn default() -> Self {
1041        Self::new()
1042    }
1043}
1044impl MsgSender {
1045    pub fn new() -> Self {
1046        Self {
1047            buf: Vec::new(),
1048            pending: FlatStorage::new(),
1049            first_pending_id: usize::MAX, // FIFO queue, append to last. pop from first.
1050            last_pending_id: usize::MAX,
1051            bytes_sent: 0,
1052            close_or_error: false,
1053        }
1054    }
1055
1056    // try send until Err, WOULDBLOCK or Complete.
1057    // return number of bytes having sent.
1058    pub fn try_send_all(sock: &mut std::net::TcpStream, buf: &[u8]) -> std::io::Result<usize> {
1059        if buf.is_empty() {
1060            return Ok(0);
1061        }
1062        let mut buf = buf;
1063        let mut sentbytes = 0;
1064        loop {
1065            match sock.write(buf) {
1066                std::io::Result::Ok(bytes) => {
1067                    if bytes < buf.len() {
1068                        buf = &buf[bytes..];
1069                        sentbytes += bytes; // retry next loop
1070                    } else {
1071                        return Ok(sentbytes + bytes); // sent
1072                    }
1073                }
1074                std::io::Result::Err(err) => {
1075                    let errkind = err.kind();
1076                    if errkind == ErrorKind::WouldBlock {
1077                        return Ok(sentbytes); // queued
1078                    } else if errkind == ErrorKind::ConnectionReset {
1079                        logtrace!("sock reset : {sock:?}. close socket");
1080                        return Err(err);
1081                    // socket closed
1082                    } else if errkind == ErrorKind::Interrupted {
1083                        logtrace!("[WARN] sock Interrupted : {sock:?}. retry");
1084                        return Err(err); // Interrupted is not an error. queue
1085                    } else {
1086                        logtrace!("[ERROR]: write on sock {sock:?}, error: {err:?}");
1087                        return Err(err);
1088                    }
1089                }
1090            }
1091        }
1092    }
1093
1094    /// Send the message or queue it if unabe to send. When there's any messsage is in queue, The `ReactRuntime` will auto send it next time when `process_events`` is called.
1095    /// * Note that if this function is called with a socket. the same sender should always be used to send socket messages.
1096    /// * `send_completion` - callback to indicate the message is sent. If there's any error, the socket will be closed and this callback is not called.
1097    pub fn send_or_que(
1098        &mut self,
1099        sock: &mut std::net::TcpStream,
1100        buf: &[u8],
1101        send_completion: Option<Box<dyn FnOnce()>>,
1102    ) -> Result<SendOrQueResult> {
1103        // let mut buf = buf;
1104        if buf.is_empty() {
1105            if let Some(callback) = send_completion {
1106                (callback)();
1107            }
1108            return Ok(SendOrQueResult::Complete);
1109        }
1110        if !self.buf.is_empty() {
1111            self.buf.extend_from_slice(buf);
1112            self.queue_msg_completion(buf.len(), send_completion);
1113            return Ok(SendOrQueResult::InQueue);
1114        }
1115        // else sendbuf is empty.  try send. queue it if fails.
1116        debug_assert_eq!(self.bytes_sent, 0);
1117        debug_assert_eq!(self.first_pending_id, usize::MAX);
1118        debug_assert_eq!(self.last_pending_id, usize::MAX);
1119        debug_assert_eq!(self.pending.len(), 0);
1120
1121        let sentbytes = match MsgSender::try_send_all(sock, buf) {
1122            Err(err) => {
1123                return Err(err.to_string());
1124            }
1125            Ok(bytes) => bytes,
1126        };
1127
1128        if sentbytes == buf.len() {
1129            if let Some(callback) = send_completion {
1130                (callback)();
1131            }
1132            return Ok(SendOrQueResult::Complete); // sent
1133        }
1134        //---- queue the remaining bytes
1135        self.buf.extend_from_slice(&buf[sentbytes..]);
1136        self.queue_msg_completion(buf.len() - sentbytes, send_completion);
1137        Ok(SendOrQueResult::InQueue)
1138    }
1139
1140    // call this function after message has been appened to self.buf.
1141    fn queue_msg_completion(
1142        &mut self,
1143        queued_size: usize,
1144        send_completion: Option<Box<dyn FnOnce()>>,
1145    ) {
1146        if let Some(callback) = send_completion {
1147            // append to last.
1148            let prev_id = self.last_pending_id;
1149            self.last_pending_id = self.pending.add(PendingSend {
1150                next_id: usize::MAX,
1151                startpos: self.bytes_sent + self.buf.len() - queued_size,
1152                msgsize: queued_size,
1153                completion: callback,
1154            });
1155            if let Some(prev) = self.pending.get_mut(prev_id) {
1156                prev.next_id = self.last_pending_id;
1157            }
1158            if self.first_pending_id == usize::MAX {
1159                // add the first one
1160                self.first_pending_id = self.last_pending_id;
1161            }
1162        }
1163    }
1164
1165    // This function is called ony ReactRuntime to send messages in queue.
1166    #[allow(unused_assignments)]
1167    fn send_queued(&mut self, sock: &mut std::net::TcpStream) -> Result<SendOrQueResult> {
1168        if self.buf.is_empty() {
1169            return Ok(SendOrQueResult::Complete);
1170        }
1171        let mut sentbytes = 0;
1172        match sock.write(&self.buf[..]) {
1173            std::io::Result::Ok(bytes) => {
1174                sentbytes = bytes;
1175                if bytes == 0 {
1176                    self.close_or_error = true;
1177                    return Err(format!("[ERROR] write sock 0 bytes {sock:?}. close socket"));
1178                }
1179            }
1180            std::io::Result::Err(err) => {
1181                let errkind = err.kind();
1182                if errkind == ErrorKind::WouldBlock {
1183                    return Ok(SendOrQueResult::InQueue); // queued
1184                } else if errkind == ErrorKind::ConnectionReset {
1185                    self.close_or_error = true;
1186                    return Err(format!(
1187                        "[ERROR] Write sock ConnectionReset {sock:?}. close socket"
1188                    ));
1189                // socket closed
1190                } else if errkind == ErrorKind::Interrupted {
1191                    logtrace!("[WARN] sock Interrupted : {sock:?}. retry");
1192                    return Ok(SendOrQueResult::InQueue); // Interrupted is not an error. queue
1193                } else {
1194                    self.close_or_error = true;
1195                    return Err(format!("[ERROR]: write on sock {sock:?}, error: {err:?}"));
1196                }
1197            }
1198        }
1199        //-- now sent some bytes. pop pending list and notify.
1200        while self.first_pending_id != usize::MAX {
1201            let id = self.first_pending_id;
1202            let (mut sent, mut next_id) = (false, 0);
1203            if let Some(pending) = self.pending.get_mut(id) {
1204                if pending.startpos + pending.msgsize <= self.bytes_sent {
1205                    // fulled sent
1206                    sent = true;
1207                    next_id = pending.next_id;
1208                } else {
1209                    // the first msg not being fully sent.
1210                    pending.msgsize -= self.bytes_sent - pending.startpos;
1211                    pending.startpos = self.bytes_sent;
1212                    break;
1213                }
1214            } else {
1215                panic!("invalid id");
1216            }
1217            if sent {
1218                self.first_pending_id = next_id;
1219                if let Some(pending) = self.pending.remove(id) {
1220                    (pending.completion)();
1221                }
1222            }
1223        }
1224        if self.first_pending_id == usize::MAX {
1225            // removed the last
1226            self.last_pending_id = usize::MAX;
1227        }
1228        Ok(self.move_buf_front_after_send(sentbytes))
1229    }
1230    // return SendOrQueResult::Complete or InQueue
1231    fn move_buf_front_after_send(&mut self, sentbytes: usize) -> SendOrQueResult {
1232        //- move front buf
1233        let len = self.buf.len();
1234        self.buf.copy_within(sentbytes..len, 0);
1235        self.buf.resize(len - sentbytes, 0);
1236        if self.buf.is_empty() {
1237            // reset members if buf is empty.
1238            debug_assert_eq!(self.first_pending_id, usize::MAX);
1239            debug_assert_eq!(self.last_pending_id, usize::MAX);
1240            debug_assert_eq!(self.pending.len(), 0);
1241            self.bytes_sent = 0;
1242            SendOrQueResult::Complete
1243        } else {
1244            self.bytes_sent += sentbytes;
1245            SendOrQueResult::InQueue
1246        }
1247    }
1248}
1249
1250pub struct AutoSendBuffer<'sender> {
1251    sender: &'sender mut MsgSender,
1252    sock: &'sender mut std::net::TcpStream,
1253    old_buf_size: usize,
1254}
1255impl AutoSendBuffer<'_> {
1256    // clear all unsent bytes.
1257    pub fn clear(&mut self) {
1258        self.sender.buf.resize(self.old_buf_size, 0);
1259    }
1260    pub fn count_written(&self) -> usize {
1261        self.sender.buf.len() - self.old_buf_size
1262    }
1263    /// Get the buffer containing the written bytes.
1264    pub fn get_written(&self) -> &[u8] {
1265        &self.sender.buf[self.old_buf_size..]
1266    }
1267    pub fn send(
1268        &mut self,
1269        send_completion: Option<Box<dyn FnOnce()>>,
1270    ) -> std::io::Result<SendOrQueResult> {
1271        let buf = &self.sender.buf[self.old_buf_size..];
1272        let buf_len = buf.len();
1273        if buf.is_empty() {
1274            if let Some(callback) = send_completion {
1275                (callback)();
1276            }
1277            self.old_buf_size = self.sender.buf.len();
1278            return Ok(SendOrQueResult::Complete);
1279        }
1280        if self.old_buf_size > 0 {
1281            self.sender.queue_msg_completion(buf.len(), send_completion);
1282            self.old_buf_size = self.sender.buf.len();
1283            return Ok(SendOrQueResult::InQueue);
1284        }
1285
1286        let sentbytes = match MsgSender::try_send_all(self.sock, buf) {
1287            Err(err) => {
1288                self.sender.close_or_error = true;
1289                self.old_buf_size = self.sender.buf.len();
1290                return Err(err);
1291            }
1292            Ok(bytes) => bytes,
1293        };
1294
1295        if sentbytes > 0 {
1296            self.sender.move_buf_front_after_send(sentbytes);
1297        }
1298
1299        if sentbytes == buf_len {
1300            if let Some(callback) = send_completion {
1301                (callback)();
1302            }
1303            self.old_buf_size = self.sender.buf.len();
1304            return Ok(SendOrQueResult::Complete); // sent
1305        }
1306        //---- queue the remaining bytes
1307        self.sender
1308            .queue_msg_completion(self.sender.buf.len(), send_completion);
1309        self.old_buf_size = self.sender.buf.len();
1310        Ok(SendOrQueResult::InQueue)
1311    }
1312}
1313impl Drop for AutoSendBuffer<'_> {
1314    fn drop(&mut self) {
1315        self.send(None).unwrap(); // send on drop
1316    }
1317}
1318impl std::io::Write for AutoSendBuffer<'_> {
1319    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1320        self.sender.buf.extend_from_slice(buf);
1321        Ok(buf.len())
1322    }
1323    fn flush(&mut self) -> std::io::Result<()> {
1324        self.send(None)?;
1325        Ok(())
1326    }
1327}
1328
1329//====================================================================================
1330//            MsgReader
1331//====================================================================================
1332
1333/// `MsgReader` is a per-socket helper to read socket messages. It auto handles partial/multiple messages in recv buffer.  
1334/// On Readable event, call MsgReader::try_read_fast_dispatch/try_read_fast_read to read messages from a sock into a recv_buffer and calls dispatcher::on_inbound_message to dispatch message.
1335pub struct MsgReader {
1336    recv_buffer: Vec<u8>,
1337    min_reserve: usize,     // the min reserved buffer size before each read
1338    startpos: usize,        // msg start position or first effective byte.
1339    bufsize: usize,         // count from buffer[0] to last read byte.
1340    decoded_msgsize: usize, // decoded msg size from MessageResult::ExpectMsgSize,
1341}
1342
1343impl MsgReader {
1344    pub fn new(min_reserved_bytes: usize) -> Self {
1345        Self {
1346            // dispatcher : dispatch,
1347            recv_buffer: vec![0u8; min_reserved_bytes],
1348            min_reserve: min_reserved_bytes,
1349            startpos: 0,
1350            bufsize: 0,
1351            decoded_msgsize: 0,
1352        }
1353    }
1354
1355    pub fn bytes_in_buffer(&self) -> usize {
1356        self.bufsize - self.startpos
1357    }
1358
1359    pub fn clear(&mut self) {
1360        self.decoded_msgsize = 0;
1361        self.startpos = 0;
1362        self.bufsize = 0;
1363    }
1364
1365    /// Strategy 1: fast dispatch: dispatch on each read of about min_reserve bytes.
1366    /// return Err to close socket
1367    pub fn try_read_fast_dispatch<UserCommand>(
1368        &mut self,
1369        ctx: &mut DispatchContext<UserCommand>,
1370        // dispatcher: &mut (impl Reactor<UserCommand = UserCommand> + ?Sized),
1371        dispatcher: &mut impl FnMut(
1372            &mut [u8],
1373            usize,
1374            usize,
1375            &mut DispatchContext<UserCommand>,
1376        ) -> Result<MessageResult>,
1377    ) -> Result<()> {
1378        loop {
1379            debug_assert!(
1380                self.decoded_msgsize == 0 || self.decoded_msgsize > self.bufsize - self.startpos
1381            ); // msg size is not decoded, or have not received expected msg size.
1382
1383            if self.bufsize + self.min_reserve > self.recv_buffer.len() {
1384                self.recv_buffer.resize(
1385                    std::cmp::max(self.bufsize + self.min_reserve, self.recv_buffer.len() * 2),
1386                    0,
1387                ); // double buffer size
1388            }
1389            match ctx.sock.read(&mut self.recv_buffer[self.bufsize..]) {
1390                std::io::Result::Ok(new_bytes) => {
1391                    if new_bytes == 0 {
1392                        return Err("Peer closed sock".to_owned());
1393                    }
1394                    debug_assert!(self.bufsize + new_bytes <= self.recv_buffer.len());
1395
1396                    self.bufsize += new_bytes;
1397                    let should_return = self.bufsize < self.recv_buffer.len(); // not full, no need to retry this time.
1398
1399                    self.try_dispatch_all(new_bytes, ctx, dispatcher)?;
1400                    if should_return {
1401                        return Ok(()); // not full, wait for next readable.
1402                    } else {
1403                        continue; // try next read.
1404                    }
1405                }
1406                std::io::Result::Err(err) => {
1407                    let errkind = err.kind();
1408                    if errkind == ErrorKind::WouldBlock {
1409                        return Ok(()); // wait for next readable.
1410                    } else if errkind == ErrorKind::ConnectionReset {
1411                        return Err("Sock reset".to_owned());
1412                    } else if errkind == ErrorKind::Interrupted {
1413                        logtrace!("[WARN] sock Interrupted : {:?}. retry", ctx.sock);
1414                        return Ok(()); // Interrupted is not an error.
1415                    } else if errkind == ErrorKind::ConnectionAborted {
1416                        return Err("Sock ConnectionAborted".to_owned()); // closed by remote (windows)
1417                    }
1418                    return Err(format!("[ERROR]: Read on sock error: {err:?}"));
1419                }
1420            }
1421        }
1422    }
1423
1424    /// Read until WOULDBLOCK.
1425    /// return Err to close socket
1426    pub fn try_read_all<UserCommand>(
1427        &mut self,
1428        ctx: &mut DispatchContext<UserCommand>,
1429    ) -> Result<()> {
1430        loop {
1431            if self.bufsize + self.min_reserve > self.recv_buffer.len() {
1432                self.recv_buffer.resize(
1433                    std::cmp::max(self.bufsize + self.min_reserve, self.recv_buffer.len() * 2),
1434                    0,
1435                ); // double buffer size
1436            }
1437            match ctx.sock.read(&mut self.recv_buffer[self.bufsize..]) {
1438                std::io::Result::Ok(new_bytes) => {
1439                    if new_bytes == 0 {
1440                        return Err("Peer closed sock".to_owned());
1441                    }
1442                    debug_assert!(self.bufsize + new_bytes <= self.recv_buffer.len());
1443
1444                    self.bufsize += new_bytes;
1445                    if self.bufsize < self.recv_buffer.len() {
1446                        // not full, no need to retry this time.
1447                        return Ok(());
1448                    }
1449                }
1450                std::io::Result::Err(err) => {
1451                    let errkind = err.kind();
1452                    if errkind == ErrorKind::WouldBlock {
1453                        return Ok(()); // wait for next readable.
1454                    } else if errkind == ErrorKind::ConnectionReset {
1455                        return Err("Sock ConnectionReset".to_owned());
1456                    // socket closed
1457                    } else if errkind == ErrorKind::Interrupted {
1458                        logtrace!("[WARN] sock Interrupted : {:?}. retry", ctx.sock);
1459                        return Ok(()); // Interrupted is not an error.
1460                    } else if errkind == ErrorKind::ConnectionAborted {
1461                        return Err("sock ConnectionAborted".to_owned()); // closed by remote (windows)
1462                    }
1463                    return Err(format!("[ERROR]: Read on sock error: {err:?}"));
1464                }
1465            } // match
1466        } // loop
1467    }
1468
1469    /// try dispatch all messages in buffer.
1470    /// return Error if there's any error or close.
1471    pub fn try_dispatch_all<UserCommand>(
1472        &mut self,
1473        new_bytes: usize,
1474        ctx: &mut DispatchContext<UserCommand>,
1475        // dispatcher: &mut (impl Reactor<UserCommand = UserCommand> + ?Sized),
1476        dispatcher: &mut impl FnMut(
1477            &mut [u8],
1478            usize,
1479            usize,
1480            &mut DispatchContext<UserCommand>,
1481        ) -> Result<MessageResult>,
1482    ) -> Result<()> {
1483        let mut new_bytes = new_bytes;
1484        // loop while: buf_not_empty and ( partial_header or partial_msg )
1485        while self.startpos < self.bufsize
1486            && (self.decoded_msgsize == 0 || self.startpos + self.decoded_msgsize <= self.bufsize)
1487        {
1488            match dispatcher(
1489                &mut self.recv_buffer[self.startpos..self.bufsize],
1490                new_bytes,
1491                self.decoded_msgsize,
1492                ctx,
1493            ) {
1494                Err(err) => {
1495                    self.clear(); // user requested close. clear read buffer.
1496                    return Err(err);
1497                }
1498                Ok(res) => {
1499                    match res {
1500                        MessageResult::ExpectMsgSize(msgsize) => {
1501                            if !(msgsize == 0 || msgsize > self.bufsize - self.startpos) {
1502                                logerr!( "[WARN] on_inbound_message should NOT expect a msgsize while full message is already received, which may cause recursive call. msgsize:{msgsize:?} recved: {}",
1503                            self.bufsize - self.startpos);
1504                                debug_assert!(
1505                                    false,
1506                                    "on_inbound_message expects an already full message."
1507                                );
1508                            }
1509                            self.decoded_msgsize = msgsize; // could be 0 if msg size is unknown.
1510                            break; // read more in next round.
1511                        }
1512                        MessageResult::DropMsgSize(msgsize) => {
1513                            assert!(msgsize > 0 && msgsize <= self.bufsize - self.startpos); // drop size should not exceed buffer size.
1514                            self.startpos += msgsize;
1515                            self.decoded_msgsize = 0;
1516                            new_bytes = self.bufsize - self.startpos;
1517                        }
1518                    }
1519                }
1520            }
1521        }
1522        if self.startpos != 0 {
1523            // move front
1524            self.recv_buffer.copy_within(self.startpos..self.bufsize, 0); // don't resize.
1525            self.bufsize -= self.startpos;
1526            self.startpos = 0;
1527        }
1528        Ok(())
1529    }
1530
1531    /// Strategy 2: fast read: read all until WOULDBLOCK, then dispatch all.
1532    pub fn try_read_fast_read<UserCommand>(
1533        &mut self,
1534        ctx: &mut DispatchContext<UserCommand>,
1535        // dispatcher: &mut (impl Reactor<UserCommand = UserCommand> + ?Sized),
1536        dispatcher: &mut impl FnMut(
1537            &mut [u8],
1538            usize,
1539            usize,
1540            &mut DispatchContext<UserCommand>,
1541        ) -> Result<MessageResult>,
1542    ) -> Result<()> {
1543        let old_bytes = self.bufsize - self.startpos;
1544        let res = self.try_read_all(ctx);
1545        let res2 = self.try_dispatch_all(self.bufsize - self.startpos - old_bytes, ctx, dispatcher);
1546        res?;
1547        res2?;
1548        Ok(())
1549    }
1550}
1551
1552//====================================================================================
1553//         Default TcpListenerHandler
1554//====================================================================================
1555
1556/// NewServerReactor is used by TcpListenerHandler to create a new reactor when accept a new socket.
1557pub trait NewServerReactor: Reactor {
1558    /// The parameter to create NewServerReactor
1559    type InitServerParam: Clone;
1560
1561    /// * count - It's the number of Reactors that DefaultTcpListenerHandler has created, starting from 1.
1562    fn new_server_reactor(count: usize, param: Self::InitServerParam) -> Self;
1563}
1564pub struct DefaultTcpListenerHandler<NewReactor: NewServerReactor + 'static> {
1565    pub reactorid: ReactorID,
1566    count_children: usize,
1567    server_param: <NewReactor as NewServerReactor>::InitServerParam,
1568    recv_buffer_min_size: usize,
1569    _phantom: PhantomData<NewReactor>,
1570}
1571
1572//---------------------- !NewServerReactor ----------------
1573
1574impl<NewReactor: NewServerReactor + 'static> DefaultTcpListenerHandler<NewReactor> {
1575    pub fn new(
1576        recv_buffer_min_size: usize,
1577        param: <NewReactor as NewServerReactor>::InitServerParam,
1578    ) -> Self {
1579        Self {
1580            reactorid: INVALID_REACTOR_ID,
1581            count_children: 0,
1582            server_param: param,
1583            recv_buffer_min_size,
1584            _phantom: PhantomData,
1585        }
1586    }
1587}
1588
1589impl<NewReactor: NewServerReactor + 'static> TcpListenerHandler
1590    for DefaultTcpListenerHandler<NewReactor>
1591{
1592    type UserCommand = <NewReactor as Reactor>::UserCommand;
1593
1594    fn on_start_listen(
1595        &mut self,
1596        reactorid: ReactorID,
1597        _cmd_sender: &CmdSender<Self::UserCommand>,
1598    ) {
1599        self.reactorid = reactorid;
1600    }
1601    fn on_new_connection(
1602        &mut self,
1603        _conn: &mut std::net::TcpListener,
1604        _new_conn: &mut std::net::TcpStream,
1605    ) -> Option<NewStreamConnection<Self::UserCommand>> {
1606        self.count_children += 1;
1607        Some(NewStreamConnection {
1608            reactor: Box::new(NewReactor::new_server_reactor(
1609                self.count_children,
1610                self.server_param.clone(),
1611            )),
1612            recv_buffer_min_size: self.recv_buffer_min_size,
1613        })
1614    }
1615}
1616
1617//====================================================================================
1618//            SimpleIoReactor, SimpleIoListener
1619//====================================================================================
1620
1621pub type SimpleIoRuntime = ReactRuntime<()>;
1622pub type SimpleIoReactorContext<'a> = DispatchContext<'a, ()>;
1623pub type DynIoReactor = dyn Reactor<UserCommand = ()>;
1624
1625type OnConnectedHandler<AppData> = dyn FnMut(
1626    &mut SimpleIoReactorContext<'_>,
1627    ReactorID, // parent listener reactorid.
1628    &mut AppData,
1629) -> Result<()>;
1630
1631type OnClosedHandler<AppData> = dyn FnMut(ReactorID, &CmdSender<()>, &mut AppData);
1632
1633type OnSockMsgHandler<AppData> =
1634    dyn FnMut(&mut [u8], &mut SimpleIoReactorContext<'_>, &mut AppData) -> Result<usize>;
1635
1636enum DecodeResult<DecodedInfo> {
1637    /// Unknown MsgSize (partial header). Continue to call decoder next time.
1638    UnknownMsgSize,
1639    /// decoded header, waiting for framework to read content then dispatch full message together with buffer.
1640    MsgSize(usize, DecodedInfo),
1641}
1642/// Decode inbound message. SockMsgDecoder(buf: &mut [u8], new_bytes) -> Result<DecodeResult<DecodedInfo>>;
1643type SockMsgDecoder<DecodedInfo> = dyn FnMut(&mut [u8], usize) -> Result<DecodeResult<DecodedInfo>>;
1644
1645fn null_msg_decoder(buf: &mut [u8], _new_bytes: usize) -> Result<DecodeResult<()>> {
1646    Ok(DecodeResult::MsgSize(buf.len(), ()))
1647}
1648
1649/// `SimpleIoReactor` doesn't have `UserCommand`. User supplies callback functions to handle inbound socket messages and on_connected/on_close events.
1650/// On each readable socket event, the MsgReader reads all data and call on_sock_msg_handler to dispatch message.
1651///
1652/// **Note that SimpleIoReactor can only be used with SimpleIoRuntime
1653pub struct SimpleIoReactor<AppData> {
1654    app_data: AppData,
1655    on_connected_handler: Option<Box<OnConnectedHandler<AppData>>>,
1656    on_closed_handler: Option<Box<OnClosedHandler<AppData>>>,
1657    /// msg_handler returns Err to close socket. else, returns Ok(dropMsgSize);
1658    on_sock_msg_handler: Box<OnSockMsgHandler<AppData>>,
1659}
1660impl<AppData: 'static> SimpleIoReactor<AppData> {
1661    pub fn new(
1662        app_data: AppData,
1663        on_connected_handler: Option<Box<OnConnectedHandler<AppData>>>,
1664        on_closed_handler: Option<Box<OnClosedHandler<AppData>>>,
1665        on_sock_msg_handler: impl FnMut(&mut [u8], &mut SimpleIoReactorContext<'_>, &mut AppData) -> Result<usize>
1666            + 'static,
1667    ) -> Self {
1668        Self {
1669            app_data,
1670            on_connected_handler,
1671            on_closed_handler,
1672            on_sock_msg_handler: Box::new(on_sock_msg_handler),
1673        }
1674    }
1675    pub fn new_boxed(
1676        app_data: AppData,
1677        on_connected_handler: Option<Box<OnConnectedHandler<AppData>>>,
1678        on_closed_handler: Option<Box<OnClosedHandler<AppData>>>,
1679        on_sock_msg_handler: impl FnMut(&mut [u8], &mut SimpleIoReactorContext<'_>, &mut AppData) -> Result<usize>
1680            + 'static,
1681    ) -> Box<dyn Reactor<UserCommand = ()>> {
1682        Box::new(Self::new(
1683            app_data,
1684            on_connected_handler,
1685            on_closed_handler,
1686            on_sock_msg_handler,
1687        ))
1688    }
1689}
1690impl<AppData> Reactor for SimpleIoReactor<AppData> {
1691    type UserCommand = ();
1692
1693    fn on_inbound_message(
1694        &mut self,
1695        buf: &mut [u8],
1696        _new_bytes: usize,
1697        _decoded_msg_size: usize,
1698        ctx: &mut DispatchContext<Self::UserCommand>,
1699    ) -> Result<MessageResult> {
1700        let drop_msg_size = (self.on_sock_msg_handler)(buf, ctx, &mut self.app_data)?;
1701        Ok(MessageResult::DropMsgSize(drop_msg_size)) // drop all all messages.
1702    }
1703    fn on_connected(
1704        &mut self,
1705        ctx: &mut DispatchContext<Self::UserCommand>,
1706        listener: ReactorID,
1707    ) -> Result<()> {
1708        if let Some(ref mut h) = self.on_connected_handler {
1709            return (h)(ctx, listener, &mut self.app_data);
1710        }
1711        Ok(()) // accept the connection by default.
1712    }
1713    fn on_close(&mut self, reactorid: ReactorID, cmd_sender: &CmdSender<Self::UserCommand>) {
1714        if let Some(ref mut h) = self.on_closed_handler {
1715            (h)(reactorid, cmd_sender, &mut self.app_data)
1716        }
1717    }
1718}
1719
1720/// `SimpleIoListener` implements TcpListenerHandler. When receiving new connection, it creates SimpleIoReactor using user specified reactor_creator.
1721pub struct SimpleIoListener {
1722    count_children: usize,
1723    reactorid: ReactorID,
1724    recv_buffer_min_size: usize,
1725    reactor_creator: Box<dyn FnMut(usize) -> Option<Box<DynIoReactor>>>, // call reactor_creator(children_count) to create SimpleIoReactor
1726}
1727impl SimpleIoListener {
1728    pub fn new(
1729        recv_buffer_min_size: usize,
1730        reactor_creator: impl FnMut(usize) -> Option<Box<DynIoReactor>> + 'static,
1731    ) -> Self {
1732        Self {
1733            count_children: 0,
1734            reactorid: INVALID_REACTOR_ID,
1735            recv_buffer_min_size,
1736            reactor_creator: Box::new(reactor_creator),
1737        }
1738    }
1739
1740    pub fn new_with_io_service<AppData: 'static>(service: SimpleIoService<AppData>) -> Self {
1741        Self {
1742            count_children: 0,
1743            reactorid: INVALID_REACTOR_ID,
1744            recv_buffer_min_size: 0, // per socket recv buffer is not used.
1745            reactor_creator: Box::new(move |_| Some(Box::new(service.clone()))),
1746        }
1747    }
1748}
1749impl TcpListenerHandler for SimpleIoListener {
1750    type UserCommand = ();
1751
1752    fn on_start_listen(
1753        &mut self,
1754        reactorid: ReactorID,
1755        _cmd_sender: &CmdSender<Self::UserCommand>,
1756    ) {
1757        self.reactorid = reactorid;
1758    }
1759    fn on_new_connection(
1760        &mut self,
1761        _conn: &mut std::net::TcpListener,
1762        _new_conn: &mut std::net::TcpStream,
1763    ) -> Option<NewStreamConnection<Self::UserCommand>> {
1764        self.count_children += 1;
1765        (self.reactor_creator)(self.count_children).map(|reactor| NewStreamConnection {
1766            reactor,
1767            recv_buffer_min_size: self.recv_buffer_min_size,
1768        })
1769    }
1770}
1771
1772//====================================================================================
1773//            SimpleIoService: serves multiple socks per instance.
1774//====================================================================================
1775
1776/// A SimpleIoService instance serves multiple sockets, which diffs from SimpleIoReactor/SimpleIoListener that serves a socket per instance.
1777/// See `test_io_service`
1778pub struct SimpleIoService<AppData> {
1779    inner: std::rc::Rc<std::cell::RefCell<IoServiceInner<AppData>>>,
1780}
1781
1782pub struct IoServiceInner<AppData> {
1783    stream_reactor: SimpleIoReactor<AppData>,
1784    msg_reader: MsgReader, // do not use the runtime provided reader (each socket has a reader). use this shared one.
1785}
1786
1787impl<AppData> IoServiceInner<AppData> {
1788    // The outer call this function to override default impl.
1789    fn on_readable(&mut self, ctx: &mut ReactorReableContext<()>) -> Result<()> {
1790        self.msg_reader.try_read_fast_read(
1791            &mut DispatchContext {
1792                reactorid: ctx.reactorid,
1793                sock: ctx.sock,
1794                sender: ctx.sender,
1795                cmd_sender: ctx.cmd_sender,
1796            },
1797            &mut |buf, new_bytes, decoded_msg_size, ctx| {
1798                self.stream_reactor
1799                    .on_inbound_message(buf, new_bytes, decoded_msg_size, ctx)
1800            },
1801        )
1802    }
1803}
1804impl<AppData> Clone for SimpleIoService<AppData> {
1805    fn clone(&self) -> Self {
1806        Self {
1807            inner: std::rc::Rc::clone(&self.inner),
1808        }
1809    }
1810}
1811impl<AppData: 'static> SimpleIoService<AppData> {
1812    pub fn new(
1813        recv_buf_min_size: usize,
1814        app_data: AppData,
1815        on_connected_handler: Option<Box<OnConnectedHandler<AppData>>>,
1816        on_closed_handler: Option<Box<OnClosedHandler<AppData>>>,
1817        on_sock_msg_handler: impl FnMut(&mut [u8], &mut SimpleIoReactorContext<'_>, &mut AppData) -> Result<usize>
1818            + 'static,
1819    ) -> Self {
1820        Self {
1821            inner: std::rc::Rc::new(std::cell::RefCell::new(IoServiceInner::<AppData> {
1822                stream_reactor: SimpleIoReactor::<AppData>::new(
1823                    app_data,
1824                    on_connected_handler,
1825                    on_closed_handler,
1826                    on_sock_msg_handler,
1827                ),
1828                msg_reader: MsgReader::new(recv_buf_min_size),
1829            })),
1830        }
1831    }
1832
1833    pub fn new_boxed(
1834        recv_buf_min_size: usize,
1835        app_data: AppData,
1836        on_connected_handler: Option<Box<OnConnectedHandler<AppData>>>,
1837        on_closed_handler: Option<Box<OnClosedHandler<AppData>>>,
1838        on_sock_msg_handler: impl FnMut(&mut [u8], &mut SimpleIoReactorContext<'_>, &mut AppData) -> Result<usize>
1839            + 'static,
1840    ) -> Box<dyn Reactor<UserCommand = ()>> {
1841        Box::new(Self::new(
1842            recv_buf_min_size,
1843            app_data,
1844            on_connected_handler,
1845            on_closed_handler,
1846            on_sock_msg_handler,
1847        ))
1848    }
1849
1850    /// call func(app_data) if current service is borrowed. else return Err()
1851    pub fn apply_app_data(&self, func: impl FnOnce(&AppData)) -> Result<()> {
1852        if let Ok(v) = self.inner.try_borrow() {
1853            func(&v.stream_reactor.app_data);
1854            return Ok(());
1855        }
1856        Err("Unable to borrow SimpleIoService".to_owned())
1857    }
1858    pub fn apply_app_data_mut(&self, func: impl FnOnce(&mut AppData)) -> Result<()> {
1859        if let Ok(mut v) = self.inner.try_borrow_mut() {
1860            func(&mut v.stream_reactor.app_data);
1861            return Ok(());
1862        }
1863        Err("Unable to borrow SimpleIoService".to_owned())
1864    }
1865}
1866
1867impl<AppData> Reactor for SimpleIoService<AppData> {
1868    type UserCommand = ();
1869
1870    fn on_inbound_message(
1871        &mut self,
1872        _buf: &mut [u8],
1873        _new_bytes: usize,
1874        _decoded_msg_size: usize,
1875        _ctx: &mut DispatchContext<Self::UserCommand>,
1876    ) -> Result<MessageResult> {
1877        panic!("IoServiceInner handles on_inbound_message. this function should not be called!");
1878    }
1879    /// Override the default implemention. Use shared MsgReader.
1880    fn on_readable(&mut self, ctx: &mut ReactorReableContext<Self::UserCommand>) -> Result<()> {
1881        self.inner.borrow_mut().on_readable(ctx)
1882    }
1883
1884    fn on_connected(
1885        &mut self,
1886        ctx: &mut DispatchContext<Self::UserCommand>,
1887        listener: ReactorID,
1888    ) -> Result<()> {
1889        self.inner
1890            .borrow_mut()
1891            .stream_reactor
1892            .on_connected(ctx, listener)
1893    }
1894    fn on_close(&mut self, reactorid: ReactorID, cmd_sender: &CmdSender<Self::UserCommand>) {
1895        self.inner
1896            .borrow_mut()
1897            .stream_reactor
1898            .on_close(reactorid, cmd_sender);
1899    }
1900}
1901
1902#[cfg(test)]
1903mod tests {
1904
1905    static EMPTY_COMPLETION_FUNC: fn() = || {};
1906    fn is_empty_function(_fun: &(dyn Fn() + 'static)) -> Option<Box<dyn Fn() + 'static>> {
1907        // if std::ptr::eq(fun, &EMPTY_COMPLETION_FUNC as &dyn Fn()) {
1908        // return None;
1909
1910        // }
1911        None
1912        // Some(Box::new((*fun).clone())) // No way to clone Fn()
1913    }
1914
1915    #[test]
1916    pub fn test_compare_function() {
1917        assert!(is_empty_function(&EMPTY_COMPLETION_FUNC).is_none());
1918    }
1919}