1use std::cell::RefCell;
2use std::rc::{Rc, Weak};
3
4use log::debug;
5
6use crate::connection::{Connection, DefaultConnection};
7use crate::detail::coder;
8use crate::detail::msg_dispatcher::{MsgDispatcher, TimeoutCb};
9use crate::detail::msg_wrapper::{MsgType, MsgWrapper};
10use crate::request::Request;
11use crate::type_def::SeqType;
12
13pub trait RpcProto {
14 fn make_seq(&self) -> SeqType;
15 fn send_request(&self, request: &Request);
16 fn is_ready(&self) -> bool;
17}
18
19pub struct RpcImpl {
20 weak: Weak<Rpc>,
21 connection: Rc<RefCell<dyn Connection>>,
22 dispatcher: Rc<RefCell<MsgDispatcher>>,
23 seq: SeqType,
24 is_ready: bool,
25}
26
27pub struct Rpc {
28 inner: RefCell<RpcImpl>,
29}
30
31impl Rpc {
32 pub fn new(connection: Option<Rc<RefCell<dyn Connection>>>) -> Rc<Rpc> {
33 let connection = connection.unwrap_or(DefaultConnection::new());
34 let rpc = Rc::new(Rpc {
35 inner: RefCell::new(RpcImpl {
36 weak: Weak::new(),
37 connection: connection.clone(),
38 dispatcher: MsgDispatcher::new(connection),
39 seq: 0,
40 is_ready: false,
41 }),
42 });
43 rpc.inner.borrow_mut().weak = Rc::downgrade(&rpc);
44 rpc
45 }
46
47 pub fn subscribe<C, F, P, R>(&self, cmd: C, handle: F)
48 where
49 C: ToString,
50 P: for<'de> serde::Deserialize<'de>,
51 R: serde::Serialize,
52 F: Fn(P) -> R + 'static,
53 {
54 self.inner
55 .borrow_mut()
56 .dispatcher
57 .borrow_mut()
58 .subscribe_cmd(
59 cmd.to_string(),
60 Box::new(move |msg| -> Option<MsgWrapper> {
61 if let Ok(value) = msg.unpack_as::<P>() {
62 let rsp: R = handle(value);
63 Some(MsgWrapper::make_rsp(msg.seq, rsp))
64 } else {
65 None
66 }
67 }),
68 );
69 }
70
71 pub fn unsubscribe<C>(&self, cmd: C)
72 where
73 C: ToString,
74 {
75 self.inner
76 .borrow_mut()
77 .dispatcher
78 .borrow_mut()
79 .unsubscribe_cmd(cmd.to_string());
80 }
81
82 pub fn create_request(&self) -> Rc<Request> {
83 Request::create_with_rpc(self.inner.borrow().weak.clone())
84 }
85
86 pub fn cmd<T>(&self, cmd: T) -> Rc<Request>
87 where
88 T: ToString,
89 {
90 let r = self.create_request();
91 r.cmd(cmd.to_string());
92 r
93 }
94
95 pub fn ping(&self) -> Rc<Request> {
96 let r = self.create_request();
97 r.ping();
98 r
99 }
100
101 pub fn ping_msg(&self, payload: impl ToString) -> Rc<Request> {
102 let r = self.create_request();
103 r.ping().msg(payload.to_string());
104 r
105 }
106
107 pub fn set_timer<F>(&self, timer_impl: F)
108 where
109 F: Fn(u32, Box<TimeoutCb>) + 'static,
110 {
111 self.inner
112 .borrow_mut()
113 .dispatcher
114 .borrow_mut()
115 .set_timer_impl(timer_impl);
116 }
117
118 pub fn set_ready(&self, ready: bool) {
119 self.inner.borrow_mut().is_ready = ready;
120 }
121
122 pub fn get_connection(&self) -> Rc<RefCell<dyn Connection>> {
123 self.inner.borrow().connection.clone()
124 }
125}
126
127impl RpcProto for Rpc {
128 fn make_seq(&self) -> SeqType {
129 let mut inner = self.inner.borrow_mut();
130 let seq = inner.seq;
131 inner.seq += 1;
132 seq
133 }
134
135 fn send_request(&self, request: &Request) {
136 let msg;
137 let payload;
138 let connection;
139 {
140 let inner = self.inner.borrow_mut();
141 let request = request.inner.borrow_mut();
142 if request.need_rsp {
143 inner.dispatcher.borrow_mut().subscribe_rsp(
144 request.seq,
145 request.rsp_handle.as_ref().unwrap().clone(),
146 request.timeout_cb.clone(),
147 request.timeout_ms,
148 );
149 }
150 msg = MsgWrapper {
151 seq: request.seq,
152 type_: (|| {
153 let mut type_val = MsgType::Command;
154 if request.is_ping {
155 type_val |= MsgType::Ping;
156 }
157 if request.need_rsp {
158 type_val |= MsgType::NeedRsp;
159 }
160 type_val
161 })(),
162 cmd: request.cmd.clone(),
163 data: request.payload.clone().unwrap_or(vec![]),
164 request_payload: None,
165 };
166
167 payload = coder::serialize(&msg);
168 connection = inner.connection.clone();
169 }
170 debug!(
171 "=> seq:{} type:{} {}",
172 msg.seq,
173 if msg.type_.contains(MsgType::Ping) {
174 "ping"
175 } else {
176 "cmd"
177 },
178 msg.cmd
179 );
180 connection.borrow().send_package(payload);
181 }
182
183 fn is_ready(&self) -> bool {
184 self.inner.borrow().is_ready
185 }
186}