1use crate::dbglog;
2use crate::flat_storage::FlatStorage;
3use crate::utils;
4use crate::{logerr, logtrace};
5use core::panic;
6use polling::{Event, Events, PollMode, Poller};
7use std::io::{ErrorKind, Read, Write};
8use std::time::Duration;
9use std::{marker::PhantomData, net::TcpStream};
10
11pub trait Reactor {
21 type UserCommand;
22
23 fn on_connected(
29 &mut self,
30 _ctx: &mut DispatchContext<Self::UserCommand>,
31 _listener: ReactorID,
32 ) -> Result<()> {
33 Ok(()) }
35
36 fn on_inbound_message(
44 &mut self,
45 buf: &mut [u8],
46 new_bytes: usize,
47 decoded_msg_size: usize,
48 ctx: &mut DispatchContext<Self::UserCommand>,
49 ) -> Result<MessageResult>;
50
51 fn on_readable(&mut self, ctx: &mut ReactorReableContext<Self::UserCommand>) -> Result<()> {
56 ctx.reader.try_read_fast_read(
57 &mut DispatchContext {
58 reactorid: ctx.reactorid,
59 sock: ctx.sock,
60 sender: ctx.sender,
61 cmd_sender: ctx.cmd_sender,
62 },
63 &mut |buf, new_bytes, decoded_msg_size, ctx| {
64 self.on_inbound_message(buf, new_bytes, decoded_msg_size, ctx)
65 },
66 )
67 }
68
69 fn on_command(
72 &mut self,
73 _cmd: Self::UserCommand,
74 ctx: &mut DispatchContext<Self::UserCommand>,
75 ) -> Result<()> {
76 panic!("Please impl on_command for reactorid: {}", ctx.reactorid);
77 }
78
79 fn on_close(&mut self, _reactorid: ReactorID, _cmd_sender: &CmdSender<Self::UserCommand>) {
82 }
84}
85
/// Result type used throughout this module; errors are plain `String` messages.
pub type Result<T> = std::result::Result<T, String>;
87
88pub struct DispatchContext<'a, UserCommand> {
90 pub reactorid: ReactorID,
91 pub sock: &'a mut std::net::TcpStream,
92 pub sender: &'a mut MsgSender, pub cmd_sender: &'a CmdSender<UserCommand>,
94}
95impl<'a, UserCommand> DispatchContext<'a, UserCommand> {
96 fn from(data: &'a mut SockData, cmd_sender: &'a CmdSender<UserCommand>) -> Self {
97 Self {
98 reactorid: data.reactorid,
99 sock: &mut data.sock,
100 sender: &mut data.sender,
101 cmd_sender,
102 }
103 }
104 pub fn send_no_que(&mut self, msg: &[u8]) -> std::io::Result<usize> {
107 MsgSender::try_send_all(self.sock, msg)
108 }
109 pub fn send_or_que(&mut self, msg: &[u8]) -> Result<SendOrQueResult> {
110 self.sender.send_or_que(self.sock, msg, None)
111 }
112 pub fn acquire_send(&mut self) -> AutoSendBuffer<'_> {
114 let old_buf_size = self.sender.buf.len();
115 AutoSendBuffer {
116 sender: self.sender,
117 sock: self.sock,
118 old_buf_size,
119 }
120 }
121}
122
/// Value returned by `Reactor::on_inbound_message`.
///
/// NOTE(review): the variants are interpreted by `MsgReader`'s dispatch code, which is not
/// visible in this part of the file — the descriptions below are assumptions to confirm.
pub enum MessageResult {
    /// Presumably: the size of the message the reader should wait for.
    ExpectMsgSize(usize),
    /// Presumably: the number of bytes the reader should drop from its buffer.
    DropMsgSize(usize),
}
132
/// When a command sent through `CmdSender` should be executed.
pub enum Deferred {
    /// Execute as soon as the runtime dequeues the command.
    Immediate,
    /// Execute once the given system time is reached; until then the command is parked in
    /// the runtime's deferred queue.
    UtilTime(std::time::SystemTime),
}
/// Outcome passed to a command's completion callback: the affected reactor id or an error message.
pub type CommandCompletion = Result<ReactorID>;
140
/// Cloneable handle for posting commands to a `ReactRuntime`'s command queue.
pub struct CmdSender<UserCommand>(std::sync::mpsc::Sender<CmdData<UserCommand>>);
// NOTE(review): this asserts `Send` for any `UserCommand`, and `CmdData` also carries a
// non-`Send` boxed completion closure. It is only sound if callers never move thread-bound
// data (e.g. `Rc`) through a command or completion — confirm against intended usage.
unsafe impl<UserCommand> Send for CmdSender<UserCommand> {}

impl<UserCommand> Clone for CmdSender<UserCommand> {
    /// Clones the underlying channel sender; both handles feed the same queue.
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}
152impl<UserCommand> CmdSender<UserCommand> {
153 pub fn send_connect<AReactor: Reactor<UserCommand = UserCommand> + 'static>(
160 &self,
161 remote_addr: &str,
162 recv_buffer_min_size: usize,
163 reactor: AReactor,
164 deferred: Deferred,
165 completion: impl FnOnce(CommandCompletion) + 'static,
166 ) -> Result<()> {
167 self.send_cmd(
168 INVALID_REACTOR_ID,
169 SysCommand::NewConnect(
170 Box::new(reactor),
171 remote_addr.to_owned(),
172 recv_buffer_min_size,
173 ),
174 deferred,
175 completion,
176 )
177 }
178 pub fn send_listen<AReactor: TcpListenerHandler<UserCommand = UserCommand> + 'static>(
180 &self,
181 local_addr: &str,
182 reactor: AReactor,
183 deferred: Deferred,
184 completion: impl FnOnce(CommandCompletion) + 'static,
185 ) -> Result<()> {
186 self.send_cmd(
187 INVALID_REACTOR_ID,
188 SysCommand::NewListen(Box::new(reactor), local_addr.to_owned()),
189 deferred,
190 completion,
191 )
192 }
193
194 pub fn send_close(
196 &self,
197 reactorid: ReactorID,
198 deferred: Deferred,
199 completion: impl FnOnce(CommandCompletion) + 'static,
200 ) -> Result<()> {
201 self.send_cmd(reactorid, SysCommand::CloseSocket, deferred, completion)
202 }
203
204 pub fn send_user_cmd(
209 &self,
210 reactorid: ReactorID,
211 cmd: UserCommand,
212 deferred: Deferred,
213 completion: impl FnOnce(CommandCompletion) + 'static,
214 ) -> Result<()> {
215 self.send_cmd(reactorid, SysCommand::UserCmd(cmd), deferred, completion)
216 }
217
218 fn send_cmd(
219 &self,
220 reactorid: ReactorID,
221 cmd: SysCommand<UserCommand>,
222 deferred: Deferred,
223 completion: impl FnOnce(CommandCompletion) + 'static,
224 ) -> Result<()> {
225 match &cmd {
227 SysCommand::NewListen(_, _) | SysCommand::NewConnect(_, _, _) => {
228 if reactorid != INVALID_REACTOR_ID {
229 return Err(
230 "reactorid msut be INVALID_REACTOR_ID if NewConnect/NewListen".to_owned(),
231 );
232 }
233 }
234 SysCommand::UserCmd(_) => {
235 if reactorid == INVALID_REACTOR_ID {
236 return Err("UserCmd must has a valid reactorid.".to_owned());
237 }
238 }
240 _ => {}
241 }
242 if self
243 .0
244 .send(CmdData::<UserCommand> {
245 reactorid,
246 cmd,
247 deferred,
248 completion: Box::new(completion),
249 })
250 .is_err()
251 {
252 return Err("Failed to send. Receiver disconnected.".to_owned());
253 }
254 Ok(())
255 }
256}
257
/// Borrowed view of one stream passed to `Reactor::on_readable`; like `DispatchContext`
/// but additionally exposes the stream's `MsgReader`.
pub struct ReactorReableContext<'a, UserCommand> {
    /// Id of the reactor the callback runs for.
    pub reactorid: ReactorID,
    /// The underlying socket.
    pub sock: &'a mut std::net::TcpStream,
    /// Send queue of the socket.
    pub sender: &'a mut MsgSender,
    /// Receive buffer of the socket.
    pub reader: &'a mut MsgReader,
    /// Handle for posting commands to the runtime.
    pub cmd_sender: &'a CmdSender<UserCommand>,
}
/// Callbacks for a listening TCP socket managed by the runtime.
pub trait TcpListenerHandler {
    /// Command type of the reactors created for accepted connections.
    type UserCommand;

