1pub mod http;
18pub mod l4;
19mod offload;
20
21#[cfg(feature = "any_tls")]
22mod tls;
23
24#[cfg(not(feature = "any_tls"))]
25use crate::tls::connectors as tls;
26
27use crate::protocols::Stream;
28use crate::server::configuration::ServerConf;
29use crate::upstreams::peer::{Peer, ALPN};
30
31pub use l4::Connect as L4Connect;
32use l4::{connect as l4_connect, BindTo};
33use log::{debug, error, warn};
34use offload::OffloadRuntime;
35use parking_lot::RwLock;
36use pingora_error::{Error, ErrorType::*, OrErr, Result};
37use pingora_pool::{ConnectionMeta, ConnectionPool};
38use std::collections::HashMap;
39use std::net::SocketAddr;
40use std::sync::Arc;
41use tls::TlsConnector;
42use tokio::sync::Mutex;
43
/// Options used to build a [`TransportConnector`].
///
/// `Debug` is derived so the options can be logged or inspected in tests; every
/// field is a plain standard-library type.
#[derive(Clone, Debug)]
pub struct ConnectorOptions {
    /// Optional CA file path (copied from `ServerConf::ca_file` by
    /// [`ConnectorOptions::from_server_conf`]).
    /// NOTE(review): presumably used to validate upstream certificates; the
    /// consumer lives in the TLS connector, not in this file.
    pub ca_file: Option<String>,
    /// Optional size of the s2n config cache (only with the `s2n` feature).
    #[cfg(feature = "s2n")]
    pub s2n_config_cache_size: Option<usize>,
    /// Optional pair of strings naming a certificate and a key.
    /// NOTE(review): looks like `(cert_path, key_path)` for a default client
    /// certificate; confirm against the TLS connector.
    pub cert_key_file: Option<(String, String)>,
    /// TLS key-logging debug switch, forwarded to the TLS connector.
    pub debug_ssl_keylog: bool,
    /// Capacity handed to the keepalive connection pool.
    pub keepalive_pool_size: usize,
    /// Optional `(number of pools, threads per pool)` for offloading connection
    /// establishment to dedicated runtimes. `None` disables offloading.
    pub offload_threadpool: Option<(usize, usize)>,
    /// Candidate local IPv4 addresses to bind to when connecting.
    pub bind_to_v4: Vec<SocketAddr>,
    /// Candidate local IPv6 addresses to bind to when connecting.
    pub bind_to_v6: Vec<SocketAddr>,
}
83
84impl ConnectorOptions {
85 pub fn from_server_conf(server_conf: &ServerConf) -> Self {
87 let offload_threadpool = server_conf
89 .upstream_connect_offload_threadpools
90 .zip(server_conf.upstream_connect_offload_thread_per_pool)
91 .filter(|(pools, threads)| *pools > 0 && *threads > 0);
92
93 let bind_to_v4 = server_conf
96 .client_bind_to_ipv4
97 .iter()
98 .map(|v4| {
99 let ip = v4.parse().unwrap();
100 SocketAddr::new(ip, 0)
101 })
102 .collect();
103
104 let bind_to_v6 = server_conf
105 .client_bind_to_ipv6
106 .iter()
107 .map(|v6| {
108 let ip = v6.parse().unwrap();
109 SocketAddr::new(ip, 0)
110 })
111 .collect();
112 ConnectorOptions {
113 ca_file: server_conf.ca_file.clone(),
114 cert_key_file: None, #[cfg(feature = "s2n")]
116 s2n_config_cache_size: server_conf.s2n_config_cache_size,
117 debug_ssl_keylog: server_conf.upstream_debug_ssl_keylog,
118 keepalive_pool_size: server_conf.upstream_keepalive_pool_size,
119 offload_threadpool,
120 bind_to_v4,
121 bind_to_v6,
122 }
123 }
124
125 pub fn new(keepalive_pool_size: usize) -> Self {
127 ConnectorOptions {
128 ca_file: None,
129 #[cfg(feature = "s2n")]
130 s2n_config_cache_size: None,
131 cert_key_file: None,
132 debug_ssl_keylog: false,
133 keepalive_pool_size,
134 offload_threadpool: None,
135 bind_to_v4: vec![],
136 bind_to_v6: vec![],
137 }
138 }
139}
140
141pub struct TransportConnector {
143 tls_ctx: tls::Connector,
144 connection_pool: Arc<ConnectionPool<Arc<Mutex<Stream>>>>,
145 offload: Option<OffloadRuntime>,
146 bind_to_v4: Vec<SocketAddr>,
147 bind_to_v6: Vec<SocketAddr>,
148 preferred_http_version: PreferredHttpVersion,
149}
150
/// Keepalive pool size used when a `TransportConnector` is created without options.
const DEFAULT_POOL_SIZE: usize = 128;
152
153impl TransportConnector {
154 pub fn new(mut options: Option<ConnectorOptions>) -> Self {
156 let pool_size = options
157 .as_ref()
158 .map_or(DEFAULT_POOL_SIZE, |c| c.keepalive_pool_size);
159 let offload = options.as_mut().and_then(|o| o.offload_threadpool.take());
162 let bind_to_v4 = options
163 .as_ref()
164 .map_or_else(Vec::new, |o| o.bind_to_v4.clone());
165 let bind_to_v6 = options
166 .as_ref()
167 .map_or_else(Vec::new, |o| o.bind_to_v6.clone());
168 TransportConnector {
169 tls_ctx: tls::Connector::new(options),
170 connection_pool: Arc::new(ConnectionPool::new(pool_size)),
171 offload: offload.map(|v| OffloadRuntime::new(v.0, v.1)),
172 bind_to_v4,
173 bind_to_v6,
174 preferred_http_version: PreferredHttpVersion::new(),
175 }
176 }
177
178 pub async fn new_stream<P: Peer + Send + Sync + 'static>(&self, peer: &P) -> Result<Stream> {
182 let rt = self
183 .offload
184 .as_ref()
185 .map(|o| o.get_runtime(peer.reuse_hash()));
186 let bind_to = l4::bind_to_random(peer, &self.bind_to_v4, &self.bind_to_v6);
187 let alpn_override = self.preferred_http_version.get(peer);
188 let stream = if let Some(rt) = rt {
189 let peer = peer.clone();
190 let tls_ctx = self.tls_ctx.clone();
191 rt.spawn(async move { do_connect(&peer, bind_to, alpn_override, &tls_ctx.ctx).await })
192 .await
193 .or_err(InternalError, "offload runtime failure")??
194 } else {
195 do_connect(peer, bind_to, alpn_override, &self.tls_ctx.ctx).await?
196 };
197
198 Ok(stream)
199 }
200
201 pub async fn reused_stream<P: Peer + Send + Sync>(&self, peer: &P) -> Option<Stream> {
203 match self.connection_pool.get(&peer.reuse_hash()) {
204 Some(s) => {
205 debug!("find reusable stream, trying to acquire it");
206 {
207 let _ = s.lock().await;
208 } match Arc::try_unwrap(s) {
210 Ok(l) => {
211 let mut stream = l.into_inner();
212 #[cfg(unix)]
215 if peer.matches_fd(stream.id()) && test_reusable_stream(&mut stream) {
216 Some(stream)
217 } else {
218 None
219 }
220 #[cfg(windows)]
221 {
222 use std::os::windows::io::{AsRawSocket, RawSocket};
223 struct WrappedRawSocket(RawSocket);
224 impl AsRawSocket for WrappedRawSocket {
225 fn as_raw_socket(&self) -> RawSocket {
226 self.0
227 }
228 }
229 if peer.matches_sock(WrappedRawSocket(stream.id() as RawSocket))
230 && test_reusable_stream(&mut stream)
231 {
232 Some(stream)
233 } else {
234 None
235 }
236 }
237 }
238 Err(_) => {
239 error!("failed to acquire reusable stream");
240 None
241 }
242 }
243 }
244 None => {
245 debug!("No reusable connection found for {peer}");
246 None
247 }
248 }
249 }
250
251 pub fn release_stream(
259 &self,
260 mut stream: Stream,
261 key: u64, idle_timeout: Option<std::time::Duration>,
263 ) {
264 if !test_reusable_stream(&mut stream) {
265 return;
266 }
267 let id = stream.id();
268 let meta = ConnectionMeta::new(key, id);
269 debug!("Try to keepalive client session");
270 let stream = Arc::new(Mutex::new(stream));
271 let locked_stream = stream.clone().try_lock_owned().unwrap(); let (notify_close, watch_use) = self.connection_pool.put(&meta, stream);
273 let pool = self.connection_pool.clone(); let rt = pingora_runtime::current_handle();
275 rt.spawn(async move {
276 pool.idle_poll(locked_stream, &meta, idle_timeout, notify_close, watch_use)
277 .await;
278 });
279 }
280
281 pub async fn get_stream<P: Peer + Send + Sync + 'static>(
288 &self,
289 peer: &P,
290 ) -> Result<(Stream, bool)> {
291 let reused_stream = self.reused_stream(peer).await;
292 if let Some(s) = reused_stream {
293 Ok((s, true))
294 } else {
295 let s = self.new_stream(peer).await?;
296 Ok((s, false))
297 }
298 }
299
300 pub fn prefer_h1(&self, peer: &impl Peer) {
302 self.preferred_http_version.add(peer, 1);
303 }
304}
305
306async fn do_connect<P: Peer + Send + Sync>(
309 peer: &P,
310 bind_to: Option<BindTo>,
311 alpn_override: Option<ALPN>,
312 tls_ctx: &TlsConnector,
313) -> Result<Stream> {
314 let connect_future = do_connect_inner(peer, bind_to, alpn_override, tls_ctx);
317
318 match peer.total_connection_timeout() {
319 Some(t) => match pingora_timeout::timeout(t, connect_future).await {
320 Ok(res) => res,
321 Err(_) => Error::e_explain(
322 ConnectTimedout,
323 format!("connecting to server {peer}, total-connection timeout {t:?}"),
324 ),
325 },
326 None => connect_future.await,
327 }
328}
329
330async fn do_connect_inner<P: Peer + Send + Sync>(
332 peer: &P,
333 bind_to: Option<BindTo>,
334 alpn_override: Option<ALPN>,
335 tls_ctx: &TlsConnector,
336) -> Result<Stream> {
337 let stream = l4_connect(peer, bind_to).await?;
338 if peer.tls() {
339 let tls_stream = tls::connect(stream, peer, alpn_override, tls_ctx).await?;
340 Ok(Box::new(tls_stream))
341 } else {
342 Ok(Box::new(stream))
343 }
344}
345
346struct PreferredHttpVersion {
347 versions: RwLock<HashMap<u64, u8>>, }
350
351impl PreferredHttpVersion {
354 pub fn new() -> Self {
355 PreferredHttpVersion {
356 versions: RwLock::default(),
357 }
358 }
359
360 pub fn add(&self, peer: &impl Peer, version: u8) {
361 let key = peer.reuse_hash();
362 let mut v = self.versions.write();
363 v.insert(key, version);
364 }
365
366 pub fn get(&self, peer: &impl Peer) -> Option<ALPN> {
367 let key = peer.reuse_hash();
368 let v = self.versions.read();
369 v.get(&key)
370 .copied()
371 .map(|v| if v == 1 { ALPN::H1 } else { ALPN::H2H1 })
372 }
373}
374
375use futures::future::FutureExt;
376use tokio::io::AsyncReadExt;
377
378fn test_reusable_stream(stream: &mut Stream) -> bool {
380 let mut buf = [0; 1];
381 let result = tokio::task::unconstrained(stream.read(&mut buf[..])).now_or_never();
383 if let Some(data_result) = result {
384 match data_result {
385 Ok(n) => {
386 if n == 0 {
387 debug!("Idle connection is closed");
388 } else {
389 warn!("Unexpected data read in idle connection");
390 }
391 }
392 Err(e) => {
393 debug!("Idle connection is broken: {e:?}");
394 }
395 }
396 false
397 } else {
398 true
399 }
400}
401
/// Helpers for tests that need a Unix domain socket server.
#[cfg(all(test, unix))]
pub(crate) mod test_utils {
    use tokio::io::AsyncWriteExt;
    use tokio::net::UnixListener;
    use tokio::sync::oneshot;
    use tokio::task::JoinHandle;

