iroh_http_core/endpoint.rs
1//! Iroh endpoint lifecycle — create, share, and close.
2
3use iroh::address_lookup::{DnsAddressLookup, PkarrPublisher};
4use iroh::endpoint::{IdleTimeout, QuicTransportConfig, TransportAddrUsage};
5use iroh::{Endpoint, RelayMode, SecretKey};
6use serde::{Deserialize, Serialize};
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use crate::pool::ConnectionPool;
12use crate::server::ServeHandle;
13use crate::stream::{HandleStore, StoreConfig};
14use crate::{ALPN, ALPN_DUPLEX};
15
/// Networking / QUIC transport configuration.
///
/// All fields are optional; the `Default` value yields default relays,
/// OS-assigned sockets, and no proxy.
#[derive(Debug, Clone, Default)]
pub struct NetworkingOptions {
    /// Relay server mode. `"default"`, `"staging"`, `"disabled"`, or `"custom"`. Default: `"default"`.
    pub relay_mode: Option<String>,
    /// Custom relay server URLs. Only used when `relay_mode` is `"custom"`.
    pub relays: Vec<String>,
    /// UDP socket addresses to bind. Empty means OS-assigned.
    pub bind_addrs: Vec<String>,
    /// Milliseconds before an idle QUIC connection is cleaned up.
    pub idle_timeout_ms: Option<u64>,
    /// HTTP proxy URL for relay traffic.
    pub proxy_url: Option<String>,
    /// Read `HTTP_PROXY` / `HTTPS_PROXY` env vars for proxy config.
    /// Ignored when `proxy_url` is set explicitly.
    pub proxy_from_env: bool,
    /// Disable relay servers and DNS discovery entirely. Overrides `relay_mode`.
    /// Useful for in-process tests where endpoints connect via direct addresses.
    pub disabled: bool,
}
35
/// DNS-based peer discovery configuration.
///
/// Discovery is skipped entirely when `NetworkingOptions::disabled` is true,
/// regardless of the `enabled` flag here.
#[derive(Debug, Clone)]
pub struct DiscoveryOptions {
    /// DNS discovery server URL. Uses n0 DNS defaults when `None`.
    pub dns_server: Option<String>,
    /// Whether to enable DNS discovery. Default: `true`.
    pub enabled: bool,
}
44
45impl Default for DiscoveryOptions {
46 fn default() -> Self {
47 Self {
48 dns_server: None,
49 enabled: true,
50 }
51 }
52}
53
/// Connection-pool tuning.
///
/// `None` fields fall back to the pool's built-in defaults.
#[derive(Debug, Clone, Default)]
pub struct PoolOptions {
    /// Maximum number of idle connections to keep in the pool.
    pub max_connections: Option<usize>,
    /// Milliseconds a pooled connection may remain idle before being evicted.
    pub idle_timeout_ms: Option<u64>,
}
62
/// Body-streaming and handle-store configuration.
///
/// `None` fields fall back to the `crate::stream` defaults; capacity and
/// chunk-size values are clamped to a minimum of 1 at bind time.
#[derive(Debug, Clone, Default)]
pub struct StreamingOptions {
    /// Capacity (in chunks) of each body channel. Default: 32.
    pub channel_capacity: Option<usize>,
    /// Maximum byte length of a single chunk in `send_chunk`. Default: 65536.
    pub max_chunk_size_bytes: Option<usize>,
    /// Milliseconds to wait for a slow body reader. Default: 30000.
    pub drain_timeout_ms: Option<u64>,
    /// TTL in ms for slab handle entries. `0` disables sweeping. Default: 300000.
    pub handle_ttl_ms: Option<u64>,
    /// How often (in ms) the TTL sweep task runs. Default: 60000 (60 s).
    /// Reducing this lowers the worst-case leaked-handle window at the cost of
    /// more frequent write-lock acquisitions on every handle registry.
    /// Useful for short-lived endpoints and test fixtures.
    pub sweep_interval_ms: Option<u64>,
}
80
/// Configuration passed to [`IrohEndpoint::bind`].
#[derive(Debug, Clone, Default)]
pub struct NodeOptions {
    /// 32-byte Ed25519 secret key. Generate a fresh one when `None`.
    /// Pair with [`IrohEndpoint::secret_key_bytes`] to persist and restore identity.
    pub key: Option<[u8; 32]>,
    /// Networking / QUIC transport configuration.
    pub networking: NetworkingOptions,
    /// DNS-based peer discovery configuration.
    pub discovery: DiscoveryOptions,
    /// Connection-pool tuning.
    pub pool: PoolOptions,
    /// Body-streaming and handle-store configuration.
    pub streaming: StreamingOptions,
    /// ALPN capabilities to advertise. Empty = advertise iroh-http/2 and iroh-http/2-duplex.
    pub capabilities: Vec<String>,
    /// Write TLS session keys to $SSLKEYLOGFILE. Dev/debug only.
    pub keylog: bool,
    /// Maximum byte size of the HTTP/1.1 request or response head. `None` or `0` = 65536.
    pub max_header_size: Option<usize>,
    /// Server-side limits forwarded to the serve loop.
    pub server_limits: crate::server::ServerLimits,
    /// Response-body compression options. `None` disables compression.
    #[cfg(feature = "compression")]
    pub compression: Option<CompressionOptions>,
}
105
/// Compression options for response bodies.
/// Only used when the `compression` feature is enabled.
#[cfg(feature = "compression")]
#[derive(Debug, Clone)]
pub struct CompressionOptions {
    /// Minimum body size in bytes before compression is applied. Default: 512.
    pub min_body_bytes: usize,
    /// Zstd compression level (1–22). `None` uses the zstd default (3).
    pub level: Option<u32>,
}
116
/// A shared Iroh endpoint.
///
/// Clone-able (cheap Arc clone). All fetch and serve calls on the same node
/// share one endpoint and therefore one stable QUIC identity.
#[derive(Clone)]
pub struct IrohEndpoint {
    // Shared state; the last clone dropped frees the handle store.
    pub(crate) inner: Arc<EndpointInner>,
}
125
/// Shared state behind every [`IrohEndpoint`] clone.
pub(crate) struct EndpointInner {
    /// The underlying iroh QUIC endpoint.
    pub ep: Endpoint,
    /// The node's own base32-encoded public key (stable for the lifetime of the key).
    pub node_id_str: String,
    /// Connection pool for reusing QUIC connections across fetch/connect calls.
    pub pool: ConnectionPool,
    /// Maximum byte size of an HTTP/1.1 head (request or response).
    pub max_header_size: usize,
    /// Server-side limits forwarded to the serve loop.
    pub server_limits: crate::server::ServerLimits,
    /// Per-endpoint handle store — owns all body readers, writers,
    /// sessions, request-head channels, and fetch-cancel tokens.
    pub handles: HandleStore,
    /// Active serve handle, if `serve()` has been called.
    pub serve_handle: std::sync::Mutex<Option<ServeHandle>>,
    /// Done-signal receiver from the active serve task.
    /// Stored separately so `wait_serve_stop()` can await it without holding
    /// the `serve_handle` lock for the duration of the wait.
    pub serve_done_rx: std::sync::Mutex<Option<tokio::sync::watch::Receiver<bool>>>,
    /// Signals `true` when the endpoint has fully closed (either explicitly or
    /// because the serve loop exited due to native shutdown).
    pub closed_tx: tokio::sync::watch::Sender<bool>,
    pub closed_rx: tokio::sync::watch::Receiver<bool>,
    /// Number of currently active QUIC connections (incremented by serve loop,
    /// decremented via RAII guard when each connection task exits).
    pub active_connections: Arc<AtomicUsize>,
    /// Number of currently in-flight HTTP requests (incremented when a
    /// bi-stream is accepted, decremented when the request task exits).
    pub active_requests: Arc<AtomicUsize>,
    /// Sender for transport-level events (pool hits/misses, path changes, sweep).
    pub event_tx: tokio::sync::mpsc::Sender<crate::events::TransportEvent>,
    /// Receiver for transport-level events. Wrapped in Mutex+Option so
    /// `subscribe_events()` can take it exactly once for the platform drain task.
    pub event_rx:
        std::sync::Mutex<Option<tokio::sync::mpsc::Receiver<crate::events::TransportEvent>>>,
    /// Per-peer path-change subscriptions.
    /// Key: node_id_str. Populated lazily when `subscribe_path_changes` is called.
    pub path_subs: dashmap::DashMap<String, tokio::sync::mpsc::UnboundedSender<PathInfo>>,
    /// Body compression options, if the feature is enabled.
    #[cfg(feature = "compression")]
    pub compression: Option<CompressionOptions>,
}
168
169impl IrohEndpoint {
    /// Bind an Iroh endpoint with the supplied options.
    ///
    /// Performs, in order: option validation, relay-mode selection, ALPN list
    /// construction, DNS discovery wiring, secret-key installation, QUIC
    /// transport configuration, socket binding, proxy configuration, the
    /// actual QUIC bind, and finally spawns the per-endpoint handle-TTL
    /// sweep task.
    ///
    /// # Errors
    ///
    /// Returns `CoreError::invalid_input` for contradictory or malformed
    /// options, and a connection-failure error (via `classify_bind_error`)
    /// if the underlying QUIC bind fails.
    pub async fn bind(opts: NodeOptions) -> Result<Self, crate::CoreError> {
        // Validate: if networking is disabled, relay_mode should not be explicitly set to a
        // network-requiring mode.
        if opts.networking.disabled
            && opts
                .networking
                .relay_mode
                .as_deref()
                .is_some_and(|m| !matches!(m, "disabled"))
        {
            return Err(crate::CoreError::invalid_input(
                "networking.disabled is true but relay_mode is set to a non-disabled value; \
                 set relay_mode to \"disabled\" or omit it when networking.disabled is true",
            ));
        }

        // Resolve the relay mode. `networking.disabled` always wins.
        let relay_mode = if opts.networking.disabled {
            RelayMode::Disabled
        } else {
            match opts.networking.relay_mode.as_deref() {
                None | Some("default") => RelayMode::Default,
                Some("staging") => RelayMode::Staging,
                Some("disabled") => RelayMode::Disabled,
                Some("custom") => {
                    if opts.networking.relays.is_empty() {
                        return Err(crate::CoreError::invalid_input(
                            "relay_mode \"custom\" requires at least one URL in `relays`",
                        ));
                    }
                    // Parse every relay URL; the first parse failure aborts the bind.
                    let urls = opts
                        .networking
                        .relays
                        .iter()
                        .map(|u| {
                            u.parse::<iroh::RelayUrl>()
                                .map_err(crate::CoreError::invalid_input)
                        })
                        .collect::<Result<Vec<_>, _>>()?;
                    RelayMode::custom(urls)
                }
                Some(other) => {
                    return Err(crate::CoreError::invalid_input(format!(
                        "unknown relay_mode: {other}"
                    )))
                }
            }
        };

        let alpns: Vec<Vec<u8>> = if opts.capabilities.is_empty() {
            // Advertise both ALPN variants.
            vec![ALPN_DUPLEX.to_vec(), ALPN.to_vec()]
        } else {
            let mut list: Vec<Vec<u8>> = opts
                .capabilities
                .iter()
                .map(|c| c.as_bytes().to_vec())
                .collect();
            // Always include the base protocol so the node can talk to base-only peers.
            if !list.iter().any(|a| a == ALPN) {
                list.push(ALPN.to_vec());
            }
            list
        };

        let mut builder = Endpoint::empty_builder(relay_mode).alpns(alpns);

        // DNS discovery (enabled by default unless networking.disabled).
        if !opts.networking.disabled && opts.discovery.enabled {
            if let Some(ref url_str) = opts.discovery.dns_server {
                let url: url::Url = url_str.parse().map_err(|e| {
                    crate::CoreError::invalid_input(format!("invalid dns_discovery URL: {e}"))
                })?;
                builder = builder
                    .address_lookup(PkarrPublisher::builder(url.clone()))
                    .address_lookup(DnsAddressLookup::builder(
                        url.host_str().unwrap_or_default().to_string(),
                    ));
            } else {
                // No custom server configured: use the n0 defaults.
                builder = builder
                    .address_lookup(PkarrPublisher::n0_dns())
                    .address_lookup(DnsAddressLookup::n0_dns());
            }
        }

        // Restore a persisted identity; otherwise iroh generates a fresh key.
        if let Some(key_bytes) = opts.key {
            builder = builder.secret_key(SecretKey::from_bytes(&key_bytes));
        }

        if let Some(ms) = opts.networking.idle_timeout_ms {
            let timeout = IdleTimeout::try_from(Duration::from_millis(ms)).map_err(|e| {
                crate::CoreError::invalid_input(format!("idle_timeout_ms out of range: {e}"))
            })?;
            let transport = QuicTransportConfig::builder()
                .max_idle_timeout(Some(timeout))
                // Limit inbound bidirectional streams per connection to bound
                // slowloris-style resource exhaustion (P2-4).
                .max_concurrent_bidi_streams(128u32.into())
                .build();
            builder = builder.transport_config(transport);
        } else {
            // Even without a custom idle timeout, cap concurrent bidi streams.
            let transport = QuicTransportConfig::builder()
                .max_concurrent_bidi_streams(128u32.into())
                .build();
            builder = builder.transport_config(transport);
        }

        // Bind address(es).
        for addr_str in &opts.networking.bind_addrs {
            let sock: std::net::SocketAddr = addr_str.parse().map_err(|e| {
                crate::CoreError::invalid_input(format!("invalid bind address \"{addr_str}\": {e}"))
            })?;
            builder = builder.bind_addr(sock).map_err(|e| {
                crate::CoreError::invalid_input(format!("bind address \"{addr_str}\": {e}"))
            })?;
        }

        // Proxy configuration. An explicit URL takes precedence over env vars.
        if let Some(ref proxy) = opts.networking.proxy_url {
            let url: url::Url = proxy
                .parse()
                .map_err(|e| crate::CoreError::invalid_input(format!("invalid proxy URL: {e}")))?;
            builder = builder.proxy_url(url);
        } else if opts.networking.proxy_from_env {
            builder = builder.proxy_from_env();
        }

        // TLS keylog for Wireshark debugging.
        if opts.keylog {
            builder = builder.keylog(true);
        }

        let ep = builder.bind().await.map_err(classify_bind_error)?;

        let node_id_str = crate::base32_encode(ep.id().as_bytes());

        // Streaming/handle-store configuration. Capacity and chunk size are
        // clamped to at least 1 so a zero option cannot stall channels.
        let store_config = StoreConfig {
            channel_capacity: opts
                .streaming
                .channel_capacity
                .unwrap_or(crate::stream::DEFAULT_CHANNEL_CAPACITY)
                .max(1),
            max_chunk_size: opts
                .streaming
                .max_chunk_size_bytes
                .unwrap_or(crate::stream::DEFAULT_MAX_CHUNK_SIZE)
                .max(1),
            drain_timeout: Duration::from_millis(
                opts.streaming
                    .drain_timeout_ms
                    .unwrap_or(crate::stream::DEFAULT_DRAIN_TIMEOUT_MS),
            ),
            max_handles: crate::stream::DEFAULT_MAX_HANDLES,
            ttl: Duration::from_millis(
                opts.streaming
                    .handle_ttl_ms
                    .unwrap_or(crate::stream::DEFAULT_SLAB_TTL_MS),
            ),
        };
        let sweep_ttl = store_config.ttl;
        let sweep_interval = Duration::from_millis(
            opts.streaming
                .sweep_interval_ms
                .unwrap_or(crate::stream::DEFAULT_SWEEP_INTERVAL_MS),
        );
        let (closed_tx, closed_rx) = tokio::sync::watch::channel(false);
        let (event_tx, event_rx) = tokio::sync::mpsc::channel::<crate::events::TransportEvent>(256);

        let inner = Arc::new(EndpointInner {
            ep,
            node_id_str,
            pool: ConnectionPool::new(
                opts.pool.max_connections,
                opts.pool
                    .idle_timeout_ms
                    .map(std::time::Duration::from_millis),
                Some(event_tx.clone()),
            ),
            // ISS-020: treat 0 as "use default" — it would otherwise underflow
            // the hyper minimum (ISS-001). None also defaults to 64 KB.
            max_header_size: match opts.max_header_size {
                None | Some(0) => 64 * 1024,
                Some(n) => n,
            },
            server_limits: {
                let mut sl = opts.server_limits.clone();
                if sl.max_consecutive_errors.is_none() {
                    sl.max_consecutive_errors = Some(5);
                }
                sl
            },
            handles: HandleStore::new(store_config),
            serve_handle: std::sync::Mutex::new(None),
            serve_done_rx: std::sync::Mutex::new(None),
            closed_tx,
            closed_rx,
            active_connections: Arc::new(AtomicUsize::new(0)),
            active_requests: Arc::new(AtomicUsize::new(0)),
            event_tx,
            event_rx: std::sync::Mutex::new(Some(event_rx)),
            path_subs: dashmap::DashMap::new(),
            #[cfg(feature = "compression")]
            compression: opts.compression,
        });

        // Start per-endpoint sweep task (held alive via Weak reference).
        // A zero TTL disables sweeping entirely.
        if !sweep_ttl.is_zero() {
            let weak = Arc::downgrade(&inner);
            tokio::spawn(async move {
                let mut ticker = tokio::time::interval(sweep_interval);
                loop {
                    ticker.tick().await;
                    // Exit once the last IrohEndpoint clone has been dropped.
                    let Some(inner) = weak.upgrade() else {
                        break;
                    };
                    inner.handles.sweep(sweep_ttl);
                    drop(inner); // release strong ref between ticks
                }
            });
        }

        Ok(Self { inner })
    }
394
395 /// The node's public key as a lowercase base32 string.
396 pub fn node_id(&self) -> &str {
397 &self.inner.node_id_str
398 }
399
400 /// The configured consecutive-error limit for the serve loop.
401 pub fn max_consecutive_errors(&self) -> usize {
402 self.inner.server_limits.max_consecutive_errors.unwrap_or(5)
403 }
404
405 /// Immediately run a TTL sweep on all handle registries, evicting any
406 /// entries whose TTL has expired.
407 ///
408 /// The background sweep task already runs this automatically on its
409 /// configured interval. `sweep_now()` is provided for test fixtures and
410 /// short-lived endpoints that cannot wait for the next tick.
411 ///
412 /// Returns immediately if the endpoint was created with `handle_ttl_ms: Some(0)`
413 /// (sweeping disabled).
414 pub fn sweep_now(&self) {
415 let ttl = self.inner.handles.config.ttl;
416 if !ttl.is_zero() {
417 self.inner.handles.sweep(ttl);
418 }
419 }
420
    /// Build a [`ServeOptions`] from the endpoint's stored configuration.
    ///
    /// Platform adapters should call this instead of constructing `ServeOptions`
    /// manually so that all server-limit fields are forwarded consistently.
    ///
    /// NOTE(review): this returns a clone of `server_limits`, so
    /// `crate::server::ServeOptions` is presumably an alias of (or identical to)
    /// `crate::server::ServerLimits` — confirm in `server.rs`.
    pub fn serve_options(&self) -> crate::server::ServeOptions {
        self.inner.server_limits.clone()
    }
428
    /// The node's raw secret key bytes (32 bytes).
    ///
    /// This is the Ed25519 private key that establishes the node's cryptographic identity.
    /// Use it **only** to persist and later restore the key via [`NodeOptions::key`].
    ///
    /// # Security
    ///
    /// **These 32 bytes are the irrecoverable private key for this node.**
    /// Anyone who obtains them can impersonate this node permanently — there is no revocation.
    ///
    /// - **Never log, print, or include in error payloads.** Debug formatters, tracing
    ///   spans, and generic error handlers are common accidental leak vectors.
    /// - **Encrypt at rest.** Store in a secrets vault or OS keychain, not in
    ///   plaintext config files or databases.
    /// - **Zeroize after use.** Call `zeroize::Zeroize::zeroize()` on the returned
    ///   array (or use a `secrecy`/`zeroize` wrapper) once you have persisted the bytes
    ///   to an encrypted store. The returned `[u8; 32]` is NOT automatically zeroed on drop.
    /// - **Never include in network responses, crash dumps, or analytics.**
    #[must_use]
    pub fn secret_key_bytes(&self) -> [u8; 32] {
        self.inner.ep.secret_key().to_bytes()
    }
451
452 /// Graceful close: signal the serve loop to stop accepting, wait for
453 /// in-flight requests to drain (up to the configured drain timeout),
454 /// then close the QUIC endpoint.
455 ///
456 /// If no serve loop is running, closes the endpoint immediately.
457 /// The handle store (all registries) is freed when the last `IrohEndpoint`
458 /// clone is dropped — no explicit unregister is needed.
459 pub async fn close(&self) {
460 // ISS-027: drain in-flight requests *before* dropping so that request
461 // handlers can still access their reader/writer/trailer handles
462 // during the drain window.
463 let handle = self
464 .inner
465 .serve_handle
466 .lock()
467 .unwrap_or_else(|e| e.into_inner())
468 .take();
469 if let Some(h) = handle {
470 h.drain().await;
471 }
472 self.inner.ep.close().await;
473 let _ = self.inner.closed_tx.send(true);
474 }
475
476 /// Immediate close: abort the serve loop and close the endpoint with
477 /// no drain period.
478 pub async fn close_force(&self) {
479 let handle = self
480 .inner
481 .serve_handle
482 .lock()
483 .unwrap_or_else(|e| e.into_inner())
484 .take();
485 if let Some(h) = handle {
486 h.abort();
487 }
488 self.inner.ep.close().await;
489 let _ = self.inner.closed_tx.send(true);
490 }
491
492 /// Wait until this endpoint has been closed (either explicitly via `close()` /
493 /// `close_force()`, or because the native QUIC stack shut down).
494 ///
495 /// Returns immediately if already closed.
496 pub async fn wait_closed(&self) {
497 let mut rx = self.inner.closed_rx.clone();
498 let _ = rx.wait_for(|v| *v).await;
499 }
500
501 /// Store a serve handle so that `close()` can drain it.
502 pub fn set_serve_handle(&self, handle: ServeHandle) {
503 *self
504 .inner
505 .serve_done_rx
506 .lock()
507 .unwrap_or_else(|e| e.into_inner()) = Some(handle.subscribe_done());
508 *self
509 .inner
510 .serve_handle
511 .lock()
512 .unwrap_or_else(|e| e.into_inner()) = Some(handle);
513 }
514
515 /// Signal the serve loop to stop accepting new connections.
516 ///
517 /// Returns immediately — does NOT close the endpoint or drain in-flight
518 /// requests. The handle is preserved so `close()` can still drain later.
519 pub fn stop_serve(&self) {
520 if let Some(h) = self
521 .inner
522 .serve_handle
523 .lock()
524 .unwrap_or_else(|e| e.into_inner())
525 .as_ref()
526 {
527 h.shutdown();
528 }
529 }
530
531 /// Wait until the serve loop has fully exited (serve task drained and finished).
532 ///
533 /// Returns immediately if `serve()` was never called.
534 pub async fn wait_serve_stop(&self) {
535 let rx = self
536 .inner
537 .serve_done_rx
538 .lock()
539 .unwrap_or_else(|e| e.into_inner())
540 .clone();
541 if let Some(mut rx) = rx {
542 // wait_for returns Err only if the sender is dropped; being dropped
543 // also means the task has exited, so treat both outcomes as "done".
544 let _ = rx.wait_for(|v| *v).await;
545 }
546 }
547
    /// Raw access to the underlying iroh [`Endpoint`].
    pub fn raw(&self) -> &Endpoint {
        &self.inner.ep
    }
551
552 /// Per-endpoint handle store.
553 pub fn handles(&self) -> &HandleStore {
554 &self.inner.handles
555 }
556
557 /// Maximum byte size of an HTTP/1.1 head.
558 pub fn max_header_size(&self) -> usize {
559 self.inner.max_header_size
560 }
561
562 /// Maximum decompressed response-body bytes accepted per outgoing fetch.
563 pub fn max_response_body_bytes(&self) -> usize {
564 self.inner
565 .server_limits
566 .max_response_body_bytes
567 .unwrap_or(crate::server::DEFAULT_MAX_RESPONSE_BODY_BYTES)
568 }
569
570 /// Access the connection pool.
571 pub(crate) fn pool(&self) -> &ConnectionPool {
572 &self.inner.pool
573 }
574
575 /// Snapshot of current endpoint statistics.
576 ///
577 /// All fields are point-in-time reads and may change between calls.
578 pub fn endpoint_stats(&self) -> EndpointStats {
579 let (active_readers, active_writers, active_sessions, total_handles) =
580 self.inner.handles.count_handles();
581 let pool_size = self.inner.pool.entry_count_approx();
582 let active_connections = self.inner.active_connections.load(Ordering::Relaxed);
583 let active_requests = self.inner.active_requests.load(Ordering::Relaxed);
584 EndpointStats {
585 active_readers,
586 active_writers,
587 active_sessions,
588 total_handles,
589 pool_size,
590 active_connections,
591 active_requests,
592 }
593 }
594
595 /// Compression options, if the `compression` feature is enabled.
596 #[cfg(feature = "compression")]
597 pub fn compression(&self) -> Option<&CompressionOptions> {
598 self.inner.compression.as_ref()
599 }
600
601 /// Returns the local socket addresses this endpoint is bound to.
602 pub fn bound_sockets(&self) -> Vec<std::net::SocketAddr> {
603 self.inner.ep.bound_sockets()
604 }
605
606 /// Full node address: node ID + relay URL(s) + direct socket addresses.
607 pub fn node_addr(&self) -> NodeAddrInfo {
608 let addr = self.inner.ep.addr();
609 let mut addrs = Vec::new();
610 for relay in addr.relay_urls() {
611 addrs.push(relay.to_string());
612 }
613 for da in addr.ip_addrs() {
614 addrs.push(da.to_string());
615 }
616 NodeAddrInfo {
617 id: self.inner.node_id_str.clone(),
618 addrs,
619 }
620 }
621
622 /// Home relay URL, or `None` if not connected to a relay.
623 pub fn home_relay(&self) -> Option<String> {
624 self.inner
625 .ep
626 .addr()
627 .relay_urls()
628 .next()
629 .map(|u| u.to_string())
630 }
631
632 /// Known addresses for a remote peer, or `None` if not in the endpoint's cache.
633 pub async fn peer_info(&self, node_id_b32: &str) -> Option<NodeAddrInfo> {
634 let bytes = crate::base32_decode(node_id_b32).ok()?;
635 let arr: [u8; 32] = bytes.try_into().ok()?;
636 let pk = iroh::PublicKey::from_bytes(&arr).ok()?;
637 let info = self.inner.ep.remote_info(pk).await?;
638 let id = crate::base32_encode(info.id().as_bytes());
639 let mut addrs = Vec::new();
640 for a in info.addrs() {
641 match a.addr() {
642 iroh::TransportAddr::Ip(sock) => addrs.push(sock.to_string()),
643 iroh::TransportAddr::Relay(url) => addrs.push(url.to_string()),
644 other => addrs.push(format!("{:?}", other)),
645 }
646 }
647 Some(NodeAddrInfo { id, addrs })
648 }
649
    /// Per-peer connection statistics.
    ///
    /// Returns path information for each known transport address, including
    /// whether each path is via a relay or direct, and which is active.
    ///
    /// Returns `None` when `node_id_b32` is not a valid base32 node id or the
    /// peer is unknown to the endpoint. The QUIC-level fields (`rtt_ms`,
    /// byte counters) are populated only when a pooled connection exists.
    pub async fn peer_stats(&self, node_id_b32: &str) -> Option<PeerStats> {
        // Decode and validate the peer id before querying the endpoint.
        let bytes = crate::base32_decode(node_id_b32).ok()?;
        let arr: [u8; 32] = bytes.try_into().ok()?;
        let pk = iroh::PublicKey::from_bytes(&arr).ok()?;
        let info = self.inner.ep.remote_info(pk).await?;

        let mut paths = Vec::new();
        let mut has_active_relay = false;
        let mut active_relay_url: Option<String> = None;

        // Walk every known transport address, recording each as a PathInfo
        // and remembering whether the currently active path is a relay.
        for a in info.addrs() {
            let is_relay = a.addr().is_relay();
            let is_active = matches!(a.usage(), TransportAddrUsage::Active);

            let addr_str = match a.addr() {
                iroh::TransportAddr::Ip(sock) => sock.to_string(),
                iroh::TransportAddr::Relay(url) => {
                    if is_active {
                        has_active_relay = true;
                        active_relay_url = Some(url.to_string());
                    }
                    url.to_string()
                }
                other => format!("{:?}", other),
            };

            paths.push(PathInfo {
                relay: is_relay,
                addr: addr_str,
                active: is_active,
            });
        }

        // Enrich with QUIC connection-level stats if a pooled connection exists.
        let (rtt_ms, bytes_sent, bytes_received, lost_packets, sent_packets, congestion_window) =
            if let Some(pooled) = self.inner.pool.get_existing(pk, crate::ALPN).await {
                let s = pooled.conn.stats();
                let rtt = pooled.conn.rtt(iroh::endpoint::PathId::ZERO);
                (
                    rtt.map(|d| d.as_secs_f64() * 1000.0),
                    Some(s.udp_tx.bytes),
                    Some(s.udp_rx.bytes),
                    None, // quinn path stats not exposed via iroh ConnectionStats
                    None, // quinn path stats not exposed via iroh ConnectionStats
                    None, // quinn path stats not exposed via iroh ConnectionStats
                )
            } else {
                (None, None, None, None, None, None)
            };

        Some(PeerStats {
            relay: has_active_relay,
            relay_url: active_relay_url,
            paths,
            rtt_ms,
            bytes_sent,
            bytes_received,
            lost_packets,
            sent_packets,
            congestion_window,
        })
    }
716
717 /// Take the transport event receiver, handing it off to a platform drain task.
718 ///
719 /// May only be called once per endpoint. The drain task owns the receiver and
720 /// loops until `event_tx` is dropped (i.e. the endpoint closes). Returns `None`
721 /// if the receiver was already taken (i.e. `subscribe_events` was called before).
722 pub fn subscribe_events(
723 &self,
724 ) -> Option<tokio::sync::mpsc::Receiver<crate::events::TransportEvent>> {
725 self.inner
726 .event_rx
727 .lock()
728 .unwrap_or_else(|e| e.into_inner())
729 .take()
730 }
731
    /// Subscribe to path changes for a specific peer.
    ///
    /// Spawns a background watcher task the first time a given peer is subscribed.
    /// The watcher polls `peer_stats()` every 200 ms and emits on the returned
    /// channel whenever the active path changes.
    ///
    /// Subscribing the same peer again replaces the previous subscription; the
    /// old watcher task notices its sender was replaced/closed and exits.
    pub fn subscribe_path_changes(
        &self,
        node_id_str: &str,
    ) -> tokio::sync::mpsc::UnboundedReceiver<PathInfo> {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        // Replace any existing sender; old watcher exits when it detects is_closed().
        self.inner.path_subs.insert(node_id_str.to_string(), tx);

        let ep = self.clone();
        let nid = node_id_str.to_string();
        let event_tx = self.inner.event_tx.clone();

        tokio::spawn(async move {
            // Key of the last observed active path ("relay:addr"); used to
            // detect changes between polls.
            let mut last_key: Option<String> = None;
            let mut closed_rx = ep.inner.closed_rx.clone();
            loop {
                // Exit immediately if the endpoint has been closed.
                if *closed_rx.borrow() {
                    ep.inner.path_subs.remove(&nid);
                    break;
                }
                // Exit if the subscriber dropped its receiver (or the entry
                // was removed), cleaning up the subscription map.
                let is_closed = ep
                    .inner
                    .path_subs
                    .get(&nid)
                    .map(|s| s.is_closed())
                    .unwrap_or(true);
                if is_closed {
                    ep.inner.path_subs.remove(&nid);
                    break;
                }

                if let Some(stats) = ep.peer_stats(&nid).await {
                    if let Some(active) = stats.paths.iter().find(|p| p.active) {
                        let key = format!("{}:{}", active.relay, active.addr);
                        // Emit only when the active path actually changed.
                        if Some(&key) != last_key.as_ref() {
                            last_key = Some(key);
                            if let Some(sender) = ep.inner.path_subs.get(&nid) {
                                let _ = sender.send(active.clone());
                            }
                            // Best-effort broadcast on the transport event bus.
                            let _ = event_tx.try_send(crate::events::TransportEvent::path_change(
                                &nid,
                                &active.addr,
                                active.relay,
                            ));
                        }
                    }
                }

                // Sleep 200 ms, but wake early if the endpoint is being closed.
                tokio::select! {
                    _ = tokio::time::sleep(std::time::Duration::from_millis(200)) => {}
                    result = closed_rx.wait_for(|v| *v) => {
                        let _ = result;
                        ep.inner.path_subs.remove(&nid);
                        break;
                    }
                }
            }
        });

        rx
    }
800}
801
802// ── Bind-error classification ────────────────────────────────────────────────
803
/// Map a bind failure into a [`crate::CoreError`] for the platform error mapper.
///
/// Currently every bind failure is reported as `connection_failed` with the
/// error's display text; no finer-grained classification is performed yet.
fn classify_bind_error(e: impl std::fmt::Display) -> crate::CoreError {
    let msg = e.to_string();
    crate::CoreError::connection_failed(msg)
}
809
810// ── NodeAddr info ────────────────────────────────────────────────────────────
811
/// Serialisable node address: node ID + relay and direct addresses.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeAddrInfo {
    /// Base32-encoded public key.
    pub id: String,
    /// Relay URLs and/or `ip:port` direct addresses (relays listed first).
    pub addrs: Vec<String>,
}
820
821// ── Observability types ──────────────────────────────────────────────────────
822
/// Endpoint-level observability snapshot.
///
/// Returned by [`IrohEndpoint::endpoint_stats`]. All counts are point-in-time reads
/// and may change between calls.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct EndpointStats {
    /// Number of currently open body reader handles.
    pub active_readers: usize,
    /// Number of currently open body writer handles.
    pub active_writers: usize,
    /// Number of live QUIC sessions (WebTransport connections).
    pub active_sessions: usize,
    /// Total number of allocated (reader + writer + session + other) handles.
    pub total_handles: usize,
    /// Approximate number of QUIC connections currently cached in the connection pool.
    pub pool_size: u64,
    /// Number of live QUIC connections accepted by the serve loop.
    pub active_connections: usize,
    /// Number of HTTP requests currently being processed.
    pub active_requests: usize,
}
845
/// A connection lifecycle event fired when a QUIC peer connection opens or closes.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConnectionEvent {
    /// Base32-encoded public key of the peer.
    pub peer_id: String,
    /// `true` when this is the first connection from the peer (0→1), `false` when the last one closes (1→0).
    pub connected: bool,
}
855
/// Per-peer connection statistics, returned by [`IrohEndpoint::peer_stats`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerStats {
    /// Whether the peer is connected via a relay server (vs direct).
    pub relay: bool,
    /// Active relay URL, if any.
    pub relay_url: Option<String>,
    /// All known paths to this peer.
    pub paths: Vec<PathInfo>,
    /// Round-trip time in milliseconds. `None` if no active QUIC connection is pooled.
    pub rtt_ms: Option<f64>,
    /// Total UDP bytes sent to this peer. `None` if no active QUIC connection is pooled.
    pub bytes_sent: Option<u64>,
    /// Total UDP bytes received from this peer. `None` if no active QUIC connection is pooled.
    pub bytes_received: Option<u64>,
    /// Total packets lost on the QUIC path. Currently always `None` (not exposed by iroh).
    pub lost_packets: Option<u64>,
    /// Total packets sent on the QUIC path. Currently always `None` (not exposed by iroh).
    pub sent_packets: Option<u64>,
    /// Current congestion window in bytes. Currently always `None` (not exposed by iroh).
    pub congestion_window: Option<u64>,
}
878
/// Network path information for a single transport address.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PathInfo {
    /// Whether this path goes through a relay server.
    pub relay: bool,
    /// The relay URL (if relay) or `ip:port` (if direct).
    pub addr: String,
    /// Whether this is the currently selected/active path.
    pub active: bool,
}
889
/// Parse an optional list of socket address strings into `SocketAddr` values.
///
/// `None` passes through as `Ok(None)`. For `Some(list)`, every entry must
/// parse as a `host:port` address; the first failure is returned as `Err` so
/// callers can surface misconfiguration rather than silently ignoring it.
pub fn parse_direct_addrs(
    addrs: &Option<Vec<String>>,
) -> Result<Option<Vec<std::net::SocketAddr>>, String> {
    let Some(list) = addrs else {
        return Ok(None);
    };
    list.iter()
        .map(|s| {
            s.parse::<std::net::SocketAddr>()
                .map_err(|e| format!("invalid direct address {s:?}: {e}"))
        })
        .collect::<Result<Vec<_>, _>>()
        .map(Some)
}