razor_stream/client/
mod.rs1use crate::{Codec, error::RpcIntErr};
4use captains_log::filter::LogFilter;
5use crossfire::{AsyncRx, mpsc};
6use orb::AsyncRuntime;
7use std::future::Future;
8use std::sync::Arc;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10use std::{fmt, io};
11
12pub mod task;
13use task::{ClientTask, ClientTaskDone};
14
15pub mod stream;
16pub mod timer;
17use timer::ClientTaskTimer;
18
19mod pool;
20pub use pool::ConnPool;
21mod failover;
22pub use failover::FailoverPool;
23
24mod throttler;
25
26#[derive(Clone)]
28pub struct ClientConfig {
29 pub task_timeout: usize,
31 pub read_timeout: Duration,
33 pub write_timeout: Duration,
35 pub idle_timeout: Duration,
37 pub connect_timeout: Duration,
39 pub thresholds: usize,
41 pub stream_buf_size: usize,
43}
44
45impl Default for ClientConfig {
46 fn default() -> Self {
47 Self {
48 task_timeout: 20,
49 read_timeout: Duration::from_secs(5),
50 write_timeout: Duration::from_secs(5),
51 idle_timeout: Duration::from_secs(120),
52 connect_timeout: Duration::from_secs(10),
53 thresholds: 128,
54 stream_buf_size: 0,
55 }
56 }
57}
58
59pub trait ClientFacts: Send + Sync + Sized + 'static {
61 type Codec: Codec;
65
66 type Task: ClientTask;
73
74 fn get_config(&self) -> &ClientConfig;
76
77 fn new_logger(&self) -> Arc<LogFilter>;
81
82 #[inline(always)]
86 fn error_handle(&self, task: Self::Task) {
87 task.done();
88 }
89
90 #[inline(always)]
92 fn get_client_id(&self) -> u64 {
93 0
94 }
95
96 #[inline]
98 fn get_timestamp(&self) -> u64 {
99 SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
100 }
101}
102
103pub trait ClientCaller: Send {
106 type Facts: ClientFacts;
107 fn send_req(&self, task: <Self::Facts as ClientFacts>::Task)
108 -> impl Future<Output = ()> + Send;
109
110 fn get_codec(&self) -> <Self::Facts as ClientFacts>::Codec {
111 <Self::Facts as ClientFacts>::Codec::default()
112 }
113}
114
115pub trait ClientCallerBlocking: Send {
118 type Facts: ClientFacts;
119 fn send_req_blocking(&self, task: <Self::Facts as ClientFacts>::Task);
120
121 fn get_codec(&self) -> <Self::Facts as ClientFacts>::Codec {
122 <Self::Facts as ClientFacts>::Codec::default()
123 }
124}
125
126impl<C: ClientCaller + Send + Sync> ClientCaller for Arc<C> {
127 type Facts = C::Facts;
128
129 #[inline(always)]
130 async fn send_req(&self, task: <Self::Facts as ClientFacts>::Task) {
131 self.as_ref().send_req(task).await
132 }
133}
134
135impl<C: ClientCallerBlocking + Send + Sync> ClientCallerBlocking for Arc<C> {
136 type Facts = C::Facts;
137
138 #[inline(always)]
139 fn send_req_blocking(&self, task: <Self::Facts as ClientFacts>::Task) {
140 self.as_ref().send_req_blocking(task);
141 }
142}
143
144pub trait ClientTransport: fmt::Debug + Send + Sized + 'static {
150 type RT: AsyncRuntime + Clone;
151
152 fn connect(
156 addr: &str, conn_id: &str, config: &ClientConfig,
157 ) -> impl Future<Output = Result<Self, RpcIntErr>> + Send;
158
159 fn close_conn<F: ClientFacts>(&self, logger: &LogFilter) -> impl Future<Output = ()> + Send;
161
162 fn flush_req<F: ClientFacts>(
164 &self, logger: &LogFilter,
165 ) -> impl Future<Output = io::Result<()>> + Send;
166
167 fn write_req<'a, F: ClientFacts>(
169 &'a self, logger: &LogFilter, buf: &'a [u8], blob: Option<&'a [u8]>, need_flush: bool,
170 ) -> impl Future<Output = io::Result<()>> + Send;
171
172 fn read_resp<F: ClientFacts>(
174 &self, facts: &F, logger: &LogFilter, codec: &F::Codec,
175 close_ch: Option<&mut AsyncRx<mpsc::Null>>, task_reg: &mut ClientTaskTimer<F>,
176 ) -> impl std::future::Future<Output = Result<bool, RpcIntErr>> + Send;
177}
178
179pub struct ClientDefault<T: ClientTask, C: Codec> {
181 pub logger: Arc<LogFilter>,
182 config: ClientConfig,
183 _phan: std::marker::PhantomData<fn(&C, &T)>,
184}
185
186impl<T: ClientTask, C: Codec> ClientDefault<T, C> {
187 pub fn new(config: ClientConfig) -> Arc<Self> {
188 Arc::new(Self { logger: Arc::new(LogFilter::new()), config, _phan: Default::default() })
189 }
190
191 #[inline]
192 pub fn set_log_level(&self, level: log::Level) {
193 self.logger.set_level(level);
194 }
195}
196
197impl<T: ClientTask, C: Codec> ClientFacts for ClientDefault<T, C> {
198 type Codec = C;
199 type Task = T;
200
201 #[inline]
202 fn new_logger(&self) -> Arc<LogFilter> {
203 self.logger.clone()
204 }
205
206 #[inline]
207 fn get_config(&self) -> &ClientConfig {
208 &self.config
209 }
210}