    /// Build a socket path under `/tmp` that embeds the test name, the current
    /// thread id and the process id.
    pub fn unique_uds_path(test_name: &str) -> String {
        let thread_id = std::thread::current().id();
        let pid = std::process::id();
        format!("/tmp/test_{test_name}_{:?}_{}.sock", thread_id, pid)
    }

    /// Spawn a one-shot UDS server that writes `response` to the first client
    /// and then keeps the connection open until told to shut down.
    ///
    /// Returns `(ready_rx, shutdown_tx, server_handle)`:
    /// * `ready_rx` resolves once the listener is bound,
    /// * `shutdown_tx` lets the server drop the accepted connection and exit,
    /// * `server_handle` is the spawned server task.
    ///
    /// The socket file is removed before binding and again when the task ends.
    pub fn spawn_mock_uds_server(
        socket_path: String,
        response: &'static [u8],
    ) -> (oneshot::Receiver<()>, oneshot::Sender<()>, JoinHandle<()>) {
        let (ready_tx, ready_rx) = oneshot::channel();
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        let server = async move {
            // A stale socket file would make `bind` fail.
            let _ = std::fs::remove_file(&socket_path);
            let listener = UnixListener::bind(&socket_path).unwrap();
            let _ = ready_tx.send(());

            if let Ok((mut conn, _peer_addr)) = listener.accept().await {
                let _ = conn.write_all(response).await;
                // Hold the connection open until the test is done with it.
                let _ = shutdown_rx.await;
            }
            let _ = std::fs::remove_file(&socket_path);
        };

        (ready_rx, shutdown_tx, tokio::spawn(server))
    }

