rpc_core/
rpc.rs

1use std::cell::RefCell;
2use std::rc::{Rc, Weak};
3
4use log::debug;
5
6use crate::connection::{Connection, DefaultConnection};
7use crate::detail::coder;
8use crate::detail::msg_dispatcher::{MsgDispatcher, TimeoutCb};
9use crate::detail::msg_wrapper::{MsgType, MsgWrapper};
10use crate::request::Request;
11use crate::type_def::SeqType;
12
13pub trait RpcProto {
14    fn make_seq(&self) -> SeqType;
15    fn send_request(&self, request: &Request);
16    fn is_ready(&self) -> bool;
17}
18
19pub struct RpcImpl {
20    weak: Weak<Rpc>,
21    connection: Rc<RefCell<dyn Connection>>,
22    dispatcher: Rc<RefCell<MsgDispatcher>>,
23    seq: SeqType,
24    is_ready: bool,
25}
26
27pub struct Rpc {
28    inner: RefCell<RpcImpl>,
29}
30
31impl Rpc {
32    pub fn new(connection: Option<Rc<RefCell<dyn Connection>>>) -> Rc<Rpc> {
33        let connection = connection.unwrap_or(DefaultConnection::new());
34        let rpc = Rc::new(Rpc {
35            inner: RefCell::new(RpcImpl {
36                weak: Weak::new(),
37                connection: connection.clone(),
38                dispatcher: MsgDispatcher::new(connection),
39                seq: 0,
40                is_ready: false,
41            }),
42        });
43        rpc.inner.borrow_mut().weak = Rc::downgrade(&rpc);
44        rpc
45    }
46
47    pub fn subscribe<C, F, P, R>(&self, cmd: C, handle: F)
48    where
49        C: ToString,
50        P: for<'de> serde::Deserialize<'de>,
51        R: serde::Serialize,
52        F: Fn(P) -> R + 'static,
53    {
54        self.inner
55            .borrow_mut()
56            .dispatcher
57            .borrow_mut()
58            .subscribe_cmd(
59                cmd.to_string(),
60                Box::new(move |msg| -> Option<MsgWrapper> {
61                    if let Ok(value) = msg.unpack_as::<P>() {
62                        let rsp: R = handle(value);
63                        Some(MsgWrapper::make_rsp(msg.seq, rsp))
64                    } else {
65                        None
66                    }
67                }),
68            );
69    }
70
71    pub fn unsubscribe<C>(&self, cmd: C)
72    where
73        C: ToString,
74    {
75        self.inner
76            .borrow_mut()
77            .dispatcher
78            .borrow_mut()
79            .unsubscribe_cmd(cmd.to_string());
80    }
81
82    pub fn create_request(&self) -> Rc<Request> {
83        Request::create_with_rpc(self.inner.borrow().weak.clone())
84    }
85
86    pub fn cmd<T>(&self, cmd: T) -> Rc<Request>
87    where
88        T: ToString,
89    {
90        let r = self.create_request();
91        r.cmd(cmd.to_string());
92        r
93    }
94
95    pub fn ping(&self) -> Rc<Request> {
96        let r = self.create_request();
97        r.ping();
98        r
99    }
100
101    pub fn ping_msg(&self, payload: impl ToString) -> Rc<Request> {
102        let r = self.create_request();
103        r.ping().msg(payload.to_string());
104        r
105    }
106
107    pub fn set_timer<F>(&self, timer_impl: F)
108    where
109        F: Fn(u32, Box<TimeoutCb>) + 'static,
110    {
111        self.inner
112            .borrow_mut()
113            .dispatcher
114            .borrow_mut()
115            .set_timer_impl(timer_impl);
116    }
117
118    pub fn set_ready(&self, ready: bool) {
119        self.inner.borrow_mut().is_ready = ready;
120    }
121
122    pub fn get_connection(&self) -> Rc<RefCell<dyn Connection>> {
123        self.inner.borrow().connection.clone()
124    }
125}
126
127impl RpcProto for Rpc {
128    fn make_seq(&self) -> SeqType {
129        let mut inner = self.inner.borrow_mut();
130        let seq = inner.seq;
131        inner.seq += 1;
132        seq
133    }
134
135    fn send_request(&self, request: &Request) {
136        let msg;
137        let payload;
138        let connection;
139        {
140            let inner = self.inner.borrow_mut();
141            let request = request.inner.borrow_mut();
142            if request.need_rsp {
143                inner.dispatcher.borrow_mut().subscribe_rsp(
144                    request.seq,
145                    request.rsp_handle.as_ref().unwrap().clone(),
146                    request.timeout_cb.clone(),
147                    request.timeout_ms,
148                );
149            }
150            msg = MsgWrapper {
151                seq: request.seq,
152                type_: (|| {
153                    let mut type_val = MsgType::Command;
154                    if request.is_ping {
155                        type_val |= MsgType::Ping;
156                    }
157                    if request.need_rsp {
158                        type_val |= MsgType::NeedRsp;
159                    }
160                    type_val
161                })(),
162                cmd: request.cmd.clone(),
163                data: request.payload.clone().unwrap_or(vec![]),
164                request_payload: None,
165            };
166
167            payload = coder::serialize(&msg);
168            connection = inner.connection.clone();
169        }
170        debug!(
171            "=> seq:{} type:{} {}",
172            msg.seq,
173            if msg.type_.contains(MsgType::Ping) {
174                "ping"
175            } else {
176                "cmd"
177            },
178            msg.cmd
179        );
180        connection.borrow().send_package(payload);
181    }
182
183    fn is_ready(&self) -> bool {
184        self.inner.borrow().is_ready
185    }
186}