use crate::dbglog;
use crate::flat_storage::FlatStorage;
use crate::utils;
use crate::{logerr, logtrace};
use core::panic;
use polling::{Event, Events, PollMode, Poller};
use std::io::{ErrorKind, Read, Write};
use std::time::Duration;
use std::{marker::PhantomData, net::TcpStream};
/// Event handler bound to one TCP stream managed by `ReactRuntime`.
///
/// `on_inbound_message` is the only required method; every other callback has a default body.
pub trait Reactor {
/// Application-defined command type delivered through `on_command`.
type UserCommand;
/// Called once the stream has been added to the runtime.
///
/// `_listener` is the accepting listener's id for accepted streams and
/// `INVALID_REACTOR_ID` for streams opened by a connect command. The runtime closes the
/// stream when this returns `Err`. The default implementation accepts the connection.
fn on_connected(
&mut self,
_ctx: &mut DispatchContext<Self::UserCommand>,
_listener: ReactorID,
) -> Result<()> {
Ok(()) }
/// Called by the default `on_readable` with the unconsumed bytes of the receive buffer.
///
/// * `buf` - buffered bytes, starting at the first byte that has not been dropped yet.
/// * `new_bytes` - number of bytes in `buf` that are new since the previous call
///   (after a `MessageResult::DropMsgSize`, all remaining bytes count as new).
/// * `decoded_msg_size` - the size last reported via `MessageResult::ExpectMsgSize`, or 0.
///
/// Returning `Err` propagates out of `on_readable`, after which the runtime closes the socket.
fn on_inbound_message(
&mut self,
buf: &mut [u8],
new_bytes: usize,
decoded_msg_size: usize,
ctx: &mut DispatchContext<Self::UserCommand>,
) -> Result<MessageResult>;
/// Called when the socket is reported readable.
///
/// The default reads what is available into `ctx.reader` and dispatches it through
/// `on_inbound_message`. Returning `Err` makes the runtime close the socket.
fn on_readable(&mut self, ctx: &mut ReactorReableContext<Self::UserCommand>) -> Result<()> {
ctx.reader.try_read_fast_read(
&mut DispatchContext {
reactorid: ctx.reactorid,
sock: ctx.sock,
sender: ctx.sender,
cmd_sender: ctx.cmd_sender,
},
&mut |buf, new_bytes, decoded_msg_size, ctx| {
self.on_inbound_message(buf, new_bytes, decoded_msg_size, ctx)
},
)
}
/// Called when a user command addressed to this reactor is executed.
///
/// Returning `Err` makes the runtime close this reactor. The default implementation panics,
/// so reactors that receive user commands must override it.
fn on_command(
&mut self,
_cmd: Self::UserCommand,
ctx: &mut DispatchContext<Self::UserCommand>,
) -> Result<()> {
panic!("Please impl on_command for reactorid: {}", ctx.reactorid);
}
/// Called after the reactor has been removed from the runtime. The default does nothing.
fn on_close(&mut self, _reactorid: ReactorID, _cmd_sender: &CmdSender<Self::UserCommand>) {
}
}
/// Result type used by the reactor callbacks; the error is a human-readable message.
pub type Result<T> = std::result::Result<T, String>;
/// Borrowed view of one stream handed to reactor callbacks.
pub struct DispatchContext<'a, UserCommand> {
/// Id of the reactor the callback is running for.
pub reactorid: ReactorID,
/// The stream's socket.
pub sock: &'a mut std::net::TcpStream,
// `sender` is the stream's outgoing queue; `cmd_sender` submits commands to the runtime.
pub sender: &'a mut MsgSender, pub cmd_sender: &'a CmdSender<UserCommand>,
}
impl<'a, UserCommand> DispatchContext<'a, UserCommand> {
fn from(data: &'a mut SockData, cmd_sender: &'a CmdSender<UserCommand>) -> Self {
Self {
reactorid: data.reactorid,
sock: &mut data.sock,
sender: &mut data.sender,
cmd_sender,
}
}
pub fn send_no_que(&mut self, msg: &[u8]) -> std::io::Result<usize> {
MsgSender::try_send_all(self.sock, msg)
}
pub fn send_or_que(&mut self, msg: &[u8]) -> Result<SendOrQueResult> {
self.sender.send_or_que(self.sock, msg, None)
}
pub fn acquire_send(&mut self) -> AutoSendBuffer<'_> {
let old_buf_size = self.sender.buf.len();
AutoSendBuffer {
sender: self.sender,
sock: self.sock,
old_buf_size,
}
}
}
/// Outcome of `Reactor::on_inbound_message`, telling the reader how to proceed.
pub enum MessageResult {
/// Stop dispatching for now; the value is remembered and passed back as `decoded_msg_size`.
ExpectMsgSize(usize),
/// Consume this many bytes from the front of the buffer and keep dispatching.
DropMsgSize(usize),
}
/// When a command should be executed.
pub enum Deferred {
/// Execute as soon as the command is taken from the command queue.
Immediate,
/// Execute once the given wall-clock time has been reached.
UtilTime(std::time::SystemTime),
}
/// Value passed to a command's completion callback: the affected reactor id or an error message.
pub type CommandCompletion = Result<ReactorID>;
/// Cloneable handle for submitting commands to a `ReactRuntime` over an mpsc channel.
pub struct CmdSender<UserCommand>(std::sync::mpsc::Sender<CmdData<UserCommand>>);
// NOTE(review): this asserts `Send` unconditionally, although the queued `CmdData` can hold
// boxed reactors and completion closures that are not required to be `Send` — confirm that
// every payload really is safe to move to the runtime's thread.
unsafe impl<UserCommand> Send for CmdSender<UserCommand> {}
impl<UserCommand> Clone for CmdSender<UserCommand> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<UserCommand> CmdSender<UserCommand> {
    /// Queues a command that connects to `remote_addr` and attaches `reactor` to the stream.
    ///
    /// `completion` receives the new reactor id or an error message once the command ran.
    pub fn send_connect<AReactor: Reactor<UserCommand = UserCommand> + 'static>(
        &self,
        remote_addr: &str,
        recv_buffer_min_size: usize,
        reactor: AReactor,
        deferred: Deferred,
        completion: impl FnOnce(CommandCompletion) + 'static,
    ) -> Result<()> {
        let cmd = SysCommand::NewConnect(
            Box::new(reactor),
            remote_addr.to_owned(),
            recv_buffer_min_size,
        );
        self.send_cmd(INVALID_REACTOR_ID, cmd, deferred, completion)
    }

    /// Queues a command that starts listening on `local_addr` with `reactor` as its handler.
    pub fn send_listen<AReactor: TcpListenerHandler<UserCommand = UserCommand> + 'static>(
        &self,
        local_addr: &str,
        reactor: AReactor,
        deferred: Deferred,
        completion: impl FnOnce(CommandCompletion) + 'static,
    ) -> Result<()> {
        let cmd = SysCommand::NewListen(Box::new(reactor), local_addr.to_owned());
        self.send_cmd(INVALID_REACTOR_ID, cmd, deferred, completion)
    }

    /// Queues a command that closes the reactor identified by `reactorid`.
    pub fn send_close(
        &self,
        reactorid: ReactorID,
        deferred: Deferred,
        completion: impl FnOnce(CommandCompletion) + 'static,
    ) -> Result<()> {
        let cmd = SysCommand::CloseSocket;
        self.send_cmd(reactorid, cmd, deferred, completion)
    }

    /// Queues a user command for the reactor identified by `reactorid`.
    pub fn send_user_cmd(
        &self,
        reactorid: ReactorID,
        cmd: UserCommand,
        deferred: Deferred,
        completion: impl FnOnce(CommandCompletion) + 'static,
    ) -> Result<()> {
        let cmd = SysCommand::UserCmd(cmd);
        self.send_cmd(reactorid, cmd, deferred, completion)
    }

    /// Validates the reactor id against the command kind and pushes the command on the channel.
    ///
    /// Returns `Err` when the id does not fit the command or the receiving side is gone.
    fn send_cmd(
        &self,
        reactorid: ReactorID,
        cmd: SysCommand<UserCommand>,
        deferred: Deferred,
        completion: impl FnOnce(CommandCompletion) + 'static,
    ) -> Result<()> {
        let creates_reactor = matches!(
            &cmd,
            SysCommand::NewListen(..) | SysCommand::NewConnect(..)
        );
        // Commands that create a reactor have no target yet.
        if creates_reactor && reactorid != INVALID_REACTOR_ID {
            return Err("reactorid msut be INVALID_REACTOR_ID if NewConnect/NewListen".to_owned());
        }
        // User commands always need a target.
        if matches!(&cmd, SysCommand::UserCmd(_)) && reactorid == INVALID_REACTOR_ID {
            return Err("UserCmd must has a valid reactorid.".to_owned());
        }
        let data = CmdData::<UserCommand> {
            reactorid,
            cmd,
            deferred,
            completion: Box::new(completion),
        };
        self.0
            .send(data)
            .map_err(|_| "Failed to send. Receiver disconnected.".to_owned())
    }
}
/// Borrowed view of one stream handed to `Reactor::on_readable`.
///
/// Carries the same borrows as `DispatchContext` plus `reader`, the stream's receive buffer.
pub struct ReactorReableContext<'a, UserCommand> {
pub reactorid: ReactorID, pub sock: &'a mut std::net::TcpStream, pub sender: &'a mut MsgSender, pub reader: &'a mut MsgReader, pub cmd_sender: &'a CmdSender<UserCommand>, }
/// Handler attached to a listening socket.
pub trait TcpListenerHandler {
/// User command type of the reactors created for accepted connections.
type UserCommand;
/// Called after the listener has been bound and registered. The default does nothing.
fn on_start_listen(
&mut self,
_reactorid: ReactorID,
_cmd_sender: &CmdSender<Self::UserCommand>,
) {
}
/// Called for every accepted connection.
///
/// Return `Some` with the reactor to attach to `new_sock`, or `None` to not add the connection.
fn on_new_connection(
&mut self,
sock: &mut std::net::TcpListener,
new_sock: &mut std::net::TcpStream,
) -> Option<NewStreamConnection<Self::UserCommand>>;
/// Called after the listener has been removed from the runtime. The default does nothing.
fn on_close_listen(
&mut self,
_reactorid: ReactorID,
_cmd_sender: &CmdSender<Self::UserCommand>,
) {
}
}
/// What a listener handler returns to have an accepted connection added to the runtime.
pub struct NewStreamConnection<UserCommand> {
/// Reactor that will handle the new stream.
pub reactor: Box<dyn Reactor<UserCommand = UserCommand>>,
/// Minimum free space kept in the stream's receive buffer.
pub recv_buffer_min_size: usize,
}
/// Runtime that polls sockets, runs queued commands and executes deferred commands.
///
/// `deferred_data` stores the commands waiting for their time and `deferred_heap` orders
/// their keys by due time; `sock_events` is the reusable poll result buffer; the two
/// `accum_*` counters total the socket events and commands processed so far.
pub struct ReactRuntime<UserCommand> {
mgr: ReactorMgr<UserCommand>,
deferred_data: FlatStorage<CmdData<UserCommand>>,
deferred_heap: Vec<DeferredKey>, sock_events: Events, accum_sock_events: usize, accum_commands: usize, }
/// Heap entry for one deferred command.
#[derive(Copy, Clone)]
struct DeferredKey {
/// Due time in milliseconds since the Unix epoch; this is the heap ordering key.
millis: i64,
/// Key of the command in the runtime's deferred command storage.
data: usize,
}
impl DeferredKey {
    /// Returns the value the deferred heap is ordered by (the due time in milliseconds).
    fn get_key(&self) -> i64 {
        let Self { millis, .. } = self;
        *millis
    }
}
/// Restores the min-heap property after a new element was appended at the end of `v`.
///
/// `v[..len - 1]` must already be a min-heap ordered by `get_key`. The last element is
/// sifted up until its parent is not larger. An empty slice is left untouched.
fn min_heap_push(v: &mut [DeferredKey]) {
    if v.is_empty() {
        return;
    }
    let mut k = v.len() - 1;
    while k > 0 {
        // The parent index is only computed while `k > 0`, so `k - 1` cannot underflow
        // when the new element travels all the way up to the root.
        let parent = (k - 1) / 2;
        if v[k].get_key() >= v[parent].get_key() {
            break;
        }
        v.swap(k, parent);
        k = parent;
    }
}
/// Moves the minimum of the heap `v` to the last slot and re-heapifies the rest.
///
/// On return `v[len - 1]` holds the former root and `v[..len - 1]` is a valid min-heap
/// containing every other element, so the caller can remove the minimum with `Vec::pop`.
/// An empty slice is left untouched.
fn min_heap_pop(v: &mut [DeferredKey]) {
    if v.is_empty() {
        return;
    }
    let last = v.len() - 1;
    // Park the minimum at the end; the former last element becomes the root and is
    // sifted down within `v[..last]`, so no element is lost.
    v.swap(0, last);
    let mut k = 0;
    loop {
        let left = 2 * k + 1;
        if left >= last {
            break;
        }
        let right = left + 1;
        let smallest = if right < last && v[right].get_key() < v[left].get_key() {
            right
        } else {
            left
        };
        if v[k].get_key() <= v[smallest].get_key() {
            break;
        }
        v.swap(k, smallest);
        k = smallest;
    }
}
/// Owns the registered sockets, the poller and both ends of the command channel.
struct ReactorMgr<UserCommand> {
/// Listener and stream handlers, addressed by the slot stored in `ReactorID`.
socket_handlers: FlatStorage<TcpSocketHandler<UserCommand>>,
/// Poller every socket is registered with.
poller: Poller,
// `count_streams` counts the stream entries only; `cmd_recv` is the receiving end of the command channel.
count_streams: usize, cmd_recv: std::sync::mpsc::Receiver<CmdData<UserCommand>>,
/// Sending end of the command channel, handed out to reactors and users.
cmd_sender: CmdSender<UserCommand>,
}
/// One registered socket together with its handler.
///
/// `ListenerType` holds the listener's id, socket and handler; `StreamType` holds the
/// per-stream data and the reactor handling it.
enum TcpSocketHandler<UserCommand> {
ListenerType(
ReactorID,
std::net::TcpListener,
Box<dyn TcpListenerHandler<UserCommand = UserCommand>>,
), StreamType(SockData, Box<dyn Reactor<UserCommand = UserCommand>>),
}
/// Per-stream state kept by the runtime.
struct SockData {
/// Id under which the stream is registered.
pub reactorid: ReactorID,
/// The stream socket.
pub sock: std::net::TcpStream,
/// Outgoing queue for bytes that could not be written immediately.
pub sender: MsgSender,
/// Receive buffer.
pub reader: MsgReader,
/// Whether the poller registration currently includes writable events.
interested_writable: bool,
}
impl<'a, UserCommand> ReactorReableContext<'a, UserCommand> {
fn from(data: &'a mut SockData, cmd_sender: &'a CmdSender<UserCommand>) -> Self {
Self {
reactorid: data.reactorid,
sock: &mut data.sock,
sender: &mut data.sender,
reader: &mut data.reader,
cmd_sender,
}
}
}
/// Unsigned integer with half the bit width of `usize`; two of them pack into one `usize`.
#[cfg(target_pointer_width = "64")]
type HalfUsize = u32;
/// Same alias for 32-bit targets. It must use the identical name `HalfUsize`, otherwise
/// every use of the alias fails to resolve when building for a 32-bit target.
#[cfg(target_pointer_width = "32")]
type HalfUsize = u16;
/// Identifier of a registered listener or stream.
///
/// `sockslot` is the index of the handler in the socket storage; `ver` is a tag assigned
/// when the reactor is added and compared to detect an id that no longer matches its slot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReactorID {
sockslot: HalfUsize, ver: HalfUsize, }
/// Sentinel id meaning "no reactor"; both halves are set to their maximum value.
pub const INVALID_REACTOR_ID: ReactorID = ReactorID {
sockslot: HalfUsize::MAX,
ver: HalfUsize::MAX,
};
impl ReactorID {
    /// Packs the id into one `usize`: `ver` in the upper half, `sockslot` in the lower half.
    pub fn to_usize(&self) -> usize {
        let shift = usize::BITS / 2;
        let upper = (self.ver as usize) << shift;
        let lower = self.sockslot as usize;
        upper | lower
    }