    /// Spawn a one-shot UDS server that shuts down the write side of the first
    /// accepted connection right away, then waits for the shutdown signal.
    ///
    /// Returns the same `(ready_rx, shutdown_tx, server_handle)` triple as
    /// [`spawn_mock_uds_server`].
    pub fn spawn_mock_uds_server_close_immediate(
        socket_path: String,
    ) -> (oneshot::Receiver<()>, oneshot::Sender<()>, JoinHandle<()>) {
        let (ready_tx, ready_rx) = oneshot::channel();
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        let server = async move {
            let _ = std::fs::remove_file(&socket_path);
            let listener = UnixListener::bind(&socket_path).unwrap();
            let _ = ready_tx.send(());

            if let Ok((mut conn, _peer_addr)) = listener.accept().await {
                let _ = conn.shutdown().await;
                let _ = shutdown_rx.await;
            }
            let _ = std::fs::remove_file(&socket_path);
        };

        (ready_rx, shutdown_tx, tokio::spawn(server))
    }
}
481
#[cfg(test)]
#[cfg(feature = "any_tls")]
mod tests {
    use pingora_error::ErrorType;
    use tls::Connector;

    use super::*;
    use crate::upstreams::peer::BasicPeer;

    // 192.0.2.0/24 is the TEST-NET-1 documentation range (RFC 5737); the tests
    // below rely on connects to it never completing.
    const BLACK_HOLE: &str = "192.0.2.1:79";