    /// Called after the listener has been bound and registered; no-op by default.
    fn on_start_listen(
        &mut self,
        _reactorid: ReactorID,
        _cmd_sender: &CmdSender<Self::UserCommand>,
    ) {
    }

    /// Called for every accepted connection.
    ///
    /// Return `Some` to have the runtime register `new_sock` with the given reactor;
    /// return `None` and the runtime does not register the connection.
    fn on_new_connection(
        &mut self,
        sock: &mut std::net::TcpListener,
        new_sock: &mut std::net::TcpStream,
    ) -> Option<NewStreamConnection<Self::UserCommand>>;

    /// Called after the listener has been removed from the runtime; no-op by default.
    fn on_close_listen(
        &mut self,
        _reactorid: ReactorID,
        _cmd_sender: &CmdSender<Self::UserCommand>,
    ) {
    }
}
/// What a listener returns to accept a connection: the reactor to attach and the
/// minimum receive-buffer size used to create the stream's `MsgReader`.
pub struct NewStreamConnection<UserCommand> {
    /// Reactor that will handle the accepted stream.
    pub reactor: Box<dyn Reactor<UserCommand = UserCommand>>,
    /// Passed to `MsgReader::new` for the accepted stream.
    pub recv_buffer_min_size: usize,
}
297
/// Event loop state: the registered sockets, the command queue and the deferred commands.
pub struct ReactRuntime<UserCommand> {
    /// Registered sockets, poller and command channel.
    mgr: ReactorMgr<UserCommand>,
    /// Storage for commands whose execution time has not been reached yet.
    deferred_data: FlatStorage<CmdData<UserCommand>>,
    /// Min-heap over `deferred_data`, ordered by due time in milliseconds.
    deferred_heap: Vec<DeferredKey>,
    /// Reusable buffer for poller events.
    sock_events: Events,
    /// Total number of socket events processed so far.
    accum_sock_events: usize,
    /// Total number of commands received so far.
    accum_commands: usize,
}
314
/// Heap entry for one deferred command.
#[derive(Copy, Clone)]
struct DeferredKey {
    /// Due time in milliseconds since the Unix epoch; this is the heap ordering key.
    millis: i64,
    /// Key of the command inside the deferred-command storage.
    data: usize,
}
impl DeferredKey {
    /// Returns the ordering key (the due time).
    fn get_key(&self) -> i64 {
        self.millis
    }
}

/// Restores the min-heap property after a new element was appended at the end of `v`.
///
/// `v[..len - 1]` must already be a min-heap. An empty slice is left untouched.
fn min_heap_push(v: &mut [DeferredKey]) {
    if v.is_empty() {
        return;
    }
    let mut k = v.len() - 1;
    // Sift the new element up. The parent index is only computed while `k > 0`, so the
    // subtraction can never underflow when the element reaches the root.
    while k > 0 {
        let parent = (k - 1) / 2;
        if v[k].get_key() >= v[parent].get_key() {
            break;
        }
        v.swap(k, parent);
        k = parent;
    }
}

/// Moves the smallest element of the min-heap `v` to the last position.
///
/// Afterwards `v[..len - 1]` is a valid min-heap containing every other element, so the
/// caller can remove the minimum with `Vec::pop`. An empty slice is left untouched.
fn min_heap_pop(v: &mut [DeferredKey]) {
    if v.is_empty() {
        return;
    }
    let last = v.len() - 1;
    // Park the minimum at the end and sift the former last element down from the root,
    // looking only at the shrunken heap `v[..last]`.
    v.swap(0, last);
    let mut k = 0;
    loop {
        let left = 2 * k + 1;
        let right = left + 1;
        let mut smallest = k;
        if left < last && v[left].get_key() < v[smallest].get_key() {
            smallest = left;
        }
        if right < last && v[right].get_key() < v[smallest].get_key() {
            smallest = right;
        }
        if smallest == k {
            break;
        }
        v.swap(k, smallest);
        k = smallest;
    }
}
361
/// Owns the registered sockets, the poller and both ends of the command channel.
struct ReactorMgr<UserCommand> {
    /// Listeners and streams, indexed by the `sockslot` part of their `ReactorID`.
    socket_handlers: FlatStorage<TcpSocketHandler<UserCommand>>,
    /// OS poller all sockets are registered with.
    poller: Poller,
    /// Number of stream (non-listener) entries in `socket_handlers`.
    count_streams: usize,
    /// Receiving end of the command channel.
    cmd_recv: std::sync::mpsc::Receiver<CmdData<UserCommand>>,
    /// Sending end of the command channel; handed out to reactors and users.
    cmd_sender: CmdSender<UserCommand>,
}
370
/// One registered socket together with its handler.
enum TcpSocketHandler<UserCommand> {
    /// A listening socket: its id, the listener and the accept handler.
    ListenerType(
        ReactorID,
        std::net::TcpListener,
        Box<dyn TcpListenerHandler<UserCommand = UserCommand>>,
    ),
    /// A connected stream: its socket state and the reactor handling it.
    StreamType(SockData, Box<dyn Reactor<UserCommand = UserCommand>>),
}
/// Per-stream state kept by the runtime.
struct SockData {
    /// Id assigned when the stream was registered.
    pub reactorid: ReactorID,
    /// The connected socket.
    pub sock: std::net::TcpStream,
    /// Outbound queue.
    pub sender: MsgSender,
    /// Inbound buffer.
    pub reader: MsgReader,
    /// Whether the poller registration currently includes writable events.
    interested_writable: bool,
}
386impl<'a, UserCommand> ReactorReableContext<'a, UserCommand> {
387 fn from(data: &'a mut SockData, cmd_sender: &'a CmdSender<UserCommand>) -> Self {
388 Self {
389 reactorid: data.reactorid,
390 sock: &mut data.sock,
391 sender: &mut data.sender,
392 reader: &mut data.reader,
393 cmd_sender,
394 }
395 }
396}
397
// Unsigned integer that is exactly half as wide as `usize`. Both aliases must share the
// same name so the rest of the file compiles on 32-bit as well as 64-bit targets.
#[cfg(target_pointer_width = "64")]
type HalfUsize = u32;
#[cfg(target_pointer_width = "32")]
type HalfUsize = u16;

/// Identifier of a registered listener or stream.
///
/// It packs a storage slot and a version into one `usize` so it can be used as a poller key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReactorID {
    /// Index of the socket in the runtime's socket storage.
    sockslot: HalfUsize,
    /// Value recorded when the socket was registered; distinguishes ids that reuse a slot.
    ver: HalfUsize,
}

/// Id that never refers to a registered reactor.
pub const INVALID_REACTOR_ID: ReactorID = ReactorID {
    sockslot: HalfUsize::MAX,
    ver: HalfUsize::MAX,
};

impl ReactorID {
    /// Packs the id into a `usize`: `ver` in the upper half, `sockslot` in the lower half.
    pub fn to_usize(&self) -> usize {
        let halfbits = usize::BITS / 2;
        ((self.ver as usize) << halfbits) | (self.sockslot as usize)
    }

    /// Inverse of [`ReactorID::to_usize`].
    pub fn from_usize(val: usize) -> Self {
        let halfbits = usize::BITS / 2;
        Self {
            // Truncation to the lower half is intended.
            sockslot: val as HalfUsize,
            ver: (val >> halfbits) as HalfUsize,
        }
    }
}

