Skip to main content

pingora_core/connectors/
mod.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Connecting to servers
16
17pub mod http;
18pub mod l4;
19mod offload;
20
21#[cfg(feature = "any_tls")]
22mod tls;
23
24#[cfg(not(feature = "any_tls"))]
25use crate::tls::connectors as tls;
26
27use crate::protocols::Stream;
28use crate::server::configuration::ServerConf;
29use crate::upstreams::peer::{Peer, ALPN};
30
31pub use l4::Connect as L4Connect;
32use l4::{connect as l4_connect, BindTo};
33use log::{debug, error, warn};
34use offload::OffloadRuntime;
35use parking_lot::RwLock;
36use pingora_error::{Error, ErrorType::*, OrErr, Result};
37use pingora_pool::{ConnectionMeta, ConnectionPool};
38use std::collections::HashMap;
39use std::net::SocketAddr;
40use std::sync::Arc;
41use tls::TlsConnector;
42use tokio::sync::Mutex;
43
/// The options to configure a [TransportConnector]
#[derive(Clone)]
pub struct ConnectorOptions {
    /// Path to the CA file used to validate server certs.
    ///
    /// If `None`, the CA in the [default](https://www.openssl.org/docs/manmaster/man3/SSL_CTX_set_default_verify_paths.html)
    /// locations will be loaded
    pub ca_file: Option<String>,
    /// The maximum number of unique s2n configs to cache. Creating a new s2n config is an
    /// expensive operation, so we cache and re-use config objects with identical configurations.
    /// Defaults to a cache size of 10. A value of 0 disables the cache.
    ///
    /// WARNING: Disabling the s2n config cache can result in poor performance
    #[cfg(feature = "s2n")]
    pub s2n_config_cache_size: Option<usize>,
    /// The default client cert and key to use for mTLS
    ///
    /// Each individual connection can use their own cert key to override this.
    pub cert_key_file: Option<(String, String)>,
    /// When enabled allows TLS keys to be written to a file specified by the SSLKEYLOG
    /// env variable. This can be used by tools like Wireshark to decrypt traffic
    /// for debugging purposes.
    pub debug_ssl_keylog: bool,
    /// Size of the keepalive connection pool, i.e. how many idle connections may be kept
    /// around for reuse
    pub keepalive_pool_size: usize,
    /// Optionally offload the connection establishment to dedicated thread pools
    ///
    /// TCP and TLS connection establishment can be CPU intensive. Sometimes such tasks can slow
    /// down the entire service, which causes timeouts which leads to more connections which
    /// snowballs the issue. Use this option to isolate these CPU intensive tasks from impacting
    /// other traffic.
    ///
    /// Syntax: (number of pools, number of threads in each pool)
    pub offload_threadpool: Option<(usize, usize)>,
    /// Bind to any of the given source IPv4 addresses
    ///
    /// When built via `from_server_conf`, every entry has port 0 so the OS picks the source port.
    pub bind_to_v4: Vec<SocketAddr>,
    /// Bind to any of the given source IPv6 addresses
    ///
    /// When built via `from_server_conf`, every entry has port 0 so the OS picks the source port.
    pub bind_to_v6: Vec<SocketAddr>,
}
83
84impl ConnectorOptions {
85    /// Derive the [ConnectorOptions] from a [ServerConf]
86    pub fn from_server_conf(server_conf: &ServerConf) -> Self {
87        // if both pools and threads are Some(>0)
88        let offload_threadpool = server_conf
89            .upstream_connect_offload_threadpools
90            .zip(server_conf.upstream_connect_offload_thread_per_pool)
91            .filter(|(pools, threads)| *pools > 0 && *threads > 0);
92
93        // create SocketAddrs with port 0 for src addr bind
94
95        let bind_to_v4 = server_conf
96            .client_bind_to_ipv4
97            .iter()
98            .map(|v4| {
99                let ip = v4.parse().unwrap();
100                SocketAddr::new(ip, 0)
101            })
102            .collect();
103
104        let bind_to_v6 = server_conf
105            .client_bind_to_ipv6
106            .iter()
107            .map(|v6| {
108                let ip = v6.parse().unwrap();
109                SocketAddr::new(ip, 0)
110            })
111            .collect();
112        ConnectorOptions {
113            ca_file: server_conf.ca_file.clone(),
114            cert_key_file: None, // TODO: use it
115            #[cfg(feature = "s2n")]
116            s2n_config_cache_size: server_conf.s2n_config_cache_size,
117            debug_ssl_keylog: server_conf.upstream_debug_ssl_keylog,
118            keepalive_pool_size: server_conf.upstream_keepalive_pool_size,
119            offload_threadpool,
120            bind_to_v4,
121            bind_to_v6,
122        }
123    }
124
125    /// Create a new [ConnectorOptions] with the given keepalive pool size
126    pub fn new(keepalive_pool_size: usize) -> Self {
127        ConnectorOptions {
128            ca_file: None,
129            #[cfg(feature = "s2n")]
130            s2n_config_cache_size: None,
131            cert_key_file: None,
132            debug_ssl_keylog: false,
133            keepalive_pool_size,
134            offload_threadpool: None,
135            bind_to_v4: vec![],
136            bind_to_v6: vec![],
137        }
138    }
139}
140
/// [TransportConnector] provides APIs to connect to servers via TCP or TLS with connection reuse
pub struct TransportConnector {
    // TLS context whose `ctx` is handed to `do_connect` when establishing new connections
    tls_ctx: tls::Connector,
    // Idle streams keyed by their reuse key (usually `Peer::reuse_hash()`); the `Arc` is cloned
    // into the idle-poll tasks spawned by `release_stream`
    connection_pool: Arc<ConnectionPool<Arc<Mutex<Stream>>>>,
    // When set, `new_stream` establishes connections on these dedicated runtimes
    offload: Option<OffloadRuntime>,
    // Candidate source addresses passed to `l4::bind_to_random` for each new connection
    bind_to_v4: Vec<SocketAddr>,
    bind_to_v6: Vec<SocketAddr>,
    // Per-peer ALPN override, recorded via `prefer_h1` and read by `new_stream`
    preferred_http_version: PreferredHttpVersion,
}

