pingora_core/upstreams/
peer.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Defines where to connect to and how to connect to a remote server
16
17use crate::connectors::{l4::BindTo, L4Connect};
18use crate::protocols::l4::socket::SocketAddr;
19use crate::protocols::tls::CaType;
20#[cfg(unix)]
21use crate::protocols::ConnFdReusable;
22use crate::protocols::TcpKeepalive;
23use crate::utils::tls::{get_organization_unit, CertKey};
24use ahash::AHasher;
25use derivative::Derivative;
26use pingora_error::{
27    ErrorType::{InternalError, SocketError},
28    OrErr, Result,
29};
30use std::collections::BTreeMap;
31use std::fmt::{Display, Formatter, Result as FmtResult};
32use std::hash::{Hash, Hasher};
33use std::net::{IpAddr, SocketAddr as InetSocketAddr, ToSocketAddrs as ToInetSocketAddrs};
34#[cfg(unix)]
35use std::os::unix::{net::SocketAddr as UnixSocketAddr, prelude::AsRawFd};
36#[cfg(windows)]
37use std::os::windows::io::AsRawSocket;
38use std::path::{Path, PathBuf};
39use std::sync::Arc;
40use std::time::Duration;
41use tokio::net::TcpSocket;
42
43pub use crate::protocols::tls::ALPN;
44
/// Hooks for observing the lifecycle of a connection.
pub trait Tracing: Send + Sync + std::fmt::Debug {
    /// Called after a connection to the remote server has been established successfully.
    fn on_connected(&self);
    /// Called when the connection is disconnected.
    fn on_disconnected(&self);
    /// Produce a boxed copy of this tracer.
    ///
    /// This is what [`Tracer`] uses to implement `Clone` for the boxed trait object.
    fn boxed_clone(&self) -> Box<dyn Tracing>;
}

/// A cloneable wrapper around a boxed [`Tracing`] trait object.
#[derive(Debug)]
pub struct Tracer(pub Box<dyn Tracing>);

