pub struct CmdSender<UserCommand>(/* private fields */);
Expand description
CmdSender is owned by a ReactRuntime. Users send commands to a reactor with specific ReactorID.
- Note that CmdSender can only send command to a reactor that belongs to the same ReactRuntime.
Implementations§
Source§impl<UserCommand> CmdSender<UserCommand>
impl<UserCommand> CmdSender<UserCommand>
Sourcepub fn send_connect<AReactor: Reactor<UserCommand = UserCommand> + 'static>(
&self,
remote_addr: &str,
recv_buffer_min_size: usize,
reactor: AReactor,
deferred: Deferred,
completion: impl FnOnce(CommandCompletion) + 'static,
) -> Result<()>
pub fn send_connect<AReactor: Reactor<UserCommand = UserCommand> + 'static>( &self, remote_addr: &str, recv_buffer_min_size: usize, reactor: AReactor, deferred: Deferred, completion: impl FnOnce(CommandCompletion) + 'static, ) -> Result<()>
Send a command to create a socket to connect to remote IP:Port. The reactor will receive socket messages once connected.
§Arguments
remote_addr
- Remote address in format IP:Port.reactor
- The reactor to be add to ReactRuntime to handle the socket messages.deferred
- Indicate the command to be executed immediately or in a deferred time.completion
- Callback to indicate if the command has been executed or failed (e.g. reactorid doesn’t exist).
Examples found in repository?
examples/echo_client.rs (lines 10-24)
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
fn run(port: i32, max_echos: i32, latency_batch: i32) {
let addr = "127.0.0.1:".to_owned() + &port.to_string();
let recv_buffer_min_size = 1024;
let mut runtime = ReactRuntime::new();
let cmd_sender = runtime.get_cmd_sender();
cmd_sender
.send_connect(
&addr,
recv_buffer_min_size,
example::pingpong::PingpongReactor::new_client(
"client".to_owned(),
max_echos,
latency_batch,
),
reactio::Deferred::Immediate,
|result| {
if let reactio::CommandCompletion::Error(err) = result {
logmsg!("Failed to connect. err: {}", err);
}
},
)
.unwrap();
while runtime.process_events() {}
assert_eq!(runtime.count_reactors(), 0);
}
Sourcepub fn send_listen<AReactor: TcpListenerHandler<UserCommand = UserCommand> + 'static>(
&self,
local_addr: &str,
reactor: AReactor,
deferred: Deferred,
completion: impl FnOnce(CommandCompletion) + 'static,
) -> Result<()>
pub fn send_listen<AReactor: TcpListenerHandler<UserCommand = UserCommand> + 'static>( &self, local_addr: &str, reactor: AReactor, deferred: Deferred, completion: impl FnOnce(CommandCompletion) + 'static, ) -> Result<()>
Send a command to create a listen socket at IP:Port. The reactor will listen on the socket.
Examples found in repository?
examples/echo_server.rs (lines 10-25)
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
fn run(port: i32) {
let addr = "127.0.0.1:".to_owned() + &port.to_string();
let recv_buffer_min_size = 1024;
let mut runtime = ReactRuntime::new();
let cmd_sender = runtime.get_cmd_sender();
cmd_sender
.send_listen(
&addr,
DefaultTcpListenerHandler::<example::pingpong::PingpongReactor>::new(
recv_buffer_min_size,
example::pingpong::ServerParam {
name: "server".to_owned(),
latency_batch: 1000,
},
),
reactio::Deferred::Immediate,
|result| {
if let reactio::CommandCompletion::Error(err) = result {
logmsg!("Failed to listen. err: {}", err);
}
},
)
.unwrap();
while runtime.process_events() {}
assert_eq!(runtime.count_reactors(), 0);
}
Sourcepub fn send_close(
&self,
reactorid: ReactorID,
deferred: Deferred,
completion: impl FnOnce(CommandCompletion) + 'static,
) -> Result<()>
pub fn send_close( &self, reactorid: ReactorID, deferred: Deferred, completion: impl FnOnce(CommandCompletion) + 'static, ) -> Result<()>
Send a command to close a reactor and it’s socket.
Examples found in repository?
examples/example/pingpong.rs (line 81)
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
fn on_connected(
&mut self,
ctx: &mut DispatchContext<MyUserCommand>,
listener: ReactorID,
) -> Result<()> {
self.parent_listener = listener;
logmsg!("[{}] sock connected: {:?}", self.name, ctx.sock);
if self.is_client {
self.send_msg(ctx, "test msg000")?;
} else {
// if it's not client. close parent listener socket.
ctx.cmd_sender
.send_close(listener, Deferred::Immediate, |_| {})?;
}
Ok(())
}
More examples
examples/example/threaded_pingpong.rs (line 101)
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
fn on_connected(
&mut self,
ctx: &mut DispatchContext<Self::UserCommand>,
listener: ReactorID,
) -> Result<()> {
self.inner.parent_listener = listener;
logmsg!("[{}] connected sock: {:?}", self.inner.name, ctx.sock);
// register <name, uid>
self.reactormgr.add_reactor_uid(
self.inner.name.clone(),
ReactorUID {
runtimeid: self.runtimeid,
reactorid: ctx.reactorid,
},
)?;
if self.inner.is_client {
// send cmd to self to start sending msg to server.
ctx.cmd_sender.send_user_cmd(
ctx.reactorid,
"StartSending".to_owned(),
Deferred::UtilTime(
std::time::SystemTime::now()
.checked_add(std::time::Duration::from_millis(10))
.expect("Failed att time!"),
),
|_| {},
)?;
} else {
// server
ctx.cmd_sender
.send_close(listener, Deferred::Immediate, |_| {})?;
}
Ok(())
// return self.reactor.on_connected(ctx, listener);
}
Sourcepub fn send_user_cmd(
&self,
reactorid: ReactorID,
cmd: UserCommand,
deferred: Deferred,
completion: impl FnOnce(CommandCompletion) + 'static,
) -> Result<()>
pub fn send_user_cmd( &self, reactorid: ReactorID, cmd: UserCommand, deferred: Deferred, completion: impl FnOnce(CommandCompletion) + 'static, ) -> Result<()>
Send a UserCommand to a reactor with specified reactorid
.
The existance of reactorid is not check in this function.
When process_events
is called and the deferred time becomes current,
reactorid
is checked before passing the cmd to reactor.
Examples found in repository?
examples/example/threaded_pingpong.rs (lines 88-97)
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
fn on_connected(
&mut self,
ctx: &mut DispatchContext<Self::UserCommand>,
listener: ReactorID,
) -> Result<()> {
self.inner.parent_listener = listener;
logmsg!("[{}] connected sock: {:?}", self.inner.name, ctx.sock);
// register <name, uid>
self.reactormgr.add_reactor_uid(
self.inner.name.clone(),
ReactorUID {
runtimeid: self.runtimeid,
reactorid: ctx.reactorid,
},
)?;
if self.inner.is_client {
// send cmd to self to start sending msg to server.
ctx.cmd_sender.send_user_cmd(
ctx.reactorid,
"StartSending".to_owned(),
Deferred::UtilTime(
std::time::SystemTime::now()
.checked_add(std::time::Duration::from_millis(10))
.expect("Failed att time!"),
),
|_| {},
)?;
} else {
// server
ctx.cmd_sender
.send_close(listener, Deferred::Immediate, |_| {})?;
}
Ok(())
// return self.reactor.on_connected(ctx, listener);
}
fn on_inbound_message(
&mut self,
buf: &mut [u8],
new_bytes: usize,
decoded_msg_size: usize,
ctx: &mut DispatchContext<Self::UserCommand>,
) -> Result<MessageResult> {
self.inner
.on_inbound_message(buf, new_bytes, decoded_msg_size, ctx)
}
fn on_command(
&mut self,
cmd: Self::UserCommand,
ctx: &mut DispatchContext<Self::UserCommand>,
) -> Result<()> {
logmsg!("[{}] **Recv user cmd** {}", &self.inner.name, &cmd);
if self.inner.is_client {
//-- test send cmd to server
let server_uid = self
.reactormgr
.find_reactor_uid("server-1")
.ok_or_else(|| format!("ERROR: Failed to find server name: {}", "server-1"))?;
let sender_to_server = self
.reactormgr
.get_cmd_sender(server_uid.runtimeid)
.ok_or_else(|| {
format!(
"ERROR: failed to find sender by runtimeid: {}",
server_uid.runtimeid
)
})?;
sender_to_server.send_user_cmd(
server_uid.reactorid,
"TestCmdFromClient".to_owned(),
Deferred::Immediate,
|_| {},
)?;
//-- send initial msg
self.inner.send_msg(ctx, "hello world")
} else {
Ok(())
}
// self.reactor.on_command(cmd, ctx);
}
Trait Implementations§
impl<UserCommand> Send for CmdSender<UserCommand>
CmdSender
is a Send
so that is can be passed through threads.
Auto Trait Implementations§
impl<UserCommand> Freeze for CmdSender<UserCommand>
impl<UserCommand> RefUnwindSafe for CmdSender<UserCommand>
impl<UserCommand> Sync for CmdSender<UserCommand>
impl<UserCommand> Unpin for CmdSender<UserCommand>
impl<UserCommand> UnwindSafe for CmdSender<UserCommand>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§unsafe fn clone_to_uninit(&self, dst: *mut T)
unsafe fn clone_to_uninit(&self, dst: *mut T)
🔬This is a nightly-only experimental API. (
clone_to_uninit
)