impl std::fmt::Display for ReactorID {
    /// Formats the id as `sockslot:ver`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}:{}", self.sockslot, self.ver)
    }
}
434
435impl<UserCommand> ReactorMgr<UserCommand> {
436 fn new() -> Self {
437 let (cmd_sender, cmd_recv) = std::sync::mpsc::channel::<CmdData<UserCommand>>();
438 Self {
439 socket_handlers: FlatStorage::new(),
440 poller: Poller::new().unwrap(),
441 count_streams: 0,
442 cmd_sender: CmdSender(cmd_sender),
443 cmd_recv,
444 }
445 }
    /// Number of registered sockets (listeners and streams).
    fn len(&self) -> usize {
        self.socket_handlers.len()
    }
450 fn add_stream(
451 &mut self,
452 recv_buffer_min_size: usize,
453 sock: std::net::TcpStream,
454 handler: Box<dyn Reactor<UserCommand = UserCommand>>,
455 ) -> ReactorID {
456 let key = self.socket_handlers.add(TcpSocketHandler::StreamType(
457 SockData {
458 reactorid: INVALID_REACTOR_ID,
459 sock,
460 sender: MsgSender::new(),
461 reader: MsgReader::new(recv_buffer_min_size),
462 interested_writable: false,
463 },
464 handler,
465 ));
466 let reactorid = ReactorID {
467 sockslot: key as HalfUsize,
468 ver: self.socket_handlers.len() as HalfUsize,
469 };
470 self.count_streams += 1;
471 if let TcpSocketHandler::StreamType(sockdata, ref mut _handler) =
472 self.socket_handlers.get_mut(key).unwrap()
473 {
474 sockdata.reactorid = reactorid;
475 unsafe {
476 self.poller
477 .add_with_mode(
478 &sockdata.sock,
479 Event::readable(reactorid.to_usize()),
480 PollMode::Level,
481 )
482 .unwrap();
483 }
484 logtrace!(
485 "Added TcpStream reactorid: {}, sock: {:?}",
486 sockdata.reactorid,
487 sockdata.sock
488 );
489 return sockdata.reactorid;
490 }
491 panic!("ERROR! Failed to get new added sockdata!");
492 }
493
494 fn add_listener(
495 &mut self,
496 sock: std::net::TcpListener,
497 handler: Box<dyn TcpListenerHandler<UserCommand = UserCommand>>,
498 ) -> ReactorID {
499 let key = self.socket_handlers.add(TcpSocketHandler::ListenerType(
500 INVALID_REACTOR_ID,
501 sock,
502 handler,
503 ));
504 let reactorid = ReactorID {
505 sockslot: key as HalfUsize,
506 ver: self.socket_handlers.len() as HalfUsize,
507 };
508 if let TcpSocketHandler::ListenerType(areactorid, ref sock, _) =
509 self.socket_handlers.get_mut(key).unwrap()
510 {
511 *areactorid = reactorid;
512 unsafe {
514 self.poller
515 .add_with_mode(sock, Event::readable(reactorid.to_usize()), PollMode::Level)
516 .unwrap();
517 }
518 logtrace!(
519 "Added TcpListener reactorid: {}, sock: {:?}",
520 reactorid,
521 sock
522 );
523 }
524 reactorid
525 }
526
527 fn close_reactor(&mut self, reactorid: ReactorID) -> bool {
530 if let Some(sockhandler) = self.socket_handlers.remove(reactorid.sockslot as usize) {
531 match sockhandler {
532 TcpSocketHandler::StreamType(sockdata, mut reactor) => {
533 debug_assert_eq!(reactorid, sockdata.reactorid);
534 logtrace!(
535 "removing reactorid: {}, sock: {:?}, pending_read_bytes: {}, pending_send_bytes: {}",
536 reactorid,
537 sockdata.sock,
538 sockdata.reader.bytes_in_buffer(),
539 sockdata.sender.buf.len()
540 );
541 self.count_streams -= 1;
542 self.poller.delete(&sockdata.sock).unwrap();
543 (reactor).on_close(reactorid, &self.cmd_sender);
544 }
545 TcpSocketHandler::ListenerType(areactorid, sock, mut reactor) => {
546 debug_assert_eq!(reactorid, areactorid);
547 logtrace!("removing reactorid: {}, sock: {:?}", reactorid, sock);
548 self.poller.delete(&sock).unwrap();
549 (reactor).on_close_listen(reactorid, &self.cmd_sender);
550 }
551 }
552 return true;
553 }
554 false
555 }
556
557 fn start_listen(
559 &mut self,
560 local_addr: &str,
561 handler: Box<dyn TcpListenerHandler<UserCommand = UserCommand>>,
562 ) -> std::io::Result<ReactorID> {
563 let socket = std::net::TcpListener::bind(local_addr)?;
564 socket.set_nonblocking(true)?;
565 let reactorid = self.add_listener(socket, handler);
566 if let TcpSocketHandler::ListenerType(_, _, ref mut handler) = self
567 .socket_handlers
568 .get_mut(reactorid.sockslot as usize)
569 .unwrap()
570 {
571 handler.on_start_listen(reactorid, &self.cmd_sender);
572 return std::io::Result::Ok(reactorid);
573 }
574 std::io::Result::Ok(INVALID_REACTOR_ID)
575 }
576
577 fn start_connect(
578 &mut self,
579 remote_addr: &str,
580 recv_buffer_min_size: usize,
581 handler: Box<dyn Reactor<UserCommand = UserCommand>>,
582 ) -> std::io::Result<ReactorID> {
583 let socket = TcpStream::connect(remote_addr)?;
584 socket.set_nonblocking(true)?; let reactorid = self.add_stream(recv_buffer_min_size, socket, handler);
586 if let TcpSocketHandler::StreamType(ref mut sockdata, ref mut handler) = self
587 .socket_handlers
588 .get_mut(reactorid.sockslot as usize)
589 .unwrap()
590 {
591 if handler
592 .on_connected(
593 &mut DispatchContext::from(sockdata, &self.cmd_sender),
594 INVALID_REACTOR_ID,
595 )
596 .is_ok()
597 {
598 return std::io::Result::Ok(reactorid);
599 }
600 }
601 self.close_reactor(reactorid);
602 std::io::Result::Ok(INVALID_REACTOR_ID)
603 }
604}
605
impl<UserCommand> Default for ReactRuntime<UserCommand> {
    /// Equivalent to `ReactRuntime::new()`.
    fn default() -> Self {
        Self::new()
    }
}
611impl<UserCommand> ReactRuntime<UserCommand> {
    /// Creates a runtime with no sockets, no deferred commands and zeroed counters.
    pub fn new() -> Self {
        Self {
            mgr: ReactorMgr::new(),
            deferred_data: FlatStorage::new(),
            deferred_heap: Vec::new(),
            sock_events: Events::new(),
            accum_sock_events: 0,
            accum_commands: 0,
        }
    }
    /// Runs one iteration of the event loop with a 1 ms socket timeout and at most 32 commands.
    ///
    /// See `process_events_with` for the meaning of the return value.
    pub fn process_events(&mut self) -> bool {
        self.process_events_with(1, 32)
    }
628 pub fn process_events_with(&mut self, sock_timeout_millis: u64, max_commands: usize) -> bool {
629 let sock_events = self.process_sock_events(sock_timeout_millis);
630 self.accum_sock_events += sock_events;
631 self.process_deferred_queue();
632 let cmds = self.process_command_queue(max_commands);
633 self.accum_commands += cmds;
634 sock_events > 0 || cmds > 0 || !self.deferred_heap.is_empty() || self.mgr.len() > 0
635 }
    /// Number of registered reactors (listeners and streams).
    pub fn count_reactors(&self) -> usize {
        self.mgr.len()
    }
    /// Number of deferred commands that have not been executed yet.
    pub fn count_deferred_queue(&self) -> usize {
        self.deferred_data.len()
    }
    /// Number of registered streams (listeners excluded).
    pub fn count_streams(&self) -> usize {
        self.mgr.count_streams
    }
    /// Total number of socket events accumulated by `process_events_with`.
    pub fn count_sock_events(&self) -> usize {
        self.accum_sock_events
    }
    /// Total number of commands accumulated by `process_events_with`.
    pub fn count_received_commands(&self) -> usize {
        self.accum_commands
    }
    /// Returns the command sender of this runtime; clone it to post commands.
    pub fn get_cmd_sender(&self) -> &CmdSender<UserCommand> {
        &self.mgr.cmd_sender
    }
661
662 pub fn process_sock_events(&mut self, timeout_millis: u64) -> usize {
664 self.sock_events.clear();
665 self.mgr
666 .poller
667 .wait(
668 &mut self.sock_events,
669 Some(Duration::from_millis(timeout_millis)),
670 )
671 .unwrap(); for ev in self.sock_events.iter() {
674 let mut removesock = false;
675 let current_reactorid = ReactorID::from_usize(ev.key);
676 let mut new_connection_to_add = None;
677 if let Some(sockhandler) = self
678 .mgr
679 .socket_handlers
680 .get_mut(current_reactorid.sockslot as usize)
681 {
682 match sockhandler {
683 TcpSocketHandler::ListenerType(reactorid, sock, handler) => {
684 debug_assert_eq!(current_reactorid, *reactorid);
685 if ev.readable {
686 let (mut newsock, _) = sock.accept().unwrap();
687 if let Some(new_stream_connection) =
688 handler.on_new_connection(sock, &mut newsock)
689 {
690 newsock.set_nonblocking(true).unwrap();
691 new_connection_to_add = Some((
692 newsock,
693 new_stream_connection.reactor,
694 new_stream_connection.recv_buffer_min_size,
695 ));
696 }
697 }
699 if ev.writable {
700 logerr!("writable listener sock!");
701 removesock = true;
702 }
703 }
704 TcpSocketHandler::StreamType(ref mut ctx, ref mut handler) => {
705 debug_assert_eq!(current_reactorid, ctx.reactorid);
706 if ev.writable {
707 if !ctx.interested_writable {
708 dbglog!("WARN: unsolicited writable sock: {:?}", ctx.sock);
709 }
710 ctx.interested_writable = true; if !ctx.sender.buf.is_empty() {
712 if let Err(err) = ctx.sender.send_queued(&mut ctx.sock) {
713 logtrace!("{err} send_queued failed.");
714 removesock = true;
715 }
716 }
717 }
718 if ev.readable {
719 if let Err(err) = handler.on_readable(&mut ReactorReableContext::from(
720 ctx,
721 &self.mgr.cmd_sender,
722 )) {
723 if !err.is_empty() {
724 logtrace!("on_readable requested close current_reactorid: {current_reactorid}, sock: {:?}. Reason: {}", ctx.sock, err);
725 }
726 removesock = true;
727 }
728 }
729 if ctx.sender.close_or_error {
730 removesock = true;
731 }
732 if !removesock {
734 if !ctx.interested_writable && !ctx.sender.buf.is_empty() {
735 self.mgr
736 .poller
737 .modify_with_mode(
738 &ctx.sock,
739 Event::all(ev.key),
740 PollMode::Level,
741 )
742 .unwrap();
743 ctx.interested_writable = true;
744 } else if ctx.interested_writable && ctx.sender.buf.is_empty() {
745 self.mgr
746 .poller
747 .modify_with_mode(
748 &ctx.sock,
749 Event::readable(ev.key),
750 PollMode::Level,
751 )
752 .unwrap();
753 ctx.interested_writable = false;
754 }
755 }
756 }
757 }
758 } else {
759 dbglog!("[ERROR] socket key has been removed {}!", current_reactorid);
760 continue;
761 }
762
763 if let Some((newsock, newhandler, recv_buffer_min_size)) = new_connection_to_add {
764 let newreactorid_to_close = {
765 let newreactorid =
766 self.mgr
767 .add_stream(recv_buffer_min_size, newsock, newhandler);
768 if let TcpSocketHandler::StreamType(ref mut newsockdata, ref mut newhandler) =
769 self.mgr
770 .socket_handlers
771 .get_mut(newreactorid.sockslot as usize)
772 .unwrap()
773 {
774 match newhandler.on_connected(
775 &mut DispatchContext::from(newsockdata, &self.mgr.cmd_sender),
776 current_reactorid,
777 ) {
778 Ok(_) => INVALID_REACTOR_ID, Err(err) => {
780 if !err.is_empty() {
781 logtrace!("Reject new connection for listener_reactorid: {}. Reason: {}", current_reactorid, err);
782 }
783 newsockdata.reactorid }
785 }
786 } else {
787 panic!("Failed to find new added stream!");
788 }
789 };
790 if newreactorid_to_close != INVALID_REACTOR_ID {
791 self.mgr.close_reactor(newreactorid_to_close);
792 }
793
794 continue;
795 }
796
797 if removesock {
798 self.mgr.close_reactor(current_reactorid);
799 continue; }
801 if ev.is_err().unwrap_or(false) {
802 logerr!("WARN: socket error key: {}", current_reactorid);
803 removesock = true;
804 }
805 if ev.is_interrupt() {
806 logerr!("WARN: socket interrupt key: {}", current_reactorid);
807 removesock = true;
808 }
809 if removesock {
810 self.mgr.close_reactor(current_reactorid);
811 }
812 }
813
814 self.sock_events.len()
815 }
816
817 pub fn process_command_queue(&mut self, max_commands: usize) -> usize {
819 let mut count_cmd = 0usize;
820 for _ in 0..max_commands {
822 let cmddata: CmdData<UserCommand> = match self.mgr.cmd_recv.try_recv() {
823 Err(err) => {
824 if err == std::sync::mpsc::TryRecvError::Empty {
825 return count_cmd;
826 } else {
827 panic!("std::sync::mpsc::TryRecvError::Disconnected is not possible. Because both cmd_sender & cmd_recv are saved.");
828 }
829 }
830 Ok(data) => data,
831 };
832 count_cmd += 1;
833
834 match cmddata.deferred {
835 Deferred::Immediate => {}
836 Deferred::UtilTime(time) => {
837 let millis = time
838 .duration_since(std::time::SystemTime::UNIX_EPOCH)
839 .unwrap()
840 .as_millis() as i64;
841 if !ReactRuntime::<UserCommand>::is_deferred_current(millis) {
842 let key = self.deferred_data.add(cmddata);
844 self.deferred_heap.push(DeferredKey { millis, data: key });
845 min_heap_push(&mut self.deferred_heap);
846 continue; }
848 }
849 }
850 self.execute_immediate_cmd(cmddata);
851 } count_cmd
853 }
854
855 pub fn process_deferred_queue(&mut self) -> usize {
857 let mut cmds = 0;
858 while !self.deferred_heap.is_empty()
859 && ReactRuntime::<UserCommand>::is_deferred_current(self.deferred_heap[0].millis)
860 {
861 let key = self.deferred_heap[0].data;
862 min_heap_pop(&mut self.deferred_heap);
863 self.deferred_heap.pop();
864 cmds += 1;
865 if let Some(cmddata) = self.deferred_data.remove(key) {
866 self.execute_immediate_cmd(cmddata);
867 } else {
868 panic!("No deferred CommandData with key: {}", key);
869 }
870 }
871 cmds
872 }
873
874 fn is_deferred_current(millis: i64) -> bool {
877 let now_nanos = utils::now_nanos();
878 millis * 1000000 + 5 * 100000 <= now_nanos
879 }
880
881 fn execute_immediate_cmd(&mut self, cmddata: CmdData<UserCommand>) {
882 let mut reactorid_to_close: ReactorID = INVALID_REACTOR_ID;
883 match cmddata.cmd {
884 SysCommand::NewConnect(reactor, remote_addr, recv_buffer_min_size) => {
885 match self
886 .mgr
887 .start_connect(&remote_addr, recv_buffer_min_size, reactor)
888 {
889 Err(err) => {
890 let errmsg =
891 format!("Failed to connect to {}. Error: {}", remote_addr, err);
892 (cmddata.completion)(Err(errmsg));
893 }
894 Ok(key) => {
895 (cmddata.completion)(Ok(key));
896 }
897 }
898 }
899 SysCommand::NewListen(reactor, local_addr) => {
900 match self.mgr.start_listen(&local_addr, reactor) {
901 Err(err) => {
902 let errmsg = format!("Failed to listen on {}. Error: {}", local_addr, err);
903 (cmddata.completion)(Err(errmsg));
904 }
905 Ok(key) => {
906 (cmddata.completion)(Ok(key));
907 }
908 }
909 }
910 SysCommand::CloseSocket => {
911 if self.mgr.close_reactor(cmddata.reactorid) {
912 (cmddata.completion)(Ok(cmddata.reactorid));
913 } else {
914 (cmddata.completion)(Err(format!(
915 "Failed to remove non existing socket with reactorid: {}",
916 cmddata.reactorid
917 )));
918 }
919 }
920 SysCommand::UserCmd(usercmd) => {
921 if cmddata.reactorid == INVALID_REACTOR_ID {
922 panic!("UserCommand must be executed on a reactor!");
923 } else if let Some(handler) = self
924 .mgr
925 .socket_handlers
926 .get_mut(cmddata.reactorid.sockslot as usize)
927 {
928 match handler {
929 TcpSocketHandler::ListenerType(reactorid, _, _) => {
930 (cmddata.completion)(Err(format!(
931 "Listener cannot receive user command. cmd reactorid: {}, reactorid: {}",
932 cmddata.reactorid, *reactorid
933 )));
934 }
935 TcpSocketHandler::StreamType(ctx, reactor) => {
936 if cmddata.reactorid != ctx.reactorid {
937 (cmddata.completion)(Err(format!(
938 "Failed to execute user command with wrong cmd reactorid: {}, found: {}",
939 cmddata.reactorid , ctx.reactorid
940 )));
941 } else {
942 let res = (reactor).on_command(
943 usercmd,
944 &mut DispatchContext {
945 reactorid: cmddata.reactorid,
946 sock: &mut ctx.sock,
947 sender: &mut ctx.sender,
948 cmd_sender: &self.mgr.cmd_sender,
949 },
950 );
951 (cmddata.completion)(Ok(cmddata.reactorid));
952 if let Err(err) = res {
953 logtrace!(
954 "on_command requested closing reactorid: {}. {}",
955 cmddata.reactorid,
956 err
957 );
958 reactorid_to_close = cmddata.reactorid;
959 }
960 }
961 }
962 }
963 } else {
964 (cmddata.completion)(Err(format!(
965 "Failed to execute user command on non existing socket with reactorid: {}",
966 cmddata.reactorid
967 )));
968 }
969 }
970 } if reactorid_to_close != INVALID_REACTOR_ID {
973 self.mgr.close_reactor(reactorid_to_close);
974 }
975 }
976}
977
/// Commands understood by the runtime.
enum SysCommand<UserCommand> {
    /// Connect to a remote address: reactor, remote address, minimum receive-buffer size.
    NewConnect(
        Box<dyn Reactor<UserCommand = UserCommand>>,
        String,
        usize,
    ),
    /// Start listening: listener handler, local address.
    NewListen(
        Box<dyn TcpListenerHandler<UserCommand = UserCommand>>,
        String,
    ),
    /// Close the reactor the command is addressed to.
    CloseSocket,
    /// Deliver a user command to the reactor the command is addressed to.
    UserCmd(UserCommand),
}
996
/// A queued command together with its target, timing and completion callback.
struct CmdData<UserCommand> {
    /// Target reactor; `INVALID_REACTOR_ID` for commands that create a reactor.
    reactorid: ReactorID,
    /// What to do.
    cmd: SysCommand<UserCommand>,
    /// When to do it.
    deferred: Deferred,
    /// Invoked by the runtime with the outcome of the command.
    completion: Box<dyn FnOnce(CommandCompletion)>,
}
// NOTE(review): `completion` is a non-`Send` boxed closure and `UserCommand` is unbounded, so
// this impl is only sound if nothing thread-bound is captured or carried — confirm.
unsafe impl<UserCommand> Send for CmdData<UserCommand> {}
1004
/// Outbound queue of one stream: unsent bytes plus completion callbacks for queued messages.
pub struct MsgSender {
    /// Bytes that still have to be written to the socket.
    pub buf: Vec<u8>,
    /// Completion callbacks of queued messages, linked through `PendingSend::next_id`.
    pub pending: FlatStorage<PendingSend>,
    /// Id of the oldest pending completion, `usize::MAX` if none.
    first_pending_id: usize,
    /// Id of the newest pending completion, `usize::MAX` if none.
    last_pending_id: usize,
    /// Bytes of the current queue written so far; reset to 0 whenever `buf` drains completely.
    pub bytes_sent: usize,
    /// Set when a write failed; the runtime closes the stream when it sees this flag.
    close_or_error: bool,
}
/// Completion callback of one queued message.
pub struct PendingSend {
    /// Id of the next pending entry, `usize::MAX` for the last one.
    next_id: usize,
    /// Position of the first unsent byte of the message, counted like `MsgSender::bytes_sent`.
    startpos: usize,
    /// Number of bytes of the message still to be sent from `startpos`.
    msgsize: usize,
    /// Invoked once the whole message has been written.
    completion: Box<dyn FnOnce()>,
}
1030
/// Outcome of a send attempt.
#[derive(PartialEq, Eq)]
pub enum SendOrQueResult {
    /// Everything was written to the socket.
    Complete,
    /// Some or all bytes remain in the send queue.
    InQueue,
}
impl Default for MsgSender {
    /// Equivalent to `MsgSender::new()`.
    fn default() -> Self {
        Self::new()
    }
}
1044impl MsgSender {
    /// Creates an empty sender: nothing queued, no pending completions, no error recorded.
    pub fn new() -> Self {
        Self {
            buf: Vec::new(),
            pending: FlatStorage::new(),
            // `usize::MAX` marks "no pending entry".
            first_pending_id: usize::MAX,
            last_pending_id: usize::MAX,
            bytes_sent: 0,
            close_or_error: false,
        }
    }
1055
1056 pub fn try_send_all(sock: &mut std::net::TcpStream, buf: &[u8]) -> std::io::Result<usize> {
1059 if buf.is_empty() {
1060 return Ok(0);
1061 }
1062 let mut buf = buf;
1063 let mut sentbytes = 0;
1064 loop {
1065 match sock.write(buf) {
1066 std::io::Result::Ok(bytes) => {
1067 if bytes < buf.len() {
1068 buf = &buf[bytes..];
1069 sentbytes += bytes; } else {
1071 return Ok(sentbytes + bytes); }
1073 }
1074 std::io::Result::Err(err) => {
1075 let errkind = err.kind();
1076 if errkind == ErrorKind::WouldBlock {
1077 return Ok(sentbytes); } else if errkind == ErrorKind::ConnectionReset {
1079 logtrace!("sock reset : {sock:?}. close socket");
1080 return Err(err);
1081 } else if errkind == ErrorKind::Interrupted {
1083 logtrace!("[WARN] sock Interrupted : {sock:?}. retry");
1084 return Err(err); } else {
1086 logtrace!("[ERROR]: write on sock {sock:?}, error: {err:?}");
1087 return Err(err);
1088 }
1089 }
1090 }
1091 }
1092 }
1093
1094 pub fn send_or_que(
1098 &mut self,
1099 sock: &mut std::net::TcpStream,
1100 buf: &[u8],
1101 send_completion: Option<Box<dyn FnOnce()>>,
1102 ) -> Result<SendOrQueResult> {
1103 if buf.is_empty() {
1105 if let Some(callback) = send_completion {
1106 (callback)();
1107 }
1108 return Ok(SendOrQueResult::Complete);
1109 }
1110 if !self.buf.is_empty() {
1111 self.buf.extend_from_slice(buf);
1112 self.queue_msg_completion(buf.len(), send_completion);
1113 return Ok(SendOrQueResult::InQueue);
1114 }
1115 debug_assert_eq!(self.bytes_sent, 0);
1117 debug_assert_eq!(self.first_pending_id, usize::MAX);
1118 debug_assert_eq!(self.last_pending_id, usize::MAX);
1119 debug_assert_eq!(self.pending.len(), 0);
1120
1121 let sentbytes = match MsgSender::try_send_all(sock, buf) {
1122 Err(err) => {
1123 return Err(err.to_string());
1124 }
1125 Ok(bytes) => bytes,
1126 };
1127
1128 if sentbytes == buf.len() {
1129 if let Some(callback) = send_completion {
1130 (callback)();
1131 }
1132 return Ok(SendOrQueResult::Complete); }
1134 self.buf.extend_from_slice(&buf[sentbytes..]);
1136 self.queue_msg_completion(buf.len() - sentbytes, send_completion);
1137 Ok(SendOrQueResult::InQueue)
1138 }
1139
1140 fn queue_msg_completion(
1142 &mut self,
1143 queued_size: usize,
1144 send_completion: Option<Box<dyn FnOnce()>>,
1145 ) {
1146 if let Some(callback) = send_completion {
1147 let prev_id = self.last_pending_id;
1149 self.last_pending_id = self.pending.add(PendingSend {
1150 next_id: usize::MAX,
1151 startpos: self.bytes_sent + self.buf.len() - queued_size,
1152 msgsize: queued_size,
1153 completion: callback,
1154 });
1155 if let Some(prev) = self.pending.get_mut(prev_id) {
1156 prev.next_id = self.last_pending_id;
1157 }
1158 if self.first_pending_id == usize::MAX {
1159 self.first_pending_id = self.last_pending_id;
1161 }
1162 }
1163 }
1164
1165 #[allow(unused_assignments)]
1167 fn send_queued(&mut self, sock: &mut std::net::TcpStream) -> Result<SendOrQueResult> {
1168 if self.buf.is_empty() {
1169 return Ok(SendOrQueResult::Complete);
1170 }
1171 let mut sentbytes = 0;
1172 match sock.write(&self.buf[..]) {
1173 std::io::Result::Ok(bytes) => {
1174 sentbytes = bytes;
1175 if bytes == 0 {
1176 self.close_or_error = true;
1177 return Err(format!("[ERROR] write sock 0 bytes {sock:?}. close socket"));
1178 }
1179 }
1180 std::io::Result::Err(err) => {
1181 let errkind = err.kind();
1182 if errkind == ErrorKind::WouldBlock {
1183 return Ok(SendOrQueResult::InQueue); } else if errkind == ErrorKind::ConnectionReset {
1185 self.close_or_error = true;
1186 return Err(format!(
1187 "[ERROR] Write sock ConnectionReset {sock:?}. close socket"
1188 ));
1189 } else if errkind == ErrorKind::Interrupted {
1191 logtrace!("[WARN] sock Interrupted : {sock:?}. retry");
1192 return Ok(SendOrQueResult::InQueue); } else {
1194 self.close_or_error = true;
1195 return Err(format!("[ERROR]: write on sock {sock:?}, error: {err:?}"));
1196 }
1197 }
1198 }
1199 while self.first_pending_id != usize::MAX {
1201 let id = self.first_pending_id;
1202 let (mut sent, mut next_id) = (false, 0);
1203 if let Some(pending) = self.pending.get_mut(id) {
1204 if pending.startpos + pending.msgsize <= self.bytes_sent {
1205 sent = true;
1207 next_id = pending.next_id;
1208 } else {
1209 pending.msgsize -= self.bytes_sent - pending.startpos;
1211 pending.startpos = self.bytes_sent;
1212 break;
1213 }
1214 } else {
1215 panic!("invalid id");
1216 }
1217 if sent {
1218 self.first_pending_id = next_id;
1219 if let Some(pending) = self.pending.remove(id) {
1220 (pending.completion)();
1221 }
1222 }
1223 }
1224 if self.first_pending_id == usize::MAX {
1225 self.last_pending_id = usize::MAX;
1227 }
1228 Ok(self.move_buf_front_after_send(sentbytes))
1229 }
1230 fn move_buf_front_after_send(&mut self, sentbytes: usize) -> SendOrQueResult {
1232 let len = self.buf.len();
1234 self.buf.copy_within(sentbytes..len, 0);
1235 self.buf.resize(len - sentbytes, 0);
1236 if self.buf.is_empty() {
1237 debug_assert_eq!(self.first_pending_id, usize::MAX);
1239 debug_assert_eq!(self.last_pending_id, usize::MAX);
1240 debug_assert_eq!(self.pending.len(), 0);
1241 self.bytes_sent = 0;
1242 SendOrQueResult::Complete
1243 } else {
1244 self.bytes_sent += sentbytes;
1245 SendOrQueResult::InQueue
1246 }
1247 }
1248}
1249
/// Writer that appends directly to a stream's send buffer and sends the appended bytes on
/// `send`, `flush` or drop.
pub struct AutoSendBuffer<'sender> {
    /// Send queue being appended to.
    sender: &'sender mut MsgSender,
    /// Socket the bytes are sent on.
    sock: &'sender mut std::net::TcpStream,
    /// Length of `sender.buf` up to which bytes are already handled; bytes behind it are
    /// the ones written through this buffer and not yet sent.
    old_buf_size: usize,
}
1255impl AutoSendBuffer<'_> {
1256 pub fn clear(&mut self) {
1258 self.sender.buf.resize(self.old_buf_size, 0);
1259 }
1260 pub fn count_written(&self) -> usize {
1261 self.sender.buf.len() - self.old_buf_size
1262 }
1263 pub fn get_written(&self) -> &[u8] {
1265 &self.sender.buf[self.old_buf_size..]
1266 }
1267 pub fn send(
1268 &mut self,
1269 send_completion: Option<Box<dyn FnOnce()>>,
1270 ) -> std::io::Result<SendOrQueResult> {
1271 let buf = &self.sender.buf[self.old_buf_size..];
1272 let buf_len = buf.len();
1273 if buf.is_empty() {
1274 if let Some(callback) = send_completion {
1275 (callback)();
1276 }
1277 self.old_buf_size = self.sender.buf.len();
1278 return Ok(SendOrQueResult::Complete);
1279 }
1280 if self.old_buf_size > 0 {
1281 self.sender.queue_msg_completion(buf.len(), send_completion);
1282 self.old_buf_size = self.sender.buf.len();
1283 return Ok(SendOrQueResult::InQueue);
1284 }
1285
1286 let sentbytes = match MsgSender::try_send_all(self.sock, buf) {
1287 Err(err) => {
1288 self.sender.close_or_error = true;
1289 self.old_buf_size = self.sender.buf.len();
1290 return Err(err);
1291 }
1292 Ok(bytes) => bytes,
1293 };
1294
1295 if sentbytes > 0 {
1296 self.sender.move_buf_front_after_send(sentbytes);
1297 }
1298
1299 if sentbytes == buf_len {
1300 if let Some(callback) = send_completion {
1301 (callback)();
1302 }
1303 self.old_buf_size = self.sender.buf.len();
1304 return Ok(SendOrQueResult::Complete); }
1306 self.sender
1308 .queue_msg_completion(self.sender.buf.len(), send_completion);
1309 self.old_buf_size = self.sender.buf.len();
1310 Ok(SendOrQueResult::InQueue)
1311 }
1312}
1313impl Drop for AutoSendBuffer<'_> {
1314 fn drop(&mut self) {
1315 self.send(None).unwrap(); }
1317}
1318impl std::io::Write for AutoSendBuffer<'_> {
1319 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1320 self.sender.buf.extend_from_slice(buf);
1321 Ok(buf.len())
1322 }
1323 fn flush(&mut self) -> std::io::Result<()> {
1324 self.send(None)?;
1325 Ok(())
1326 }
1327}
1328
/// Receive buffer for one byte stream plus the bookkeeping needed to cut it into messages.
pub struct MsgReader {
    /// Backing storage; bytes in `startpos..bufsize` are received but not yet consumed.
    recv_buffer: Vec<u8>,
    /// Free space that is made available at the end of `recv_buffer` before each read.
    min_reserve: usize,
    /// Offset of the first unconsumed byte.
    startpos: usize,
    /// Offset one past the last received byte.
    bufsize: usize,
    /// Size of the current message as last reported by the dispatcher; 0 when unknown.
    decoded_msgsize: usize,
}
1342
1343impl MsgReader {
1344 pub fn new(min_reserved_bytes: usize) -> Self {
1345 Self {
1346 recv_buffer: vec![0u8; min_reserved_bytes],
1348 min_reserve: min_reserved_bytes,
1349 startpos: 0,
1350 bufsize: 0,
1351 decoded_msgsize: 0,
1352 }
1353 }
1354
1355 pub fn bytes_in_buffer(&self) -> usize {
1356 self.bufsize - self.startpos
1357 }
1358
1359 pub fn clear(&mut self) {
1360 self.decoded_msgsize = 0;
1361 self.startpos = 0;
1362 self.bufsize = 0;
1363 }
1364
1365 pub fn try_read_fast_dispatch<UserCommand>(
1368 &mut self,
1369 ctx: &mut DispatchContext<UserCommand>,
1370 dispatcher: &mut impl FnMut(
1372 &mut [u8],
1373 usize,
1374 usize,
1375 &mut DispatchContext<UserCommand>,
1376 ) -> Result<MessageResult>,
1377 ) -> Result<()> {
1378 loop {
1379 debug_assert!(
1380 self.decoded_msgsize == 0 || self.decoded_msgsize > self.bufsize - self.startpos
1381 ); if self.bufsize + self.min_reserve > self.recv_buffer.len() {
1384 self.recv_buffer.resize(
1385 std::cmp::max(self.bufsize + self.min_reserve, self.recv_buffer.len() * 2),
1386 0,
1387 ); }
1389 match ctx.sock.read(&mut self.recv_buffer[self.bufsize..]) {
1390 std::io::Result::Ok(new_bytes) => {
1391 if new_bytes == 0 {
1392 return Err("Peer closed sock".to_owned());
1393 }
1394 debug_assert!(self.bufsize + new_bytes <= self.recv_buffer.len());
1395
1396 self.bufsize += new_bytes;
1397 let should_return = self.bufsize < self.recv_buffer.len(); self.try_dispatch_all(new_bytes, ctx, dispatcher)?;
1400 if should_return {
1401 return Ok(()); } else {
1403 continue; }
1405 }
1406 std::io::Result::Err(err) => {
1407 let errkind = err.kind();
1408 if errkind == ErrorKind::WouldBlock {
1409 return Ok(()); } else if errkind == ErrorKind::ConnectionReset {
1411 return Err("Sock reset".to_owned());
1412 } else if errkind == ErrorKind::Interrupted {
1413 logtrace!("[WARN] sock Interrupted : {:?}. retry", ctx.sock);
1414 return Ok(()); } else if errkind == ErrorKind::ConnectionAborted {
1416 return Err("Sock ConnectionAborted".to_owned()); }
1418 return Err(format!("[ERROR]: Read on sock error: {err:?}"));
1419 }
1420 }
1421 }
1422 }
1423
1424 pub fn try_read_all<UserCommand>(
1427 &mut self,
1428 ctx: &mut DispatchContext<UserCommand>,
1429 ) -> Result<()> {
1430 loop {
1431 if self.bufsize + self.min_reserve > self.recv_buffer.len() {
1432 self.recv_buffer.resize(
1433 std::cmp::max(self.bufsize + self.min_reserve, self.recv_buffer.len() * 2),
1434 0,
1435 ); }
1437 match ctx.sock.read(&mut self.recv_buffer[self.bufsize..]) {
1438 std::io::Result::Ok(new_bytes) => {
1439 if new_bytes == 0 {
1440 return Err("Peer closed sock".to_owned());
1441 }
1442 debug_assert!(self.bufsize + new_bytes <= self.recv_buffer.len());
1443
1444 self.bufsize += new_bytes;
1445 if self.bufsize < self.recv_buffer.len() {
1446 return Ok(());
1448 }
1449 }
1450 std::io::Result::Err(err) => {
1451 let errkind = err.kind();
1452 if errkind == ErrorKind::WouldBlock {
1453 return Ok(()); } else if errkind == ErrorKind::ConnectionReset {
1455 return Err("Sock ConnectionReset".to_owned());
1456 } else if errkind == ErrorKind::Interrupted {
1458 logtrace!("[WARN] sock Interrupted : {:?}. retry", ctx.sock);
1459 return Ok(()); } else if errkind == ErrorKind::ConnectionAborted {
1461 return Err("sock ConnectionAborted".to_owned()); }
1463 return Err(format!("[ERROR]: Read on sock error: {err:?}"));
1464 }
1465 } } }
1468
1469 pub fn try_dispatch_all<UserCommand>(
1472 &mut self,
1473 new_bytes: usize,
1474 ctx: &mut DispatchContext<UserCommand>,
1475 dispatcher: &mut impl FnMut(
1477 &mut [u8],
1478 usize,
1479 usize,
1480 &mut DispatchContext<UserCommand>,
1481 ) -> Result<MessageResult>,
1482 ) -> Result<()> {
1483 let mut new_bytes = new_bytes;
1484 while self.startpos < self.bufsize
1486 && (self.decoded_msgsize == 0 || self.startpos + self.decoded_msgsize <= self.bufsize)
1487 {
1488 match dispatcher(
1489 &mut self.recv_buffer[self.startpos..self.bufsize],
1490 new_bytes,
1491 self.decoded_msgsize,
1492 ctx,
1493 ) {
1494 Err(err) => {
1495 self.clear(); return Err(err);
1497 }
1498 Ok(res) => {
1499 match res {
1500 MessageResult::ExpectMsgSize(msgsize) => {
1501 if !(msgsize == 0 || msgsize > self.bufsize - self.startpos) {
1502 logerr!( "[WARN] on_inbound_message should NOT expect a msgsize while full message is already received, which may cause recursive call. msgsize:{msgsize:?} recved: {}",
1503 self.bufsize - self.startpos);
1504 debug_assert!(
1505 false,
1506 "on_inbound_message expects an already full message."
1507 );
1508 }
1509 self.decoded_msgsize = msgsize; break; }
1512 MessageResult::DropMsgSize(msgsize) => {
1513 assert!(msgsize > 0 && msgsize <= self.bufsize - self.startpos); self.startpos += msgsize;
1515 self.decoded_msgsize = 0;
1516 new_bytes = self.bufsize - self.startpos;
1517 }
1518 }
1519 }
1520 }
1521 }
1522 if self.startpos != 0 {
1523 self.recv_buffer.copy_within(self.startpos..self.bufsize, 0); self.bufsize -= self.startpos;
1526 self.startpos = 0;
1527 }
1528 Ok(())
1529 }
1530
1531 pub fn try_read_fast_read<UserCommand>(
1533 &mut self,
1534 ctx: &mut DispatchContext<UserCommand>,
1535 dispatcher: &mut impl FnMut(
1537 &mut [u8],
1538 usize,
1539 usize,
1540 &mut DispatchContext<UserCommand>,
1541 ) -> Result<MessageResult>,
1542 ) -> Result<()> {
1543 let old_bytes = self.bufsize - self.startpos;
1544 let res = self.try_read_all(ctx);
1545 let res2 = self.try_dispatch_all(self.bufsize - self.startpos - old_bytes, ctx, dispatcher);
1546 res?;
1547 res2?;
1548 Ok(())
1549 }
1550}
1551
1552pub trait NewServerReactor: Reactor {
1558 type InitServerParam: Clone;
1560
1561 fn new_server_reactor(count: usize, param: Self::InitServerParam) -> Self;
1563}
1564pub struct DefaultTcpListenerHandler<NewReactor: NewServerReactor + 'static> {
1565 pub reactorid: ReactorID,
1566 count_children: usize,
1567 server_param: <NewReactor as NewServerReactor>::InitServerParam,
1568 recv_buffer_min_size: usize,
1569 _phantom: PhantomData<NewReactor>,
1570}
1571
1572impl<NewReactor: NewServerReactor + 'static> DefaultTcpListenerHandler<NewReactor> {
1575 pub fn new(
1576 recv_buffer_min_size: usize,
1577 param: <NewReactor as NewServerReactor>::InitServerParam,
1578 ) -> Self {
1579 Self {
1580 reactorid: INVALID_REACTOR_ID,
1581 count_children: 0,
1582 server_param: param,
1583 recv_buffer_min_size,
1584 _phantom: PhantomData,
1585 }
1586 }
1587}
1588
1589impl<NewReactor: NewServerReactor + 'static> TcpListenerHandler
1590 for DefaultTcpListenerHandler<NewReactor>
1591{
1592 type UserCommand = <NewReactor as Reactor>::UserCommand;
1593
1594 fn on_start_listen(
1595 &mut self,
1596 reactorid: ReactorID,
1597 _cmd_sender: &CmdSender<Self::UserCommand>,
1598 ) {
1599 self.reactorid = reactorid;
1600 }
1601 fn on_new_connection(
1602 &mut self,
1603 _conn: &mut std::net::TcpListener,
1604 _new_conn: &mut std::net::TcpStream,
1605 ) -> Option<NewStreamConnection<Self::UserCommand>> {
1606 self.count_children += 1;
1607 Some(NewStreamConnection {
1608 reactor: Box::new(NewReactor::new_server_reactor(
1609 self.count_children,
1610 self.server_param.clone(),
1611 )),
1612 recv_buffer_min_size: self.recv_buffer_min_size,
1613 })
1614 }
1615}
1616
1617pub type SimpleIoRuntime = ReactRuntime<()>;
1622pub type SimpleIoReactorContext<'a> = DispatchContext<'a, ()>;
1623pub type DynIoReactor = dyn Reactor<UserCommand = ()>;
1624
1625type OnConnectedHandler<AppData> = dyn FnMut(
1626 &mut SimpleIoReactorContext<'_>,
1627 ReactorID, &mut AppData,
1629) -> Result<()>;
1630
1631type OnClosedHandler<AppData> = dyn FnMut(ReactorID, &CmdSender<()>, &mut AppData);
1632
1633type OnSockMsgHandler<AppData> =
1634 dyn FnMut(&mut [u8], &mut SimpleIoReactorContext<'_>, &mut AppData) -> Result<usize>;
1635
1636enum DecodeResult<DecodedInfo> {
1637 UnknownMsgSize,
1639 MsgSize(usize, DecodedInfo),
1641}
1642type SockMsgDecoder<DecodedInfo> = dyn FnMut(&mut [u8], usize) -> Result<DecodeResult<DecodedInfo>>;
1644
1645fn null_msg_decoder(buf: &mut [u8], _new_bytes: usize) -> Result<DecodeResult<()>> {
1646 Ok(DecodeResult::MsgSize(buf.len(), ()))
1647}
1648
1649pub struct SimpleIoReactor<AppData> {
1654 app_data: AppData,
1655 on_connected_handler: Option<Box<OnConnectedHandler<AppData>>>,
1656 on_closed_handler: Option<Box<OnClosedHandler<AppData>>>,
1657 on_sock_msg_handler: Box<OnSockMsgHandler<AppData>>,
1659}
1660impl<AppData: 'static> SimpleIoReactor<AppData> {
1661 pub fn new(
1662 app_data: AppData,
1663 on_connected_handler: Option<Box<OnConnectedHandler<AppData>>>,
1664 on_closed_handler: Option<Box<OnClosedHandler<AppData>>>,
1665 on_sock_msg_handler: impl FnMut(&mut [u8], &mut SimpleIoReactorContext<'_>, &mut AppData) -> Result<usize>
1666 + 'static,
1667 ) -> Self {
1668 Self {
1669 app_data,
1670 on_connected_handler,
1671 on_closed_handler,
1672 on_sock_msg_handler: Box::new(on_sock_msg_handler),
1673 }
1674 }
1675 pub fn new_boxed(
1676 app_data: AppData,
1677 on_connected_handler: Option<Box<OnConnectedHandler<AppData>>>,
1678 on_closed_handler: Option<Box<OnClosedHandler<AppData>>>,
1679 on_sock_msg_handler: impl FnMut(&mut [u8], &mut SimpleIoReactorContext<'_>, &mut AppData) -> Result<usize>
1680 + 'static,
1681 ) -> Box<dyn Reactor<UserCommand = ()>> {
1682 Box::new(Self::new(
1683 app_data,
1684 on_connected_handler,
1685 on_closed_handler,
1686 on_sock_msg_handler,
1687 ))
1688 }
1689}
1690impl<AppData> Reactor for SimpleIoReactor<AppData> {
1691 type UserCommand = ();
1692
1693 fn on_inbound_message(
1694 &mut self,
1695 buf: &mut [u8],
1696 _new_bytes: usize,
1697 _decoded_msg_size: usize,
1698 ctx: &mut DispatchContext<Self::UserCommand>,
1699 ) -> Result<MessageResult> {
1700 let drop_msg_size = (self.on_sock_msg_handler)(buf, ctx, &mut self.app_data)?;
1701 Ok(MessageResult::DropMsgSize(drop_msg_size)) }
1703 fn on_connected(
1704 &mut self,
1705 ctx: &mut DispatchContext<Self::UserCommand>,
1706 listener: ReactorID,
1707 ) -> Result<()> {
1708 if let Some(ref mut h) = self.on_connected_handler {
1709 return (h)(ctx, listener, &mut self.app_data);
1710 }
1711 Ok(()) }
1713 fn on_close(&mut self, reactorid: ReactorID, cmd_sender: &CmdSender<Self::UserCommand>) {
1714 if let Some(ref mut h) = self.on_closed_handler {
1715 (h)(reactorid, cmd_sender, &mut self.app_data)
1716 }
1717 }
1718}
1719
1720pub struct SimpleIoListener {
1722 count_children: usize,
1723 reactorid: ReactorID,
1724 recv_buffer_min_size: usize,
1725 reactor_creator: Box<dyn FnMut(usize) -> Option<Box<DynIoReactor>>>, }
1727impl SimpleIoListener {
1728 pub fn new(
1729 recv_buffer_min_size: usize,
1730 reactor_creator: impl FnMut(usize) -> Option<Box<DynIoReactor>> + 'static,
1731 ) -> Self {
1732 Self {
1733 count_children: 0,
1734 reactorid: INVALID_REACTOR_ID,
1735 recv_buffer_min_size,
1736 reactor_creator: Box::new(reactor_creator),
1737 }
1738 }
1739
1740 pub fn new_with_io_service<AppData: 'static>(service: SimpleIoService<AppData>) -> Self {
1741 Self {
1742 count_children: 0,
1743 reactorid: INVALID_REACTOR_ID,
1744 recv_buffer_min_size: 0, reactor_creator: Box::new(move |_| Some(Box::new(service.clone()))),
1746 }
1747 }
1748}
1749impl TcpListenerHandler for SimpleIoListener {
1750 type UserCommand = ();
1751
1752 fn on_start_listen(
1753 &mut self,
1754 reactorid: ReactorID,
1755 _cmd_sender: &CmdSender<Self::UserCommand>,
1756 ) {
1757 self.reactorid = reactorid;
1758 }
1759 fn on_new_connection(
1760 &mut self,
1761 _conn: &mut std::net::TcpListener,
1762 _new_conn: &mut std::net::TcpStream,
1763 ) -> Option<NewStreamConnection<Self::UserCommand>> {
1764 self.count_children += 1;
1765 (self.reactor_creator)(self.count_children).map(|reactor| NewStreamConnection {
1766 reactor,
1767 recv_buffer_min_size: self.recv_buffer_min_size,
1768 })
1769 }
1770}
1771
1772pub struct SimpleIoService<AppData> {
1779 inner: std::rc::Rc<std::cell::RefCell<IoServiceInner<AppData>>>,
1780}
1781
1782pub struct IoServiceInner<AppData> {
1783 stream_reactor: SimpleIoReactor<AppData>,
1784 msg_reader: MsgReader, }
1786
1787impl<AppData> IoServiceInner<AppData> {
1788 fn on_readable(&mut self, ctx: &mut ReactorReableContext<()>) -> Result<()> {
1790 self.msg_reader.try_read_fast_read(
1791 &mut DispatchContext {
1792 reactorid: ctx.reactorid,
1793 sock: ctx.sock,
1794 sender: ctx.sender,
1795 cmd_sender: ctx.cmd_sender,
1796 },
1797 &mut |buf, new_bytes, decoded_msg_size, ctx| {
1798 self.stream_reactor
1799 .on_inbound_message(buf, new_bytes, decoded_msg_size, ctx)
1800 },
1801 )
1802 }
1803}
1804impl<AppData> Clone for SimpleIoService<AppData> {
1805 fn clone(&self) -> Self {
1806 Self {
1807 inner: std::rc::Rc::clone(&self.inner),
1808 }
1809 }
1810}
1811impl<AppData: 'static> SimpleIoService<AppData> {
1812 pub fn new(
1813 recv_buf_min_size: usize,
1814 app_data: AppData,
1815 on_connected_handler: Option<Box<OnConnectedHandler<AppData>>>,
1816 on_closed_handler: Option<Box<OnClosedHandler<AppData>>>,
1817 on_sock_msg_handler: impl FnMut(&mut [u8], &mut SimpleIoReactorContext<'_>, &mut AppData) -> Result<usize>
1818 + 'static,
1819 ) -> Self {
1820 Self {
1821 inner: std::rc::Rc::new(std::cell::RefCell::new(IoServiceInner::<AppData> {
1822 stream_reactor: SimpleIoReactor::<AppData>::new(
1823 app_data,
1824 on_connected_handler,
1825 on_closed_handler,
1826 on_sock_msg_handler,
1827 ),
1828 msg_reader: MsgReader::new(recv_buf_min_size),
1829 })),
1830 }
1831 }
1832
1833 pub fn new_boxed(
1834 recv_buf_min_size: usize,
1835 app_data: AppData,
1836 on_connected_handler: Option<Box<OnConnectedHandler<AppData>>>,
1837 on_closed_handler: Option<Box<OnClosedHandler<AppData>>>,
1838 on_sock_msg_handler: impl FnMut(&mut [u8], &mut SimpleIoReactorContext<'_>, &mut AppData) -> Result<usize>
1839 + 'static,
1840 ) -> Box<dyn Reactor<UserCommand = ()>> {
1841 Box::new(Self::new(
1842 recv_buf_min_size,
1843 app_data,
1844 on_connected_handler,
1845 on_closed_handler,
1846 on_sock_msg_handler,
1847 ))
1848 }
1849
1850 pub fn apply_app_data(&self, func: impl FnOnce(&AppData)) -> Result<()> {
1852 if let Ok(v) = self.inner.try_borrow() {
1853 func(&v.stream_reactor.app_data);
1854 return Ok(());
1855 }
1856 Err("Unable to borrow SimpleIoService".to_owned())
1857 }
1858 pub fn apply_app_data_mut(&self, func: impl FnOnce(&mut AppData)) -> Result<()> {
1859 if let Ok(mut v) = self.inner.try_borrow_mut() {
1860 func(&mut v.stream_reactor.app_data);
1861 return Ok(());
1862 }
1863 Err("Unable to borrow SimpleIoService".to_owned())
1864 }
1865}
1866
1867impl<AppData> Reactor for SimpleIoService<AppData> {
1868 type UserCommand = ();
1869
1870 fn on_inbound_message(
1871 &mut self,
1872 _buf: &mut [u8],
1873 _new_bytes: usize,
1874 _decoded_msg_size: usize,
1875 _ctx: &mut DispatchContext<Self::UserCommand>,
1876 ) -> Result<MessageResult> {
1877 panic!("IoServiceInner handles on_inbound_message. this function should not be called!");
1878 }
1879 fn on_readable(&mut self, ctx: &mut ReactorReableContext<Self::UserCommand>) -> Result<()> {
1881 self.inner.borrow_mut().on_readable(ctx)
1882 }
1883
1884 fn on_connected(
1885 &mut self,
1886 ctx: &mut DispatchContext<Self::UserCommand>,
1887 listener: ReactorID,
1888 ) -> Result<()> {
1889 self.inner
1890 .borrow_mut()
1891 .stream_reactor
1892 .on_connected(ctx, listener)
1893 }
1894 fn on_close(&mut self, reactorid: ReactorID, cmd_sender: &CmdSender<Self::UserCommand>) {
1895 self.inner
1896 .borrow_mut()
1897 .stream_reactor
1898 .on_close(reactorid, cmd_sender);
1899 }
1900}
1901
#[cfg(test)]
mod tests {

    /// A completion function that does nothing.
    static EMPTY_COMPLETION_FUNC: fn() = || {};

    /// Placeholder check: currently reports `None` for every input.
    fn is_empty_function(_fun: &(dyn Fn() + 'static)) -> Option<Box<dyn Fn() + 'static>> {
        None
    }

    #[test]
    pub fn test_compare_function() {
        let outcome = is_empty_function(&EMPTY_COMPLETION_FUNC);
        assert!(outcome.is_none());
    }
}