Skip to main content

razor_stream/client/
mod.rs

1//! The module contains traits defined for the client-side
2
3use crate::{Codec, error::RpcIntErr};
4use captains_log::filter::LogFilter;
5use crossfire::{AsyncRx, mpsc};
6use std::future::Future;
7use std::sync::Arc;
8use std::time::{Duration, SystemTime, UNIX_EPOCH};
9use std::{fmt, io};
10
11pub mod task;
12use task::{ClientTask, ClientTaskDone};
13
14pub mod stream;
15pub mod timer;
16use timer::ClientTaskTimer;
17
18mod pool;
19pub use pool::ClientPool;
20mod failover;
21pub use failover::FailoverPool;
22
23mod throttler;
24
/// General configuration for the client side.
///
/// `task_timeout` is a plain number of seconds, while the socket-level
/// timeouts are expressed as [`Duration`] values.
#[derive(Clone, Debug)]
pub struct ClientConfig {
    /// Timeout of an RpcTask waiting for its response, in seconds.
    pub task_timeout: usize,
    /// Socket read timeout.
    pub read_timeout: Duration,
    /// Socket write timeout.
    pub write_timeout: Duration,
    /// Socket idle time before the connection is closed; used by the connection pool.
    pub idle_timeout: Duration,
    /// Connect timeout.
    pub connect_timeout: Duration,
    /// How many async RpcTask may sit in the queue, to avoid overflowing the server capacity.
    pub thresholds: usize,
    /// In bytes. When non-zero, overrides the transport's default `DEFAULT_BUF_SIZE`.
    pub stream_buf_size: usize,
}
43
44impl Default for ClientConfig {
45    fn default() -> Self {
46        Self {
47            task_timeout: 20,
48            read_timeout: Duration::from_secs(5),
49            write_timeout: Duration::from_secs(5),
50            idle_timeout: Duration::from_secs(120),
51            connect_timeout: Duration::from_secs(10),
52            thresholds: 128,
53            stream_buf_size: 0,
54        }
55    }
56}
57
58/// A trait implemented by the user for the client-side, to define the customizable plugin.
59///
60/// # NOTE
61///
62/// If you choose implement this trait rather than use [ClientDefault].
63/// We recommend your implementation to Deref<Target=orb::AsyncRuntime>
64/// then the blanket trait in `orb::AsyncRuntime` will automatically impl AsyncRuntime on your ClientFacts type.
65/// Refer to the code of [ClientDefault] for example.
66pub trait ClientFacts: orb::AsyncRuntime + Send + Sync + Sized + 'static {
67    /// Define the codec to serialization and deserialization
68    ///
69    /// Refers to [Codec]
70    type Codec: Codec;
71
72    /// Define the RPC task from client-side
73    ///
74    /// Either one ClientTask or an enum of multiple ClientTask.
75    /// If you have multiple task type, recommend to use the `enum_dispatch` crate.
76    ///
77    /// You can use macro [client_task_enum](crate::client::task::client_task_enum) and [client_task](crate::client::task::client_task) on task type
78    type Task: ClientTask;
79
80    /// You should keep ClientConfig inside, get_config() will return the reference.
81    fn get_config(&self) -> &ClientConfig;
82
83    /// Construct a [captains_log::filter::Filter](https://docs.rs/captains-log/latest/captains_log/filter/trait.Filter.html) to oganize log of a client
84    ///
85    /// TODO: Fix the logger interface
86    fn new_logger(&self) -> Arc<LogFilter>;
87
88    /// How to deal with error
89    ///
90    /// The FailoverPool will overwrite this to implement retry logic
91    #[inline(always)]
92    fn error_handle(&self, task: Self::Task) {
93        task.done();
94    }
95
96    /// You can overwrite this to assign a client_id
97    #[inline(always)]
98    fn get_client_id(&self) -> u64 {
99        0
100    }
101
102    /// NOTE: you may overwrite this to use coarstime or quanta
103    #[inline]
104    fn get_timestamp(&self) -> u64 {
105        SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
106    }
107}
108
109/// A trait to support sending request task in async text, for all router and connection pool
110/// implementations
111pub trait ClientCaller: Send {
112    type Facts: ClientFacts;
113    fn send_req(&self, task: <Self::Facts as ClientFacts>::Task)
114    -> impl Future<Output = ()> + Send;
115}
116
117/// A trait to support sending request task in blocking text, for all router and connection pool
118/// implementations
119pub trait ClientCallerBlocking: Send {
120    type Facts: ClientFacts;
121    fn send_req_blocking(&self, task: <Self::Facts as ClientFacts>::Task);
122}
123
124impl<C: ClientCaller + Send + Sync> ClientCaller for Arc<C> {
125    type Facts = C::Facts;
126    #[inline(always)]
127    async fn send_req(&self, task: <Self::Facts as ClientFacts>::Task) {
128        self.as_ref().send_req(task).await
129    }
130}
131
132impl<C: ClientCallerBlocking + Send + Sync> ClientCallerBlocking for Arc<C> {
133    type Facts = C::Facts;
134
135    #[inline(always)]
136    fn send_req_blocking(&self, task: <Self::Facts as ClientFacts>::Task) {
137        self.as_ref().send_req_blocking(task);
138    }
139}
140
141/// This trait is for client-side transport layer protocol.
142///
143/// The implementation can be found on:
144///
145/// - [razor-rpc-tcp](https://docs.rs/razor-rpc-tcp): For TCP and Unix socket
146///
147/// # NOTE:
148///
149/// Instead of binding this to ClientFacts,
150/// we use the associate type `RT` in generic param instead of ClientFacts to break cycle dep.
151/// because [FailoverPool] will rewrap the facts into its own.
152pub trait ClientTransport: fmt::Debug + Send + Sized + 'static {
153    /// How to establish an async connection.
154    ///
155    /// conn_id: used for log fmt, can by the same of addr.
156    fn connect(
157        addr: &str, conn_id: &str, config: &ClientConfig,
158    ) -> impl Future<Output = Result<Self, RpcIntErr>> + Send;
159
160    /// Shutdown the write direction of the connection
161    fn close_conn<F: ClientFacts>(&self, logger: &LogFilter) -> impl Future<Output = ()> + Send;
162
163    /// Flush the request for the socket writer, if the transport has buffering logic
164    fn flush_req<F: ClientFacts>(
165        &self, logger: &LogFilter,
166    ) -> impl Future<Output = io::Result<()>> + Send;
167
168    /// Write out the encoded request task
169    fn write_req<'a, F: ClientFacts>(
170        &'a self, logger: &LogFilter, buf: &'a [u8], blob: Option<&'a [u8]>, need_flush: bool,
171    ) -> impl Future<Output = io::Result<()>> + Send;
172
173    /// Read the response and decode it from the socket, find and notify the registered ClientTask
174    fn read_resp<F: ClientFacts>(
175        &self, facts: &F, logger: &LogFilter, codec: &F::Codec,
176        close_ch: Option<&mut AsyncRx<mpsc::Null>>, task_reg: &mut ClientTaskTimer<F>,
177    ) -> impl std::future::Future<Output = Result<bool, RpcIntErr>> + Send;
178}
179
180/// An example ClientFacts for general use
181pub struct ClientDefault<T: ClientTask, RT: orb::AsyncRuntime, C: Codec> {
182    pub logger: Arc<LogFilter>,
183    config: ClientConfig,
184    rt: RT,
185    _phan: std::marker::PhantomData<fn(&C, &T)>,
186}
187
188impl<T: ClientTask, RT: orb::AsyncRuntime, C: Codec> ClientDefault<T, RT, C> {
189    pub fn new(config: ClientConfig, rt: RT) -> Arc<Self> {
190        Arc::new(Self { logger: Arc::new(LogFilter::new()), config, rt, _phan: Default::default() })
191    }
192
193    #[inline]
194    pub fn set_log_level(&self, level: log::Level) {
195        self.logger.set_level(level);
196    }
197}
198
199impl<T: ClientTask, RT: orb::AsyncRuntime, C: Codec> std::ops::Deref for ClientDefault<T, RT, C> {
200    type Target = RT;
201    fn deref(&self) -> &Self::Target {
202        &self.rt
203    }
204}
205
206impl<T: ClientTask, RT: orb::AsyncRuntime, C: Codec> ClientFacts for ClientDefault<T, RT, C> {
207    type Codec = C;
208    type Task = T;
209
210    #[inline]
211    fn new_logger(&self) -> Arc<LogFilter> {
212        self.logger.clone()
213    }
214
215    #[inline]
216    fn get_config(&self) -> &ClientConfig {
217        &self.config
218    }
219}