use crate::{Codec, error::RpcIntErr};
use captains_log::filter::LogFilter;
use crossfire::{AsyncRx, mpsc};
use orb::AsyncRuntime;
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{fmt, io};
pub mod task;
use task::{ClientTask, ClientTaskDone};
pub mod stream;
pub mod timer;
use timer::ClientTaskTimer;
mod pool;
pub use pool::ConnPool;
mod failover;
pub use failover::FailoverPool;
mod throttler;
/// Tunable settings for the client side of a connection.
///
/// The code that consumes these values lives outside this block, so the notes
/// on each field only state what the types and the `Default` values show.
#[derive(Clone)]
pub struct ClientConfig {
/// Timeout applied to a task, as a bare integer (default `20`).
/// NOTE(review): the unit is not encoded in the type — presumably seconds; confirm
/// against the timer code.
pub task_timeout: usize,
/// Timeout for read operations (default 5 seconds).
pub read_timeout: Duration,
/// Timeout for write operations (default 5 seconds).
pub write_timeout: Duration,
/// Idle timeout (default 120 seconds).
/// NOTE(review): presumably how long an unused connection is kept — confirm in the pool code.
pub idle_timeout: Duration,
/// Timeout for establishing a connection (default 10 seconds).
pub connect_timeout: Duration,
/// Numeric limit (default `128`).
/// NOTE(review): the name suggests a cap on outstanding tasks — confirm against the
/// `throttler` module.
pub thresholds: usize,
/// Stream buffer size (default `0`).
/// NOTE(review): `0` presumably means "use the transport's own default" — confirm with
/// the transport implementation.
pub stream_buf_size: usize,
}
impl Default for ClientConfig {
    /// Returns the stock client configuration.
    ///
    /// Defaults: `task_timeout` 20, read/write timeouts 5s each, idle timeout
    /// 120s, connect timeout 10s, `thresholds` 128 and `stream_buf_size` 0.
    fn default() -> Self {
        // Reads and writes share the same default budget.
        let io_timeout = Duration::from_secs(5);
        let idle_timeout = Duration::from_secs(120);
        let connect_timeout = Duration::from_secs(10);

        Self {
            task_timeout: 20,
            read_timeout: io_timeout,
            write_timeout: io_timeout,
            idle_timeout,
            connect_timeout,
            thresholds: 128,
            stream_buf_size: 0,
        }
    }
}
/// Shared, user-supplied context for a client: the codec and task types it
/// works with, its configuration, its logger, and a few overridable hooks.
pub trait ClientFacts: Send + Sync + Sized + 'static {
    /// Codec type used by this client.
    type Codec: Codec;

    /// Task type carried by this client.
    type Task: ClientTask;

    /// Returns the client configuration.
    fn get_config(&self) -> &ClientConfig;

    /// Returns a logger handle for the caller to use.
    fn new_logger(&self) -> Arc<LogFilter>;

    /// Hook invoked with a task on an error path.
    ///
    /// The default implementation simply completes the task by calling
    /// `task.done()`.
    #[inline(always)]
    fn error_handle(&self, task: Self::Task) {
        task.done();
    }

    /// Identifier of this client. Defaults to `0`.
    #[inline(always)]
    fn get_client_id(&self) -> u64 {
        0
    }

    /// Current wall-clock time as whole seconds since the Unix epoch.
    ///
    /// If the system clock reports a time earlier than the epoch the result
    /// is clamped to `0` instead of panicking, so a misconfigured clock cannot
    /// take down the caller.
    #[inline]
    fn get_timestamp(&self) -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            // `Err` only occurs when "now" is before the epoch; clamp to zero.
            .map_or(0, |since_epoch| since_epoch.as_secs())
    }
}
/// Asynchronous entry point for submitting tasks to a client.
///
/// Implementors accept a task of the type named by their [`ClientFacts`] and
/// return a `Send` future that resolves to `()`.
pub trait ClientCaller: Send {
/// The facts type that fixes the task and codec types for this caller.
type Facts: ClientFacts;
/// Submits `task`. The returned future yields no value.
/// NOTE(review): how the outcome is reported is not visible here — presumably through
/// the task itself; confirm against the `task` module.
fn send_req(&self, task: <Self::Facts as ClientFacts>::Task)
-> impl Future<Output = ()> + Send;
/// Returns a codec instance; the default implementation builds one with the codec
/// type's `default()` constructor.
fn get_codec(&self) -> <Self::Facts as ClientFacts>::Codec {
<Self::Facts as ClientFacts>::Codec::default()
}
}
/// Synchronous counterpart of [`ClientCaller`]: submits tasks through a plain
/// (non-`async`) method call.
pub trait ClientCallerBlocking: Send {
/// The facts type that fixes the task and codec types for this caller.
type Facts: ClientFacts;
/// Submits `task` from synchronous code. Returns nothing.
/// NOTE(review): whether this blocks until the task is queued or until it completes is
/// not visible here — confirm with the implementors.
fn send_req_blocking(&self, task: <Self::Facts as ClientFacts>::Task);
/// Returns a codec instance; the default implementation builds one with the codec
/// type's `default()` constructor.
fn get_codec(&self) -> <Self::Facts as ClientFacts>::Codec {
<Self::Facts as ClientFacts>::Codec::default()
}
}
/// Allows a shared `Arc<C>` to be used wherever a [`ClientCaller`] is
/// expected; every request is forwarded to the wrapped caller.
impl<C> ClientCaller for Arc<C>
where
    C: ClientCaller + Send + Sync,
{
    type Facts = C::Facts;

    /// Forwards `task` to the caller held inside the `Arc`.
    #[inline(always)]
    async fn send_req(&self, task: <Self::Facts as ClientFacts>::Task) {
        // Deref through the `Arc` and dispatch on the inner caller explicitly.
        C::send_req(&**self, task).await
    }
}
/// Allows a shared `Arc<C>` to be used wherever a [`ClientCallerBlocking`] is
/// expected; every request is forwarded to the wrapped caller.
impl<C> ClientCallerBlocking for Arc<C>
where
    C: ClientCallerBlocking + Send + Sync,
{
    type Facts = C::Facts;

    /// Forwards `task` to the caller held inside the `Arc`.
    #[inline(always)]
    fn send_req_blocking(&self, task: <Self::Facts as ClientFacts>::Task) {
        // Deref through the `Arc` and dispatch on the inner caller explicitly.
        C::send_req_blocking(&**self, task);
    }
}
/// Abstraction over the connection a client talks through: connecting,
/// writing requests, flushing, reading responses and closing.
///
/// All operations return `Send` futures. Only the signatures are visible
/// here; the exact semantics are defined by the implementors.
pub trait ClientTransport: fmt::Debug + Send + Sized + 'static {
/// Async runtime this transport is tied to.
type RT: AsyncRuntime;
/// Establishes a connection to `addr` using `config`, yielding the transport or an
/// `RpcIntErr`.
/// NOTE(review): `conn_id` is presumably a label used for logging/identification —
/// confirm with the implementors.
fn connect(
addr: &str, conn_id: &str, config: &ClientConfig,
) -> impl Future<Output = Result<Self, RpcIntErr>> + Send;
/// Closes the connection. Infallible from the caller's point of view (yields `()`).
fn close_conn<F: ClientFacts>(&self, logger: &LogFilter) -> impl Future<Output = ()> + Send;
/// Flushes pending request data, reporting any I/O error.
fn flush_req<F: ClientFacts>(
&self, logger: &LogFilter,
) -> impl Future<Output = io::Result<()>> + Send;
/// Writes a request made of `buf` plus an optional `blob`, reporting any I/O error.
/// NOTE(review): `need_flush` presumably requests a flush right after the write, and
/// `blob` presumably carries an extra payload sent after `buf` — confirm with the
/// implementors.
fn write_req<'a, F: ClientFacts>(
&'a self, logger: &LogFilter, buf: &'a [u8], blob: Option<&'a [u8]>, need_flush: bool,
) -> impl Future<Output = io::Result<()>> + Send;
/// Reads a response, with access to the facts, the codec and the task timer `task_reg`.
/// NOTE(review): the meaning of the `bool` in `Ok(..)` and the role of `close_ch`
/// (presumably a close-notification channel) are not visible here — confirm with the
/// implementors before relying on them.
fn read_resp<F: ClientFacts>(
&self, facts: &F, logger: &LogFilter, codec: &F::Codec,
close_ch: Option<&mut AsyncRx<mpsc::Null>>, task_reg: &mut ClientTaskTimer<F>,
) -> impl std::future::Future<Output = Result<bool, RpcIntErr>> + Send;
}
/// Ready-made [`ClientFacts`] implementation parameterised by a task type `T`
/// and a codec type `C`; it holds just a logger and a configuration.
pub struct ClientDefault<T: ClientTask, C: Codec> {
/// Shared log filter; handed out (cloned) by `new_logger`.
pub logger: Arc<LogFilter>,
/// Configuration returned by `get_config`.
config: ClientConfig,
/// Zero-sized marker that ties `T` and `C` to the struct without storing either.
/// Using a `fn(&C, &T)` pointer type keeps the struct `Send + Sync` regardless of
/// whether `T` or `C` are.
_phan: std::marker::PhantomData<fn(&C, &T)>,
}
impl<T: ClientTask, C: Codec> ClientDefault<T, C> {
pub fn new(config: ClientConfig) -> Arc<Self> {
Arc::new(Self { logger: Arc::new(LogFilter::new()), config, _phan: Default::default() })
}
#[inline]
pub fn set_log_level(&self, level: log::Level) {
self.logger.set_level(level);
}
}
/// Default facts: exposes the stored configuration and hands out clones of
/// the shared logger, leaving every other hook at its trait default.
impl<T, C> ClientFacts for ClientDefault<T, C>
where
    T: ClientTask,
    C: Codec,
{
    type Task = T;
    type Codec = C;

    /// Returns the configuration this object was built with.
    #[inline]
    fn get_config(&self) -> &ClientConfig {
        let Self { config, .. } = self;
        config
    }

    /// Returns another handle to the shared log filter (no new filter is created).
    #[inline]
    fn new_logger(&self) -> Arc<LogFilter> {
        Arc::clone(&self.logger)
    }
}