    /// A released plaintext connection is handed back by `get_stream`.
    #[tokio::test]
    async fn test_connect() {
        let connector = TransportConnector::new(None);
        let peer = BasicPeer::new("1.1.1.1:80");
        let stream = connector.new_stream(&peer).await.unwrap();
        connector.release_stream(stream, peer.reuse_hash(), None);

        let (_, reused) = connector.get_stream(&peer).await.unwrap();
        assert!(reused);
    }

    /// A released TLS connection is handed back by `get_stream`.
    #[tokio::test]
    async fn test_connect_tls() {
        let connector = TransportConnector::new(None);
        let mut peer = BasicPeer::new("1.1.1.1:443");
        peer.sni = "one.one.one.one".to_string();
        let stream = connector.new_stream(&peer).await.unwrap();
        connector.release_stream(stream, peer.reuse_hash(), None);

        let (_, reused) = connector.get_stream(&peer).await.unwrap();
        assert!(reused);
    }

    /// Connect over a Unix domain socket, read the greeting, then reuse the stream.
    #[tokio::test(flavor = "multi_thread")]
    #[cfg(unix)]
    async fn test_connect_uds() {
        let socket_path = test_utils::unique_uds_path("transport_connector");
        let (ready_rx, shutdown_tx, server_handle) =
            test_utils::spawn_mock_uds_server(socket_path.clone(), b"it works!");

        // Wait until the listener is bound before connecting.
        ready_rx.await.unwrap();

        let connector = TransportConnector::new(None);
        let peer = BasicPeer::new_uds(&socket_path).unwrap();
        let mut stream = connector.new_stream(&peer).await.unwrap();
        let mut buf = [0; 9];
        // `read_exact` rather than `read`: a single `read` may legally return
        // fewer than 9 bytes, which would make the assertion below fail spuriously.
        stream.read_exact(&mut buf).await.unwrap();
        assert_eq!(&buf, b"it works!");

        connector.release_stream(stream, peer.reuse_hash(), None);
        let (stream, reused) = connector.get_stream(&peer).await.unwrap();
        assert!(reused);

        // Close our side first, then let the server task finish and clean up.
        drop(stream);
        let _ = shutdown_tx.send(());
        server_handle.await.unwrap();
    }

