1use crate::connectors::{l4::BindTo, L4Connect};
18use crate::protocols::l4::socket::SocketAddr;
19use crate::protocols::tls::CaType;
20#[cfg(feature = "openssl_derived")]
21use crate::protocols::tls::HandshakeCompleteHook;
22#[cfg(feature = "s2n")]
23use crate::protocols::tls::PskType;
24#[cfg(unix)]
25use crate::protocols::ConnFdReusable;
26use crate::protocols::TcpKeepalive;
27use crate::utils::tls::{get_organization_unit, CertKey};
28use ahash::AHasher;
29use derivative::Derivative;
30use pingora_error::{
31 ErrorType::{InternalError, SocketError},
32 OrErr, Result,
33};
34#[cfg(feature = "s2n")]
35use pingora_s2n::S2NPolicy;
36use std::collections::BTreeMap;
37use std::fmt::{Display, Formatter, Result as FmtResult};
38use std::hash::{Hash, Hasher};
39use std::net::{IpAddr, SocketAddr as InetSocketAddr, ToSocketAddrs as ToInetSocketAddrs};
40#[cfg(unix)]
41use std::os::unix::{net::SocketAddr as UnixSocketAddr, prelude::AsRawFd};
42#[cfg(windows)]
43use std::os::windows::io::AsRawSocket;
44use std::path::{Path, PathBuf};
45use std::sync::Arc;
46use std::time::Duration;
47use tokio::net::TcpSocket;
48
49pub use crate::protocols::tls::ALPN;
50
51pub type ProxyDigestUserDataHook = Arc<
59 dyn Fn(
60 &http::request::Parts, &pingora_http::ResponseHeader, ) -> Option<Box<dyn std::any::Any + Send + Sync>>
63 + Send
64 + Sync
65 + 'static,
66>;
67
/// Callbacks a user-provided tracer implements to observe a connection.
///
/// Implementors must be `Send + Sync` and `Debug`.
pub trait Tracing: Send + Sync + std::fmt::Debug {
    /// Notification hook for the "connected" event.
    fn on_connected(&self);

    /// Notification hook for the "disconnected" event.
    fn on_disconnected(&self);

    /// Return an owned, boxed copy of this tracer.
    ///
    /// A `Box<dyn Tracing>` cannot derive `Clone`; [`Tracer`] relies on this
    /// method to implement it.
    fn boxed_clone(&self) -> Box<dyn Tracing>;
}

/// Cloneable newtype around a boxed [`Tracing`] implementation.
#[derive(Debug)]
pub struct Tracer(pub Box<dyn Tracing>);

