razor_stream/client/
mod.rs1use crate::{Codec, error::RpcIntErr};
4use captains_log::filter::LogFilter;
5use crossfire::{AsyncRx, mpsc};
6use std::future::Future;
7use std::sync::Arc;
8use std::time::Duration;
9use std::{fmt, io};
10
11pub mod task;
12use task::{ClientTask, ClientTaskDone};
13
14pub mod stream;
15pub mod timer;
16use timer::ClientTaskTimer;
17
18mod pool;
19pub use pool::ClientPool;
20mod failover;
21pub use failover::FailoverPool;
22
23mod throttler;
24
/// Tunable settings for the client side of the RPC stream.
///
/// All fields are public, and the `Default` implementation provides a
/// baseline value for each of them. `Debug` is derived so a configuration can
/// be logged or embedded in other `Debug` types.
#[derive(Clone, Debug)]
pub struct ClientConfig {
    /// Timeout applied to a task, stored as a bare integer.
    // NOTE(review): the unit is not visible in this file (presumably seconds) — confirm
    // against the timer code that consumes it.
    pub task_timeout: usize,
    /// Time limit for read operations on a connection.
    pub read_timeout: Duration,
    /// Time limit for write operations on a connection.
    pub write_timeout: Duration,
    /// How long a connection may stay idle.
    // NOTE(review): what happens when this elapses is decided by code outside this file.
    pub idle_timeout: Duration,
    /// Time limit for establishing a connection.
    pub connect_timeout: Duration,
    /// A numeric threshold.
    // NOTE(review): what is being limited is not visible here (there is a private
    // `throttler` module; presumably related) — confirm.
    pub thresholds: usize,
    /// Requested stream buffer size.
    // NOTE(review): the default is 0; how 0 is interpreted is up to the transport — confirm.
    pub stream_buf_size: usize,
}
43
44impl Default for ClientConfig {
45 fn default() -> Self {
46 Self {
47 task_timeout: 20,
48 read_timeout: Duration::from_secs(5),
49 write_timeout: Duration::from_secs(5),
50 idle_timeout: Duration::from_secs(120),
51 connect_timeout: Duration::from_secs(10),
52 thresholds: 128,
53 stream_buf_size: 0,
54 }
55 }
56}
57
58pub trait ClientFacts: orb::AsyncRuntime + Send + Sync + Sized + 'static {
67 type Codec: Codec;
71
72 type Task: ClientTask;
79
80 fn get_config(&self) -> &ClientConfig;
82
83 fn new_logger(&self) -> Arc<LogFilter>;
85 #[inline(always)]
91 fn error_handle(&self, task: Self::Task) {
92 task.done();
93 }
94
95 #[inline(always)]
97 fn get_client_id(&self) -> u64 {
98 0
99 }
100}
101
102pub trait ClientCaller: Send {
105 type Facts: ClientFacts;
106 fn send_req(&self, task: <Self::Facts as ClientFacts>::Task)
107 -> impl Future<Output = ()> + Send;
108}
109
110pub trait ClientCallerBlocking: Send {
113 type Facts: ClientFacts;
114 fn send_req_blocking(&self, task: <Self::Facts as ClientFacts>::Task);
115}
116
117impl<C: ClientCaller + Send + Sync> ClientCaller for Arc<C> {
118 type Facts = C::Facts;
119 #[inline(always)]
120 async fn send_req(&self, task: <Self::Facts as ClientFacts>::Task) {
121 self.as_ref().send_req(task).await
122 }
123}
124
125impl<C: ClientCallerBlocking + Send + Sync> ClientCallerBlocking for Arc<C> {
126 type Facts = C::Facts;
127
128 #[inline(always)]
129 fn send_req_blocking(&self, task: <Self::Facts as ClientFacts>::Task) {
130 self.as_ref().send_req_blocking(task);
131 }
132}
133
134pub trait ClientTransport: fmt::Debug + Send + Sized + 'static {
146 fn connect(
150 addr: &str, conn_id: &str, config: &ClientConfig,
151 ) -> impl Future<Output = Result<Self, RpcIntErr>> + Send;
152
153 fn close_conn<F: ClientFacts>(&self, logger: &LogFilter) -> impl Future<Output = ()> + Send;
155
156 fn flush_req<F: ClientFacts>(
158 &self, logger: &LogFilter,
159 ) -> impl Future<Output = io::Result<()>> + Send;
160
161 fn write_req<'a, F: ClientFacts>(
163 &'a self, logger: &LogFilter, buf: &'a [u8], blob: Option<&'a [u8]>, need_flush: bool,
164 ) -> impl Future<Output = io::Result<()>> + Send;
165
166 fn read_resp<F: ClientFacts>(
168 &self, facts: &F, logger: &LogFilter, codec: &F::Codec,
169 close_ch: Option<&mut AsyncRx<mpsc::Null>>, task_reg: &mut ClientTaskTimer<F>,
170 ) -> impl std::future::Future<Output = Result<bool, RpcIntErr>> + Send;
171}
172
173pub struct ClientDefault<T: ClientTask, RT: orb::AsyncRuntime, C: Codec> {
175 pub logger: Arc<LogFilter>,
176 config: ClientConfig,
177 rt: RT,
178 _phan: std::marker::PhantomData<fn(&C, &T)>,
179}
180
181impl<T: ClientTask, RT: orb::AsyncRuntime, C: Codec> ClientDefault<T, RT, C> {
182 pub fn new(config: ClientConfig, rt: RT) -> Arc<Self> {
183 Arc::new(Self { logger: Arc::new(LogFilter::new()), config, rt, _phan: Default::default() })
184 }
185
186 #[inline]
187 pub fn set_log_level(&self, level: log::Level) {
188 self.logger.set_level(level);
189 }
190}
191
192impl<T: ClientTask, RT: orb::AsyncRuntime, C: Codec> std::ops::Deref for ClientDefault<T, RT, C> {
193 type Target = RT;
194 fn deref(&self) -> &Self::Target {
195 &self.rt
196 }
197}
198
199impl<T: ClientTask, RT: orb::AsyncRuntime, C: Codec> ClientFacts for ClientDefault<T, RT, C> {
200 type Codec = C;
201 type Task = T;
202
203 #[inline]
204 fn new_logger(&self) -> Arc<LogFilter> {
205 self.logger.clone()
206 }
207
208 #[inline]
209 fn get_config(&self) -> &ClientConfig {
210 &self.config
211 }
212}