use std::{
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
};
use iroh_base::TransportAddr;
use n0_future::{StreamExt, time::Duration};
use noq::WeakPathHandle;
use noq_proto::PathId;
use smallvec::SmallVec;
use tokio::sync::{Notify, broadcast, futures::Notified};
use tokio_stream::{
Stream,
wrappers::{BroadcastStream, errors::BroadcastStreamRecvError},
};
use tracing::warn;
use crate::{
endpoint::PathStats,
socket::transports::{self, LocalTransportAddr},
};
/// Capacity passed to `broadcast::channel` when the [`PathEvent`] channel is created.
///
/// NOTE(review): subscribers that fall further behind than this presumably
/// observe [`PathEvent::Lagged`] — confirm against tokio's broadcast semantics.
const BROADCAST_CAPACITY: usize = 8;
/// Event describing a change to the network paths of a connection.
///
/// Yielded by [`PathEventStream`].
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum PathEvent {
/// A path was recorded as opened.
#[non_exhaustive]
Opened {
/// Identifier of the opened path.
id: PathId,
/// Remote address of the path.
remote_addr: TransportAddr,
/// Local address of the path.
local_addr: LocalTransportAddr,
},
/// A path was abandoned, or was still tracked when the connection closed.
#[non_exhaustive]
Closed {
/// Identifier of the closed path.
id: PathId,
/// Remote address of the path.
remote_addr: TransportAddr,
/// Local address of the path.
local_addr: LocalTransportAddr,
/// Last statistics available for the path.
last_stats: Box<PathStats>,
},
/// The selected path changed to this path.
#[non_exhaustive]
Selected {
/// Identifier of the newly selected path.
id: PathId,
/// Remote address of the path.
remote_addr: TransportAddr,
/// Local address of the path.
local_addr: LocalTransportAddr,
},
/// The event stream fell behind and events were skipped.
#[non_exhaustive]
Lagged {
/// Number of missed events, as reported by the underlying broadcast stream.
missed: u64,
},
}
/// Bookkeeping entry for one tracked path.
///
/// The custom `Debug` output shows only the path id and the remote address.
#[derive(Clone, derive_more::Debug)]
#[debug("PathData({}, {})", self.handle.id(), self.remote_addr)]
struct PathData {
/// Weak handle to the path; also the source of the path id.
handle: WeakPathHandle,
/// Remote address of the path.
remote_addr: TransportAddr,
/// Local address of the path.
local_addr: LocalTransportAddr,
}
impl PathData {
    /// Turns the stored weak path handle into a strong [`noq::Path`].
    ///
    /// The connection reference is not inspected. NOTE(review): it presumably
    /// acts as evidence that the connection owning this path is still alive,
    /// which is why the upgrade is expected to succeed — confirm.
    ///
    /// # Panics
    ///
    /// Panics if the weak handle can no longer be upgraded.
    fn upgrade(&self, _conn: &noq::Connection) -> noq::Path {
        let strong = self.handle.upgrade();
        strong.expect("Wrong Connection reference passed to PathData::upgrade")
    }
}
/// Path state guarded by the mutex in [`Shared`]; cloned to produce snapshots.
#[derive(Default, Debug, Clone)]
struct State {
/// Currently tracked paths.
list: SmallVec<[PathData; 4]>,
/// Id of the selected path, if any.
selected: Option<PathId>,
/// Set once the sender has been closed or dropped.
closed: bool,
}
/// State shared between [`PathStateSender`] and [`PathStateReceiver`].
#[derive(Debug)]
struct Shared {
/// The current path state.
state: Mutex<State>,
/// Used by the sender to wake tasks waiting for a state change.
notify: Notify,
}
/// Write side of the path state: updates the shared state and emits [`PathEvent`]s.
#[derive(Debug)]
pub(super) struct PathStateSender {
/// State shared with every [`PathStateReceiver`].
shared: Arc<Shared>,
/// Broadcast channel on which path events are sent.
events: broadcast::Sender<PathEvent>,
}
impl PathStateSender {
    /// Creates a sender together with the receiver that observes it.
    ///
    /// Both halves share one [`Shared`] instance. The receiver only keeps a
    /// weak handle to the event channel.
    pub(super) fn new() -> (Self, PathStateReceiver) {
        // The initial broadcast receiver is discarded: subscribers are created
        // on demand from the sender.
        let (events, _) = broadcast::channel(BROADCAST_CAPACITY);
        let shared = Arc::new(Shared {
            state: Default::default(),
            notify: Notify::new(),
        });
        let receiver = PathStateReceiver {
            shared: shared.clone(),
            events: events.downgrade(),
        };
        let sender = PathStateSender { shared, events };
        (sender, receiver)
    }

