Skip to main content

razor_stream/client/
mod.rs

1//! The module contains traits defined for the client-side
2
3use crate::{Codec, error::RpcIntErr};
4use captains_log::filter::LogFilter;
5use crossfire::{AsyncRx, mpsc};
6use orb::AsyncRuntime;
7use std::future::Future;
8use std::sync::Arc;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10use std::{fmt, io};
11
12pub mod task;
13use task::{ClientTask, ClientTaskDone};
14
15pub mod stream;
16pub mod timer;
17use timer::ClientTaskTimer;
18
19mod pool;
20pub use pool::ConnPool;
21mod failover;
22pub use failover::FailoverPool;
23
24mod throttler;
25
/// General config for the client-side.
///
/// Every field is public, so a value can be built field by field or taken from
/// `Default::default()` and adjusted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientConfig {
    /// Timeout of an RpcTask waiting for its response, in seconds.
    pub task_timeout: usize,
    /// Socket read timeout.
    pub read_timeout: Duration,
    /// Socket write timeout.
    pub write_timeout: Duration,
    /// How long a socket may stay idle before it is closed. For the connection pool.
    pub idle_timeout: Duration,
    /// Connect timeout.
    pub connect_timeout: Duration,
    /// How many async RpcTask may wait in the queue, to prevent overflowing the server capacity.
    pub thresholds: usize,
    /// In bytes. When non-zero, overrides the default DEFAULT_BUF_SIZE of the transport.
    pub stream_buf_size: usize,
}
44
45impl Default for ClientConfig {
46    fn default() -> Self {
47        Self {
48            task_timeout: 20,
49            read_timeout: Duration::from_secs(5),
50            write_timeout: Duration::from_secs(5),
51            idle_timeout: Duration::from_secs(120),
52            connect_timeout: Duration::from_secs(10),
53            thresholds: 128,
54            stream_buf_size: 0,
55        }
56    }
57}
58
59/// A trait implemented by the user for the client-side, to define the customizable plugin.
60pub trait ClientFacts: Send + Sync + Sized + 'static {
61    /// Define the codec to serialization and deserialization
62    ///
63    /// Refers to [Codec]
64    type Codec: Codec;
65
66    /// Define the RPC task from client-side
67    ///
68    /// Either one ClientTask or an enum of multiple ClientTask.
69    /// If you have multiple task type, recommend to use the `enum_dispatch` crate.
70    ///
71    /// You can use macro [client_task_enum](crate::client::task::client_task_enum) and [client_task](crate::client::task::client_task) on task type
72    type Task: ClientTask;
73
74    /// You should keep ClientConfig inside, get_config() will return the reference.
75    fn get_config(&self) -> &ClientConfig;
76
77    /// Construct a [captains_log::filter::Filter](https://docs.rs/captains-log/latest/captains_log/filter/trait.Filter.html) to oganize log of a client
78    ///
79    /// TODO: Fix the logger interface
80    fn new_logger(&self) -> Arc<LogFilter>;
81
82    /// How to deal with error
83    ///
84    /// The FailoverPool will overwrite this to implement retry logic
85    #[inline(always)]
86    fn error_handle(&self, task: Self::Task) {
87        task.done();
88    }
89
90    /// You can overwrite this to assign a client_id
91    #[inline(always)]
92    fn get_client_id(&self) -> u64 {
93        0
94    }
95
96    /// NOTE: you may overwrite this to use coarstime or quanta
97    #[inline]
98    fn get_timestamp(&self) -> u64 {
99        SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
100    }
101}
102
103/// A trait to support sending request task in async text, for all router and connection pool
104/// implementations
105pub trait ClientCaller: Send {
106    type Facts: ClientFacts;
107    fn send_req(&self, task: <Self::Facts as ClientFacts>::Task)
108    -> impl Future<Output = ()> + Send;
109
110    fn get_codec(&self) -> <Self::Facts as ClientFacts>::Codec {
111        <Self::Facts as ClientFacts>::Codec::default()
112    }
113}
114
115/// A trait to support sending request task in blocking text, for all router and connection pool
116/// implementations
117pub trait ClientCallerBlocking: Send {
118    type Facts: ClientFacts;
119    fn send_req_blocking(&self, task: <Self::Facts as ClientFacts>::Task);
120
121    fn get_codec(&self) -> <Self::Facts as ClientFacts>::Codec {
122        <Self::Facts as ClientFacts>::Codec::default()
123    }
124}
125
126impl<C: ClientCaller + Send + Sync> ClientCaller for Arc<C> {
127    type Facts = C::Facts;
128
129    #[inline(always)]
130    async fn send_req(&self, task: <Self::Facts as ClientFacts>::Task) {
131        self.as_ref().send_req(task).await
132    }
133}
134
135impl<C: ClientCallerBlocking + Send + Sync> ClientCallerBlocking for Arc<C> {
136    type Facts = C::Facts;
137
138    #[inline(always)]
139    fn send_req_blocking(&self, task: <Self::Facts as ClientFacts>::Task) {
140        self.as_ref().send_req_blocking(task);
141    }
142}
143
144/// This trait is for client-side transport layer protocol.
145///
146/// The implementation can be found on:
147///
148/// - [razor-rpc-tcp](https://docs.rs/razor-rpc-tcp): For TCP and Unix socket
149pub trait ClientTransport: fmt::Debug + Send + Sized + 'static {
150    type RT: AsyncRuntime + Clone;
151
152    /// How to establish an async connection.
153    ///
154    /// conn_id: used for log fmt, can by the same of addr.
155    fn connect(
156        addr: &str, conn_id: &str, config: &ClientConfig,
157    ) -> impl Future<Output = Result<Self, RpcIntErr>> + Send;
158
159    /// Shutdown the write direction of the connection
160    fn close_conn<F: ClientFacts>(&self, logger: &LogFilter) -> impl Future<Output = ()> + Send;
161
162    /// Flush the request for the socket writer, if the transport has buffering logic
163    fn flush_req<F: ClientFacts>(
164        &self, logger: &LogFilter,
165    ) -> impl Future<Output = io::Result<()>> + Send;
166
167    /// Write out the encoded request task
168    fn write_req<'a, F: ClientFacts>(
169        &'a self, logger: &LogFilter, buf: &'a [u8], blob: Option<&'a [u8]>, need_flush: bool,
170    ) -> impl Future<Output = io::Result<()>> + Send;
171
172    /// Read the response and decode it from the socket, find and notify the registered ClientTask
173    fn read_resp<F: ClientFacts>(
174        &self, facts: &F, logger: &LogFilter, codec: &F::Codec,
175        close_ch: Option<&mut AsyncRx<mpsc::Null>>, task_reg: &mut ClientTaskTimer<F>,
176    ) -> impl std::future::Future<Output = Result<bool, RpcIntErr>> + Send;
177}
178
179/// An example ClientFacts for general use
180pub struct ClientDefault<T: ClientTask, C: Codec> {
181    pub logger: Arc<LogFilter>,
182    config: ClientConfig,
183    _phan: std::marker::PhantomData<fn(&C, &T)>,
184}
185
186impl<T: ClientTask, C: Codec> ClientDefault<T, C> {
187    pub fn new(config: ClientConfig) -> Arc<Self> {
188        Arc::new(Self { logger: Arc::new(LogFilter::new()), config, _phan: Default::default() })
189    }
190
191    #[inline]
192    pub fn set_log_level(&self, level: log::Level) {
193        self.logger.set_level(level);
194    }
195}
196
197impl<T: ClientTask, C: Codec> ClientFacts for ClientDefault<T, C> {
198    type Codec = C;
199    type Task = T;
200
201    #[inline]
202    fn new_logger(&self) -> Arc<LogFilter> {
203        self.logger.clone()
204    }
205
206    #[inline]
207    fn get_config(&self) -> &ClientConfig {
208        &self.config
209    }
210}