use {
super::link::Link,
crate::{
PeerId,
SecretKey,
discovery::{SignedPeerEntry, ping::Ping},
network::{
NetworkId,
Success,
link::{LinkError, OpenError, Protocol},
},
primitives::{IntoIterOrSingle, Short},
},
core::time::Duration,
iroh::{Endpoint, EndpointAddr, address_lookup::AddressLookup},
std::{fmt, sync::Arc},
tokio::sync::SetOnce,
tokio_util::sync::CancellationToken,
};
/// Handle to the local node.
///
/// The handle is a thin wrapper around an `Arc<Inner>`, so cloning it only
/// produces another reference to the same shared state.
pub struct LocalNode(Arc<Inner>);
impl LocalNode {
pub fn network_id(&self) -> &NetworkId {
&self.0.network_id
}
pub fn endpoint(&self) -> &Endpoint {
&self.0.endpoint
}
pub fn addr(&self) -> EndpointAddr {
self.endpoint().addr()
}
pub fn id(&self) -> PeerId {
self.0.endpoint.id()
}
pub fn secret_key(&self) -> &SecretKey {
self.0.endpoint.secret_key()
}
pub async fn online(&self) {
self.0.ready_signal.wait().await;
self.0.endpoint.online().await;
}
}
/// Crate-internal construction, lifecycle and connectivity helpers.
impl LocalNode {
    /// Creates a new handle around `endpoint` for the given `network_id`.
    ///
    /// The readiness signal starts unset and a fresh termination token is
    /// created for the node.
    pub(crate) fn new(network_id: NetworkId, endpoint: Endpoint) -> Self {
        Self(Arc::new(Inner {
            network_id,
            endpoint,
            ready_signal: SetOnce::new(),
            termination: CancellationToken::new(),
        }))
    }

    /// Sets the readiness signal that [`LocalNode::online`] waits on.
    ///
    /// The result of `set` is deliberately discarded, so calling this more
    /// than once is harmless.
    pub(crate) fn mark_ready(&self) {
        let _ = self.0.ready_signal.set(());
    }

    /// Returns the node-wide cancellation token.
    pub(crate) fn termination(&self) -> &CancellationToken {
        &self.0.termination
    }

    /// Opens a link speaking protocol `P` to `remote`.
    ///
    /// Equivalent to [`LocalNode::connect_with_cancel`] with a clone of the
    /// node's own termination token as the cancellation token.
    #[allow(unused)]
    pub(crate) fn connect<P: Protocol>(
        &self,
        remote: impl Into<EndpointAddr>,
    ) -> impl Future<Output = Result<Link<P>, OpenError>> + Send + 'static {
        self.connect_with_cancel(remote, self.termination().clone())
    }

    /// Opens a link speaking protocol `P` to `remote`, passing `cancel` on to
    /// `Link::open_with_cancel`.
    ///
    /// The returned future owns a clone of this handle, so it does not borrow
    /// from `self`.
    pub(crate) fn connect_with_cancel<P: Protocol>(
        &self,
        remote: impl Into<EndpointAddr>,
        cancel: CancellationToken,
    ) -> impl Future<Output = Result<Link<P>, OpenError>> + Send + 'static {
        let local = self.clone();
        let remote = remote.into();
        async move { Link::open_with_cancel(&local, remote, cancel).await }
    }

    /// Publishes the given address(es) to the endpoint's address lookup.
    ///
    /// Does nothing if the endpoint cannot provide an address lookup.
    pub(crate) fn observe<'a, V>(
        &self,
        addrs: impl IntoIterOrSingle<&'a EndpointAddr, V>,
    ) {
        let Ok(addr_lookup) = self.endpoint().address_lookup() else {
            return;
        };
        for addr in addrs.iterator() {
            addr_lookup.publish(&addr.clone().into());
        }
    }

    /// Pings `peer` and returns the signed peer entry it answers with.
    ///
    /// The exchange opens a `Ping` link, sends an empty query, receives a
    /// [`SignedPeerEntry`] and closes the link with `Success`. Each failing
    /// step is logged at debug level before the error is returned.
    ///
    /// `timeout` bounds the *entire* exchange (connect, send, receive and
    /// close), not just the connection attempt, so a peer that accepts the
    /// connection but never answers cannot stall the caller. When `None`, a
    /// default of five seconds is used. An elapsed deadline is converted into
    /// a [`LinkError`] by `?`.
    pub(crate) fn ping(
        &self,
        peer: impl Into<EndpointAddr>,
        timeout: Option<std::time::Duration>,
    ) -> impl Future<Output = Result<SignedPeerEntry, LinkError>> + Send + 'static
    {
        /// Deadline applied when the caller does not supply one.
        const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);

        let peer = peer.into();
        let local = self.clone();
        let timeout = timeout.unwrap_or(DEFAULT_TIMEOUT);

        let exchange = async move {
            let peer_id = peer.id;

            let mut link =
                local.connect::<Ping>(peer).await.inspect_err(|e| {
                    tracing::debug!(
                        error = %e,
                        network = %local.network_id(),
                        peer = %Short(&peer_id),
                        "ping failed"
                    );
                })?;

            link.send(&()).await.inspect_err(|e| {
                tracing::debug!(
                    error = %e,
                    network = %local.network_id(),
                    peer = %Short(&peer_id),
                    "failed to send ping query"
                );
            })?;

            let entry =
                link.recv::<SignedPeerEntry>().await.inspect_err(|e| {
                    tracing::debug!(
                        error = %e,
                        network = %local.network_id(),
                        peer = %Short(&peer_id),
                        "failed to receive ping response"
                    );
                })?;

            link.close(Success).await.inspect_err(|e| {
                tracing::debug!(
                    error = %e,
                    network = %local.network_id(),
                    peer = %Short(&peer_id),
                    "failed to close ping link"
                );
            })?;

            Ok::<_, LinkError>(entry)
        };

        // The timer is created inside the returned future so that it is only
        // armed once the future is actually polled.
        async move { tokio::time::timeout(timeout, exchange).await? }
    }
}
impl Clone for LocalNode {
fn clone(&self) -> Self {
Self(Arc::clone(&self.0))
}
}
impl fmt::Debug for LocalNode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("LocalNode")
.field("network_id", &self.0.network_id)
.field("endpoint", &self.0.endpoint)
.finish()
}
}
impl fmt::Display for LocalNode {
    /// Writes `LocalNode(<id>)`, where `<id>` is the endpoint id rendered with
    /// its own `Display` implementation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let endpoint_id = self.endpoint().id();
        write!(f, "LocalNode({})", endpoint_id)
    }
}
/// State shared by every clone of a [`LocalNode`].
struct Inner {
    /// Network id supplied when the node was constructed.
    network_id: NetworkId,
    /// Transport endpoint wrapped by the node.
    endpoint: Endpoint,
    /// Set by `LocalNode::mark_ready` and awaited by `LocalNode::online`.
    ready_signal: SetOnce<()>,
    /// Token returned by `LocalNode::termination`; `LocalNode::connect` uses
    /// a clone of it as the cancellation token for new links.
    termination: CancellationToken,
}