// Keepalive pool size used when `TransportConnector::new` is given no options
const DEFAULT_POOL_SIZE: usize = 128;
152
impl TransportConnector {
    /// Create a new [TransportConnector] with the given [ConnectorOptions]
    ///
    /// When `options` is `None`, the keepalive pool size is `DEFAULT_POOL_SIZE`, connection
    /// establishment is not offloaded and no source addresses are bound.
    pub fn new(mut options: Option<ConnectorOptions>) -> Self {
        let pool_size = options
            .as_ref()
            .map_or(DEFAULT_POOL_SIZE, |c| c.keepalive_pool_size);
        // Take the offloading setting here because this layer implements offloading itself,
        // so lower layers of the stack do not need to offload again.
        let offload = options.as_mut().and_then(|o| o.offload_threadpool.take());
        let bind_to_v4 = options
            .as_ref()
            .map_or_else(Vec::new, |o| o.bind_to_v4.clone());
        let bind_to_v6 = options
            .as_ref()
            .map_or_else(Vec::new, |o| o.bind_to_v6.clone());
        TransportConnector {
            // `offload_threadpool` was taken above, so the TLS connector never sees it
            tls_ctx: tls::Connector::new(options),
            connection_pool: Arc::new(ConnectionPool::new(pool_size)),
            offload: offload.map(|v| OffloadRuntime::new(v.0, v.1)),
            bind_to_v4,
            bind_to_v6,
            preferred_http_version: PreferredHttpVersion::new(),
        }
    }

    /// Connect to the given server [Peer]
    ///
    /// No connection is reused. When offloading is configured, the connection is established on
    /// the offload runtime selected by `peer.reuse_hash()`; otherwise on the caller's task.
    ///
    /// # Errors
    ///
    /// Returns the error of the connection attempt, or an `InternalError` when the offload task
    /// itself fails.
    pub async fn new_stream<P: Peer + Send + Sync + 'static>(&self, peer: &P) -> Result<Stream> {
        let rt = self
            .offload
            .as_ref()
            .map(|o| o.get_runtime(peer.reuse_hash()));
        let bind_to = l4::bind_to_random(peer, &self.bind_to_v4, &self.bind_to_v6);
        let alpn_override = self.preferred_http_version.get(peer);
        let stream = if let Some(rt) = rt {
            // The spawned future owns clones of the peer and TLS context instead of borrowing
            // them (presumably the runtime requires a 'static future; note the `P: 'static` bound)
            let peer = peer.clone();
            let tls_ctx = self.tls_ctx.clone();
            // outer `?`: the offload task failed; inner `?`: the connection attempt failed
            rt.spawn(async move { do_connect(&peer, bind_to, alpn_override, &tls_ctx.ctx).await })
                .await
                .or_err(InternalError, "offload runtime failure")??
        } else {
            do_connect(peer, bind_to, alpn_override, &self.tls_ctx.ctx).await?
        };

