use std::task::Poll;
use iroh_base::TransportAddr;
use n0_future::time::Duration;
use n0_watcher::{Watchable, Watcher};
use noq::WeakPathHandle;
use noq_proto::PathId;
use smallvec::SmallVec;
use crate::{endpoint::PathStats, socket::transports};
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct PathInfoList(SmallVec<[PathInfo; 4]>);
impl PathInfoList {
pub fn iter(&self) -> impl Iterator<Item = &PathInfo> {
self.0.iter()
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn len(&self) -> usize {
self.0.len()
}
}
#[derive(Debug)]
pub struct PathInfoListIter(smallvec::IntoIter<[PathInfo; 4]>);
impl IntoIterator for PathInfoList {
type Item = PathInfo;
type IntoIter = PathInfoListIter;
fn into_iter(self) -> Self::IntoIter {
PathInfoListIter(self.0.into_iter())
}
}
impl IntoIterator for PathWatcher {
type Item = PathInfo;
type IntoIter = PathInfoListIter;
fn into_iter(mut self) -> Self::IntoIter {
self.get().into_iter()
}
}
impl Iterator for PathInfoListIter {
type Item = PathInfo;
fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
struct CloseablePathList {
paths: PathInfoList,
closed: bool,
}
#[derive(Clone, Debug)]
pub struct PathWatcher {
paths_watcher: n0_watcher::Direct<CloseablePathList>,
selected_path_watcher: n0_watcher::Direct<Option<transports::Addr>>,
current_paths: PathInfoList,
current_selected_path: Option<transports::Addr>,
}
impl PathWatcher {
fn update_selected(&mut self) {
if let Some(path) = self.selected_path_watcher.peek()
&& Some(path) != self.current_selected_path.as_ref()
{
self.current_selected_path = Some(path.clone());
}
if let Some(selected_path) = self.current_selected_path.as_ref() {
for p in self.current_paths.0.iter_mut() {
p.is_selected = selected_path == p.remote_addr();
}
}
}
}
impl Watcher for PathWatcher {
type Value = PathInfoList;
fn update(&mut self) -> bool {
let mut updated = false;
if self.paths_watcher.update() {
updated = true;
self.current_paths = self.paths_watcher.peek().paths.clone();
}
if self.selected_path_watcher.update() {
updated = true;
}
if updated {
self.update_selected();
}
updated
}
fn peek(&self) -> &Self::Value {
&self.current_paths
}
fn is_connected(&self) -> bool {
self.paths_watcher.is_connected()
&& self.selected_path_watcher.is_connected()
&& !self.paths_watcher.peek().closed
}
fn poll_updated(
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<(), n0_watcher::Disconnected>> {
if self.paths_watcher.peek().closed {
return Poll::Ready(Err(n0_watcher::Disconnected));
}
let mut is_ready = false;
if self.paths_watcher.poll_updated(cx)?.is_ready() {
self.current_paths = self.paths_watcher.peek().paths.clone();
is_ready = true;
}
if self.selected_path_watcher.poll_updated(cx)?.is_ready() {
is_ready = true;
}
if is_ready {
self.update_selected();
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
}
#[derive(derive_more::Debug, Clone, Eq, PartialEq)]
pub struct PathInfo {
#[debug("{}", path.id())]
path: WeakPathHandle,
remote_addr: TransportAddr,
is_abandoned: bool,
is_selected: bool,
}
impl PathInfo {
fn new(conn: &noq::Connection, id: PathId, remote_addr: TransportAddr) -> Option<Self> {
let path = conn.path(id)?;
Some(PathInfo {
path: path.weak_handle(),
remote_addr,
is_abandoned: path.status().is_err(),
is_selected: false,
})
}
pub fn id(&self) -> PathId {
self.path.id()
}
pub fn remote_addr(&self) -> &TransportAddr {
&self.remote_addr
}
pub fn is_selected(&self) -> bool {
self.is_selected
}
pub fn is_closed(&self) -> bool {
self.is_abandoned
}
pub fn is_ip(&self) -> bool {
self.remote_addr().is_ip()
}
pub fn is_relay(&self) -> bool {
self.remote_addr().is_relay()
}
pub fn stats(&self) -> Option<PathStats> {
self.path.upgrade().map(|p| p.stats())
}
pub fn rtt(&self) -> Option<Duration> {
self.stats().map(|s| s.rtt)
}
}
#[derive(Debug, Clone)]
pub(crate) struct PathWatchable {
paths: Watchable<CloseablePathList>,
selected_path: Watchable<Option<transports::Addr>>,
}
impl PathWatchable {
pub(super) fn new(selected_path: Watchable<Option<transports::Addr>>) -> Self {
let value = CloseablePathList {
paths: Default::default(),
closed: false,
};
Self {
paths: Watchable::new(value),
selected_path,
}
}
pub(super) fn close(&self) {
let mut value = self.paths.get();
value.closed = true;
self.paths.set(value).ok();
}
pub(super) fn insert(&self, conn: &noq::Connection, id: PathId, remote_addr: TransportAddr) {
if let Some(data) = PathInfo::new(conn, id, remote_addr) {
self.update(move |list| list.0.push(data));
}
}
pub(super) fn set_abandoned(&self, id: PathId) {
self.update(|list| {
if let Some(item) = list.0.iter_mut().find(|p| p.path.id() == id) {
item.is_abandoned = true;
}
});
}
fn update(&self, f: impl FnOnce(&mut PathInfoList)) {
let mut value = self.paths.get();
f(&mut value.paths);
if !self.paths.has_watchers() {
value.paths.0.retain(|p| !p.is_abandoned);
value.paths.0.shrink_to_fit();
}
self.paths.set(value).ok();
}
pub(crate) fn watch(&self) -> PathWatcher {
let paths_watcher = self.paths.watch();
let selected_path_watcher = self.selected_path.watch();
let mut watcher = PathWatcher {
current_paths: paths_watcher.peek().paths.clone(),
current_selected_path: None,
paths_watcher,
selected_path_watcher,
};
watcher.update_selected();
watcher
}
}