// iroh_http_core/endpoint.rs

1//! Iroh endpoint lifecycle — create, share, and close.
2
3use iroh::address_lookup::{DnsAddressLookup, PkarrPublisher};
4use iroh::endpoint::{IdleTimeout, QuicTransportConfig, TransportAddrUsage};
5use iroh::{Endpoint, RelayMode, SecretKey};
6use serde::{Deserialize, Serialize};
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use crate::pool::ConnectionPool;
12use crate::server::ServeHandle;
13use crate::stream::{HandleStore, StoreConfig};
14use crate::{ALPN, ALPN_DUPLEX};
15
/// Networking / QUIC transport configuration.
///
/// The `Default` value (all fields `None` / empty / `false`) selects the
/// default relay mode with OS-assigned bind addresses and no proxy.
#[derive(Debug, Clone, Default)]
pub struct NetworkingOptions {
    /// Relay server mode. `"default"`, `"staging"`, `"disabled"`, or `"custom"`. Default: `"default"`.
    pub relay_mode: Option<String>,
    /// Custom relay server URLs. Only used when `relay_mode` is `"custom"`.
    pub relays: Vec<String>,
    /// UDP socket addresses to bind. Empty means OS-assigned.
    pub bind_addrs: Vec<String>,
    /// Milliseconds before an idle QUIC connection is cleaned up.
    pub idle_timeout_ms: Option<u64>,
    /// HTTP proxy URL for relay traffic.
    pub proxy_url: Option<String>,
    /// Read `HTTP_PROXY` / `HTTPS_PROXY` env vars for proxy config.
    /// Only consulted when `proxy_url` is not set.
    pub proxy_from_env: bool,
    /// Disable relay servers and DNS discovery entirely. Overrides `relay_mode`.
    /// Useful for in-process tests where endpoints connect via direct addresses.
    pub disabled: bool,
}
35
/// DNS-based peer discovery configuration.
///
/// Ignored entirely when `NetworkingOptions::disabled` is set.
#[derive(Debug, Clone)]
pub struct DiscoveryOptions {
    /// DNS discovery server URL. Uses n0 DNS defaults when `None`.
    pub dns_server: Option<String>,
    /// Whether to enable DNS discovery. Default: `true`.
    pub enabled: bool,
}
44
45impl Default for DiscoveryOptions {
46    fn default() -> Self {
47        Self {
48            dns_server: None,
49            enabled: true,
50        }
51    }
52}
53
/// Connection-pool tuning.
///
/// Both limits fall back to the pool's internal defaults when `None`.
#[derive(Debug, Clone, Default)]
pub struct PoolOptions {
    /// Maximum number of idle connections to keep in the pool.
    pub max_connections: Option<usize>,
    /// Milliseconds a pooled connection may remain idle before being evicted.
    pub idle_timeout_ms: Option<u64>,
}
62
/// Body-streaming and handle-store configuration.
///
/// Defaults are resolved in [`IrohEndpoint::bind`] from the
/// `crate::stream::DEFAULT_*` constants when a field is `None`.
#[derive(Debug, Clone, Default)]
pub struct StreamingOptions {
    /// Capacity (in chunks) of each body channel. Default: 32.
    pub channel_capacity: Option<usize>,
    /// Maximum byte length of a single chunk in `send_chunk`. Default: 65536.
    pub max_chunk_size_bytes: Option<usize>,
    /// Milliseconds to wait for a slow body reader. Default: 30000.
    pub drain_timeout_ms: Option<u64>,
    /// TTL in ms for slab handle entries. `0` disables sweeping. Default: 300000.
    pub handle_ttl_ms: Option<u64>,
    /// How often (in ms) the TTL sweep task runs. Default: 60000 (60 s).
    /// Reducing this lowers the worst-case leaked-handle window at the cost of
    /// more frequent write-lock acquisitions on every handle registry.
    /// Useful for short-lived endpoints and test fixtures.
    pub sweep_interval_ms: Option<u64>,
}
80
/// Configuration passed to [`IrohEndpoint::bind`].
#[derive(Debug, Clone, Default)]
pub struct NodeOptions {
    /// 32-byte Ed25519 secret key. Generate a fresh one when `None`.
    /// Persisted keys can be retrieved later via `secret_key_bytes()`.
    pub key: Option<[u8; 32]>,
    /// Networking / QUIC transport configuration.
    pub networking: NetworkingOptions,
    /// DNS-based peer discovery configuration.
    pub discovery: DiscoveryOptions,
    /// Connection-pool tuning.
    pub pool: PoolOptions,
    /// Body-streaming and handle-store configuration.
    pub streaming: StreamingOptions,
    /// ALPN capabilities to advertise. Empty = advertise iroh-http/2 and iroh-http/2-duplex.
    pub capabilities: Vec<String>,
    /// Write TLS session keys to $SSLKEYLOGFILE. Dev/debug only.
    pub keylog: bool,
    /// Maximum byte size of the HTTP/1.1 request or response head. `None` or `0` = 65536.
    pub max_header_size: Option<usize>,
    /// Server-side limits forwarded to the serve loop.
    pub server_limits: crate::server::ServerLimits,
    /// Response-body compression options. Only present when the
    /// `compression` feature is enabled; `None` leaves compression off.
    #[cfg(feature = "compression")]
    pub compression: Option<CompressionOptions>,
}
105
/// Compression options for response bodies.
/// Only used when the `compression` feature is enabled.
#[cfg(feature = "compression")]
#[derive(Debug, Clone)]
pub struct CompressionOptions {
    /// Minimum body size in bytes before compression is applied. Default: 512.
    pub min_body_bytes: usize,
    /// Zstd compression level (1–22). `None` uses the zstd default (3).
    pub level: Option<u32>,
}
116
/// A shared Iroh endpoint.
///
/// Clone-able (cheap Arc clone).  All fetch and serve calls on the same node
/// share one endpoint and therefore one stable QUIC identity.
#[derive(Clone)]
pub struct IrohEndpoint {
    // All state lives behind one Arc so clones share identity and resources.
    pub(crate) inner: Arc<EndpointInner>,
}
125
/// Shared state behind every [`IrohEndpoint`] clone.
///
/// Constructed once in [`IrohEndpoint::bind`]; dropped when the last clone
/// goes away (which also lets the background sweep task exit via its Weak).
pub(crate) struct EndpointInner {
    /// The underlying iroh QUIC endpoint.
    pub ep: Endpoint,
    /// The node's own base32-encoded public key (stable for the lifetime of the key).
    pub node_id_str: String,
    /// Connection pool for reusing QUIC connections across fetch/connect calls.
    pub pool: ConnectionPool,
    /// Maximum byte size of an HTTP/1.1 head (request or response).
    pub max_header_size: usize,
    /// Server-side limits forwarded to the serve loop.
    pub server_limits: crate::server::ServerLimits,
    /// Per-endpoint handle store — owns all body readers, writers,
    /// sessions, request-head channels, and fetch-cancel tokens.
    pub handles: HandleStore,
    /// Active serve handle, if `serve()` has been called.
    pub serve_handle: std::sync::Mutex<Option<ServeHandle>>,
    /// Done-signal receiver from the active serve task.
    /// Stored separately so `wait_serve_stop()` can await it without holding
    /// the `serve_handle` lock for the duration of the wait.
    pub serve_done_rx: std::sync::Mutex<Option<tokio::sync::watch::Receiver<bool>>>,
    /// Signals `true` when the endpoint has fully closed (either explicitly or
    /// because the serve loop exited due to native shutdown).
    pub closed_tx: tokio::sync::watch::Sender<bool>,
    /// Cloned by waiters; see `wait_closed()`.
    pub closed_rx: tokio::sync::watch::Receiver<bool>,
    /// Number of currently active QUIC connections (incremented by serve loop,
    /// decremented via RAII guard when each connection task exits).
    pub active_connections: Arc<AtomicUsize>,
    /// Number of currently in-flight HTTP requests (incremented when a
    /// bi-stream is accepted, decremented when the request task exits).
    pub active_requests: Arc<AtomicUsize>,
    /// Sender for transport-level events (pool hits/misses, path changes, sweep).
    pub event_tx: tokio::sync::mpsc::Sender<crate::events::TransportEvent>,
    /// Receiver for transport-level events.  Wrapped in Mutex+Option so
    /// `subscribe_events()` can take it exactly once for the platform drain task.
    pub event_rx:
        std::sync::Mutex<Option<tokio::sync::mpsc::Receiver<crate::events::TransportEvent>>>,
    /// Per-peer path-change subscriptions.
    /// Key: node_id_str. Populated lazily when `subscribe_path_changes` is called.
    pub path_subs: dashmap::DashMap<String, tokio::sync::mpsc::UnboundedSender<PathInfo>>,
    /// Body compression options, if the feature is enabled.
    #[cfg(feature = "compression")]
    pub compression: Option<CompressionOptions>,
}
168
169impl IrohEndpoint {
    /// Bind an Iroh endpoint with the supplied options.
    ///
    /// Translates [`NodeOptions`] into an iroh endpoint builder (relay mode,
    /// ALPNs, DNS discovery, secret key, QUIC transport config, bind
    /// addresses, proxy, keylog), binds the QUIC socket, then assembles the
    /// shared [`EndpointInner`] state (connection pool, handle store, event
    /// channels, close signal).  Also spawns the periodic handle-TTL sweep
    /// task unless sweeping is disabled (`handle_ttl_ms: Some(0)`).
    ///
    /// # Errors
    ///
    /// Returns `CoreError::invalid_input` for contradictory or malformed
    /// options (relay mode, relay/DNS/proxy URLs, bind addresses, idle
    /// timeout); bind failures are classified via `classify_bind_error`.
    pub async fn bind(opts: NodeOptions) -> Result<Self, crate::CoreError> {
        // Validate: if networking is disabled, relay_mode should not be explicitly set to a
        // network-requiring mode.
        if opts.networking.disabled
            && opts
                .networking
                .relay_mode
                .as_deref()
                .is_some_and(|m| !matches!(m, "disabled"))
        {
            return Err(crate::CoreError::invalid_input(
                "networking.disabled is true but relay_mode is set to a non-disabled value; \
                 set relay_mode to \"disabled\" or omit it when networking.disabled is true",
            ));
        }

        // Map the stringly-typed relay_mode option onto iroh's RelayMode.
        let relay_mode = if opts.networking.disabled {
            RelayMode::Disabled
        } else {
            match opts.networking.relay_mode.as_deref() {
                None | Some("default") => RelayMode::Default,
                Some("staging") => RelayMode::Staging,
                Some("disabled") => RelayMode::Disabled,
                Some("custom") => {
                    if opts.networking.relays.is_empty() {
                        return Err(crate::CoreError::invalid_input(
                            "relay_mode \"custom\" requires at least one URL in `relays`",
                        ));
                    }
                    // Parse every relay URL; the first failure aborts the bind.
                    let urls = opts
                        .networking
                        .relays
                        .iter()
                        .map(|u| {
                            u.parse::<iroh::RelayUrl>()
                                .map_err(crate::CoreError::invalid_input)
                        })
                        .collect::<Result<Vec<_>, _>>()?;
                    RelayMode::custom(urls)
                }
                Some(other) => {
                    return Err(crate::CoreError::invalid_input(format!(
                        "unknown relay_mode: {other}"
                    )))
                }
            }
        };

        let alpns: Vec<Vec<u8>> = if opts.capabilities.is_empty() {
            // Advertise both ALPN variants.
            vec![ALPN_DUPLEX.to_vec(), ALPN.to_vec()]
        } else {
            let mut list: Vec<Vec<u8>> = opts
                .capabilities
                .iter()
                .map(|c| c.as_bytes().to_vec())
                .collect();
            // Always include the base protocol so the node can talk to base-only peers.
            if !list.iter().any(|a| a == ALPN) {
                list.push(ALPN.to_vec());
            }
            list
        };

        let mut builder = Endpoint::empty_builder(relay_mode).alpns(alpns);

        // DNS discovery (enabled by default unless networking.disabled).
        if !opts.networking.disabled && opts.discovery.enabled {
            if let Some(ref url_str) = opts.discovery.dns_server {
                let url: url::Url = url_str.parse().map_err(|e| {
                    crate::CoreError::invalid_input(format!("invalid dns_discovery URL: {e}"))
                })?;
                // Custom server: publish via pkarr and look up via DNS at
                // the same host.
                builder = builder
                    .address_lookup(PkarrPublisher::builder(url.clone()))
                    .address_lookup(DnsAddressLookup::builder(
                        url.host_str().unwrap_or_default().to_string(),
                    ));
            } else {
                builder = builder
                    .address_lookup(PkarrPublisher::n0_dns())
                    .address_lookup(DnsAddressLookup::n0_dns());
            }
        }

        // Restore a persisted identity when a key was supplied.
        if let Some(key_bytes) = opts.key {
            builder = builder.secret_key(SecretKey::from_bytes(&key_bytes));
        }

        if let Some(ms) = opts.networking.idle_timeout_ms {
            let timeout = IdleTimeout::try_from(Duration::from_millis(ms)).map_err(|e| {
                crate::CoreError::invalid_input(format!("idle_timeout_ms out of range: {e}"))
            })?;
            let transport = QuicTransportConfig::builder()
                .max_idle_timeout(Some(timeout))
                // Limit inbound bidirectional streams per connection to bound
                // slowloris-style resource exhaustion (P2-4).
                .max_concurrent_bidi_streams(128u32.into())
                .build();
            builder = builder.transport_config(transport);
        } else {
            // Even without a custom idle timeout, cap concurrent bidi streams.
            let transport = QuicTransportConfig::builder()
                .max_concurrent_bidi_streams(128u32.into())
                .build();
            builder = builder.transport_config(transport);
        }

        // Bind address(es).
        for addr_str in &opts.networking.bind_addrs {
            let sock: std::net::SocketAddr = addr_str.parse().map_err(|e| {
                crate::CoreError::invalid_input(format!("invalid bind address \"{addr_str}\": {e}"))
            })?;
            builder = builder.bind_addr(sock).map_err(|e| {
                crate::CoreError::invalid_input(format!("bind address \"{addr_str}\": {e}"))
            })?;
        }

        // Proxy configuration.  An explicit proxy_url wins over env vars.
        if let Some(ref proxy) = opts.networking.proxy_url {
            let url: url::Url = proxy
                .parse()
                .map_err(|e| crate::CoreError::invalid_input(format!("invalid proxy URL: {e}")))?;
            builder = builder.proxy_url(url);
        } else if opts.networking.proxy_from_env {
            builder = builder.proxy_from_env();
        }

        // TLS keylog for Wireshark debugging.
        if opts.keylog {
            builder = builder.keylog(true);
        }

        let ep = builder.bind().await.map_err(classify_bind_error)?;

        let node_id_str = crate::base32_encode(ep.id().as_bytes());

        // Resolve streaming defaults; .max(1) guards against zero-sized
        // channels/chunks from caller input.
        let store_config = StoreConfig {
            channel_capacity: opts
                .streaming
                .channel_capacity
                .unwrap_or(crate::stream::DEFAULT_CHANNEL_CAPACITY)
                .max(1),
            max_chunk_size: opts
                .streaming
                .max_chunk_size_bytes
                .unwrap_or(crate::stream::DEFAULT_MAX_CHUNK_SIZE)
                .max(1),
            drain_timeout: Duration::from_millis(
                opts.streaming
                    .drain_timeout_ms
                    .unwrap_or(crate::stream::DEFAULT_DRAIN_TIMEOUT_MS),
            ),
            max_handles: crate::stream::DEFAULT_MAX_HANDLES,
            ttl: Duration::from_millis(
                opts.streaming
                    .handle_ttl_ms
                    .unwrap_or(crate::stream::DEFAULT_SLAB_TTL_MS),
            ),
        };
        let sweep_ttl = store_config.ttl;
        let sweep_interval = Duration::from_millis(
            opts.streaming
                .sweep_interval_ms
                .unwrap_or(crate::stream::DEFAULT_SWEEP_INTERVAL_MS),
        );
        let (closed_tx, closed_rx) = tokio::sync::watch::channel(false);
        let (event_tx, event_rx) = tokio::sync::mpsc::channel::<crate::events::TransportEvent>(256);

        let inner = Arc::new(EndpointInner {
            ep,
            node_id_str,
            pool: ConnectionPool::new(
                opts.pool.max_connections,
                opts.pool
                    .idle_timeout_ms
                    .map(std::time::Duration::from_millis),
                Some(event_tx.clone()),
            ),
            // ISS-020: treat 0 as "use default" — it would otherwise underflow
            // the hyper minimum (ISS-001).  None also defaults to 64 KB.
            max_header_size: match opts.max_header_size {
                None | Some(0) => 64 * 1024,
                Some(n) => n,
            },
            server_limits: {
                let mut sl = opts.server_limits.clone();
                if sl.max_consecutive_errors.is_none() {
                    sl.max_consecutive_errors = Some(5);
                }
                sl
            },
            handles: HandleStore::new(store_config),
            serve_handle: std::sync::Mutex::new(None),
            serve_done_rx: std::sync::Mutex::new(None),
            closed_tx,
            closed_rx,
            active_connections: Arc::new(AtomicUsize::new(0)),
            active_requests: Arc::new(AtomicUsize::new(0)),
            event_tx,
            event_rx: std::sync::Mutex::new(Some(event_rx)),
            path_subs: dashmap::DashMap::new(),
            #[cfg(feature = "compression")]
            compression: opts.compression,
        });

        // Start per-endpoint sweep task (held alive via Weak reference).
        if !sweep_ttl.is_zero() {
            let weak = Arc::downgrade(&inner);
            tokio::spawn(async move {
                let mut ticker = tokio::time::interval(sweep_interval);
                loop {
                    ticker.tick().await;
                    // Upgrade failure means every IrohEndpoint clone is gone;
                    // the task exits instead of keeping the store alive.
                    let Some(inner) = weak.upgrade() else {
                        break;
                    };
                    inner.handles.sweep(sweep_ttl);
                    drop(inner); // release strong ref between ticks
                }
            });
        }

        Ok(Self { inner })
    }
394
395    /// The node's public key as a lowercase base32 string.
396    pub fn node_id(&self) -> &str {
397        &self.inner.node_id_str
398    }
399
400    /// The configured consecutive-error limit for the serve loop.
401    pub fn max_consecutive_errors(&self) -> usize {
402        self.inner.server_limits.max_consecutive_errors.unwrap_or(5)
403    }
404
405    /// Immediately run a TTL sweep on all handle registries, evicting any
406    /// entries whose TTL has expired.
407    ///
408    /// The background sweep task already runs this automatically on its
409    /// configured interval. `sweep_now()` is provided for test fixtures and
410    /// short-lived endpoints that cannot wait for the next tick.
411    ///
412    /// Returns immediately if the endpoint was created with `handle_ttl_ms: Some(0)`
413    /// (sweeping disabled).
414    pub fn sweep_now(&self) {
415        let ttl = self.inner.handles.config.ttl;
416        if !ttl.is_zero() {
417            self.inner.handles.sweep(ttl);
418        }
419    }
420
    /// Build a [`ServeOptions`] from the endpoint's stored configuration.
    ///
    /// Platform adapters should call this instead of constructing `ServeOptions`
    /// manually so that all server-limit fields are forwarded consistently.
    ///
    /// NOTE(review): this returns a clone of `server_limits`, which only
    /// typechecks if `ServeOptions` is (an alias or re-export of)
    /// `ServerLimits` — confirm against `crate::server`.
    pub fn serve_options(&self) -> crate::server::ServeOptions {
        self.inner.server_limits.clone()
    }
428
    /// The node's raw secret key bytes (32 bytes).
    ///
    /// This is the Ed25519 private key that establishes the node's cryptographic identity.
    /// Use it **only** to persist and later restore the key via `NodeOptions::key`.
    ///
    /// # Security
    ///
    /// **These 32 bytes are the irrecoverable private key for this node.**
    /// Anyone who obtains them can impersonate this node permanently — there is no revocation.
    ///
    /// - **Never log, print, or include in error payloads.** Debug formatters, tracing
    ///   spans, and generic error handlers are common accidental leak vectors.
    /// - **Encrypt at rest.** Store in a secrets vault or OS keychain, not in
    ///   plaintext config files or databases.
    /// - **Zeroize after use.** Call `zeroize::Zeroize::zeroize()` on the returned
    ///   array (or use a `secrecy`/`zeroize` wrapper) once you have persisted the bytes
    ///   to an encrypted store. The returned `[u8; 32]` is NOT automatically zeroed on drop.
    /// - **Never include in network responses, crash dumps, or analytics.**
    #[must_use]
    pub fn secret_key_bytes(&self) -> [u8; 32] {
        self.inner.ep.secret_key().to_bytes()
    }
451
452    /// Graceful close: signal the serve loop to stop accepting, wait for
453    /// in-flight requests to drain (up to the configured drain timeout),
454    /// then close the QUIC endpoint.
455    ///
456    /// If no serve loop is running, closes the endpoint immediately.
457    /// The handle store (all registries) is freed when the last `IrohEndpoint`
458    /// clone is dropped — no explicit unregister is needed.
459    pub async fn close(&self) {
460        // ISS-027: drain in-flight requests *before* dropping so that request
461        // handlers can still access their reader/writer/trailer handles
462        // during the drain window.
463        let handle = self
464            .inner
465            .serve_handle
466            .lock()
467            .unwrap_or_else(|e| e.into_inner())
468            .take();
469        if let Some(h) = handle {
470            h.drain().await;
471        }
472        self.inner.ep.close().await;
473        let _ = self.inner.closed_tx.send(true);
474    }
475
476    /// Immediate close: abort the serve loop and close the endpoint with
477    /// no drain period.
478    pub async fn close_force(&self) {
479        let handle = self
480            .inner
481            .serve_handle
482            .lock()
483            .unwrap_or_else(|e| e.into_inner())
484            .take();
485        if let Some(h) = handle {
486            h.abort();
487        }
488        self.inner.ep.close().await;
489        let _ = self.inner.closed_tx.send(true);
490    }
491
492    /// Wait until this endpoint has been closed (either explicitly via `close()` /
493    /// `close_force()`, or because the native QUIC stack shut down).
494    ///
495    /// Returns immediately if already closed.
496    pub async fn wait_closed(&self) {
497        let mut rx = self.inner.closed_rx.clone();
498        let _ = rx.wait_for(|v| *v).await;
499    }
500
501    /// Store a serve handle so that `close()` can drain it.
502    pub fn set_serve_handle(&self, handle: ServeHandle) {
503        *self
504            .inner
505            .serve_done_rx
506            .lock()
507            .unwrap_or_else(|e| e.into_inner()) = Some(handle.subscribe_done());
508        *self
509            .inner
510            .serve_handle
511            .lock()
512            .unwrap_or_else(|e| e.into_inner()) = Some(handle);
513    }
514
515    /// Signal the serve loop to stop accepting new connections.
516    ///
517    /// Returns immediately — does NOT close the endpoint or drain in-flight
518    /// requests.  The handle is preserved so `close()` can still drain later.
519    pub fn stop_serve(&self) {
520        if let Some(h) = self
521            .inner
522            .serve_handle
523            .lock()
524            .unwrap_or_else(|e| e.into_inner())
525            .as_ref()
526        {
527            h.shutdown();
528        }
529    }
530
531    /// Wait until the serve loop has fully exited (serve task drained and finished).
532    ///
533    /// Returns immediately if `serve()` was never called.
534    pub async fn wait_serve_stop(&self) {
535        let rx = self
536            .inner
537            .serve_done_rx
538            .lock()
539            .unwrap_or_else(|e| e.into_inner())
540            .clone();
541        if let Some(mut rx) = rx {
542            // wait_for returns Err only if the sender is dropped; being dropped
543            // also means the task has exited, so treat both outcomes as "done".
544            let _ = rx.wait_for(|v| *v).await;
545        }
546    }
547
    /// Direct access to the underlying iroh [`Endpoint`].
    pub fn raw(&self) -> &Endpoint {
        &self.inner.ep
    }
551
552    /// Per-endpoint handle store.
553    pub fn handles(&self) -> &HandleStore {
554        &self.inner.handles
555    }
556
557    /// Maximum byte size of an HTTP/1.1 head.
558    pub fn max_header_size(&self) -> usize {
559        self.inner.max_header_size
560    }
561
562    /// Maximum decompressed response-body bytes accepted per outgoing fetch.
563    pub fn max_response_body_bytes(&self) -> usize {
564        self.inner
565            .server_limits
566            .max_response_body_bytes
567            .unwrap_or(crate::server::DEFAULT_MAX_RESPONSE_BODY_BYTES)
568    }
569
570    /// Access the connection pool.
571    pub(crate) fn pool(&self) -> &ConnectionPool {
572        &self.inner.pool
573    }
574
575    /// Snapshot of current endpoint statistics.
576    ///
577    /// All fields are point-in-time reads and may change between calls.
578    pub fn endpoint_stats(&self) -> EndpointStats {
579        let (active_readers, active_writers, active_sessions, total_handles) =
580            self.inner.handles.count_handles();
581        let pool_size = self.inner.pool.entry_count_approx();
582        let active_connections = self.inner.active_connections.load(Ordering::Relaxed);
583        let active_requests = self.inner.active_requests.load(Ordering::Relaxed);
584        EndpointStats {
585            active_readers,
586            active_writers,
587            active_sessions,
588            total_handles,
589            pool_size,
590            active_connections,
591            active_requests,
592        }
593    }
594
595    /// Compression options, if the `compression` feature is enabled.
596    #[cfg(feature = "compression")]
597    pub fn compression(&self) -> Option<&CompressionOptions> {
598        self.inner.compression.as_ref()
599    }
600
601    /// Returns the local socket addresses this endpoint is bound to.
602    pub fn bound_sockets(&self) -> Vec<std::net::SocketAddr> {
603        self.inner.ep.bound_sockets()
604    }
605
606    /// Full node address: node ID + relay URL(s) + direct socket addresses.
607    pub fn node_addr(&self) -> NodeAddrInfo {
608        let addr = self.inner.ep.addr();
609        let mut addrs = Vec::new();
610        for relay in addr.relay_urls() {
611            addrs.push(relay.to_string());
612        }
613        for da in addr.ip_addrs() {
614            addrs.push(da.to_string());
615        }
616        NodeAddrInfo {
617            id: self.inner.node_id_str.clone(),
618            addrs,
619        }
620    }
621
622    /// Home relay URL, or `None` if not connected to a relay.
623    pub fn home_relay(&self) -> Option<String> {
624        self.inner
625            .ep
626            .addr()
627            .relay_urls()
628            .next()
629            .map(|u| u.to_string())
630    }
631
632    /// Known addresses for a remote peer, or `None` if not in the endpoint's cache.
633    pub async fn peer_info(&self, node_id_b32: &str) -> Option<NodeAddrInfo> {
634        let bytes = crate::base32_decode(node_id_b32).ok()?;
635        let arr: [u8; 32] = bytes.try_into().ok()?;
636        let pk = iroh::PublicKey::from_bytes(&arr).ok()?;
637        let info = self.inner.ep.remote_info(pk).await?;
638        let id = crate::base32_encode(info.id().as_bytes());
639        let mut addrs = Vec::new();
640        for a in info.addrs() {
641            match a.addr() {
642                iroh::TransportAddr::Ip(sock) => addrs.push(sock.to_string()),
643                iroh::TransportAddr::Relay(url) => addrs.push(url.to_string()),
644                other => addrs.push(format!("{:?}", other)),
645            }
646        }
647        Some(NodeAddrInfo { id, addrs })
648    }
649
    /// Per-peer connection statistics.
    ///
    /// Returns path information for each known transport address, including
    /// whether each path is via a relay or direct, and which is active.
    ///
    /// Returns `None` when `node_id_b32` is not valid base32 / not a valid
    /// public key, or when the peer is unknown to the endpoint.
    pub async fn peer_stats(&self, node_id_b32: &str) -> Option<PeerStats> {
        // Decode base32 text → 32-byte array → public key.
        let bytes = crate::base32_decode(node_id_b32).ok()?;
        let arr: [u8; 32] = bytes.try_into().ok()?;
        let pk = iroh::PublicKey::from_bytes(&arr).ok()?;
        let info = self.inner.ep.remote_info(pk).await?;

        let mut paths = Vec::new();
        let mut has_active_relay = false;
        let mut active_relay_url: Option<String> = None;

        for a in info.addrs() {
            let is_relay = a.addr().is_relay();
            let is_active = matches!(a.usage(), TransportAddrUsage::Active);

            let addr_str = match a.addr() {
                iroh::TransportAddr::Ip(sock) => sock.to_string(),
                iroh::TransportAddr::Relay(url) => {
                    // Record the active relay so the summary fields on
                    // PeerStats can report it alongside the full path list.
                    if is_active {
                        has_active_relay = true;
                        active_relay_url = Some(url.to_string());
                    }
                    url.to_string()
                }
                other => format!("{:?}", other),
            };

            paths.push(PathInfo {
                relay: is_relay,
                addr: addr_str,
                active: is_active,
            });
        }

        // Enrich with QUIC connection-level stats if a pooled connection exists.
        let (rtt_ms, bytes_sent, bytes_received, lost_packets, sent_packets, congestion_window) =
            if let Some(pooled) = self.inner.pool.get_existing(pk, crate::ALPN).await {
                let s = pooled.conn.stats();
                let rtt = pooled.conn.rtt(iroh::endpoint::PathId::ZERO);
                (
                    rtt.map(|d| d.as_secs_f64() * 1000.0),
                    Some(s.udp_tx.bytes),
                    Some(s.udp_rx.bytes),
                    None, // quinn path stats not exposed via iroh ConnectionStats
                    None, // quinn path stats not exposed via iroh ConnectionStats
                    None, // quinn path stats not exposed via iroh ConnectionStats
                )
            } else {
                // No pooled connection: connection-level fields stay None.
                (None, None, None, None, None, None)
            };

        Some(PeerStats {
            relay: has_active_relay,
            relay_url: active_relay_url,
            paths,
            rtt_ms,
            bytes_sent,
            bytes_received,
            lost_packets,
            sent_packets,
            congestion_window,
        })
    }
716
717    /// Take the transport event receiver, handing it off to a platform drain task.
718    ///
719    /// May only be called once per endpoint.  The drain task owns the receiver and
720    /// loops until `event_tx` is dropped (i.e. the endpoint closes).  Returns `None`
721    /// if the receiver was already taken (i.e. `subscribe_events` was called before).
722    pub fn subscribe_events(
723        &self,
724    ) -> Option<tokio::sync::mpsc::Receiver<crate::events::TransportEvent>> {
725        self.inner
726            .event_rx
727            .lock()
728            .unwrap_or_else(|e| e.into_inner())
729            .take()
730    }
731
    /// Subscribe to path changes for a specific peer.
    ///
    /// Spawns a background watcher task the first time a given peer is subscribed.
    /// The watcher polls `peer_stats()` every 200 ms and emits on the returned
    /// channel whenever the active path changes.  The watcher exits when the
    /// endpoint closes (`closed_rx` flips to `true`) or when the subscriber's
    /// receiver is dropped.
    ///
    /// NOTE(review): calling this again for the same peer inserts a fresh,
    /// open sender, so the *old* watcher's `is_closed()` check sees the new
    /// sender and does not exit — two watchers may then poll concurrently.
    /// Confirm whether duplicate watchers per peer are acceptable here.
    pub fn subscribe_path_changes(
        &self,
        node_id_str: &str,
    ) -> tokio::sync::mpsc::UnboundedReceiver<PathInfo> {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        // Replace any existing sender; old watcher exits when it detects is_closed().
        self.inner.path_subs.insert(node_id_str.to_string(), tx);

        let ep = self.clone();
        let nid = node_id_str.to_string();
        let event_tx = self.inner.event_tx.clone();

        tokio::spawn(async move {
            // Dedup key of the last path we reported, so we only emit on change.
            let mut last_key: Option<String> = None;
            let mut closed_rx = ep.inner.closed_rx.clone();
            loop {
                // Exit immediately if the endpoint has been closed.
                if *closed_rx.borrow() {
                    ep.inner.path_subs.remove(&nid);
                    break;
                }
                // Exit when the subscriber dropped its receiver; a missing map
                // entry is treated the same as a closed channel.
                let is_closed = ep
                    .inner
                    .path_subs
                    .get(&nid)
                    .map(|s| s.is_closed())
                    .unwrap_or(true);
                if is_closed {
                    ep.inner.path_subs.remove(&nid);
                    break;
                }

                if let Some(stats) = ep.peer_stats(&nid).await {
                    if let Some(active) = stats.paths.iter().find(|p| p.active) {
                        // relay flag + address together identify a path for
                        // change detection.
                        let key = format!("{}:{}", active.relay, active.addr);
                        if Some(&key) != last_key.as_ref() {
                            last_key = Some(key);
                            // Re-fetch the sender from the map: it may have
                            // been replaced since the is_closed() check above.
                            if let Some(sender) = ep.inner.path_subs.get(&nid) {
                                let _ = sender.send(active.clone());
                            }
                            // Best-effort global event; try_send drops the
                            // event rather than blocking if the buffer is full.
                            let _ = event_tx.try_send(crate::events::TransportEvent::path_change(
                                &nid,
                                &active.addr,
                                active.relay,
                            ));
                        }
                    }
                }

                // Sleep 200 ms, but wake early if the endpoint is being closed.
                tokio::select! {
                    _ = tokio::time::sleep(std::time::Duration::from_millis(200)) => {}
                    result = closed_rx.wait_for(|v| *v) => {
                        let _ = result;
                        ep.inner.path_subs.remove(&nid);
                        break;
                    }
                }
            }
        });

        rx
    }
800}
801
802// ── Bind-error classification ────────────────────────────────────────────────
803
804/// Classify a bind error into a prefixed string for the JS error mapper.
805fn classify_bind_error(e: impl std::fmt::Display) -> crate::CoreError {
806    let msg = e.to_string();
807    crate::CoreError::connection_failed(msg)
808}
809
810// ── NodeAddr info ────────────────────────────────────────────────────────────
811
/// Serialisable node address: node ID + relay and direct addresses.
///
/// Plain-string representation so it can round-trip through serde; the two
/// field names (`id`, `addrs`) are identical in snake_case and camelCase, so
/// no `rename_all` attribute is needed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeAddrInfo {
    /// Base32-encoded public key.
    pub id: String,
    /// Relay URLs and/or `ip:port` direct addresses.
    pub addrs: Vec<String>,
}
820
821// ── Observability types ──────────────────────────────────────────────────────
822
/// Endpoint-level observability snapshot.
///
/// Returned by [`IrohEndpoint::endpoint_stats`].  All counts are point-in-time reads
/// and may change between calls.
///
/// NOTE(review): `pool_size` is `u64` while every other counter is `usize` —
/// presumably because the connection pool reports a `u64`; confirm and
/// consider unifying the types.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct EndpointStats {
    /// Number of currently open body reader handles.
    pub active_readers: usize,
    /// Number of currently open body writer handles.
    pub active_writers: usize,
    /// Number of live QUIC sessions (WebTransport connections).
    pub active_sessions: usize,
    /// Total number of allocated (reader + writer + session + other) handles.
    pub total_handles: usize,
    /// Number of QUIC connections currently cached in the connection pool.
    pub pool_size: u64,
    /// Number of live QUIC connections accepted by the serve loop.
    pub active_connections: usize,
    /// Number of HTTP requests currently being processed.
    pub active_requests: usize,
}
845
/// A connection lifecycle event fired when a QUIC peer connection opens or closes.
///
/// Serialized with camelCase field names (`peerId`, `connected`) for the JS side.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConnectionEvent {
    /// Base32-encoded public key of the peer.
    pub peer_id: String,
    /// `true` when this is the first connection from the peer (0→1), `false` when the last one closes (1→0).
    pub connected: bool,
}
855
/// Per-peer connection statistics.
///
/// The `Option` fields all share the same invariant: they are `Some` only when
/// an active QUIC connection for the peer is present in the pool.
///
/// NOTE(review): unlike [`EndpointStats`] and [`ConnectionEvent`], this struct
/// has no `#[serde(rename_all = "camelCase")]`, so multi-word fields serialize
/// as snake_case (`relay_url`, `rtt_ms`, …) — confirm this is what the JS
/// consumer expects.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerStats {
    /// Whether the peer is connected via a relay server (vs direct).
    pub relay: bool,
    /// Active relay URL, if any.
    pub relay_url: Option<String>,
    /// All known paths to this peer.
    pub paths: Vec<PathInfo>,
    /// Round-trip time in milliseconds.  `None` if no active QUIC connection is pooled.
    pub rtt_ms: Option<f64>,
    /// Total UDP bytes sent to this peer.  `None` if no active QUIC connection is pooled.
    pub bytes_sent: Option<u64>,
    /// Total UDP bytes received from this peer.  `None` if no active QUIC connection is pooled.
    pub bytes_received: Option<u64>,
    /// Total packets lost on the QUIC path.  `None` if no active QUIC connection is pooled.
    pub lost_packets: Option<u64>,
    /// Total packets sent on the QUIC path.  `None` if no active QUIC connection is pooled.
    pub sent_packets: Option<u64>,
    /// Current congestion window in bytes.  `None` if no active QUIC connection is pooled.
    pub congestion_window: Option<u64>,
}
878
/// Network path information for a single transport address.
///
/// All field names are single words, so serde casing is identical with or
/// without a `rename_all` attribute.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PathInfo {
    /// Whether this path goes through a relay server.
    pub relay: bool,
    /// The relay URL (if relay) or `ip:port` (if direct).
    pub addr: String,
    /// Whether this is the currently selected/active path.
    pub active: bool,
}
889
/// Parse an optional list of socket address strings into `SocketAddr` values.
///
/// Returns `Err` if any string cannot be parsed as a `host:port` address so
/// that callers can surface misconfiguration rather than silently ignoring it.
/// `None` in means `None` out; an empty list parses to `Some(vec![])`.
pub fn parse_direct_addrs(
    addrs: &Option<Vec<String>>,
) -> Result<Option<Vec<std::net::SocketAddr>>, String> {
    match addrs {
        // No list configured: nothing to parse, nothing to validate.
        None => Ok(None),
        // Parse every entry; collect short-circuits on the first failure.
        Some(list) => list
            .iter()
            .map(|s| {
                s.parse::<std::net::SocketAddr>()
                    .map_err(|e| format!("invalid direct address {s:?}: {e}"))
            })
            .collect::<Result<Vec<_>, _>>()
            .map(Some),
    }
}