    /// Records that a path was opened.
    ///
    /// The path is appended to the list, or replaces the existing entry with
    /// the same id. List watchers are woken and [`PathEvent::Opened`] is sent.
    pub(super) fn record_opened(
        &self,
        handle: WeakPathHandle,
        network_path: transports::FourTuple,
    ) {
        let id = handle.id();
        let remote_addr: TransportAddr = network_path.remote().into();
        let local_addr = network_path.local();
        {
            // Keep the lock scope tight: waking and sending happen unlocked.
            let mut state = self.shared.state.lock().expect("poisoned");
            let entry = PathData {
                handle,
                remote_addr: remote_addr.clone(),
                local_addr: local_addr.clone(),
            };
            match state.list.iter().position(|e| e.handle.id() == id) {
                Some(idx) => state.list[idx] = entry,
                None => state.list.push(entry),
            }
        }
        self.shared.notify.notify_waiters();
        // A send error only means nobody is subscribed right now.
        let _ = self.events.send(PathEvent::Opened {
            id,
            remote_addr,
            local_addr,
        });
    }

    /// Records that the path `id` was abandoned.
    ///
    /// Clears the selection if it pointed at `id` and removes the path from
    /// the list. When the path was tracked, list watchers are woken and
    /// [`PathEvent::Closed`] is sent with the stats read through `conn`.
    /// Nothing is signalled for an id that was not in the list.
    ///
    /// # Panics
    ///
    /// Panics if the removed path's weak handle cannot be upgraded.
    pub(super) fn record_abandoned(&self, id: PathId, conn: &noq::Connection) {
        let removed = {
            let mut state = self.shared.state.lock().expect("poisoned");
            if state.selected == Some(id) {
                state.selected = None;
            }
            let pos = state.list.iter().position(|e| e.handle.id() == id);
            pos.map(|pos| state.list.remove(pos))
        };
        if let Some(data) = removed {
            let stats = data.upgrade(conn).stats();
            self.shared.notify.notify_waiters();
            // `data` is owned here, so its addresses can be moved into the event.
            let _ = self.events.send(PathEvent::Closed {
                id,
                remote_addr: data.remote_addr,
                local_addr: data.local_addr,
                last_stats: Box::new(stats),
            });
        }
    }

    /// Records which network path is currently selected.
    ///
    /// The selection becomes the tracked path whose remote and local address
    /// match `network_path`, or `None` when no tracked path matches. Nothing
    /// happens if the selection is unchanged. Otherwise list watchers are
    /// woken, and [`PathEvent::Selected`] is sent when a path is now selected.
    pub(super) fn record_selected(&self, network_path: &transports::FourTuple) {
        let remote_addr: TransportAddr = network_path.remote().into();
        let local_addr = network_path.local();
        let event = {
            let mut state = self.shared.state.lock().expect("poisoned");
            let selected_path_id = state
                .list
                .iter()
                .find(|p| p.remote_addr == remote_addr && p.local_addr == local_addr)
                .map(|p| p.handle.id());
            if selected_path_id == state.selected {
                return;
            }
            state.selected = selected_path_id;
            selected_path_id.map(|id| PathEvent::Selected {
                id,
                remote_addr,
                local_addr,
            })
        };
        // Wake list watchers on every selection change, including the case
        // where the selection was cleared and there is no event to send;
        // otherwise their snapshots would keep showing the old selection.
        self.shared.notify.notify_waiters();
        if let Some(event) = event {
            let _ = self.events.send(event);
        }
    }

