1use crate::connectors::{l4::BindTo, L4Connect};
18use crate::protocols::l4::socket::SocketAddr;
19use crate::protocols::tls::CaType;
20#[cfg(unix)]
21use crate::protocols::ConnFdReusable;
22use crate::protocols::TcpKeepalive;
23use crate::utils::tls::{get_organization_unit, CertKey};
24use ahash::AHasher;
25use derivative::Derivative;
26use pingora_error::{
27 ErrorType::{InternalError, SocketError},
28 OrErr, Result,
29};
30use std::collections::BTreeMap;
31use std::fmt::{Display, Formatter, Result as FmtResult};
32use std::hash::{Hash, Hasher};
33use std::net::{IpAddr, SocketAddr as InetSocketAddr, ToSocketAddrs as ToInetSocketAddrs};
34#[cfg(unix)]
35use std::os::unix::{net::SocketAddr as UnixSocketAddr, prelude::AsRawFd};
36#[cfg(windows)]
37use std::os::windows::io::AsRawSocket;
38use std::path::{Path, PathBuf};
39use std::sync::Arc;
40use std::time::Duration;
41use tokio::net::TcpSocket;
42
43pub use crate::protocols::tls::ALPN;
44
45pub trait Tracing: Send + Sync + std::fmt::Debug {
47 fn on_connected(&self);
49 fn on_disconnected(&self);
51 fn boxed_clone(&self) -> Box<dyn Tracing>;
53}
54
55#[derive(Debug)]
57pub struct Tracer(pub Box<dyn Tracing>);
58
59impl Clone for Tracer {
60 fn clone(&self) -> Self {
61 Tracer(self.0.boxed_clone())
62 }
63}
64
65pub trait Peer: Display + Clone {
68 fn address(&self) -> &SocketAddr;
70 fn tls(&self) -> bool;
72 fn sni(&self) -> &str;
74 fn reuse_hash(&self) -> u64;
79 fn get_proxy(&self) -> Option<&Proxy> {
81 None
82 }
83 fn get_peer_options(&self) -> Option<&PeerOptions> {
87 None
88 }
89 fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
91 None
92 }
93 fn verify_cert(&self) -> bool {
95 match self.get_peer_options() {
96 Some(opt) => opt.verify_cert,
97 None => false,
98 }
99 }
100 fn verify_hostname(&self) -> bool {
102 match self.get_peer_options() {
103 Some(opt) => opt.verify_hostname,
104 None => false,
105 }
106 }
107 fn alternative_cn(&self) -> Option<&String> {
112 match self.get_peer_options() {
113 Some(opt) => opt.alternative_cn.as_ref(),
114 None => None,
115 }
116 }
117 fn bind_to(&self) -> Option<&BindTo> {
119 match self.get_peer_options() {
120 Some(opt) => opt.bind_to.as_ref(),
121 None => None,
122 }
123 }
124 fn connection_timeout(&self) -> Option<Duration> {
126 match self.get_peer_options() {
127 Some(opt) => opt.connection_timeout,
128 None => None,
129 }
130 }
131 fn total_connection_timeout(&self) -> Option<Duration> {
133 match self.get_peer_options() {
134 Some(opt) => opt.total_connection_timeout,
135 None => None,
136 }
137 }
138 fn idle_timeout(&self) -> Option<Duration> {
141 self.get_peer_options().and_then(|o| o.idle_timeout)
142 }
143
144 fn get_alpn(&self) -> Option<&ALPN> {
146 self.get_peer_options().map(|opt| &opt.alpn)
147 }
148
149 fn get_ca(&self) -> Option<&Arc<CaType>> {
153 match self.get_peer_options() {
154 Some(opt) => opt.ca.as_ref(),
155 None => None,
156 }
157 }
158
159 fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
161 None
162 }
163
164 fn tcp_keepalive(&self) -> Option<&TcpKeepalive> {
166 self.get_peer_options()
167 .and_then(|o| o.tcp_keepalive.as_ref())
168 }
169
170 fn h2_ping_interval(&self) -> Option<Duration> {
172 self.get_peer_options().and_then(|o| o.h2_ping_interval)
173 }
174
175 fn tcp_recv_buf(&self) -> Option<usize> {
177 self.get_peer_options().and_then(|o| o.tcp_recv_buf)
178 }
179
180 fn dscp(&self) -> Option<u8> {
183 self.get_peer_options().and_then(|o| o.dscp)
184 }
185
186 fn tcp_fast_open(&self) -> bool {
188 self.get_peer_options()
189 .map(|o| o.tcp_fast_open)
190 .unwrap_or_default()
191 }
192
193 #[cfg(unix)]
194 fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
195 self.address().check_fd_match(fd)
196 }
197
198 #[cfg(windows)]
199 fn matches_sock<V: AsRawSocket>(&self, sock: V) -> bool {
200 use crate::protocols::ConnSockReusable;
201 self.address().check_sock_match(sock)
202 }
203
204 fn get_tracer(&self) -> Option<Tracer> {
205 None
206 }
207
208 fn upstream_tcp_sock_tweak_hook(
212 &self,
213 ) -> Option<&Arc<dyn Fn(&TcpSocket) -> Result<()> + Send + Sync + 'static>> {
214 self.get_peer_options()?
215 .upstream_tcp_sock_tweak_hook
216 .as_ref()
217 }
218}
219
220#[derive(Debug, Clone)]
222pub struct BasicPeer {
223 pub _address: SocketAddr,
224 pub sni: String,
225 pub options: PeerOptions,
226}
227
228impl BasicPeer {
229 pub fn new(address: &str) -> Self {
231 let addr = SocketAddr::Inet(address.parse().unwrap()); Self::new_from_sockaddr(addr)
233 }
234
235 #[cfg(unix)]
237 pub fn new_uds<P: AsRef<Path>>(path: P) -> Result<Self> {
238 let addr = SocketAddr::Unix(
239 UnixSocketAddr::from_pathname(path.as_ref())
240 .or_err(InternalError, "while creating BasicPeer")?,
241 );
242 Ok(Self::new_from_sockaddr(addr))
243 }
244
245 fn new_from_sockaddr(sockaddr: SocketAddr) -> Self {
246 BasicPeer {
247 _address: sockaddr,
248 sni: "".to_string(), options: PeerOptions::new(),
250 }
251 }
252}
253
254impl Display for BasicPeer {
255 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
256 write!(f, "{:?}", self)
257 }
258}
259
260impl Peer for BasicPeer {
261 fn address(&self) -> &SocketAddr {
262 &self._address
263 }
264
265 fn tls(&self) -> bool {
266 !self.sni.is_empty()
267 }
268
269 fn bind_to(&self) -> Option<&BindTo> {
270 None
271 }
272
273 fn sni(&self) -> &str {
274 &self.sni
275 }
276
277 fn reuse_hash(&self) -> u64 {
279 let mut hasher = AHasher::default();
280 self._address.hash(&mut hasher);
281 hasher.finish()
282 }
283
284 fn get_peer_options(&self) -> Option<&PeerOptions> {
285 Some(&self.options)
286 }
287}
288
289#[derive(Hash, Clone, Debug, PartialEq)]
291pub enum Scheme {
292 HTTP,
293 HTTPS,
294}
295
296impl Display for Scheme {
297 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
298 match self {
299 Scheme::HTTP => write!(f, "HTTP"),
300 Scheme::HTTPS => write!(f, "HTTPS"),
301 }
302 }
303}
304
305impl Scheme {
306 pub fn from_tls_bool(tls: bool) -> Self {
307 if tls {
308 Self::HTTPS
309 } else {
310 Self::HTTP
311 }
312 }
313}
314
315#[non_exhaustive]
319#[derive(Clone, Derivative)]
320#[derivative(Debug)]
321pub struct PeerOptions {
322 pub bind_to: Option<BindTo>,
323 pub connection_timeout: Option<Duration>,
324 pub total_connection_timeout: Option<Duration>,
325 pub read_timeout: Option<Duration>,
326 pub idle_timeout: Option<Duration>,
327 pub write_timeout: Option<Duration>,
328 pub verify_cert: bool,
329 pub verify_hostname: bool,
330 pub alternative_cn: Option<String>,
332 pub alpn: ALPN,
333 pub ca: Option<Arc<CaType>>,
334 pub tcp_keepalive: Option<TcpKeepalive>,
335 pub tcp_recv_buf: Option<usize>,
336 pub dscp: Option<u8>,
337 pub h2_ping_interval: Option<Duration>,
338 pub max_h2_streams: usize,
340 pub extra_proxy_headers: BTreeMap<String, Vec<u8>>,
341 pub curves: Option<&'static str>,
344 pub second_keyshare: bool,
346 pub tcp_fast_open: bool,
348 pub tracer: Option<Tracer>,
350 pub custom_l4: Option<Arc<dyn L4Connect + Send + Sync>>,
352 #[derivative(Debug = "ignore")]
353 pub upstream_tcp_sock_tweak_hook:
354 Option<Arc<dyn Fn(&TcpSocket) -> Result<()> + Send + Sync + 'static>>,
355}
356
357impl PeerOptions {
358 pub fn new() -> Self {
360 PeerOptions {
361 bind_to: None,
362 connection_timeout: None,
363 total_connection_timeout: None,
364 read_timeout: None,
365 idle_timeout: None,
366 write_timeout: None,
367 verify_cert: true,
368 verify_hostname: true,
369 alternative_cn: None,
370 alpn: ALPN::H1,
371 ca: None,
372 tcp_keepalive: None,
373 tcp_recv_buf: None,
374 dscp: None,
375 h2_ping_interval: None,
376 max_h2_streams: 1,
377 extra_proxy_headers: BTreeMap::new(),
378 curves: None,
379 second_keyshare: true, tcp_fast_open: false,
381 tracer: None,
382 custom_l4: None,
383 upstream_tcp_sock_tweak_hook: None,
384 }
385 }
386
387 pub fn set_http_version(&mut self, max: u8, min: u8) {
389 self.alpn = ALPN::new(max, min);
390 }
391}
392
393impl Display for PeerOptions {
394 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
395 if let Some(b) = self.bind_to.as_ref() {
396 write!(f, "bind_to: {:?},", b)?;
397 }
398 if let Some(t) = self.connection_timeout {
399 write!(f, "conn_timeout: {:?},", t)?;
400 }
401 if let Some(t) = self.total_connection_timeout {
402 write!(f, "total_conn_timeout: {:?},", t)?;
403 }
404 if self.verify_cert {
405 write!(f, "verify_cert: true,")?;
406 }
407 if self.verify_hostname {
408 write!(f, "verify_hostname: true,")?;
409 }
410 if let Some(cn) = &self.alternative_cn {
411 write!(f, "alt_cn: {},", cn)?;
412 }
413 write!(f, "alpn: {},", self.alpn)?;
414 if let Some(cas) = &self.ca {
415 for ca in cas.iter() {
416 write!(
417 f,
418 "CA: {}, expire: {},",
419 get_organization_unit(ca).unwrap_or_default(),
420 ca.not_after()
421 )?;
422 }
423 }
424 if let Some(tcp_keepalive) = &self.tcp_keepalive {
425 write!(f, "tcp_keepalive: {},", tcp_keepalive)?;
426 }
427 if let Some(h2_ping_interval) = self.h2_ping_interval {
428 write!(f, "h2_ping_interval: {:?},", h2_ping_interval)?;
429 }
430 Ok(())
431 }
432}
433
434#[derive(Debug, Clone)]
436pub struct HttpPeer {
437 pub _address: SocketAddr,
438 pub scheme: Scheme,
439 pub sni: String,
440 pub proxy: Option<Proxy>,
441 pub client_cert_key: Option<Arc<CertKey>>,
442 pub group_key: u64,
445 pub options: PeerOptions,
446}
447
448impl HttpPeer {
449 pub fn is_tls(&self) -> bool {
451 match self.scheme {
452 Scheme::HTTP => false,
453 Scheme::HTTPS => true,
454 }
455 }
456
457 fn new_from_sockaddr(address: SocketAddr, tls: bool, sni: String) -> Self {
458 HttpPeer {
459 _address: address,
460 scheme: Scheme::from_tls_bool(tls),
461 sni,
462 proxy: None,
463 client_cert_key: None,
464 group_key: 0,
465 options: PeerOptions::new(),
466 }
467 }
468
469 pub fn new<A: ToInetSocketAddrs>(address: A, tls: bool, sni: String) -> Self {
471 let mut addrs_iter = address.to_socket_addrs().unwrap(); let addr = addrs_iter.next().unwrap();
473 Self::new_from_sockaddr(SocketAddr::Inet(addr), tls, sni)
474 }
475
476 #[cfg(unix)]
478 pub fn new_uds(path: &str, tls: bool, sni: String) -> Result<Self> {
479 let addr = SocketAddr::Unix(
480 UnixSocketAddr::from_pathname(Path::new(path)).or_err(SocketError, "invalid path")?,
481 );
482 Ok(Self::new_from_sockaddr(addr, tls, sni))
483 }
484
485 pub fn new_proxy(
488 next_hop: &str,
489 ip_addr: IpAddr,
490 port: u16,
491 tls: bool,
492 sni: &str,
493 headers: BTreeMap<String, Vec<u8>>,
494 ) -> Self {
495 HttpPeer {
496 _address: SocketAddr::Inet(InetSocketAddr::new(ip_addr, port)),
497 scheme: Scheme::from_tls_bool(tls),
498 sni: sni.to_string(),
499 proxy: Some(Proxy {
500 next_hop: PathBuf::from(next_hop).into(),
501 host: ip_addr.to_string(),
502 port,
503 headers,
504 }),
505 client_cert_key: None,
506 group_key: 0,
507 options: PeerOptions::new(),
508 }
509 }
510
511 fn peer_hash(&self) -> u64 {
512 let mut hasher = AHasher::default();
513 self.hash(&mut hasher);
514 hasher.finish()
515 }
516}
517
518impl Hash for HttpPeer {
519 fn hash<H: Hasher>(&self, state: &mut H) {
520 self._address.hash(state);
521 self.scheme.hash(state);
522 self.proxy.hash(state);
523 self.sni.hash(state);
524 self.client_cert_key.hash(state);
526 self.verify_cert().hash(state);
528 self.verify_hostname().hash(state);
529 self.alternative_cn().hash(state);
530 self.group_key.hash(state);
531 }
532}
533
534impl Display for HttpPeer {
535 fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
536 write!(f, "addr: {}, scheme: {},", self._address, self.scheme)?;
537 if !self.sni.is_empty() {
538 write!(f, "sni: {},", self.sni)?;
539 }
540 if let Some(p) = self.proxy.as_ref() {
541 write!(f, "proxy: {p},")?;
542 }
543 if let Some(cert) = &self.client_cert_key {
544 write!(f, "client cert: {},", cert)?;
545 }
546 Ok(())
547 }
548}
549
550impl Peer for HttpPeer {
551 fn address(&self) -> &SocketAddr {
552 &self._address
553 }
554
555 fn tls(&self) -> bool {
556 self.is_tls()
557 }
558
559 fn sni(&self) -> &str {
560 &self.sni
561 }
562
563 fn reuse_hash(&self) -> u64 {
565 self.peer_hash()
566 }
567
568 fn get_peer_options(&self) -> Option<&PeerOptions> {
569 Some(&self.options)
570 }
571
572 fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> {
573 Some(&mut self.options)
574 }
575
576 fn get_proxy(&self) -> Option<&Proxy> {
577 self.proxy.as_ref()
578 }
579
580 #[cfg(unix)]
581 fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool {
582 if let Some(proxy) = self.get_proxy() {
583 proxy.next_hop.check_fd_match(fd)
584 } else {
585 self.address().check_fd_match(fd)
586 }
587 }
588
589 #[cfg(windows)]
590 fn matches_sock<V: AsRawSocket>(&self, sock: V) -> bool {
591 use crate::protocols::ConnSockReusable;
592
593 if let Some(proxy) = self.get_proxy() {
594 panic!("windows do not support peers with proxy")
595 } else {
596 self.address().check_sock_match(sock)
597 }
598 }
599
600 fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> {
601 self.client_cert_key.as_ref()
602 }
603
604 fn get_tracer(&self) -> Option<Tracer> {
605 self.options.tracer.clone()
606 }
607}
608
609#[derive(Debug, Hash, Clone)]
611pub struct Proxy {
612 pub next_hop: Box<Path>, pub host: String, pub port: u16, pub headers: BTreeMap<String, Vec<u8>>, }
617
618impl Display for Proxy {
619 fn fmt(&self, f: &mut Formatter) -> FmtResult {
620 write!(
621 f,
622 "next_hop: {}, host: {}, port: {}",
623 self.next_hop.display(),
624 self.host,
625 self.port
626 )
627 }
628}