1pub mod http;
18pub mod l4;
19mod offload;
20
21#[cfg(feature = "any_tls")]
22mod tls;
23
24#[cfg(not(feature = "any_tls"))]
25use crate::tls::connectors as tls;
26
27use crate::protocols::Stream;
28use crate::server::configuration::ServerConf;
29use crate::upstreams::peer::{Peer, ALPN};
30
31pub use l4::Connect as L4Connect;
32use l4::{connect as l4_connect, BindTo};
33use log::{debug, error, warn};
34use offload::OffloadRuntime;
35use parking_lot::RwLock;
36use pingora_error::{Error, ErrorType::*, OrErr, Result};
37use pingora_pool::{ConnectionMeta, ConnectionPool};
38use std::collections::HashMap;
39use std::net::SocketAddr;
40use std::sync::Arc;
41use tls::TlsConnector;
42use tokio::sync::Mutex;
43
44#[derive(Clone)]
46pub struct ConnectorOptions {
47 pub ca_file: Option<String>,
52 pub cert_key_file: Option<(String, String)>,
56 pub debug_ssl_keylog: bool,
60 pub keepalive_pool_size: usize,
62 pub offload_threadpool: Option<(usize, usize)>,
71 pub bind_to_v4: Vec<SocketAddr>,
73 pub bind_to_v6: Vec<SocketAddr>,
75}
76
77impl ConnectorOptions {
78 pub fn from_server_conf(server_conf: &ServerConf) -> Self {
80 let offload_threadpool = server_conf
82 .upstream_connect_offload_threadpools
83 .zip(server_conf.upstream_connect_offload_thread_per_pool)
84 .filter(|(pools, threads)| *pools > 0 && *threads > 0);
85
86 let bind_to_v4 = server_conf
89 .client_bind_to_ipv4
90 .iter()
91 .map(|v4| {
92 let ip = v4.parse().unwrap();
93 SocketAddr::new(ip, 0)
94 })
95 .collect();
96
97 let bind_to_v6 = server_conf
98 .client_bind_to_ipv6
99 .iter()
100 .map(|v6| {
101 let ip = v6.parse().unwrap();
102 SocketAddr::new(ip, 0)
103 })
104 .collect();
105 ConnectorOptions {
106 ca_file: server_conf.ca_file.clone(),
107 cert_key_file: None, debug_ssl_keylog: server_conf.upstream_debug_ssl_keylog,
109 keepalive_pool_size: server_conf.upstream_keepalive_pool_size,
110 offload_threadpool,
111 bind_to_v4,
112 bind_to_v6,
113 }
114 }
115
116 pub fn new(keepalive_pool_size: usize) -> Self {
118 ConnectorOptions {
119 ca_file: None,
120 cert_key_file: None,
121 debug_ssl_keylog: false,
122 keepalive_pool_size,
123 offload_threadpool: None,
124 bind_to_v4: vec![],
125 bind_to_v6: vec![],
126 }
127 }
128}
129
130pub struct TransportConnector {
132 tls_ctx: tls::Connector,
133 connection_pool: Arc<ConnectionPool<Arc<Mutex<Stream>>>>,
134 offload: Option<OffloadRuntime>,
135 bind_to_v4: Vec<SocketAddr>,
136 bind_to_v6: Vec<SocketAddr>,
137 preferred_http_version: PreferredHttpVersion,
138}
139
140const DEFAULT_POOL_SIZE: usize = 128;
141
142impl TransportConnector {
143 pub fn new(mut options: Option<ConnectorOptions>) -> Self {
145 let pool_size = options
146 .as_ref()
147 .map_or(DEFAULT_POOL_SIZE, |c| c.keepalive_pool_size);
148 let offload = options.as_mut().and_then(|o| o.offload_threadpool.take());
151 let bind_to_v4 = options
152 .as_ref()
153 .map_or_else(Vec::new, |o| o.bind_to_v4.clone());
154 let bind_to_v6 = options
155 .as_ref()
156 .map_or_else(Vec::new, |o| o.bind_to_v6.clone());
157 TransportConnector {
158 tls_ctx: tls::Connector::new(options),
159 connection_pool: Arc::new(ConnectionPool::new(pool_size)),
160 offload: offload.map(|v| OffloadRuntime::new(v.0, v.1)),
161 bind_to_v4,
162 bind_to_v6,
163 preferred_http_version: PreferredHttpVersion::new(),
164 }
165 }
166
167 pub async fn new_stream<P: Peer + Send + Sync + 'static>(&self, peer: &P) -> Result<Stream> {
171 let rt = self
172 .offload
173 .as_ref()
174 .map(|o| o.get_runtime(peer.reuse_hash()));
175 let bind_to = l4::bind_to_random(peer, &self.bind_to_v4, &self.bind_to_v6);
176 let alpn_override = self.preferred_http_version.get(peer);
177 let stream = if let Some(rt) = rt {
178 let peer = peer.clone();
179 let tls_ctx = self.tls_ctx.clone();
180 rt.spawn(async move { do_connect(&peer, bind_to, alpn_override, &tls_ctx.ctx).await })
181 .await
182 .or_err(InternalError, "offload runtime failure")??
183 } else {
184 do_connect(peer, bind_to, alpn_override, &self.tls_ctx.ctx).await?
185 };
186
187 Ok(stream)
188 }
189
190 pub async fn reused_stream<P: Peer + Send + Sync>(&self, peer: &P) -> Option<Stream> {
192 match self.connection_pool.get(&peer.reuse_hash()) {
193 Some(s) => {
194 debug!("find reusable stream, trying to acquire it");
195 {
196 let _ = s.lock().await;
197 } match Arc::try_unwrap(s) {
199 Ok(l) => {
200 let mut stream = l.into_inner();
201 #[cfg(unix)]
204 if peer.matches_fd(stream.id()) && test_reusable_stream(&mut stream) {
205 Some(stream)
206 } else {
207 None
208 }
209 #[cfg(windows)]
210 {
211 use std::os::windows::io::{AsRawSocket, RawSocket};
212 struct WrappedRawSocket(RawSocket);
213 impl AsRawSocket for WrappedRawSocket {
214 fn as_raw_socket(&self) -> RawSocket {
215 self.0
216 }
217 }
218 if peer.matches_sock(WrappedRawSocket(stream.id() as RawSocket))
219 && test_reusable_stream(&mut stream)
220 {
221 Some(stream)
222 } else {
223 None
224 }
225 }
226 }
227 Err(_) => {
228 error!("failed to acquire reusable stream");
229 None
230 }
231 }
232 }
233 None => {
234 debug!("No reusable connection found for {peer}");
235 None
236 }
237 }
238 }
239
240 pub fn release_stream(
248 &self,
249 mut stream: Stream,
250 key: u64, idle_timeout: Option<std::time::Duration>,
252 ) {
253 if !test_reusable_stream(&mut stream) {
254 return;
255 }
256 let id = stream.id();
257 let meta = ConnectionMeta::new(key, id);
258 debug!("Try to keepalive client session");
259 let stream = Arc::new(Mutex::new(stream));
260 let locked_stream = stream.clone().try_lock_owned().unwrap(); let (notify_close, watch_use) = self.connection_pool.put(&meta, stream);
262 let pool = self.connection_pool.clone(); let rt = pingora_runtime::current_handle();
264 rt.spawn(async move {
265 pool.idle_poll(locked_stream, &meta, idle_timeout, notify_close, watch_use)
266 .await;
267 });
268 }
269
270 pub async fn get_stream<P: Peer + Send + Sync + 'static>(
277 &self,
278 peer: &P,
279 ) -> Result<(Stream, bool)> {
280 let reused_stream = self.reused_stream(peer).await;
281 if let Some(s) = reused_stream {
282 Ok((s, true))
283 } else {
284 let s = self.new_stream(peer).await?;
285 Ok((s, false))
286 }
287 }
288
289 pub fn prefer_h1(&self, peer: &impl Peer) {
291 self.preferred_http_version.add(peer, 1);
292 }
293}
294
295async fn do_connect<P: Peer + Send + Sync>(
298 peer: &P,
299 bind_to: Option<BindTo>,
300 alpn_override: Option<ALPN>,
301 tls_ctx: &TlsConnector,
302) -> Result<Stream> {
303 let connect_future = do_connect_inner(peer, bind_to, alpn_override, tls_ctx);
306
307 match peer.total_connection_timeout() {
308 Some(t) => match pingora_timeout::timeout(t, connect_future).await {
309 Ok(res) => res,
310 Err(_) => Error::e_explain(
311 ConnectTimedout,
312 format!("connecting to server {peer}, total-connection timeout {t:?}"),
313 ),
314 },
315 None => connect_future.await,
316 }
317}
318
319async fn do_connect_inner<P: Peer + Send + Sync>(
321 peer: &P,
322 bind_to: Option<BindTo>,
323 alpn_override: Option<ALPN>,
324 tls_ctx: &TlsConnector,
325) -> Result<Stream> {
326 let stream = l4_connect(peer, bind_to).await?;
327 if peer.tls() {
328 let tls_stream = tls::connect(stream, peer, alpn_override, tls_ctx).await?;
329 Ok(Box::new(tls_stream))
330 } else {
331 Ok(Box::new(stream))
332 }
333}
334
335struct PreferredHttpVersion {
336 versions: RwLock<HashMap<u64, u8>>, }
339
340impl PreferredHttpVersion {
343 pub fn new() -> Self {
344 PreferredHttpVersion {
345 versions: RwLock::default(),
346 }
347 }
348
349 pub fn add(&self, peer: &impl Peer, version: u8) {
350 let key = peer.reuse_hash();
351 let mut v = self.versions.write();
352 v.insert(key, version);
353 }
354
355 pub fn get(&self, peer: &impl Peer) -> Option<ALPN> {
356 let key = peer.reuse_hash();
357 let v = self.versions.read();
358 v.get(&key)
359 .copied()
360 .map(|v| if v == 1 { ALPN::H1 } else { ALPN::H2H1 })
361 }
362}
363
364use futures::future::FutureExt;
365use tokio::io::AsyncReadExt;
366
367fn test_reusable_stream(stream: &mut Stream) -> bool {
369 let mut buf = [0; 1];
370 let result = tokio::task::unconstrained(stream.read(&mut buf[..])).now_or_never();
372 if let Some(data_result) = result {
373 match data_result {
374 Ok(n) => {
375 if n == 0 {
376 debug!("Idle connection is closed");
377 } else {
378 warn!("Unexpected data read in idle connection");
379 }
380 }
381 Err(e) => {
382 debug!("Idle connection is broken: {e:?}");
383 }
384 }
385 false
386 } else {
387 true
388 }
389}
390
391#[cfg(test)]
392#[cfg(feature = "any_tls")]
393mod tests {
394 use pingora_error::ErrorType;
395 use tls::Connector;
396
397 use super::*;
398 use crate::upstreams::peer::BasicPeer;
399 use tokio::io::AsyncWriteExt;
400 #[cfg(unix)]
401 use tokio::net::UnixListener;
402
403 const BLACK_HOLE: &str = "192.0.2.1:79";
405
406 #[tokio::test]
407 async fn test_connect() {
408 let connector = TransportConnector::new(None);
409 let peer = BasicPeer::new("1.1.1.1:80");
410 let stream = connector.new_stream(&peer).await.unwrap();
412 connector.release_stream(stream, peer.reuse_hash(), None);
413
414 let (_, reused) = connector.get_stream(&peer).await.unwrap();
415 assert!(reused);
416 }
417
418 #[tokio::test]
419 async fn test_connect_tls() {
420 let connector = TransportConnector::new(None);
421 let mut peer = BasicPeer::new("1.1.1.1:443");
422 peer.sni = "one.one.one.one".to_string();
424 let stream = connector.new_stream(&peer).await.unwrap();
426 connector.release_stream(stream, peer.reuse_hash(), None);
427
428 let (_, reused) = connector.get_stream(&peer).await.unwrap();
429 assert!(reused);
430 }
431
432 #[cfg(unix)]
433 const MOCK_UDS_PATH: &str = "/tmp/test_unix_transport_connector.sock";
434
435 #[cfg(unix)]
437 async fn mock_connect_server() {
438 let _ = std::fs::remove_file(MOCK_UDS_PATH);
439 let listener = UnixListener::bind(MOCK_UDS_PATH).unwrap();
440 if let Ok((mut stream, _addr)) = listener.accept().await {
441 stream.write_all(b"it works!").await.unwrap();
442 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
444 }
445 let _ = std::fs::remove_file(MOCK_UDS_PATH);
446 }
447 #[tokio::test(flavor = "multi_thread")]
448 async fn test_connect_uds() {
449 tokio::spawn(async {
450 mock_connect_server().await;
451 });
452 let connector = TransportConnector::new(None);
454 let peer = BasicPeer::new_uds(MOCK_UDS_PATH).unwrap();
455 let mut stream = connector.new_stream(&peer).await.unwrap();
457 let mut buf = [0; 9];
458 let _ = stream.read(&mut buf).await.unwrap();
459 assert_eq!(&buf, b"it works!");
460 connector.release_stream(stream, peer.reuse_hash(), None);
461
462 let (_, reused) = connector.get_stream(&peer).await.unwrap();
463 assert!(reused);
464 }
465
466 async fn do_test_conn_timeout(conf: Option<ConnectorOptions>) {
467 let connector = TransportConnector::new(conf);
468 let mut peer = BasicPeer::new(BLACK_HOLE);
469 peer.options.connection_timeout = Some(std::time::Duration::from_millis(1));
470 let stream = connector.new_stream(&peer).await;
471 match stream {
472 Ok(_) => panic!("should throw an error"),
473 Err(e) => assert_eq!(e.etype(), &ConnectTimedout),
474 }
475 }
476
477 #[tokio::test]
478 async fn test_conn_timeout() {
479 do_test_conn_timeout(None).await;
480 }
481
482 #[tokio::test]
483 async fn test_conn_timeout_with_offload() {
484 let mut conf = ConnectorOptions::new(8);
485 conf.offload_threadpool = Some((2, 2));
486 do_test_conn_timeout(Some(conf)).await;
487 }
488
489 #[tokio::test]
490 async fn test_connector_bind_to() {
491 let peer = BasicPeer::new("240.0.0.1:80");
493 let mut conf = ConnectorOptions::new(1);
494 conf.bind_to_v4.push("127.0.0.1:0".parse().unwrap());
495 let connector = TransportConnector::new(Some(conf));
496
497 let stream = connector.new_stream(&peer).await;
498 let error = stream.unwrap_err();
499 assert!(error.etype() == &ConnectError || error.etype() == &ConnectTimedout)
501 }
502
503 async fn get_do_connect_failure_with_peer(peer: &BasicPeer) -> (ErrorType, String) {
507 let tls_connector = Connector::new(None);
508 let stream = do_connect(peer, None, None, &tls_connector.ctx).await;
509 match stream {
510 Ok(_) => panic!("should throw an error"),
511 Err(e) => (
512 e.etype().clone(),
513 e.context
514 .as_ref()
515 .map(|ctx| ctx.as_str().to_owned())
516 .unwrap_or_default(),
517 ),
518 }
519 }
520
521 #[tokio::test]
522 async fn test_do_connect_with_total_timeout() {
523 let mut peer = BasicPeer::new(BLACK_HOLE);
524 peer.options.total_connection_timeout = Some(std::time::Duration::from_millis(1));
525 let (etype, context) = get_do_connect_failure_with_peer(&peer).await;
526 assert_eq!(etype, ConnectTimedout);
527 assert!(context.contains("total-connection timeout"));
528 }
529
530 #[tokio::test]
531 async fn test_tls_connect_timeout_supersedes_total() {
532 let mut peer = BasicPeer::new(BLACK_HOLE);
533 peer.options.total_connection_timeout = Some(std::time::Duration::from_millis(10));
534 peer.options.connection_timeout = Some(std::time::Duration::from_millis(1));
535 let (etype, context) = get_do_connect_failure_with_peer(&peer).await;
536 assert_eq!(etype, ConnectTimedout);
537 assert!(!context.contains("total-connection timeout"));
538 }
539
540 #[tokio::test]
541 async fn test_do_connect_without_total_timeout() {
542 let peer = BasicPeer::new(BLACK_HOLE);
543 let (etype, context) = get_do_connect_failure_with_peer(&peer).await;
544 assert!(etype != ConnectTimedout || !context.contains("total-connection timeout"));
545 }
546}