Skip to main content

razor_rpc/client/
mod.rs

1pub mod task;
2use captains_log::filter::LogFilter;
3use crossfire::oneshot::oneshot;
4pub use razor_rpc_macros::{endpoint_async, endpoint_client};
5pub use razor_stream::client::ClientCaller;
6pub use task::*;
7
8use crate::Codec;
9use crate::error::{RpcErrCodec, RpcError, RpcIntErr};
10pub use razor_stream::client::{
11    ClientCallerBlocking, ClientConfig, ClientFacts, ClientTransport, ConnPool, FailoverPool,
12};
13use std::fmt;
14use std::future::Future;
15use std::sync::Arc;
16
17pub struct APIFact<C: Codec> {
18    pub logger: Arc<LogFilter>,
19    config: ClientConfig,
20    _phan: std::marker::PhantomData<fn(&C)>,
21}
22
23pub type APIConnPool<C, P> = ConnPool<APIFact<C>, P>;
24pub type APIFailoverPool<C, P> = FailoverPool<APIFact<C>, P>;
25
26impl<C: Codec> APIFact<C> {
27    pub fn new(config: ClientConfig) -> Arc<Self> {
28        Arc::new(Self { logger: Arc::new(LogFilter::new()), config, _phan: Default::default() })
29    }
30
31    #[inline]
32    pub fn set_log_level(&self, level: log::Level) {
33        self.logger.set_level(level);
34    }
35
36    pub fn new_conn_pool<P: ClientTransport>(
37        self: Arc<Self>, rt: &P::RT, addr: &str,
38    ) -> APIConnPool<C, P> {
39        ConnPool::<APIFact<C>, P>::new(self.clone(), rt, addr, 0)
40    }
41
42    pub fn new_failover<P: ClientTransport>(
43        self: Arc<Self>, rt: &P::RT, addrs: Vec<String>, stateless: bool, retry_limit: usize,
44    ) -> APIFailoverPool<C, P> {
45        FailoverPool::<APIFact<C>, P>::new(self.clone(), rt, addrs, stateless, retry_limit, 0)
46    }
47}
48
49impl<C: Codec> ClientFacts for APIFact<C> {
50    type Codec = C;
51    type Task = APIClientReq;
52
53    #[inline]
54    fn new_logger(&self) -> Arc<LogFilter> {
55        self.logger.clone()
56    }
57
58    #[inline]
59    fn get_config(&self) -> &ClientConfig {
60        &self.config
61    }
62}
63
64pub trait APIClientCaller: ClientCaller<Facts: ClientFacts<Task = APIClientReq>> {
65    fn call<Req, Resp, E>(
66        &self, service_method: &'static str, req: &Req,
67    ) -> impl Future<Output = Result<Resp, RpcError<E>>> + Send
68    where
69        Req: serde::Serialize + fmt::Debug + Send + Sync,
70        Resp: for<'a> serde::Deserialize<'a> + Send + fmt::Debug + 'static + Default,
71        E: RpcErrCodec;
72}
73
74impl<F, P> APIClientCaller for ConnPool<F, P>
75where
76    F: ClientFacts<Task = APIClientReq>,
77    P: ClientTransport,
78{
79    async fn call<Req, Resp, E>(
80        &self, service_method: &'static str, req: &Req,
81    ) -> Result<Resp, RpcError<E>>
82    where
83        Req: serde::Serialize + fmt::Debug + Send + Sync,
84        Resp: for<'a> serde::Deserialize<'a> + Send + fmt::Debug + 'static + Default,
85        E: RpcErrCodec,
86    {
87        let (tx, rx) = oneshot::<APIClientReq>();
88        let codec = <Self as ClientCaller>::get_codec(self);
89        let mut task = APIClientReq::new(&codec, service_method, req);
90        task.set_noti(tx);
91        self.send_req(task).await;
92        if let Ok(mut task) = rx.recv_async().await {
93            return task.process_res(&codec);
94        } else {
95            return Err(RpcIntErr::Internal.into());
96        }
97    }
98}
99
100impl<F, P> APIClientCaller for FailoverPool<F, P>
101where
102    F: ClientFacts<Task = APIClientReq>,
103    P: ClientTransport,
104{
105    async fn call<Req, Resp, E>(
106        &self, service_method: &'static str, req: &Req,
107    ) -> Result<Resp, RpcError<E>>
108    where
109        Req: serde::Serialize + fmt::Debug + Send + Sync,
110        Resp: for<'a> serde::Deserialize<'a> + Send + fmt::Debug + 'static + Default,
111        E: RpcErrCodec,
112    {
113        let codec = <Self as ClientCaller>::get_codec(self);
114        let mut retry_count = 0;
115        let max_retries = self.get_retry_limit();
116        let mut task = APIClientReq::new(&codec, service_method, req);
117        let (tx, mut rx) = oneshot::<APIClientReq>();
118        task.set_noti(tx);
119        self.send_req(task).await;
120        loop {
121            if let Ok(mut task) = rx.recv_async().await {
122                let result = task.process_res::<_, Resp, E>(&codec);
123                match result {
124                    Ok(resp) => return Ok(resp),
125                    Err(RpcError::Rpc(e)) => {
126                        // RpcIntErr less than Method is retriable by FailoverPool internally
127                        return Err(RpcError::Rpc(e));
128                    }
129                    Err(RpcError::User(e)) => {
130                        retry_count += 1;
131                        match e.should_failover() {
132                            Ok(Some(redirect_addr)) => {
133                                if retry_count < max_retries {
134                                    // Retry to specific address
135                                    let (tx, _rx) = oneshot::<APIClientReq>();
136                                    task.set_noti(tx);
137                                    rx = _rx;
138                                    self.resubmit(
139                                        task,
140                                        Ok(redirect_addr.to_string()),
141                                        retry_count,
142                                        None,
143                                    )
144                                    .await;
145                                    continue;
146                                }
147                            }
148                            Ok(None) => {
149                                if retry_count < max_retries {
150                                    // Retry to next node
151                                    let (tx, _rx) = oneshot::<APIClientReq>();
152                                    task.set_noti(tx);
153                                    rx = _rx;
154                                    let last_index = task.last_index;
155                                    self.resubmit(task, Err(last_index), retry_count, None).await;
156                                    continue;
157                                }
158                            }
159                            Err(()) => return Err(RpcError::User(e)),
160                        }
161                        return Err(RpcError::User(e));
162                    }
163                }
164            } else {
165                return Err(RpcIntErr::Internal.into());
166            }
167        }
168    }
169}