// monoio_client/tcp_client/mod.rs
use std::{
future::Future,
net::SocketAddr,
ops::{Deref, DerefMut},
time::Duration,
};
use monoio::{buf::IoBuf, io::AsyncWriteRentExt};
use crate::client::{
Client, ClientBuilder, ClientError, Connector, Metrics, Multiplex, ReuseError, ReuseResult,
};
use std::net::ToSocketAddrs;
use monoio::net::TcpStream;
use thiserror::Error as ThisError;
/// Errors returned by [`TcpClient`] operations.
#[derive(ThisError, Debug)]
pub enum TcpClientError {
/// A plain I/O failure; converted automatically from `std::io::Error` via `#[from]`.
#[error("IO error {0}")]
IOError(#[from] std::io::Error),
/// A failure reported by the underlying client layer; converted automatically
/// from `ClientError<std::io::Error>` via `#[from]`.
#[error("ClientError error {0}")]
ClientError(#[from] ClientError<std::io::Error>),
}
/// Connector state handed to the generic `Client`.
///
/// It only carries the builder configuration, which `Connector::connect`
/// reads when a new stream is opened.
#[derive(Clone, Debug)]
struct TcpClientInner {
// Snapshot of the builder options taken in `TcpClientBuilder::build`.
config: TcpClientBuilder,
}
/// TCP client built on top of the generic `Client`, keyed by `SocketAddr`.
///
/// Construct it with [`TcpClientBuilder::build`].
#[derive(Clone, Debug)]
pub struct TcpClient {
// Generic client parameterised with the TCP connector.
inner: Client<TcpClientInner>,
}
impl TcpClient {
    /// Writes all of `buf` to a connection for `key` and hands the buffer back.
    ///
    /// `key` is resolved with [`ToSocketAddrs`]; the first resolved address is
    /// used to obtain a connection from the inner client.
    ///
    /// NOTE(review): `to_socket_addrs` is documented by the standard library as
    /// potentially blocking the current thread while resolution is performed.
    ///
    /// # Errors
    ///
    /// - [`TcpClientError::IOError`] if `key` cannot be resolved, resolves to
    ///   no address at all, or the write fails.
    /// - [`TcpClientError::ClientError`] if the inner client fails to provide
    ///   a connection.
    pub async fn send_buf<K: ToSocketAddrs, T: IoBuf>(
        &self,
        key: K,
        buf: T,
    ) -> Result<T, TcpClientError> {
        // An empty resolution result is a caller-input problem, not a broken
        // invariant: surface it as an error instead of panicking on `unwrap()`.
        let addr = key.to_socket_addrs()?.next().ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "could not resolve to any address",
            )
        })?;
        let mut conn = self.inner.get(addr).await?;
        let (result, buf) = conn.write_all(buf).await;
        match result {
            Ok(_) => Ok(buf),
            Err(e) => {
                // Remember the failure on the connection; `Connector::reuse`
                // returns an error for connections with a recorded I/O error.
                conn.set_io_error(&e);
                Err(e.into())
            }
        }
    }
}
/// Builder holding the options used to create a [`TcpClient`].
#[derive(Clone, Debug)]
pub struct TcpClientBuilder {
// Requested TCP Fast Open setting.
// NOTE(review): stored only; nothing in the visible code reads this field.
tcp_fast_open: bool,
// When true, `connect` calls `set_nodelay(true)` on the new stream.
no_delay: bool,
// Keepalive parameters as (time, interval, retries); when `Some`, `connect`
// forwards the three values to `set_tcp_keepalive`.
tcp_keepalive: Option<(Option<Duration>, Option<Duration>, Option<u32>)>,
// Requested connection limit (defaults to 1024 in `new`).
// NOTE(review): stored only; `build` does not forward it in the visible code.
max_conns: usize,
// Forwarded to `ClientBuilder::connect_timeout` in `build`.
connect_timeout: Option<Duration>,
}
impl TcpClientBuilder {
    /// Creates a builder with every option off/unset and `max_conns` at 1024.
    pub fn new() -> Self {
        Self {
            tcp_fast_open: false,
            no_delay: false,
            tcp_keepalive: None,
            max_conns: 1024,
            connect_timeout: None,
        }
    }

    /// Stores the TCP Fast Open preference.
    pub fn tcp_fast_open(self, fast_open: bool) -> Self {
        Self {
            tcp_fast_open: fast_open,
            ..self
        }
    }

    /// Stores whether new streams should have `set_nodelay(true)` applied.
    pub fn no_delay(self, no_delay: bool) -> Self {
        Self { no_delay, ..self }
    }

