pingora_core/protocols/
mod.rs1mod digest;
18pub mod http;
19pub mod l4;
20pub mod raw_connect;
21pub mod tls;
22#[cfg(windows)]
23mod windows;
24
25pub use digest::{
26 Digest, GetProxyDigest, GetSocketDigest, GetTimingDigest, ProtoDigest, SocketDigest,
27 TimingDigest,
28};
29pub use l4::ext::TcpKeepalive;
30pub use tls::ALPN;
31
32use async_trait::async_trait;
33use std::fmt::Debug;
34use std::net::{IpAddr, Ipv4Addr};
35use std::sync::Arc;
36
/// Integer type returned by [`UniqueID::id`] on Windows targets.
#[cfg(windows)]
pub type UniqueIDType = usize;
/// Integer type returned by [`UniqueID::id`] on Unix targets.
#[cfg(unix)]
pub type UniqueIDType = i32;
41
42#[async_trait]
44pub trait Shutdown {
45 async fn shutdown(&mut self) -> ();
46}
47
48pub trait UniqueID {
50 fn id(&self) -> UniqueIDType;
53}
54
55pub trait Ssl {
57 fn get_ssl(&self) -> Option<&TlsRef> {
59 None
60 }
61
62 fn get_ssl_digest(&self) -> Option<Arc<tls::SslDigest>> {
64 None
65 }
66
67 fn selected_alpn_proto(&self) -> Option<ALPN> {
69 None
70 }
71}
72
73#[async_trait]
75pub trait Peek {
76 async fn try_peek(&mut self, _buf: &mut [u8]) -> std::io::Result<bool> {
80 Ok(false)
81 }
82}
83
84use std::any::Any;
85use tokio::io::{AsyncRead, AsyncWrite};
86
87pub trait IO:
89 AsyncRead
90 + AsyncWrite
91 + Shutdown
92 + UniqueID
93 + Ssl
94 + GetTimingDigest
95 + GetProxyDigest
96 + GetSocketDigest
97 + Peek
98 + Unpin
99 + Debug
100 + Send
101 + Sync
102{
103 fn as_any(&self) -> &dyn Any;
105 fn into_any(self: Box<Self>) -> Box<dyn Any>;
107}
108
109impl<
110 T: AsyncRead
111 + AsyncWrite
112 + Shutdown
113 + UniqueID
114 + Ssl
115 + GetTimingDigest
116 + GetProxyDigest
117 + GetSocketDigest
118 + Peek
119 + Unpin
120 + Debug
121 + Send
122 + Sync,
123 > IO for T
124where
125 T: 'static,
126{
127 fn as_any(&self) -> &dyn Any {
128 self
129 }
130 fn into_any(self: Box<Self>) -> Box<dyn Any> {
131 self
132 }
133}
134
135pub type Stream = Box<dyn IO>;
137
138mod ext_io_impl {
140 use super::*;
141 use tokio_test::io::Mock;
142
143 #[async_trait]
144 impl Shutdown for Mock {
145 async fn shutdown(&mut self) -> () {}
146 }
147 impl UniqueID for Mock {
148 fn id(&self) -> UniqueIDType {
149 0
150 }
151 }
152 impl Ssl for Mock {}
153 impl GetTimingDigest for Mock {
154 fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
155 vec![]
156 }
157 }
158 impl GetProxyDigest for Mock {
159 fn get_proxy_digest(&self) -> Option<Arc<raw_connect::ProxyDigest>> {
160 None
161 }
162 }
163 impl GetSocketDigest for Mock {
164 fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
165 None
166 }
167 }
168
169 impl Peek for Mock {}
170
171 use std::io::Cursor;
172
173 #[async_trait]
174 impl<T: Send> Shutdown for Cursor<T> {
175 async fn shutdown(&mut self) -> () {}
176 }
177 impl<T> UniqueID for Cursor<T> {
178 fn id(&self) -> UniqueIDType {
179 0
180 }
181 }
182 impl<T> Ssl for Cursor<T> {}
183 impl<T> GetTimingDigest for Cursor<T> {
184 fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
185 vec![]
186 }
187 }
188 impl<T> GetProxyDigest for Cursor<T> {
189 fn get_proxy_digest(&self) -> Option<Arc<raw_connect::ProxyDigest>> {
190 None
191 }
192 }
193 impl<T> GetSocketDigest for Cursor<T> {
194 fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
195 None
196 }
197 }
198 impl<T> Peek for Cursor<T> {}
199
200 use tokio::io::DuplexStream;
201
202 #[async_trait]
203 impl Shutdown for DuplexStream {
204 async fn shutdown(&mut self) -> () {}
205 }
206 impl UniqueID for DuplexStream {
207 fn id(&self) -> UniqueIDType {
208 0
209 }
210 }
211 impl Ssl for DuplexStream {}
212 impl GetTimingDigest for DuplexStream {
213 fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
214 vec![]
215 }
216 }
217 impl GetProxyDigest for DuplexStream {
218 fn get_proxy_digest(&self) -> Option<Arc<raw_connect::ProxyDigest>> {
219 None
220 }
221 }
222 impl GetSocketDigest for DuplexStream {
223 fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
224 None
225 }
226 }
227
228 impl Peek for DuplexStream {}
229}
230
231#[cfg(unix)]
232pub mod ext_test {
233 use std::sync::Arc;
234
235 use async_trait::async_trait;
236
237 use super::{
238 raw_connect, GetProxyDigest, GetSocketDigest, GetTimingDigest, Peek, Shutdown,
239 SocketDigest, Ssl, TimingDigest, UniqueID, UniqueIDType,
240 };
241
242 #[async_trait]
243 impl Shutdown for tokio::net::UnixStream {
244 async fn shutdown(&mut self) -> () {}
245 }
246 impl UniqueID for tokio::net::UnixStream {
247 fn id(&self) -> UniqueIDType {
248 0
249 }
250 }
251 impl Ssl for tokio::net::UnixStream {}
252 impl GetTimingDigest for tokio::net::UnixStream {
253 fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
254 vec![]
255 }
256 }
257 impl GetProxyDigest for tokio::net::UnixStream {
258 fn get_proxy_digest(&self) -> Option<Arc<raw_connect::ProxyDigest>> {
259 None
260 }
261 }
262 impl GetSocketDigest for tokio::net::UnixStream {
263 fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
264 None
265 }
266 }
267
268 impl Peek for tokio::net::UnixStream {}
269}
270
/// Crate-internal check (Unix) that a file descriptor is connected to the
/// address represented by `self`.
#[cfg(unix)]
pub(crate) trait ConnFdReusable {
    /// Return `true` when `fd` is judged to be connected to `self`, and
    /// `false` otherwise.
    fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool;
}
275
/// Crate-internal check (Windows) that a socket is connected to the address
/// represented by `self`.
#[cfg(windows)]
pub(crate) trait ConnSockReusable {
    /// Return `true` when `sock` is judged to be connected to `self`, and
    /// `false` otherwise.
    fn check_sock_match<V: AsRawSocket>(&self, sock: V) -> bool;
}
280
281use l4::socket::SocketAddr;
282use log::{debug, error};
283#[cfg(unix)]
284use nix::sys::socket::{getpeername, SockaddrStorage, UnixAddr};
285#[cfg(unix)]
286use std::os::unix::prelude::AsRawFd;
287#[cfg(windows)]
288use std::os::windows::io::AsRawSocket;
289use std::{net::SocketAddr as InetSocketAddr, path::Path};
290
291use crate::protocols::tls::TlsRef;
292
293#[cfg(unix)]
294impl ConnFdReusable for SocketAddr {
295 fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
296 match self {
297 SocketAddr::Inet(addr) => addr.check_fd_match(fd),
298 SocketAddr::Unix(addr) => addr
299 .as_pathname()
300 .expect("non-pathname unix sockets not supported as peer")
301 .check_fd_match(fd),
302 }
303 }
304}
305
#[cfg(windows)]
impl ConnSockReusable for SocketAddr {
    /// Delegate the socket check to the wrapped inet address.
    fn check_sock_match<V: AsRawSocket>(&self, sock: V) -> bool {
        // `Inet` is the only variant matched on this target, so the pattern
        // is irrefutable here.
        let SocketAddr::Inet(addr) = self;
        addr.check_sock_match(sock)
    }
}
314
315#[cfg(unix)]
316impl ConnFdReusable for Path {
317 fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
318 let fd = fd.as_raw_fd();
319 match getpeername::<UnixAddr>(fd) {
320 Ok(peer) => match UnixAddr::new(self) {
321 Ok(addr) => {
322 if addr == peer {
323 debug!("Unix FD to: {peer} is reusable");
324 true
325 } else {
326 error!("Crit: unix FD mismatch: fd: {fd:?}, peer: {peer}, addr: {addr}",);
327 false
328 }
329 }
330 Err(e) => {
331 error!("Bad addr: {self:?}, error: {e:?}");
332 false
333 }
334 },
335 Err(e) => {
336 error!("Idle unix connection is broken: {e:?}");
337 false
338 }
339 }
340 }
341}
342
343#[cfg(unix)]
344impl ConnFdReusable for InetSocketAddr {
345 fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
346 let fd = fd.as_raw_fd();
347 match getpeername::<SockaddrStorage>(fd) {
348 Ok(peer) => {
349 const ZERO: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
350 if self.ip() == ZERO {
351 return true;
356 }
357 let addr = SockaddrStorage::from(*self);
358 if addr == peer {
359 debug!("Inet FD to: {addr} is reusable");
360 true
361 } else {
362 error!("Crit: FD mismatch: fd: {fd:?}, addr: {addr}, peer: {peer}",);
363 false
364 }
365 }
366 Err(e) => {
367 debug!("Idle connection is broken: {e:?}");
368 false
369 }
370 }
371 }
372}
373
#[cfg(windows)]
impl ConnSockReusable for InetSocketAddr {
    /// Check that `sock` is connected to this inet socket address.
    ///
    /// Steps, in order:
    /// 1. Query the peer of `sock` via `windows::peer_addr`; on failure log
    ///    at debug level and return `false`.
    /// 2. If this address's IP is `0.0.0.0`, return `true` without comparing
    ///    against the peer.
    /// 3. Otherwise return `true` only when this address equals the peer; a
    ///    mismatch is logged as an error.
    fn check_sock_match<V: AsRawSocket>(&self, sock: V) -> bool {
        const ZERO: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));

        let sock = sock.as_raw_socket();

        let peer = match windows::peer_addr(sock) {
            Ok(peer) => peer,
            Err(e) => {
                debug!("Idle connection is broken: {e:?}");
                return false;
            }
        };

        // The comparison is skipped entirely for the IPv4 all-zero address.
        if self.ip() == ZERO {
            return true;
        }

        if self != &peer {
            error!("Crit: FD mismatch: fd: {sock:?}, addr: {self}, peer: {peer}");
            return false;
        }

        debug!("Inet FD to: {self} is reusable");
        true
    }
}