Skip to main content

portmapper/
lib.rs

1//! Port mapping client and service.
2
3use std::{
4    net::{Ipv4Addr, SocketAddrV4},
5    num::NonZeroU16,
6    sync::Arc,
7    time::{Duration, Instant},
8};
9
10use current_mapping::CurrentMapping;
11use futures_lite::StreamExt;
12use n0_error::{e, stack_error};
13use netwatch::interfaces::HomeRouter;
14use tokio::sync::{mpsc, oneshot, watch};
15use tokio_util::task::AbortOnDropHandle;
16use tracing::{Instrument, debug, info_span, trace};
17
18mod current_mapping;
19mod mapping;
20mod metrics;
21mod nat_pmp;
22mod pcp;
23mod upnp;
24mod util;
25mod defaults {
26    use std::time::Duration;
27
28    /// Maximum duration a UPnP search can take before timing out.
29    pub(crate) const UPNP_SEARCH_TIMEOUT: Duration = Duration::from_secs(1);
30
31    /// Timeout to receive a response from a PCP server.
32    pub(crate) const PCP_RECV_TIMEOUT: Duration = Duration::from_millis(500);
33
34    /// Timeout to receive a response from a NAT-PMP server.
35    pub(crate) const NAT_PMP_RECV_TIMEOUT: Duration = Duration::from_millis(500);
36}
37
38pub use metrics::Metrics;
39
40/// If a port mapping service has been seen within the last [`AVAILABILITY_TRUST_DURATION`] it will
41/// not be probed again.
42const AVAILABILITY_TRUST_DURATION: Duration = Duration::from_secs(60 * 10); // 10 minutes
43
44/// Capacity of the channel to communicate with the long-running service.
45const SERVICE_CHANNEL_CAPACITY: usize = 32; // should be plenty
46
47/// If a port mapping service has not been seen within the last [`UNAVAILABILITY_TRUST_DURATION`]
48/// we allow trying a mapping using said protocol.
49const UNAVAILABILITY_TRUST_DURATION: Duration = Duration::from_secs(5);
50
51/// Output of a port mapping probe.
52#[derive(Debug, Clone, PartialEq, Eq, derive_more::Display)]
53#[display("portmap={{ UPnP: {upnp}, PMP: {nat_pmp}, PCP: {pcp} }}")]
54pub struct ProbeOutput {
55    /// If UPnP can be considered available.
56    pub upnp: bool,
57    /// If PCP can be considered available.
58    pub pcp: bool,
59    /// If PMP can be considered available.
60    pub nat_pmp: bool,
61}
62
63impl ProbeOutput {
64    /// Indicates if all port mapping protocols are available.
65    pub fn all_available(&self) -> bool {
66        self.upnp && self.pcp && self.nat_pmp
67    }
68}
69
70#[allow(missing_docs)]
71#[stack_error(derive, add_meta)]
72#[derive(Clone)]
73#[non_exhaustive]
74pub enum ProbeError {
75    #[error("Mapping channel is full")]
76    ChannelFull,
77    #[error("Mapping channel is closed")]
78    ChannelClosed,
79    #[error("No gateway found for probe")]
80    NoGateway,
81    #[error("gateway found is ipv6, ignoring")]
82    Ipv6Gateway,
83    #[error("Probe task stopped. is_panic: {is_panic}, is_cancelled: {is_cancelled}")]
84    Join { is_panic: bool, is_cancelled: bool },
85}
86
87#[derive(derive_more::Debug)]
88enum Message {
89    /// Attempt to get a mapping if the local port is set but there is no mapping.
90    ProcureMapping,
91    /// Request to update the local port.
92    ///
93    /// The resulting external address can be obtained subscribing using
94    /// [`Client::watch_external_address`].
95    /// A value of `None` will deactivate port mapping.
96    UpdateLocalPort { local_port: Option<NonZeroU16> },
97    /// Request to probe the port mapping protocols.
98    ///
99    /// The requester should wait for the result at the [`oneshot::Receiver`] counterpart of the
100    /// [`oneshot::Sender`].
101    Probe {
102        /// Sender side to communicate the result of the probe.
103        #[debug("_")]
104        result_tx: oneshot::Sender<Result<ProbeOutput, ProbeError>>,
105    },
106}
107
108/// Configuration for UDP or TCP network protocol.
109#[derive(Debug, Clone, Copy)]
110pub enum Protocol {
111    /// UDP protocol.
112    Udp,
113    /// TCP protocol.
114    Tcp,
115}
116
117/// Configures which port mapping protocols are enabled in the [`Service`].
118#[derive(Debug, Clone)]
119pub struct Config {
120    /// Whether UPnP is enabled.
121    pub enable_upnp: bool,
122    /// Whether PCP is enabled.
123    pub enable_pcp: bool,
124    /// Whether PMP is enabled.
125    pub enable_nat_pmp: bool,
126    /// Whether to use UDP or TCP.
127    pub protocol: Protocol,
128}
129
130impl Default for Config {
131    /// By default all port mapping protocols are enabled for UDP.
132    fn default() -> Self {
133        Config {
134            enable_upnp: true,
135            enable_pcp: true,
136            enable_nat_pmp: true,
137            protocol: Protocol::Udp,
138        }
139    }
140}
141
142/// Port mapping client.
143#[derive(Debug, Clone)]
144pub struct Client {
145    /// A watcher over the most recent external address obtained from port mapping.
146    ///
147    /// See [`watch::Receiver`].
148    port_mapping: watch::Receiver<Option<SocketAddrV4>>,
149    /// Channel used to communicate with the port mapping service.
150    service_tx: mpsc::Sender<Message>,
151    /// Metrics collected by the service.
152    metrics: Arc<Metrics>,
153    /// A handle to the service that will cancel the spawned task once the client is dropped.
154    _service_handle: std::sync::Arc<AbortOnDropHandle<()>>,
155}
156
157impl Default for Client {
158    fn default() -> Self {
159        Self::new(Config::default())
160    }
161}
162
163impl Client {
164    /// Create a new port mapping client.
165    pub fn new(config: Config) -> Self {
166        Self::with_metrics(config, Default::default())
167    }
168
169    /// Creates a new port mapping client with a previously created metrics collector.
170    pub fn with_metrics(config: Config, metrics: Arc<Metrics>) -> Self {
171        let (service_tx, service_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY);
172
173        let (service, watcher) = Service::new(config, service_rx, metrics.clone());
174
175        let handle = AbortOnDropHandle::new(tokio::spawn(
176            async move { service.run().await }.instrument(info_span!("portmapper.service")),
177        ));
178
179        Client {
180            port_mapping: watcher,
181            service_tx,
182            metrics,
183            _service_handle: std::sync::Arc::new(handle),
184        }
185    }
186
187    /// Request a probe to the port mapping protocols.
188    ///
189    /// Returns the [`oneshot::Receiver`] used to obtain the result of the probe.
190    pub fn probe(&self) -> oneshot::Receiver<Result<ProbeOutput, ProbeError>> {
191        let (result_tx, result_rx) = oneshot::channel();
192
193        if let Err(e) = self.service_tx.try_send(Message::Probe { result_tx }) {
194            use mpsc::error::TrySendError::*;
195
196            // recover the sender and return the error there
197            let (result_tx, e) = match e {
198                Full(Message::Probe { result_tx }) => (result_tx, e!(ProbeError::ChannelFull)),
199                Closed(Message::Probe { result_tx }) => (result_tx, e!(ProbeError::ChannelClosed)),
200                Full(_) | Closed(_) => unreachable!("Sent value is a probe."),
201            };
202
203            // sender was just created. If it's dropped we have two send error and are likely
204            // shutting down
205            // NOTE: second Err is infallible match due to being the sent value
206            if let Err(Err(e)) = result_tx.send(Err(e)) {
207                trace!("Failed to request probe: {e}")
208            }
209        }
210        result_rx
211    }
212
213    /// Try to get a mapping for the last local port if there isn't one already.
214    pub fn procure_mapping(&self) {
215        // requester can't really do anything with this error if returned, so we log it
216        if let Err(e) = self.service_tx.try_send(Message::ProcureMapping) {
217            trace!("Failed to request mapping {e}")
218        }
219    }
220
221    /// Update the local port.
222    ///
223    /// If the port changes, this will trigger a port mapping attempt.
224    pub fn update_local_port(&self, local_port: NonZeroU16) {
225        let local_port = Some(local_port);
226        // requester can't really do anything with this error if returned, so we log it
227        if let Err(e) = self
228            .service_tx
229            .try_send(Message::UpdateLocalPort { local_port })
230        {
231            trace!("Failed to update local port {e}")
232        }
233    }
234
235    /// Deactivate port mapping.
236    pub fn deactivate(&self) {
237        // requester can't really do anything with this error if returned, so we log it
238        if let Err(e) = self
239            .service_tx
240            .try_send(Message::UpdateLocalPort { local_port: None })
241        {
242            trace!("Failed to deactivate port mapping {e}")
243        }
244    }
245
246    /// Watch the external address for changes in the mappings.
247    pub fn watch_external_address(&self) -> watch::Receiver<Option<SocketAddrV4>> {
248        self.port_mapping.clone()
249    }
250
251    /// Returns the metrics collected by the service.
252    pub fn metrics(&self) -> &Arc<Metrics> {
253        &self.metrics
254    }
255}
256
257/// Port mapping protocol information obtained during a probe.
258#[derive(Debug)]
259struct Probe {
260    /// When was the probe last updated.
261    last_probe: Instant,
262    /// The last [`upnp::Gateway`] and when was it last seen.
263    last_upnp_gateway_addr: Option<(upnp::Gateway, Instant)>,
264    /// Last time PCP was seen.
265    last_pcp: Option<Instant>,
266    /// Last time NAT-PMP was seen.
267    last_nat_pmp: Option<Instant>,
268}
269
270impl Probe {
271    /// An empty probe set to `now`.
272    fn empty() -> Self {
273        Self {
274            last_probe: Instant::now(),
275            last_upnp_gateway_addr: None,
276            last_pcp: None,
277            last_nat_pmp: None,
278        }
279    }
280    /// Create a new probe based on a previous output.
281    async fn from_output(
282        config: Config,
283        output: ProbeOutput,
284        local_ip: Ipv4Addr,
285        gateway: Ipv4Addr,
286        metrics: Arc<Metrics>,
287    ) -> Probe {
288        let ProbeOutput { upnp, pcp, nat_pmp } = output;
289        let Config {
290            enable_upnp,
291            enable_pcp,
292            enable_nat_pmp,
293            protocol: _,
294        } = config;
295        let mut upnp_probing_task = util::MaybeFuture {
296            inner: (enable_upnp && !upnp).then(|| {
297                let metrics = metrics.clone();
298                Box::pin(async move {
299                    upnp::probe_available(&metrics)
300                        .await
301                        .map(|addr| (addr, Instant::now()))
302                })
303            }),
304        };
305
306        let mut pcp_probing_task = util::MaybeFuture {
307            inner: (enable_pcp && !pcp).then(|| {
308                let metrics = metrics.clone();
309                Box::pin(async move {
310                    metrics.pcp_probes.inc();
311                    pcp::probe_available(local_ip, gateway)
312                        .await
313                        .then(Instant::now)
314                })
315            }),
316        };
317
318        let mut nat_pmp_probing_task = util::MaybeFuture {
319            inner: (enable_nat_pmp && !nat_pmp).then(|| {
320                Box::pin(async {
321                    nat_pmp::probe_available(local_ip, gateway)
322                        .await
323                        .then(Instant::now)
324                })
325            }),
326        };
327
328        if upnp_probing_task.inner.is_some() {
329            metrics.upnp_probes.inc();
330        }
331
332        let mut upnp_done = upnp_probing_task.inner.is_none();
333        let mut pcp_done = pcp_probing_task.inner.is_none();
334        let mut nat_pmp_done = nat_pmp_probing_task.inner.is_none();
335
336        let mut probe = Probe::empty();
337
338        while !upnp_done || !pcp_done || !nat_pmp_done {
339            tokio::select! {
340                last_upnp_gateway_addr = &mut upnp_probing_task, if !upnp_done => {
341                    trace!("tick: upnp probe ready");
342                    probe.last_upnp_gateway_addr = last_upnp_gateway_addr;
343                    upnp_done = true;
344                },
345                last_nat_pmp = &mut nat_pmp_probing_task, if !nat_pmp_done => {
346                    trace!("tick: nat_pmp probe ready");
347                    probe.last_nat_pmp = last_nat_pmp;
348                    nat_pmp_done = true;
349                },
350                last_pcp = &mut pcp_probing_task, if !pcp_done => {
351                    trace!("tick: pcp probe ready");
352                    probe.last_pcp = last_pcp;
353                    pcp_done = true;
354                },
355            }
356        }
357
358        probe
359    }
360
361    /// Returns a [`ProbeOutput`] indicating which services can be considered available.
362    fn output(&self) -> ProbeOutput {
363        let now = Instant::now();
364
365        // check if the last UPnP gateway is valid
366        let upnp = self
367            .last_upnp_gateway_addr
368            .as_ref()
369            .map(|(_gateway_addr, last_probed)| *last_probed + AVAILABILITY_TRUST_DURATION > now)
370            .unwrap_or_default();
371
372        let pcp = self
373            .last_pcp
374            .as_ref()
375            .map(|last_probed| *last_probed + AVAILABILITY_TRUST_DURATION > now)
376            .unwrap_or_default();
377
378        let nat_pmp = self
379            .last_nat_pmp
380            .as_ref()
381            .map(|last_probed| *last_probed + AVAILABILITY_TRUST_DURATION > now)
382            .unwrap_or_default();
383
384        ProbeOutput { upnp, pcp, nat_pmp }
385    }
386
387    /// Updates a probe with the `Some` values of another probe that is _assumed_ newer.
388    fn update(&mut self, probe: Probe, metrics: &Arc<Metrics>) {
389        let Probe {
390            last_probe,
391            last_upnp_gateway_addr,
392            last_pcp,
393            last_nat_pmp,
394        } = probe;
395        if last_upnp_gateway_addr.is_some() {
396            metrics.upnp_available.inc();
397            let new_gateway = last_upnp_gateway_addr
398                .as_ref()
399                .map(|(addr, _last_seen)| addr);
400            let old_gateway = self
401                .last_upnp_gateway_addr
402                .as_ref()
403                .map(|(addr, _last_seen)| addr);
404            if new_gateway != old_gateway {
405                metrics.upnp_gateway_updated.inc();
406                debug!(
407                    "upnp gateway changed {:?} -> {:?}",
408                    old_gateway
409                        .map(|gw| gw.to_string())
410                        .unwrap_or("None".into()),
411                    new_gateway
412                        .map(|gw| gw.to_string())
413                        .unwrap_or("None".into())
414                )
415            };
416            self.last_upnp_gateway_addr = last_upnp_gateway_addr;
417        }
418        if last_pcp.is_some() {
419            metrics.pcp_available.inc();
420            self.last_pcp = last_pcp;
421        }
422        if last_nat_pmp.is_some() {
423            self.last_nat_pmp = last_nat_pmp;
424        }
425
426        self.last_probe = last_probe;
427    }
428}
429
430// mainly to make clippy happy
431type ProbeResult = Result<ProbeOutput, ProbeError>;
432
433/// A port mapping client.
434#[derive(Debug)]
435pub struct Service {
436    config: Config,
437    /// Local port to map.
438    local_port: Option<NonZeroU16>,
439    /// Channel over which the service is informed of messages.
440    ///
441    /// The service will stop when all senders are gone.
442    rx: mpsc::Receiver<Message>,
443    /// Currently active mapping.
444    current_mapping: CurrentMapping,
445    /// Last updated probe.
446    full_probe: Probe,
447    /// Task attempting to get a port mapping.
448    ///
449    /// This task will be cancelled if a request to set the local port arrives before it's
450    /// finished.
451    mapping_task: Option<AbortOnDropHandle<Result<mapping::Mapping, mapping::Error>>>,
452    /// Task probing the necessary protocols.
453    ///
454    /// Requests for a probe that arrive while this task is still in progress will receive the same
455    /// result.
456    probing_task: Option<(AbortOnDropHandle<Probe>, Vec<oneshot::Sender<ProbeResult>>)>,
457    metrics: Arc<Metrics>,
458}
459
460impl Service {
461    fn new(
462        config: Config,
463        rx: mpsc::Receiver<Message>,
464        metrics: Arc<Metrics>,
465    ) -> (Self, watch::Receiver<Option<SocketAddrV4>>) {
466        let (current_mapping, watcher) = CurrentMapping::new(metrics.clone());
467        let mut full_probe = Probe::empty();
468        if let Some(in_the_past) = full_probe
469            .last_probe
470            .checked_sub(AVAILABILITY_TRUST_DURATION)
471        {
472            // we want to do a first full probe, so set is as expired on start-up
473            full_probe.last_probe = in_the_past;
474        }
475        let service = Service {
476            config,
477            local_port: None,
478            rx,
479            current_mapping,
480            full_probe,
481            mapping_task: None,
482            probing_task: None,
483            metrics,
484        };
485
486        (service, watcher)
487    }
488
489    /// Clears the current mapping and releases it.
490    async fn invalidate_mapping(&mut self) {
491        if let Some(old_mapping) = self.current_mapping.update(None)
492            && let Err(e) = old_mapping.release().await
493        {
494            debug!("failed to release mapping {e}");
495        }
496    }
497
498    async fn run(mut self) {
499        debug!("portmap starting");
500        loop {
501            tokio::select! {
502                msg = self.rx.recv() => {
503                    trace!("tick: msg {msg:?}");
504                    match msg {
505                        Some(msg) => {
506                            self.handle_msg(msg).await;
507                        },
508                        None => {
509                            debug!("portmap service channel dropped. Likely shutting down.");
510                            break;
511                        }
512                    }
513                }
514                mapping_result = util::MaybeFuture{ inner: self.mapping_task.as_mut() } => {
515                    trace!("tick: mapping ready");
516                    // regardless of outcome, the task is finished, clear it
517                    self.mapping_task = None;
518                    // there isn't really a way to react to a join error here. Flatten it to make
519                    // it easier to work with
520                    self.on_mapping_result(mapping_result);
521                }
522                probe_result = util::MaybeFuture{ inner: self.probing_task.as_mut().map(|(fut, _rec)| fut) } => {
523                    trace!("tick: probe ready");
524                    // retrieve the receivers and clear the task
525                    let receivers = self.probing_task.take().expect("is some").1;
526                    let probe_result = probe_result.map_err(|e| e!(ProbeError::Join { is_panic: e.is_panic(), is_cancelled: e.is_cancelled() }));
527                    self.on_probe_result(probe_result, receivers);
528                }
529                Some(event) = self.current_mapping.next() => {
530                    trace!("tick: mapping event {event:?}");
531                    match event {
532                        current_mapping::Event::Renew { external_ip, external_port } | current_mapping::Event::Expired { external_ip, external_port } => {
533                            self.get_mapping(Some((external_ip, external_port)));
534                        },
535                    }
536
537                }
538            }
539        }
540    }
541
542    fn on_probe_result(
543        &mut self,
544        result: Result<Probe, ProbeError>,
545        receivers: Vec<oneshot::Sender<ProbeResult>>,
546    ) {
547        let result = result.map(|probe| {
548            self.full_probe.update(probe, &self.metrics);
549            // TODO(@divma): the gateway of the current mapping could have changed. Tailscale
550            // still assumes the current mapping is valid/active and will return it even after
551            // this
552            let output = self.full_probe.output();
553            trace!(?output, "probe output");
554            output
555        });
556        for tx in receivers {
557            // ignore the error. If the receiver is no longer there we don't really care
558            let _ = tx.send(result.clone());
559        }
560    }
561
562    fn on_mapping_result(
563        &mut self,
564        result: Result<Result<mapping::Mapping, mapping::Error>, tokio::task::JoinError>,
565    ) {
566        match result {
567            Ok(Ok(mapping)) => {
568                self.current_mapping.update(Some(mapping));
569            }
570            Ok(Err(e)) => {
571                debug!("failed to get a port mapping {e}");
572                self.metrics.mapping_failures.inc();
573            }
574            Err(e) => {
575                debug!("failed to get a port mapping {e}");
576                self.metrics.mapping_failures.inc();
577            }
578        }
579    }
580
581    async fn handle_msg(&mut self, msg: Message) {
582        match msg {
583            Message::ProcureMapping => self.update_local_port(self.local_port).await,
584            Message::UpdateLocalPort { local_port } => self.update_local_port(local_port).await,
585            Message::Probe { result_tx } => self.probe_request(result_tx),
586        }
587    }
588
589    /// Updates the local port of the port mapping service.
590    ///
591    /// If the port changed, any port mapping task is cancelled. If the new port is some, it will
592    /// start a new port mapping task.
593    async fn update_local_port(&mut self, local_port: Option<NonZeroU16>) {
594        // ignore requests to update the local port in a way that does not produce a change
595        if local_port != self.local_port {
596            self.metrics.local_port_updates.inc();
597            let old_port = std::mem::replace(&mut self.local_port, local_port);
598
599            // clear the current mapping task if any
600
601            let dropped_task = self.mapping_task.take();
602            // check if the dropped task had finished to reduce log noise
603            let did_cancel = dropped_task
604                .map(|task| !task.is_finished())
605                .unwrap_or_default();
606
607            if did_cancel {
608                debug!(
609                    "canceled mapping task due to local port update. Old: {:?} New: {:?}",
610                    old_port, self.local_port
611                )
612            }
613
614            // get the current external port if any to try to get it again
615            let external_addr = self.current_mapping.external();
616
617            // since the port has changed, the current mapping is no longer valid and should be
618            // released
619
620            if external_addr.is_some() {
621                self.invalidate_mapping().await;
622            }
623
624            // start a new mapping task to account for the new port if necessary
625            self.get_mapping(external_addr);
626        } else if self.current_mapping.external().is_none() {
627            // if the local port has not changed, but there is no active mapping try to get one
628            self.get_mapping(None)
629        }
630    }
631
632    fn get_mapping(&mut self, external_addr: Option<(Ipv4Addr, NonZeroU16)>) {
633        if let Some(local_port) = self.local_port {
634            self.metrics.mapping_attempts.inc();
635
636            let (local_ip, gateway) = match ip_and_gateway() {
637                Ok(ip_and_gw) => ip_and_gw,
638                Err(e) => return debug!("can't get mapping: {e}"),
639            };
640
641            let ProbeOutput { upnp, pcp, nat_pmp } = self.full_probe.output();
642
643            debug!("getting a port mapping for {local_ip}:{local_port} -> {external_addr:?}");
644            let recently_probed =
645                self.full_probe.last_probe + UNAVAILABILITY_TRUST_DURATION > Instant::now();
646            let protocol = self.config.protocol;
647            // strategy:
648            // 1. check the available services and prefer pcp, then nat_pmp then upnp since it's
649            //    the most unreliable, but possibly the most deployed one
650            // 2. if no service was available, fallback to upnp if enabled, followed by pcp and
651            //    nat_pmp
652            self.mapping_task = if pcp {
653                // try pcp if available first
654                let task = mapping::Mapping::new_pcp(
655                    protocol,
656                    local_ip,
657                    local_port,
658                    gateway,
659                    external_addr,
660                );
661                Some(AbortOnDropHandle::new(tokio::spawn(
662                    task.instrument(info_span!("pcp")),
663                )))
664            } else if nat_pmp {
665                // next nat_pmp if available
666                let task = mapping::Mapping::new_nat_pmp(
667                    protocol,
668                    local_ip,
669                    local_port,
670                    gateway,
671                    external_addr,
672                );
673                Some(AbortOnDropHandle::new(tokio::spawn(
674                    task.instrument(info_span!("pmp")),
675                )))
676            } else if upnp || self.config.enable_upnp {
677                // next upnp if available or enabled
678                let external_port = external_addr.map(|(_addr, port)| port);
679                let gateway = self
680                    .full_probe
681                    .last_upnp_gateway_addr
682                    .as_ref()
683                    .map(|(gateway, _last_seen)| gateway.clone());
684                let task = mapping::Mapping::new_upnp(
685                    protocol,
686                    local_ip,
687                    local_port,
688                    gateway,
689                    external_port,
690                );
691
692                Some(AbortOnDropHandle::new(tokio::spawn(
693                    task.instrument(info_span!("upnp")),
694                )))
695            } else if !recently_probed && self.config.enable_pcp {
696                // if no service is available and the default fallback (upnp) is disabled, try pcp
697                // first
698                let task = mapping::Mapping::new_pcp(
699                    protocol,
700                    local_ip,
701                    local_port,
702                    gateway,
703                    external_addr,
704                );
705
706                Some(AbortOnDropHandle::new(tokio::spawn(
707                    task.instrument(info_span!("pcp")),
708                )))
709            } else if !recently_probed && self.config.enable_nat_pmp {
710                // finally try nat_pmp if enabled
711                let task = mapping::Mapping::new_nat_pmp(
712                    protocol,
713                    local_ip,
714                    local_port,
715                    gateway,
716                    external_addr,
717                );
718                Some(AbortOnDropHandle::new(tokio::spawn(
719                    task.instrument(info_span!("pmp")),
720                )))
721            } else {
722                // give up
723                return;
724            }
725        }
726    }
727
728    /// Handles a probe request.
729    ///
730    /// If there is a task getting a probe, the receiver will be added with any other waiting for a
731    /// result. If no probe is underway, a result can be returned immediately if it's still
732    /// considered valid. Otherwise, a new probe task will be started.
733    fn probe_request(&mut self, result_tx: oneshot::Sender<Result<ProbeOutput, ProbeError>>) {
734        match self.probing_task.as_mut() {
735            Some((_task_handle, receivers)) => receivers.push(result_tx),
736            None => {
737                let probe_output = self.full_probe.output();
738                if probe_output.all_available() {
739                    // we don't care if the requester is no longer there
740                    let _ = result_tx.send(Ok(probe_output));
741                } else {
742                    self.metrics.probes_started.inc();
743
744                    let (local_ip, gateway) = match ip_and_gateway() {
745                        Ok(ip_and_gw) => ip_and_gw,
746                        Err(e) => {
747                            // there is no guarantee this will be displayed, so log it anyway
748                            debug!("could not start probe: {e}");
749                            let _ = result_tx.send(Err(e));
750                            return;
751                        }
752                    };
753
754                    let config = self.config.clone();
755                    let metrics = self.metrics.clone();
756                    let handle = tokio::spawn(
757                        async move {
758                            Probe::from_output(config, probe_output, local_ip, gateway, metrics)
759                                .await
760                        }
761                        .instrument(info_span!("portmapper.probe")),
762                    );
763                    let receivers = vec![result_tx];
764                    self.probing_task = Some((AbortOnDropHandle::new(handle), receivers));
765                }
766            }
767        }
768    }
769}
770
771/// Gets the local ip and gateway address for port mapping.
772fn ip_and_gateway() -> Result<(Ipv4Addr, Ipv4Addr), ProbeError> {
773    let Some(HomeRouter { gateway, my_ip }) = HomeRouter::new() else {
774        return Err(e!(ProbeError::NoGateway));
775    };
776
777    let local_ip = match my_ip {
778        Some(std::net::IpAddr::V4(ip))
779            if !ip.is_unspecified() && !ip.is_loopback() && !ip.is_multicast() =>
780        {
781            ip
782        }
783        other => {
784            debug!("no address suitable for port mapping found ({other:?}), using localhost");
785            Ipv4Addr::LOCALHOST
786        }
787    };
788
789    let std::net::IpAddr::V4(gateway) = gateway else {
790        return Err(e!(ProbeError::Ipv6Gateway));
791    };
792
793    Ok((local_ip, gateway))
794}