        Ok(stream)
    }

    /// Try to find a reusable connection to the given server [Peer]
    ///
    /// Returns `None` when the pool has no entry for `peer.reuse_hash()`, when the pooled stream
    /// cannot be taken exclusively, or when it fails the fd/socket match or liveness check.
    pub async fn reused_stream<P: Peer + Send + Sync>(&self, peer: &P) -> Option<Stream> {
        match self.connection_pool.get(&peer.reuse_hash()) {
            Some(s) => {
                debug!("find reusable stream, trying to acquire it");
                // `let _ =` drops the guard right away: this only waits until the current holder
                // of the lock lets go of it
                {
                    let _ = s.lock().await;
                } // wait for the idle poll to release it
                // Succeeds only when this is the last `Arc` handle to the stream
                match Arc::try_unwrap(s) {
                    Ok(l) => {
                        let mut stream = l.into_inner();
                        // test_reusable_stream: we assume server would never actively send data
                        // first on an idle stream.
                        #[cfg(unix)]
                        if peer.matches_fd(stream.id()) && test_reusable_stream(&mut stream) {
                            Some(stream)
                        } else {
                            None
                        }
                        #[cfg(windows)]
                        {
                            use std::os::windows::io::{AsRawSocket, RawSocket};
                            // adapter so the raw socket id can be handed to `matches_sock`
                            struct WrappedRawSocket(RawSocket);
                            impl AsRawSocket for WrappedRawSocket {
                                fn as_raw_socket(&self) -> RawSocket {
                                    self.0
                                }
                            }
                            if peer.matches_sock(WrappedRawSocket(stream.id() as RawSocket))
                                && test_reusable_stream(&mut stream)
                            {
                                Some(stream)
                            } else {
                                None
                            }
                        }
                    }
                    Err(_) => {
                        error!("failed to acquire reusable stream");
                        None
                    }
                }
            }
            None => {
                debug!("No reusable connection found for {peer}");
                None
            }
        }
    }

    /// Return the [Stream] to the [TransportConnector] for connection reuse.
    ///
    /// Not all TCP/TLS connections can be reused. It is the caller's responsibility to make sure
    /// that protocol over the [Stream] supports connection reuse and the [Stream] itself is ready
    /// to be reused.
    ///
    /// If a [Stream] is dropped instead of being returned via this function, it will be closed.
    /// A stream that already fails the liveness check is dropped here instead of being pooled.
    pub fn release_stream(
        &self,
        mut stream: Stream,
        key: u64, // usually peer.reuse_hash()
        idle_timeout: Option<std::time::Duration>,
    ) {
        if !test_reusable_stream(&mut stream) {
            return;
        }
        let id = stream.id();
        let meta = ConnectionMeta::new(key, id);
        debug!("Try to keepalive client session");
        let stream = Arc::new(Mutex::new(stream));
        // Lock before the stream enters the pool; the guard moves into the idle-poll task below
        // and `reused_stream` waits on this same lock
        let locked_stream = stream.clone().try_lock_owned().unwrap(); // safe as we just created it
        let (notify_close, watch_use) = self.connection_pool.put(&meta, stream);
        let pool = self.connection_pool.clone(); //clone the arc
        // Watch the idle connection in the background on the current runtime; see
        // `ConnectionPool::idle_poll` for how `idle_timeout` and the notifiers are used
        let rt = pingora_runtime::current_handle();
        rt.spawn(async move {
            pool.idle_poll(locked_stream, &meta, idle_timeout, notify_close, watch_use)
                .await;
        });
    }

    /// Get a stream to the given server [Peer]
    ///
    /// This function will try to find a reusable [Stream] first. If there is none, a new connection
    /// will be made to the server.
    ///
    /// The returned boolean will indicate whether the stream is reused.
    /// Errors can only come from establishing a new connection.
    pub async fn get_stream<P: Peer + Send + Sync + 'static>(
        &self,
        peer: &P,
    ) -> Result<(Stream, bool)> {
        let reused_stream = self.reused_stream(peer).await;
        if let Some(s) = reused_stream {
            Ok((s, true))
        } else {
            let s = self.new_stream(peer).await?;
            Ok((s, false))
        }
    }

    /// Tell the connector to always send h1 for ALPN for the given peer in the future.
    ///
    /// Subsequent `new_stream` calls for this peer pass `ALPN::H1` as the ALPN override.
    pub fn prefer_h1(&self, peer: &impl Peer) {
        self.preferred_http_version.add(peer, 1);
    }
}
305
306// Perform the actual L4 and tls connection steps while respecting the peer's
307// connection timeout if there is one
308async fn do_connect<P: Peer + Send + Sync>(
309    peer: &P,
310    bind_to: Option<BindTo>,
311    alpn_override: Option<ALPN>,
312    tls_ctx: &TlsConnector,
313) -> Result<Stream> {
314    // Create the future that does the connections, but don't evaluate it until
315    // we decide if we need a timeout or not
316    let connect_future = do_connect_inner(peer, bind_to, alpn_override, tls_ctx);
317
318    match peer.total_connection_timeout() {
319        Some(t) => match pingora_timeout::timeout(t, connect_future).await {
320            Ok(res) => res,
321            Err(_) => Error::e_explain(
322                ConnectTimedout,
323                format!("connecting to server {peer}, total-connection timeout {t:?}"),
324            ),
325        },
326        None => connect_future.await,
327    }
328}
329
330// Perform the actual L4 and tls connection steps with no timeout
331async fn do_connect_inner<P: Peer + Send + Sync>(
332    peer: &P,
333    bind_to: Option<BindTo>,
334    alpn_override: Option<ALPN>,
335    tls_ctx: &TlsConnector,
336) -> Result<Stream> {
337    let stream = l4_connect(peer, bind_to).await?;
338    if peer.tls() {
339        let tls_stream = tls::connect(stream, peer, alpn_override, tls_ctx).await?;
340        Ok(Box::new(tls_stream))
341    } else {
342        Ok(Box::new(stream))
343    }
344}
345
346struct PreferredHttpVersion {
347    // TODO: shard to avoid the global lock
348    versions: RwLock<HashMap<u64, u8>>, // <hash of peer, version>
349}
350
351// TODO: limit the size of this
352
353impl PreferredHttpVersion {
354    pub fn new() -> Self {
355        PreferredHttpVersion {
356            versions: RwLock::default(),
357        }
358    }
359
360    pub fn add(&self, peer: &impl Peer, version: u8) {
361        let key = peer.reuse_hash();
362        let mut v = self.versions.write();
363        v.insert(key, version);
364    }
365
366    pub fn get(&self, peer: &impl Peer) -> Option<ALPN> {
367        let key = peer.reuse_hash();
368        let v = self.versions.read();
369        v.get(&key)
370            .copied()
371            .map(|v| if v == 1 { ALPN::H1 } else { ALPN::H2H1 })
372    }
373}
374
375use futures::future::FutureExt;
376use tokio::io::AsyncReadExt;
377
378/// Test whether a stream is already closed or not reusable (server sent unexpected data)
379fn test_reusable_stream(stream: &mut Stream) -> bool {
380    let mut buf = [0; 1];
381    // tokio::task::unconstrained because now_or_never may yield None when the future is ready
382    let result = tokio::task::unconstrained(stream.read(&mut buf[..])).now_or_never();
383    if let Some(data_result) = result {
384        match data_result {
385            Ok(n) => {
386                if n == 0 {
387                    debug!("Idle connection is closed");
388                } else {
389                    warn!("Unexpected data read in idle connection");
390                }
391            }
392            Err(e) => {
393                debug!("Idle connection is broken: {e:?}");
394            }
395        }
396        false
397    } else {
398        true
399    }
400}
401
/// Test utilities for creating mock acceptors.
#[cfg(all(test, unix))]
pub(crate) mod test_utils {
    use tokio::io::AsyncWriteExt;
    use tokio::net::UnixListener;