impl Clone for Tracer {
    /// Clone by delegating to [`Tracing::boxed_clone`] on the wrapped tracer.
    fn clone(&self) -> Self {
        let Tracer(inner) = self;
        Self(inner.boxed_clone())
    }
}
87
88pub trait Peer: Display + Clone {
91 fn address(&self) -> &SocketAddr;
93 fn tls(&self) -> bool;
95 fn sni(&self) -> &str;
97 fn reuse_hash(&self) -> u64;
102 fn get_proxy(&self) -> Option<&Proxy> {
104 None
105 }
106 fn get_peer_options(&self) -> Option<&PeerOptions> {
110 None
111 }
112 fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
114 None
115 }
116 fn verify_cert(&self) -> bool {
118 match self.get_peer_options() {
119 Some(opt) => opt.verify_cert,
120 None => false,
121 }
122 }
123 fn verify_hostname(&self) -> bool {
125 match self.get_peer_options() {
126 Some(opt) => opt.verify_hostname,
127 None => false,
128 }
129 }
130 #[cfg(feature = "s2n")]
132 fn use_system_certs(&self) -> bool {
133 match self.get_peer_options() {
134 Some(opt) => opt.use_system_certs,
135 None => false,
136 }
137 }
138 fn alternative_cn(&self) -> Option<&String> {
143 match self.get_peer_options() {
144 Some(opt) => opt.alternative_cn.as_ref(),
145 None => None,
146 }
147 }
148 fn bind_to(&self) -> Option<&BindTo> {
150 match self.get_peer_options() {
151 Some(opt) => opt.bind_to.as_ref(),
152 None => None,
153 }
154 }
155 fn connection_timeout(&self) -> Option<Duration> {
157 match self.get_peer_options() {
158 Some(opt) => opt.connection_timeout,
159 None => None,
160 }
161 }
162 fn total_connection_timeout(&self) -> Option<Duration> {
164 match self.get_peer_options() {
165 Some(opt) => opt.total_connection_timeout,
166 None => None,
167 }
168 }
169 fn idle_timeout(&self) -> Option<Duration> {
172 self.get_peer_options().and_then(|o| o.idle_timeout)
173 }
174
175 fn get_alpn(&self) -> Option<&ALPN> {
177 self.get_peer_options().map(|opt| &opt.alpn)
178 }
179
180 fn get_ca(&self) -> Option<&Arc<CaType>> {
184 match self.get_peer_options() {
185 Some(opt) => opt.ca.as_ref(),
186 None => None,
187 }
188 }
189
190 fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
192 None
193 }
194
195 #[cfg(feature = "s2n")]
199 fn get_psk(&self) -> Option<&Arc<PskType>> {
200 match self.get_peer_options() {
201 Some(opt) => opt.psk.as_ref(),
202 None => None,
203 }
204 }
205
206 #[cfg(feature = "s2n")]
211 fn get_s2n_security_policy(&self) -> Option<&S2NPolicy> {
212 match self.get_peer_options() {
213 Some(opt) => opt.s2n_security_policy.as_ref(),
214 None => None,
215 }
216 }
217
218 #[cfg(feature = "s2n")]
222 fn get_max_blinding_delay(&self) -> Option<u32> {
223 match self.get_peer_options() {
224 Some(opt) => opt.max_blinding_delay,
225 None => None,
226 }
227 }
228
229 fn tcp_keepalive(&self) -> Option<&TcpKeepalive> {
231 self.get_peer_options()
232 .and_then(|o| o.tcp_keepalive.as_ref())
233 }
234
235 fn h2_ping_interval(&self) -> Option<Duration> {
237 self.get_peer_options().and_then(|o| o.h2_ping_interval)
238 }
239
240 fn tcp_recv_buf(&self) -> Option<usize> {
242 self.get_peer_options().and_then(|o| o.tcp_recv_buf)
243 }
244
245 fn dscp(&self) -> Option<u8> {
248 self.get_peer_options().and_then(|o| o.dscp)
249 }
250
251 fn tcp_fast_open(&self) -> bool {
253 self.get_peer_options()
254 .map(|o| o.tcp_fast_open)
255 .unwrap_or_default()
256 }
257
258 #[cfg(unix)]
259 fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
260 self.address().check_fd_match(fd)
261 }
262
263 #[cfg(windows)]
264 fn matches_sock<V: AsRawSocket>(&self, sock: V) -> bool {
265 use crate::protocols::ConnSockReusable;
266 self.address().check_sock_match(sock)
267 }
268
269 fn get_tracer(&self) -> Option<Tracer> {
270 None
271 }
272
273 fn upstream_tcp_sock_tweak_hook(
277 &self,
278 ) -> Option<&Arc<dyn Fn(&TcpSocket) -> Result<()> + Send + Sync + 'static>> {
279 self.get_peer_options()?
280 .upstream_tcp_sock_tweak_hook
281 .as_ref()
282 }
283
284 fn proxy_digest_user_data_hook(&self) -> Option<&ProxyDigestUserDataHook> {
287 self.get_peer_options()?
288 .proxy_digest_user_data_hook
289 .as_ref()
290 }
291
292 #[cfg(feature = "openssl_derived")]
301 fn upstream_tls_handshake_complete_hook(&self) -> Option<&HandshakeCompleteHook> {
302 self.get_peer_options()?
303 .upstream_tls_handshake_complete_hook
304 .as_ref()
305 }
306}
307
308#[derive(Debug, Clone)]
310pub struct BasicPeer {
311 pub _address: SocketAddr,
312 pub sni: String,
313 pub options: PeerOptions,
314}
315
316impl BasicPeer {
317 pub fn new(address: &str) -> Self {
319 let addr = SocketAddr::Inet(address.parse().unwrap()); Self::new_from_sockaddr(addr)
321 }
322
323 #[cfg(unix)]
325 pub fn new_uds<P: AsRef<Path>>(path: P) -> Result<Self> {
326 let addr = SocketAddr::Unix(
327 UnixSocketAddr::from_pathname(path.as_ref())
328 .or_err(InternalError, "while creating BasicPeer")?,
329 );
330 Ok(Self::new_from_sockaddr(addr))
331 }
332
333 fn new_from_sockaddr(sockaddr: SocketAddr) -> Self {
334 BasicPeer {
335 _address: sockaddr,
336 sni: "".to_string(), options: PeerOptions::new(),
338 }
339 }
340}
341
342impl Display for BasicPeer {
343 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
344 write!(f, "{:?}", self)
345 }
346}
347
348impl Peer for BasicPeer {
349 fn address(&self) -> &SocketAddr {
350 &self._address
351 }
352
353 fn tls(&self) -> bool {
354 !self.sni.is_empty()
355 }
356
357 fn bind_to(&self) -> Option<&BindTo> {
358 None
359 }
360
361 fn sni(&self) -> &str {
362 &self.sni
363 }
364
365 fn reuse_hash(&self) -> u64 {
367 let mut hasher = AHasher::default();
368 self._address.hash(&mut hasher);
369 hasher.finish()
370 }
371
372 fn get_peer_options(&self) -> Option<&PeerOptions> {
373 Some(&self.options)
374 }
375}
376
/// The scheme of a peer: plain HTTP or HTTPS.
#[derive(Hash, Clone, Debug, PartialEq)]
pub enum Scheme {
    /// Plain-text HTTP.
    HTTP,
    /// HTTP over TLS.
    HTTPS,
}

