1use crate::connectors::{l4::BindTo, L4Connect};
18use crate::protocols::l4::socket::SocketAddr;
19use crate::protocols::tls::CaType;
20#[cfg(feature = "openssl_derived")]
21use crate::protocols::tls::HandshakeCompleteHook;
22#[cfg(feature = "s2n")]
23use crate::protocols::tls::PskType;
24#[cfg(unix)]
25use crate::protocols::ConnFdReusable;
26use crate::protocols::TcpKeepalive;
27use crate::utils::tls::{get_organization_unit, CertKey};
28use ahash::AHasher;
29use derivative::Derivative;
30use pingora_error::{
31 ErrorType::{InternalError, SocketError},
32 OrErr, Result,
33};
34#[cfg(feature = "s2n")]
35use pingora_s2n::S2NPolicy;
36use std::collections::BTreeMap;
37use std::fmt::{Display, Formatter, Result as FmtResult};
38use std::hash::{Hash, Hasher};
39use std::net::{IpAddr, SocketAddr as InetSocketAddr, ToSocketAddrs as ToInetSocketAddrs};
40#[cfg(unix)]
41use std::os::unix::{net::SocketAddr as UnixSocketAddr, prelude::AsRawFd};
42#[cfg(windows)]
43use std::os::windows::io::AsRawSocket;
44use std::path::{Path, PathBuf};
45use std::sync::Arc;
46use std::time::Duration;
47use tokio::net::TcpSocket;
48
49pub use crate::protocols::tls::ALPN;
50
51pub type ProxyDigestUserDataHook = Arc<
59 dyn Fn(
60 &http::request::Parts, &pingora_http::ResponseHeader, ) -> Option<Box<dyn std::any::Any + Send + Sync>>
63 + Send
64 + Sync
65 + 'static,
66>;
67
68pub trait Tracing: Send + Sync + std::fmt::Debug {
70 fn on_connected(&self);
72 fn on_disconnected(&self);
74 fn boxed_clone(&self) -> Box<dyn Tracing>;
76}
77
78#[derive(Debug)]
80pub struct Tracer(pub Box<dyn Tracing>);
81
82impl Clone for Tracer {
83 fn clone(&self) -> Self {
84 Tracer(self.0.boxed_clone())
85 }
86}
87
88pub trait Peer: Display + Clone {
91 fn address(&self) -> &SocketAddr;
93 fn tls(&self) -> bool;
95 fn sni(&self) -> &str;
97 fn reuse_hash(&self) -> u64;
102 fn get_proxy(&self) -> Option<&Proxy> {
104 None
105 }
106 fn get_peer_options(&self) -> Option<&PeerOptions> {
110 None
111 }
112 fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
114 None
115 }
116 fn verify_cert(&self) -> bool {
118 match self.get_peer_options() {
119 Some(opt) => opt.verify_cert,
120 None => false,
121 }
122 }
123 fn verify_hostname(&self) -> bool {
125 match self.get_peer_options() {
126 Some(opt) => opt.verify_hostname,
127 None => false,
128 }
129 }
130 #[cfg(feature = "s2n")]
132 fn use_system_certs(&self) -> bool {
133 match self.get_peer_options() {
134 Some(opt) => opt.use_system_certs,
135 None => false,
136 }
137 }
138 fn alternative_cn(&self) -> Option<&String> {
143 match self.get_peer_options() {
144 Some(opt) => opt.alternative_cn.as_ref(),
145 None => None,
146 }
147 }
148 fn bind_to(&self) -> Option<&BindTo> {
150 match self.get_peer_options() {
151 Some(opt) => opt.bind_to.as_ref(),
152 None => None,
153 }
154 }
155 fn connection_timeout(&self) -> Option<Duration> {
157 match self.get_peer_options() {
158 Some(opt) => opt.connection_timeout,
159 None => None,
160 }
161 }
162 fn total_connection_timeout(&self) -> Option<Duration> {
164 match self.get_peer_options() {
165 Some(opt) => opt.total_connection_timeout,
166 None => None,
167 }
168 }
169 fn idle_timeout(&self) -> Option<Duration> {
172 self.get_peer_options().and_then(|o| o.idle_timeout)
173 }
174
175 fn get_alpn(&self) -> Option<&ALPN> {
177 self.get_peer_options().map(|opt| &opt.alpn)
178 }
179
180 fn get_ca(&self) -> Option<&Arc<CaType>> {
184 match self.get_peer_options() {
185 Some(opt) => opt.ca.as_ref(),
186 None => None,
187 }
188 }
189
190 fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
192 None
193 }
194
195 #[cfg(feature = "s2n")]
199 fn get_psk(&self) -> Option<&Arc<PskType>> {
200 match self.get_peer_options() {
201 Some(opt) => opt.psk.as_ref(),
202 None => None,
203 }
204 }
205
206 #[cfg(feature = "s2n")]
211 fn get_s2n_security_policy(&self) -> Option<&S2NPolicy> {
212 match self.get_peer_options() {
213 Some(opt) => opt.s2n_security_policy.as_ref(),
214 None => None,
215 }
216 }
217
218 #[cfg(feature = "s2n")]
222 fn get_max_blinding_delay(&self) -> Option<u32> {
223 match self.get_peer_options() {
224 Some(opt) => opt.max_blinding_delay,
225 None => None,
226 }
227 }
228
229 fn tcp_keepalive(&self) -> Option<&TcpKeepalive> {
231 self.get_peer_options()
232 .and_then(|o| o.tcp_keepalive.as_ref())
233 }
234
235 fn h2_ping_interval(&self) -> Option<Duration> {
237 self.get_peer_options().and_then(|o| o.h2_ping_interval)
238 }
239
240 fn tcp_recv_buf(&self) -> Option<usize> {
242 self.get_peer_options().and_then(|o| o.tcp_recv_buf)
243 }
244
245 fn dscp(&self) -> Option<u8> {
248 self.get_peer_options().and_then(|o| o.dscp)
249 }
250
251 fn tcp_fast_open(&self) -> bool {
253 self.get_peer_options()
254 .map(|o| o.tcp_fast_open)
255 .unwrap_or_default()
256 }
257
258 #[cfg(unix)]
259 fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
260 self.address().check_fd_match(fd)
261 }
262
263 #[cfg(windows)]
264 fn matches_sock<V: AsRawSocket>(&self, sock: V) -> bool {
265 use crate::protocols::ConnSockReusable;
266 self.address().check_sock_match(sock)
267 }
268
269 fn get_tracer(&self) -> Option<Tracer> {
270 None
271 }
272
273 fn upstream_tcp_sock_tweak_hook(
277 &self,
278 ) -> Option<&Arc<dyn Fn(&TcpSocket) -> Result<()> + Send + Sync + 'static>> {
279 self.get_peer_options()?
280 .upstream_tcp_sock_tweak_hook
281 .as_ref()
282 }
283
284 fn proxy_digest_user_data_hook(&self) -> Option<&ProxyDigestUserDataHook> {
287 self.get_peer_options()?
288 .proxy_digest_user_data_hook
289 .as_ref()
290 }
291
292 #[cfg(feature = "openssl_derived")]
301 fn upstream_tls_handshake_complete_hook(&self) -> Option<&HandshakeCompleteHook> {
302 self.get_peer_options()?
303 .upstream_tls_handshake_complete_hook
304 .as_ref()
305 }
306}
307
308#[derive(Debug, Clone)]
310pub struct BasicPeer {
311 pub _address: SocketAddr,
312 pub sni: String,
313 pub options: PeerOptions,
314}
315
316impl BasicPeer {
317 pub fn new(address: &str) -> Self {
319 let addr = SocketAddr::Inet(address.parse().unwrap()); Self::new_from_sockaddr(addr)
321 }
322
323 #[cfg(unix)]
325 pub fn new_uds<P: AsRef<Path>>(path: P) -> Result<Self> {
326 let addr = SocketAddr::Unix(
327 UnixSocketAddr::from_pathname(path.as_ref())
328 .or_err(InternalError, "while creating BasicPeer")?,
329 );
330 Ok(Self::new_from_sockaddr(addr))
331 }
332
333 fn new_from_sockaddr(sockaddr: SocketAddr) -> Self {
334 BasicPeer {
335 _address: sockaddr,
336 sni: "".to_string(), options: PeerOptions::new(),
338 }
339 }
340}
341
342impl Display for BasicPeer {
343 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
344 write!(f, "{:?}", self)
345 }
346}
347
348impl Peer for BasicPeer {
349 fn address(&self) -> &SocketAddr {
350 &self._address
351 }
352
353 fn tls(&self) -> bool {
354 !self.sni.is_empty()
355 }
356
357 fn bind_to(&self) -> Option<&BindTo> {
358 None
359 }
360
361 fn sni(&self) -> &str {
362 &self.sni
363 }
364
365 fn reuse_hash(&self) -> u64 {
367 let mut hasher = AHasher::default();
368 self._address.hash(&mut hasher);
369 hasher.finish()
370 }
371
372 fn get_peer_options(&self) -> Option<&PeerOptions> {
373 Some(&self.options)
374 }
375}
376
377#[derive(Hash, Clone, Debug, PartialEq)]
379pub enum Scheme {
380 HTTP,
381 HTTPS,
382}
383
384impl Display for Scheme {
385 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
386 match self {
387 Scheme::HTTP => write!(f, "HTTP"),
388 Scheme::HTTPS => write!(f, "HTTPS"),
389 }
390 }
391}
392
393impl Scheme {
394 pub fn from_tls_bool(tls: bool) -> Self {
395 if tls {
396 Self::HTTPS
397 } else {
398 Self::HTTP
399 }
400 }
401}
402
403#[non_exhaustive]
407#[derive(Clone, Derivative)]
408#[derivative(Debug)]
409pub struct PeerOptions {
410 pub bind_to: Option<BindTo>,
411 pub connection_timeout: Option<Duration>,
412 pub total_connection_timeout: Option<Duration>,
413 pub read_timeout: Option<Duration>,
414 pub idle_timeout: Option<Duration>,
415 pub write_timeout: Option<Duration>,
416 pub verify_cert: bool,
417 pub verify_hostname: bool,
418 #[cfg(feature = "s2n")]
419 pub use_system_certs: bool,
420 pub alternative_cn: Option<String>,
422 pub alpn: ALPN,
423 pub ca: Option<Arc<CaType>>,
424 pub tcp_keepalive: Option<TcpKeepalive>,
425 pub tcp_recv_buf: Option<usize>,
426 pub dscp: Option<u8>,
427 pub h2_ping_interval: Option<Duration>,
428 #[cfg(feature = "s2n")]
429 pub psk: Option<Arc<PskType>>,
430 #[cfg(feature = "s2n")]
431 pub s2n_security_policy: Option<S2NPolicy>,
432 #[cfg(feature = "s2n")]
433 pub max_blinding_delay: Option<u32>,
434 pub max_h2_streams: usize,
436 pub extra_proxy_headers: BTreeMap<String, Vec<u8>>,
437 pub curves: Option<&'static str>,
440 pub second_keyshare: bool,
442 pub tcp_fast_open: bool,
444 pub tracer: Option<Tracer>,
446 pub custom_l4: Option<Arc<dyn L4Connect + Send + Sync>>,
448 #[derivative(Debug = "ignore")]
449 pub upstream_tcp_sock_tweak_hook:
450 Option<Arc<dyn Fn(&TcpSocket) -> Result<()> + Send + Sync + 'static>>,
451 #[derivative(Debug = "ignore")]
452 pub proxy_digest_user_data_hook: Option<ProxyDigestUserDataHook>,
453 #[cfg(feature = "openssl_derived")]
458 #[derivative(Debug = "ignore")]
459 pub upstream_tls_handshake_complete_hook: Option<HandshakeCompleteHook>,
460}
461
462impl PeerOptions {
463 pub fn new() -> Self {
465 PeerOptions {
466 bind_to: None,
467 connection_timeout: None,
468 total_connection_timeout: None,
469 read_timeout: None,
470 idle_timeout: None,
471 write_timeout: None,
472 verify_cert: true,
473 verify_hostname: true,
474 #[cfg(feature = "s2n")]
475 use_system_certs: true,
476 alternative_cn: None,
477 alpn: ALPN::H1,
478 ca: None,
479 tcp_keepalive: None,
480 tcp_recv_buf: None,
481 dscp: None,
482 h2_ping_interval: None,
483 #[cfg(feature = "s2n")]
484 psk: None,
485 #[cfg(feature = "s2n")]
486 s2n_security_policy: None,
487 #[cfg(feature = "s2n")]
488 max_blinding_delay: None,
489 max_h2_streams: 1,
490 extra_proxy_headers: BTreeMap::new(),
491 curves: None,
492 second_keyshare: true, tcp_fast_open: false,
494 tracer: None,
495 custom_l4: None,
496 upstream_tcp_sock_tweak_hook: None,
497 proxy_digest_user_data_hook: None,
498 #[cfg(feature = "openssl_derived")]
499 upstream_tls_handshake_complete_hook: None,
500 }
501 }
502
503 pub fn set_http_version(&mut self, max: u8, min: u8) {
505 self.alpn = ALPN::new(max, min);
506 }
507}
508
509impl Display for PeerOptions {
510 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
511 if let Some(b) = self.bind_to.as_ref() {
512 write!(f, "bind_to: {:?},", b)?;
513 }
514 if let Some(t) = self.connection_timeout {
515 write!(f, "conn_timeout: {:?},", t)?;
516 }
517 if let Some(t) = self.total_connection_timeout {
518 write!(f, "total_conn_timeout: {:?},", t)?;
519 }
520 if self.verify_cert {
521 write!(f, "verify_cert: true,")?;
522 }
523 if self.verify_hostname {
524 write!(f, "verify_hostname: true,")?;
525 }
526 #[cfg(feature = "s2n")]
527 if self.use_system_certs {
528 write!(f, "use_system_certs: true,")?;
529 }
530 if let Some(cn) = &self.alternative_cn {
531 write!(f, "alt_cn: {},", cn)?;
532 }
533 write!(f, "alpn: {},", self.alpn)?;
534 if let Some(cas) = &self.ca {
535 for ca in cas.iter() {
536 write!(
537 f,
538 "CA: {}, expire: {},",
539 get_organization_unit(ca).unwrap_or_default(),
540 ca.not_after()
541 )?;
542 }
543 }
544 #[cfg(feature = "s2n")]
545 if let Some(policy) = &self.s2n_security_policy {
546 write!(f, "s2n_security_policy: {:?}, ", policy)?;
547 }
548 #[cfg(feature = "s2n")]
549 if let Some(psk_config) = &self.psk {
550 for psk in &psk_config.keys {
551 write!(
552 f,
553 "psk_identity: {}",
554 String::from_utf8_lossy(psk.identity.as_slice())
555 )?;
556 }
557 }
558 if let Some(tcp_keepalive) = &self.tcp_keepalive {
559 write!(f, "tcp_keepalive: {},", tcp_keepalive)?;
560 }
561 if let Some(h2_ping_interval) = self.h2_ping_interval {
562 write!(f, "h2_ping_interval: {:?},", h2_ping_interval)?;
563 }
564 Ok(())
565 }
566}
567
568#[derive(Debug, Clone)]
570pub struct HttpPeer {
571 pub _address: SocketAddr,
572 pub scheme: Scheme,
573 pub sni: String,
574 pub proxy: Option<Proxy>,
575 pub client_cert_key: Option<Arc<CertKey>>,
576 pub group_key: u64,
579 pub options: PeerOptions,
580}
581
582impl HttpPeer {
583 pub fn is_tls(&self) -> bool {
585 match self.scheme {
586 Scheme::HTTP => false,
587 Scheme::HTTPS => true,
588 }
589 }
590
591 fn new_from_sockaddr(address: SocketAddr, tls: bool, sni: String) -> Self {
592 HttpPeer {
593 _address: address,
594 scheme: Scheme::from_tls_bool(tls),
595 sni,
596 proxy: None,
597 client_cert_key: None,
598 group_key: 0,
599 options: PeerOptions::new(),
600 }
601 }
602
603 pub fn new<A: ToInetSocketAddrs>(address: A, tls: bool, sni: String) -> Self {
605 let mut addrs_iter = address.to_socket_addrs().unwrap(); let addr = addrs_iter.next().unwrap();
607 Self::new_from_sockaddr(SocketAddr::Inet(addr), tls, sni)
608 }
609
610 #[cfg(unix)]
612 pub fn new_uds(path: &str, tls: bool, sni: String) -> Result<Self> {
613 let addr = SocketAddr::Unix(
614 UnixSocketAddr::from_pathname(Path::new(path)).or_err(SocketError, "invalid path")?,
615 );
616 Ok(Self::new_from_sockaddr(addr, tls, sni))
617 }
618
619 pub fn new_proxy(
622 next_hop: &str,
623 ip_addr: IpAddr,
624 port: u16,
625 tls: bool,
626 sni: &str,
627 headers: BTreeMap<String, Vec<u8>>,
628 ) -> Self {
629 HttpPeer {
630 _address: SocketAddr::Inet(InetSocketAddr::new(ip_addr, port)),
631 scheme: Scheme::from_tls_bool(tls),
632 sni: sni.to_string(),
633 proxy: Some(Proxy {
634 next_hop: PathBuf::from(next_hop).into(),
635 host: ip_addr.to_string(),
636 port,
637 headers,
638 }),
639 client_cert_key: None,
640 group_key: 0,
641 options: PeerOptions::new(),
642 }
643 }
644
645 pub fn new_mtls<A: ToInetSocketAddrs>(
647 address: A,
648 sni: String,
649 client_cert_key: Arc<CertKey>,
650 ) -> Self {
651 let mut peer = Self::new(address, true, sni);
652 peer.client_cert_key = Some(client_cert_key);
653 peer
654 }
655
656 fn peer_hash(&self) -> u64 {
657 let mut hasher = AHasher::default();
658 self.hash(&mut hasher);
659 hasher.finish()
660 }
661}
662
663impl Hash for HttpPeer {
664 fn hash<H: Hasher>(&self, state: &mut H) {
665 self._address.hash(state);
666 self.scheme.hash(state);
667 self.proxy.hash(state);
668 self.sni.hash(state);
669 self.client_cert_key.hash(state);
671 self.verify_cert().hash(state);
673 self.verify_hostname().hash(state);
674 self.alternative_cn().hash(state);
675 #[cfg(feature = "s2n")]
676 self.get_psk().hash(state);
677 self.group_key.hash(state);
678 self.options.max_h2_streams.hash(state);
680 }
681}
682
683impl Display for HttpPeer {
684 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
685 write!(f, "addr: {}, scheme: {}", self._address, self.scheme)?;
686 if !self.sni.is_empty() {
687 write!(f, ", sni: {}", self.sni)?;
688 }
689 if let Some(p) = self.proxy.as_ref() {
690 write!(f, ", proxy: {p}")?;
691 }
692 if let Some(cert) = &self.client_cert_key {
693 write!(f, ", client cert: {}", cert)?;
694 }
695 Ok(())
696 }
697}
698
699impl Peer for HttpPeer {
700 fn address(&self) -> &SocketAddr {
701 &self._address
702 }
703
704 fn tls(&self) -> bool {
705 self.is_tls()
706 }
707
708 fn sni(&self) -> &str {
709 &self.sni
710 }
711
712 fn reuse_hash(&self) -> u64 {
714 self.peer_hash()
715 }
716
717 fn get_peer_options(&self) -> Option<&PeerOptions> {
718 Some(&self.options)
719 }
720
721 fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
722 Some(&mut self.options)
723 }
724
725 fn get_proxy(&self) -> Option<&Proxy> {
726 self.proxy.as_ref()
727 }
728
729 #[cfg(unix)]
730 fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
731 if let Some(proxy) = self.get_proxy() {
732 proxy.next_hop.check_fd_match(fd)
733 } else {
734 self.address().check_fd_match(fd)
735 }
736 }
737
738 #[cfg(windows)]
739 fn matches_sock<V: AsRawSocket>(&self, sock: V) -> bool {
740 use crate::protocols::ConnSockReusable;
741
742 if let Some(proxy) = self.get_proxy() {
743 panic!("windows do not support peers with proxy")
744 } else {
745 self.address().check_sock_match(sock)
746 }
747 }
748
749 fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
750 self.client_cert_key.as_ref()
751 }
752
753 fn get_tracer(&self) -> Option<Tracer> {
754 self.options.tracer.clone()
755 }
756}
757
758#[derive(Debug, Hash, Clone)]
760pub struct Proxy {
761 pub next_hop: Box<Path>, pub host: String, pub port: u16, pub headers: BTreeMap<String, Vec<u8>>, }
766
767impl Display for Proxy {
768 fn fmt(&self, f: &mut Formatter) -> FmtResult {
769 write!(
770 f,
771 "next_hop: {}, host: {}, port: {}",
772 self.next_hop.display(),
773 self.host,
774 self.port
775 )
776 }
777}