iroh_http_core/endpoint.rs
1//! Iroh endpoint lifecycle — create, share, and close.
2
3use iroh::address_lookup::{DnsAddressLookup, PkarrPublisher};
4use iroh::endpoint::{Builder, IdleTimeout, QuicTransportConfig, TransportAddrUsage};
5use iroh::{Endpoint, RelayMode, SecretKey};
6use serde::{Deserialize, Serialize};
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use crate::pool::ConnectionPool;
12use crate::server::ServeHandle;
13use crate::stream::{HandleStore, StoreConfig};
14use crate::{ALPN, ALPN_DUPLEX};
15
/// Networking / QUIC transport configuration.
#[derive(Debug, Clone, Default)]
pub struct NetworkingOptions {
    /// Relay server mode: `"default"`, `"staging"`, `"disabled"`, or `"custom"`.
    /// `None` behaves like `"default"`; any other string makes
    /// [`IrohEndpoint::bind`] return an error.
    pub relay_mode: Option<String>,
    /// Custom relay server URLs. Only used when `relay_mode` is `"custom"`, in
    /// which case at least one entry is required and every entry must parse
    /// as a relay URL.
    pub relays: Vec<String>,
    /// UDP socket addresses to bind, as `ip:port` strings. Empty means OS-assigned.
    pub bind_addrs: Vec<String>,
    /// Milliseconds before an idle QUIC connection is cleaned up. `None` keeps
    /// the transport's default; values outside the range QUIC accepts are
    /// rejected by [`IrohEndpoint::bind`].
    pub idle_timeout_ms: Option<u64>,
    /// HTTP proxy URL for relay traffic. Takes precedence over `proxy_from_env`.
    pub proxy_url: Option<String>,
    /// Read `HTTP_PROXY` / `HTTPS_PROXY` env vars for proxy config.
    /// Ignored when `proxy_url` is set.
    pub proxy_from_env: bool,
    /// Disable relay servers and DNS discovery entirely.
    ///
    /// When `true`, `relay_mode` must be `None` or `"disabled"`. Any other
    /// value makes [`IrohEndpoint::bind`] return an error rather than being
    /// silently overridden. Useful for in-process tests where endpoints
    /// connect via direct addresses.
    pub disabled: bool,
}
35
/// Configuration for DNS-based peer discovery.
///
/// Discovery is switched on by default. It is skipped altogether when
/// `NetworkingOptions::disabled` is set.
#[derive(Debug, Clone)]
pub struct DiscoveryOptions {
    /// Discovery server URL; `None` selects the n0 DNS defaults. When set,
    /// the URL is handed to the Pkarr publisher and its host to the DNS lookup.
    pub dns_server: Option<String>,
    /// Whether DNS discovery runs at all. Default: `true`.
    pub enabled: bool,
}

