monoio_http_client/client/
unified.rs1use std::{
2 io,
3 net::ToSocketAddrs,
4 path::{Path, PathBuf},
5};
6
7use monoio::{
8 buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut},
9 io::{AsyncReadRent, AsyncWriteRent, Split},
10 net::{TcpStream, UnixStream},
11 BufResult,
12};
13use service_async::Param;
14use smol_str::SmolStr;
15
16use super::connector::{TcpConnector, TlsConnector, TlsStream, UnixConnector};
17use crate::Connector;
18
/// Destination address accepted by [`UnifiedTransportConnector`].
///
/// The variant decides which of the connector's transports is used, and its
/// fields are the data handed to that transport's `connect` call.
#[derive(Clone)]
pub enum UnifiedTransportAddr {
    /// Plain TCP: host string and port, passed to the TCP connector as a
    /// `(&str, u16)` pair.
    Tcp(SmolStr, u16),
    /// Plain Unix socket: the path passed to the Unix connector.
    Unix(PathBuf),
    /// TLS over TCP: host string, port, and the server name supplied to the
    /// TLS connector.
    TcpTls(SmolStr, u16, super::key::ServerName),
    /// TLS over a Unix socket: socket path and the server name supplied to the
    /// TLS connector.
    UnixTls(PathBuf, super::key::ServerName),
}
27
28struct TcpTlsAddr<'a>(&'a SmolStr, u16, &'a super::key::ServerName);
29struct UnixTlsAddr<'a>(&'a PathBuf, &'a super::key::ServerName);
30impl<'a> ToSocketAddrs for TcpTlsAddr<'a> {
31 type Iter = <(&'static str, u16) as ToSocketAddrs>::Iter;
32 fn to_socket_addrs(&self) -> io::Result<Self::Iter> {
33 (self.0.as_str(), self.1).to_socket_addrs()
34 }
35}
36impl<'a> service_async::Param<super::key::ServerName> for TcpTlsAddr<'a> {
37 fn param(&self) -> super::key::ServerName {
38 self.2.clone()
39 }
40}
41impl<'a> AsRef<Path> for UnixTlsAddr<'a> {
42 fn as_ref(&self) -> &Path {
43 self.0
44 }
45}
46impl<'a> service_async::Param<super::key::ServerName> for UnixTlsAddr<'a> {
47 fn param(&self) -> super::key::ServerName {
48 self.1.clone()
49 }
50}
51
/// Connector that owns one sub-connector per supported transport and selects
/// among them according to the [`UnifiedTransportAddr`] variant it is given.
#[derive(Default, Clone, Debug)]
pub struct UnifiedTransportConnector {
    /// Used for `UnifiedTransportAddr::Tcp`.
    raw_tcp: TcpConnector,
    /// Used for `UnifiedTransportAddr::Unix`.
    raw_unix: UnixConnector,
    /// Used for `UnifiedTransportAddr::TcpTls`.
    tcp_tls: TlsConnector<TcpConnector>,
    /// Used for `UnifiedTransportAddr::UnixTls`.
    unix_tls: TlsConnector<UnixConnector>,
}
59
/// Connection produced by [`UnifiedTransportConnector`]; one variant per
/// transport, each wrapping that transport's stream type.
pub enum UnifiedTransportConnection {
    /// Stream from the plain TCP connector.
    Tcp(TcpStream),
    /// Stream from the plain Unix connector.
    Unix(UnixStream),
    /// TLS stream layered over a `TcpStream`.
    TcpTls(TlsStream<TcpStream>),
    /// TLS stream layered over a `UnixStream`.
    UnixTls(TlsStream<UnixStream>),
}
68
69impl std::fmt::Debug for UnifiedTransportConnection {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 match self {
72 Self::Tcp(_) => write!(f, "Tcp"),
73 Self::Unix(_) => write!(f, "Unix"),
74 Self::TcpTls(_) => write!(f, "TcpTls"),
75 Self::UnixTls(_) => write!(f, "UnixTls"),
76 }
77 }
78}
79
80impl<T> Connector<T> for UnifiedTransportConnector
81where
82 T: Param<UnifiedTransportAddr>,
83{
84 type Connection = UnifiedTransportConnection;
85 type Error = crate::Error;
86
87 async fn connect(&self, key: T) -> Result<Self::Connection, Self::Error> {
88 let unified_addr = key.param();
89 match &unified_addr {
90 UnifiedTransportAddr::Tcp(addr, port) => self
91 .raw_tcp
92 .connect((addr.as_str(), *port))
93 .await
94 .map_err(Into::into)
95 .map(UnifiedTransportConnection::Tcp),
96 UnifiedTransportAddr::Unix(path) => self
97 .raw_unix
98 .connect(path)
99 .await
100 .map_err(Into::into)
101 .map(UnifiedTransportConnection::Unix),
102 UnifiedTransportAddr::TcpTls(addr, port, tls) => self
103 .tcp_tls
104 .connect(TcpTlsAddr(addr, *port, tls))
105 .await
106 .map_err(Into::into)
107 .map(UnifiedTransportConnection::TcpTls),
108 UnifiedTransportAddr::UnixTls(path, tls) => self
109 .unix_tls
110 .connect(UnixTlsAddr(path, tls))
111 .await
112 .map_err(Into::into)
113 .map(UnifiedTransportConnection::UnixTls),
114 }
115 }
116}
117
118impl AsyncReadRent for UnifiedTransportConnection {
119 async fn read<T: IoBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
120 match self {
121 UnifiedTransportConnection::Tcp(s) => s.read(buf).await,
122 UnifiedTransportConnection::Unix(s) => s.read(buf).await,
123 UnifiedTransportConnection::TcpTls(s) => s.read(buf).await,
124 UnifiedTransportConnection::UnixTls(s) => s.read(buf).await,
125 }
126 }
127
128 async fn readv<T: IoVecBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
129 match self {
130 UnifiedTransportConnection::Tcp(s) => s.readv(buf).await,
131 UnifiedTransportConnection::Unix(s) => s.readv(buf).await,
132 UnifiedTransportConnection::TcpTls(s) => s.readv(buf).await,
133 UnifiedTransportConnection::UnixTls(s) => s.readv(buf).await,
134 }
135 }
136}
137
138impl AsyncWriteRent for UnifiedTransportConnection {
139 async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
140 match self {
141 UnifiedTransportConnection::Tcp(s) => s.write(buf).await,
142 UnifiedTransportConnection::Unix(s) => s.write(buf).await,
143 UnifiedTransportConnection::TcpTls(s) => s.write(buf).await,
144 UnifiedTransportConnection::UnixTls(s) => s.write(buf).await,
145 }
146 }
147
148 async fn writev<T: IoVecBuf>(&mut self, buf: T) -> BufResult<usize, T> {
149 match self {
150 UnifiedTransportConnection::Tcp(s) => s.writev(buf).await,
151 UnifiedTransportConnection::Unix(s) => s.writev(buf).await,
152 UnifiedTransportConnection::TcpTls(s) => s.writev(buf).await,
153 UnifiedTransportConnection::UnixTls(s) => s.writev(buf).await,
154 }
155 }
156
157 async fn flush(&mut self) -> io::Result<()> {
158 match self {
159 UnifiedTransportConnection::Tcp(s) => s.flush().await,
160 UnifiedTransportConnection::Unix(s) => s.flush().await,
161 UnifiedTransportConnection::TcpTls(s) => s.flush().await,
162 UnifiedTransportConnection::UnixTls(s) => s.flush().await,
163 }
164 }
165
166 async fn shutdown(&mut self) -> io::Result<()> {
167 match self {
168 UnifiedTransportConnection::Tcp(s) => s.shutdown().await,
169 UnifiedTransportConnection::Unix(s) => s.shutdown().await,
170 UnifiedTransportConnection::TcpTls(s) => s.shutdown().await,
171 UnifiedTransportConnection::UnixTls(s) => s.shutdown().await,
172 }
173 }
174}
175
// SAFETY: NOTE(review): `Split` is an unsafe marker trait whose contract is
// not visible in this file. This impl relies on every wrapped stream type
// (`TcpStream`, `UnixStream`, and `TlsStream` over each) satisfying that
// contract, since all reads and writes are forwarded to them unchanged;
// confirm against the trait's documentation.
unsafe impl Split for UnifiedTransportConnection {}