use std::{
future::Future,
net::{Ipv4Addr, SocketAddrV4},
num::NonZeroU16,
pin::Pin,
sync::Arc,
task::Poll,
time::Duration,
};
use tokio::{sync::watch, time};
use tracing::{debug, trace};
use crate::Metrics;
/// The view of a port mapping that this module needs in order to track it.
///
/// Implementors must be `Debug` (mappings are logged) and `Unpin`.
pub(super) trait Mapping: std::fmt::Debug + Unpin {
/// Returns the external IP address and port of the mapping.
fn external(&self) -> (Ipv4Addr, NonZeroU16);
/// Returns the duration used to arm each of the mapping's two timers: the first one
/// signals that the mapping is due for renewal, the second one that it has expired.
fn half_lifetime(&self) -> Duration;
}
// Implements this module's `Mapping` trait for the concrete mapping type by forwarding
// both accessors to the same-named `super::mapping::PortMapped` functions.
impl Mapping for super::mapping::Mapping {
fn external(&self) -> (Ipv4Addr, NonZeroU16) {
// NOTE(review): called by path rather than as `self.external()`, presumably so the
// call cannot resolve back to this very method — confirm against `PortMapped`.
super::mapping::PortMapped::external(self)
}
fn half_lifetime(&self) -> Duration {
// Same path-qualified forwarding as in `external`.
super::mapping::PortMapped::half_lifetime(self)
}
}
/// A mapping bundled with the timer state that decides which event it produces next.
#[derive(Debug)]
struct ActiveMapping<M> {
/// The wrapped mapping.
mapping: M,
/// Timer for the next event. It is armed with the mapping's half lifetime on creation
/// and re-armed with the same duration once the renewal event has been reported.
deadline: Pin<Box<time::Sleep>>,
/// Selects what the next firing of `deadline` means: `false` (the initial value) reports
/// a renewal, `true` reports that the mapping has expired.
expire_after: bool,
}
impl<M: Mapping> ActiveMapping<M> {
    /// Wraps `mapping` and arms its timer to fire after the mapping's half lifetime.
    ///
    /// The result starts with `expire_after` unset, so the first firing of the timer is
    /// reported as a renewal rather than an expiry.
    fn new(mapping: M) -> Self {
        // Read the duration before `mapping` is moved into the struct.
        let renew_in = mapping.half_lifetime();
        Self {
            mapping,
            deadline: Box::pin(time::sleep(renew_in)),
            expire_after: false,
        }
    }
}
/// Events reported while tracking the current mapping.
#[derive(Debug, PartialEq, Eq)]
pub(super) enum Event {
/// The mapping's first deadline elapsed: the mapping is due for renewal.
Renew {
/// External IP address of the mapping that is due for renewal.
external_ip: Ipv4Addr,
/// External port of the mapping that is due for renewal.
external_port: NonZeroU16,
},
/// The mapping's second deadline elapsed: the mapping has expired and was removed.
Expired {
/// External IP address of the mapping that expired.
external_ip: Ipv4Addr,
/// External port of the mapping that expired.
external_port: NonZeroU16,
},
}
/// Holds the currently active mapping, if any, and publishes its external address.
///
/// Polling it reports when the mapping is due for renewal and when it has expired.
#[derive(derive_more::Debug)]
pub(super) struct CurrentMapping<M = super::mapping::Mapping> {
/// The active mapping with its timer state; `None` when there is no mapping.
mapping: Option<ActiveMapping<M>>,
/// Sending half of the watch channel that carries the mapping's external address
/// (`None` when there is no mapping).
address_tx: watch::Sender<Option<SocketAddrV4>>,
/// Waker stored while polling and woken whenever the mapping is replaced.
#[debug(skip)]
waker: Option<std::task::Waker>,
/// Metrics handle; its `external_address_updated` counter is incremented whenever the
/// published external address changes.
metrics: Arc<Metrics>,
}
impl<M: Mapping> CurrentMapping<M> {
    /// Creates a wrapper without an active mapping.
    ///
    /// Also returns the receiving half of the watch channel on which the external address
    /// of the mapping is published. The channel starts out holding `None`.
    pub(super) fn new(metrics: Arc<Metrics>) -> (Self, watch::Receiver<Option<SocketAddrV4>>) {
        let (address_tx, address_rx) = watch::channel(None);
        let wrapper = CurrentMapping {
            mapping: None,
            address_tx,
            waker: None,
            metrics,
        };
        (wrapper, address_rx)
    }

    /// Replaces the current mapping with `mapping` and returns the previous one, if any.
    ///
    /// A new mapping gets a fresh timer armed with its half lifetime. The stored waker (if
    /// any) is woken so that the polling task picks up the new state. The external address
    /// is published to the watch channel only when it differs from the previous value, in
    /// which case the `external_address_updated` metric is incremented as well.
    pub(super) fn update(&mut self, mapping: Option<M>) -> Option<M> {
        debug!("new port mapping {mapping:?}");
        let maybe_external_addr = mapping.as_ref().map(|mapping| {
            let (ip, port) = mapping.external();
            SocketAddrV4::new(ip, port.into())
        });
        let old_mapping = std::mem::replace(&mut self.mapping, mapping.map(ActiveMapping::new))
            .map(|mapping| mapping.mapping);

        // The timer of the new mapping has not been polled yet, so nothing would wake the
        // polling task on its behalf; wake it explicitly.
        if let Some(waker) = &self.waker {
            waker.wake_by_ref();
        }

        self.address_tx.send_if_modified(|old_addr| {
            let old_addr = std::mem::replace(old_addr, maybe_external_addr);
            let update = old_addr != maybe_external_addr;
            if update {
                self.metrics.external_address_updated.inc();
            };
            // Receivers are only notified when the address actually changed.
            update
        });
        old_mapping
    }

