pingora_core/protocols/
mod.rs1mod digest;
18pub mod http;
19pub mod l4;
20pub mod raw_connect;
21pub mod tls;
22#[cfg(windows)]
23mod windows;
24
25pub use digest::{
26 Digest, GetProxyDigest, GetSocketDigest, GetTimingDigest, ProtoDigest, SocketDigest,
27 TimingDigest,
28};
29pub use l4::ext::TcpKeepalive;
30pub use tls::ALPN;
31
32use async_trait::async_trait;
33use std::fmt::Debug;
34use std::net::{IpAddr, Ipv4Addr};
35use std::sync::Arc;
36
/// Integer type returned by [`UniqueID::id`] on Unix targets.
///
/// NOTE(review): presumably chosen to match a raw file descriptor — confirm.
#[cfg(unix)]
pub type UniqueIDType = i32;

/// Integer type returned by [`UniqueID::id`] on Windows targets.
///
/// NOTE(review): presumably chosen to hold a raw socket handle — confirm.
#[cfg(windows)]
pub type UniqueIDType = usize;
41
42#[async_trait]
44pub trait Shutdown {
45 async fn shutdown(&mut self) -> ();
46}
47
48pub trait UniqueID {
50 fn id(&self) -> UniqueIDType;
53}
54
55pub trait Ssl {
57 fn get_ssl(&self) -> Option<&TlsRef> {
59 None
60 }
61
62 fn get_ssl_digest(&self) -> Option<Arc<tls::SslDigest>> {
64 None
65 }
66
67 fn selected_alpn_proto(&self) -> Option<ALPN> {
69 None
70 }
71}
72
73#[async_trait]
75pub trait Peek {
76 async fn try_peek(&mut self, _buf: &mut [u8]) -> std::io::Result<bool> {
80 Ok(false)
81 }
82}
83
84use std::any::Any;
85use tokio::io::{AsyncRead, AsyncWrite};
86
87pub trait IO:
89 AsyncRead
90 + AsyncWrite
91 + Shutdown
92 + UniqueID
93 + Ssl
94 + GetTimingDigest
95 + GetProxyDigest
96 + GetSocketDigest
97 + Peek
98 + Unpin
99 + Debug
100 + Send
101 + Sync
102{
103 fn as_any(&self) -> &dyn Any;
105 fn into_any(self: Box<Self>) -> Box<dyn Any>;
107}
108
109impl<
110 T: AsyncRead
111 + AsyncWrite
112 + Shutdown
113 + UniqueID
114 + Ssl
115 + GetTimingDigest
116 + GetProxyDigest
117 + GetSocketDigest
118 + Peek
119 + Unpin
120 + Debug
121 + Send
122 + Sync,
123 > IO for T
124where
125 T: 'static,
126{
127 fn as_any(&self) -> &dyn Any {
128 self
129 }
130 fn into_any(self: Box<Self>) -> Box<dyn Any> {
131 self
132 }
133}
134
135pub type Stream = Box<dyn IO>;
137
138mod ext_io_impl {
140 use super::*;
141 use tokio_test::io::Mock;
142
143 #[async_trait]
144 impl Shutdown for Mock {
145 async fn shutdown(&mut self) -> () {}
146 }
147 impl UniqueID for Mock {
148 fn id(&self) -> UniqueIDType {
149 0
150 }
151 }
152 impl Ssl for Mock {}
153 impl GetTimingDigest for Mock {
154 fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
155 vec![]
156 }
157 }
158 impl GetProxyDigest for Mock {
159 fn get_proxy_digest(&self) -> Option<Arc<raw_connect::ProxyDigest>> {
160 None
161 }
162 }
163 impl GetSocketDigest for Mock {
164 fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
165 None
166 }
167 }
168
169 impl Peek for Mock {}
170
171 use std::io::Cursor;
172
173 #[async_trait]
174 impl<T: Send> Shutdown for Cursor<T> {
175 async fn shutdown(&mut self) -> () {}
176 }
177 impl<T> UniqueID for Cursor<T> {
178 fn id(&self) -> UniqueIDType {
179 0
180 }
181 }
182 impl<T> Ssl for Cursor<T> {}
183 impl<T> GetTimingDigest for Cursor<T> {
184 fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
185 vec![]
186 }
187 }
188 impl<T> GetProxyDigest for Cursor<T> {
189 fn get_proxy_digest(&self) -> Option<Arc<raw_connect::ProxyDigest>> {
190 None
191 }
192 }
193 impl<T> GetSocketDigest for Cursor<T> {
194 fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
195 None
196 }
197 }
198 impl<T> Peek for Cursor<T> {}
199
200 use tokio::io::DuplexStream;
201
202 #[async_trait]
203 impl Shutdown for DuplexStream {
204 async fn shutdown(&mut self) -> () {}
205 }
206 impl UniqueID for DuplexStream {
207 fn id(&self) -> UniqueIDType {
208 0
209 }
210 }
211 impl Ssl for DuplexStream {}
212 impl GetTimingDigest for DuplexStream {
213 fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
214 vec![]
215 }
216 }
217 impl GetProxyDigest for DuplexStream {
218 fn get_proxy_digest(&self) -> Option<Arc<raw_connect::ProxyDigest>> {
219 None
220 }
221 }
222 impl GetSocketDigest for DuplexStream {
223 fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
224 None
225 }
226 }
227
228 impl Peek for DuplexStream {}
229}
230
/// Unix-only check that an open file descriptor matches an expected peer.
#[cfg(unix)]
pub(crate) trait ConnFdReusable {
    /// Returns `true` when `fd` is considered a match for `self`.
    ///
    /// `fd` is taken by value and may be anything exposing a raw descriptor.
    fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool;
}
235
/// Windows-only check that an open socket matches an expected peer.
#[cfg(windows)]
pub(crate) trait ConnSockReusable {
    /// Returns `true` when `sock` is considered a match for `self`.
    ///
    /// `sock` is taken by value and may be anything exposing a raw socket.
    fn check_sock_match<V: AsRawSocket>(&self, sock: V) -> bool;
}
240
241use l4::socket::SocketAddr;
242use log::{debug, error};
243#[cfg(unix)]
244use nix::sys::socket::{getpeername, SockaddrStorage, UnixAddr};
245#[cfg(unix)]
246use std::os::unix::prelude::AsRawFd;
247#[cfg(windows)]
248use std::os::windows::io::AsRawSocket;
249use std::{net::SocketAddr as InetSocketAddr, path::Path};
250
251use crate::protocols::tls::TlsRef;
252
253#[cfg(unix)]
254impl ConnFdReusable for SocketAddr {
255 fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
256 match self {
257 SocketAddr::Inet(addr) => addr.check_fd_match(fd),
258 SocketAddr::Unix(addr) => addr
259 .as_pathname()
260 .expect("non-pathname unix sockets not supported as peer")
261 .check_fd_match(fd),
262 }
263 }
264}
265
#[cfg(windows)]
impl ConnSockReusable for SocketAddr {
    /// Checks whether `sock` is connected to the peer described by this
    /// address by delegating to the wrapped inet socket address.
    fn check_sock_match<V: AsRawSocket>(&self, sock: V) -> bool {
        // `Inet` is the only variant matched on this platform, so the
        // pattern is irrefutable.
        let SocketAddr::Inet(addr) = self;
        addr.check_sock_match(sock)
    }
}
274
275#[cfg(unix)]
276impl ConnFdReusable for Path {
277 fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
278 let fd = fd.as_raw_fd();
279 match getpeername::<UnixAddr>(fd) {
280 Ok(peer) => match UnixAddr::new(self) {
281 Ok(addr) => {
282 if addr == peer {
283 debug!("Unix FD to: {peer} is reusable");
284 true
285 } else {
286 error!("Crit: unix FD mismatch: fd: {fd:?}, peer: {peer}, addr: {addr}",);
287 false
288 }
289 }
290 Err(e) => {
291 error!("Bad addr: {self:?}, error: {e:?}");
292 false
293 }
294 },
295 Err(e) => {
296 error!("Idle unix connection is broken: {e:?}");
297 false
298 }
299 }
300 }
301}
302
303#[cfg(unix)]
304impl ConnFdReusable for InetSocketAddr {
305 fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
306 let fd = fd.as_raw_fd();
307 match getpeername::<SockaddrStorage>(fd) {
308 Ok(peer) => {
309 const ZERO: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
310 if self.ip() == ZERO {
311 return true;
316 }
317 let addr = SockaddrStorage::from(*self);
318 if addr == peer {
319 debug!("Inet FD to: {addr} is reusable");
320 true
321 } else {
322 error!("Crit: FD mismatch: fd: {fd:?}, addr: {addr}, peer: {peer}",);
323 false
324 }
325 }
326 Err(e) => {
327 debug!("Idle connection is broken: {e:?}");
328 false
329 }
330 }
331 }
332}
333
#[cfg(windows)]
impl ConnSockReusable for InetSocketAddr {
    /// Checks whether `sock` is a socket whose peer address equals this
    /// socket address.
    ///
    /// Returns `false` when the peer address cannot be read (logged at debug
    /// level) or when the addresses differ (logged as an error). When this
    /// address has the IPv4 unspecified IP `0.0.0.0`, the comparison is
    /// skipped and the socket is accepted as long as it has a peer.
    fn check_sock_match<V: AsRawSocket>(&self, sock: V) -> bool {
        let sock = sock.as_raw_socket();

        let peer = match windows::peer_addr(sock) {
            Ok(peer) => peer,
            Err(e) => {
                debug!("Idle connection is broken: {e:?}");
                return false;
            }
        };

        // NOTE(review): the rationale for exempting 0.0.0.0 is not visible
        // in this chunk — confirm before changing.
        if self.ip() == IpAddr::V4(Ipv4Addr::UNSPECIFIED) {
            return true;
        }

        let reusable = self == &peer;
        if reusable {
            debug!("Inet FD to: {self} is reusable");
        } else {
            error!("Crit: FD mismatch: fd: {sock:?}, addr: {self}, peer: {peer}");
        }
        reusable
    }
}