rpc_core/
request.rs

1use std::cell::RefCell;
2use std::future::Future;
3use std::pin::Pin;
4use std::rc::{Rc, Weak};
5use std::task::{Context, Poll, Waker};
6
7use log::debug;
8
9use crate::detail::msg_dispatcher::{RspHandle, TimeoutCb};
10use crate::detail::msg_wrapper::MsgType;
11use crate::dispose::Dispose;
12use crate::rpc::RpcProto;
13use crate::type_def::{CmdType, SeqType};
14
15pub struct RequestImpl {
16    rpc: Option<Weak<dyn RpcProto>>,
17    self_weak: Weak<Request>,
18    self_keeper: Option<Rc<Request>>,
19    pub(crate) seq: SeqType,
20    pub(crate) cmd: CmdType,
21    pub(crate) payload: Option<Vec<u8>>,
22    pub(crate) need_rsp: bool,
23    canceled: bool,
24    pub(crate) rsp_handle: Option<Rc<RspHandle>>,
25    pub(crate) timeout_ms: u32,
26    pub(crate) timeout_cb: Option<Rc<TimeoutCb>>,
27    finally_type: FinallyType,
28    finally: Option<Box<dyn Fn(FinallyType)>>,
29    retry_count: i32,
30    waiting_rsp: bool,
31    pub(crate) is_ping: bool,
32}
33
34pub struct Request {
35    pub(crate) inner: RefCell<RequestImpl>,
36}
37
38pub trait DisposeProto {
39    fn add(&mut self, request: &Rc<Request>);
40    fn remove(&mut self, request: &Rc<Request>);
41}
42
43#[derive(Debug, Clone, PartialEq)]
44pub enum FinallyType {
45    Normal = 0,
46    NoNeedRsp = 1,
47    Timeout = 2,
48    Canceled = 3,
49    RpcExpired = 4,
50    RpcNotReady = 5,
51    RspSerializeError = 6,
52    NoSuchCmd = 7,
53}
54
55impl FinallyType {
56    pub fn to_str(&self) -> &'static str {
57        match self {
58            FinallyType::Normal => "normal",
59            FinallyType::NoNeedRsp => "no_need_rsp",
60            FinallyType::Timeout => "timeout",
61            FinallyType::Canceled => "canceled",
62            FinallyType::RpcExpired => "rpc_expired",
63            FinallyType::RpcNotReady => "rpc_not_ready",
64            FinallyType::RspSerializeError => "rsp_serialize_error",
65            FinallyType::NoSuchCmd => "no_such_cmd",
66        }
67    }
68}
69
70// public
71impl Request {
72    pub fn new() -> Rc<Self> {
73        let r = Rc::new(Self {
74            inner: RefCell::new(RequestImpl {
75                rpc: None,
76                self_weak: Default::default(),
77                self_keeper: None,
78                seq: 0,
79                cmd: "".to_string(),
80                payload: None,
81                need_rsp: false,
82                canceled: false,
83                rsp_handle: None,
84                timeout_ms: 3000,
85                timeout_cb: None,
86                finally_type: FinallyType::Normal,
87                finally: None,
88                retry_count: 0,
89                waiting_rsp: false,
90                is_ping: false,
91            }),
92        });
93        r.inner.borrow_mut().self_weak = Rc::downgrade(&r);
94        r.timeout(|| {});
95        r
96    }
97
98    pub fn create_with_rpc(rpc: Weak<dyn RpcProto>) -> Rc<Self> {
99        let r = Self::new();
100        r.inner.borrow_mut().rpc = Some(rpc);
101        r
102    }
103
104    pub fn cmd<'a>(self: &'a Rc<Self>, cmd: impl ToString) -> &'a Rc<Self> {
105        self.inner.borrow_mut().cmd = cmd.to_string();
106        self
107    }
108
109    pub fn msg<'a, T>(self: &'a Rc<Self>, msg: T) -> &'a Rc<Self>
110    where
111        T: serde::Serialize,
112    {
113        self.inner.borrow_mut().payload = serde_json::to_string(&msg).unwrap().into_bytes().into();
114        self
115    }
116
117    pub fn rsp<'a, F, P>(self: &'a Rc<Self>, cb: F) -> &'a Rc<Self>
118    where
119        P: for<'de> serde::Deserialize<'de>,
120        F: Fn(P) + 'static,
121    {
122        {
123            let weak = Rc::downgrade(&self);
124            let mut request = self.inner.borrow_mut();
125            request.need_rsp = true;
126            request.rsp_handle = Some(Rc::new(move |msg| -> bool {
127                let this = weak.upgrade();
128                if this.is_none() {
129                    return false;
130                }
131
132                let this = this.unwrap();
133                if this.is_canceled() {
134                    this.on_finish(FinallyType::Canceled);
135                    return true;
136                }
137
138                if msg.type_.contains(MsgType::NoSuchCmd) {
139                    this.on_finish(FinallyType::NoSuchCmd);
140                    return true;
141                }
142
143                if let Ok(value) = msg.unpack_as::<P>() {
144                    cb(value);
145                    this.on_finish(FinallyType::Normal);
146                    true
147                } else {
148                    this.on_finish(FinallyType::RspSerializeError);
149                    false
150                }
151            }));
152        }
153        self
154    }
155
156    pub fn finally<'a, F>(self: &'a Rc<Self>, finally: F) -> &'a Rc<Self>
157    where
158        F: Fn(FinallyType) + 'static,
159    {
160        self.inner.borrow_mut().finally = Some(Box::new(finally));
161        self
162    }
163
164    pub fn call(self: &Rc<Self>) {
165        self.inner.borrow_mut().waiting_rsp = true;
166
167        if self.inner.borrow().canceled {
168            self.on_finish(FinallyType::Canceled);
169            return;
170        }
171
172        self.inner.borrow_mut().self_keeper = Some(self.clone());
173
174        if self.inner.borrow().rpc.is_none()
175            || self.inner.borrow().rpc.as_ref().unwrap().strong_count() == 0
176        {
177            self.on_finish(FinallyType::RpcExpired);
178            return;
179        }
180
181        let r = self.inner.borrow().rpc.as_ref().unwrap().upgrade().unwrap();
182        if !r.is_ready() {
183            self.on_finish(FinallyType::RpcNotReady);
184            return;
185        }
186
187        self.inner.borrow_mut().seq = r.make_seq();
188        r.send_request(self.as_ref());
189
190        if !self.inner.borrow().need_rsp {
191            self.on_finish(FinallyType::NoNeedRsp)
192        }
193    }
194
195    pub fn call_with_rpc(self: &Rc<Self>, rpc: Rc<dyn RpcProto>) {
196        self.inner.borrow_mut().rpc = Some(Rc::downgrade(&rpc));
197        self.call();
198    }
199
200    pub fn ping<'a>(self: &'a Rc<Self>) -> &'a Rc<Self> {
201        self.inner.borrow_mut().is_ping = true;
202        self
203    }
204
205    pub fn timeout_ms<'a>(self: &'a Rc<Self>, timeout_ms: u32) -> &'a Rc<Self> {
206        self.inner.borrow_mut().timeout_ms = timeout_ms;
207        self
208    }
209
210    pub fn timeout<'a, F>(self: &'a Rc<Self>, timeout_cb: F) -> &'a Rc<Self>
211    where
212        F: Fn() + 'static,
213    {
214        let weak = Rc::downgrade(&self);
215        self.inner.borrow_mut().timeout_cb = Some(Rc::new(Box::new(move || {
216            let this = weak.upgrade();
217            if this.is_none() {
218                return;
219            }
220            let this = this.unwrap();
221            timeout_cb();
222            if this.inner.borrow().retry_count == -1 {
223                this.call();
224            } else if this.inner.borrow().retry_count > 0 {
225                this.inner.borrow_mut().retry_count -= 1;
226                this.call();
227            } else {
228                this.on_finish(FinallyType::Timeout);
229            }
230        })));
231        self
232    }
233
234    pub fn add_to<'a>(self: &'a Rc<Self>, dispose: &mut Dispose) -> &'a Rc<Self> {
235        dispose.add(&self);
236        self
237    }
238
239    pub fn cancel<'a>(self: &'a Rc<Self>) -> &'a Rc<Self> {
240        self.canceled(true);
241        self.on_finish(FinallyType::Canceled);
242        self
243    }
244
245    pub fn reset_cancel<'a>(self: &'a Rc<Self>) -> &'a Rc<Self> {
246        self.canceled(false);
247        self
248    }
249
250    pub fn retry<'a>(self: &'a Rc<Self>, count: i32) -> &'a Rc<Self> {
251        self.inner.borrow_mut().retry_count = count;
252        self
253    }
254
255    pub fn disable_rsp<'a>(self: &'a Rc<Self>) -> &'a Rc<Self> {
256        self.inner.borrow_mut().need_rsp = false;
257        self
258    }
259
260    pub fn rpc<'a>(self: &'a Rc<Self>, rpc: Weak<dyn RpcProto>) -> &'a Rc<Self> {
261        self.inner.borrow_mut().rpc = Some(rpc);
262        self
263    }
264
265    pub fn get_rpc(&self) -> Option<Weak<dyn RpcProto>> {
266        self.inner.borrow().rpc.clone()
267    }
268
269    pub fn is_canceled(&self) -> bool {
270        self.inner.borrow().canceled
271    }
272
273    pub fn canceled<'a>(self: &'a Rc<Self>, canceled: bool) -> &'a Rc<Self> {
274        self.inner.borrow_mut().canceled = canceled;
275        self
276    }
277}
278
279#[derive(Debug)]
280pub struct FutureRet<R> {
281    pub type_: FinallyType,
282    pub result: Option<R>,
283}
284
285impl<R> FutureRet<R> {
286    pub fn unwrap(self) -> R {
287        if self.type_ != FinallyType::Normal {
288            panic!(
289                "called `FutureRet::unwrap()` on FinallyType::{:?}",
290                self.type_
291            );
292        }
293        match self.result {
294            Some(val) => val,
295            None => panic!("called `FutureRet::unwrap()` on a `None` value"),
296        }
297    }
298}
299
300impl Request {
301    pub async fn future<R>(self: &Rc<Self>) -> FutureRet<R>
302    where
303        R: for<'de> serde::Deserialize<'de> + 'static,
304    {
305        struct FutureResultInner<R> {
306            ready: bool,
307            result: Option<R>,
308            finally_type: FinallyType,
309            waker: Option<Waker>,
310        }
311        struct FutureResult<R> {
312            inner: Rc<RefCell<FutureResultInner<R>>>,
313        }
314        let result = FutureResult {
315            inner: Rc::new(RefCell::new(FutureResultInner {
316                ready: false,
317                result: None,
318                finally_type: FinallyType::Normal,
319                waker: None,
320            })),
321        };
322
323        let result_c1 = result.inner.clone();
324        let result_c2 = result.inner.clone();
325        self.rsp(move |msg: R| {
326            let mut result = result_c1.borrow_mut();
327            result.result = Some(msg);
328        })
329        .finally(move |finally| {
330            let mut result = result_c2.borrow_mut();
331            result.ready = true;
332            result.finally_type = finally;
333            if let Some(waker) = std::mem::replace(&mut result.waker, None) {
334                waker.wake();
335            }
336        })
337        .call();
338
339        impl<R> Future for FutureResult<R> {
340            type Output = FutureRet<R>;
341            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
342                let mut result = self.inner.borrow_mut();
343                if result.ready {
344                    Poll::Ready(FutureRet {
345                        type_: result.finally_type.clone(),
346                        result: result.result.take(),
347                    })
348                } else {
349                    result.waker = Some(cx.waker().clone());
350                    Poll::Pending
351                }
352            }
353        }
354
355        result.await
356    }
357}
358
359// private
360impl Request {
361    fn on_finish(&self, type_: FinallyType) {
362        let mut request = self.inner.borrow_mut();
363        if !request.waiting_rsp {
364            return;
365        }
366        request.waiting_rsp = false;
367        debug!("on_finish: cmd:{} type:{:?}", request.cmd, type_);
368        request.finally_type = type_;
369        if let Some(finally) = request.finally.take() {
370            finally(request.finally_type.clone());
371        }
372        request.self_keeper = None;
373    }
374}