    /// Generates a unique socket path for testing to avoid conflicts when running in parallel
    ///
    /// The path combines the test name, the current thread id and the process id.
    pub fn unique_uds_path(test_name: &str) -> String {
        let thread_id = std::thread::current().id();
        let pid = std::process::id();
        format!("/tmp/test_{test_name}_{:?}_{}.sock", thread_id, pid)
    }

    /// A mock UDS server that accepts one connection, sends data, and waits for shutdown signal
    ///
    /// Returns: (ready_rx, shutdown_tx, server_handle)
    /// - ready_rx: Wait on this to know when server is ready to accept connections
    /// - shutdown_tx: Send on this to tell server to shut down
    /// - server_handle: Join handle for the server task
    pub fn spawn_mock_uds_server(
        socket_path: String,
        response: &'static [u8],
    ) -> (
        tokio::sync::oneshot::Receiver<()>,
        tokio::sync::oneshot::Sender<()>,
        tokio::task::JoinHandle<()>,
    ) {
        spawn_uds_server_with(socket_path, move |mut stream, shutdown_rx| async move {
            let _ = stream.write_all(response).await;
            // Keep the connection open until the test tells us to shutdown
            let _ = shutdown_rx.await;
        })
    }

    /// A mock UDS server that immediately closes connections (for testing error handling)
    ///
    /// Returns: (ready_rx, shutdown_tx, server_handle)
    pub fn spawn_mock_uds_server_close_immediate(
        socket_path: String,
    ) -> (
        tokio::sync::oneshot::Receiver<()>,
        tokio::sync::oneshot::Sender<()>,
        tokio::task::JoinHandle<()>,
    ) {
        spawn_uds_server_with(socket_path, |mut stream, shutdown_rx| async move {
            let _ = stream.shutdown().await;
            // Wait for shutdown signal before cleaning up
            let _ = shutdown_rx.await;
        })
    }

