Skip to main content

iroh_http_core/
endpoint.rs

1//! Iroh endpoint lifecycle — create, share, and close.
2
3use iroh::address_lookup::{DnsAddressLookup, PkarrPublisher};
4use iroh::endpoint::{Builder, IdleTimeout, QuicTransportConfig, TransportAddrUsage};
5use iroh::{Endpoint, RelayMode, SecretKey};
6use serde::{Deserialize, Serialize};
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use crate::pool::ConnectionPool;
12use crate::server::ServeHandle;
13use crate::stream::{HandleStore, StoreConfig};
14use crate::{ALPN, ALPN_DUPLEX};
15
/// QUIC transport and networking settings consumed by [`IrohEndpoint::bind`].
#[derive(Clone, Debug, Default)]
pub struct NetworkingOptions {
    /// Relay server mode: `"default"`, `"staging"`, `"disabled"`, or `"custom"`.
    /// `None` behaves like `"default"`; any other string makes `bind` fail.
    pub relay_mode: Option<String>,
    /// Relay server URLs. Consulted only when `relay_mode` is `"custom"`, which
    /// then requires at least one entry.
    pub relays: Vec<String>,
    /// UDP socket addresses (parsed as `SocketAddr`) to bind. Empty means OS-assigned.
    pub bind_addrs: Vec<String>,
    /// Milliseconds before an idle QUIC connection is cleaned up. `None` leaves
    /// the transport's own default in place.
    pub idle_timeout_ms: Option<u64>,
    /// HTTP proxy URL for relay traffic. Takes precedence over `proxy_from_env`.
    pub proxy_url: Option<String>,
    /// Read `HTTP_PROXY` / `HTTPS_PROXY` env vars for proxy config. Ignored when
    /// `proxy_url` is set.
    pub proxy_from_env: bool,
    /// Turn off relay servers and DNS discovery entirely; overrides `relay_mode`.
    /// Intended for in-process tests where endpoints dial each other by direct address.
    pub disabled: bool,
}
35
/// DNS-based peer discovery settings.
///
/// Has no effect when `NetworkingOptions::disabled` is set.
#[derive(Clone, Debug)]
pub struct DiscoveryOptions {
    /// DNS discovery server URL; `None` selects the n0 DNS defaults.
    pub dns_server: Option<String>,
    /// Whether DNS discovery is switched on. Default: `true`.
    pub enabled: bool,
}

