1use std::cell::RefCell;
2use std::future::Future;
3use std::pin::Pin;
4use std::rc::{Rc, Weak};
5use std::task::{Context, Poll, Waker};
6
7use log::debug;
8
9use crate::detail::msg_dispatcher::{RspHandle, TimeoutCb};
10use crate::detail::msg_wrapper::MsgType;
11use crate::dispose::Dispose;
12use crate::rpc::RpcProto;
13use crate::type_def::{CmdType, SeqType};
14
15pub struct RequestImpl {
16 rpc: Option<Weak<dyn RpcProto>>,
17 self_weak: Weak<Request>,
18 self_keeper: Option<Rc<Request>>,
19 pub(crate) seq: SeqType,
20 pub(crate) cmd: CmdType,
21 pub(crate) payload: Option<Vec<u8>>,
22 pub(crate) need_rsp: bool,
23 canceled: bool,
24 pub(crate) rsp_handle: Option<Rc<RspHandle>>,
25 pub(crate) timeout_ms: u32,
26 pub(crate) timeout_cb: Option<Rc<TimeoutCb>>,
27 finally_type: FinallyType,
28 finally: Option<Box<dyn Fn(FinallyType)>>,
29 retry_count: i32,
30 waiting_rsp: bool,
31 pub(crate) is_ping: bool,
32}
33
34pub struct Request {
35 pub(crate) inner: RefCell<RequestImpl>,
36}
37
38pub trait DisposeProto {
39 fn add(&mut self, request: &Rc<Request>);
40 fn remove(&mut self, request: &Rc<Request>);
41}
42
43#[derive(Debug, Clone, PartialEq)]
44pub enum FinallyType {
45 Normal = 0,
46 NoNeedRsp = 1,
47 Timeout = 2,
48 Canceled = 3,
49 RpcExpired = 4,
50 RpcNotReady = 5,
51 RspSerializeError = 6,
52 NoSuchCmd = 7,
53}
54
55impl FinallyType {
56 pub fn to_str(&self) -> &'static str {
57 match self {
58 FinallyType::Normal => "normal",
59 FinallyType::NoNeedRsp => "no_need_rsp",
60 FinallyType::Timeout => "timeout",
61 FinallyType::Canceled => "canceled",
62 FinallyType::RpcExpired => "rpc_expired",
63 FinallyType::RpcNotReady => "rpc_not_ready",
64 FinallyType::RspSerializeError => "rsp_serialize_error",
65 FinallyType::NoSuchCmd => "no_such_cmd",
66 }
67 }
68}
69
70impl Request {
72 pub fn new() -> Rc<Self> {
73 let r = Rc::new(Self {
74 inner: RefCell::new(RequestImpl {
75 rpc: None,
76 self_weak: Default::default(),
77 self_keeper: None,
78 seq: 0,
79 cmd: "".to_string(),
80 payload: None,
81 need_rsp: false,
82 canceled: false,
83 rsp_handle: None,
84 timeout_ms: 3000,
85 timeout_cb: None,
86 finally_type: FinallyType::Normal,
87 finally: None,
88 retry_count: 0,
89 waiting_rsp: false,
90 is_ping: false,
91 }),
92 });
93 r.inner.borrow_mut().self_weak = Rc::downgrade(&r);
94 r.timeout(|| {});
95 r
96 }
97
98 pub fn create_with_rpc(rpc: Weak<dyn RpcProto>) -> Rc<Self> {
99 let r = Self::new();
100 r.inner.borrow_mut().rpc = Some(rpc);
101 r
102 }
103
104 pub fn cmd<'a>(self: &'a Rc<Self>, cmd: impl ToString) -> &'a Rc<Self> {
105 self.inner.borrow_mut().cmd = cmd.to_string();
106 self
107 }
108
109 pub fn msg<'a, T>(self: &'a Rc<Self>, msg: T) -> &'a Rc<Self>
110 where
111 T: serde::Serialize,
112 {
113 self.inner.borrow_mut().payload = serde_json::to_string(&msg).unwrap().into_bytes().into();
114 self
115 }
116
117 pub fn rsp<'a, F, P>(self: &'a Rc<Self>, cb: F) -> &'a Rc<Self>
118 where
119 P: for<'de> serde::Deserialize<'de>,
120 F: Fn(P) + 'static,
121 {
122 {
123 let weak = Rc::downgrade(&self);
124 let mut request = self.inner.borrow_mut();
125 request.need_rsp = true;
126 request.rsp_handle = Some(Rc::new(move |msg| -> bool {
127 let this = weak.upgrade();
128 if this.is_none() {
129 return false;
130 }
131
132 let this = this.unwrap();
133 if this.is_canceled() {
134 this.on_finish(FinallyType::Canceled);
135 return true;
136 }
137
138 if msg.type_.contains(MsgType::NoSuchCmd) {
139 this.on_finish(FinallyType::NoSuchCmd);
140 return true;
141 }
142
143 if let Ok(value) = msg.unpack_as::<P>() {
144 cb(value);
145 this.on_finish(FinallyType::Normal);
146 true
147 } else {
148 this.on_finish(FinallyType::RspSerializeError);
149 false
150 }
151 }));
152 }
153 self
154 }
155
156 pub fn finally<'a, F>(self: &'a Rc<Self>, finally: F) -> &'a Rc<Self>
157 where
158 F: Fn(FinallyType) + 'static,
159 {
160 self.inner.borrow_mut().finally = Some(Box::new(finally));
161 self
162 }
163
164 pub fn call(self: &Rc<Self>) {
165 self.inner.borrow_mut().waiting_rsp = true;
166
167 if self.inner.borrow().canceled {
168 self.on_finish(FinallyType::Canceled);
169 return;
170 }
171
172 self.inner.borrow_mut().self_keeper = Some(self.clone());
173
174 if self.inner.borrow().rpc.is_none()
175 || self.inner.borrow().rpc.as_ref().unwrap().strong_count() == 0
176 {
177 self.on_finish(FinallyType::RpcExpired);
178 return;
179 }
180
181 let r = self.inner.borrow().rpc.as_ref().unwrap().upgrade().unwrap();
182 if !r.is_ready() {
183 self.on_finish(FinallyType::RpcNotReady);
184 return;
185 }
186
187 self.inner.borrow_mut().seq = r.make_seq();
188 r.send_request(self.as_ref());
189
190 if !self.inner.borrow().need_rsp {
191 self.on_finish(FinallyType::NoNeedRsp)
192 }
193 }
194
195 pub fn call_with_rpc(self: &Rc<Self>, rpc: Rc<dyn RpcProto>) {
196 self.inner.borrow_mut().rpc = Some(Rc::downgrade(&rpc));
197 self.call();
198 }
199
200 pub fn ping<'a>(self: &'a Rc<Self>) -> &'a Rc<Self> {
201 self.inner.borrow_mut().is_ping = true;
202 self
203 }
204
205 pub fn timeout_ms<'a>(self: &'a Rc<Self>, timeout_ms: u32) -> &'a Rc<Self> {
206 self.inner.borrow_mut().timeout_ms = timeout_ms;
207 self
208 }
209
210 pub fn timeout<'a, F>(self: &'a Rc<Self>, timeout_cb: F) -> &'a Rc<Self>
211 where
212 F: Fn() + 'static,
213 {
214 let weak = Rc::downgrade(&self);
215 self.inner.borrow_mut().timeout_cb = Some(Rc::new(Box::new(move || {
216 let this = weak.upgrade();
217 if this.is_none() {
218 return;
219 }
220 let this = this.unwrap();
221 timeout_cb();
222 if this.inner.borrow().retry_count == -1 {
223 this.call();
224 } else if this.inner.borrow().retry_count > 0 {
225 this.inner.borrow_mut().retry_count -= 1;
226 this.call();
227 } else {
228 this.on_finish(FinallyType::Timeout);
229 }
230 })));
231 self
232 }
233
234 pub fn add_to<'a>(self: &'a Rc<Self>, dispose: &mut Dispose) -> &'a Rc<Self> {
235 dispose.add(&self);
236 self
237 }
238
239 pub fn cancel<'a>(self: &'a Rc<Self>) -> &'a Rc<Self> {
240 self.canceled(true);
241 self.on_finish(FinallyType::Canceled);
242 self
243 }
244
245 pub fn reset_cancel<'a>(self: &'a Rc<Self>) -> &'a Rc<Self> {
246 self.canceled(false);
247 self
248 }
249
250 pub fn retry<'a>(self: &'a Rc<Self>, count: i32) -> &'a Rc<Self> {
251 self.inner.borrow_mut().retry_count = count;
252 self
253 }
254
255 pub fn disable_rsp<'a>(self: &'a Rc<Self>) -> &'a Rc<Self> {
256 self.inner.borrow_mut().need_rsp = false;
257 self
258 }
259
260 pub fn rpc<'a>(self: &'a Rc<Self>, rpc: Weak<dyn RpcProto>) -> &'a Rc<Self> {
261 self.inner.borrow_mut().rpc = Some(rpc);
262 self
263 }
264
265 pub fn get_rpc(&self) -> Option<Weak<dyn RpcProto>> {
266 self.inner.borrow().rpc.clone()
267 }
268
269 pub fn is_canceled(&self) -> bool {
270 self.inner.borrow().canceled
271 }
272
273 pub fn canceled<'a>(self: &'a Rc<Self>, canceled: bool) -> &'a Rc<Self> {
274 self.inner.borrow_mut().canceled = canceled;
275 self
276 }
277}
278
279#[derive(Debug)]
280pub struct FutureRet<R> {
281 pub type_: FinallyType,
282 pub result: Option<R>,
283}
284
285impl<R> FutureRet<R> {
286 pub fn unwrap(self) -> R {
287 if self.type_ != FinallyType::Normal {
288 panic!(
289 "called `FutureRet::unwrap()` on FinallyType::{:?}",
290 self.type_
291 );
292 }
293 match self.result {
294 Some(val) => val,
295 None => panic!("called `FutureRet::unwrap()` on a `None` value"),
296 }
297 }
298}
299
300impl Request {
301 pub async fn future<R>(self: &Rc<Self>) -> FutureRet<R>
302 where
303 R: for<'de> serde::Deserialize<'de> + 'static,
304 {
305 struct FutureResultInner<R> {
306 ready: bool,
307 result: Option<R>,
308 finally_type: FinallyType,
309 waker: Option<Waker>,
310 }
311 struct FutureResult<R> {
312 inner: Rc<RefCell<FutureResultInner<R>>>,
313 }
314 let result = FutureResult {
315 inner: Rc::new(RefCell::new(FutureResultInner {
316 ready: false,
317 result: None,
318 finally_type: FinallyType::Normal,
319 waker: None,
320 })),
321 };
322
323 let result_c1 = result.inner.clone();
324 let result_c2 = result.inner.clone();
325 self.rsp(move |msg: R| {
326 let mut result = result_c1.borrow_mut();
327 result.result = Some(msg);
328 })
329 .finally(move |finally| {
330 let mut result = result_c2.borrow_mut();
331 result.ready = true;
332 result.finally_type = finally;
333 if let Some(waker) = std::mem::replace(&mut result.waker, None) {
334 waker.wake();
335 }
336 })
337 .call();
338
339 impl<R> Future for FutureResult<R> {
340 type Output = FutureRet<R>;
341 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
342 let mut result = self.inner.borrow_mut();
343 if result.ready {
344 Poll::Ready(FutureRet {
345 type_: result.finally_type.clone(),
346 result: result.result.take(),
347 })
348 } else {
349 result.waker = Some(cx.waker().clone());
350 Poll::Pending
351 }
352 }
353 }
354
355 result.await
356 }
357}
358
359impl Request {
361 fn on_finish(&self, type_: FinallyType) {
362 let mut request = self.inner.borrow_mut();
363 if !request.waiting_rsp {
364 return;
365 }
366 request.waiting_rsp = false;
367 debug!("on_finish: cmd:{} type:{:?}", request.cmd, type_);
368 request.finally_type = type_;
369 if let Some(finally) = request.finally.take() {
370 finally(request.finally_type.clone());
371 }
372 request.self_keeper = None;
373 }
374}