impl Display for Scheme {
    /// Writes the upper-case scheme name (`HTTP` or `HTTPS`).
    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
        let name = match self {
            Scheme::HTTP => "HTTP",
            Scheme::HTTPS => "HTTPS",
        };
        f.write_str(name)
    }
}

impl Scheme {
    /// Map a TLS flag to a scheme: `true` gives `HTTPS`, `false` gives `HTTP`.
    pub fn from_tls_bool(tls: bool) -> Self {
        match tls {
            true => Self::HTTPS,
            false => Self::HTTP,
        }
    }
}
402
403#[non_exhaustive]
407#[derive(Clone, Derivative)]
408#[derivative(Debug)]
409pub struct PeerOptions {
410 pub bind_to: Option<BindTo>,
411 pub connection_timeout: Option<Duration>,
412 pub total_connection_timeout: Option<Duration>,
413 pub read_timeout: Option<Duration>,
414 pub idle_timeout: Option<Duration>,
415 pub write_timeout: Option<Duration>,
416 pub verify_cert: bool,
417 pub verify_hostname: bool,
418 #[cfg(feature = "s2n")]
419 pub use_system_certs: bool,
420 pub alternative_cn: Option<String>,
422 pub alpn: ALPN,
423 pub ca: Option<Arc<CaType>>,
424 pub tcp_keepalive: Option<TcpKeepalive>,
425 pub tcp_recv_buf: Option<usize>,
426 pub dscp: Option<u8>,
427 pub h2_ping_interval: Option<Duration>,
428 #[cfg(feature = "s2n")]
429 pub psk: Option<Arc<PskType>>,
430 #[cfg(feature = "s2n")]
431 pub s2n_security_policy: Option<S2NPolicy>,
432 #[cfg(feature = "s2n")]
433 pub max_blinding_delay: Option<u32>,
434 pub max_h2_streams: usize,
436 pub allow_h1_response_invalid_content_length: bool,
443 pub extra_proxy_headers: BTreeMap<String, Vec<u8>>,
444 pub curves: Option<&'static str>,
447 pub second_keyshare: bool,
449 pub tcp_fast_open: bool,
451 pub tracer: Option<Tracer>,
453 pub custom_l4: Option<Arc<dyn L4Connect + Send + Sync>>,
455 #[derivative(Debug = "ignore")]
456 pub upstream_tcp_sock_tweak_hook:
457 Option<Arc<dyn Fn(&TcpSocket) -> Result<()> + Send + Sync + 'static>>,
458 #[derivative(Debug = "ignore")]
459 pub proxy_digest_user_data_hook: Option<ProxyDigestUserDataHook>,
460 #[cfg(feature = "openssl_derived")]
465 #[derivative(Debug = "ignore")]
466 pub upstream_tls_handshake_complete_hook: Option<HandshakeCompleteHook>,
467}
468
469impl PeerOptions {
470 pub fn new() -> Self {
472 PeerOptions {
473 bind_to: None,
474 connection_timeout: None,
475 total_connection_timeout: None,
476 read_timeout: None,
477 idle_timeout: None,
478 write_timeout: None,
479 verify_cert: true,
480 verify_hostname: true,
481 #[cfg(feature = "s2n")]
482 use_system_certs: true,
483 alternative_cn: None,
484 alpn: ALPN::H1,
485 ca: None,
486 tcp_keepalive: None,
487 tcp_recv_buf: None,
488 dscp: None,
489 h2_ping_interval: None,
490 #[cfg(feature = "s2n")]
491 psk: None,
492 #[cfg(feature = "s2n")]
493 s2n_security_policy: None,
494 #[cfg(feature = "s2n")]
495 max_blinding_delay: None,
496 max_h2_streams: 1,
497 allow_h1_response_invalid_content_length: false,
498 extra_proxy_headers: BTreeMap::new(),
499 curves: None,
500 second_keyshare: true, tcp_fast_open: false,
502 tracer: None,
503 custom_l4: None,
504 upstream_tcp_sock_tweak_hook: None,
505 proxy_digest_user_data_hook: None,
506 #[cfg(feature = "openssl_derived")]
507 upstream_tls_handshake_complete_hook: None,
508 }
509 }
510
511 pub fn set_http_version(&mut self, max: u8, min: u8) {
513 self.alpn = ALPN::new(max, min);
514 }
515}
516
517impl Display for PeerOptions {
518 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
519 if let Some(b) = self.bind_to.as_ref() {
520 write!(f, "bind_to: {:?},", b)?;
521 }
522 if let Some(t) = self.connection_timeout {
523 write!(f, "conn_timeout: {:?},", t)?;
524 }
525 if let Some(t) = self.total_connection_timeout {
526 write!(f, "total_conn_timeout: {:?},", t)?;
527 }
528 if self.verify_cert {
529 write!(f, "verify_cert: true,")?;
530 }
531 if self.verify_hostname {
532 write!(f, "verify_hostname: true,")?;
533 }
534 #[cfg(feature = "s2n")]
535 if self.use_system_certs {
536 write!(f, "use_system_certs: true,")?;
537 }
538 if let Some(cn) = &self.alternative_cn {
539 write!(f, "alt_cn: {},", cn)?;
540 }
541 write!(f, "alpn: {},", self.alpn)?;
542 if let Some(cas) = &self.ca {
543 for ca in cas.iter() {
544 write!(
545 f,
546 "CA: {}, expire: {},",
547 get_organization_unit(ca).unwrap_or_default(),
548 ca.not_after()
549 )?;
550 }
551 }
552 #[cfg(feature = "s2n")]
553 if let Some(policy) = &self.s2n_security_policy {
554 write!(f, "s2n_security_policy: {:?}, ", policy)?;
555 }
556 #[cfg(feature = "s2n")]
557 if let Some(psk_config) = &self.psk {
558 for psk in &psk_config.keys {
559 write!(
560 f,
561 "psk_identity: {}",
562 String::from_utf8_lossy(psk.identity.as_slice())
563 )?;
564 }
565 }
566 if let Some(tcp_keepalive) = &self.tcp_keepalive {
567 write!(f, "tcp_keepalive: {},", tcp_keepalive)?;
568 }
569 if let Some(h2_ping_interval) = self.h2_ping_interval {
570 write!(f, "h2_ping_interval: {:?},", h2_ping_interval)?;
571 }
572 Ok(())
573 }
574}
575
576#[derive(Debug, Clone)]
578pub struct HttpPeer {
579 pub _address: SocketAddr,
580 pub scheme: Scheme,
581 pub sni: String,
582 pub proxy: Option<Proxy>,
583 pub client_cert_key: Option<Arc<CertKey>>,
584 pub group_key: u64,
587 pub options: PeerOptions,
588}
589
590impl HttpPeer {
591 pub fn is_tls(&self) -> bool {
593 match self.scheme {
594 Scheme::HTTP => false,
595 Scheme::HTTPS => true,
596 }
597 }
598
599 fn new_from_sockaddr(address: SocketAddr, tls: bool, sni: String) -> Self {
600 HttpPeer {
601 _address: address,
602 scheme: Scheme::from_tls_bool(tls),
603 sni,
604 proxy: None,
605 client_cert_key: None,
606 group_key: 0,
607 options: PeerOptions::new(),
608 }
609 }
610
611 pub fn new<A: ToInetSocketAddrs>(address: A, tls: bool, sni: String) -> Self {
613 let mut addrs_iter = address.to_socket_addrs().unwrap(); let addr = addrs_iter.next().unwrap();
615 Self::new_from_sockaddr(SocketAddr::Inet(addr), tls, sni)
616 }
617
618 #[cfg(unix)]
620 pub fn new_uds(path: &str, tls: bool, sni: String) -> Result<Self> {
621 let addr = SocketAddr::Unix(
622 UnixSocketAddr::from_pathname(Path::new(path)).or_err(SocketError, "invalid path")?,
623 );
624 Ok(Self::new_from_sockaddr(addr, tls, sni))
625 }
626
627 pub fn new_proxy(
630 next_hop: &str,
631 ip_addr: IpAddr,
632 port: u16,
633 tls: bool,
634 sni: &str,
635 headers: BTreeMap<String, Vec<u8>>,
636 ) -> Self {
637 HttpPeer {
638 _address: SocketAddr::Inet(InetSocketAddr::new(ip_addr, port)),
639 scheme: Scheme::from_tls_bool(tls),
640 sni: sni.to_string(),
641 proxy: Some(Proxy {
642 next_hop: PathBuf::from(next_hop).into(),
643 host: ip_addr.to_string(),
644 port,
645 headers,
646 }),
647 client_cert_key: None,
648 group_key: 0,
649 options: PeerOptions::new(),
650 }
651 }
652
653 pub fn new_mtls<A: ToInetSocketAddrs>(
655 address: A,
656 sni: String,
657 client_cert_key: Arc<CertKey>,
658 ) -> Self {
659 let mut peer = Self::new(address, true, sni);
660 peer.client_cert_key = Some(client_cert_key);
661 peer
662 }
663
664 fn peer_hash(&self) -> u64 {
665 let mut hasher = AHasher::default();
666 self.hash(&mut hasher);
667 hasher.finish()
668 }
669}
670
671impl Hash for HttpPeer {
672 fn hash<H: Hasher>(&self, state: &mut H) {
673 self._address.hash(state);
674 self.scheme.hash(state);
675 self.proxy.hash(state);
676 self.sni.hash(state);
677 self.client_cert_key.hash(state);
679 self.verify_cert().hash(state);
681 self.verify_hostname().hash(state);
682 self.alternative_cn().hash(state);
683 #[cfg(feature = "s2n")]
684 self.get_psk().hash(state);
685 self.group_key.hash(state);
686 self.options.max_h2_streams.hash(state);
688 }
689}
690
691impl Display for HttpPeer {
692 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
693 write!(f, "addr: {}, scheme: {}", self._address, self.scheme)?;
694 if !self.sni.is_empty() {
695 write!(f, ", sni: {}", self.sni)?;
696 }
697 if let Some(p) = self.proxy.as_ref() {
698 write!(f, ", proxy: {p}")?;
699 }
700 if let Some(cert) = &self.client_cert_key {
701 write!(f, ", client cert: {}", cert)?;
702 }
703 Ok(())
704 }
705}
706
707impl Peer for HttpPeer {
708 fn address(&self) -> &SocketAddr {
709 &self._address
710 }
711
712 fn tls(&self) -> bool {
713 self.is_tls()
714 }
715
716 fn sni(&self) -> &str {
717 &self.sni
718 }
719
720 fn reuse_hash(&self) -> u64 {
722 self.peer_hash()
723 }
724
725 fn get_peer_options(&self) -> Option<&PeerOptions> {
726 Some(&self.options)
727 }
728
729 fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
730 Some(&mut self.options)
731 }
732
733 fn get_proxy(&self) -> Option<&Proxy> {
734 self.proxy.as_ref()
735 }
736
737 #[cfg(unix)]
738 fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
739 if let Some(proxy) = self.get_proxy() {
740 proxy.next_hop.check_fd_match(fd)
741 } else {
742 self.address().check_fd_match(fd)
743 }
744 }
745
746 #[cfg(windows)]
747 fn matches_sock<V: AsRawSocket>(&self, sock: V) -> bool {
748 use crate::protocols::ConnSockReusable;
749
750 if let Some(proxy) = self.get_proxy() {
751 panic!("windows do not support peers with proxy")
752 } else {
753 self.address().check_sock_match(sock)
754 }
755 }
756
757 fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
758 self.client_cert_key.as_ref()
759 }
760
761 fn get_tracer(&self) -> Option<Tracer> {
762 self.options.tracer.clone()
763 }
764}
765
/// Proxy settings attached to an `HttpPeer`.
#[derive(Debug, Hash, Clone)]
pub struct Proxy {
    /// Path identifying the next hop.
    // NOTE(review): presumably the proxy's Unix domain socket path — confirm.
    pub next_hop: Box<Path>,
    /// Host to reach through the proxy (`HttpPeer::new_proxy` stores the
    /// textual IP address here).
    pub host: String,
    /// Port to reach through the proxy.
    pub port: u16,
    /// Headers, keyed by name, with raw byte values.
    pub headers: BTreeMap<String, Vec<u8>>,
}

impl Display for Proxy {
    /// Write the next hop, host and port; headers are not included.
    fn fmt(&self, f: &mut Formatter) -> FmtResult {
        let Proxy {
            next_hop,
            host,
            port,
            ..
        } = self;
        write!(
            f,
            "next_hop: {}, host: {}, port: {}",
            next_hop.display(),
            host,
            port
        )
    }
}