Skip to main content

pingora_core/protocols/
mod.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Abstractions and implementations for protocols including TCP, TLS and HTTP
16
17mod digest;
18pub mod http;
19pub mod l4;
20pub mod raw_connect;
21pub mod tls;
22#[cfg(windows)]
23mod windows;
24
25pub use digest::{
26    Digest, GetProxyDigest, GetSocketDigest, GetTimingDigest, ProtoDigest, SocketDigest,
27    TimingDigest,
28};
29pub use l4::ext::TcpKeepalive;
30pub use tls::ALPN;
31
32use async_trait::async_trait;
33use std::fmt::Debug;
34use std::net::{IpAddr, Ipv4Addr};
35use std::sync::Arc;
36
/// Integer type returned by [`UniqueID::id`] on Unix targets.
// NOTE(review): presumably sized to hold a raw file descriptor -- confirm against implementors.
#[cfg(unix)]
pub type UniqueIDType = i32;
/// Integer type returned by [`UniqueID::id`] on Windows targets.
// NOTE(review): presumably sized to hold a raw socket handle -- confirm against implementors.
#[cfg(windows)]
pub type UniqueIDType = usize;
41
42/// Define how a protocol should shutdown its connection.
43#[async_trait]
44pub trait Shutdown {
45    async fn shutdown(&mut self) -> ();
46}
47
48/// Define how a given session/connection identifies itself.
49pub trait UniqueID {
50    /// The ID returned should be unique among all existing connections of the same type.
51    /// But ID can be recycled after a connection is shutdown.
52    fn id(&self) -> UniqueIDType;
53}
54
55/// Interface to get TLS info
56pub trait Ssl {
57    /// Return the TLS info if the connection is over TLS
58    fn get_ssl(&self) -> Option<&TlsRef> {
59        None
60    }
61
62    /// Return the [`tls::SslDigest`] for logging
63    fn get_ssl_digest(&self) -> Option<Arc<tls::SslDigest>> {
64        None
65    }
66
67    /// Return selected ALPN if any
68    fn selected_alpn_proto(&self) -> Option<ALPN> {
69        None
70    }
71}
72
73/// The ability peek data before consuming it
74#[async_trait]
75pub trait Peek {
76    /// Peek data but not consuming it. This call should block until some data
77    /// is sent.
78    /// Return `false` if peeking is not supported/allowed.
79    async fn try_peek(&mut self, _buf: &mut [u8]) -> std::io::Result<bool> {
80        Ok(false)
81    }
82}
83
84use std::any::Any;
85use tokio::io::{AsyncRead, AsyncWrite};
86
87/// The abstraction of transport layer IO
88pub trait IO:
89    AsyncRead
90    + AsyncWrite
91    + Shutdown
92    + UniqueID
93    + Ssl
94    + GetTimingDigest
95    + GetProxyDigest
96    + GetSocketDigest
97    + Peek
98    + Unpin
99    + Debug
100    + Send
101    + Sync
102{
103    /// helper to cast as the reference of the concrete type
104    fn as_any(&self) -> &dyn Any;
105    /// helper to cast back of the concrete type
106    fn into_any(self: Box<Self>) -> Box<dyn Any>;
107}
108
109impl<
110        T: AsyncRead
111            + AsyncWrite
112            + Shutdown
113            + UniqueID
114            + Ssl
115            + GetTimingDigest
116            + GetProxyDigest
117            + GetSocketDigest
118            + Peek
119            + Unpin
120            + Debug
121            + Send
122            + Sync,
123    > IO for T
124where
125    T: 'static,
126{
127    fn as_any(&self) -> &dyn Any {
128        self
129    }
130    fn into_any(self: Box<Self>) -> Box<dyn Any> {
131        self
132    }
133}
134
135/// The type of any established transport layer connection
136pub type Stream = Box<dyn IO>;
137
138// Implement IO trait for 3rd party types, mostly for testing
139mod ext_io_impl {
140    use super::*;
141    use tokio_test::io::Mock;
142
143    #[async_trait]
144    impl Shutdown for Mock {
145        async fn shutdown(&mut self) -> () {}
146    }
147    impl UniqueID for Mock {
148        fn id(&self) -> UniqueIDType {
149            0
150        }
151    }
152    impl Ssl for Mock {}
153    impl GetTimingDigest for Mock {
154        fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
155            vec![]
156        }
157    }
158    impl GetProxyDigest for Mock {
159        fn get_proxy_digest(&self) -> Option<Arc<raw_connect::ProxyDigest>> {
160            None
161        }
162    }
163    impl GetSocketDigest for Mock {
164        fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
165            None
166        }
167    }
168
169    impl Peek for Mock {}
170
171    use std::io::Cursor;
172
173    #[async_trait]
174    impl<T: Send> Shutdown for Cursor<T> {
175        async fn shutdown(&mut self) -> () {}
176    }
177    impl<T> UniqueID for Cursor<T> {
178        fn id(&self) -> UniqueIDType {
179            0
180        }
181    }
182    impl<T> Ssl for Cursor<T> {}
183    impl<T> GetTimingDigest for Cursor<T> {
184        fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
185            vec![]
186        }
187    }
188    impl<T> GetProxyDigest for Cursor<T> {
189        fn get_proxy_digest(&self) -> Option<Arc<raw_connect::ProxyDigest>> {
190            None
191        }
192    }
193    impl<T> GetSocketDigest for Cursor<T> {
194        fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
195            None
196        }
197    }
198    impl<T> Peek for Cursor<T> {}
199
200    use tokio::io::DuplexStream;
201
202    #[async_trait]
203    impl Shutdown for DuplexStream {
204        async fn shutdown(&mut self) -> () {}
205    }
206    impl UniqueID for DuplexStream {
207        fn id(&self) -> UniqueIDType {
208            0
209        }
210    }
211    impl Ssl for DuplexStream {}
212    impl GetTimingDigest for DuplexStream {
213        fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
214            vec![]
215        }
216    }
217    impl GetProxyDigest for DuplexStream {
218        fn get_proxy_digest(&self) -> Option<Arc<raw_connect::ProxyDigest>> {
219            None
220        }
221    }
222    impl GetSocketDigest for DuplexStream {
223        fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
224            None
225        }
226    }
227
228    impl Peek for DuplexStream {}
229}
230
231#[cfg(unix)]
232pub mod ext_test {
233    use std::sync::Arc;
234
235    use async_trait::async_trait;
236
237    use super::{
238        raw_connect, GetProxyDigest, GetSocketDigest, GetTimingDigest, Peek, Shutdown,
239        SocketDigest, Ssl, TimingDigest, UniqueID, UniqueIDType,
240    };
241
242    #[async_trait]
243    impl Shutdown for tokio::net::UnixStream {
244        async fn shutdown(&mut self) -> () {}
245    }
246    impl UniqueID for tokio::net::UnixStream {
247        fn id(&self) -> UniqueIDType {
248            0
249        }
250    }
251    impl Ssl for tokio::net::UnixStream {}
252    impl GetTimingDigest for tokio::net::UnixStream {
253        fn get_timing_digest(&self) -> Vec<Option<TimingDigest>> {
254            vec![]
255        }
256    }
257    impl GetProxyDigest for tokio::net::UnixStream {
258        fn get_proxy_digest(&self) -> Option<Arc<raw_connect::ProxyDigest>> {
259            None
260        }
261    }
262    impl GetSocketDigest for tokio::net::UnixStream {
263        fn get_socket_digest(&self) -> Option<Arc<SocketDigest>> {
264            None
265        }
266    }
267
268    impl Peek for tokio::net::UnixStream {}
269}
270
/// Implemented by address types that can judge whether an existing file descriptor may be
/// reused for them.
#[cfg(unix)]
pub(crate) trait ConnFdReusable {
    /// Returns `true` when `fd` is considered reusable for the address `self`, `false`
    /// otherwise.
    fn check_fd_match<V>(&self, fd: V) -> bool
    where
        V: AsRawFd;
}
275
/// Implemented by address types that can judge whether an existing socket may be reused
/// for them.
#[cfg(windows)]
pub(crate) trait ConnSockReusable {
    /// Returns `true` when `sock` is considered reusable for the address `self`, `false`
    /// otherwise.
    fn check_sock_match<V>(&self, sock: V) -> bool
    where
        V: AsRawSocket;
}
280
281use l4::socket::SocketAddr;
282use log::{debug, error};
283#[cfg(unix)]
284use nix::sys::socket::{getpeername, SockaddrStorage, UnixAddr};
285#[cfg(unix)]
286use std::os::unix::prelude::AsRawFd;
287#[cfg(windows)]
288use std::os::windows::io::AsRawSocket;
289use std::{net::SocketAddr as InetSocketAddr, path::Path};
290
291use crate::protocols::tls::TlsRef;
292
293#[cfg(unix)]
294impl ConnFdReusable for SocketAddr {
295    fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
296        match self {
297            SocketAddr::Inet(addr) => addr.check_fd_match(fd),
298            SocketAddr::Unix(addr) => addr
299                .as_pathname()
300                .expect("non-pathname unix sockets not supported as peer")
301                .check_fd_match(fd),
302        }
303    }
304}
305
/// Dispatches the reuse check to the inet address wrapped by [`SocketAddr`], the only
/// variant matched on Windows.
#[cfg(windows)]
impl ConnSockReusable for SocketAddr {
    fn check_sock_match<V: AsRawSocket>(&self, sock: V) -> bool {
        match self {
            Self::Inet(inet) => inet.check_sock_match(sock),
        }
    }
}
314
315#[cfg(unix)]
316impl ConnFdReusable for Path {
317    fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
318        let fd = fd.as_raw_fd();
319        match getpeername::<UnixAddr>(fd) {
320            Ok(peer) => match UnixAddr::new(self) {
321                Ok(addr) => {
322                    if addr == peer {
323                        debug!("Unix FD to: {peer} is reusable");
324                        true
325                    } else {
326                        error!("Crit: unix FD mismatch: fd: {fd:?}, peer: {peer}, addr: {addr}",);
327                        false
328                    }
329                }
330                Err(e) => {
331                    error!("Bad addr: {self:?}, error: {e:?}");
332                    false
333                }
334            },
335            Err(e) => {
336                error!("Idle unix connection is broken: {e:?}");
337                false
338            }
339        }
340    }
341}
342
343#[cfg(unix)]
344impl ConnFdReusable for InetSocketAddr {
345    fn check_fd_match<V: AsRawFd>(&self, fd: V) -> bool {
346        let fd = fd.as_raw_fd();
347        match getpeername::<SockaddrStorage>(fd) {
348            Ok(peer) => {
349                const ZERO: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
350                if self.ip() == ZERO {
351                    // https://www.rfc-editor.org/rfc/rfc1122.html#section-3.2.1.3
352                    // 0.0.0.0 should only be used as source IP not destination
353                    // However in some systems this destination IP is mapped to 127.0.0.1.
354                    // We just skip this check here to avoid false positive mismatch.
355                    return true;
356                }
357                let addr = SockaddrStorage::from(*self);
358                if addr == peer {
359                    debug!("Inet FD to: {addr} is reusable");
360                    true
361                } else {
362                    error!("Crit: FD mismatch: fd: {fd:?}, addr: {addr}, peer: {peer}",);
363                    false
364                }
365            }
366            Err(e) => {
367                debug!("Idle connection is broken: {e:?}");
368                false
369            }
370        }
371    }
372}
373
/// Reuse check for an inet socket address on Windows.
///
/// The socket is reusable when its peer address can be read via `windows::peer_addr` and
/// equals `self`. A target of `0.0.0.0` is accepted without comparison once the peer
/// address has been read.
#[cfg(windows)]
impl ConnSockReusable for InetSocketAddr {
    fn check_sock_match<V: AsRawSocket>(&self, sock: V) -> bool {
        let sock = sock.as_raw_socket();

        let peer = match windows::peer_addr(sock) {
            Ok(peer) => peer,
            Err(e) => {
                debug!("Idle connection is broken: {e:?}");
                return false;
            }
        };

        // https://www.rfc-editor.org/rfc/rfc1122.html#section-3.2.1.3
        // 0.0.0.0 should only be used as source IP not destination
        // However in some systems this destination IP is mapped to 127.0.0.1.
        // We just skip this check here to avoid false positive mismatch.
        if self.ip() == IpAddr::V4(Ipv4Addr::UNSPECIFIED) {
            return true;
        }

        if self != &peer {
            error!("Crit: FD mismatch: fd: {sock:?}, addr: {self}, peer: {peer}",);
            return false;
        }

        debug!("Inet FD to: {self} is reusable");
        true
    }
}