impl Default for DiscoveryOptions {
    /// Discovery enabled, pointed at the n0 DNS defaults.
    fn default() -> Self {
        Self {
            enabled: true,
            dns_server: None,
        }
    }
}
53
/// Tuning knobs for the QUIC connection pool.
#[derive(Clone, Debug, Default)]
pub struct PoolOptions {
    /// Upper bound on idle connections kept in the pool.
    pub max_connections: Option<usize>,
    /// How long (ms) a pooled connection may sit idle before it is evicted.
    pub idle_timeout_ms: Option<u64>,
}
62
/// Settings for body streaming and the per-endpoint handle store.
#[derive(Clone, Debug, Default)]
pub struct StreamingOptions {
    /// Capacity (in chunks) of each body channel. Default: 32. `Some(0)` is
    /// raised to 1.
    pub channel_capacity: Option<usize>,
    /// Largest chunk, in bytes, accepted by `send_chunk`. Default: 65536.
    /// `Some(0)` is raised to 1.
    pub max_chunk_size_bytes: Option<usize>,
    /// How long (ms) to wait on a slow body reader. Default: 30000.
    pub drain_timeout_ms: Option<u64>,
    /// Time-to-live (ms) of slab handle entries. Default: 300000.
    /// `Some(0)` turns TTL sweeping off entirely.
    pub handle_ttl_ms: Option<u64>,
    /// Period (ms) of the background TTL sweep. Default: 60000 (60 s).
    /// Must be non-zero while sweeping is enabled.
    ///
    /// A shorter period narrows the worst-case window in which a leaked handle
    /// lingers, at the price of taking the write lock on every handle registry
    /// more often. Handy for short-lived endpoints and test fixtures.
    pub sweep_interval_ms: Option<u64>,
}
80
81/// Configuration passed to [`IrohEndpoint::bind`].
82#[derive(Debug, Clone, Default)]
83pub struct NodeOptions {
84    /// 32-byte Ed25519 secret key. Generate a fresh one when `None`.
85    pub key: Option<[u8; 32]>,
86    /// Networking / QUIC transport configuration.
87    pub networking: NetworkingOptions,
88    /// DNS-based peer discovery configuration.
89    pub discovery: DiscoveryOptions,
90    /// Connection-pool tuning.
91    pub pool: PoolOptions,
92    /// Body-streaming and handle-store configuration.
93    pub streaming: StreamingOptions,
94    /// ALPN capabilities to advertise.
95    ///
96    /// Valid values: [`ALPN_STR`](crate::ALPN_STR) (`"iroh-http/2"`) and
97    /// [`ALPN_DUPLEX_STR`](crate::ALPN_DUPLEX_STR) (`"iroh-http/2-duplex"`).
98    ///
99    /// When empty (the default), both protocols are advertised. When non-empty,
100    /// the base protocol (`iroh-http/2`) is automatically injected if not
101    /// already present. Unknown values cause [`IrohEndpoint::bind`] to return
102    /// an error.
103    pub capabilities: Vec<String>,
104    /// Write TLS session keys to $SSLKEYLOGFILE. Dev/debug only.
105    pub keylog: bool,
106    /// Maximum byte size of the HTTP/1.1 request or response head.
107    /// `None` = 65536.  `Some(0)` is rejected.
108    pub max_header_size: Option<usize>,
109    /// Maximum decompressed response body bytes the client will accept per
110    /// outgoing `fetch()`.  Default: 256 MiB.  Protects against compression
111    /// bombs from malicious peers.
112    pub max_response_body_bytes: Option<usize>,
113    #[cfg(feature = "compression")]
114    pub compression: Option<CompressionOptions>,
115}
116
/// Response-body compression settings.
///
/// Compiled only with the `compression` feature.
#[cfg(feature = "compression")]
#[derive(Clone, Debug)]
pub struct CompressionOptions {
    /// Bodies smaller than this many bytes are sent uncompressed. Default: 512.
    pub min_body_bytes: usize,
    /// Zstd compression level; `IrohEndpoint::bind` accepts only 1–22.
    /// `None` uses the zstd default (3).
    pub level: Option<u32>,
}
127
/// A shared Iroh endpoint.
///
/// Cloning is cheap: every clone is an `Arc` clone of the same shared state.
/// All fetch and serve calls on the same node share one endpoint and therefore
/// one stable QUIC identity.
#[derive(Clone)]
pub struct IrohEndpoint {
    /// Shared endpoint state; freed when the last clone is dropped.
    pub(crate) inner: Arc<EndpointInner>,
}
136
/// State shared by every clone of an [`IrohEndpoint`].
///
/// NOTE(review): struct fields are dropped in declaration order, so `ep` is
/// dropped first when the last clone goes away. Keep that in mind before
/// reordering fields.
pub(crate) struct EndpointInner {
    /// The underlying iroh endpoint; `close()` / `close_force()` call `close()` on it.
    pub ep: Endpoint,
    /// The node's own base32-encoded public key (stable for the lifetime of the key).
    pub node_id_str: String,
    /// Connection pool for reusing QUIC connections across fetch/connect calls.
    pub pool: ConnectionPool,
    /// Maximum byte size of an HTTP/1.1 head (request or response).
    pub max_header_size: usize,
    /// Maximum decompressed response body bytes per fetch.  Default: 256 MiB.
    pub max_response_body_bytes: usize,
    /// Per-endpoint handle store — owns all body readers, writers,
    /// sessions, request-head channels, and fetch-cancel tokens.
    pub handles: HandleStore,
    /// Active serve handle, if `serve()` has been called.
    pub serve_handle: std::sync::Mutex<Option<ServeHandle>>,
    /// Done-signal receiver from the active serve task.
    /// Stored separately so `wait_serve_stop()` can await it without holding
    /// the `serve_handle` lock for the duration of the wait.
    pub serve_done_rx: std::sync::Mutex<Option<tokio::sync::watch::Receiver<bool>>>,
    /// Signals `true` when the endpoint has fully closed (either explicitly or
    /// because the serve loop exited due to native shutdown).
    pub closed_tx: tokio::sync::watch::Sender<bool>,
    /// Receiver paired with `closed_tx`; cloned by `wait_closed()` and by the
    /// path-change watcher tasks to observe closure.
    pub closed_rx: tokio::sync::watch::Receiver<bool>,
    /// Number of currently active QUIC connections (incremented by serve loop,
    /// decremented via RAII guard when each connection task exits).
    pub active_connections: Arc<AtomicUsize>,
    /// Number of currently in-flight HTTP requests (incremented when a
    /// bi-stream is accepted, decremented when the request task exits).
    pub active_requests: Arc<AtomicUsize>,
    /// Sender for transport-level events (pool hits/misses, path changes, sweep).
    /// The channel is bounded (capacity 256, set in `bind`).
    pub event_tx: tokio::sync::mpsc::Sender<crate::events::TransportEvent>,
    /// Receiver for transport-level events.  Wrapped in Mutex+Option so
    /// `subscribe_events()` can take it exactly once for the platform drain task.
    pub event_rx:
        std::sync::Mutex<Option<tokio::sync::mpsc::Receiver<crate::events::TransportEvent>>>,
    /// Per-peer path-change subscriptions.
    /// Key: node_id_str. Populated lazily when `subscribe_path_changes` is called.
    pub path_subs: dashmap::DashMap<String, tokio::sync::mpsc::UnboundedSender<PathInfo>>,
    /// Body compression options, if the feature is enabled.
    #[cfg(feature = "compression")]
    pub compression: Option<CompressionOptions>,
}
179
180impl IrohEndpoint {
181    /// Bind an Iroh endpoint with the supplied options.
182    pub async fn bind(opts: NodeOptions) -> Result<Self, crate::CoreError> {
183        // Validate: if networking is disabled, relay_mode should not be explicitly set to a
184        // network-requiring mode.
185        if opts.networking.disabled
186            && opts
187                .networking
188                .relay_mode
189                .as_deref()
190                .is_some_and(|m| !matches!(m, "disabled"))
191        {
192            return Err(crate::CoreError::invalid_input(
193                "networking.disabled is true but relay_mode is set to a non-disabled value; \
194                 set relay_mode to \"disabled\" or omit it when networking.disabled is true",
195            ));
196        }
197
198        let relay_mode = if opts.networking.disabled {
199            RelayMode::Disabled
200        } else {
201            match opts.networking.relay_mode.as_deref() {
202                None | Some("default") => RelayMode::Default,
203                Some("staging") => RelayMode::Staging,
204                Some("disabled") => RelayMode::Disabled,
205                Some("custom") => {
206                    if opts.networking.relays.is_empty() {
207                        return Err(crate::CoreError::invalid_input(
208                            "relay_mode \"custom\" requires at least one URL in `relays`",
209                        ));
210                    }
211                    let urls = opts
212                        .networking
213                        .relays
214                        .iter()
215                        .map(|u| {
216                            u.parse::<iroh::RelayUrl>()
217                                .map_err(crate::CoreError::invalid_input)
218                        })
219                        .collect::<Result<Vec<_>, _>>()?;
220                    RelayMode::custom(urls)
221                }
222                Some(other) => {
223                    return Err(crate::CoreError::invalid_input(format!(
224                        "unknown relay_mode: {other}"
225                    )))
226                }
227            }
228        };
229
230        // Validate capabilities against known ALPN values.
231        for cap in &opts.capabilities {
232            if !crate::KNOWN_ALPNS.contains(&cap.as_str()) {
233                return Err(crate::CoreError::invalid_input(format!(
234                    "unknown ALPN capability: \"{cap}\"; valid values are {:?}",
235                    crate::KNOWN_ALPNS,
236                )));
237            }
238        }
239
240        // Validate compression level range (zstd accepts 1–22).
241        #[cfg(feature = "compression")]
242        if let Some(level) = opts.compression.as_ref().and_then(|c| c.level) {
243            if !(1..=22).contains(&level) {
244                return Err(crate::CoreError::invalid_input(format!(
245                    "compression level must be 1–22, got {level}"
246                )));
247            }
248        }
249
250        let alpns: Vec<Vec<u8>> = if opts.capabilities.is_empty() {
251            // Advertise both ALPN variants.
252            vec![ALPN_DUPLEX.to_vec(), ALPN.to_vec()]
253        } else {
254            let mut list: Vec<Vec<u8>> = opts
255                .capabilities
256                .iter()
257                .map(|c| c.as_bytes().to_vec())
258                .collect();
259            // Always include the base protocol so the node can talk to base-only peers.
260            if !list.iter().any(|a| a == ALPN) {
261                list.push(ALPN.to_vec());
262            }
263            list
264        };
265
266        let mut builder = Builder::new(iroh::endpoint::presets::Minimal)
267            .relay_mode(relay_mode)
268            .alpns(alpns);
269
270        // DNS discovery (enabled by default unless networking.disabled).
271        if !opts.networking.disabled && opts.discovery.enabled {
272            if let Some(ref url_str) = opts.discovery.dns_server {
273                let url: url::Url = url_str.parse().map_err(|e| {
274                    crate::CoreError::invalid_input(format!("invalid dns_discovery URL: {e}"))
275                })?;
276                builder = builder
277                    .address_lookup(PkarrPublisher::builder(url.clone()))
278                    .address_lookup(DnsAddressLookup::builder(
279                        url.host_str().unwrap_or_default().to_string(),
280                    ));
281            } else {
282                builder = builder
283                    .address_lookup(PkarrPublisher::n0_dns())
284                    .address_lookup(DnsAddressLookup::n0_dns());
285            }
286        }
287
288        if let Some(key_bytes) = opts.key {
289            builder = builder.secret_key(SecretKey::from_bytes(&key_bytes));
290        }
291
292        if let Some(ms) = opts.networking.idle_timeout_ms {
293            let timeout = IdleTimeout::try_from(Duration::from_millis(ms)).map_err(|e| {
294                crate::CoreError::invalid_input(format!("idle_timeout_ms out of range: {e}"))
295            })?;
296            let transport = QuicTransportConfig::builder()
297                .max_idle_timeout(Some(timeout))
298                // Limit inbound bidirectional streams per connection to bound
299                // slowloris-style resource exhaustion (P2-4).
300                .max_concurrent_bidi_streams(128u32.into())
301                .build();
302            builder = builder.transport_config(transport);
303        } else {
304            // Even without a custom idle timeout, cap concurrent bidi streams.
305            let transport = QuicTransportConfig::builder()
306                .max_concurrent_bidi_streams(128u32.into())
307                .build();
308            builder = builder.transport_config(transport);
309        }
310
311        // Bind address(es).
312        for addr_str in &opts.networking.bind_addrs {
313            let sock: std::net::SocketAddr = addr_str.parse().map_err(|e| {
314                crate::CoreError::invalid_input(format!("invalid bind address \"{addr_str}\": {e}"))
315            })?;
316            builder = builder.bind_addr(sock).map_err(|e| {
317                crate::CoreError::invalid_input(format!("bind address \"{addr_str}\": {e}"))
318            })?;
319        }
320
321        // Proxy configuration.
322        if let Some(ref proxy) = opts.networking.proxy_url {
323            let url: url::Url = proxy
324                .parse()
325                .map_err(|e| crate::CoreError::invalid_input(format!("invalid proxy URL: {e}")))?;
326            builder = builder.proxy_url(url);
327        } else if opts.networking.proxy_from_env {
328            builder = builder.proxy_from_env();
329        }
330
331        // TLS keylog for Wireshark debugging.
332        if opts.keylog {
333            builder = builder.keylog(true);
334        }
335
336        let ep = builder.bind().await.map_err(classify_bind_error)?;
337
338        let node_id_str = crate::base32_encode(ep.id().as_bytes());
339
340        let store_config = StoreConfig {
341            channel_capacity: opts
342                .streaming
343                .channel_capacity
344                .unwrap_or(crate::stream::DEFAULT_CHANNEL_CAPACITY)
345                .max(1),
346            max_chunk_size: opts
347                .streaming
348                .max_chunk_size_bytes
349                .unwrap_or(crate::stream::DEFAULT_MAX_CHUNK_SIZE)
350                .max(1),
351            drain_timeout: Duration::from_millis(
352                opts.streaming
353                    .drain_timeout_ms
354                    .unwrap_or(crate::stream::DEFAULT_DRAIN_TIMEOUT_MS),
355            ),
356            max_handles: crate::stream::DEFAULT_MAX_HANDLES,
357            ttl: Duration::from_millis(
358                opts.streaming
359                    .handle_ttl_ms
360                    .unwrap_or(crate::stream::DEFAULT_SLAB_TTL_MS),
361            ),
362        };
363        let sweep_ttl = store_config.ttl;
364        let sweep_interval = Duration::from_millis(
365            opts.streaming
366                .sweep_interval_ms
367                .unwrap_or(crate::stream::DEFAULT_SWEEP_INTERVAL_MS),
368        );
369        let (closed_tx, closed_rx) = tokio::sync::watch::channel(false);
370        let (event_tx, event_rx) = tokio::sync::mpsc::channel::<crate::events::TransportEvent>(256);
371
372        let inner = Arc::new(EndpointInner {
373            ep,
374            node_id_str,
375            pool: ConnectionPool::new(
376                opts.pool.max_connections,
377                opts.pool
378                    .idle_timeout_ms
379                    .map(std::time::Duration::from_millis),
380                Some(event_tx.clone()),
381            ),
382            // ISS-020: treat 0 as an error — callers should use `None` for the default.
383            max_header_size: match opts.max_header_size {
384                Some(0) => {
385                    return Err(crate::CoreError::invalid_input(
386                        "max_header_size must be > 0; use None for the default (65536)",
387                    ));
388                }
389                None => 64 * 1024,
390                Some(n) => n,
391            },
392            max_response_body_bytes: opts
393                .max_response_body_bytes
394                .unwrap_or(crate::server::DEFAULT_MAX_RESPONSE_BODY_BYTES),
395            handles: HandleStore::new(store_config),
396            serve_handle: std::sync::Mutex::new(None),
397            serve_done_rx: std::sync::Mutex::new(None),
398            closed_tx,
399            closed_rx,
400            active_connections: Arc::new(AtomicUsize::new(0)),
401            active_requests: Arc::new(AtomicUsize::new(0)),
402            event_tx,
403            event_rx: std::sync::Mutex::new(Some(event_rx)),
404            path_subs: dashmap::DashMap::new(),
405            #[cfg(feature = "compression")]
406            compression: opts.compression,
407        });
408
409        // Start per-endpoint sweep task (held alive via Weak reference).
410        if !sweep_ttl.is_zero() {
411            let weak = Arc::downgrade(&inner);
412            tokio::spawn(async move {
413                let mut ticker = tokio::time::interval(sweep_interval);
414                loop {
415                    ticker.tick().await;
416                    let Some(inner) = weak.upgrade() else {
417                        break;
418                    };
419                    inner.handles.sweep(sweep_ttl);
420                    drop(inner); // release strong ref between ticks
421                }
422            });
423        }
424
425        Ok(Self { inner })
426    }
427
428    /// The node's public key as a lowercase base32 string.
429    pub fn node_id(&self) -> &str {
430        &self.inner.node_id_str
431    }
432
433    /// Immediately run a TTL sweep on all handle registries, evicting any
434    /// entries whose TTL has expired.
435    ///
436    /// The background sweep task already runs this automatically on its
437    /// configured interval. `sweep_now()` is provided for test fixtures and
438    /// short-lived endpoints that cannot wait for the next tick.
439    ///
440    /// Returns immediately if the endpoint was created with `handle_ttl_ms: Some(0)`
441    /// (sweeping disabled).
442    pub fn sweep_now(&self) {
443        let ttl = self.inner.handles.config.ttl;
444        if !ttl.is_zero() {
445            self.inner.handles.sweep(ttl);
446        }
447    }
448
449    /// The node's raw secret key bytes (32 bytes).
450    ///
451    /// This is the Ed25519 private key that establishes the node's cryptographic identity.
452    /// Use it **only** to persist and later restore the key via `NodeOptions::secret_key`.
453    ///
454    /// # Security
455    ///
456    /// **These 32 bytes are the irrecoverable private key for this node.**
457    /// Anyone who obtains them can impersonate this node permanently — there is no revocation.
458    ///
459    /// - **Never log, print, or include in error payloads.** Debug formatters, tracing
460    ///   spans, and generic error handlers are common accidental leak vectors.
461    /// - **Encrypt at rest.** Store in a secrets vault or OS keychain, not in
462    ///   plaintext config files or databases.
463    /// - **Zeroize after use.** Call `zeroize::Zeroize::zeroize()` on the returned
464    ///   array (or use a `secrecy`/`zeroize` wrapper) once you have persisted the bytes
465    ///   to an encrypted store. The returned `[u8; 32]` is NOT automatically zeroed on drop.
466    /// - **Never include in network responses, crash dumps, or analytics.**
467    #[must_use]
468    pub fn secret_key_bytes(&self) -> [u8; 32] {
469        self.inner.ep.secret_key().to_bytes()
470    }
471
472    /// Graceful close: signal the serve loop to stop accepting, wait for
473    /// in-flight requests to drain (up to the configured drain timeout),
474    /// then close the QUIC endpoint.
475    ///
476    /// If no serve loop is running, closes the endpoint immediately.
477    /// The handle store (all registries) is freed when the last `IrohEndpoint`
478    /// clone is dropped — no explicit unregister is needed.
479    pub async fn close(&self) {
480        // ISS-027: drain in-flight requests *before* dropping so that request
481        // handlers can still access their reader/writer/trailer handles
482        // during the drain window.
483        let handle = self
484            .inner
485            .serve_handle
486            .lock()
487            .unwrap_or_else(|e| e.into_inner())
488            .take();
489        if let Some(h) = handle {
490            h.drain().await;
491        }
492        self.inner.ep.close().await;
493        let _ = self.inner.closed_tx.send(true);
494    }
495
496    /// Immediate close: abort the serve loop and close the endpoint with
497    /// no drain period.
498    pub async fn close_force(&self) {
499        let handle = self
500            .inner
501            .serve_handle
502            .lock()
503            .unwrap_or_else(|e| e.into_inner())
504            .take();
505        if let Some(h) = handle {
506            h.abort();
507        }
508        self.inner.ep.close().await;
509        let _ = self.inner.closed_tx.send(true);
510    }
511
512    /// Wait until this endpoint has been closed (either explicitly via `close()` /
513    /// `close_force()`, or because the native QUIC stack shut down).
514    ///
515    /// Returns immediately if already closed.
516    pub async fn wait_closed(&self) {
517        let mut rx = self.inner.closed_rx.clone();
518        let _ = rx.wait_for(|v| *v).await;
519    }
520
521    /// Store a serve handle so that `close()` can drain it.
522    pub fn set_serve_handle(&self, handle: ServeHandle) {
523        *self
524            .inner
525            .serve_done_rx
526            .lock()
527            .unwrap_or_else(|e| e.into_inner()) = Some(handle.subscribe_done());
528        *self
529            .inner
530            .serve_handle
531            .lock()
532            .unwrap_or_else(|e| e.into_inner()) = Some(handle);
533    }
534
535    /// Signal the serve loop to stop accepting new connections.
536    ///
537    /// Returns immediately — does NOT close the endpoint or drain in-flight
538    /// requests.  The handle is preserved so `close()` can still drain later.
539    pub fn stop_serve(&self) {
540        if let Some(h) = self
541            .inner
542            .serve_handle
543            .lock()
544            .unwrap_or_else(|e| e.into_inner())
545            .as_ref()
546        {
547            h.shutdown();
548        }
549    }
550
551    /// Wait until the serve loop has fully exited (serve task drained and finished).
552    ///
553    /// Returns immediately if `serve()` was never called.
554    pub async fn wait_serve_stop(&self) {
555        let rx = self
556            .inner
557            .serve_done_rx
558            .lock()
559            .unwrap_or_else(|e| e.into_inner())
560            .clone();
561        if let Some(mut rx) = rx {
562            // wait_for returns Err only if the sender is dropped; being dropped
563            // also means the task has exited, so treat both outcomes as "done".
564            let _ = rx.wait_for(|v| *v).await;
565        }
566    }
567
568    pub fn raw(&self) -> &Endpoint {
569        &self.inner.ep
570    }
571
572    /// Per-endpoint handle store.
573    pub fn handles(&self) -> &HandleStore {
574        &self.inner.handles
575    }
576
577    /// Maximum byte size of an HTTP/1.1 head.
578    pub fn max_header_size(&self) -> usize {
579        self.inner.max_header_size
580    }
581
582    /// Maximum decompressed response-body bytes accepted per outgoing fetch.
583    pub fn max_response_body_bytes(&self) -> usize {
584        self.inner.max_response_body_bytes
585    }
586
587    /// Access the connection pool.
588    pub(crate) fn pool(&self) -> &ConnectionPool {
589        &self.inner.pool
590    }
591
592    /// Snapshot of current endpoint statistics.
593    ///
594    /// All fields are point-in-time reads and may change between calls.
595    pub fn endpoint_stats(&self) -> EndpointStats {
596        let (active_readers, active_writers, active_sessions, total_handles) =
597            self.inner.handles.count_handles();
598        let pool_size = self.inner.pool.entry_count_approx() as usize;
599        let active_connections = self.inner.active_connections.load(Ordering::Relaxed);
600        let active_requests = self.inner.active_requests.load(Ordering::Relaxed);
601        EndpointStats {
602            active_readers,
603            active_writers,
604            active_sessions,
605            total_handles,
606            pool_size,
607            active_connections,
608            active_requests,
609        }
610    }
611
612    /// Compression options, if the `compression` feature is enabled.
613    #[cfg(feature = "compression")]
614    pub fn compression(&self) -> Option<&CompressionOptions> {
615        self.inner.compression.as_ref()
616    }
617
618    /// Returns the local socket addresses this endpoint is bound to.
619    pub fn bound_sockets(&self) -> Vec<std::net::SocketAddr> {
620        self.inner.ep.bound_sockets()
621    }
622
623    /// Full node address: node ID + relay URL(s) + direct socket addresses.
624    pub fn node_addr(&self) -> NodeAddrInfo {
625        let addr = self.inner.ep.addr();
626        let mut addrs = Vec::new();
627        for relay in addr.relay_urls() {
628            addrs.push(relay.to_string());
629        }
630        for da in addr.ip_addrs() {
631            addrs.push(da.to_string());
632        }
633        NodeAddrInfo {
634            id: self.inner.node_id_str.clone(),
635            addrs,
636        }
637    }
638
639    /// Home relay URL, or `None` if not connected to a relay.
640    pub fn home_relay(&self) -> Option<String> {
641        self.inner
642            .ep
643            .addr()
644            .relay_urls()
645            .next()
646            .map(|u| u.to_string())
647    }
648
649    /// Known addresses for a remote peer, or `None` if not in the endpoint's cache.
650    pub async fn peer_info(&self, node_id_b32: &str) -> Option<NodeAddrInfo> {
651        let bytes = crate::base32_decode(node_id_b32).ok()?;
652        let arr: [u8; 32] = bytes.try_into().ok()?;
653        let pk = iroh::PublicKey::from_bytes(&arr).ok()?;
654        let info = self.inner.ep.remote_info(pk).await?;
655        let id = crate::base32_encode(info.id().as_bytes());
656        let mut addrs = Vec::new();
657        for a in info.addrs() {
658            match a.addr() {
659                iroh::TransportAddr::Ip(sock) => addrs.push(sock.to_string()),
660                iroh::TransportAddr::Relay(url) => addrs.push(url.to_string()),
661                other => addrs.push(format!("{:?}", other)),
662            }
663        }
664        Some(NodeAddrInfo { id, addrs })
665    }
666
667    /// Per-peer connection statistics.
668    ///
669    /// Returns path information for each known transport address, including
670    /// whether each path is via a relay or direct, and which is active.
671    pub async fn peer_stats(&self, node_id_b32: &str) -> Option<PeerStats> {
672        let bytes = crate::base32_decode(node_id_b32).ok()?;
673        let arr: [u8; 32] = bytes.try_into().ok()?;
674        let pk = iroh::PublicKey::from_bytes(&arr).ok()?;
675        let info = self.inner.ep.remote_info(pk).await?;
676
677        let mut paths = Vec::new();
678        let mut has_active_relay = false;
679        let mut active_relay_url: Option<String> = None;
680
681        for a in info.addrs() {
682            let is_relay = a.addr().is_relay();
683            let is_active = matches!(a.usage(), TransportAddrUsage::Active);
684
685            let addr_str = match a.addr() {
686                iroh::TransportAddr::Ip(sock) => sock.to_string(),
687                iroh::TransportAddr::Relay(url) => {
688                    if is_active {
689                        has_active_relay = true;
690                        active_relay_url = Some(url.to_string());
691                    }
692                    url.to_string()
693                }
694                other => format!("{:?}", other),
695            };
696
697            paths.push(PathInfo {
698                relay: is_relay,
699                addr: addr_str,
700                active: is_active,
701            });
702        }
703
704        // Enrich with QUIC connection-level stats if a pooled connection exists.
705        let (rtt_ms, bytes_sent, bytes_received, lost_packets, sent_packets, congestion_window) =
706            if let Some(pooled) = self.inner.pool.get_existing(pk, crate::ALPN).await {
707                let s = pooled.conn.stats();
708                let rtt = pooled.conn.rtt(iroh::endpoint::PathId::ZERO);
709                (
710                    rtt.map(|d| d.as_secs_f64() * 1000.0),
711                    Some(s.udp_tx.bytes),
712                    Some(s.udp_rx.bytes),
713                    None, // quinn path stats not exposed via iroh ConnectionStats
714                    None, // quinn path stats not exposed via iroh ConnectionStats
715                    None, // quinn path stats not exposed via iroh ConnectionStats
716                )
717            } else {
718                (None, None, None, None, None, None)
719            };
720
721        Some(PeerStats {
722            relay: has_active_relay,
723            relay_url: active_relay_url,
724            paths,
725            rtt_ms,
726            bytes_sent,
727            bytes_received,
728            lost_packets,
729            sent_packets,
730            congestion_window,
731        })
732    }
733
734    /// Take the transport event receiver, handing it off to a platform drain task.
735    ///
736    /// May only be called once per endpoint.  The drain task owns the receiver and
737    /// loops until `event_tx` is dropped (i.e. the endpoint closes).  Returns `None`
738    /// if the receiver was already taken (i.e. `subscribe_events` was called before).
739    pub fn subscribe_events(
740        &self,
741    ) -> Option<tokio::sync::mpsc::Receiver<crate::events::TransportEvent>> {
742        self.inner
743            .event_rx
744            .lock()
745            .unwrap_or_else(|e| e.into_inner())
746            .take()
747    }
748
749    /// Subscribe to path changes for a specific peer.
750    ///
751    /// Spawns a background watcher task the first time a given peer is subscribed.
752    /// The watcher polls `peer_stats()` every 200 ms and emits on the returned
753    /// channel whenever the active path changes.
754    pub fn subscribe_path_changes(
755        &self,
756        node_id_str: &str,
757    ) -> tokio::sync::mpsc::UnboundedReceiver<PathInfo> {
758        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
759        // Replace any existing sender; old watcher exits when it detects is_closed().
760        self.inner.path_subs.insert(node_id_str.to_string(), tx);
761
762        let ep = self.clone();
763        let nid = node_id_str.to_string();
764        let event_tx = self.inner.event_tx.clone();
765
766        tokio::spawn(async move {
767            let mut last_key: Option<String> = None;
768            let mut closed_rx = ep.inner.closed_rx.clone();
769            loop {
770                // Exit immediately if the endpoint has been closed.
771                if *closed_rx.borrow() {
772                    ep.inner.path_subs.remove(&nid);
773                    break;
774                }
775                let is_closed = ep
776                    .inner
777                    .path_subs
778                    .get(&nid)
779                    .map(|s| s.is_closed())
780                    .unwrap_or(true);
781                if is_closed {
782                    ep.inner.path_subs.remove(&nid);
783                    break;
784                }
785
786                if let Some(stats) = ep.peer_stats(&nid).await {
787                    if let Some(active) = stats.paths.iter().find(|p| p.active) {
788                        let key = format!("{}:{}", active.relay, active.addr);
789                        if Some(&key) != last_key.as_ref() {
790                            last_key = Some(key);
791                            if let Some(sender) = ep.inner.path_subs.get(&nid) {
792                                let _ = sender.send(active.clone());
793                            }
794                            let _ = event_tx.try_send(crate::events::TransportEvent::path_change(
795                                &nid,
796                                &active.addr,
797                                active.relay,
798                            ));
799                        }
800                    }
801                }
802
803                // Sleep 200 ms, but wake early if the endpoint is being closed.
804                tokio::select! {
805                    _ = tokio::time::sleep(std::time::Duration::from_millis(200)) => {}
806                    result = closed_rx.wait_for(|v| *v) => {
807                        let _ = result;
808                        ep.inner.path_subs.remove(&nid);
809                        break;
810                    }
811                }
812            }
813        });
814
815        rx
816    }
817}
818
819// ── Bind-error classification ────────────────────────────────────────────────
820
821/// Classify a bind error into a prefixed string for the JS error mapper.
822fn classify_bind_error(e: impl std::fmt::Display) -> crate::CoreError {
823    let msg = e.to_string();
824    crate::CoreError::connection_failed(msg)
825}
826
827// ── NodeAddr info ────────────────────────────────────────────────────────────
828
/// Serialisable node address: node ID plus the relay and direct addresses the
/// node can be reached at.
///
/// Uses serde's default field naming, so the wire keys are `id` and `addrs`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeAddrInfo {
    /// Base32-encoded public key.
    pub id: String,
    /// Relay URLs and/or `ip:port` direct addresses.
    pub addrs: Vec<String>,
}
837
838// ── Observability types ──────────────────────────────────────────────────────
839
/// Endpoint-level observability snapshot.
///
/// Returned by [`IrohEndpoint::endpoint_stats`].  All counts are point-in-time reads
/// and may change between calls.
///
/// Serialised with camelCase keys (e.g. `activeReaders`, `poolSize`). The
/// derived `Default` sets every counter to zero.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct EndpointStats {
    /// Number of currently open body reader handles.
    pub active_readers: usize,
    /// Number of currently open body writer handles.
    pub active_writers: usize,
    /// Number of live QUIC sessions (WebTransport connections).
    pub active_sessions: usize,
    /// Total number of allocated (reader + writer + session + other) handles.
    pub total_handles: usize,
    /// Number of QUIC connections currently cached in the connection pool.
    pub pool_size: usize,
    /// Number of live QUIC connections accepted by the serve loop.
    pub active_connections: usize,
    /// Number of HTTP requests currently being processed.
    pub active_requests: usize,
}
862
/// A connection lifecycle event fired when a QUIC peer connection opens or closes.
///
/// Serialised with camelCase keys: `peerId` and `connected`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConnectionEvent {
    /// Base32-encoded public key of the peer.
    pub peer_id: String,
    /// `true` when this is the first connection from the peer (0→1), `false` when the last one closes (1→0).
    pub connected: bool,
}
872
/// Per-peer connection statistics.
///
/// The QUIC-level fields (`rtt_ms`, `bytes_*`) are taken from the connection the
/// pool holds for this peer under the default `ALPN`. When no such connection is
/// pooled, they are `None`.
///
/// NOTE(review): unlike `EndpointStats` / `ConnectionEvent`, this type has no
/// `#[serde(rename_all = "camelCase")]`. Its keys therefore serialise in
/// snake_case (e.g. `relay_url`, `rtt_ms`). Confirm this is what consumers expect
/// before changing it, since a rename would be a wire-format break.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerStats {
    /// Whether the peer is connected via a relay server (vs direct).
    pub relay: bool,
    /// Active relay URL, if any.
    pub relay_url: Option<String>,
    /// All known paths to this peer.
    pub paths: Vec<PathInfo>,
    /// Round-trip time in milliseconds on the connection's primary path.
    /// `None` if no connection is pooled, or if the connection reports no RTT
    /// for that path.
    pub rtt_ms: Option<f64>,
    /// Total UDP bytes sent on the pooled connection to this peer.  `None` if no
    /// connection is pooled.
    pub bytes_sent: Option<u64>,
    /// Total UDP bytes received on the pooled connection from this peer.  `None`
    /// if no connection is pooled.
    pub bytes_received: Option<u64>,
    /// Total packets lost on the QUIC path.  Currently always `None`: iroh's
    /// `ConnectionStats` does not expose quinn's per-path counters.
    pub lost_packets: Option<u64>,
    /// Total packets sent on the QUIC path.  Currently always `None`: iroh's
    /// `ConnectionStats` does not expose quinn's per-path counters.
    pub sent_packets: Option<u64>,
    /// Current congestion window in bytes.  Currently always `None`: iroh's
    /// `ConnectionStats` does not expose quinn's per-path counters.
    pub congestion_window: Option<u64>,
}
895
/// Network path information for a single transport address.
///
/// Uses serde's default field naming (`relay`, `addr`, `active`). Path-change
/// detection compares paths by their `relay` flag and `addr` only.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PathInfo {
    /// Whether this path goes through a relay server.
    pub relay: bool,
    /// The relay URL (if relay) or `ip:port` (if direct).
    pub addr: String,
    /// Whether this is the currently selected/active path.
    pub active: bool,
}
906
/// Parse an optional list of `ip:port` strings into [`std::net::SocketAddr`]s.
///
/// `None` passes through as `Ok(None)`. `Some(list)` yields `Ok(Some(addrs))`,
/// with the parsed addresses in input order.
///
/// Only IP literals are accepted (e.g. `"127.0.0.1:4433"`, `"[::1]:4433"`).
/// Hostnames such as `"localhost:4433"` are rejected, because no DNS resolution
/// is performed here.
///
/// # Errors
///
/// Returns a descriptive `String` naming the first entry that fails to parse.
/// This lets callers surface misconfiguration instead of silently dropping it.
pub fn parse_direct_addrs(
    addrs: &Option<Vec<String>>,
) -> Result<Option<Vec<std::net::SocketAddr>>, String> {
    addrs
        .as_ref()
        .map(|list| {
            list.iter()
                .map(|raw| {
                    raw.parse::<std::net::SocketAddr>()
                        .map_err(|err| format!("invalid direct address {raw:?}: {err}"))
                })
                .collect::<Result<Vec<_>, _>>()
        })
        .transpose()
}