pingora_core/protocols/
mod.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Abstractions and implementations for protocols including TCP, TLS and HTTP
16
17mod digest;
18pub mod http;
19pub mod l4;
20pub mod raw_connect;
21pub mod tls;
22#[cfg(windows)]
23mod windows;
24
25pub use digest::{
26    Digest, GetProxyDigest, GetSocketDigest, GetTimingDigest, ProtoDigest, SocketDigest,
27    TimingDigest,
28};
29pub use l4::ext::TcpKeepalive;
30pub use tls::ALPN;
31
32use async_trait::async_trait;
33use std::fmt::Debug;
34use std::net::{IpAddr, Ipv4Addr};
35use std::sync::Arc;
36
37#[cfg(unix)]
38pub type UniqueIDType = i32;
39#[cfg(windows)]
40pub type UniqueIDType = usize;
41
42/// Define how a protocol should shutdown its connection.
43#[async_trait]
44pub trait Shutdown {
45    async fn shutdown(&mut self) -> ();
46}
47
48/// Define how a given session/connection identifies itself.
49pub trait UniqueID {
50    /// The ID returned should be unique among all existing connections of the same type.
51    /// But ID can be recycled after a connection is shutdown.
52    fn id(&self) -> UniqueIDType;
53}
54
55/// Interface to get TLS info
56pub trait Ssl {
57    /// Return the TLS info if the connection is over TLS
58    fn get_ssl(&self) -> Option<&TlsRef> {
59        None
60    }
61
62    /// Return the [`tls::SslDigest`] for logging
63    fn get_ssl_digest(&self) -> Option<Arc<tls::SslDigest>> {
64        None
65    }
66
67    /// Return selected ALPN if any
68    fn selected_alpn_proto(&self) -> Option<ALPN> {
69        None
70    }
71}
72
73/// The ability peek data before consuming it
74#[async_trait]
75pub trait Peek {
76    /// Peek data but not consuming it. This call should block until some data
77    /// is sent.
78    /// Return `false` if peeking is not supported/allowed.
79    async fn try_peek(&mut self, _buf: &mut [u8]) -> std::io::Result<bool> {
80        Ok(false)
81    }
82}
83
84use std::any::Any;
85use tokio::io::{AsyncRead, AsyncWrite};
86
87/// The abstraction of transport layer IO
88pub trait IO:
89    AsyncRead
90    + AsyncWrite
91    + Shutdown
92    + UniqueID
93    + Ssl
94    + GetTimingDigest
95    + GetProxyDigest
96    + GetSocketDigest
97    + Peek
98    + Unpin
99    + Debug
100    + Send
101    + Sync
102{
103    /// helper to cast as the reference of the concrete type
104    fn as_any(&self) -> &dyn Any;
105    /// helper to cast back of the concrete type
106    fn into_any(self: Box<Self>) -> Box<dyn Any>;
107}
108
109impl<
110        T: AsyncRead
111            + AsyncWrite
112            + Shutdown
113            + UniqueID
114            + Ssl
115            + GetTimingDigest
116            + GetProxyDigest
117            + GetSocketDigest
118            + Peek
119            + Unpin
120            + Debug
121            + Send
122            + Sync,
123    > IO for T
124where
125    T: 'static,
126{
127    fn as_any(&self) -> &dyn Any {
128        self
129    }
130    fn into_any(self: Box<Self>) -> Box<dyn Any> {
131        self
132    }
133}
134
135/// The type of any established transport layer connection
136pub type Stream = Box<dyn IO>;
137
138// Implement IO trait for 3rd party types, mostly for testing
139mod ext_io_impl {
140    use super::*;
141    use tokio_test::io::Mock;
142
143    #[async_trait]
144    impl Shutdown for Mock {
145        async fn shutdown(&mut self) -> () {}
146    }
147    impl UniqueID for Mock {
148        fn id(&self) -> UniqueIDType {
149            0
150        }
151    }
152    impl Ssl for Mock {}
153    impl GetTimingDigest for Mock {
154        fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
155            vec![]
156        }
157    }
158    impl GetProxyDigest for Mock {
159        fn get_proxy_digest(&self) -> Option<Arc<raw_connect::ProxyDigest>> {
160            None
161        }
162    }
163    impl GetSocketDigest for Mock {
164        fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
165            None
166        }
167    }
168
169    impl Peek for Mock {}
170
171    use std::io::Cursor;
172
173    #[async_trait]
174    impl<T: Send> Shutdown for Cursor<T> {
175        async fn shutdown(&mut self) -> () {}
176    }
177    impl<T> UniqueID for Cursor<T> {
178        fn id(&self) -> UniqueIDType {
179            0
180        }
181    }
182    impl<T> Ssl for Cursor<T> {}
183    impl<T> GetTimingDigest for Cursor<T> {
184        fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
185            vec![]
186        }
187    }
188    impl<T> GetProxyDigest for Cursor<T> {
189        fn get_proxy_digest(&self) -> Option<Arc<raw_connect::ProxyDigest>> {
190            None
191        }
192    }
193    impl<T> GetSocketDigest for Cursor<T> {
194        fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
195            None
196        }
197    }
198    impl<T> Peek for Cursor<T> {}
199
200    use tokio::io::DuplexStream;
201
202    #[async_trait]
203    impl Shutdown for DuplexStream {
204        async fn shutdown(&mut self) -> () {}
205    }
206    impl UniqueID for DuplexStream {
207        fn id(&self) -> UniqueIDType {
208            0
209        }
210    }
211    impl Ssl for DuplexStream {}
212    impl GetTimingDigest for DuplexStream {
213        fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
214            vec![]
215        }
216    }
217    impl GetProxyDigest for DuplexStream {
218        fn get_proxy_digest(&self) -> Option<Arc<raw_connect::ProxyDigest>> {
219            None
220        }
221    }
222    impl GetSocketDigest for DuplexStream {
223        fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
224            None
225        }
226    }
227
228    impl Peek for DuplexStream {}
229}
230
231#[cfg(unix)]
232pub(crate) trait ConnFdReusable {
233    fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool;
234}
235
236#[cfg(windows)]
237pub(crate) trait ConnSockReusable {
238    fn check_sock_match<V: AsRawSocket>(&self, sock: V) -> bool;
239}
240
241use l4::socket::SocketAddr;
242use log::{debug, error};
243#[cfg(unix)]
244use nix::sys::socket::{getpeername, SockaddrStorage, UnixAddr};
245#[cfg(unix)]
246use std::os::unix::prelude::AsRawFd;
247#[cfg(windows)]
248use std::os::windows::io::AsRawSocket;
249use std::{net::SocketAddr as InetSocketAddr, path::Path};
250
251use crate::protocols::tls::TlsRef;
252
253#[cfg(unix)]
254impl ConnFdReusable for SocketAddr {
255    fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
256        match self {
257            SocketAddr::Inet(addr) => addr.check_fd_match(fd),
258            SocketAddr::Unix(addr) => addr
259                .as_pathname()
260                .expect("non-pathname unix sockets not supported as peer")
261                .check_fd_match(fd),
262        }
263    }
264}
265
266#[cfg(windows)]
267impl ConnSockReusable for SocketAddr {
268    fn check_sock_match<V: AsRawSocket>(&self, sock: V) -> bool {
269        match self {
270            SocketAddr::Inet(addr) => addr.check_sock_match(sock),
271        }
272    }
273}
274
275#[cfg(unix)]
276impl ConnFdReusable for Path {
277    fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
278        let fd = fd.as_raw_fd();
279        match getpeername::<UnixAddr>(fd) {
280            Ok(peer) => match UnixAddr::new(self) {
281                Ok(addr) => {
282                    if addr == peer {
283                        debug!("Unix FD to: {peer} is reusable");
284                        true
285                    } else {
286                        error!("Crit: unix FD mismatch: fd: {fd:?}, peer: {peer}, addr: {addr}",);
287                        false
288                    }
289                }
290                Err(e) => {
291                    error!("Bad addr: {self:?}, error: {e:?}");
292                    false
293                }
294            },
295            Err(e) => {
296                error!("Idle unix connection is broken: {e:?}");
297                false
298            }
299        }
300    }
301}
302
303#[cfg(unix)]
304impl ConnFdReusable for InetSocketAddr {
305    fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
306        let fd = fd.as_raw_fd();
307        match getpeername::<SockaddrStorage>(fd) {
308            Ok(peer) => {
309                const ZERO: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
310                if self.ip() == ZERO {
311                    // https://www.rfc-editor.org/rfc/rfc1122.html#section-3.2.1.3
312                    // 0.0.0.0 should only be used as source IP not destination
313                    // However in some systems this destination IP is mapped to 127.0.0.1.
314                    // We just skip this check here to avoid false positive mismatch.
315                    return true;
316                }
317                let addr = SockaddrStorage::from(*self);
318                if addr == peer {
319                    debug!("Inet FD to: {addr} is reusable");
320                    true
321                } else {
322                    error!("Crit: FD mismatch: fd: {fd:?}, addr: {addr}, peer: {peer}",);
323                    false
324                }
325            }
326            Err(e) => {
327                debug!("Idle connection is broken: {e:?}");
328                false
329            }
330        }
331    }
332}
333
334#[cfg(windows)]
335impl ConnSockReusable for InetSocketAddr {
336    fn check_sock_match<V: AsRawSocket>(&self, sock: V) -> bool {
337        let sock = sock.as_raw_socket();
338        match windows::peer_addr(sock) {
339            Ok(peer) => {
340                const ZERO: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
341                if self.ip() == ZERO {
342                    // https://www.rfc-editor.org/rfc/rfc1122.html#section-3.2.1.3
343                    // 0.0.0.0 should only be used as source IP not destination
344                    // However in some systems this destination IP is mapped to 127.0.0.1.
345                    // We just skip this check here to avoid false positive mismatch.
346                    return true;
347                }
348                if self == &peer {
349                    debug!("Inet FD to: {self} is reusable");
350                    true
351                } else {
352                    error!("Crit: FD mismatch: fd: {sock:?}, addr: {self}, peer: {peer}",);
353                    false
354                }
355            }
356            Err(e) => {
357                debug!("Idle connection is broken: {e:?}");
358                false
359            }
360        }
361    }
362}