impl Clone for Tracer {
    /// Duplicate the wrapped tracer by delegating to [`Tracing::boxed_clone`].
    fn clone(&self) -> Self {
        let Tracer(inner) = self;
        Self(inner.boxed_clone())
    }
}
64
65/// [`Peer`] defines the interface to communicate with the [`crate::connectors`] regarding where to
66/// connect to and how to connect to it.
67pub trait Peer: Display + Clone {
68    /// The remote address to connect to
69    fn address(&self) -> &SocketAddr;
70    /// If TLS should be used;
71    fn tls(&self) -> bool;
72    /// The SNI to send, if TLS is used
73    fn sni(&self) -> &str;
74    /// To decide whether a [`Peer`] can use the connection established by another [`Peer`].
75    ///
76    /// The connections to two peers are considered reusable to each other if their reuse hashes are
77    /// the same
78    fn reuse_hash(&self) -> u64;
79    /// Get the proxy setting to connect to the remote server
80    fn get_proxy(&self) -> Option<&Proxy> {
81        None
82    }
83    /// Get the additional options to connect to the peer.
84    ///
85    /// See [`PeerOptions`] for more details
86    fn get_peer_options(&self) -> Option<&PeerOptions> {
87        None
88    }
89    /// Get the additional options for modification.
90    fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
91        None
92    }
93    /// Whether the TLS handshake should validate the cert of the server.
94    fn verify_cert(&self) -> bool {
95        match self.get_peer_options() {
96            Some(opt) => opt.verify_cert,
97            None => false,
98        }
99    }
100    /// Whether the TLS handshake should verify that the server cert matches the SNI.
101    fn verify_hostname(&self) -> bool {
102        match self.get_peer_options() {
103            Some(opt) => opt.verify_hostname,
104            None => false,
105        }
106    }
107    /// The alternative common name to use to verify the server cert.
108    ///
109    /// If the server cert doesn't match the SNI, this name will be used to
110    /// verify the cert.
111    fn alternative_cn(&self) -> Option<&String> {
112        match self.get_peer_options() {
113            Some(opt) => opt.alternative_cn.as_ref(),
114            None => None,
115        }
116    }
117    /// Information about the local source address this connection should be bound to.
118    fn bind_to(&self) -> Option<&BindTo> {
119        match self.get_peer_options() {
120            Some(opt) => opt.bind_to.as_ref(),
121            None => None,
122        }
123    }
124    /// How long connect() call should be wait before it returns a timeout error.
125    fn connection_timeout(&self) -> Option<Duration> {
126        match self.get_peer_options() {
127            Some(opt) => opt.connection_timeout,
128            None => None,
129        }
130    }
131    /// How long the overall connection establishment should take before a timeout error is returned.
132    fn total_connection_timeout(&self) -> Option<Duration> {
133        match self.get_peer_options() {
134            Some(opt) => opt.total_connection_timeout,
135            None => None,
136        }
137    }
138    /// If the connection can be reused, how long the connection should wait to be reused before it
139    /// shuts down.
140    fn idle_timeout(&self) -> Option<Duration> {
141        self.get_peer_options().and_then(|o| o.idle_timeout)
142    }
143
144    /// Get the ALPN preference.
145    fn get_alpn(&self) -> Option<&ALPN> {
146        self.get_peer_options().map(|opt| &opt.alpn)
147    }
148
149    /// Get the CA cert to use to validate the server cert.
150    ///
151    /// If not set, the default CAs will be used.
152    fn get_ca(&self) -> Option<&Arc<CaType>> {
153        match self.get_peer_options() {
154            Some(opt) => opt.ca.as_ref(),
155            None => None,
156        }
157    }
158
159    /// Get the client cert and key for mutual TLS if any
160    fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
161        None
162    }
163
164    /// The TCP keepalive setting that should be applied to this connection
165    fn tcp_keepalive(&self) -> Option<&TcpKeepalive> {
166        self.get_peer_options()
167            .and_then(|o| o.tcp_keepalive.as_ref())
168    }
169
170    /// The interval H2 pings to send to the server if any
171    fn h2_ping_interval(&self) -> Option<Duration> {
172        self.get_peer_options().and_then(|o| o.h2_ping_interval)
173    }
174
175    /// The size of the TCP receive buffer should be limited to. See SO_RCVBUF for more details.
176    fn tcp_recv_buf(&self) -> Option<usize> {
177        self.get_peer_options().and_then(|o| o.tcp_recv_buf)
178    }
179
180    /// The DSCP value that should be applied to the send side of this connection.
181    /// See the [RFC](https://datatracker.ietf.org/doc/html/rfc2474) for more details.
182    fn dscp(&self) -> Option<u8> {
183        self.get_peer_options().and_then(|o| o.dscp)
184    }
185
186    /// Whether to enable TCP fast open.
187    fn tcp_fast_open(&self) -> bool {
188        self.get_peer_options()
189            .map(|o| o.tcp_fast_open)
190            .unwrap_or_default()
191    }
192
193    #[cfg(unix)]
194    fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
195        self.address().check_fd_match(fd)
196    }
197
198    #[cfg(windows)]
199    fn matches_sock<V: AsRawSocket>(&self, sock: V) -> bool {
200        use crate::protocols::ConnSockReusable;
201        self.address().check_sock_match(sock)
202    }
203
204    fn get_tracer(&self) -> Option<Tracer> {
205        None
206    }
207
208    /// Returns a hook that should be run before an upstream TCP connection is connected.
209    ///
210    /// This hook can be used to set additional socket options.
211    fn upstream_tcp_sock_tweak_hook(
212        &self,
213    ) -> Option<&Arc<dyn Fn(&TcpSocket) -> Result<()> + Send + Sync + 'static>> {
214        self.get_peer_options()?
215            .upstream_tcp_sock_tweak_hook
216            .as_ref()
217    }
218}
219
/// A simple TCP or TLS peer without many complicated settings.
#[derive(Debug, Clone)]
pub struct BasicPeer {
    /// The remote address to connect to.
    pub _address: SocketAddr,
    /// The SNI to send. The [`Peer`] impl for this type reports TLS as enabled only when
    /// this string is non-empty.
    pub sni: String,
    /// The additional connection options of this peer.
    pub options: PeerOptions,
}
227
228impl BasicPeer {
229    /// Create a new [`BasicPeer`].
230    pub fn new(address: &str) -> Self {
231        let addr = SocketAddr::Inet(address.parse().unwrap()); // TODO: check error
232        Self::new_from_sockaddr(addr)
233    }
234
235    /// Create a new [`BasicPeer`] with the given path to a Unix domain socket.
236    #[cfg(unix)]
237    pub fn new_uds<P: AsRef<Path>>(path: P) -> Result<Self> {
238        let addr = SocketAddr::Unix(
239            UnixSocketAddr::from_pathname(path.as_ref())
240                .or_err(InternalError, "while creating BasicPeer")?,
241        );
242        Ok(Self::new_from_sockaddr(addr))
243    }
244
245    fn new_from_sockaddr(sockaddr: SocketAddr) -> Self {
246        BasicPeer {
247            _address: sockaddr,
248            sni: "".to_string(), // TODO: add support for SNI
249            options: PeerOptions::new(),
250        }
251    }
252}
253
254impl Display for BasicPeer {
255    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
256        write!(f, "{:?}", self)
257    }
258}
259
260impl Peer for BasicPeer {
261    fn address(&self) -> &SocketAddr {
262        &self._address
263    }
264
265    fn tls(&self) -> bool {
266        !self.sni.is_empty()
267    }
268
269    fn bind_to(&self) -> Option<&BindTo> {
270        None
271    }
272
273    fn sni(&self) -> &str {
274        &self.sni
275    }
276
277    // TODO: change connection pool to accept u64 instead of String
278    fn reuse_hash(&self) -> u64 {
279        let mut hasher = AHasher::default();
280        self._address.hash(&mut hasher);
281        hasher.finish()
282    }
283
284    fn get_peer_options(&self) -> Option<&PeerOptions> {
285        Some(&self.options)
286    }
287}
288
/// Whether to connect via plain HTTP or via HTTPS.
#[derive(Hash, Clone, Debug, PartialEq)]
pub enum Scheme {
    HTTP,
    HTTPS,
}

