1use std::{
4 net::{Ipv4Addr, SocketAddrV4},
5 num::NonZeroU16,
6 sync::Arc,
7 time::{Duration, Instant},
8};
9
10use current_mapping::CurrentMapping;
11use futures_lite::StreamExt;
12use n0_error::{e, stack_error};
13use netwatch::interfaces::HomeRouter;
14use tokio::sync::{mpsc, oneshot, watch};
15use tokio_util::task::AbortOnDropHandle;
16use tracing::{Instrument, debug, info_span, trace};
17
18mod current_mapping;
19mod mapping;
20mod metrics;
21mod nat_pmp;
22mod pcp;
23mod upnp;
24mod util;
/// Timeout values made available to the rest of the crate.
///
/// NOTE(review): the consumers of these constants are not visible in this file; the
/// per-constant descriptions are inferred from the names — confirm against the `upnp`,
/// `pcp` and `nat_pmp` modules.
mod defaults {
    use std::time::Duration;

    /// Presumably the longest a UPnP gateway search may take (1 second).
    pub(crate) const UPNP_SEARCH_TIMEOUT: Duration = Duration::from_secs(1);

    /// Presumably how long to wait for a PCP response (500 milliseconds).
    pub(crate) const PCP_RECV_TIMEOUT: Duration = Duration::from_millis(500);

    /// Presumably how long to wait for a NAT-PMP response (500 milliseconds).
    pub(crate) const NAT_PMP_RECV_TIMEOUT: Duration = Duration::from_millis(500);
}
37
38pub use metrics::Metrics;
39
/// How long a positive probe of a protocol keeps being reported as available.
///
/// `Probe::output` compares the time of the last positive probe against this window, and
/// `Service::new` backdates the initial (empty) probe by the same amount.
const AVAILABILITY_TRUST_DURATION: Duration = Duration::from_secs(10 * 60);

/// Capacity of the channel carrying requests from a `Client` to its `Service`.
const SERVICE_CHANNEL_CAPACITY: usize = 32;