    /// Polls the timer of the active mapping.
    ///
    /// Returns [`Event::Renew`] when the first deadline elapses and re-arms the timer for
    /// another half lifetime. Returns [`Event::Expired`] when that second deadline elapses,
    /// after removing the mapping via [`Self::update`]. Returns [`Poll::Pending`] when there
    /// is no mapping or its deadline has not elapsed yet.
    fn poll(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Event> {
        // Remember the waker of the polling task so that `update` can wake it. The stored
        // waker must be replaced exactly when it would NOT wake the current task (e.g. after
        // the stream moved to another task); otherwise `update` would wake a stale task and
        // the new mapping's timer would never get polled.
        let needs_waker = match &self.waker {
            Some(waker) => !waker.will_wake(cx.waker()),
            None => true,
        };
        if needs_waker {
            self.waker = Some(cx.waker().clone());
        }

        if let Some(ActiveMapping {
            mapping,
            deadline,
            expire_after,
        }) = &mut self.mapping
            && deadline.as_mut().poll(cx).is_ready()
        {
            let (external_ip, external_port) = mapping.external();
            return if *expire_after {
                trace!("mapping expired {mapping:?}");
                // Drops the mapping and publishes `None` as the external address.
                self.update(None);
                Poll::Ready(Event::Expired {
                    external_ip,
                    external_port,
                })
            } else {
                // First deadline: report the renewal and arm the timer again; if it fires
                // once more the mapping is considered expired.
                *deadline = Box::pin(time::sleep(mapping.half_lifetime()));
                *expire_after = true;
                trace!("due for renewal {mapping:?}");
                Poll::Ready(Event::Renew {
                    external_ip,
                    external_port,
                })
            };
        }
        Poll::Pending
    }

    /// Returns the external IP address and port of the active mapping, if there is one.
    pub(crate) fn external(&self) -> Option<(Ipv4Addr, NonZeroU16)> {
        self.mapping
            .as_ref()
            .map(|mapping| mapping.mapping.external())
    }
}
impl<M: Mapping> futures_lite::Stream for CurrentMapping<M> {
    type Item = Event;

    /// Yields the next renewal or expiry [`Event`].
    ///
    /// Every ready event is wrapped in `Some`, so this stream never signals termination.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // `Self` is `Unpin`, so the pin can be unwrapped into a plain mutable reference.
        let this = self.get_mut();
        this.poll(cx).map(Some)
    }
}
#[cfg(test)]
mod tests {
    use futures_lite::StreamExt;

    use super::*;

    /// Stand-in mapping for the tests: only the external address, with a fixed half lifetime.
    type M = (Ipv4Addr, NonZeroU16);

    const HALF_LIFETIME_SECS: u64 = 1;

    impl Mapping for M {
        fn external(&self) -> (Ipv4Addr, NonZeroU16) {
            *self
        }

        fn half_lifetime(&self) -> Duration {
            Duration::from_secs(HALF_LIFETIME_SECS)
        }
    }

    /// Walks a mapping through its whole life: the address is published on update, a renewal
    /// is reported after one half lifetime, expiry after two, and the address is then cleared.
    #[tokio::test]
    #[ntest::timeout(2500)]
    async fn report_renew_expire_report() {
        const TEST_PORT: NonZeroU16 = NonZeroU16::new(9586).unwrap();
        const TEST_IP: Ipv4Addr = Ipv4Addr::LOCALHOST;

        let (mut current, mut watcher) = CurrentMapping::<M>::new(Default::default());
        let started = std::time::Instant::now();
        current.update(Some((TEST_IP, TEST_PORT)));

        // Setting a mapping publishes its external address right away.
        time::timeout(Duration::from_millis(10), watcher.changed())
            .await
            .expect("change is as immediate as it can be.")
            .expect("sender is alive");
        let published = watcher.borrow_and_update().unwrap();
        assert_eq!(published.ip(), &TEST_IP);
        assert_eq!(published.port(), u16::from(TEST_PORT));

        // The first event arrives after one half lifetime and asks for a renewal.
        let event = current.next().await.expect("Renewal is reported");
        let renew = Event::Renew {
            external_ip: TEST_IP,
            external_port: TEST_PORT,
        };
        assert_eq!(event, renew);
        assert_eq!(started.elapsed().as_secs(), HALF_LIFETIME_SECS);
        // A renewal request alone does not touch the published address.
        assert!(!watcher.has_changed().unwrap());

        // Without a renewal, the second event reports the expiry one half lifetime later.
        let event = current.next().await.expect("Expiry is reported");
        let expired = Event::Expired {
            external_ip: TEST_IP,
            external_port: TEST_PORT,
        };
        assert_eq!(event, expired);
        assert_eq!(started.elapsed().as_secs(), 2 * HALF_LIFETIME_SECS);

        // Expiry clears the published address.
        time::timeout(Duration::from_millis(10), watcher.changed())
            .await
            .expect("change is as immediate as it can be.")
            .expect("sender is alive");
        assert!(watcher.borrow_and_update().is_none());
    }
}