impl Default for DiscoveryOptions {
    /// Discovery enabled, using the n0 DNS defaults.
    fn default() -> Self {
        let enabled = true;
        Self {
            enabled,
            dns_server: None,
        }
    }
}
53
/// Tuning knobs for the outgoing connection pool.
#[derive(Debug, Clone, Default)]
pub struct PoolOptions {
    /// Upper bound on idle connections kept in the pool.
    pub max_connections: Option<usize>,
    /// How long, in milliseconds, a pooled connection may sit idle before eviction.
    pub idle_timeout_ms: Option<u64>,
}
62
/// Tuning for body streaming and the per-endpoint handle store.
#[derive(Debug, Clone, Default)]
pub struct StreamingOptions {
    /// Per-body channel capacity, counted in chunks. Default: 32.
    /// Values below 1 are raised to 1.
    pub channel_capacity: Option<usize>,
    /// Largest chunk, in bytes, accepted by `send_chunk`. Default: 65536.
    /// Values below 1 are raised to 1.
    pub max_chunk_size_bytes: Option<usize>,
    /// How long, in milliseconds, to wait on a slow body reader. Default: 30000.
    pub drain_timeout_ms: Option<u64>,
    /// Lifetime, in milliseconds, of slab handle entries before a sweep may
    /// evict them. `0` turns sweeping off. Default: 300000.
    pub handle_ttl_ms: Option<u64>,
    /// Period, in milliseconds, of the background TTL sweep. Default: 60000 (60 s).
    ///
    /// A shorter period narrows the worst-case window in which a leaked handle
    /// lingers, at the price of taking the write lock on every handle registry
    /// more often. Handy for short-lived endpoints and test fixtures.
    pub sweep_interval_ms: Option<u64>,
}
80
81/// Configuration passed to [`IrohEndpoint::bind`].
82#[derive(Debug, Clone, Default)]
83pub struct NodeOptions {
84 /// 32-byte Ed25519 secret key. Generate a fresh one when `None`.
85 pub key: Option<[u8; 32]>,
86 /// Networking / QUIC transport configuration.
87 pub networking: NetworkingOptions,
88 /// DNS-based peer discovery configuration.
89 pub discovery: DiscoveryOptions,
90 /// Connection-pool tuning.
91 pub pool: PoolOptions,
92 /// Body-streaming and handle-store configuration.
93 pub streaming: StreamingOptions,
94 /// ALPN capabilities to advertise.
95 ///
96 /// Valid values: [`ALPN_STR`](crate::ALPN_STR) (`"iroh-http/2"`) and
97 /// [`ALPN_DUPLEX_STR`](crate::ALPN_DUPLEX_STR) (`"iroh-http/2-duplex"`).
98 ///
99 /// When empty (the default), both protocols are advertised. When non-empty,
100 /// the base protocol (`iroh-http/2`) is automatically injected if not
101 /// already present. Unknown values cause [`IrohEndpoint::bind`] to return
102 /// an error.
103 pub capabilities: Vec<String>,
104 /// Write TLS session keys to $SSLKEYLOGFILE. Dev/debug only.
105 pub keylog: bool,
106 /// Maximum byte size of the HTTP/1.1 request or response head.
107 /// `None` = 65536. `Some(0)` is rejected.
108 pub max_header_size: Option<usize>,
109 /// Maximum decompressed response body bytes the client will accept per
110 /// outgoing `fetch()`. Default: 256 MiB. Protects against compression
111 /// bombs from malicious peers.
112 pub max_response_body_bytes: Option<usize>,
113 #[cfg(feature = "compression")]
114 pub compression: Option<CompressionOptions>,
115}
116
/// Settings for compressing response bodies.
///
/// This type exists only when the `compression` feature is enabled.
#[cfg(feature = "compression")]
#[derive(Debug, Clone)]
pub struct CompressionOptions {
    /// Minimum body size, in bytes, before compression is applied. Default: 512.
    pub min_body_bytes: usize,
    /// Zstd compression level, 1–22 (`IrohEndpoint::bind` rejects anything
    /// else). `None` falls back to the zstd default (3).
    pub level: Option<u32>,
}
127
128/// A shared Iroh endpoint.
129///
130/// Clone-able (cheap Arc clone). All fetch and serve calls on the same node
131/// share one endpoint and therefore one stable QUIC identity.
132#[derive(Clone)]
133pub struct IrohEndpoint {
134 pub(crate) inner: Arc<EndpointInner>,
135}
136
137pub(crate) struct EndpointInner {
138 pub ep: Endpoint,
139 /// The node's own base32-encoded public key (stable for the lifetime of the key).
140 pub node_id_str: String,
141 /// Connection pool for reusing QUIC connections across fetch/connect calls.
142 pub pool: ConnectionPool,
143 /// Maximum byte size of an HTTP/1.1 head (request or response).
144 pub max_header_size: usize,
145 /// Maximum decompressed response body bytes per fetch. Default: 256 MiB.
146 pub max_response_body_bytes: usize,
147 /// Per-endpoint handle store — owns all body readers, writers,
148 /// sessions, request-head channels, and fetch-cancel tokens.
149 pub handles: HandleStore,
150 /// Active serve handle, if `serve()` has been called.
151 pub serve_handle: std::sync::Mutex<Option<ServeHandle>>,
152 /// Done-signal receiver from the active serve task.
153 /// Stored separately so `wait_serve_stop()` can await it without holding
154 /// the `serve_handle` lock for the duration of the wait.
155 pub serve_done_rx: std::sync::Mutex<Option<tokio::sync::watch::Receiver<bool>>>,
156 /// Signals `true` when the endpoint has fully closed (either explicitly or
157 /// because the serve loop exited due to native shutdown).
158 pub closed_tx: tokio::sync::watch::Sender<bool>,
159 pub closed_rx: tokio::sync::watch::Receiver<bool>,
160 /// Number of currently active QUIC connections (incremented by serve loop,
161 /// decremented via RAII guard when each connection task exits).
162 pub active_connections: Arc<AtomicUsize>,
163 /// Number of currently in-flight HTTP requests (incremented when a
164 /// bi-stream is accepted, decremented when the request task exits).
165 pub active_requests: Arc<AtomicUsize>,
166 /// Sender for transport-level events (pool hits/misses, path changes, sweep).
167 pub event_tx: tokio::sync::mpsc::Sender<crate::events::TransportEvent>,
168 /// Receiver for transport-level events. Wrapped in Mutex+Option so
169 /// `subscribe_events()` can take it exactly once for the platform drain task.
170 pub event_rx:
171 std::sync::Mutex<Option<tokio::sync::mpsc::Receiver<crate::events::TransportEvent>>>,
172 /// Per-peer path-change subscriptions.
173 /// Key: node_id_str. Populated lazily when `subscribe_path_changes` is called.
174 pub path_subs: dashmap::DashMap<String, tokio::sync::mpsc::UnboundedSender<PathInfo>>,
175 /// Body compression options, if the feature is enabled.
176 #[cfg(feature = "compression")]
177 pub compression: Option<CompressionOptions>,
178}
179
180impl IrohEndpoint {
181 /// Bind an Iroh endpoint with the supplied options.
182 pub async fn bind(opts: NodeOptions) -> Result<Self, crate::CoreError> {
183 // Validate: if networking is disabled, relay_mode should not be explicitly set to a
184 // network-requiring mode.
185 if opts.networking.disabled
186 && opts
187 .networking
188 .relay_mode
189 .as_deref()
190 .is_some_and(|m| !matches!(m, "disabled"))
191 {
192 return Err(crate::CoreError::invalid_input(
193 "networking.disabled is true but relay_mode is set to a non-disabled value; \
194 set relay_mode to \"disabled\" or omit it when networking.disabled is true",
195 ));
196 }
197
198 let relay_mode = if opts.networking.disabled {
199 RelayMode::Disabled
200 } else {
201 match opts.networking.relay_mode.as_deref() {
202 None | Some("default") => RelayMode::Default,
203 Some("staging") => RelayMode::Staging,
204 Some("disabled") => RelayMode::Disabled,
205 Some("custom") => {
206 if opts.networking.relays.is_empty() {
207 return Err(crate::CoreError::invalid_input(
208 "relay_mode \"custom\" requires at least one URL in `relays`",
209 ));
210 }
211 let urls = opts
212 .networking
213 .relays
214 .iter()
215 .map(|u| {
216 u.parse::<iroh::RelayUrl>()
217 .map_err(crate::CoreError::invalid_input)
218 })
219 .collect::<Result<Vec<_>, _>>()?;
220 RelayMode::custom(urls)
221 }
222 Some(other) => {
223 return Err(crate::CoreError::invalid_input(format!(
224 "unknown relay_mode: {other}"
225 )))
226 }
227 }
228 };
229
230 // Validate capabilities against known ALPN values.
231 for cap in &opts.capabilities {
232 if !crate::KNOWN_ALPNS.contains(&cap.as_str()) {
233 return Err(crate::CoreError::invalid_input(format!(
234 "unknown ALPN capability: \"{cap}\"; valid values are {:?}",
235 crate::KNOWN_ALPNS,
236 )));
237 }
238 }
239
240 // Validate compression level range (zstd accepts 1–22).
241 #[cfg(feature = "compression")]
242 if let Some(level) = opts.compression.as_ref().and_then(|c| c.level) {
243 if !(1..=22).contains(&level) {
244 return Err(crate::CoreError::invalid_input(format!(
245 "compression level must be 1–22, got {level}"
246 )));
247 }
248 }
249
250 let alpns: Vec<Vec<u8>> = if opts.capabilities.is_empty() {
251 // Advertise both ALPN variants.
252 vec![ALPN_DUPLEX.to_vec(), ALPN.to_vec()]
253 } else {
254 let mut list: Vec<Vec<u8>> = opts
255 .capabilities
256 .iter()
257 .map(|c| c.as_bytes().to_vec())
258 .collect();
259 // Always include the base protocol so the node can talk to base-only peers.
260 if !list.iter().any(|a| a == ALPN) {
261 list.push(ALPN.to_vec());
262 }
263 list
264 };
265
266 let mut builder = Builder::new(iroh::endpoint::presets::Minimal)
267 .relay_mode(relay_mode)
268 .alpns(alpns);
269
270 // DNS discovery (enabled by default unless networking.disabled).
271 if !opts.networking.disabled && opts.discovery.enabled {
272 if let Some(ref url_str) = opts.discovery.dns_server {
273 let url: url::Url = url_str.parse().map_err(|e| {
274 crate::CoreError::invalid_input(format!("invalid dns_discovery URL: {e}"))
275 })?;
276 builder = builder
277 .address_lookup(PkarrPublisher::builder(url.clone()))
278 .address_lookup(DnsAddressLookup::builder(
279 url.host_str().unwrap_or_default().to_string(),
280 ));
281 } else {
282 builder = builder
283 .address_lookup(PkarrPublisher::n0_dns())
284 .address_lookup(DnsAddressLookup::n0_dns());
285 }
286 }
287
288 if let Some(key_bytes) = opts.key {
289 builder = builder.secret_key(SecretKey::from_bytes(&key_bytes));
290 }
291
292 if let Some(ms) = opts.networking.idle_timeout_ms {
293 let timeout = IdleTimeout::try_from(Duration::from_millis(ms)).map_err(|e| {
294 crate::CoreError::invalid_input(format!("idle_timeout_ms out of range: {e}"))
295 })?;
296 let transport = QuicTransportConfig::builder()
297 .max_idle_timeout(Some(timeout))
298 // Limit inbound bidirectional streams per connection to bound
299 // slowloris-style resource exhaustion (P2-4).
300 .max_concurrent_bidi_streams(128u32.into())
301 .build();
302 builder = builder.transport_config(transport);
303 } else {
304 // Even without a custom idle timeout, cap concurrent bidi streams.
305 let transport = QuicTransportConfig::builder()
306 .max_concurrent_bidi_streams(128u32.into())
307 .build();
308 builder = builder.transport_config(transport);
309 }
310
311 // Bind address(es).
312 for addr_str in &opts.networking.bind_addrs {
313 let sock: std::net::SocketAddr = addr_str.parse().map_err(|e| {
314 crate::CoreError::invalid_input(format!("invalid bind address \"{addr_str}\": {e}"))
315 })?;
316 builder = builder.bind_addr(sock).map_err(|e| {
317 crate::CoreError::invalid_input(format!("bind address \"{addr_str}\": {e}"))
318 })?;
319 }
320
321 // Proxy configuration.
322 if let Some(ref proxy) = opts.networking.proxy_url {
323 let url: url::Url = proxy
324 .parse()
325 .map_err(|e| crate::CoreError::invalid_input(format!("invalid proxy URL: {e}")))?;
326 builder = builder.proxy_url(url);
327 } else if opts.networking.proxy_from_env {
328 builder = builder.proxy_from_env();
329 }
330
331 // TLS keylog for Wireshark debugging.
332 if opts.keylog {
333 builder = builder.keylog(true);
334 }
335
336 let ep = builder.bind().await.map_err(classify_bind_error)?;
337
338 let node_id_str = crate::base32_encode(ep.id().as_bytes());
339
340 let store_config = StoreConfig {
341 channel_capacity: opts
342 .streaming
343 .channel_capacity
344 .unwrap_or(crate::stream::DEFAULT_CHANNEL_CAPACITY)
345 .max(1),
346 max_chunk_size: opts
347 .streaming
348 .max_chunk_size_bytes
349 .unwrap_or(crate::stream::DEFAULT_MAX_CHUNK_SIZE)
350 .max(1),
351 drain_timeout: Duration::from_millis(
352 opts.streaming
353 .drain_timeout_ms
354 .unwrap_or(crate::stream::DEFAULT_DRAIN_TIMEOUT_MS),
355 ),
356 max_handles: crate::stream::DEFAULT_MAX_HANDLES,
357 ttl: Duration::from_millis(
358 opts.streaming
359 .handle_ttl_ms
360 .unwrap_or(crate::stream::DEFAULT_SLAB_TTL_MS),
361 ),
362 };
363 let sweep_ttl = store_config.ttl;
364 let sweep_interval = Duration::from_millis(
365 opts.streaming
366 .sweep_interval_ms
367 .unwrap_or(crate::stream::DEFAULT_SWEEP_INTERVAL_MS),
368 );
369 let (closed_tx, closed_rx) = tokio::sync::watch::channel(false);
370 let (event_tx, event_rx) = tokio::sync::mpsc::channel::<crate::events::TransportEvent>(256);
371
372 let inner = Arc::new(EndpointInner {
373 ep,
374 node_id_str,
375 pool: ConnectionPool::new(
376 opts.pool.max_connections,
377 opts.pool
378 .idle_timeout_ms
379 .map(std::time::Duration::from_millis),
380 Some(event_tx.clone()),
381 ),
382 // ISS-020: treat 0 as an error — callers should use `None` for the default.
383 max_header_size: match opts.max_header_size {
384 Some(0) => {
385 return Err(crate::CoreError::invalid_input(
386 "max_header_size must be > 0; use None for the default (65536)",
387 ));
388 }
389 None => 64 * 1024,
390 Some(n) => n,
391 },
392 max_response_body_bytes: opts
393 .max_response_body_bytes
394 .unwrap_or(crate::server::DEFAULT_MAX_RESPONSE_BODY_BYTES),
395 handles: HandleStore::new(store_config),
396 serve_handle: std::sync::Mutex::new(None),
397 serve_done_rx: std::sync::Mutex::new(None),
398 closed_tx,
399 closed_rx,
400 active_connections: Arc::new(AtomicUsize::new(0)),
401 active_requests: Arc::new(AtomicUsize::new(0)),
402 event_tx,
403 event_rx: std::sync::Mutex::new(Some(event_rx)),
404 path_subs: dashmap::DashMap::new(),
405 #[cfg(feature = "compression")]
406 compression: opts.compression,
407 });
408
409 // Start per-endpoint sweep task (held alive via Weak reference).
410 if !sweep_ttl.is_zero() {
411 let weak = Arc::downgrade(&inner);
412 tokio::spawn(async move {
413 let mut ticker = tokio::time::interval(sweep_interval);
414 loop {
415 ticker.tick().await;
416 let Some(inner) = weak.upgrade() else {
417 break;
418 };
419 inner.handles.sweep(sweep_ttl);
420 drop(inner); // release strong ref between ticks
421 }
422 });
423 }
424
425 Ok(Self { inner })
426 }
427
428 /// The node's public key as a lowercase base32 string.
429 pub fn node_id(&self) -> &str {
430 &self.inner.node_id_str
431 }
432
433 /// Immediately run a TTL sweep on all handle registries, evicting any
434 /// entries whose TTL has expired.
435 ///
436 /// The background sweep task already runs this automatically on its
437 /// configured interval. `sweep_now()` is provided for test fixtures and
438 /// short-lived endpoints that cannot wait for the next tick.
439 ///
440 /// Returns immediately if the endpoint was created with `handle_ttl_ms: Some(0)`
441 /// (sweeping disabled).
442 pub fn sweep_now(&self) {
443 let ttl = self.inner.handles.config.ttl;
444 if !ttl.is_zero() {
445 self.inner.handles.sweep(ttl);
446 }
447 }
448
449 /// The node's raw secret key bytes (32 bytes).
450 ///
451 /// This is the Ed25519 private key that establishes the node's cryptographic identity.
452 /// Use it **only** to persist and later restore the key via `NodeOptions::secret_key`.
453 ///
454 /// # Security
455 ///
456 /// **These 32 bytes are the irrecoverable private key for this node.**
457 /// Anyone who obtains them can impersonate this node permanently — there is no revocation.
458 ///
459 /// - **Never log, print, or include in error payloads.** Debug formatters, tracing
460 /// spans, and generic error handlers are common accidental leak vectors.
461 /// - **Encrypt at rest.** Store in a secrets vault or OS keychain, not in
462 /// plaintext config files or databases.
463 /// - **Zeroize after use.** Call `zeroize::Zeroize::zeroize()` on the returned
464 /// array (or use a `secrecy`/`zeroize` wrapper) once you have persisted the bytes
465 /// to an encrypted store. The returned `[u8; 32]` is NOT automatically zeroed on drop.
466 /// - **Never include in network responses, crash dumps, or analytics.**
467 #[must_use]
468 pub fn secret_key_bytes(&self) -> [u8; 32] {
469 self.inner.ep.secret_key().to_bytes()
470 }
471
472 /// Graceful close: signal the serve loop to stop accepting, wait for
473 /// in-flight requests to drain (up to the configured drain timeout),
474 /// then close the QUIC endpoint.
475 ///
476 /// If no serve loop is running, closes the endpoint immediately.
477 /// The handle store (all registries) is freed when the last `IrohEndpoint`
478 /// clone is dropped — no explicit unregister is needed.
479 pub async fn close(&self) {
480 // ISS-027: drain in-flight requests *before* dropping so that request
481 // handlers can still access their reader/writer/trailer handles
482 // during the drain window.
483 let handle = self
484 .inner
485 .serve_handle
486 .lock()
487 .unwrap_or_else(|e| e.into_inner())
488 .take();
489 if let Some(h) = handle {
490 h.drain().await;
491 }
492 self.inner.ep.close().await;
493 let _ = self.inner.closed_tx.send(true);
494 }
495
496 /// Immediate close: abort the serve loop and close the endpoint with
497 /// no drain period.
498 pub async fn close_force(&self) {
499 let handle = self
500 .inner
501 .serve_handle
502 .lock()
503 .unwrap_or_else(|e| e.into_inner())
504 .take();
505 if let Some(h) = handle {
506 h.abort();
507 }
508 self.inner.ep.close().await;
509 let _ = self.inner.closed_tx.send(true);
510 }
511
512 /// Wait until this endpoint has been closed (either explicitly via `close()` /
513 /// `close_force()`, or because the native QUIC stack shut down).
514 ///
515 /// Returns immediately if already closed.
516 pub async fn wait_closed(&self) {
517 let mut rx = self.inner.closed_rx.clone();
518 let _ = rx.wait_for(|v| *v).await;
519 }
520
521 /// Store a serve handle so that `close()` can drain it.
522 pub fn set_serve_handle(&self, handle: ServeHandle) {
523 *self
524 .inner
525 .serve_done_rx
526 .lock()
527 .unwrap_or_else(|e| e.into_inner()) = Some(handle.subscribe_done());
528 *self
529 .inner
530 .serve_handle
531 .lock()
532 .unwrap_or_else(|e| e.into_inner()) = Some(handle);
533 }
534
535 /// Signal the serve loop to stop accepting new connections.
536 ///
537 /// Returns immediately — does NOT close the endpoint or drain in-flight
538 /// requests. The handle is preserved so `close()` can still drain later.
539 pub fn stop_serve(&self) {
540 if let Some(h) = self
541 .inner
542 .serve_handle
543 .lock()
544 .unwrap_or_else(|e| e.into_inner())
545 .as_ref()
546 {
547 h.shutdown();
548 }
549 }
550
551 /// Wait until the serve loop has fully exited (serve task drained and finished).
552 ///
553 /// Returns immediately if `serve()` was never called.
554 pub async fn wait_serve_stop(&self) {
555 let rx = self
556 .inner
557 .serve_done_rx
558 .lock()
559 .unwrap_or_else(|e| e.into_inner())
560 .clone();
561 if let Some(mut rx) = rx {
562 // wait_for returns Err only if the sender is dropped; being dropped
563 // also means the task has exited, so treat both outcomes as "done".
564 let _ = rx.wait_for(|v| *v).await;
565 }
566 }
567
568 pub fn raw(&self) -> &Endpoint {
569 &self.inner.ep
570 }
571
572 /// Per-endpoint handle store.
573 pub fn handles(&self) -> &HandleStore {
574 &self.inner.handles
575 }
576
577 /// Maximum byte size of an HTTP/1.1 head.
578 pub fn max_header_size(&self) -> usize {
579 self.inner.max_header_size
580 }
581
582 /// Maximum decompressed response-body bytes accepted per outgoing fetch.
583 pub fn max_response_body_bytes(&self) -> usize {
584 self.inner.max_response_body_bytes
585 }
586
587 /// Access the connection pool.
588 pub(crate) fn pool(&self) -> &ConnectionPool {
589 &self.inner.pool
590 }
591
592 /// Snapshot of current endpoint statistics.
593 ///
594 /// All fields are point-in-time reads and may change between calls.
595 pub fn endpoint_stats(&self) -> EndpointStats {
596 let (active_readers, active_writers, active_sessions, total_handles) =
597 self.inner.handles.count_handles();
598 let pool_size = self.inner.pool.entry_count_approx() as usize;
599 let active_connections = self.inner.active_connections.load(Ordering::Relaxed);
600 let active_requests = self.inner.active_requests.load(Ordering::Relaxed);
601 EndpointStats {
602 active_readers,
603 active_writers,
604 active_sessions,
605 total_handles,
606 pool_size,
607 active_connections,
608 active_requests,
609 }
610 }
611
612 /// Compression options, if the `compression` feature is enabled.
613 #[cfg(feature = "compression")]
614 pub fn compression(&self) -> Option<&CompressionOptions> {
615 self.inner.compression.as_ref()
616 }
617
618 /// Returns the local socket addresses this endpoint is bound to.
619 pub fn bound_sockets(&self) -> Vec<std::net::SocketAddr> {
620 self.inner.ep.bound_sockets()
621 }
622
623 /// Full node address: node ID + relay URL(s) + direct socket addresses.
624 pub fn node_addr(&self) -> NodeAddrInfo {
625 let addr = self.inner.ep.addr();
626 let mut addrs = Vec::new();
627 for relay in addr.relay_urls() {
628 addrs.push(relay.to_string());
629 }
630 for da in addr.ip_addrs() {
631 addrs.push(da.to_string());
632 }
633 NodeAddrInfo {
634 id: self.inner.node_id_str.clone(),
635 addrs,
636 }
637 }
638
639 /// Home relay URL, or `None` if not connected to a relay.
640 pub fn home_relay(&self) -> Option<String> {
641 self.inner
642 .ep
643 .addr()
644 .relay_urls()
645 .next()
646 .map(|u| u.to_string())
647 }
648
649 /// Known addresses for a remote peer, or `None` if not in the endpoint's cache.
650 pub async fn peer_info(&self, node_id_b32: &str) -> Option<NodeAddrInfo> {
651 let bytes = crate::base32_decode(node_id_b32).ok()?;
652 let arr: [u8; 32] = bytes.try_into().ok()?;
653 let pk = iroh::PublicKey::from_bytes(&arr).ok()?;
654 let info = self.inner.ep.remote_info(pk).await?;
655 let id = crate::base32_encode(info.id().as_bytes());
656 let mut addrs = Vec::new();
657 for a in info.addrs() {
658 match a.addr() {
659 iroh::TransportAddr::Ip(sock) => addrs.push(sock.to_string()),
660 iroh::TransportAddr::Relay(url) => addrs.push(url.to_string()),
661 other => addrs.push(format!("{:?}", other)),
662 }
663 }
664 Some(NodeAddrInfo { id, addrs })
665 }
666
667 /// Per-peer connection statistics.
668 ///
669 /// Returns path information for each known transport address, including
670 /// whether each path is via a relay or direct, and which is active.
671 pub async fn peer_stats(&self, node_id_b32: &str) -> Option<PeerStats> {
672 let bytes = crate::base32_decode(node_id_b32).ok()?;
673 let arr: [u8; 32] = bytes.try_into().ok()?;
674 let pk = iroh::PublicKey::from_bytes(&arr).ok()?;
675 let info = self.inner.ep.remote_info(pk).await?;
676
677 let mut paths = Vec::new();
678 let mut has_active_relay = false;
679 let mut active_relay_url: Option<String> = None;
680
681 for a in info.addrs() {
682 let is_relay = a.addr().is_relay();
683 let is_active = matches!(a.usage(), TransportAddrUsage::Active);
684
685 let addr_str = match a.addr() {
686 iroh::TransportAddr::Ip(sock) => sock.to_string(),
687 iroh::TransportAddr::Relay(url) => {
688 if is_active {
689 has_active_relay = true;
690 active_relay_url = Some(url.to_string());
691 }
692 url.to_string()
693 }
694 other => format!("{:?}", other),
695 };
696
697 paths.push(PathInfo {
698 relay: is_relay,
699 addr: addr_str,
700 active: is_active,
701 });
702 }
703
704 // Enrich with QUIC connection-level stats if a pooled connection exists.
705 let (rtt_ms, bytes_sent, bytes_received, lost_packets, sent_packets, congestion_window) =
706 if let Some(pooled) = self.inner.pool.get_existing(pk, crate::ALPN).await {
707 let s = pooled.conn.stats();
708 let rtt = pooled.conn.rtt(iroh::endpoint::PathId::ZERO);
709 (
710 rtt.map(|d| d.as_secs_f64() * 1000.0),
711 Some(s.udp_tx.bytes),
712 Some(s.udp_rx.bytes),
713 None, // quinn path stats not exposed via iroh ConnectionStats
714 None, // quinn path stats not exposed via iroh ConnectionStats
715 None, // quinn path stats not exposed via iroh ConnectionStats
716 )
717 } else {
718 (None, None, None, None, None, None)
719 };
720
721 Some(PeerStats {
722 relay: has_active_relay,
723 relay_url: active_relay_url,
724 paths,
725 rtt_ms,
726 bytes_sent,
727 bytes_received,
728 lost_packets,
729 sent_packets,
730 congestion_window,
731 })
732 }
733
734 /// Take the transport event receiver, handing it off to a platform drain task.
735 ///
736 /// May only be called once per endpoint. The drain task owns the receiver and
737 /// loops until `event_tx` is dropped (i.e. the endpoint closes). Returns `None`
738 /// if the receiver was already taken (i.e. `subscribe_events` was called before).
739 pub fn subscribe_events(
740 &self,
741 ) -> Option<tokio::sync::mpsc::Receiver<crate::events::TransportEvent>> {
742 self.inner
743 .event_rx
744 .lock()
745 .unwrap_or_else(|e| e.into_inner())
746 .take()
747 }
748
749 /// Subscribe to path changes for a specific peer.
750 ///
751 /// Spawns a background watcher task the first time a given peer is subscribed.
752 /// The watcher polls `peer_stats()` every 200 ms and emits on the returned
753 /// channel whenever the active path changes.
754 pub fn subscribe_path_changes(
755 &self,
756 node_id_str: &str,
757 ) -> tokio::sync::mpsc::UnboundedReceiver<PathInfo> {
758 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
759 // Replace any existing sender; old watcher exits when it detects is_closed().
760 self.inner.path_subs.insert(node_id_str.to_string(), tx);
761
762 let ep = self.clone();
763 let nid = node_id_str.to_string();
764 let event_tx = self.inner.event_tx.clone();
765
766 tokio::spawn(async move {
767 let mut last_key: Option<String> = None;
768 let mut closed_rx = ep.inner.closed_rx.clone();
769 loop {
770 // Exit immediately if the endpoint has been closed.
771 if *closed_rx.borrow() {
772 ep.inner.path_subs.remove(&nid);
773 break;
774 }
775 let is_closed = ep
776 .inner
777 .path_subs
778 .get(&nid)
779 .map(|s| s.is_closed())
780 .unwrap_or(true);
781 if is_closed {
782 ep.inner.path_subs.remove(&nid);
783 break;
784 }
785
786 if let Some(stats) = ep.peer_stats(&nid).await {
787 if let Some(active) = stats.paths.iter().find(|p| p.active) {
788 let key = format!("{}:{}", active.relay, active.addr);
789 if Some(&key) != last_key.as_ref() {
790 last_key = Some(key);
791 if let Some(sender) = ep.inner.path_subs.get(&nid) {
792 let _ = sender.send(active.clone());
793 }
794 let _ = event_tx.try_send(crate::events::TransportEvent::path_change(
795 &nid,
796 &active.addr,
797 active.relay,
798 ));
799 }
800 }
801 }
802
803 // Sleep 200 ms, but wake early if the endpoint is being closed.
804 tokio::select! {
805 _ = tokio::time::sleep(std::time::Duration::from_millis(200)) => {}
806 result = closed_rx.wait_for(|v| *v) => {
807 let _ = result;
808 ep.inner.path_subs.remove(&nid);
809 break;
810 }
811 }
812 }
813 });
814
815 rx
816 }
817}
818
819// ── Bind-error classification ────────────────────────────────────────────────
820
821/// Classify a bind error into a prefixed string for the JS error mapper.
822fn classify_bind_error(e: impl std::fmt::Display) -> crate::CoreError {
823 let msg = e.to_string();
824 crate::CoreError::connection_failed(msg)
825}
826
827// ── NodeAddr info ────────────────────────────────────────────────────────────
828
829/// Serialisable node address: node ID + relay and direct addresses.
830#[derive(Debug, Clone, Serialize, Deserialize)]
831pub struct NodeAddrInfo {
832 /// Base32-encoded public key.
833 pub id: String,
834 /// Relay URLs and/or `ip:port` direct addresses.
835 pub addrs: Vec<String>,
836}
837
838// ── Observability types ──────────────────────────────────────────────────────
839
840/// Endpoint-level observability snapshot.
841///
842/// Returned by [`IrohEndpoint::endpoint_stats`]. All counts are point-in-time reads
843/// and may change between calls.
844#[derive(Debug, Clone, Serialize, Deserialize, Default)]
845#[serde(rename_all = "camelCase")]
846pub struct EndpointStats {
847 /// Number of currently open body reader handles.
848 pub active_readers: usize,
849 /// Number of currently open body writer handles.
850 pub active_writers: usize,
851 /// Number of live QUIC sessions (WebTransport connections).
852 pub active_sessions: usize,
853 /// Total number of allocated (reader + writer + session + other) handles.
854 pub total_handles: usize,
855 /// Number of QUIC connections currently cached in the connection pool.
856 pub pool_size: usize,
857 /// Number of live QUIC connections accepted by the serve loop.
858 pub active_connections: usize,
859 /// Number of HTTP requests currently being processed.
860 pub active_requests: usize,
861}
862
863/// A connection lifecycle event fired when a QUIC peer connection opens or closes.
864#[derive(Debug, Clone, Serialize, Deserialize)]
865#[serde(rename_all = "camelCase")]
866pub struct ConnectionEvent {
867 /// Base32-encoded public key of the peer.
868 pub peer_id: String,
869 /// `true` when this is the first connection from the peer (0→1), `false` when the last one closes (1→0).
870 pub connected: bool,
871}
872
873/// Per-peer connection statistics.
874#[derive(Debug, Clone, Serialize, Deserialize)]
875pub struct PeerStats {
876 /// Whether the peer is connected via a relay server (vs direct).
877 pub relay: bool,
878 /// Active relay URL, if any.
879 pub relay_url: Option<String>,
880 /// All known paths to this peer.
881 pub paths: Vec<PathInfo>,
882 /// Round-trip time in milliseconds. `None` if no active QUIC connection is pooled.
883 pub rtt_ms: Option<f64>,
884 /// Total UDP bytes sent to this peer. `None` if no active QUIC connection is pooled.
885 pub bytes_sent: Option<u64>,
886 /// Total UDP bytes received from this peer. `None` if no active QUIC connection is pooled.
887 pub bytes_received: Option<u64>,
888 /// Total packets lost on the QUIC path. `None` if no active QUIC connection is pooled.
889 pub lost_packets: Option<u64>,
890 /// Total packets sent on the QUIC path. `None` if no active QUIC connection is pooled.
891 pub sent_packets: Option<u64>,
892 /// Current congestion window in bytes. `None` if no active QUIC connection is pooled.
893 pub congestion_window: Option<u64>,
894}
895
896/// Network path information for a single transport address.
897#[derive(Debug, Clone, Serialize, Deserialize)]
898pub struct PathInfo {
899 /// Whether this path goes through a relay server.
900 pub relay: bool,
901 /// The relay URL (if relay) or `ip:port` (if direct).
902 pub addr: String,
903 /// Whether this is the currently selected/active path.
904 pub active: bool,
905}
906
/// Parse an optional list of `ip:port` strings into socket addresses.
///
/// `None` passes through as `Ok(None)`. Parsing stops at the first entry that
/// is not a valid socket address and reports it, so misconfiguration surfaces
/// instead of being silently ignored.
///
/// # Errors
///
/// Returns a message naming the offending string and the underlying parse error.
pub fn parse_direct_addrs(
    addrs: &Option<Vec<String>>,
) -> Result<Option<Vec<std::net::SocketAddr>>, String> {
    let Some(list) = addrs else {
        return Ok(None);
    };
    list.iter()
        .map(|raw| {
            raw.parse::<std::net::SocketAddr>()
                .map_err(|err| format!("invalid direct address {raw:?}: {err}"))
        })
        .collect::<Result<Vec<_>, _>>()
        .map(Some)
}