    /// Closes the path state, consuming the sender.
    ///
    /// For every path still in the list a [`PathEvent::Closed`] is sent using
    /// the stats found in `closed.path_stats`; a warning is logged for paths
    /// without stats. The state is then marked closed and list watchers are
    /// woken. The list itself is left untouched.
    pub(super) fn close(self, closed: noq::Closed) {
        let mut state = self.shared.state.lock().expect("poisoned");
        if state.closed {
            return;
        }
        for path in state.list.iter() {
            let id = path.handle.id();
            let stats = closed
                .path_stats
                .iter()
                .find(|(path_id, _stats)| *path_id == id)
                .map(|(_id, stats)| stats);
            match stats {
                Some(stats) => {
                    let _ = self.events.send(PathEvent::Closed {
                        id,
                        remote_addr: path.remote_addr.clone(),
                        local_addr: path.local_addr.clone(),
                        last_stats: Box::new(*stats),
                    });
                }
                None => {
                    warn!(
                        "Connection close event is missing path stats for path {}",
                        id
                    );
                }
            }
        }
        state.closed = true;
        self.shared.notify.notify_waiters();
    }
}
impl Drop for PathStateSender {
    /// Marks the state as closed and wakes list watchers, unless the state
    /// was already closed.
    fn drop(&mut self) {
        let mut state = self.shared.state.lock().expect("poisoned");
        if state.closed {
            // Already closed: nothing left to signal.
            return;
        }
        state.closed = true;
        self.shared.notify.notify_waiters();
    }
}
/// Read side of the path state: hands out snapshots, snapshot streams and event streams.
#[derive(Clone, Debug)]
pub(crate) struct PathStateReceiver {
/// State shared with the [`PathStateSender`].
shared: Arc<Shared>,
/// Weak handle to the sender's event channel, upgraded when subscribing.
events: broadcast::WeakSender<PathEvent>,
}
impl PathStateReceiver {
pub(crate) fn get<'a>(&self, conn: &'a noq::Connection) -> PathList<'a> {
PathList {
snapshot: self.shared.state.lock().expect("poisoned").clone(),
conn,
}
}
pub(crate) fn events(&self) -> PathEventStream {
let receiver = if let Some(sender) = self.events.upgrade() {
sender.subscribe()
} else {
let (_tx, rx) = broadcast::channel(1);
rx
};
PathEventStream {
inner: BroadcastStream::new(receiver),
}
}
pub(crate) fn stream<'a>(&'a self, conn: &'a noq::Connection) -> PathListStream<'a> {
PathListStream {
shared: &self.shared,
conn,
notified: Box::pin(self.shared.notify.notified()),
first_poll: true,
}
}
}
/// Snapshot of the paths of a connection.
///
/// The connection reference is omitted from the `Debug` output.
#[derive(Clone, derive_more::Debug)]
pub struct PathList<'conn> {
/// Copy of the path state taken when the list was created.
snapshot: State,
/// Connection handed to each [`Path`] produced from this list.
#[debug(skip)]
conn: &'conn noq::Connection,
}
impl<'conn> PathList<'conn> {
pub fn len(&self) -> usize {
self.snapshot.list.len()
}
pub fn is_empty(&self) -> bool {
self.snapshot.list.is_empty()
}
pub fn iter(&self) -> PathListIter<'_> {
PathListIter {
inner: self.snapshot.list.iter(),
selected: self.snapshot.selected,
conn: self.conn,
}
}
pub fn get(&self, id: PathId) -> Option<Path<'_>> {
self.iter().find(|p| p.id() == id)
}
}
/// Iterating a borrowed [`PathList`] yields its [`Path`]s, like [`PathList::iter`].
///
/// The borrow lifetime `'a` is kept separate from the connection lifetime
/// `'conn`, so the impl applies to any `&'a PathList<'conn>` rather than only
/// to references whose two lifetimes can be unified.
impl<'a, 'conn> IntoIterator for &'a PathList<'conn> {
    type IntoIter = PathListIter<'a>;
    type Item = Path<'a>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}
/// Iterator over the paths of a [`PathList`], yielding [`Path`] items.
#[derive(Debug)]
pub struct PathListIter<'a> {
/// Iterator over the snapshot's path entries.
inner: std::slice::Iter<'a, PathData>,
/// Id of the selected path in the snapshot, if any.
selected: Option<PathId>,
/// Connection handed to each yielded [`Path`].
conn: &'a noq::Connection,
}
impl<'a> PathListIter<'a> {
    /// Wraps a path entry into a [`Path`], flagging it as selected when its id
    /// equals the snapshot's selected id.
    fn item(&self, data: &'a PathData) -> Path<'a> {
        let id = data.handle.id();
        let is_selected = Some(id) == self.selected;
        Path {
            data,
            is_selected,
            conn: self.conn,
        }
    }
}
impl<'a> Iterator for PathListIter<'a> {
    type Item = Path<'a>;