    /// Shared skeleton of the mock servers above.
    ///
    /// The spawned task:
    /// - binds a UDS listener at `socket_path`, after removing any stale socket file;
    /// - signals readiness;
    /// - hands the first accepted connection, together with the shutdown receiver, to
    ///   `on_accept`;
    /// - removes the socket file once that finishes or the accept fails.
    fn spawn_uds_server_with<F, Fut>(
        socket_path: String,
        on_accept: F,
    ) -> (
        tokio::sync::oneshot::Receiver<()>,
        tokio::sync::oneshot::Sender<()>,
        tokio::task::JoinHandle<()>,
    )
    where
        F: FnOnce(tokio::net::UnixStream, tokio::sync::oneshot::Receiver<()>) -> Fut
            + Send
            + 'static,
        Fut: std::future::Future<Output = ()> + Send + 'static,
    {
        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();

        let server_handle = tokio::spawn(async move {
            let _ = std::fs::remove_file(&socket_path);
            let listener = UnixListener::bind(&socket_path).unwrap();
            // Signal that the server is ready to accept connections
            let _ = ready_tx.send(());

            if let Ok((stream, _addr)) = listener.accept().await {
                // the connection stays open until `on_accept`'s future completes
                on_accept(stream, shutdown_rx).await;
            }
            let _ = std::fs::remove_file(&socket_path);
        });

        (ready_rx, shutdown_tx, server_handle)
    }
}
481
#[cfg(test)]
#[cfg(feature = "any_tls")]
mod tests {
    use pingora_error::ErrorType;
    use tls::Connector;

    use super::*;
    use crate::upstreams::peer::BasicPeer;

    // 192.0.2.1 is effectively a black hole (TEST-NET-1 documentation range)
    const BLACK_HOLE: &str = "192.0.2.1:79";

    #[tokio::test]
    async fn test_connect() {
        let connector = TransportConnector::new(None);
        let peer = BasicPeer::new("1.1.1.1:80");
        // make a new connection to 1.1.1.1
        let stream = connector.new_stream(&peer).await.unwrap();
        connector.release_stream(stream, peer.reuse_hash(), None);

        let (_, reused) = connector.get_stream(&peer).await.unwrap();
        assert!(reused);
    }

    #[tokio::test]
    async fn test_connect_tls() {
        let connector = TransportConnector::new(None);
        let mut peer = BasicPeer::new("1.1.1.1:443");
        // BasicPeer will use tls when SNI is set
        peer.sni = "one.one.one.one".to_string();
        // make a new connection to https://1.1.1.1
        let stream = connector.new_stream(&peer).await.unwrap();
        connector.release_stream(stream, peer.reuse_hash(), None);

        let (_, reused) = connector.get_stream(&peer).await.unwrap();
        assert!(reused);
    }