    /// Inverse of `to_usize`: splits `val` into its lower (`sockslot`) and upper (`ver`) half.
    pub fn from_usize(val: usize) -> Self {
        let shift = usize::BITS / 2;
        // The cast keeps only the low half of `val`, which is exactly the slot.
        let sockslot = val as HalfUsize;
        let ver = (val >> shift) as HalfUsize;
        Self { sockslot, ver }
    }
}
impl std::fmt::Display for ReactorID {
    /// Formats the id as `<sockslot>:<ver>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { sockslot, ver } = self;
        write!(f, "{sockslot}:{ver}")
    }
}
impl<UserCommand> ReactorMgr<UserCommand> {
/// Creates an empty manager with a fresh poller and command channel.
///
/// Panics if the poller cannot be created.
fn new() -> Self {
    let (sender, cmd_recv) = std::sync::mpsc::channel::<CmdData<UserCommand>>();
    let socket_handlers = FlatStorage::new();
    let poller = Poller::new().unwrap();
    Self {
        socket_handlers,
        poller,
        count_streams: 0,
        cmd_sender: CmdSender(sender),
        cmd_recv,
    }
}
/// Number of registered handlers, listeners and streams together.
fn len(&self) -> usize {
self.socket_handlers.len()
}
/// Stores `sock` with its `handler`, registers it for readable events and returns its id.
///
/// Panics if the poller registration fails or the entry just added cannot be found.
fn add_stream(
    &mut self,
    recv_buffer_min_size: usize,
    sock: std::net::TcpStream,
    handler: Box<dyn Reactor<UserCommand = UserCommand>>,
) -> ReactorID {
    let new_entry = TcpSocketHandler::StreamType(
        SockData {
            reactorid: INVALID_REACTOR_ID,
            sock,
            sender: MsgSender::new(),
            reader: MsgReader::new(recv_buffer_min_size),
            interested_writable: false,
        },
        handler,
    );
    let slot = self.socket_handlers.add(new_entry);
    // The version half is taken from the storage size right after insertion.
    let reactorid = ReactorID {
        sockslot: slot as HalfUsize,
        ver: self.socket_handlers.len() as HalfUsize,
    };
    self.count_streams += 1;
    match self.socket_handlers.get_mut(slot).unwrap() {
        TcpSocketHandler::StreamType(sockdata, _) => {
            sockdata.reactorid = reactorid;
            // NOTE(review): presumably sound because the socket is deregistered in
            // `close_reactor` before it is dropped — confirm against the poller's contract.
            unsafe {
                self.poller
                    .add_with_mode(
                        &sockdata.sock,
                        Event::readable(reactorid.to_usize()),
                        PollMode::Level,
                    )
                    .unwrap();
            }
            logtrace!(
                "Added TcpStream reactorid: {}, sock: {:?}",
                sockdata.reactorid,
                sockdata.sock
            );
            sockdata.reactorid
        }
        TcpSocketHandler::ListenerType(..) => {
            panic!("ERROR! Failed to get new added sockdata!")
        }
    }
}
/// Stores the listening `sock` with its `handler`, registers it for readable events and
/// returns its id.
///
/// Panics if the poller registration fails.
fn add_listener(
    &mut self,
    sock: std::net::TcpListener,
    handler: Box<dyn TcpListenerHandler<UserCommand = UserCommand>>,
) -> ReactorID {
    let new_entry = TcpSocketHandler::ListenerType(INVALID_REACTOR_ID, sock, handler);
    let slot = self.socket_handlers.add(new_entry);
    // The version half is taken from the storage size right after insertion.
    let reactorid = ReactorID {
        sockslot: slot as HalfUsize,
        ver: self.socket_handlers.len() as HalfUsize,
    };
    match self.socket_handlers.get_mut(slot).unwrap() {
        TcpSocketHandler::ListenerType(stored_id, listener, _) => {
            *stored_id = reactorid;
            // NOTE(review): presumably sound because the socket is deregistered in
            // `close_reactor` before it is dropped — confirm against the poller's contract.
            unsafe {
                self.poller
                    .add_with_mode(
                        &*listener,
                        Event::readable(reactorid.to_usize()),
                        PollMode::Level,
                    )
                    .unwrap();
            }
            logtrace!(
                "Added TcpListener reactorid: {}, sock: {:?}",
                reactorid,
                listener
            );
        }
        TcpSocketHandler::StreamType(..) => {}
    }
    reactorid
}
/// Removes the reactor identified by `reactorid`, deregisters its socket from the poller
/// and notifies its handler.
///
/// Returns `true` when a reactor was removed. Returns `false` when the slot is empty or
/// is occupied by a reactor with a different id. The id is verified before anything is
/// removed, so a stale id can never close the reactor that currently owns the slot.
///
/// Panics if the poller refuses to deregister the socket.
fn close_reactor(&mut self, reactorid: ReactorID) -> bool {
    let slot = reactorid.sockslot as usize;
    let id_matches = match self.socket_handlers.get_mut(slot) {
        Some(TcpSocketHandler::StreamType(sockdata, _)) => sockdata.reactorid == reactorid,
        Some(TcpSocketHandler::ListenerType(areactorid, _, _)) => *areactorid == reactorid,
        None => false,
    };
    if !id_matches {
        return false;
    }
    match self.socket_handlers.remove(slot) {
        Some(TcpSocketHandler::StreamType(sockdata, mut reactor)) => {
            logtrace!(
                "removing reactorid: {}, sock: {:?}, pending_read_bytes: {}, pending_send_bytes: {}",
                reactorid,
                sockdata.sock,
                sockdata.reader.bytes_in_buffer(),
                sockdata.sender.buf.len()
            );
            self.count_streams -= 1;
            self.poller.delete(&sockdata.sock).unwrap();
            reactor.on_close(reactorid, &self.cmd_sender);
            true
        }
        Some(TcpSocketHandler::ListenerType(_, sock, mut reactor)) => {
            logtrace!("removing reactorid: {}, sock: {:?}", reactorid, sock);
            self.poller.delete(&sock).unwrap();
            reactor.on_close_listen(reactorid, &self.cmd_sender);
            true
        }
        None => false,
    }
}
/// Binds a non-blocking listener on `local_addr`, registers it and notifies `handler`.
///
/// Returns the listener's id, or the I/O error from binding or configuring the socket.
fn start_listen(
    &mut self,
    local_addr: &str,
    handler: Box<dyn TcpListenerHandler<UserCommand = UserCommand>>,
) -> std::io::Result<ReactorID> {
    let socket = std::net::TcpListener::bind(local_addr)?;
    socket.set_nonblocking(true)?;
    let reactorid = self.add_listener(socket, handler);
    let slot = reactorid.sockslot as usize;
    match self.socket_handlers.get_mut(slot).unwrap() {
        TcpSocketHandler::ListenerType(_, _, handler) => {
            handler.on_start_listen(reactorid, &self.cmd_sender);
            Ok(reactorid)
        }
        TcpSocketHandler::StreamType(..) => Ok(INVALID_REACTOR_ID),
    }
}
/// Connects to `remote_addr`, registers the stream and calls the reactor's `on_connected`.
///
/// The connect itself is performed before the socket is switched to non-blocking mode.
/// Returns the new id, or `Ok(INVALID_REACTOR_ID)` when `on_connected` rejected the
/// connection and the stream was closed again. I/O errors from connecting are returned.
fn start_connect(
    &mut self,
    remote_addr: &str,
    recv_buffer_min_size: usize,
    handler: Box<dyn Reactor<UserCommand = UserCommand>>,
) -> std::io::Result<ReactorID> {
    let socket = TcpStream::connect(remote_addr)?;
    socket.set_nonblocking(true)?;
    let reactorid = self.add_stream(recv_buffer_min_size, socket, handler);
    let slot = reactorid.sockslot as usize;
    let accepted = match self.socket_handlers.get_mut(slot).unwrap() {
        TcpSocketHandler::StreamType(sockdata, handler) => {
            let mut ctx = DispatchContext::from(sockdata, &self.cmd_sender);
            // A connecting stream has no listener, hence the invalid id.
            handler.on_connected(&mut ctx, INVALID_REACTOR_ID).is_ok()
        }
        TcpSocketHandler::ListenerType(..) => false,
    };
    if accepted {
        return Ok(reactorid);
    }
    self.close_reactor(reactorid);
    Ok(INVALID_REACTOR_ID)
}
}
/// `Default` creates the same empty runtime as `ReactRuntime::new`.
impl<UserCommand> Default for ReactRuntime<UserCommand> {
fn default() -> Self {
Self::new()
}
}
impl<UserCommand> ReactRuntime<UserCommand> {
/// Creates a runtime with no reactors, no deferred commands and zeroed counters.
pub fn new() -> Self {
Self {
mgr: ReactorMgr::new(),
deferred_data: FlatStorage::new(),
deferred_heap: Vec::new(),
sock_events: Events::new(),
accum_sock_events: 0,
accum_commands: 0,
}
}
/// Runs one processing round with a 1 ms poll timeout and at most 32 commands.
///
/// See `process_events_with` for the meaning of the return value.
pub fn process_events(&mut self) -> bool {
self.process_events_with(1, 32)
}
/// Runs one round: socket events first, then due deferred commands, then queued commands.
///
/// * `sock_timeout_millis` - how long the poller may wait for socket events.
/// * `max_commands` - upper bound of commands taken from the command queue in this round.
///
/// Returns `true` when this round handled any socket event or command, or when deferred
/// commands or registered reactors remain.
pub fn process_events_with(&mut self, sock_timeout_millis: u64, max_commands: usize) -> bool {
let sock_events = self.process_sock_events(sock_timeout_millis);
self.accum_sock_events += sock_events;
self.process_deferred_queue();
let cmds = self.process_command_queue(max_commands);
self.accum_commands += cmds;
sock_events > 0 || cmds > 0 || !self.deferred_heap.is_empty() || self.mgr.len() > 0
}
/// Number of registered reactors, listeners and streams together.
pub fn count_reactors(&self) -> usize {
self.mgr.len()
}
/// Number of commands currently stored for deferred execution.
pub fn count_deferred_queue(&self) -> usize {
self.deferred_data.len()
}
/// Number of registered stream reactors (listeners are not counted).
pub fn count_streams(&self) -> usize {
self.mgr.count_streams
}
/// Total number of socket events accumulated by `process_events_with` so far.
pub fn count_sock_events(&self) -> usize {
self.accum_sock_events
}
/// Total number of commands accumulated by `process_events_with` so far.
pub fn count_received_commands(&self) -> usize {
self.accum_commands
}
/// Returns the sender used to submit commands to this runtime; clone it to keep a handle.
pub fn get_cmd_sender(&self) -> &CmdSender<UserCommand> {
&self.mgr.cmd_sender
}
pub fn process_sock_events(&mut self, timeout_millis: u64) -> usize {
self.sock_events.clear();
self.mgr
.poller
.wait(
&mut self.sock_events,
Some(Duration::from_millis(timeout_millis)),
)
.unwrap();
for ev in self.sock_events.iter() {
let mut removesock = false;
let current_reactorid = ReactorID::from_usize(ev.key);
let mut new_connection_to_add = None;
if let Some(sockhandler) = self
.mgr
.socket_handlers
.get_mut(current_reactorid.sockslot as usize)
{
match sockhandler {
TcpSocketHandler::ListenerType(reactorid, sock, handler) => {
debug_assert_eq!(current_reactorid, *reactorid);
if ev.readable {
let (mut newsock, _) = sock.accept().unwrap();
if let Some(new_stream_connection) =
handler.on_new_connection(sock, &mut newsock)
{
newsock.set_nonblocking(true).unwrap();
new_connection_to_add = Some((
newsock,
new_stream_connection.reactor,
new_stream_connection.recv_buffer_min_size,
));
}
}
if ev.writable {
logerr!("writable listener sock!");
removesock = true;
}
}
TcpSocketHandler::StreamType(ref mut ctx, ref mut handler) => {
debug_assert_eq!(current_reactorid, ctx.reactorid);
if ev.writable {
if !ctx.interested_writable {
dbglog!("WARN: unsolicited writable sock: {:?}", ctx.sock);
}
ctx.interested_writable = true; if !ctx.sender.buf.is_empty() {
if let Err(err) = ctx.sender.send_queued(&mut ctx.sock) {
logtrace!("{err} send_queued failed.");
removesock = true;
}
}
}
if ev.readable {
if let Err(err) = handler.on_readable(&mut ReactorReableContext::from(
ctx,
&self.mgr.cmd_sender,
)) {
if !err.is_empty() {
logtrace!("on_readable requested close current_reactorid: {current_reactorid}, sock: {:?}. Reason: {}", ctx.sock, err);
}
removesock = true;
}
}
if ctx.sender.close_or_error {
removesock = true;
}
if !removesock {
if !ctx.interested_writable && !ctx.sender.buf.is_empty() {
self.mgr
.poller
.modify_with_mode(
&ctx.sock,
Event::all(ev.key),
PollMode::Level,
)
.unwrap();
ctx.interested_writable = true;
} else if ctx.interested_writable && ctx.sender.buf.is_empty() {
self.mgr
.poller
.modify_with_mode(
&ctx.sock,
Event::readable(ev.key),
PollMode::Level,
)
.unwrap();
ctx.interested_writable = false;
}
}
}
}
} else {
dbglog!("[ERROR] socket key has been removed {}!", current_reactorid);
continue;
}
if let Some((newsock, newhandler, recv_buffer_min_size)) = new_connection_to_add {
let newreactorid_to_close = {
let newreactorid =
self.mgr
.add_stream(recv_buffer_min_size, newsock, newhandler);
if let TcpSocketHandler::StreamType(ref mut newsockdata, ref mut newhandler) =
self.mgr
.socket_handlers
.get_mut(newreactorid.sockslot as usize)
.unwrap()
{
match newhandler.on_connected(
&mut DispatchContext::from(newsockdata, &self.mgr.cmd_sender),
current_reactorid,
) {
Ok(_) => INVALID_REACTOR_ID, Err(err) => {
if !err.is_empty() {
logtrace!("Reject new connection for listener_reactorid: {}. Reason: {}", current_reactorid, err);
}
newsockdata.reactorid }
}
} else {
panic!("Failed to find new added stream!");
}
};
if newreactorid_to_close != INVALID_REACTOR_ID {
self.mgr.close_reactor(newreactorid_to_close);
}
continue;
}
if removesock {
self.mgr.close_reactor(current_reactorid);
continue; }
if ev.is_err().unwrap_or(false) {
logerr!("WARN: socket error key: {}", current_reactorid);
removesock = true;
}
if ev.is_interrupt() {
logerr!("WARN: socket interrupt key: {}", current_reactorid);
removesock = true;
}
if removesock {
self.mgr.close_reactor(current_reactorid);
}
}
self.sock_events.len()
}
/// Takes up to `max_commands` commands from the command queue.
///
/// Commands that are immediate or already due are executed right away; commands with a
/// future deadline are stored in the deferred queue. A deadline before the Unix epoch is
/// treated as already due instead of panicking.
///
/// Returns the number of commands taken from the queue (executed or deferred).
pub fn process_command_queue(&mut self, max_commands: usize) -> usize {
    let mut count_cmd = 0usize;
    for _ in 0..max_commands {
        let cmddata: CmdData<UserCommand> = match self.mgr.cmd_recv.try_recv() {
            Ok(data) => data,
            Err(std::sync::mpsc::TryRecvError::Empty) => return count_cmd,
            Err(std::sync::mpsc::TryRecvError::Disconnected) => {
                panic!("std::sync::mpsc::TryRecvError::Disconnected is not possible. Because both cmd_sender & cmd_recv are saved.");
            }
        };
        count_cmd += 1;
        if let Deferred::UtilTime(time) = cmddata.deferred {
            // `duration_since` fails for times before the epoch; those lie in the past,
            // so 0 makes them due immediately.
            let millis = time
                .duration_since(std::time::SystemTime::UNIX_EPOCH)
                .map_or(0, |since_epoch| since_epoch.as_millis() as i64);
            if !ReactRuntime::<UserCommand>::is_deferred_current(millis) {
                let key = self.deferred_data.add(cmddata);
                self.deferred_heap.push(DeferredKey { millis, data: key });
                min_heap_push(&mut self.deferred_heap);
                continue;
            }
        }
        self.execute_immediate_cmd(cmddata);
    }
    count_cmd
}
pub fn process_deferred_queue(&mut self) -> usize {
let mut cmds = 0;
while !self.deferred_heap.is_empty()
&& ReactRuntime::<UserCommand>::is_deferred_current(self.deferred_heap[0].millis)
{
let key = self.deferred_heap[0].data;
min_heap_pop(&mut self.deferred_heap);
self.deferred_heap.pop();
cmds += 1;
if let Some(cmddata) = self.deferred_data.remove(key) {
self.execute_immediate_cmd(cmddata);
} else {
panic!("No deferred CommandData with key: {}", key);
}
}
cmds
}
/// Tells whether a deadline given in milliseconds since the Unix epoch has been reached.
///
/// The deadline counts as reached once the current time is at least half a millisecond
/// past it. The conversion to nanoseconds saturates, so far-future deadlines are simply
/// "not yet due" instead of overflowing.
fn is_deferred_current(millis: i64) -> bool {
    const NANOS_PER_MILLI: i64 = 1_000_000;
    const HALF_MILLI_NANOS: i64 = 500_000;
    let due_nanos = millis
        .saturating_mul(NANOS_PER_MILLI)
        .saturating_add(HALF_MILLI_NANOS);
    due_nanos <= utils::now_nanos()
}
/// Executes one command now and reports the outcome through its completion callback.
///
/// Connect and listen commands report the new reactor id or a formatted error; close
/// reports whether the reactor existed; user commands are forwarded to the target stream's
/// `on_command`. A reactor whose `on_command` returns `Err` is closed afterwards.
///
/// Panics if a user command carries `INVALID_REACTOR_ID`.
fn execute_immediate_cmd(&mut self, cmddata: CmdData<UserCommand>) {
// Closing is postponed until the borrow of the handler storage has ended.
let mut reactorid_to_close: ReactorID = INVALID_REACTOR_ID;
match cmddata.cmd {
SysCommand::NewConnect(reactor, remote_addr, recv_buffer_min_size) => {
match self
.mgr
.start_connect(&remote_addr, recv_buffer_min_size, reactor)
{
Err(err) => {
let errmsg =
format!("Failed to connect to {}. Error: {}", remote_addr, err);
(cmddata.completion)(Err(errmsg));
}
Ok(key) => {
(cmddata.completion)(Ok(key));
}
}
}
SysCommand::NewListen(reactor, local_addr) => {
match self.mgr.start_listen(&local_addr, reactor) {
Err(err) => {
let errmsg = format!("Failed to listen on {}. Error: {}", local_addr, err);
(cmddata.completion)(Err(errmsg));
}
Ok(key) => {
(cmddata.completion)(Ok(key));
}
}
}
SysCommand::CloseSocket => {
if self.mgr.close_reactor(cmddata.reactorid) {
(cmddata.completion)(Ok(cmddata.reactorid));
} else {
(cmddata.completion)(Err(format!(
"Failed to remove non existing socket with reactorid: {}",
cmddata.reactorid
)));
}
}
SysCommand::UserCmd(usercmd) => {
if cmddata.reactorid == INVALID_REACTOR_ID {
panic!("UserCommand must be executed on a reactor!");
} else if let Some(handler) = self
.mgr
.socket_handlers
.get_mut(cmddata.reactorid.sockslot as usize)
{
match handler {
TcpSocketHandler::ListenerType(reactorid, _, _) => {
(cmddata.completion)(Err(format!(
"Listener cannot receive user command. cmd reactorid: {}, reactorid: {}",
cmddata.reactorid, *reactorid
)));
}
TcpSocketHandler::StreamType(ctx, reactor) => {
// The slot may hold a different reactor than the one the command was addressed to.
if cmddata.reactorid != ctx.reactorid {
(cmddata.completion)(Err(format!(
"Failed to execute user command with wrong cmd reactorid: {}, found: {}",
cmddata.reactorid , ctx.reactorid
)));
} else {
let res = (reactor).on_command(
usercmd,
&mut DispatchContext {
reactorid: cmddata.reactorid,
sock: &mut ctx.sock,
sender: &mut ctx.sender,
cmd_sender: &self.mgr.cmd_sender,
},
);
// The completion reports that the command was delivered, even when `on_command` failed.
(cmddata.completion)(Ok(cmddata.reactorid));
if let Err(err) = res {
logtrace!(
"on_command requested closing reactorid: {}. {}",
cmddata.reactorid,
err
);
reactorid_to_close = cmddata.reactorid;
}
}
}
}
} else {
(cmddata.completion)(Err(format!(
"Failed to execute user command on non existing socket with reactorid: {}",
cmddata.reactorid
)));
}
}
}
if reactorid_to_close != INVALID_REACTOR_ID {
self.mgr.close_reactor(reactorid_to_close);
}
}
}
/// Commands understood by the runtime.
///
/// `NewConnect` carries the reactor, the remote address and the minimum receive buffer
/// size; `NewListen` carries the listener handler and the local address; `CloseSocket`
/// closes the targeted reactor; `UserCmd` wraps an application command for a stream reactor.
enum SysCommand<UserCommand> {
NewConnect(
Box<dyn Reactor<UserCommand = UserCommand>>,
String, usize, ),
NewListen(
Box<dyn TcpListenerHandler<UserCommand = UserCommand>>,
String, ), CloseSocket,
UserCmd(UserCommand),
}
/// A queued command together with its target, schedule and completion callback.
struct CmdData<UserCommand> {
/// Target reactor, or `INVALID_REACTOR_ID` for commands that create a reactor.
reactorid: ReactorID,
/// The command to execute.
cmd: SysCommand<UserCommand>,
/// When to execute the command.
deferred: Deferred,
/// Called with the outcome once the command has been executed.
completion: Box<dyn FnOnce(CommandCompletion)>,
}
// NOTE(review): `Send` is asserted although the boxed reactor and completion closure are
// not bound by `Send` — confirm that callers only queue data that may cross threads.
unsafe impl<UserCommand> Send for CmdData<UserCommand> {}
/// Outgoing queue of one stream.
///
/// `buf` holds the bytes not yet written; `pending` stores the completion callbacks of
/// queued messages as a linked list running from `first_pending_id` to `last_pending_id`
/// (`usize::MAX` means none); `bytes_sent` counts the bytes written since `buf` was last
/// empty; `close_or_error` is set when a send failed and the socket should be closed.
pub struct MsgSender {
pub buf: Vec<u8>,
pub pending: FlatStorage<PendingSend>, first_pending_id: usize, last_pending_id: usize, pub bytes_sent: usize, close_or_error: bool,
}
/// Completion record of one queued message.
///
/// `next_id` links to the next record (`usize::MAX` for none); `startpos` and `msgsize`
/// locate the message's unsent bytes relative to the sender's `bytes_sent` counter;
/// `completion` is called once the whole message has been written.
pub struct PendingSend {
next_id: usize, startpos: usize, msgsize: usize,
completion: Box<dyn FnOnce()>, }
/// Outcome of a send attempt.
#[derive(PartialEq, Eq)]
pub enum SendOrQueResult {
/// Everything was written to the socket.
Complete,
/// Some or all bytes were queued for a later write.
InQueue,
}
/// `Default` creates the same empty sender as `MsgSender::new`.
impl Default for MsgSender {
fn default() -> Self {
Self::new()
}
}
impl MsgSender {
/// Creates an empty sender: nothing queued, no pending completions, no error recorded.
pub fn new() -> Self {
Self {
buf: Vec::new(),
pending: FlatStorage::new(),
// `usize::MAX` marks "no pending completion".
first_pending_id: usize::MAX, last_pending_id: usize::MAX,
bytes_sent: 0,
close_or_error: false,
}
}
pub fn try_send_all(sock: &mut std::net::TcpStream, buf: &[u8]) -> std::io::Result<usize> {
if buf.is_empty() {
return Ok(0);
}
let mut buf = buf;
let mut sentbytes = 0;
loop {
match sock.write(buf) {
std::io::Result::Ok(bytes) => {
if bytes < buf.len() {
buf = &buf[bytes..];
sentbytes += bytes; } else {
return Ok(sentbytes + bytes); }
}
std::io::Result::Err(err) => {
let errkind = err.kind();
if errkind == ErrorKind::WouldBlock {
return Ok(sentbytes); } else if errkind == ErrorKind::ConnectionReset {
logtrace!("sock reset : {sock:?}. close socket");
return Err(err);
} else if errkind == ErrorKind::Interrupted {
logtrace!("[WARN] sock Interrupted : {sock:?}. retry");
return Err(err); } else {
logtrace!("[ERROR]: write on sock {sock:?}, error: {err:?}");
return Err(err);
}
}
}
}
}
/// Sends `buf` on `sock`, queueing whatever cannot be written right now.
///
/// When bytes are already queued, `buf` is appended behind them without touching the
/// socket. `send_completion`, if given, runs once the whole message has been written:
/// immediately when nothing had to be queued, otherwise later from the queue.
///
/// Returns `Complete` or `InQueue`; a failing write is returned as its error text.
pub fn send_or_que(
    &mut self,
    sock: &mut std::net::TcpStream,
    buf: &[u8],
    send_completion: Option<Box<dyn FnOnce()>>,
) -> Result<SendOrQueResult> {
    if buf.is_empty() {
        if let Some(callback) = send_completion {
            callback();
        }
        return Ok(SendOrQueResult::Complete);
    }
    if !self.buf.is_empty() {
        // Keep the byte order: new data goes behind what is already waiting.
        self.buf.extend_from_slice(buf);
        self.queue_msg_completion(buf.len(), send_completion);
        return Ok(SendOrQueResult::InQueue);
    }
    debug_assert_eq!(self.bytes_sent, 0);
    debug_assert_eq!(self.first_pending_id, usize::MAX);
    debug_assert_eq!(self.last_pending_id, usize::MAX);
    debug_assert_eq!(self.pending.len(), 0);
    let sentbytes = MsgSender::try_send_all(sock, buf).map_err(|err| err.to_string())?;
    let unsent = &buf[sentbytes..];
    if unsent.is_empty() {
        if let Some(callback) = send_completion {
            callback();
        }
        return Ok(SendOrQueResult::Complete);
    }
    self.buf.extend_from_slice(unsent);
    self.queue_msg_completion(unsent.len(), send_completion);
    Ok(SendOrQueResult::InQueue)
}
/// Registers `send_completion` for the last `queued_size` bytes of the send buffer.
///
/// Does nothing when no callback is given. The new record is appended to the linked list
/// of pending completions.
fn queue_msg_completion(
    &mut self,
    queued_size: usize,
    send_completion: Option<Box<dyn FnOnce()>>,
) {
    let callback = match send_completion {
        Some(callback) => callback,
        None => return,
    };
    let prev_id = self.last_pending_id;
    // The message occupies the tail of `buf`; express its start relative to `bytes_sent`.
    let startpos = self.bytes_sent + self.buf.len() - queued_size;
    let new_id = self.pending.add(PendingSend {
        next_id: usize::MAX,
        startpos,
        msgsize: queued_size,
        completion: callback,
    });
    self.last_pending_id = new_id;
    if let Some(prev) = self.pending.get_mut(prev_id) {
        prev.next_id = new_id;
    }
    if self.first_pending_id == usize::MAX {
        self.first_pending_id = new_id;
    }
}
/// Writes queued bytes to `sock` and fires the completions of fully written messages.
///
/// Completions are judged against the byte count *including* the bytes written by this
/// call, so a message's callback runs in the same call that writes its last byte.
///
/// Returns `Complete` when the queue is empty afterwards and `InQueue` otherwise
/// (including when the socket would block or the write was interrupted).
///
/// # Errors
/// On a zero-byte write or any other write error the sender is flagged for closing and
/// the error text is returned.
fn send_queued(&mut self, sock: &mut std::net::TcpStream) -> Result<SendOrQueResult> {
    if self.buf.is_empty() {
        return Ok(SendOrQueResult::Complete);
    }
    let sentbytes = match sock.write(&self.buf[..]) {
        Ok(0) => {
            self.close_or_error = true;
            return Err(format!("[ERROR] write sock 0 bytes {sock:?}. close socket"));
        }
        Ok(bytes) => bytes,
        Err(err) => match err.kind() {
            ErrorKind::WouldBlock => return Ok(SendOrQueResult::InQueue),
            ErrorKind::Interrupted => {
                logtrace!("[WARN] sock Interrupted : {sock:?}. retry");
                return Ok(SendOrQueResult::InQueue);
            }
            ErrorKind::ConnectionReset => {
                self.close_or_error = true;
                return Err(format!(
                    "[ERROR] Write sock ConnectionReset {sock:?}. close socket"
                ));
            }
            _ => {
                self.close_or_error = true;
                return Err(format!("[ERROR]: write on sock {sock:?}, error: {err:?}"));
            }
        },
    };
    // Offset reached in the queued byte stream once this write is accounted for.
    let total_sent = self.bytes_sent + sentbytes;
    while self.first_pending_id != usize::MAX {
        let id = self.first_pending_id;
        let pending = match self.pending.get_mut(id) {
            Some(pending) => pending,
            None => panic!("invalid id"),
        };
        let msg_end = pending.startpos + pending.msgsize;
        if msg_end > total_sent {
            // Not finished. Shrink the record only if the message has started going out;
            // a message that lies entirely ahead is left untouched.
            if pending.startpos < total_sent {
                pending.msgsize = msg_end - total_sent;
                pending.startpos = total_sent;
            }
            break;
        }
        self.first_pending_id = pending.next_id;
        if let Some(done) = self.pending.remove(id) {
            (done.completion)();
        }
    }
    if self.first_pending_id == usize::MAX {
        self.last_pending_id = usize::MAX;
    }
    Ok(self.move_buf_front_after_send(sentbytes))
}
/// Drops the first `sentbytes` bytes of the send buffer and updates the sent counter.
///
/// Returns `Complete` when the buffer is empty afterwards (the counter restarts at 0),
/// otherwise `InQueue` with the counter advanced by `sentbytes`.
fn move_buf_front_after_send(&mut self, sentbytes: usize) -> SendOrQueResult {
    self.buf.drain(..sentbytes);
    if !self.buf.is_empty() {
        self.bytes_sent += sentbytes;
        return SendOrQueResult::InQueue;
    }
    // An empty queue must not have any completion left waiting.
    debug_assert_eq!(self.first_pending_id, usize::MAX);
    debug_assert_eq!(self.last_pending_id, usize::MAX);
    debug_assert_eq!(self.pending.len(), 0);
    self.bytes_sent = 0;
    SendOrQueResult::Complete
}
}
/// Writer that appends directly to a sender's buffer and sends the appended bytes.
pub struct AutoSendBuffer<'sender> {
/// Sender whose buffer receives the written bytes.
sender: &'sender mut MsgSender,
/// Socket the bytes are sent on.
sock: &'sender mut std::net::TcpStream,
/// Length of the sender's buffer up to which bytes are already handled; bytes behind it are new.
old_buf_size: usize,
}
impl AutoSendBuffer<'_> {
    /// Discards the bytes written through this buffer that have not been sent yet.
    pub fn clear(&mut self) {
        let keep = self.old_buf_size;
        self.sender.buf.resize(keep, 0);
    }

    /// Number of bytes written through this buffer that have not been sent yet.
    pub fn count_written(&self) -> usize {
        self.sender.buf.len() - self.old_buf_size
    }

    /// The bytes written through this buffer that have not been sent yet.
    pub fn get_written(&self) -> &[u8] {
        &self.sender.buf[self.old_buf_size..]
    }

    /// Marks everything currently in the sender's buffer as handled.
    fn mark_handled(&mut self) {
        self.old_buf_size = self.sender.buf.len();
    }

    /// Sends the written bytes, leaving in the queue whatever the socket does not accept.
    ///
    /// When older bytes are still queued, the written bytes simply stay queued behind
    /// them. `send_completion`, if given, runs once the written bytes have been sent.
    ///
    /// # Errors
    /// A failing write flags the sender for closing and returns the I/O error.
    pub fn send(
        &mut self,
        send_completion: Option<Box<dyn FnOnce()>>,
    ) -> std::io::Result<SendOrQueResult> {
        let buf_len = self.get_written().len();
        if buf_len == 0 {
            if let Some(callback) = send_completion {
                callback();
            }
            self.mark_handled();
            return Ok(SendOrQueResult::Complete);
        }
        if self.old_buf_size > 0 {
            // Older bytes are waiting; the new ones are already in place behind them.
            self.sender.queue_msg_completion(buf_len, send_completion);
            self.mark_handled();
            return Ok(SendOrQueResult::InQueue);
        }
        let attempt = MsgSender::try_send_all(self.sock, &self.sender.buf[self.old_buf_size..]);
        let sentbytes = match attempt {
            Ok(bytes) => bytes,
            Err(err) => {
                self.sender.close_or_error = true;
                self.mark_handled();
                return Err(err);
            }
        };
        if sentbytes > 0 {
            self.sender.move_buf_front_after_send(sentbytes);
        }
        if sentbytes == buf_len {
            if let Some(callback) = send_completion {
                callback();
            }
            self.mark_handled();
            return Ok(SendOrQueResult::Complete);
        }
        let unsent = self.sender.buf.len();
        self.sender.queue_msg_completion(unsent, send_completion);
        self.mark_handled();
        Ok(SendOrQueResult::InQueue)
    }
}
impl Drop for AutoSendBuffer<'_> {
fn drop(&mut self) {
self.send(None).unwrap(); }
}
impl std::io::Write for AutoSendBuffer<'_> {
    /// Appends `buf` to the sender's buffer; nothing is sent until `flush`, `send` or drop.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let accepted = buf.len();
        self.sender.buf.extend_from_slice(buf);
        Ok(accepted)
    }

    /// Sends the appended bytes without a completion callback.
    fn flush(&mut self) -> std::io::Result<()> {
        self.send(None).map(|_| ())
    }
}
/// Receive buffer of one stream.
///
/// `recv_buffer[startpos..bufsize]` holds the received, not yet consumed bytes;
/// `min_reserve` is the free space ensured before each read; `decoded_msgsize` is the
/// message size last announced by the dispatcher (0 when unknown).
pub struct MsgReader {
recv_buffer: Vec<u8>,
min_reserve: usize, startpos: usize, bufsize: usize, decoded_msgsize: usize, }
impl MsgReader {
/// Creates an empty reader that keeps at least `min_reserved_bytes` free before each read.
///
/// A request for 0 bytes is raised to 1: with no free space every read would target an
/// empty slice and return `Ok(0)`, which the read loops interpret as the peer having
/// closed the connection.
pub fn new(min_reserved_bytes: usize) -> Self {
    let min_reserve = min_reserved_bytes.max(1);
    Self {
        recv_buffer: vec![0u8; min_reserve],
        min_reserve,
        startpos: 0,
        bufsize: 0,
        decoded_msgsize: 0,
    }
}
/// Number of received bytes that have not been consumed yet.
pub fn bytes_in_buffer(&self) -> usize {
self.bufsize - self.startpos
}
/// Discards all buffered bytes and forgets the announced message size; capacity is kept.
pub fn clear(&mut self) {
self.decoded_msgsize = 0;
self.startpos = 0;
self.bufsize = 0;
}
/// Reads from the socket and dispatches after every successful read.
///
/// Keeps reading while a read fills the buffer completely; returns `Ok` once a read
/// leaves free space, the socket would block, or the read was interrupted.
///
/// Returns `Err` when the peer closed the connection, on other read errors, or when the
/// dispatcher fails.
pub fn try_read_fast_dispatch<UserCommand>(
&mut self,
ctx: &mut DispatchContext<UserCommand>,
dispatcher: &mut impl FnMut(
&mut [u8],
usize,
usize,
&mut DispatchContext<UserCommand>,
) -> Result<MessageResult>,
) -> Result<()> {
loop {
debug_assert!(
self.decoded_msgsize == 0 || self.decoded_msgsize > self.bufsize - self.startpos
);
// Make sure at least `min_reserve` free bytes follow the buffered data.
if self.bufsize + self.min_reserve > self.recv_buffer.len() {
self.recv_buffer.resize(
std::cmp::max(self.bufsize + self.min_reserve, self.recv_buffer.len() * 2),
0,
); }
match ctx.sock.read(&mut self.recv_buffer[self.bufsize..]) {
std::io::Result::Ok(new_bytes) => {
if new_bytes == 0 {
return Err("Peer closed sock".to_owned());
}
debug_assert!(self.bufsize + new_bytes <= self.recv_buffer.len());
self.bufsize += new_bytes;
// A read that did not fill the buffer is taken as "nothing more to read for now".
// This must be decided before dispatching, which may compact the buffer.
let should_return = self.bufsize < self.recv_buffer.len();
self.try_dispatch_all(new_bytes, ctx, dispatcher)?;
if should_return {
return Ok(()); } else {
continue; }
}
std::io::Result::Err(err) => {
let errkind = err.kind();
if errkind == ErrorKind::WouldBlock {
return Ok(()); } else if errkind == ErrorKind::ConnectionReset {
return Err("Sock reset".to_owned());
} else if errkind == ErrorKind::Interrupted {
logtrace!("[WARN] sock Interrupted : {:?}. retry", ctx.sock);
return Ok(()); } else if errkind == ErrorKind::ConnectionAborted {
return Err("Sock ConnectionAborted".to_owned()); }
return Err(format!("[ERROR]: Read on sock error: {err:?}"));
}
}
}
}
/// Reads from the socket into the receive buffer without dispatching.
///
/// Keeps reading while a read fills the buffer completely; returns `Ok` once a read
/// leaves free space, the socket would block, or the read was interrupted.
///
/// Returns `Err` when the peer closed the connection or on other read errors.
pub fn try_read_all<UserCommand>(
    &mut self,
    ctx: &mut DispatchContext<UserCommand>,
) -> Result<()> {
    loop {
        // Make sure at least `min_reserve` free bytes follow the buffered data.
        let wanted_len = self.bufsize + self.min_reserve;
        if wanted_len > self.recv_buffer.len() {
            let grown_len = std::cmp::max(wanted_len, self.recv_buffer.len() * 2);
            self.recv_buffer.resize(grown_len, 0);
        }
        let new_bytes = match ctx.sock.read(&mut self.recv_buffer[self.bufsize..]) {
            Ok(count) => count,
            Err(err) => {
                return match err.kind() {
                    ErrorKind::WouldBlock => Ok(()),
                    ErrorKind::ConnectionReset => Err("Sock ConnectionReset".to_owned()),
                    ErrorKind::Interrupted => {
                        logtrace!("[WARN] sock Interrupted : {:?}. retry", ctx.sock);
                        Ok(())
                    }
                    ErrorKind::ConnectionAborted => Err("sock ConnectionAborted".to_owned()),
                    _ => Err(format!("[ERROR]: Read on sock error: {err:?}")),
                };
            }
        };
        if new_bytes == 0 {
            return Err("Peer closed sock".to_owned());
        }
        debug_assert!(self.bufsize + new_bytes <= self.recv_buffer.len());
        self.bufsize += new_bytes;
        // A read that did not fill the buffer is taken as "nothing more to read for now".
        if self.bufsize < self.recv_buffer.len() {
            return Ok(());
        }
    }
}
/// Offers the buffered bytes to `dispatcher` until it asks for more data or nothing is left.
///
/// `new_bytes` is the number of bytes added since the last dispatch; after a message is
/// dropped, all remaining bytes are reported as new. Remaining bytes are moved to the
/// front of the buffer before returning.
///
/// Returns the dispatcher's error after clearing the buffer. Panics if the dispatcher
/// drops zero bytes or more bytes than are buffered.
pub fn try_dispatch_all<UserCommand>(
&mut self,
new_bytes: usize,
ctx: &mut DispatchContext<UserCommand>,
dispatcher: &mut impl FnMut(
&mut [u8],
usize,
usize,
&mut DispatchContext<UserCommand>,
) -> Result<MessageResult>,
) -> Result<()> {
let mut new_bytes = new_bytes;
// Dispatch while bytes are buffered and either no size is announced or the announced message is complete.
while self.startpos < self.bufsize
&& (self.decoded_msgsize == 0 || self.startpos + self.decoded_msgsize <= self.bufsize)
{
match dispatcher(
&mut self.recv_buffer[self.startpos..self.bufsize],
new_bytes,
self.decoded_msgsize,
ctx,
) {
Err(err) => {
self.clear(); return Err(err);
}
Ok(res) => {
match res {
MessageResult::ExpectMsgSize(msgsize) => {
// Announcing a size that is already fully buffered would make the next dispatch repeat this one.
if !(msgsize == 0 || msgsize > self.bufsize - self.startpos) {
logerr!( "[WARN] on_inbound_message should NOT expect a msgsize while full message is already received, which may cause recursive call. msgsize:{msgsize:?} recved: {}",
self.bufsize - self.startpos);
debug_assert!(
false,
"on_inbound_message expects an already full message."
);
}
self.decoded_msgsize = msgsize; break; }
MessageResult::DropMsgSize(msgsize) => {
assert!(msgsize > 0 && msgsize <= self.bufsize - self.startpos); self.startpos += msgsize;
self.decoded_msgsize = 0;
new_bytes = self.bufsize - self.startpos;
}
}
}
}
}
// Compact: move the unconsumed bytes to the front of the buffer.
if self.startpos != 0 {
self.recv_buffer.copy_within(self.startpos..self.bufsize, 0); self.bufsize -= self.startpos;
self.startpos = 0;
}
Ok(())
}
/// Reads everything currently available, then dispatches the buffered bytes once.
///
/// Dispatching happens even when reading failed, so bytes received before the failure
/// are still delivered. A read error takes precedence over a dispatch error.
pub fn try_read_fast_read<UserCommand>(
    &mut self,
    ctx: &mut DispatchContext<UserCommand>,
    dispatcher: &mut impl FnMut(
        &mut [u8],
        usize,
        usize,
        &mut DispatchContext<UserCommand>,
    ) -> Result<MessageResult>,
) -> Result<()> {
    let bytes_before = self.bufsize - self.startpos;
    let read_outcome = self.try_read_all(ctx);
    let received = self.bufsize - self.startpos - bytes_before;
    let dispatch_outcome = self.try_dispatch_all(received, ctx, dispatcher);
    read_outcome.and(dispatch_outcome)
}
}
/// A [`Reactor`] that can be constructed from a connection counter and a cloneable parameter.
///
/// `DefaultTcpListenerHandler` uses this to create one reactor per accepted connection.
pub trait NewServerReactor: Reactor {
/// Parameter type passed to `new_server_reactor`; it must be `Clone`.
type InitServerParam: Clone;
/// Builds a new reactor.
///
/// When called by `DefaultTcpListenerHandler`, `count` is the listener's running connection
/// counter (already incremented, so the first connection sees 1) and `param` is a clone of
/// the listener's stored parameter.
fn new_server_reactor(count: usize, param: Self::InitServerParam) -> Self;
}
/// Listener handler that creates a `NewReactor` for every new connection.
pub struct DefaultTcpListenerHandler<NewReactor: NewServerReactor + 'static> {
/// Id of this listener; `INVALID_REACTOR_ID` until `on_start_listen` stores the real one.
pub reactorid: ReactorID,
/// Number of connections for which a reactor has been created so far.
count_children: usize,
/// Parameter cloned into every newly created reactor.
server_param: <NewReactor as NewServerReactor>::InitServerParam,
/// Value copied into `NewStreamConnection::recv_buffer_min_size` for each connection.
recv_buffer_min_size: usize,
/// Ties the handler to `NewReactor` without storing an instance.
_phantom: PhantomData<NewReactor>,
}
impl<NewReactor> DefaultTcpListenerHandler<NewReactor>
where
    NewReactor: NewServerReactor + 'static,
{
    /// Creates a handler with no children yet and an invalid reactor id.
    ///
    /// * `recv_buffer_min_size` - forwarded to every `NewStreamConnection` this handler creates.
    /// * `param` - cloned and passed to `NewReactor::new_server_reactor` for each connection.
    pub fn new(recv_buffer_min_size: usize, param: NewReactor::InitServerParam) -> Self {
        Self {
            _phantom: PhantomData,
            recv_buffer_min_size,
            server_param: param,
            count_children: 0,
            reactorid: INVALID_REACTOR_ID,
        }
    }
}
impl<NewReactor> TcpListenerHandler for DefaultTcpListenerHandler<NewReactor>
where
    NewReactor: NewServerReactor + 'static,
{
    type UserCommand = <NewReactor as Reactor>::UserCommand;

    /// Stores the reactor id assigned to this listener.
    fn on_start_listen(
        &mut self,
        reactorid: ReactorID,
        _cmd_sender: &CmdSender<Self::UserCommand>,
    ) {
        self.reactorid = reactorid;
    }

    /// Creates the reactor for a new connection; always returns `Some`.
    ///
    /// The child counter is incremented first, so the first reactor is built with `count == 1`.
    fn on_new_connection(
        &mut self,
        _conn: &mut std::net::TcpListener,
        _new_conn: &mut std::net::TcpStream,
    ) -> Option<NewStreamConnection<Self::UserCommand>> {
        self.count_children += 1;
        let child =
            NewReactor::new_server_reactor(self.count_children, self.server_param.clone());
        Some(NewStreamConnection {
            reactor: Box::new(child),
            recv_buffer_min_size: self.recv_buffer_min_size,
        })
    }
}
/// `ReactRuntime` instantiated with `()`.
pub type SimpleIoRuntime = ReactRuntime<()>;
/// `DispatchContext` whose user command type is `()`.
pub type SimpleIoReactorContext<'a> = DispatchContext<'a, ()>;
/// Trait object for any `Reactor` whose `UserCommand` is `()`.
pub type DynIoReactor = dyn Reactor<UserCommand = ()>;
/// Callback run by `SimpleIoReactor::on_connected`.
///
/// Receives the dispatch context, the listener's reactor id and the reactor's app data;
/// its result is returned from `on_connected` unchanged.
type OnConnectedHandler<AppData> = dyn FnMut(
&mut SimpleIoReactorContext<'_>,
ReactorID, &mut AppData,
) -> Result<()>;
/// Callback run by `SimpleIoReactor::on_close` with the reactor id, the command sender and
/// the reactor's app data.
type OnClosedHandler<AppData> = dyn FnMut(ReactorID, &CmdSender<()>, &mut AppData);
/// Callback run by `SimpleIoReactor::on_inbound_message` with the received bytes, the dispatch
/// context and the reactor's app data.
///
/// The returned `usize` is reported to the reader as `MessageResult::DropMsgSize`, i.e. the
/// number of bytes to drop from the receive buffer.
type OnSockMsgHandler<AppData> =
dyn FnMut(&mut [u8], &mut SimpleIoReactorContext<'_>, &mut AppData) -> Result<usize>;
/// Outcome produced by a `SockMsgDecoder`.
// NOTE(review): variant meanings below are inferred from the names — confirm against the
// code that consumes this enum.
enum DecodeResult<DecodedInfo> {
/// Presumably: the message size could not be determined yet.
UnknownMsgSize,
/// Presumably: the message size in bytes, together with decoder-specific information.
MsgSize(usize, DecodedInfo),
}
/// Decoder callback: takes a byte buffer and a `usize` (named `_new_bytes` in
/// `null_msg_decoder`) and returns a `DecodeResult` or an error.
type SockMsgDecoder<DecodedInfo> = dyn FnMut(&mut [u8], usize) -> Result<DecodeResult<DecodedInfo>>;
/// Decoder that reports the whole of `buf` as one message.
///
/// Always succeeds with `DecodeResult::MsgSize(buf.len(), ())`; `_new_bytes` is ignored.
fn null_msg_decoder(buf: &mut [u8], _new_bytes: usize) -> Result<DecodeResult<()>> {
    let whole_len = buf.len();
    Ok(DecodeResult::MsgSize(whole_len, ()))
}
/// A `Reactor` (with `UserCommand = ()`) assembled from closures plus user-owned `AppData`.
pub struct SimpleIoReactor<AppData> {
/// User state passed mutably to every handler.
app_data: AppData,
/// Optional callback for `on_connected`; when absent, `on_connected` returns `Ok(())`.
on_connected_handler: Option<Box<OnConnectedHandler<AppData>>>,
/// Optional callback for `on_close`; when absent, `on_close` does nothing.
on_closed_handler: Option<Box<OnClosedHandler<AppData>>>,
/// Mandatory callback for inbound bytes; returns how many bytes to drop from the buffer.
on_sock_msg_handler: Box<OnSockMsgHandler<AppData>>,
}
impl<AppData: 'static> SimpleIoReactor<AppData> {
pub fn new(
app_data: AppData,
on_connected_handler: Option<Box<OnConnectedHandler<AppData>>>,
on_closed_handler: Option<Box<OnClosedHandler<AppData>>>,
on_sock_msg_handler: impl FnMut(&mut [u8], &mut SimpleIoReactorContext<'_>, &mut AppData) -> Result<usize>
+ 'static,
) -> Self {
Self {
app_data,
on_connected_handler,
on_closed_handler,
on_sock_msg_handler: Box::new(on_sock_msg_handler),
}
}
pub fn new_boxed(
app_data: AppData,
on_connected_handler: Option<Box<OnConnectedHandler<AppData>>>,
on_closed_handler: Option<Box<OnClosedHandler<AppData>>>,
on_sock_msg_handler: impl FnMut(&mut [u8], &mut SimpleIoReactorContext<'_>, &mut AppData) -> Result<usize>
+ 'static,
) -> Box<dyn Reactor<UserCommand = ()>> {
Box::new(Self::new(
app_data,
on_connected_handler,
on_closed_handler,
on_sock_msg_handler,
))
}
}
impl<AppData> Reactor for SimpleIoReactor<AppData> {
    type UserCommand = ();

    /// Passes the buffered bytes to the message handler.
    ///
    /// The handler's returned byte count is reported as `MessageResult::DropMsgSize`;
    /// a handler error is returned unchanged. `_new_bytes` and `_decoded_msg_size` are unused.
    fn on_inbound_message(
        &mut self,
        buf: &mut [u8],
        _new_bytes: usize,
        _decoded_msg_size: usize,
        ctx: &mut DispatchContext<Self::UserCommand>,
    ) -> Result<MessageResult> {
        (self.on_sock_msg_handler)(buf, ctx, &mut self.app_data).map(MessageResult::DropMsgSize)
    }

    /// Runs the connected-handler when one is installed; otherwise returns `Ok(())`.
    fn on_connected(
        &mut self,
        ctx: &mut DispatchContext<Self::UserCommand>,
        listener: ReactorID,
    ) -> Result<()> {
        match self.on_connected_handler.as_mut() {
            Some(handler) => handler(ctx, listener, &mut self.app_data),
            None => Ok(()),
        }
    }

    /// Runs the closed-handler when one is installed; otherwise does nothing.
    fn on_close(&mut self, reactorid: ReactorID, cmd_sender: &CmdSender<Self::UserCommand>) {
        if let Some(handler) = self.on_closed_handler.as_mut() {
            handler(reactorid, cmd_sender, &mut self.app_data);
        }
    }
}
/// Listener handler that obtains the reactor for each new connection from a closure.
pub struct SimpleIoListener {
/// Number of times `on_new_connection` has been called.
count_children: usize,
/// Id of this listener; `INVALID_REACTOR_ID` until `on_start_listen` stores the real one.
reactorid: ReactorID,
/// Value copied into `NewStreamConnection::recv_buffer_min_size` for each connection.
recv_buffer_min_size: usize,
/// Called with the updated `count_children`; returning `None` makes `on_new_connection`
/// return `None`.
reactor_creator: Box<dyn FnMut(usize) -> Option<Box<DynIoReactor>>>, }
impl SimpleIoListener {
    /// Creates a listener handler that has seen no connections and has an invalid reactor id.
    ///
    /// * `recv_buffer_min_size` - forwarded to every `NewStreamConnection` this handler creates.
    /// * `reactor_creator` - invoked per new connection with the running connection count.
    pub fn new(
        recv_buffer_min_size: usize,
        reactor_creator: impl FnMut(usize) -> Option<Box<DynIoReactor>> + 'static,
    ) -> Self {
        Self {
            reactor_creator: Box::new(reactor_creator),
            recv_buffer_min_size,
            reactorid: INVALID_REACTOR_ID,
            count_children: 0,
        }
    }

    /// Creates a listener handler whose connections are all served by clones of `service`.
    ///
    /// The listener-level `recv_buffer_min_size` is 0 here.
    // NOTE(review): presumably 0 is fine because the service reads through its own
    // `MsgReader` — confirm against the runtime.
    pub fn new_with_io_service<AppData: 'static>(service: SimpleIoService<AppData>) -> Self {
        Self::new(
            0,
            move |_child_index: usize| -> Option<Box<DynIoReactor>> {
                Some(Box::new(service.clone()))
            },
        )
    }
}
impl TcpListenerHandler for SimpleIoListener {
    type UserCommand = ();

    /// Stores the reactor id assigned to this listener.
    fn on_start_listen(
        &mut self,
        reactorid: ReactorID,
        _cmd_sender: &CmdSender<Self::UserCommand>,
    ) {
        self.reactorid = reactorid;
    }

    /// Asks `reactor_creator` for the reactor that will serve the new connection.
    ///
    /// The child counter is incremented before the creator is called, whether or not it
    /// returns a reactor. Returns `None` when the creator returns `None`.
    fn on_new_connection(
        &mut self,
        _conn: &mut std::net::TcpListener,
        _new_conn: &mut std::net::TcpStream,
    ) -> Option<NewStreamConnection<Self::UserCommand>> {
        self.count_children += 1;
        let reactor = (self.reactor_creator)(self.count_children)?;
        Some(NewStreamConnection {
            reactor,
            recv_buffer_min_size: self.recv_buffer_min_size,
        })
    }
}
/// Cloneable handle to a shared `IoServiceInner`.
///
/// Clones share the same inner state through `Rc<RefCell<..>>`.
pub struct SimpleIoService<AppData> {
/// Shared, interior-mutable service state.
inner: std::rc::Rc<std::cell::RefCell<IoServiceInner<AppData>>>,
}
/// State behind a `SimpleIoService`: the user-facing reactor and the reader that feeds it.
pub struct IoServiceInner<AppData> {
/// Reactor holding the app data and the user handlers.
stream_reactor: SimpleIoReactor<AppData>,
/// Reader used by `on_readable` to read from the socket and dispatch messages.
msg_reader: MsgReader, }
impl<AppData> IoServiceInner<AppData> {
    /// Reads from the socket with this service's own `msg_reader` and forwards the buffered
    /// bytes to `stream_reactor.on_inbound_message`.
    ///
    /// The result of `MsgReader::try_read_fast_read` is returned unchanged.
    fn on_readable(&mut self, ctx: &mut ReactorReableContext<()>) -> Result<()> {
        // Split the borrow so the reader and the reactor can be used at the same time.
        let Self {
            stream_reactor,
            msg_reader,
        } = self;
        let mut dispatch_ctx = DispatchContext {
            reactorid: ctx.reactorid,
            sock: ctx.sock,
            sender: ctx.sender,
            cmd_sender: ctx.cmd_sender,
        };
        msg_reader.try_read_fast_read(
            &mut dispatch_ctx,
            &mut |buf, new_bytes, decoded_msg_size, inner_ctx| {
                stream_reactor.on_inbound_message(buf, new_bytes, decoded_msg_size, inner_ctx)
            },
        )
    }
}
impl<AppData> Clone for SimpleIoService<AppData> {
    /// Returns another handle to the same shared inner state; nothing is deep-copied.
    fn clone(&self) -> Self {
        let inner = std::rc::Rc::clone(&self.inner);
        Self { inner }
    }
}
impl<AppData> SimpleIoService<AppData>
where
    AppData: 'static,
{
    /// Builds a service from user state, handlers and its own message reader.
    ///
    /// * `recv_buf_min_size` - passed to `MsgReader::new` for the service's reader.
    /// * `app_data`, `on_connected_handler`, `on_closed_handler`, `on_sock_msg_handler` -
    ///   passed to `SimpleIoReactor::new`.
    pub fn new(
        recv_buf_min_size: usize,
        app_data: AppData,
        on_connected_handler: Option<Box<OnConnectedHandler<AppData>>>,
        on_closed_handler: Option<Box<OnClosedHandler<AppData>>>,
        on_sock_msg_handler: impl FnMut(&mut [u8], &mut SimpleIoReactorContext<'_>, &mut AppData) -> Result<usize>
            + 'static,
    ) -> Self {
        let stream_reactor = SimpleIoReactor::<AppData>::new(
            app_data,
            on_connected_handler,
            on_closed_handler,
            on_sock_msg_handler,
        );
        let msg_reader = MsgReader::new(recv_buf_min_size);
        let state = IoServiceInner::<AppData> {
            stream_reactor,
            msg_reader,
        };
        Self {
            inner: std::rc::Rc::new(std::cell::RefCell::new(state)),
        }
    }

    /// Same as [`Self::new`], but returns the service boxed as a `dyn Reactor<UserCommand = ()>`.
    pub fn new_boxed(
        recv_buf_min_size: usize,
        app_data: AppData,
        on_connected_handler: Option<Box<OnConnectedHandler<AppData>>>,
        on_closed_handler: Option<Box<OnClosedHandler<AppData>>>,
        on_sock_msg_handler: impl FnMut(&mut [u8], &mut SimpleIoReactorContext<'_>, &mut AppData) -> Result<usize>
            + 'static,
    ) -> Box<dyn Reactor<UserCommand = ()>> {
        let service = Self::new(
            recv_buf_min_size,
            app_data,
            on_connected_handler,
            on_closed_handler,
            on_sock_msg_handler,
        );
        Box::new(service)
    }

    /// Calls `func` with shared access to the app data.
    ///
    /// # Errors
    /// Returns an error, without calling `func`, when the inner state cannot be borrowed
    /// (it is currently mutably borrowed).
    pub fn apply_app_data(&self, func: impl FnOnce(&AppData)) -> Result<()> {
        match self.inner.try_borrow() {
            Ok(guard) => {
                func(&guard.stream_reactor.app_data);
                Ok(())
            }
            Err(_) => Err("Unable to borrow SimpleIoService".to_owned()),
        }
    }

    /// Calls `func` with exclusive access to the app data.
    ///
    /// # Errors
    /// Returns an error, without calling `func`, when the inner state cannot be mutably
    /// borrowed (it is already borrowed).
    pub fn apply_app_data_mut(&self, func: impl FnOnce(&mut AppData)) -> Result<()> {
        match self.inner.try_borrow_mut() {
            Ok(mut guard) => {
                func(&mut guard.stream_reactor.app_data);
                Ok(())
            }
            Err(_) => Err("Unable to borrow SimpleIoService".to_owned()),
        }
    }
}
impl<AppData> Reactor for SimpleIoService<AppData> {
    type UserCommand = ();

    /// Never expected to run: `on_readable` is overridden and routes messages through
    /// `IoServiceInner` instead.
    ///
    /// # Panics
    /// Always panics.
    fn on_inbound_message(
        &mut self,
        _buf: &mut [u8],
        _new_bytes: usize,
        _decoded_msg_size: usize,
        _ctx: &mut DispatchContext<Self::UserCommand>,
    ) -> Result<MessageResult> {
        panic!("IoServiceInner handles on_inbound_message. this function should not be called!");
    }

    /// Delegates to `IoServiceInner::on_readable`.
    ///
    /// # Panics
    /// Panics if the shared inner state is already borrowed.
    fn on_readable(&mut self, ctx: &mut ReactorReableContext<Self::UserCommand>) -> Result<()> {
        let mut inner = self.inner.borrow_mut();
        inner.on_readable(ctx)
    }

    /// Delegates to the inner reactor's `on_connected`.
    ///
    /// # Panics
    /// Panics if the shared inner state is already borrowed.
    fn on_connected(
        &mut self,
        ctx: &mut DispatchContext<Self::UserCommand>,
        listener: ReactorID,
    ) -> Result<()> {
        let mut inner = self.inner.borrow_mut();
        inner.stream_reactor.on_connected(ctx, listener)
    }

    /// Delegates to the inner reactor's `on_close`.
    ///
    /// # Panics
    /// Panics if the shared inner state is already borrowed.
    fn on_close(&mut self, reactorid: ReactorID, cmd_sender: &CmdSender<Self::UserCommand>) {
        let mut inner = self.inner.borrow_mut();
        inner.stream_reactor.on_close(reactorid, cmd_sender);
    }
}
#[cfg(test)]
mod tests {
/// No-op function pointer used as the probe value in `test_compare_function`.
static EMPTY_COMPLETION_FUNC: fn() = || {};
/// Placeholder that ignores its argument and always returns `None`.
fn is_empty_function(_fun: &(dyn Fn() + 'static)) -> Option<Box<dyn Fn() + 'static>> {
None
}
/// Checks that `is_empty_function` yields `None` for the no-op function pointer.
// NOTE(review): `is_empty_function` returns `None` unconditionally, so this assertion cannot
// fail; it only verifies that `&fn()` coerces to `&dyn Fn()`.
#[test]
pub fn test_compare_function() {
assert!(is_empty_function(&EMPTY_COMPLETION_FUNC).is_none());
}
}