use iroh::address_lookup::{DnsAddressLookup, PkarrPublisher};
use iroh::endpoint::{IdleTimeout, QuicTransportConfig, TransportAddrUsage};
use iroh::{Endpoint, RelayMode, SecretKey};
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use crate::pool::ConnectionPool;
use crate::server::ServeHandle;
use crate::stream::{HandleStore, StoreConfig};
use crate::{ALPN, ALPN_DUPLEX};
/// Transport-level options: relay selection, bind addresses, timeouts, proxy.
#[derive(Debug, Clone, Default)]
pub struct NetworkingOptions {
    /// Relay strategy: "default", "staging", "disabled", or "custom";
    /// `None` behaves like "default" (see `IrohEndpoint::bind`).
    pub relay_mode: Option<String>,
    /// Relay URLs; must be non-empty when `relay_mode` is "custom".
    pub relays: Vec<String>,
    /// Extra socket addresses to bind, parsed as `std::net::SocketAddr`.
    pub bind_addrs: Vec<String>,
    /// QUIC max idle timeout in milliseconds for the transport config.
    pub idle_timeout_ms: Option<u64>,
    /// Explicit proxy URL; takes precedence over `proxy_from_env`.
    pub proxy_url: Option<String>,
    /// Read proxy settings from the environment when no explicit URL is set.
    pub proxy_from_env: bool,
    /// Disable networking extras: forces relay off and skips discovery;
    /// combining this with a non-"disabled" `relay_mode` is rejected by `bind`.
    pub disabled: bool,
}
/// Node-discovery settings (pkarr publishing + DNS address lookup).
#[derive(Debug, Clone)]
pub struct DiscoveryOptions {
    /// Custom DNS/pkarr server URL; `None` uses the n0-operated defaults.
    pub dns_server: Option<String>,
    /// Master switch; discovery is also skipped when networking is disabled.
    pub enabled: bool,
}
impl Default for DiscoveryOptions {
fn default() -> Self {
Self {
dns_server: None,
enabled: true,
}
}
}
/// Connection-pool tuning; both fields are forwarded to `ConnectionPool::new`.
#[derive(Debug, Clone, Default)]
pub struct PoolOptions {
    /// Maximum pooled connections; `None` is passed through unchanged.
    pub max_connections: Option<usize>,
    /// Idle eviction timeout in milliseconds; `None` is passed through unchanged.
    pub idle_timeout_ms: Option<u64>,
}
/// Streaming handle-store tuning; unset fields fall back to the
/// `crate::stream` defaults and sizes are clamped to at least 1 in `bind`.
#[derive(Debug, Clone, Default)]
pub struct StreamingOptions {
    /// Per-stream channel capacity (default `DEFAULT_CHANNEL_CAPACITY`).
    pub channel_capacity: Option<usize>,
    /// Maximum chunk size in bytes (default `DEFAULT_MAX_CHUNK_SIZE`).
    pub max_chunk_size_bytes: Option<usize>,
    /// Drain timeout in milliseconds (default `DEFAULT_DRAIN_TIMEOUT_MS`).
    pub drain_timeout_ms: Option<u64>,
    /// Handle TTL in milliseconds (default `DEFAULT_SLAB_TTL_MS`);
    /// a value of 0 disables the background sweeper entirely.
    pub handle_ttl_ms: Option<u64>,
}
/// Full configuration consumed by `IrohEndpoint::bind`.
#[derive(Debug, Clone, Default)]
pub struct NodeOptions {
    /// Optional fixed 32-byte secret key for a stable node identity;
    /// when `None` the endpoint builder's default key is used.
    pub key: Option<[u8; 32]>,
    /// Transport options (relay, bind addresses, proxy, idle timeout).
    pub networking: NetworkingOptions,
    /// Discovery options (pkarr publish + DNS lookup).
    pub discovery: DiscoveryOptions,
    /// Connection-pool tuning.
    pub pool: PoolOptions,
    /// Streaming handle-store tuning.
    pub streaming: StreamingOptions,
    /// ALPN capability strings; empty means the built-in ALPN + ALPN_DUPLEX
    /// pair, otherwise the base ALPN is appended if missing.
    pub capabilities: Vec<String>,
    /// Enable keylog on the endpoint builder (for debugging captures).
    pub keylog: bool,
    /// Request header size cap; `None` or `Some(0)` resolves to 64 KiB.
    pub max_header_size: Option<usize>,
    /// Server-side limits; `max_consecutive_errors` defaults to 5 if unset.
    pub server_limits: crate::server::ServerLimits,
    /// Response-compression options (only with the `compression` feature).
    #[cfg(feature = "compression")]
    pub compression: Option<CompressionOptions>,
}
/// Compression tuning (only with the `compression` feature).
#[cfg(feature = "compression")]
#[derive(Debug, Clone)]
pub struct CompressionOptions {
    /// Minimum body size in bytes before compression kicks in —
    /// presumably smaller bodies are sent uncompressed; confirm at use site.
    pub min_body_bytes: usize,
    /// Compression level; `None` presumably means the codec default — confirm.
    pub level: Option<u32>,
}
/// Cheaply cloneable handle to a bound iroh endpoint and its shared state.
#[derive(Clone)]
pub struct IrohEndpoint {
    // All state lives behind an Arc so every clone shares one endpoint.
    pub(crate) inner: Arc<EndpointInner>,
}
/// Shared state behind every `IrohEndpoint` clone.
pub(crate) struct EndpointInner {
    /// The underlying iroh endpoint.
    pub ep: Endpoint,
    /// Node id pre-encoded as base32 (computed once in `bind`).
    pub node_id_str: String,
    /// Outbound connection pool.
    pub pool: ConnectionPool,
    /// Header size cap resolved in `bind` (64 KiB when unset or 0).
    pub max_header_size: usize,
    /// Server limits with `max_consecutive_errors` defaulted in `bind`.
    pub server_limits: crate::server::ServerLimits,
    /// Store for streaming reader/writer/session handles.
    pub handles: HandleStore,
    /// Currently running serve loop, if any.
    pub serve_handle: std::sync::Mutex<Option<ServeHandle>>,
    /// Completion watcher for the serve loop, if any.
    pub serve_done_rx: std::sync::Mutex<Option<tokio::sync::watch::Receiver<bool>>>,
    /// Set to `true` once `close`/`close_force` has finished.
    pub closed_tx: tokio::sync::watch::Sender<bool>,
    /// Receiver side kept around so `wait_closed` can clone it.
    pub closed_rx: tokio::sync::watch::Receiver<bool>,
    /// Gauge of open connections — incremented outside this view; confirm.
    pub active_connections: Arc<AtomicUsize>,
    /// Gauge of in-flight requests — incremented outside this view; confirm.
    pub active_requests: Arc<AtomicUsize>,
    /// Compression settings copied from `NodeOptions` (feature-gated).
    #[cfg(feature = "compression")]
    pub compression: Option<CompressionOptions>,
}
impl IrohEndpoint {
/// Binds a new iroh endpoint configured from `opts` and wraps it together
/// with its connection pool, handle store, and close signal.
///
/// # Errors
/// Returns `CoreError::invalid_input` for contradictory or malformed options
/// (relay mode, relay/DNS/proxy URLs, bind addresses, idle timeout) and a
/// classified error if the underlying socket bind fails.
pub async fn bind(opts: NodeOptions) -> Result<Self, crate::CoreError> {
    // Reject contradictory options up front: `networking.disabled` forces
    // RelayMode::Disabled below, so any other explicit relay_mode is a
    // configuration mistake rather than something to silently override.
    if opts.networking.disabled
        && opts
            .networking
            .relay_mode
            .as_deref()
            .is_some_and(|m| !matches!(m, "disabled"))
    {
        return Err(crate::CoreError::invalid_input(
            "networking.disabled is true but relay_mode is set to a non-disabled value; \
            set relay_mode to \"disabled\" or omit it when networking.disabled is true",
        ));
    }
    // Translate the stringly-typed relay_mode into iroh's RelayMode.
    let relay_mode = if opts.networking.disabled {
        RelayMode::Disabled
    } else {
        match opts.networking.relay_mode.as_deref() {
            None | Some("default") => RelayMode::Default,
            Some("staging") => RelayMode::Staging,
            Some("disabled") => RelayMode::Disabled,
            Some("custom") => {
                if opts.networking.relays.is_empty() {
                    return Err(crate::CoreError::invalid_input(
                        "relay_mode \"custom\" requires at least one URL in `relays`",
                    ));
                }
                // Parse every relay URL; the first invalid one aborts the bind.
                let urls = opts
                    .networking
                    .relays
                    .iter()
                    .map(|u| {
                        u.parse::<iroh::RelayUrl>()
                            .map_err(crate::CoreError::invalid_input)
                    })
                    .collect::<Result<Vec<_>, _>>()?;
                RelayMode::custom(urls)
            }
            Some(other) => {
                return Err(crate::CoreError::invalid_input(format!(
                    "unknown relay_mode: {other}"
                )))
            }
        }
    };
    // ALPN protocols the endpoint accepts. With no explicit capabilities both
    // built-in protocols are offered; otherwise the caller's list is used and
    // the base ALPN is appended if missing (ALPN_DUPLEX is NOT auto-added in
    // that case).
    let alpns: Vec<Vec<u8>> = if opts.capabilities.is_empty() {
        vec![ALPN_DUPLEX.to_vec(), ALPN.to_vec()]
    } else {
        let mut list: Vec<Vec<u8>> = opts
            .capabilities
            .iter()
            .map(|c| c.as_bytes().to_vec())
            .collect();
        if !list.iter().any(|a| a == ALPN) {
            list.push(ALPN.to_vec());
        }
        list
    };
    let mut builder = Endpoint::empty_builder(relay_mode).alpns(alpns);
    // Node discovery (publish via pkarr, resolve via DNS) is skipped entirely
    // when networking is disabled or discovery is turned off.
    if !opts.networking.disabled && opts.discovery.enabled {
        if let Some(ref url_str) = opts.discovery.dns_server {
            let url: url::Url = url_str.parse().map_err(|e| {
                crate::CoreError::invalid_input(format!("invalid dns_discovery URL: {e}"))
            })?;
            builder = builder
                .address_lookup(PkarrPublisher::builder(url.clone()))
                .address_lookup(DnsAddressLookup::builder(
                    // The DNS lookup wants a host name, not a full URL.
                    url.host_str().unwrap_or_default().to_string(),
                ));
        } else {
            // No custom server: fall back to the n0-operated infrastructure.
            builder = builder
                .address_lookup(PkarrPublisher::n0_dns())
                .address_lookup(DnsAddressLookup::n0_dns());
        }
    }
    // A fixed secret key gives the node a stable identity across restarts.
    if let Some(key_bytes) = opts.key {
        builder = builder.secret_key(SecretKey::from_bytes(&key_bytes));
    }
    // Optional QUIC max-idle-timeout override on the transport config.
    if let Some(ms) = opts.networking.idle_timeout_ms {
        let timeout = IdleTimeout::try_from(Duration::from_millis(ms)).map_err(|e| {
            crate::CoreError::invalid_input(format!("idle_timeout_ms out of range: {e}"))
        })?;
        let transport = QuicTransportConfig::builder()
            .max_idle_timeout(Some(timeout))
            .build();
        builder = builder.transport_config(transport);
    }
    // Each extra bind address is parsed and registered on the builder.
    for addr_str in &opts.networking.bind_addrs {
        let sock: std::net::SocketAddr = addr_str.parse().map_err(|e| {
            crate::CoreError::invalid_input(format!("invalid bind address \"{addr_str}\": {e}"))
        })?;
        builder = builder.bind_addr(sock).map_err(|e| {
            crate::CoreError::invalid_input(format!("bind address \"{addr_str}\": {e}"))
        })?;
    }
    // An explicit proxy URL wins over proxy-from-environment.
    if let Some(ref proxy) = opts.networking.proxy_url {
        let url: url::Url = proxy
            .parse()
            .map_err(|e| crate::CoreError::invalid_input(format!("invalid proxy URL: {e}")))?;
        builder = builder.proxy_url(url);
    } else if opts.networking.proxy_from_env {
        builder = builder.proxy_from_env();
    }
    if opts.keylog {
        builder = builder.keylog(true);
    }
    let ep = builder.bind().await.map_err(classify_bind_error)?;
    let node_id_str = crate::base32_encode(ep.id().as_bytes());
    // Streaming handle-store configuration; absent values fall back to crate
    // defaults and capacities are clamped to at least 1.
    let store_config = StoreConfig {
        channel_capacity: opts
            .streaming
            .channel_capacity
            .unwrap_or(crate::stream::DEFAULT_CHANNEL_CAPACITY)
            .max(1),
        max_chunk_size: opts
            .streaming
            .max_chunk_size_bytes
            .unwrap_or(crate::stream::DEFAULT_MAX_CHUNK_SIZE)
            .max(1),
        drain_timeout: Duration::from_millis(
            opts.streaming
                .drain_timeout_ms
                .unwrap_or(crate::stream::DEFAULT_DRAIN_TIMEOUT_MS),
        ),
        max_handles: crate::stream::DEFAULT_MAX_HANDLES,
        ttl: Duration::from_millis(
            opts.streaming
                .handle_ttl_ms
                .unwrap_or(crate::stream::DEFAULT_SLAB_TTL_MS),
        ),
    };
    let sweep_ttl = store_config.ttl;
    let (closed_tx, closed_rx) = tokio::sync::watch::channel(false);
    let inner = Arc::new(EndpointInner {
        ep,
        node_id_str,
        pool: ConnectionPool::new(
            opts.pool.max_connections,
            opts.pool
                .idle_timeout_ms
                .map(std::time::Duration::from_millis),
        ),
        // 0 is treated the same as "unset": use the 64 KiB default.
        max_header_size: match opts.max_header_size {
            None | Some(0) => 64 * 1024,
            Some(n) => n,
        },
        // Default max_consecutive_errors to 5 when the caller left it unset.
        server_limits: {
            let mut sl = opts.server_limits.clone();
            if sl.max_consecutive_errors.is_none() {
                sl.max_consecutive_errors = Some(5);
            }
            sl
        },
        handles: HandleStore::new(store_config),
        serve_handle: std::sync::Mutex::new(None),
        serve_done_rx: std::sync::Mutex::new(None),
        closed_tx,
        closed_rx,
        active_connections: Arc::new(AtomicUsize::new(0)),
        active_requests: Arc::new(AtomicUsize::new(0)),
        #[cfg(feature = "compression")]
        compression: opts.compression,
    });
    // Background sweeper: expires idle stream handles once a minute. It holds
    // only a Weak reference, so it exits once the endpoint is dropped.
    if !sweep_ttl.is_zero() {
        let weak = Arc::downgrade(&inner);
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(Duration::from_secs(60));
            loop {
                ticker.tick().await;
                let Some(inner) = weak.upgrade() else {
                    break;
                };
                inner.handles.sweep(sweep_ttl);
                // Release the strong reference before the next tick so the
                // sweeper never keeps the endpoint alive in between.
                drop(inner);
            }
        });
    }
    Ok(Self { inner })
}
/// Returns this node's identifier as a base32-encoded string slice.
pub fn node_id(&self) -> &str {
    self.inner.node_id_str.as_str()
}
/// Maximum consecutive server errors before giving up; falls back to 5 when
/// the limit was never configured (matching the default applied in `bind`).
pub fn max_consecutive_errors(&self) -> usize {
    match self.inner.server_limits.max_consecutive_errors {
        Some(limit) => limit,
        None => 5,
    }
}
/// Returns a copy of the configured server limits.
/// NOTE(review): `ServeOptions` appears to be the same type as (or an alias
/// of) `ServerLimits` — confirm in `crate::server`.
pub fn serve_options(&self) -> crate::server::ServeOptions {
    let limits = &self.inner.server_limits;
    limits.clone()
}
/// Returns the raw 32 bytes of the endpoint's secret key.
pub fn secret_key_bytes(&self) -> [u8; 32] {
    let key = self.inner.ep.secret_key();
    key.to_bytes()
}
/// Gracefully shuts down: drains the serve loop (if running), closes the
/// endpoint, then broadcasts the closed signal. Poisoned locks are recovered.
pub async fn close(&self) {
    // Take the serve handle inside one statement so the guard is dropped
    // before any await point.
    let taken = self
        .inner
        .serve_handle
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .take();
    if let Some(serve) = taken {
        serve.drain().await;
    }
    self.inner.ep.close().await;
    let _ = self.inner.closed_tx.send(true);
}
/// Forceful shutdown: aborts the serve loop (if running) instead of draining,
/// closes the endpoint, then broadcasts the closed signal.
pub async fn close_force(&self) {
    let taken = match self.inner.serve_handle.lock() {
        Ok(mut guard) => guard.take(),
        // A poisoned mutex still holds valid data; recover and continue.
        Err(poisoned) => poisoned.into_inner().take(),
    };
    if let Some(serve) = taken {
        serve.abort();
    }
    self.inner.ep.close().await;
    let _ = self.inner.closed_tx.send(true);
}
/// Blocks until `close`/`close_force` has signalled completion.
pub async fn wait_closed(&self) {
    let mut watcher = self.inner.closed_rx.clone();
    let _ = watcher.wait_for(|closed| *closed).await;
}
/// Registers a running serve loop: stores its done-watcher first, then the
/// handle itself. Poisoned locks are recovered.
pub fn set_serve_handle(&self, handle: ServeHandle) {
    // Subscribe before moving `handle` into the slot.
    let done_rx = handle.subscribe_done();
    let mut done_slot = self
        .inner
        .serve_done_rx
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    *done_slot = Some(done_rx);
    drop(done_slot);
    let mut handle_slot = self
        .inner
        .serve_handle
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    *handle_slot = Some(handle);
}
/// Requests shutdown of the serve loop without removing its handle; a no-op
/// when no serve loop is registered.
pub fn stop_serve(&self) {
    let guard = self
        .inner
        .serve_handle
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    if let Some(serve) = guard.as_ref() {
        serve.shutdown();
    }
}
/// Waits until the registered serve loop reports completion; returns
/// immediately when no serve loop is registered.
pub async fn wait_serve_stop(&self) {
    // Clone the receiver inside a scope so the mutex guard is released
    // before awaiting.
    let maybe_rx = {
        let guard = self
            .inner
            .serve_done_rx
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        guard.clone()
    };
    if let Some(mut done) = maybe_rx {
        let _ = done.wait_for(|finished| *finished).await;
    }
}
/// Direct access to the underlying iroh [`Endpoint`].
pub fn raw(&self) -> &Endpoint {
    let inner = self.inner.as_ref();
    &inner.ep
}
/// Access to the streaming handle store.
pub fn handles(&self) -> &HandleStore {
    let inner = self.inner.as_ref();
    &inner.handles
}
/// Resolved header size cap (64 KiB unless configured otherwise).
pub fn max_header_size(&self) -> usize {
    self.inner.as_ref().max_header_size
}
/// Crate-internal access to the outbound connection pool.
pub(crate) fn pool(&self) -> &ConnectionPool {
    &self.inner.as_ref().pool
}
/// Snapshot of endpoint activity: handle counts, pool size, and the relaxed
/// connection/request gauges. Values are approximate by nature.
pub fn endpoint_stats(&self) -> EndpointStats {
    let inner = &self.inner;
    let (readers, writers, sessions, handles) = inner.handles.count_handles();
    EndpointStats {
        active_readers: readers,
        active_writers: writers,
        active_sessions: sessions,
        total_handles: handles,
        pool_size: inner.pool.entry_count_approx(),
        active_connections: inner.active_connections.load(Ordering::Relaxed),
        active_requests: inner.active_requests.load(Ordering::Relaxed),
    }
}
/// Borrow the compression settings, when the feature is enabled and
/// options were provided at bind time.
#[cfg(feature = "compression")]
pub fn compression(&self) -> Option<&CompressionOptions> {
    match self.inner.compression {
        Some(ref opts) => Some(opts),
        None => None,
    }
}
/// Socket addresses the endpoint is currently bound to.
pub fn bound_sockets(&self) -> Vec<std::net::SocketAddr> {
    let ep = &self.inner.ep;
    ep.bound_sockets()
}
/// This node's addressing info: base32 id plus all relay URLs followed by
/// all direct IP addresses, stringified.
pub fn node_addr(&self) -> NodeAddrInfo {
    let addr = self.inner.ep.addr();
    let addrs: Vec<String> = addr
        .relay_urls()
        .map(|relay| relay.to_string())
        .chain(addr.ip_addrs().map(|direct| direct.to_string()))
        .collect();
    NodeAddrInfo {
        id: self.inner.node_id_str.clone(),
        addrs,
    }
}
/// URL of the first known relay for this node, if any.
pub fn home_relay(&self) -> Option<String> {
    let addr = self.inner.ep.addr();
    let first = addr.relay_urls().next()?;
    Some(first.to_string())
}
/// Looks up addressing info for a peer given its base32 node id. Returns
/// `None` when the id does not decode to a valid key or the peer is unknown.
pub async fn peer_info(&self, node_id_b32: &str) -> Option<NodeAddrInfo> {
    let decoded = crate::base32_decode(node_id_b32).ok()?;
    let key_bytes: [u8; 32] = decoded.try_into().ok()?;
    let pk = iroh::PublicKey::from_bytes(&key_bytes).ok()?;
    let info = self.inner.ep.remote_info(pk).await?;
    let id = crate::base32_encode(info.id().as_bytes());
    let addrs: Vec<String> = info
        .addrs()
        .into_iter()
        .map(|a| match a.addr() {
            iroh::TransportAddr::Ip(sock) => sock.to_string(),
            iroh::TransportAddr::Relay(url) => url.to_string(),
            // Unknown transport kinds fall back to their Debug form.
            other => format!("{:?}", other),
        })
        .collect();
    Some(NodeAddrInfo { id, addrs })
}
/// Returns path and transport statistics for a peer, or `None` when the id
/// does not decode or the peer is unknown to the endpoint.
pub async fn peer_stats(&self, node_id_b32: &str) -> Option<PeerStats> {
    let bytes = crate::base32_decode(node_id_b32).ok()?;
    let arr: [u8; 32] = bytes.try_into().ok()?;
    let pk = iroh::PublicKey::from_bytes(&arr).ok()?;
    let info = self.inner.ep.remote_info(pk).await?;
    let mut paths = Vec::new();
    let mut has_active_relay = false;
    let mut active_relay_url: Option<String> = None;
    // Enumerate every known transport address, remembering whether the
    // currently active path is a relay (and which one).
    for a in info.addrs() {
        let is_relay = a.addr().is_relay();
        let is_active = matches!(a.usage(), TransportAddrUsage::Active);
        let addr_str = match a.addr() {
            iroh::TransportAddr::Ip(sock) => sock.to_string(),
            iroh::TransportAddr::Relay(url) => {
                if is_active {
                    has_active_relay = true;
                    active_relay_url = Some(url.to_string());
                }
                url.to_string()
            }
            // Unknown transport kinds fall back to their Debug form.
            other => format!("{:?}", other),
        };
        paths.push(PathInfo {
            relay: is_relay,
            addr: addr_str,
            active: is_active,
        });
    }
    // Connection-level stats are only available while a pooled connection to
    // the peer exists; this never dials a new connection.
    let (rtt_ms, bytes_sent, bytes_received, lost_packets, sent_packets, congestion_window) =
        if let Some(pooled) = self.inner.pool.get_existing(pk, crate::ALPN).await {
            let s = pooled.conn.stats();
            let rtt = pooled.conn.rtt(iroh::endpoint::PathId::ZERO);
            (
                rtt.map(|d| d.as_secs_f64() * 1000.0),
                Some(s.udp_tx.bytes),
                Some(s.udp_rx.bytes),
                // NOTE(review): loss/packet/congestion fields are never
                // populated even when stats are available — confirm whether
                // the iroh stats struct exposes them.
                None,
                None,
                None,
            )
        } else {
            (None, None, None, None, None, None)
        };
    Some(PeerStats {
        relay: has_active_relay,
        relay_url: active_relay_url,
        paths,
        rtt_ms,
        bytes_sent,
        bytes_received,
        lost_packets,
        sent_packets,
        congestion_window,
    })
}
}
/// Maps any bind-time failure into `CoreError::connection_failed`, carrying
/// the display text of the underlying error.
fn classify_bind_error(e: impl std::fmt::Display) -> crate::CoreError {
    crate::CoreError::connection_failed(e.to_string())
}
/// Addressing info for a node: base32 id plus stringified addresses.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeAddrInfo {
    /// Base32-encoded node id.
    pub id: String,
    /// Mixed list of relay URLs and IP socket addresses as strings.
    pub addrs: Vec<String>,
}
/// Snapshot returned by `endpoint_stats` (serialized in camelCase).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct EndpointStats {
    /// Open stream reader handles.
    pub active_readers: usize,
    /// Open stream writer handles.
    pub active_writers: usize,
    /// Open session handles.
    pub active_sessions: usize,
    /// Total handles in the store.
    pub total_handles: usize,
    /// Approximate number of pooled connections.
    pub pool_size: u64,
    /// Relaxed-ordering gauge of open connections.
    pub active_connections: usize,
    /// Relaxed-ordering gauge of in-flight requests.
    pub active_requests: usize,
}
/// Peer connection state change notification (serialized in camelCase).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConnectionEvent {
    /// Peer node id — presumably base32-encoded like elsewhere; no producer
    /// is visible in this file, confirm at the emit site.
    pub peer_id: String,
    /// Presumably `true` on connect and `false` on disconnect — confirm.
    pub connected: bool,
}
/// Per-peer path and transport statistics produced by `peer_stats`.
///
/// NOTE(review): unlike `EndpointStats`/`ConnectionEvent` this struct has no
/// `#[serde(rename_all = "camelCase")]`, so multi-word fields serialize in
/// snake_case — confirm the asymmetry is intentional; renaming now would
/// break existing consumers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerStats {
    /// True when the currently active path goes through a relay.
    pub relay: bool,
    /// URL of the active relay, set only when `relay` is true.
    pub relay_url: Option<String>,
    /// All known transport paths to the peer.
    pub paths: Vec<PathInfo>,
    /// Round-trip time in milliseconds, when a pooled connection exists.
    pub rtt_ms: Option<f64>,
    /// UDP bytes sent on the pooled connection.
    pub bytes_sent: Option<u64>,
    /// UDP bytes received on the pooled connection.
    pub bytes_received: Option<u64>,
    /// Currently never populated (`peer_stats` always produces `None`).
    pub lost_packets: Option<u64>,
    /// Currently never populated (`peer_stats` always produces `None`).
    pub sent_packets: Option<u64>,
    /// Currently never populated (`peer_stats` always produces `None`).
    pub congestion_window: Option<u64>,
}
/// One transport path to a peer, as reported by `peer_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PathInfo {
    /// True when this path goes through a relay.
    pub relay: bool,
    /// Stringified address: socket address or relay URL.
    pub addr: String,
    /// True when iroh marks this path as the active one.
    pub active: bool,
}
/// Parses an optional list of direct-address strings into socket addresses.
///
/// `None` passes through as `Ok(None)`; otherwise every entry must parse as a
/// `std::net::SocketAddr`, and the first failure is returned as a descriptive
/// error string.
pub fn parse_direct_addrs(
    addrs: &Option<Vec<String>>,
) -> Result<Option<Vec<std::net::SocketAddr>>, String> {
    let Some(list) = addrs else {
        return Ok(None);
    };
    list.iter()
        .map(|s| {
            s.parse::<std::net::SocketAddr>()
                .map_err(|e| format!("invalid direct address {s:?}: {e}"))
        })
        .collect::<Result<Vec<_>, _>>()
        .map(Some)
}