reactio

Struct CmdSender

pub struct CmdSender<UserCommand>(/* private fields */);

CmdSender is owned by a ReactRuntime. Users send commands through it to the reactor identified by a specific ReactorID.

  • Note that a CmdSender can only send commands to reactors that belong to the same ReactRuntime.
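The per-runtime restriction above can be pictured with a small standard-library model. This is only a sketch under assumptions: `QueueRuntime`, `Command`, and the use of `std::sync::mpsc` are hypothetical stand-ins, not reactio's actual internals. It shows why reaching a reactor in another runtime requires that runtime's own sender.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};

type RuntimeId = u32;
type ReactorId = u32;
/// Hypothetical command: the target reactor id plus a payload.
type Command = (ReactorId, String);

/// Hypothetical stand-in for a runtime that owns exactly one command queue.
pub struct QueueRuntime {
    id: RuntimeId,
    tx: Sender<Command>,
    rx: Receiver<Command>,
}

impl QueueRuntime {
    pub fn new(id: RuntimeId) -> Self {
        let (tx, rx) = mpsc::channel();
        Self { id, tx, rx }
    }

    /// Every sender handed out feeds this runtime's queue and no other.
    pub fn get_cmd_sender(&self) -> Sender<Command> {
        self.tx.clone()
    }

    /// Collects the commands that are currently queued for this runtime.
    pub fn drain(&self) -> Vec<Command> {
        self.rx.try_iter().collect()
    }
}

/// Sends one command to reactor 7 of runtime 2 and returns how many
/// commands each runtime received.
pub fn demo_routing() -> (usize, usize) {
    let a = QueueRuntime::new(1);
    let b = QueueRuntime::new(2);

    // A registry keyed by runtime id, so the matching sender can be looked up.
    let mut senders: HashMap<RuntimeId, Sender<Command>> = HashMap::new();
    senders.insert(a.id, a.get_cmd_sender());
    senders.insert(b.id, b.get_cmd_sender());

    // The command must go through runtime 2's sender to reach its reactor.
    senders[&b.id].send((7, "TestCmd".to_owned())).unwrap();

    (a.drain().len(), b.drain().len())
}
```

In this model, runtime 1 never sees the command because its queue is separate from runtime 2's queue.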

Implementations

impl<UserCommand> CmdSender<UserCommand>

pub fn send_connect<AReactor: Reactor<UserCommand = UserCommand> + 'static>( &self, remote_addr: &str, recv_buffer_min_size: usize, reactor: AReactor, deferred: Deferred, completion: impl FnOnce(CommandCompletion) + 'static, ) -> Result<()>

Send a command to create a socket that connects to a remote IP:Port. The reactor receives socket messages once the connection is established.

Arguments
  • remote_addr - Remote address in the format IP:Port.
  • reactor - The reactor to be added to the ReactRuntime to handle the socket messages.
  • deferred - Indicates whether the command is executed immediately or at a deferred time.
  • completion - Callback that reports whether the command was executed or failed (e.g. the reactorid doesn’t exist).
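The role of the completion callback can be sketched without the crate. The types below (`ModelCompletion`, `ModelConnect`) are hypothetical stand-ins, and validating the address with `SocketAddr` parsing is an assumption made for illustration; reactio's real `CommandCompletion` and connect logic may differ. The sketch only shows a callback that is invoked exactly once with either a success or an error.

```rust
use std::cell::RefCell;
use std::net::SocketAddr;
use std::rc::Rc;

/// Hypothetical stand-in for reactio's `CommandCompletion`.
#[derive(Debug, Clone, PartialEq)]
pub enum ModelCompletion {
    Completed,
    Error(String),
}

/// Hypothetical queued connect command: an address plus a one-shot callback.
pub struct ModelConnect {
    remote_addr: String,
    completion: Box<dyn FnOnce(ModelCompletion)>,
}

impl ModelConnect {
    pub fn new(remote_addr: &str, completion: impl FnOnce(ModelCompletion) + 'static) -> Self {
        Self {
            remote_addr: remote_addr.to_owned(),
            completion: Box::new(completion),
        }
    }

    /// Executes the command: checks the IP:Port format (no real connection
    /// is made in this model) and reports the outcome exactly once.
    pub fn execute(self) {
        let outcome = match self.remote_addr.parse::<SocketAddr>() {
            Ok(_) => ModelCompletion::Completed,
            Err(e) => ModelCompletion::Error(format!(
                "invalid address {:?}: {}",
                self.remote_addr, e
            )),
        };
        (self.completion)(outcome);
    }
}

/// Runs two commands and records whether each one completed successfully.
pub fn demo_completion() -> Vec<bool> {
    let results = Rc::new(RefCell::new(Vec::new()));
    let addrs = ["127.0.0.1:8080", "not-an-address"];
    for addr in addrs.iter() {
        let sink = Rc::clone(&results);
        ModelConnect::new(addr, move |c| {
            sink.borrow_mut().push(c == ModelCompletion::Completed)
        })
        .execute();
    }
    let out = results.borrow().clone();
    out
}
```

Because the callback is an `FnOnce`, the type system guarantees it cannot be called twice for the same command.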
Examples found in repository?
examples/echo_client.rs (lines 10-24)
fn run(port: i32, max_echos: i32, latency_batch: i32) {
    let addr = "127.0.0.1:".to_owned() + &port.to_string();
    let recv_buffer_min_size = 1024;
    let mut runtime = ReactRuntime::new();
    let cmd_sender = runtime.get_cmd_sender();
    cmd_sender
        .send_connect(
            &addr,
            recv_buffer_min_size,
            example::pingpong::PingpongReactor::new_client(
                "client".to_owned(),
                max_echos,
                latency_batch,
            ),
            reactio::Deferred::Immediate,
            |result| {
                if let reactio::CommandCompletion::Error(err) = result {
                    logmsg!("Failed to connect. err: {}", err);
                }
            },
        )
        .unwrap();

    while runtime.process_events() {}
    assert_eq!(runtime.count_reactors(), 0);
}

pub fn send_listen<AReactor: TcpListenerHandler<UserCommand = UserCommand> + 'static>( &self, local_addr: &str, reactor: AReactor, deferred: Deferred, completion: impl FnOnce(CommandCompletion) + 'static, ) -> Result<()>

Send a command to create a listen socket at IP:Port. The reactor will listen on the socket.

Examples found in repository?
examples/echo_server.rs (lines 10-25)
fn run(port: i32) {
    let addr = "127.0.0.1:".to_owned() + &port.to_string();
    let recv_buffer_min_size = 1024;
    let mut runtime = ReactRuntime::new();
    let cmd_sender = runtime.get_cmd_sender();
    cmd_sender
        .send_listen(
            &addr,
            DefaultTcpListenerHandler::<example::pingpong::PingpongReactor>::new(
                recv_buffer_min_size,
                example::pingpong::ServerParam {
                    name: "server".to_owned(),
                    latency_batch: 1000,
                },
            ),
            reactio::Deferred::Immediate,
            |result| {
                if let reactio::CommandCompletion::Error(err) = result {
                    logmsg!("Failed to listen. err: {}", err);
                }
            },
        )
        .unwrap();
    while runtime.process_events() {}
    assert_eq!(runtime.count_reactors(), 0);
}

pub fn send_close( &self, reactorid: ReactorID, deferred: Deferred, completion: impl FnOnce(CommandCompletion) + 'static, ) -> Result<()>

Send a command to close a reactor and its socket.

Examples found in repository?
examples/example/pingpong.rs (line 81)
    fn on_connected(
        &mut self,
        ctx: &mut DispatchContext<MyUserCommand>,
        listener: ReactorID,
    ) -> Result<()> {
        self.parent_listener = listener;
        logmsg!("[{}] sock connected: {:?}", self.name, ctx.sock);
        if self.is_client {
            self.send_msg(ctx, "test msg000")?;
        } else {
            // if it's not client. close parent listener socket.
            ctx.cmd_sender
                .send_close(listener, Deferred::Immediate, |_| {})?;
        }
        Ok(())
    }
examples/example/threaded_pingpong.rs (line 101)
    fn on_connected(
        &mut self,
        ctx: &mut DispatchContext<Self::UserCommand>,
        listener: ReactorID,
    ) -> Result<()> {
        self.inner.parent_listener = listener;
        logmsg!("[{}] connected sock: {:?}", self.inner.name, ctx.sock);
        // register <name, uid>
        self.reactormgr.add_reactor_uid(
            self.inner.name.clone(),
            ReactorUID {
                runtimeid: self.runtimeid,
                reactorid: ctx.reactorid,
            },
        )?;
        if self.inner.is_client {
            // send cmd to self to start sending msg to server.
            ctx.cmd_sender.send_user_cmd(
                ctx.reactorid,
                "StartSending".to_owned(),
                Deferred::UtilTime(
                    std::time::SystemTime::now()
                        .checked_add(std::time::Duration::from_millis(10))
                        .expect("Failed att time!"),
                ),
                |_| {},
            )?;
        } else {
            // server
            ctx.cmd_sender
                .send_close(listener, Deferred::Immediate, |_| {})?;
        }
        Ok(())
        // return self.reactor.on_connected(ctx, listener);
    }

pub fn send_user_cmd( &self, reactorid: ReactorID, cmd: UserCommand, deferred: Deferred, completion: impl FnOnce(CommandCompletion) + 'static, ) -> Result<()>

Send a UserCommand to the reactor with the specified reactorid. This function does not check whether reactorid exists. The check happens later: when process_events is called and the deferred time has arrived, reactorid is verified before the cmd is passed to the reactor.
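The late existence check can be illustrated with a simplified model. Everything here is hypothetical (`When`, `Outcome`, `ModelRuntime`, and passing the current time to `process_events` explicitly); it is a sketch of the described behavior, not reactio's implementation. Sending always succeeds, and the reactor lookup happens only when a due command is processed.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::time::{Duration, SystemTime};

/// Hypothetical stand-in for reactio's `Deferred`.
#[derive(Clone, Copy)]
pub enum When {
    Immediate,
    Until(SystemTime),
}

/// Hypothetical stand-in for reactio's `CommandCompletion`.
#[derive(Debug, Clone, PartialEq)]
pub enum Outcome {
    Completed,
    Error(String),
}

struct Pending {
    reactor_id: u32,
    cmd: String,
    when: When,
    completion: Box<dyn FnOnce(Outcome)>,
}

#[derive(Default)]
pub struct ModelRuntime {
    inboxes: HashMap<u32, Vec<String>>, // one inbox per existing reactor
    queue: Vec<Pending>,
}

impl ModelRuntime {
    pub fn add_reactor(&mut self, id: u32) {
        self.inboxes.insert(id, Vec::new());
    }

    /// Queues the command without checking that `reactor_id` exists.
    pub fn send_user_cmd(
        &mut self,
        reactor_id: u32,
        cmd: &str,
        when: When,
        completion: impl FnOnce(Outcome) + 'static,
    ) {
        self.queue.push(Pending {
            reactor_id,
            cmd: cmd.to_owned(),
            when,
            completion: Box::new(completion),
        });
    }

    /// Delivers every due command; the reactor id is checked only here.
    pub fn process_events(&mut self, now: SystemTime) {
        for p in std::mem::take(&mut self.queue) {
            let due = match p.when {
                When::Immediate => true,
                When::Until(t) => t <= now,
            };
            if !due {
                self.queue.push(p); // not yet due: keep it for a later call
                continue;
            }
            let outcome = match self.inboxes.get_mut(&p.reactor_id) {
                Some(inbox) => {
                    inbox.push(p.cmd);
                    Outcome::Completed
                }
                None => Outcome::Error(format!("reactor {} not found", p.reactor_id)),
            };
            (p.completion)(outcome);
        }
    }
}

/// Returns the outcomes seen after the first and after the second pass.
pub fn demo_deferred() -> (Vec<Outcome>, Vec<Outcome>) {
    let log = Rc::new(RefCell::new(Vec::new()));
    let record = |log: &Rc<RefCell<Vec<Outcome>>>| {
        let l = Rc::clone(log);
        move |o: Outcome| l.borrow_mut().push(o)
    };
    let mut rt = ModelRuntime::default();
    rt.add_reactor(1);
    let t0 = SystemTime::UNIX_EPOCH + Duration::from_secs(100);
    let later = t0 + Duration::from_millis(10);

    rt.send_user_cmd(1, "now", When::Immediate, record(&log));
    rt.send_user_cmd(2, "lost", When::Immediate, record(&log)); // unknown id, still accepted
    rt.send_user_cmd(1, "later", When::Until(later), record(&log));

    rt.process_events(t0);
    let first = log.borrow().clone();
    rt.process_events(later);
    let second = log.borrow().clone();
    (first, second)
}
```

In the first pass the unknown reactor id surfaces as an error through the callback rather than through the send call, and the deferred command completes only in the second pass.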

Examples found in repository?
examples/example/threaded_pingpong.rs (lines 88-97)
    fn on_connected(
        &mut self,
        ctx: &mut DispatchContext<Self::UserCommand>,
        listener: ReactorID,
    ) -> Result<()> {
        self.inner.parent_listener = listener;
        logmsg!("[{}] connected sock: {:?}", self.inner.name, ctx.sock);
        // register <name, uid>
        self.reactormgr.add_reactor_uid(
            self.inner.name.clone(),
            ReactorUID {
                runtimeid: self.runtimeid,
                reactorid: ctx.reactorid,
            },
        )?;
        if self.inner.is_client {
            // send cmd to self to start sending msg to server.
            ctx.cmd_sender.send_user_cmd(
                ctx.reactorid,
                "StartSending".to_owned(),
                Deferred::UtilTime(
                    std::time::SystemTime::now()
                        .checked_add(std::time::Duration::from_millis(10))
                        .expect("Failed att time!"),
                ),
                |_| {},
            )?;
        } else {
            // server
            ctx.cmd_sender
                .send_close(listener, Deferred::Immediate, |_| {})?;
        }
        Ok(())
        // return self.reactor.on_connected(ctx, listener);
    }

    fn on_inbound_message(
        &mut self,
        buf: &mut [u8],
        new_bytes: usize,
        decoded_msg_size: usize,
        ctx: &mut DispatchContext<Self::UserCommand>,
    ) -> Result<MessageResult> {
        self.inner
            .on_inbound_message(buf, new_bytes, decoded_msg_size, ctx)
    }

    fn on_command(
        &mut self,
        cmd: Self::UserCommand,
        ctx: &mut DispatchContext<Self::UserCommand>,
    ) -> Result<()> {
        logmsg!("[{}] **Recv user cmd** {}", &self.inner.name, &cmd);
        if self.inner.is_client {
            //-- test send cmd to server
            let server_uid = self
                .reactormgr
                .find_reactor_uid("server-1")
                .ok_or_else(|| format!("ERROR: Failed to find server name: {}", "server-1"))?;
            let sender_to_server = self
                .reactormgr
                .get_cmd_sender(server_uid.runtimeid)
                .ok_or_else(|| {
                    format!(
                        "ERROR: failed to find sender by runtimeid: {}",
                        server_uid.runtimeid
                    )
                })?;
            sender_to_server.send_user_cmd(
                server_uid.reactorid,
                "TestCmdFromClient".to_owned(),
                Deferred::Immediate,
                |_| {},
            )?;

            //-- send initial msg
            self.inner.send_msg(ctx, "hello world")
        } else {
            Ok(())
        }
        // self.reactor.on_command(cmd, ctx);
    }

Trait Implementations

impl<UserCommand> Clone for CmdSender<UserCommand>

fn clone(&self) -> Self

Returns a copy of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl<UserCommand> Send for CmdSender<UserCommand>

CmdSender implements Send so that it can be passed between threads.
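What Clone plus Send makes possible can be sketched with a standard-library channel. `ModelSender` and its `mpsc` backing are assumptions for illustration only; the sketch does not claim to reflect how CmdSender is implemented. Each thread gets its own clone, and all clones feed the same queue.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a cloneable, sendable command handle.
#[derive(Clone)]
pub struct ModelSender {
    tx: mpsc::Sender<(u32, String)>,
}

/// Spawns three threads, each sending one command through its own clone,
/// and returns the received commands sorted by thread index.
pub fn demo_threads() -> Vec<(u32, String)> {
    let (tx, rx) = mpsc::channel();
    let sender = ModelSender { tx };

    let handles: Vec<_> = (0..3u32)
        .map(|i| {
            let s = sender.clone(); // the clone is moved into the thread
            thread::spawn(move || {
                s.tx.send((i, format!("cmd-from-thread-{}", i))).unwrap();
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    drop(sender); // drop the last sender so the receiving iterator terminates

    let mut got: Vec<_> = rx.iter().collect();
    got.sort(); // arrival order across threads is not deterministic
    got
}
```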

Auto Trait Implementations

impl<UserCommand> Freeze for CmdSender<UserCommand>

impl<UserCommand> RefUnwindSafe for CmdSender<UserCommand>

impl<UserCommand> Sync for CmdSender<UserCommand>

impl<UserCommand> Unpin for CmdSender<UserCommand>

impl<UserCommand> UnwindSafe for CmdSender<UserCommand>

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> CloneToUninit for T
where T: Clone,

unsafe fn clone_to_uninit(&self, dst: *mut T)

🔬 This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dst.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.