    #[tokio::test(flavor = "multi_thread")]
    #[cfg(unix)]
    async fn test_connect_uds() {
        let socket_path = test_utils::unique_uds_path("transport_connector");
        let (ready_rx, shutdown_tx, server_handle) =
            test_utils::spawn_mock_uds_server(socket_path.clone(), b"it works!");

        // Wait for the server to be ready before connecting
        ready_rx.await.unwrap();

        // create a connector and a peer pointing at the mock UDS server
        let connector = TransportConnector::new(None);
        let peer = BasicPeer::new_uds(&socket_path).unwrap();
        // make a new connection to mock uds
        let mut stream = connector.new_stream(&peer).await.unwrap();
        let mut buf = [0; 9];
        let _ = stream.read(&mut buf).await.unwrap();
        assert_eq!(&buf, b"it works!");

        // Test connection reuse by releasing and getting the stream back
        connector.release_stream(stream, peer.reuse_hash(), None);
        let (stream, reused) = connector.get_stream(&peer).await.unwrap();
        assert!(reused);

        // Clean up: drop the stream, tell server to shutdown, and wait for it
        drop(stream);
        let _ = shutdown_tx.send(());
        server_handle.await.unwrap();
    }

    async fn do_test_conn_timeout(conf: Option<ConnectorOptions>) {
        let connector = TransportConnector::new(conf);
        let mut peer = BasicPeer::new(BLACK_HOLE);
        peer.options.connection_timeout = Some(std::time::Duration::from_millis(1));
        let stream = connector.new_stream(&peer).await;
        match stream {
            Ok(_) => panic!("should throw an error"),
            Err(e) => assert_eq!(e.etype(), &ConnectTimedout),
        }
    }

    #[tokio::test]
    async fn test_conn_timeout() {
        do_test_conn_timeout(None).await;
    }

    #[tokio::test]
    async fn test_conn_timeout_with_offload() {
        let mut conf = ConnectorOptions::new(8);
        conf.offload_threadpool = Some((2, 2));
        do_test_conn_timeout(Some(conf)).await;
    }

    #[tokio::test]
    async fn test_connector_bind_to() {
        // connect to remote while bind to localhost will fail
        let peer = BasicPeer::new("240.0.0.1:80");
        let mut conf = ConnectorOptions::new(1);
        conf.bind_to_v4.push("127.0.0.1:0".parse().unwrap());
        let connector = TransportConnector::new(Some(conf));

        let stream = connector.new_stream(&peer).await;
        let error = stream.unwrap_err();
        // XXX: some systems will allow the socket to bind and connect without error, only to timeout
        assert!(error.etype() == &ConnectError || error.etype() == &ConnectTimedout)
    }

    /// Helper function for testing error handling in the `do_connect` function.
    /// This assumes that connecting to the peer will fail, and returns the
    /// decomposed error type and context message
    async fn get_do_connect_failure_with_peer(peer: &BasicPeer) -> (ErrorType, String) {
        let tls_connector = Connector::new(None);
        let stream = do_connect(peer, None, None, &tls_connector.ctx).await;
        match stream {
            Ok(_) => panic!("should throw an error"),
            Err(e) => (
                e.etype().clone(),
                e.context
                    .as_ref()
                    .map(|ctx| ctx.as_str().to_owned())
                    .unwrap_or_default(),
            ),
        }
    }

    #[tokio::test]
    async fn test_do_connect_with_total_timeout() {
        let mut peer = BasicPeer::new(BLACK_HOLE);
        peer.options.total_connection_timeout = Some(std::time::Duration::from_millis(1));
        let (etype, context) = get_do_connect_failure_with_peer(&peer).await;
        assert_eq!(etype, ConnectTimedout);
        assert!(context.contains("total-connection timeout"));
    }

    #[tokio::test]
    async fn test_tls_connect_timeout_supersedes_total() {
        let mut peer = BasicPeer::new(BLACK_HOLE);
        peer.options.total_connection_timeout = Some(std::time::Duration::from_millis(10));
        peer.options.connection_timeout = Some(std::time::Duration::from_millis(1));
        let (etype, context) = get_do_connect_failure_with_peer(&peer).await;
        assert_eq!(etype, ConnectTimedout);
        assert!(!context.contains("total-connection timeout"));
    }

    #[tokio::test]
    async fn test_do_connect_without_total_timeout() {
        let mut peer = BasicPeer::new(BLACK_HOLE);
        // Bound the L4 connect so the test does not wait for the OS-level SYN timeout against
        // the black hole. With no total-connection timeout set, any timeout must come from
        // the L4 connect rather than from do_connect.
        peer.options.connection_timeout = Some(std::time::Duration::from_millis(1));
        let (etype, context) = get_do_connect_failure_with_peer(&peer).await;
        assert!(etype != ConnectTimedout || !context.contains("total-connection timeout"));
    }
}