    /// Connect to the black hole with a 1ms connection timeout and expect
    /// `ConnectTimedout`, using a connector built from `conf`.
    async fn do_test_conn_timeout(conf: Option<ConnectorOptions>) {
        let connector = TransportConnector::new(conf);
        let mut peer = BasicPeer::new(BLACK_HOLE);
        peer.options.connection_timeout = Some(std::time::Duration::from_millis(1));
        let stream = connector.new_stream(&peer).await;
        match stream {
            Ok(_) => panic!("should throw an error"),
            Err(e) => assert_eq!(e.etype(), &ConnectTimedout),
        }
    }

    #[tokio::test]
    async fn test_conn_timeout() {
        do_test_conn_timeout(None).await;
    }

    /// Same as `test_conn_timeout`, but with the connect offloaded to a thread pool.
    #[tokio::test]
    async fn test_conn_timeout_with_offload() {
        let mut conf = ConnectorOptions::new(8);
        conf.offload_threadpool = Some((2, 2));
        do_test_conn_timeout(Some(conf)).await;
    }

    /// Connecting to a remote address while bound to localhost must fail.
    #[tokio::test]
    async fn test_connector_bind_to() {
        let peer = BasicPeer::new("240.0.0.1:80");
        let mut conf = ConnectorOptions::new(1);
        conf.bind_to_v4.push("127.0.0.1:0".parse().unwrap());
        let connector = TransportConnector::new(Some(conf));

        let stream = connector.new_stream(&peer).await;
        let error = stream.unwrap_err();
        // Either failure mode is accepted by this test.
        assert!(error.etype() == &ConnectError || error.etype() == &ConnectTimedout)
    }

    /// Call `do_connect` directly and return the error type and context string
    /// of the failure. Panics if the connect unexpectedly succeeds.
    async fn get_do_connect_failure_with_peer(peer: &BasicPeer) -> (ErrorType, String) {
        let tls_connector = Connector::new(None);
        let stream = do_connect(peer, None, None, &tls_connector.ctx).await;
        match stream {
            Ok(_) => panic!("should throw an error"),
            Err(e) => (
                e.etype().clone(),
                e.context
                    .as_ref()
                    .map(|ctx| ctx.as_str().to_owned())
                    .unwrap_or_default(),
            ),
        }
    }

    /// With only a total timeout set, the error context must mention it.
    #[tokio::test]
    async fn test_do_connect_with_total_timeout() {
        let mut peer = BasicPeer::new(BLACK_HOLE);
        peer.options.total_connection_timeout = Some(std::time::Duration::from_millis(1));
        let (etype, context) = get_do_connect_failure_with_peer(&peer).await;
        assert_eq!(etype, ConnectTimedout);
        assert!(context.contains("total-connection timeout"));
    }

    /// A shorter connection timeout fires before the total timeout, so the
    /// error context must not mention the total timeout.
    #[tokio::test]
    async fn test_tls_connect_timeout_supersedes_total() {
        let mut peer = BasicPeer::new(BLACK_HOLE);
        peer.options.total_connection_timeout = Some(std::time::Duration::from_millis(10));
        peer.options.connection_timeout = Some(std::time::Duration::from_millis(1));
        let (etype, context) = get_do_connect_failure_with_peer(&peer).await;
        assert_eq!(etype, ConnectTimedout);
        assert!(!context.contains("total-connection timeout"));
    }

    /// Without a total timeout, a failure must never be attributed to it.
    #[tokio::test]
    async fn test_do_connect_without_total_timeout() {
        let peer = BasicPeer::new(BLACK_HOLE);
        let (etype, context) = get_do_connect_failure_with_peer(&peer).await;
        assert!(etype != ConnectTimedout || !context.contains("total-connection timeout"));
    }
}