    /// Stores keepalive parameters; they are kept together as
    /// `(time, interval, retries)`.
    pub fn tcp_keepalive(
        self,
        time: Option<Duration>,
        interval: Option<Duration>,
        retries: Option<u32>,
    ) -> Self {
        Self {
            tcp_keepalive: Some((time, interval, retries)),
            ..self
        }
    }

    /// Stores the requested maximum number of connections.
    pub fn max_conns(self, conns: usize) -> Self {
        Self {
            max_conns: conns,
            ..self
        }
    }

    /// Stores the connect timeout that `build` passes to the client builder.
    pub fn connect_timeout(self, value: Duration) -> Self {
        Self {
            connect_timeout: Some(value),
            ..self
        }
    }

    /// Consumes the builder and produces a [`TcpClient`].
    ///
    /// The whole builder becomes the connector's configuration; the connect
    /// timeout is additionally handed to `ClientBuilder::connect_timeout`.
    pub fn build(self) -> TcpClient {
        // Read the `Copy` timeout first so the builder itself can be moved
        // into the connector.
        let connect_timeout = self.connect_timeout;
        let connector = TcpClientInner { config: self };
        TcpClient {
            inner: ClientBuilder::new(connector)
                .connect_timeout(connect_timeout)
                .build(),
        }
    }
}
/// The default builder is exactly what [`TcpClientBuilder::new`] returns.
impl Default for TcpClientBuilder {
fn default() -> Self {
Self::new()
}
}
/// A connection: a `TcpStream` plus a slot for a recorded I/O error.
#[derive(Debug)]
pub struct TcpConn {
// The underlying stream; exposed through `Deref`/`DerefMut`.
inner: TcpStream,
// Set by `set_io_error`; taken (and thereby cleared) by `Connector::reuse`.
io_error: Option<std::io::Error>,
}
impl TcpConn {
    /// Records a copy of `e` on the connection, replacing any earlier one.
    ///
    /// The copy keeps the OS error code when there is one; otherwise it keeps
    /// the error kind and the original message text.
    fn set_io_error(&mut self, e: &std::io::Error) {
        // `std::io::Error` is not `Clone`, so an equivalent error is rebuilt
        // rather than reduced to a generic "IO error" message.
        let copied = match e.raw_os_error() {
            Some(code) => std::io::Error::from_raw_os_error(code),
            None => std::io::Error::new(e.kind(), e.to_string()),
        };
        self.io_error = Some(copied);
    }
}
// Empty impl: no trait items are provided or overridden here.
// NOTE(review): the meaning of `Multiplex` is defined in `crate::client`, which
// is not visible from this file — confirm that relying on it as-is is intended.
impl Multiplex for TcpConn {}
/// Gives shared access to the wrapped `TcpStream`.
impl Deref for TcpConn {
type Target = TcpStream;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
/// Gives mutable access to the wrapped `TcpStream`.
impl DerefMut for TcpConn {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
/// Connector that opens `TcpStream`s keyed by socket address.
impl Connector for TcpClientInner {
    type Connection = TcpConn;
    type Key = SocketAddr;
    type Error = std::io::Error;
    type ConnectionFuture<'a> = impl Future<Output = Result<Self::Connection, Self::Error>> + 'a;
    type ReuseFuture<'a> = impl Future<Output = ReuseResult<Self::Error>> + 'a;

    /// Opens a stream to `key` and applies the configured socket options.
    ///
    /// Keepalive is applied when configured, and `set_nodelay(true)` is called
    /// only when `no_delay` is enabled. Any failure is returned as the I/O error.
    ///
    /// NOTE(review): the `tcp_fast_open` option is not consulted here.
    fn connect(&self, key: Self::Key) -> Self::ConnectionFuture<'_> {
        async move {
            let inner = TcpStream::connect(key).await?;
            if let Some((time, interval, retries)) = self.config.tcp_keepalive {
                inner.set_tcp_keepalive(time, interval, retries)?;
            }
            if self.config.no_delay {
                inner.set_nodelay(true)?;
            }
            Ok(TcpConn {
                inner,
                io_error: None,
            })
        }
    }

    /// Decides whether `conn` may be used again.
    ///
    /// Takes any recorded I/O error out of the connection and returns it as
    /// `ReuseError::Backend`; a connection without one yields `Ok(())`.
    fn reuse<'a>(
        &'a self,
        conn: &'a mut Self::Connection,
        _metrics: &Metrics,
    ) -> Self::ReuseFuture<'_> {
        async move {
            // `take()` leaves `None` behind, so the same error is reported once.
            if let Some(err) = conn.io_error.take() {
                return Err(ReuseError::Backend(err));
            }
            Ok(())
        }
    }
}