impl Display for Scheme {
    /// Writes the upper-case scheme name, `HTTP` or `HTTPS`.
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        let name = match self {
            Scheme::HTTP => "HTTP",
            Scheme::HTTPS => "HTTPS",
        };
        f.write_str(name)
    }
}

impl Scheme {
    /// Map a "use TLS" flag to a scheme: `true` is [`Scheme::HTTPS`], `false` is
    /// [`Scheme::HTTP`].
    pub fn from_tls_bool(tls: bool) -> Self {
        match tls {
            true => Self::HTTPS,
            false => Self::HTTP,
        }
    }
}
314
/// The preferences to connect to a remote server
///
/// See [`Peer`] for the meaning of the fields that have a matching accessor there.
/// [`PeerOptions::new`] provides the default value of every field.
#[non_exhaustive]
#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub struct PeerOptions {
    // information about the local source address the connection should be bound to
    pub bind_to: Option<BindTo>,
    // how long the connect() call should wait before it returns a timeout error
    pub connection_timeout: Option<Duration>,
    // how long the overall connection establishment may take before a timeout error
    pub total_connection_timeout: Option<Duration>,
    // NOTE(review): presumably the timeout applied to reads on the connection; the consumer
    // of this field is not in this file, so confirm before relying on it
    pub read_timeout: Option<Duration>,
    // how long a reusable connection should wait to be reused before it shuts down
    pub idle_timeout: Option<Duration>,
    // NOTE(review): presumably the timeout applied to writes on the connection; the consumer
    // of this field is not in this file, so confirm before relying on it
    pub write_timeout: Option<Duration>,
    // whether the TLS handshake should validate the cert of the server
    pub verify_cert: bool,
    // whether the TLS handshake should verify that the server cert matches the SNI
    pub verify_hostname: bool,
    /* accept the cert if its CN matches the SNI or this name */
    pub alternative_cn: Option<String>,
    // the ALPN preference, see `set_http_version`
    pub alpn: ALPN,
    // the CA cert(s) to validate the server cert with; the default CAs are used if `None`
    pub ca: Option<Arc<CaType>>,
    // the TCP keepalive setting that should be applied to the connection
    pub tcp_keepalive: Option<TcpKeepalive>,
    // the size the TCP receive buffer should be limited to, see SO_RCVBUF
    pub tcp_recv_buf: Option<usize>,
    // the DSCP value that should be applied to the send side of the connection
    pub dscp: Option<u8>,
    // the interval of H2 pings to send to the server, if any
    pub h2_ping_interval: Option<Duration>,
    // how many concurrent h2 streams are allowed in the same connection
    pub max_h2_streams: usize,
    // NOTE(review): presumably extra headers for the CONNECT request sent to a proxy; the
    // consumer of this field is not in this file, so confirm before relying on it
    pub extra_proxy_headers: BTreeMap<String, Vec<u8>>,
    // The list of curves the TLS connection should advertise
    // if `None`, the default curves will be used
    pub curves: Option<&'static str>,
    // see ssl_use_second_key_share
    pub second_keyshare: bool,
    // whether to enable TCP fast open
    pub tcp_fast_open: bool,
    // wrapped in `Tracer` because Clone is required but not allowed in a trait object
    pub tracer: Option<Tracer>,
    // A custom L4 connector to use to establish new L4 connections
    pub custom_l4: Option<Arc<dyn L4Connect + Send + Sync>>,
    // a hook to run before an upstream TCP connection is connected, e.g. to set additional
    // socket options; it is left out of the `Debug` output
    #[derivative(Debug = "ignore")]
    pub upstream_tcp_sock_tweak_hook:
        Option<Arc<dyn Fn(&TcpSocket) -> Result<()> + Send + Sync + 'static>>,
}
356
357impl PeerOptions {
358    /// Create a new [`PeerOptions`]
359    pub fn new() -> Self {
360        PeerOptions {
361            bind_to: None,
362            connection_timeout: None,
363            total_connection_timeout: None,
364            read_timeout: None,
365            idle_timeout: None,
366            write_timeout: None,
367            verify_cert: true,
368            verify_hostname: true,
369            alternative_cn: None,
370            alpn: ALPN::H1,
371            ca: None,
372            tcp_keepalive: None,
373            tcp_recv_buf: None,
374            dscp: None,
375            h2_ping_interval: None,
376            max_h2_streams: 1,
377            extra_proxy_headers: BTreeMap::new(),
378            curves: None,
379            second_keyshare: true, // default true and noop when not using PQ curves
380            tcp_fast_open: false,
381            tracer: None,
382            custom_l4: None,
383            upstream_tcp_sock_tweak_hook: None,
384        }
385    }
386
387    /// Set the ALPN according to the `max` and `min` constrains.
388    pub fn set_http_version(&mut self, max: u8, min: u8) {
389        self.alpn = ALPN::new(max, min);
390    }
391}
392
393impl Display for PeerOptions {
394    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
395        if let Some(b) = self.bind_to.as_ref() {
396            write!(f, "bind_to: {:?},", b)?;
397        }
398        if let Some(t) = self.connection_timeout {
399            write!(f, "conn_timeout: {:?},", t)?;
400        }
401        if let Some(t) = self.total_connection_timeout {
402            write!(f, "total_conn_timeout: {:?},", t)?;
403        }
404        if self.verify_cert {
405            write!(f, "verify_cert: true,")?;
406        }
407        if self.verify_hostname {
408            write!(f, "verify_hostname: true,")?;
409        }
410        if let Some(cn) = &self.alternative_cn {
411            write!(f, "alt_cn: {},", cn)?;
412        }
413        write!(f, "alpn: {},", self.alpn)?;
414        if let Some(cas) = &self.ca {
415            for ca in cas.iter() {
416                write!(
417                    f,
418                    "CA: {}, expire: {},",
419                    get_organization_unit(ca).unwrap_or_default(),
420                    ca.not_after()
421                )?;
422            }
423        }
424        if let Some(tcp_keepalive) = &self.tcp_keepalive {
425            write!(f, "tcp_keepalive: {},", tcp_keepalive)?;
426        }
427        if let Some(h2_ping_interval) = self.h2_ping_interval {
428            write!(f, "h2_ping_interval: {:?},", h2_ping_interval)?;
429        }
430        Ok(())
431    }
432}
433
/// A peer representing the remote HTTP server to connect to
#[derive(Debug, Clone)]
pub struct HttpPeer {
    /// The remote address to connect to.
    pub _address: SocketAddr,
    /// Whether to connect via HTTP or HTTPS; HTTPS means TLS is used.
    pub scheme: Scheme,
    /// The SNI to send, if TLS is used.
    pub sni: String,
    /// The proxy to connect through, if any.
    pub proxy: Option<Proxy>,
    /// The client cert and key for mutual TLS, if any.
    pub client_cert_key: Option<Arc<CertKey>>,
    /// a custom field to isolate connection reuse. Requests with different group keys
    /// cannot share connections with each other.
    pub group_key: u64,
    /// The additional connection options of this peer.
    pub options: PeerOptions,
}
447
448impl HttpPeer {
449    // These methods are pretty ad-hoc
450    pub fn is_tls(&self) -> bool {
451        match self.scheme {
452            Scheme::HTTP => false,
453            Scheme::HTTPS => true,
454        }
455    }
456
457    fn new_from_sockaddr(address: SocketAddr, tls: bool, sni: String) -> Self {
458        HttpPeer {
459            _address: address,
460            scheme: Scheme::from_tls_bool(tls),
461            sni,
462            proxy: None,
463            client_cert_key: None,
464            group_key: 0,
465            options: PeerOptions::new(),
466        }
467    }
468
469    /// Create a new [`HttpPeer`] with the given socket address and TLS settings.
470    pub fn new<A: ToInetSocketAddrs>(address: A, tls: bool, sni: String) -> Self {
471        let mut addrs_iter = address.to_socket_addrs().unwrap(); //TODO: handle error
472        let addr = addrs_iter.next().unwrap();
473        Self::new_from_sockaddr(SocketAddr::Inet(addr), tls, sni)
474    }
475
476    /// Create a new [`HttpPeer`] with the given path to Unix domain socket and TLS settings.
477    #[cfg(unix)]
478    pub fn new_uds(path: &str, tls: bool, sni: String) -> Result<Self> {
479        let addr = SocketAddr::Unix(
480            UnixSocketAddr::from_pathname(Path::new(path)).or_err(SocketError, "invalid path")?,
481        );
482        Ok(Self::new_from_sockaddr(addr, tls, sni))
483    }
484
485    /// Create a new [`HttpPeer`] that uses a proxy to connect to the upstream IP and port
486    /// combination.
487    pub fn new_proxy(
488        next_hop: &str,
489        ip_addr: IpAddr,
490        port: u16,
491        tls: bool,
492        sni: &str,
493        headers: BTreeMap<String, Vec<u8>>,
494    ) -> Self {
495        HttpPeer {
496            _address: SocketAddr::Inet(InetSocketAddr::new(ip_addr, port)),
497            scheme: Scheme::from_tls_bool(tls),
498            sni: sni.to_string(),
499            proxy: Some(Proxy {
500                next_hop: PathBuf::from(next_hop).into(),
501                host: ip_addr.to_string(),
502                port,
503                headers,
504            }),
505            client_cert_key: None,
506            group_key: 0,
507            options: PeerOptions::new(),
508        }
509    }
510
511    fn peer_hash(&self) -> u64 {
512        let mut hasher = AHasher::default();
513        self.hash(&mut hasher);
514        hasher.finish()
515    }
516}
517
/// Hashes the parts of an [`HttpPeer`] that isolate connection reuse.
///
/// `peer_hash` feeds this impl into the value returned by `reuse_hash`, so two peers hash the
/// same only if all of the fields below agree. Of the [`PeerOptions`], only the cert
/// verification settings take part; the remaining options are not hashed here.
impl Hash for HttpPeer {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // where and how to connect
        self._address.hash(state);
        self.scheme.hash(state);
        self.proxy.hash(state);
        self.sni.hash(state);
        // client cert serial
        // NOTE(review): what exactly is hashed is decided by `CertKey`'s `Hash` impl, which is
        // not visible in this file; confirm that it is the serial.
        self.client_cert_key.hash(state);
        // origin server cert verification
        self.verify_cert().hash(state);
        self.verify_hostname().hash(state);
        self.alternative_cn().hash(state);
        // the caller provided key to isolate connection reuse
        self.group_key.hash(state);
    }
}
533
534impl Display for HttpPeer {
535    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
536        write!(f, "addr: {}, scheme: {},", self._address, self.scheme)?;
537        if !self.sni.is_empty() {
538            write!(f, "sni: {},", self.sni)?;
539        }
540        if let Some(p) = self.proxy.as_ref() {
541            write!(f, "proxy: {p},")?;
542        }
543        if let Some(cert) = &self.client_cert_key {
544            write!(f, "client cert: {},", cert)?;
545        }
546        Ok(())
547    }
548}
549
550impl Peer for HttpPeer {
551    fn address(&self) -> &SocketAddr {
552        &self._address
553    }
554
555    fn tls(&self) -> bool {
556        self.is_tls()
557    }
558
559    fn sni(&self) -> &str {
560        &self.sni
561    }
562
563    // TODO: change connection pool to accept u64 instead of String
564    fn reuse_hash(&self) -> u64 {
565        self.peer_hash()
566    }
567
568    fn get_peer_options(&self) -> Option<&PeerOptions> {
569        Some(&self.options)
570    }
571
572    fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
573        Some(&mut self.options)
574    }
575
576    fn get_proxy(&self) -> Option<&Proxy> {
577        self.proxy.as_ref()
578    }
579
580    #[cfg(unix)]
581    fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
582        if let Some(proxy) = self.get_proxy() {
583            proxy.next_hop.check_fd_match(fd)
584        } else {
585            self.address().check_fd_match(fd)
586        }
587    }
588
589    #[cfg(windows)]
590    fn matches_sock<V: AsRawSocket>(&self, sock: V) -> bool {
591        use crate::protocols::ConnSockReusable;
592
593        if let Some(proxy) = self.get_proxy() {
594            panic!("windows do not support peers with proxy")
595        } else {
596            self.address().check_sock_match(sock)
597        }
598    }
599
600    fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
601        self.client_cert_key.as_ref()
602    }
603
604    fn get_tracer(&self) -> Option<Tracer> {
605        self.options.tracer.clone()
606    }
607}
608
/// The proxy settings to connect to the remote server, CONNECT only for now
#[derive(Debug, Hash, Clone)]
pub struct Proxy {
    /// The next hop to connect to; for now this will be the path to the UDS.
    pub next_hop: Box<Path>,
    /// The proxied host. Could be either IP addr or hostname.
    pub host: String,
    /// The port to proxy to.
    pub port: u16,
    /// The additional headers to add to CONNECT.
    pub headers: BTreeMap<String, Vec<u8>>,
}

impl Display for Proxy {
    /// Writes the next hop, host and port; the headers are not included.
    fn fmt(&self, f: &mut Formatter) -> FmtResult {
        let Self {
            next_hop,
            host,
            port,
            ..
        } = self;
        write!(
            f,
            "next_hop: {}, host: {host}, port: {port}",
            next_hop.display()
        )
    }
}