1pub mod task;
2use captains_log::filter::LogFilter;
3use crossfire::oneshot::oneshot;
4pub use razor_rpc_macros::{endpoint_async, endpoint_client};
5pub use razor_stream::client::ClientCaller;
6pub use task::*;
7
8use crate::Codec;
9use crate::error::{RpcErrCodec, RpcError, RpcIntErr};
10pub use razor_stream::client::{
11 ClientCallerBlocking, ClientConfig, ClientFacts, ClientTransport, ConnPool, FailoverPool,
12};
13use std::fmt;
14use std::future::Future;
15use std::sync::Arc;
16
17pub struct APIFact<C: Codec> {
18 pub logger: Arc<LogFilter>,
19 config: ClientConfig,
20 _phan: std::marker::PhantomData<fn(&C)>,
21}
22
23pub type APIConnPool<C, P> = ConnPool<APIFact<C>, P>;
24pub type APIFailoverPool<C, P> = FailoverPool<APIFact<C>, P>;
25
26impl<C: Codec> APIFact<C> {
27 pub fn new(config: ClientConfig) -> Arc<Self> {
28 Arc::new(Self { logger: Arc::new(LogFilter::new()), config, _phan: Default::default() })
29 }
30
31 #[inline]
32 pub fn set_log_level(&self, level: log::Level) {
33 self.logger.set_level(level);
34 }
35
36 pub fn new_conn_pool<P: ClientTransport>(
37 self: Arc<Self>, rt: &P::RT, addr: &str,
38 ) -> APIConnPool<C, P> {
39 ConnPool::<APIFact<C>, P>::new(self.clone(), rt, addr, 0)
40 }
41
42 pub fn new_failover<P: ClientTransport>(
43 self: Arc<Self>, rt: &P::RT, addrs: Vec<String>, stateless: bool, retry_limit: usize,
44 ) -> APIFailoverPool<C, P> {
45 FailoverPool::<APIFact<C>, P>::new(self.clone(), rt, addrs, stateless, retry_limit, 0)
46 }
47}
48
49impl<C: Codec> ClientFacts for APIFact<C> {
50 type Codec = C;
51 type Task = APIClientReq;
52
53 #[inline]
54 fn new_logger(&self) -> Arc<LogFilter> {
55 self.logger.clone()
56 }
57
58 #[inline]
59 fn get_config(&self) -> &ClientConfig {
60 &self.config
61 }
62}
63
64pub trait APIClientCaller: ClientCaller<Facts: ClientFacts<Task = APIClientReq>> {
65 fn call<Req, Resp, E>(
66 &self, service_method: &'static str, req: &Req,
67 ) -> impl Future<Output = Result<Resp, RpcError<E>>> + Send
68 where
69 Req: serde::Serialize + fmt::Debug + Send + Sync,
70 Resp: for<'a> serde::Deserialize<'a> + Send + fmt::Debug + 'static + Default,
71 E: RpcErrCodec;
72}
73
74impl<F, P> APIClientCaller for ConnPool<F, P>
75where
76 F: ClientFacts<Task = APIClientReq>,
77 P: ClientTransport,
78{
79 async fn call<Req, Resp, E>(
80 &self, service_method: &'static str, req: &Req,
81 ) -> Result<Resp, RpcError<E>>
82 where
83 Req: serde::Serialize + fmt::Debug + Send + Sync,
84 Resp: for<'a> serde::Deserialize<'a> + Send + fmt::Debug + 'static + Default,
85 E: RpcErrCodec,
86 {
87 let (tx, rx) = oneshot::<APIClientReq>();
88 let codec = <Self as ClientCaller>::get_codec(self);
89 let mut task = APIClientReq::new(&codec, service_method, req);
90 task.set_noti(tx);
91 self.send_req(task).await;
92 if let Ok(mut task) = rx.recv_async().await {
93 return task.process_res(&codec);
94 } else {
95 return Err(RpcIntErr::Internal.into());
96 }
97 }
98}
99
100impl<F, P> APIClientCaller for FailoverPool<F, P>
101where
102 F: ClientFacts<Task = APIClientReq>,
103 P: ClientTransport,
104{
105 async fn call<Req, Resp, E>(
106 &self, service_method: &'static str, req: &Req,
107 ) -> Result<Resp, RpcError<E>>
108 where
109 Req: serde::Serialize + fmt::Debug + Send + Sync,
110 Resp: for<'a> serde::Deserialize<'a> + Send + fmt::Debug + 'static + Default,
111 E: RpcErrCodec,
112 {
113 let codec = <Self as ClientCaller>::get_codec(self);
114 let mut retry_count = 0;
115 let max_retries = self.get_retry_limit();
116 let mut task = APIClientReq::new(&codec, service_method, req);
117 let (tx, mut rx) = oneshot::<APIClientReq>();
118 task.set_noti(tx);
119 self.send_req(task).await;
120 loop {
121 if let Ok(mut task) = rx.recv_async().await {
122 let result = task.process_res::<_, Resp, E>(&codec);
123 match result {
124 Ok(resp) => return Ok(resp),
125 Err(RpcError::Rpc(e)) => {
126 return Err(RpcError::Rpc(e));
128 }
129 Err(RpcError::User(e)) => {
130 retry_count += 1;
131 match e.should_failover() {
132 Ok(Some(redirect_addr)) => {
133 if retry_count < max_retries {
134 let (tx, _rx) = oneshot::<APIClientReq>();
136 task.set_noti(tx);
137 rx = _rx;
138 self.resubmit(
139 task,
140 Ok(redirect_addr.to_string()),
141 retry_count,
142 None,
143 )
144 .await;
145 continue;
146 }
147 }
148 Ok(None) => {
149 if retry_count < max_retries {
150 let (tx, _rx) = oneshot::<APIClientReq>();
152 task.set_noti(tx);
153 rx = _rx;
154 let last_index = task.last_index;
155 self.resubmit(task, Err(last_index), retry_count, None).await;
156 continue;
157 }
158 }
159 Err(()) => return Err(RpcError::User(e)),
160 }
161 return Err(RpcError::User(e));
162 }
163 }
164 } else {
165 return Err(RpcIntErr::Internal.into());
166 }
167 }
168 }
169}