/// Window after a probe during which `Service::get_mapping` does not fall back to trying PCP
/// or NAT-PMP when the probe did not report them as available.
const UNAVAILABILITY_TRUST_DURATION: Duration = Duration::from_secs(5);
50
/// Availability of each port mapping protocol, as reported to whoever requested a probe.
///
/// Displays as `portmap={ UPnP: <bool>, PMP: <bool>, PCP: <bool> }`.
#[derive(Debug, Clone, PartialEq, Eq, derive_more::Display)]
#[display("portmap={{ UPnP: {upnp}, PMP: {nat_pmp}, PCP: {pcp} }}")]
pub struct ProbeOutput {
    /// Whether UPnP is currently considered available.
    pub upnp: bool,
    /// Whether PCP is currently considered available.
    pub pcp: bool,
    /// Whether NAT-PMP is currently considered available.
    pub nat_pmp: bool,
}
62
63impl ProbeOutput {
64 pub fn all_available(&self) -> bool {
66 self.upnp && self.pcp && self.nat_pmp
67 }
68}
69
/// Errors delivered to callers that request a probe.
// NOTE(review): `missing_docs` is presumably allowed because `stack_error(add_meta)` generates
// items that cannot carry doc comments — confirm. Variant notes below are plain comments so the
// macro input is unchanged.
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[derive(Clone)]
#[non_exhaustive]
pub enum ProbeError {
    // `Client::probe` could not enqueue the request: the service channel had no free slot.
    #[error("Mapping channel is full")]
    ChannelFull,
    // `Client::probe` could not enqueue the request: the service channel is closed.
    #[error("Mapping channel is closed")]
    ChannelClosed,
    // `ip_and_gateway` found no home router.
    #[error("No gateway found for probe")]
    NoGateway,
    // `ip_and_gateway` found a gateway, but its address is not IPv4.
    #[error("gateway found is ipv6, ignoring")]
    Ipv6Gateway,
    // The spawned probe task ended with a join error; the flags mirror the join error's
    // `is_panic()` / `is_cancelled()`.
    #[error("Probe task stopped. is_panic: {is_panic}, is_cancelled: {is_cancelled}")]
    Join { is_panic: bool, is_cancelled: bool },
}
86
/// Requests sent from a `Client` to the `Service` task.
#[derive(derive_more::Debug)]
enum Message {
    /// Ask for a mapping using the local port the service already knows; the service handles
    /// this by re-running its local-port update with the current port.
    ProcureMapping,
    /// Replace the local port to be mapped. `None` is what `Client::deactivate` sends.
    UpdateLocalPort { local_port: Option<NonZeroU16> },
    /// Request a probe of the available port mapping protocols.
    Probe {
        /// Channel on which the probe result is delivered (printed as `_` in debug output).
        #[debug("_")]
        result_tx: oneshot::Sender<Result<ProbeOutput, ProbeError>>,
    },
}
107
/// Transport protocol handed to the mapping constructors when a port mapping is requested.
#[derive(Debug, Clone, Copy)]
pub enum Protocol {
    /// UDP; this is what `Config::default()` selects.
    Udp,
    /// TCP.
    Tcp,
}
116
/// Configuration of the port mapping service.
#[derive(Debug, Clone)]
pub struct Config {
    /// Whether UPnP may be probed and used. When enabled, UPnP is also the fallback the
    /// service tries if no protocol is known to be available.
    pub enable_upnp: bool,
    /// Whether PCP may be probed and used.
    pub enable_pcp: bool,
    /// Whether NAT-PMP may be probed and used.
    pub enable_nat_pmp: bool,
    /// Transport protocol for which mappings are requested.
    pub protocol: Protocol,
}
129
130impl Default for Config {
131 fn default() -> Self {
133 Config {
134 enable_upnp: true,
135 enable_pcp: true,
136 enable_nat_pmp: true,
137 protocol: Protocol::Udp,
138 }
139 }
140}
141
/// Handle used to talk to the port mapping service task.
///
/// Clones share the same service: the task handle is kept behind an `Arc`.
#[derive(Debug, Clone)]
pub struct Client {
    /// Watcher for the external address of the current mapping (`None` when there is none);
    /// handed out by `watch_external_address`.
    port_mapping: watch::Receiver<Option<SocketAddrV4>>,
    /// Channel over which requests are sent to the service.
    service_tx: mpsc::Sender<Message>,
    /// Metrics shared with the service.
    metrics: Arc<Metrics>,
    /// Keeps the spawned service task handle alive for as long as any clone of this client
    /// exists; the handle is an abort-on-drop wrapper.
    _service_handle: std::sync::Arc<AbortOnDropHandle<()>>,
}
156
157impl Default for Client {
158 fn default() -> Self {
159 Self::new(Config::default())
160 }
161}
162
163impl Client {
164 pub fn new(config: Config) -> Self {
166 Self::with_metrics(config, Default::default())
167 }
168
169 pub fn with_metrics(config: Config, metrics: Arc<Metrics>) -> Self {
171 let (service_tx, service_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY);
172
173 let (service, watcher) = Service::new(config, service_rx, metrics.clone());
174
175 let handle = AbortOnDropHandle::new(tokio::spawn(
176 async move { service.run().await }.instrument(info_span!("portmapper.service")),
177 ));
178
179 Client {
180 port_mapping: watcher,
181 service_tx,
182 metrics,
183 _service_handle: std::sync::Arc::new(handle),
184 }
185 }
186
187 pub fn probe(&self) -> oneshot::Receiver<Result<ProbeOutput, ProbeError>> {
191 let (result_tx, result_rx) = oneshot::channel();
192
193 if let Err(e) = self.service_tx.try_send(Message::Probe { result_tx }) {
194 use mpsc::error::TrySendError::*;
195
196 let (result_tx, e) = match e {
198 Full(Message::Probe { result_tx }) => (result_tx, e!(ProbeError::ChannelFull)),
199 Closed(Message::Probe { result_tx }) => (result_tx, e!(ProbeError::ChannelClosed)),
200 Full(_) | Closed(_) => unreachable!("Sent value is a probe."),
201 };
202
203 if let Err(Err(e)) = result_tx.send(Err(e)) {
207 trace!("Failed to request probe: {e}")
208 }
209 }
210 result_rx
211 }
212
213 pub fn procure_mapping(&self) {
215 if let Err(e) = self.service_tx.try_send(Message::ProcureMapping) {
217 trace!("Failed to request mapping {e}")
218 }
219 }
220
221 pub fn update_local_port(&self, local_port: NonZeroU16) {
225 let local_port = Some(local_port);
226 if let Err(e) = self
228 .service_tx
229 .try_send(Message::UpdateLocalPort { local_port })
230 {
231 trace!("Failed to update local port {e}")
232 }
233 }
234
235 pub fn deactivate(&self) {
237 if let Err(e) = self
239 .service_tx
240 .try_send(Message::UpdateLocalPort { local_port: None })
241 {
242 trace!("Failed to deactivate port mapping {e}")
243 }
244 }
245
246 pub fn watch_external_address(&self) -> watch::Receiver<Option<SocketAddrV4>> {
248 self.port_mapping.clone()
249 }
250
251 pub fn metrics(&self) -> &Arc<Metrics> {
253 &self.metrics
254 }
255}
256
/// Internal record of when each protocol was last seen to be available.
#[derive(Debug)]
struct Probe {
    /// Time associated with the probe as a whole; `Probe::empty` sets it to the creation time.
    last_probe: Instant,
    /// Gateway returned by the last successful UPnP probe, and when it was obtained.
    last_upnp_gateway_addr: Option<(upnp::Gateway, Instant)>,
    /// When PCP was last found to be available, if ever.
    last_pcp: Option<Instant>,
    /// When NAT-PMP was last found to be available, if ever.
    last_nat_pmp: Option<Instant>,
}
269
270impl Probe {
271 fn empty() -> Self {
273 Self {
274 last_probe: Instant::now(),
275 last_upnp_gateway_addr: None,
276 last_pcp: None,
277 last_nat_pmp: None,
278 }
279 }
280 async fn from_output(
282 config: Config,
283 output: ProbeOutput,
284 local_ip: Ipv4Addr,
285 gateway: Ipv4Addr,
286 metrics: Arc<Metrics>,
287 ) -> Probe {
288 let ProbeOutput { upnp, pcp, nat_pmp } = output;
289 let Config {
290 enable_upnp,
291 enable_pcp,
292 enable_nat_pmp,
293 protocol: _,
294 } = config;
295 let mut upnp_probing_task = util::MaybeFuture {
296 inner: (enable_upnp && !upnp).then(|| {
297 let metrics = metrics.clone();
298 Box::pin(async move {
299 upnp::probe_available(&metrics)
300 .await
301 .map(|addr| (addr, Instant::now()))
302 })
303 }),
304 };
305
306 let mut pcp_probing_task = util::MaybeFuture {
307 inner: (enable_pcp && !pcp).then(|| {
308 let metrics = metrics.clone();
309 Box::pin(async move {
310 metrics.pcp_probes.inc();
311 pcp::probe_available(local_ip, gateway)
312 .await
313 .then(Instant::now)
314 })
315 }),
316 };
317
318 let mut nat_pmp_probing_task = util::MaybeFuture {
319 inner: (enable_nat_pmp && !nat_pmp).then(|| {
320 Box::pin(async {
321 nat_pmp::probe_available(local_ip, gateway)
322 .await
323 .then(Instant::now)
324 })
325 }),
326 };
327
328 if upnp_probing_task.inner.is_some() {
329 metrics.upnp_probes.inc();
330 }
331
332 let mut upnp_done = upnp_probing_task.inner.is_none();
333 let mut pcp_done = pcp_probing_task.inner.is_none();
334 let mut nat_pmp_done = nat_pmp_probing_task.inner.is_none();
335
336 let mut probe = Probe::empty();
337
338 while !upnp_done || !pcp_done || !nat_pmp_done {
339 tokio::select! {
340 last_upnp_gateway_addr = &mut upnp_probing_task, if !upnp_done => {
341 trace!("tick: upnp probe ready");
342 probe.last_upnp_gateway_addr = last_upnp_gateway_addr;
343 upnp_done = true;
344 },
345 last_nat_pmp = &mut nat_pmp_probing_task, if !nat_pmp_done => {
346 trace!("tick: nat_pmp probe ready");
347 probe.last_nat_pmp = last_nat_pmp;
348 nat_pmp_done = true;
349 },
350 last_pcp = &mut pcp_probing_task, if !pcp_done => {
351 trace!("tick: pcp probe ready");
352 probe.last_pcp = last_pcp;
353 pcp_done = true;
354 },
355 }
356 }
357
358 probe
359 }
360
361 fn output(&self) -> ProbeOutput {
363 let now = Instant::now();
364
365 let upnp = self
367 .last_upnp_gateway_addr
368 .as_ref()
369 .map(|(_gateway_addr, last_probed)| *last_probed + AVAILABILITY_TRUST_DURATION > now)
370 .unwrap_or_default();
371
372 let pcp = self
373 .last_pcp
374 .as_ref()
375 .map(|last_probed| *last_probed + AVAILABILITY_TRUST_DURATION > now)
376 .unwrap_or_default();
377
378 let nat_pmp = self
379 .last_nat_pmp
380 .as_ref()
381 .map(|last_probed| *last_probed + AVAILABILITY_TRUST_DURATION > now)
382 .unwrap_or_default();
383
384 ProbeOutput { upnp, pcp, nat_pmp }
385 }
386
387 fn update(&mut self, probe: Probe, metrics: &Arc<Metrics>) {
389 let Probe {
390 last_probe,
391 last_upnp_gateway_addr,
392 last_pcp,
393 last_nat_pmp,
394 } = probe;
395 if last_upnp_gateway_addr.is_some() {
396 metrics.upnp_available.inc();
397 let new_gateway = last_upnp_gateway_addr
398 .as_ref()
399 .map(|(addr, _last_seen)| addr);
400 let old_gateway = self
401 .last_upnp_gateway_addr
402 .as_ref()
403 .map(|(addr, _last_seen)| addr);
404 if new_gateway != old_gateway {
405 metrics.upnp_gateway_updated.inc();
406 debug!(
407 "upnp gateway changed {:?} -> {:?}",
408 old_gateway
409 .map(|gw| gw.to_string())
410 .unwrap_or("None".into()),
411 new_gateway
412 .map(|gw| gw.to_string())
413 .unwrap_or("None".into())
414 )
415 };
416 self.last_upnp_gateway_addr = last_upnp_gateway_addr;
417 }
418 if last_pcp.is_some() {
419 metrics.pcp_available.inc();
420 self.last_pcp = last_pcp;
421 }
422 if last_nat_pmp.is_some() {
423 self.last_nat_pmp = last_nat_pmp;
424 }
425
426 self.last_probe = last_probe;
427 }
428}
429
/// Result delivered to every caller waiting on a probe.
type ProbeResult = Result<ProbeOutput, ProbeError>;
432
/// State of the port mapping task that serves a `Client`.
#[derive(Debug)]
pub struct Service {
    /// Which protocols are enabled and which transport protocol to map.
    config: Config,
    /// Local port to map; `None` means no mapping is requested.
    local_port: Option<NonZeroU16>,
    /// Requests coming from the client side.
    rx: mpsc::Receiver<Message>,
    /// The active mapping, if any; also the source of the mapping events handled in `run`.
    current_mapping: CurrentMapping,
    /// Accumulated knowledge about protocol availability, merged from every finished probe.
    full_probe: Probe,
    /// Task currently trying to obtain a mapping, if any.
    mapping_task: Option<AbortOnDropHandle<Result<mapping::Mapping, mapping::Error>>>,
    /// Probe currently in flight, if any, together with the senders of every caller waiting
    /// for its result.
    probing_task: Option<(AbortOnDropHandle<Probe>, Vec<oneshot::Sender<ProbeResult>>)>,
    /// Metrics shared with the client.
    metrics: Arc<Metrics>,
}
459
460impl Service {
461 fn new(
462 config: Config,
463 rx: mpsc::Receiver<Message>,
464 metrics: Arc<Metrics>,
465 ) -> (Self, watch::Receiver<Option<SocketAddrV4>>) {
466 let (current_mapping, watcher) = CurrentMapping::new(metrics.clone());
467 let mut full_probe = Probe::empty();
468 if let Some(in_the_past) = full_probe
469 .last_probe
470 .checked_sub(AVAILABILITY_TRUST_DURATION)
471 {
472 full_probe.last_probe = in_the_past;
474 }
475 let service = Service {
476 config,
477 local_port: None,
478 rx,
479 current_mapping,
480 full_probe,
481 mapping_task: None,
482 probing_task: None,
483 metrics,
484 };
485
486 (service, watcher)
487 }
488
489 async fn invalidate_mapping(&mut self) {
491 if let Some(old_mapping) = self.current_mapping.update(None)
492 && let Err(e) = old_mapping.release().await
493 {
494 debug!("failed to release mapping {e}");
495 }
496 }
497
498 async fn run(mut self) {
499 debug!("portmap starting");
500 loop {
501 tokio::select! {
502 msg = self.rx.recv() => {
503 trace!("tick: msg {msg:?}");
504 match msg {
505 Some(msg) => {
506 self.handle_msg(msg).await;
507 },
508 None => {
509 debug!("portmap service channel dropped. Likely shutting down.");
510 break;
511 }
512 }
513 }
514 mapping_result = util::MaybeFuture{ inner: self.mapping_task.as_mut() } => {
515 trace!("tick: mapping ready");
516 self.mapping_task = None;
518 self.on_mapping_result(mapping_result);
521 }
522 probe_result = util::MaybeFuture{ inner: self.probing_task.as_mut().map(|(fut, _rec)| fut) } => {
523 trace!("tick: probe ready");
524 let receivers = self.probing_task.take().expect("is some").1;
526 let probe_result = probe_result.map_err(|e| e!(ProbeError::Join { is_panic: e.is_panic(), is_cancelled: e.is_cancelled() }));
527 self.on_probe_result(probe_result, receivers);
528 }
529 Some(event) = self.current_mapping.next() => {
530 trace!("tick: mapping event {event:?}");
531 match event {
532 current_mapping::Event::Renew { external_ip, external_port } | current_mapping::Event::Expired { external_ip, external_port } => {
533 self.get_mapping(Some((external_ip, external_port)));
534 },
535 }
536
537 }
538 }
539 }
540 }
541
542 fn on_probe_result(
543 &mut self,
544 result: Result<Probe, ProbeError>,
545 receivers: Vec<oneshot::Sender<ProbeResult>>,
546 ) {
547 let result = result.map(|probe| {
548 self.full_probe.update(probe, &self.metrics);
549 let output = self.full_probe.output();
553 trace!(?output, "probe output");
554 output
555 });
556 for tx in receivers {
557 let _ = tx.send(result.clone());
559 }
560 }
561
562 fn on_mapping_result(
563 &mut self,
564 result: Result<Result<mapping::Mapping, mapping::Error>, tokio::task::JoinError>,
565 ) {
566 match result {
567 Ok(Ok(mapping)) => {
568 self.current_mapping.update(Some(mapping));
569 }
570 Ok(Err(e)) => {
571 debug!("failed to get a port mapping {e}");
572 self.metrics.mapping_failures.inc();
573 }
574 Err(e) => {
575 debug!("failed to get a port mapping {e}");
576 self.metrics.mapping_failures.inc();
577 }
578 }
579 }
580
581 async fn handle_msg(&mut self, msg: Message) {
582 match msg {
583 Message::ProcureMapping => self.update_local_port(self.local_port).await,
584 Message::UpdateLocalPort { local_port } => self.update_local_port(local_port).await,
585 Message::Probe { result_tx } => self.probe_request(result_tx),
586 }
587 }
588
589 async fn update_local_port(&mut self, local_port: Option<NonZeroU16>) {
594 if local_port != self.local_port {
596 self.metrics.local_port_updates.inc();
597 let old_port = std::mem::replace(&mut self.local_port, local_port);
598
599 let dropped_task = self.mapping_task.take();
602 let did_cancel = dropped_task
604 .map(|task| !task.is_finished())
605 .unwrap_or_default();
606
607 if did_cancel {
608 debug!(
609 "canceled mapping task due to local port update. Old: {:?} New: {:?}",
610 old_port, self.local_port
611 )
612 }
613
614 let external_addr = self.current_mapping.external();
616
617 if external_addr.is_some() {
621 self.invalidate_mapping().await;
622 }
623
624 self.get_mapping(external_addr);
626 } else if self.current_mapping.external().is_none() {
627 self.get_mapping(None)
629 }
630 }
631
632 fn get_mapping(&mut self, external_addr: Option<(Ipv4Addr, NonZeroU16)>) {
633 if let Some(local_port) = self.local_port {
634 self.metrics.mapping_attempts.inc();
635
636 let (local_ip, gateway) = match ip_and_gateway() {
637 Ok(ip_and_gw) => ip_and_gw,
638 Err(e) => return debug!("can't get mapping: {e}"),
639 };
640
641 let ProbeOutput { upnp, pcp, nat_pmp } = self.full_probe.output();
642
643 debug!("getting a port mapping for {local_ip}:{local_port} -> {external_addr:?}");
644 let recently_probed =
645 self.full_probe.last_probe + UNAVAILABILITY_TRUST_DURATION > Instant::now();
646 let protocol = self.config.protocol;
647 self.mapping_task = if pcp {
653 let task = mapping::Mapping::new_pcp(
655 protocol,
656 local_ip,
657 local_port,
658 gateway,
659 external_addr,
660 );
661 Some(AbortOnDropHandle::new(tokio::spawn(
662 task.instrument(info_span!("pcp")),
663 )))
664 } else if nat_pmp {
665 let task = mapping::Mapping::new_nat_pmp(
667 protocol,
668 local_ip,
669 local_port,
670 gateway,
671 external_addr,
672 );
673 Some(AbortOnDropHandle::new(tokio::spawn(
674 task.instrument(info_span!("pmp")),
675 )))
676 } else if upnp || self.config.enable_upnp {
677 let external_port = external_addr.map(|(_addr, port)| port);
679 let gateway = self
680 .full_probe
681 .last_upnp_gateway_addr
682 .as_ref()
683 .map(|(gateway, _last_seen)| gateway.clone());
684 let task = mapping::Mapping::new_upnp(
685 protocol,
686 local_ip,
687 local_port,
688 gateway,
689 external_port,
690 );
691
692 Some(AbortOnDropHandle::new(tokio::spawn(
693 task.instrument(info_span!("upnp")),
694 )))
695 } else if !recently_probed && self.config.enable_pcp {
696 let task = mapping::Mapping::new_pcp(
699 protocol,
700 local_ip,
701 local_port,
702 gateway,
703 external_addr,
704 );
705
706 Some(AbortOnDropHandle::new(tokio::spawn(
707 task.instrument(info_span!("pcp")),
708 )))
709 } else if !recently_probed && self.config.enable_nat_pmp {
710 let task = mapping::Mapping::new_nat_pmp(
712 protocol,
713 local_ip,
714 local_port,
715 gateway,
716 external_addr,
717 );
718 Some(AbortOnDropHandle::new(tokio::spawn(
719 task.instrument(info_span!("pmp")),
720 )))
721 } else {
722 return;
724 }
725 }
726 }
727
728 fn probe_request(&mut self, result_tx: oneshot::Sender<Result<ProbeOutput, ProbeError>>) {
734 match self.probing_task.as_mut() {
735 Some((_task_handle, receivers)) => receivers.push(result_tx),
736 None => {
737 let probe_output = self.full_probe.output();
738 if probe_output.all_available() {
739 let _ = result_tx.send(Ok(probe_output));
741 } else {
742 self.metrics.probes_started.inc();
743
744 let (local_ip, gateway) = match ip_and_gateway() {
745 Ok(ip_and_gw) => ip_and_gw,
746 Err(e) => {
747 debug!("could not start probe: {e}");
749 let _ = result_tx.send(Err(e));
750 return;
751 }
752 };
753
754 let config = self.config.clone();
755 let metrics = self.metrics.clone();
756 let handle = tokio::spawn(
757 async move {
758 Probe::from_output(config, probe_output, local_ip, gateway, metrics)
759 .await
760 }
761 .instrument(info_span!("portmapper.probe")),
762 );
763 let receivers = vec![result_tx];
764 self.probing_task = Some((AbortOnDropHandle::new(handle), receivers));
765 }
766 }
767 }
768 }
769}
770
771fn ip_and_gateway() -> Result<(Ipv4Addr, Ipv4Addr), ProbeError> {
773 let Some(HomeRouter { gateway, my_ip }) = HomeRouter::new() else {
774 return Err(e!(ProbeError::NoGateway));
775 };
776
777 let local_ip = match my_ip {
778 Some(std::net::IpAddr::V4(ip))
779 if !ip.is_unspecified() && !ip.is_loopback() && !ip.is_multicast() =>
780 {
781 ip
782 }
783 other => {
784 debug!("no address suitable for port mapping found ({other:?}), using localhost");
785 Ipv4Addr::LOCALHOST
786 }
787 };
788
789 let std::net::IpAddr::V4(gateway) = gateway else {
790 return Err(e!(ProbeError::Ipv6Gateway));
791 };
792
793 Ok((local_ip, gateway))
794}