    /// Yields the next path from the front of the list.
    fn next(&mut self) -> Option<Self::Item> {
        let data = self.inner.next()?;
        Some(self.item(data))
    }

    /// Reports the exact number of remaining paths.
    fn size_hint(&self) -> (usize, Option<usize>) {
        // A slice iterator knows its exact remaining length.
        let remaining = self.inner.len();
        (remaining, Some(remaining))
    }
}
impl<'a> DoubleEndedIterator for PathListIter<'a> {
    /// Yields the next path from the back of the list.
    fn next_back(&mut self) -> Option<Self::Item> {
        let data = self.inner.next_back()?;
        Some(self.item(data))
    }
}
// The length is exact because `size_hint` is forwarded from the inner slice iterator.
impl ExactSizeIterator for PathListIter<'_> {}
/// View of a single path taken from a [`PathList`] snapshot.
#[derive(Clone, Debug)]
pub struct Path<'a> {
/// The snapshot entry describing this path.
data: &'a PathData,
/// Whether this path was the selected path in the snapshot.
is_selected: bool,
/// Connection passed along when upgrading the path handle to read stats.
conn: &'a noq::Connection,
}
impl<'conn> Path<'conn> {
pub fn id(&self) -> PathId {
self.data.handle.id()
}
pub fn remote_addr(&self) -> &TransportAddr {
&self.data.remote_addr
}
pub fn local_addr(&self) -> &LocalTransportAddr {
&self.data.local_addr
}
pub fn is_selected(&self) -> bool {
self.is_selected
}
pub fn is_ip(&self) -> bool {
self.data.remote_addr.is_ip()
}
pub fn is_relay(&self) -> bool {
self.data.remote_addr.is_relay()
}
pub fn stats(&self) -> PathStats {
self.data.upgrade(self.conn).stats()
}
pub fn rtt(&self) -> Duration {
self.stats().rtt
}
}
/// Stream of [`PathList`] snapshots.
#[derive(Debug)]
pub struct PathListStream<'conn> {
/// Shared state the snapshots are cloned from.
shared: &'conn Shared,
/// Connection handed to every yielded [`PathList`].
conn: &'conn noq::Connection,
/// Pending notification future for the next state change.
notified: Pin<Box<Notified<'conn>>>,
/// `true` until the stream has been polled for the first time.
first_poll: bool,
}
impl<'conn> Stream for PathListStream<'conn> {
type Item = PathList<'conn>;
/// Yields the current path list on the first poll, then a fresh snapshot each
/// time the pending notification completes; ends once the state is closed.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if this.first_poll {
// The first poll does not wait for a notification: it reports the state
// as it is right now.
this.first_poll = false;
} else {
// Wait for the pending notification, then swap the completed future for
// a fresh one that will observe the next change.
std::task::ready!(this.notified.as_mut().poll(cx));
this.notified.set(this.shared.notify.notified());
}
// NOTE(review): enabling the future before taking the snapshot presumably
// ensures a `notify_waiters` call racing with the snapshot is not missed —
// confirm against the tokio `Notified::enable` documentation.
this.notified.as_mut().enable();
let snapshot = this.shared.state.lock().expect("poisoned").clone();
if snapshot.closed {
// A closed state terminates the stream.
Poll::Ready(None)
} else {
Poll::Ready(Some(PathList {
snapshot,
conn: this.conn,
}))
}
}
}
/// Stream of [`PathEvent`]s backed by a broadcast channel subscription.
#[derive(Debug)]
pub struct PathEventStream {
/// The wrapped broadcast receiver stream.
inner: BroadcastStream<PathEvent>,
}
impl Stream for PathEventStream {
    type Item = PathEvent;

    /// Forwards events from the broadcast stream.
    ///
    /// A lag error from the broadcast stream is turned into
    /// [`PathEvent::Lagged`] instead of being surfaced as an error; the end of
    /// the inner stream ends this stream.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.inner.poll_next(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Ready(Some(Ok(event))) => Poll::Ready(Some(event)),
            Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(missed)))) => {
                Poll::Ready(Some(PathEvent::Lagged { missed }))
            }
        }
    }
}