1use iroh::address_lookup::{DnsAddressLookup, PkarrPublisher};
4use iroh::endpoint::{IdleTimeout, QuicTransportConfig, TransportAddrUsage};
5use iroh::{Endpoint, RelayMode, SecretKey};
6use serde::{Deserialize, Serialize};
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use crate::pool::ConnectionPool;
12use crate::server::ServeHandle;
13use crate::stream::{HandleStore, StoreConfig};
14use crate::{ALPN, ALPN_DUPLEX};
15
/// Transport-level settings read by `IrohEndpoint::bind`.
#[derive(Clone, Debug, Default)]
pub struct NetworkingOptions {
    /// Relay selection. `bind` accepts `"default"` (also used for `None`),
    /// `"staging"`, `"disabled"` and `"custom"`; anything else is rejected.
    pub relay_mode: Option<String>,
    /// Relay URLs; `bind` requires at least one when `relay_mode` is `"custom"`.
    pub relays: Vec<String>,
    /// Socket addresses (parsed as `std::net::SocketAddr`) handed to the
    /// endpoint builder one by one.
    pub bind_addrs: Vec<String>,
    /// When set, used as the QUIC max idle timeout, in milliseconds.
    pub idle_timeout_ms: Option<u64>,
    /// Explicit proxy URL; takes precedence over `proxy_from_env`.
    pub proxy_url: Option<String>,
    /// When `proxy_url` is `None`, ask the builder to read the proxy from the
    /// environment.
    pub proxy_from_env: bool,
    /// Forces the relay mode to disabled and skips address-lookup setup in `bind`.
    pub disabled: bool,
}
35
/// Address-lookup (discovery) settings read by `IrohEndpoint::bind`.
#[derive(Clone, Debug)]
pub struct DiscoveryOptions {
    /// Custom server URL; when `None`, `bind` uses the `n0_dns` publisher and
    /// lookup instead.
    pub dns_server: Option<String>,
    /// Whether `bind` registers any address lookup at all.
    pub enabled: bool,
}

impl Default for DiscoveryOptions {
    /// Discovery is on by default, with no custom server.
    fn default() -> Self {
        DiscoveryOptions {
            enabled: true,
            dns_server: None,
        }
    }
}
48
/// Connection-pool settings; both values are passed to `ConnectionPool::new`
/// by `IrohEndpoint::bind`.
#[derive(Clone, Debug, Default)]
pub struct PoolOptions {
    /// Optional cap forwarded to the pool as-is.
    pub max_connections: Option<usize>,
    /// Optional idle timeout in milliseconds, converted to a `Duration`
    /// before it is handed to the pool.
    pub idle_timeout_ms: Option<u64>,
}
57
/// Streaming / handle-store tuning. Every `None` falls back to the matching
/// default constant in `crate::stream` when `IrohEndpoint::bind` builds the
/// store configuration.
#[derive(Clone, Debug, Default)]
pub struct StreamingOptions {
    /// Channel capacity; `bind` clamps it to at least 1.
    pub channel_capacity: Option<usize>,
    /// Maximum chunk size in bytes; `bind` clamps it to at least 1.
    pub max_chunk_size_bytes: Option<usize>,
    /// Drain timeout in milliseconds.
    pub drain_timeout_ms: Option<u64>,
    /// Handle time-to-live in milliseconds; a resulting TTL of zero means
    /// `bind` does not start the periodic sweep task.
    pub handle_ttl_ms: Option<u64>,
}
70
71#[derive(Debug, Clone)]
73pub struct NodeOptions {
74 pub key: Option<[u8; 32]>,
76 pub networking: NetworkingOptions,
78 pub discovery: DiscoveryOptions,
80 pub pool: PoolOptions,
82 pub streaming: StreamingOptions,
84 pub capabilities: Vec<String>,
86 pub keylog: bool,
88 pub max_header_size: Option<usize>,
90 pub server_limits: crate::server::ServerLimits,
92 #[cfg(feature = "compression")]
93 pub compression: Option<CompressionOptions>,
94}
95
96impl Default for NodeOptions {
97 fn default() -> Self {
98 Self {
99 key: None,
100 networking: NetworkingOptions::default(),
101 discovery: DiscoveryOptions::default(),
102 pool: PoolOptions::default(),
103 streaming: StreamingOptions::default(),
104 capabilities: Vec::new(),
105 keylog: false,
106 max_header_size: None,
107 server_limits: crate::server::ServerLimits::default(),
108 #[cfg(feature = "compression")]
109 compression: None,
110 }
111 }
112}
113
/// Compression settings, only compiled with the `compression` feature.
/// `IrohEndpoint::bind` stores them unchanged; `IrohEndpoint::compression`
/// hands them back.
#[cfg(feature = "compression")]
#[derive(Clone, Debug)]
pub struct CompressionOptions {
    // NOTE(review): presumably the smallest body size that gets compressed —
    // the consumer is not in this file, confirm there.
    pub min_body_bytes: usize,
    // NOTE(review): presumably a codec compression level, with `None`
    // meaning "codec default" — confirm against the consumer.
    pub level: Option<u32>,
}
124
125
126#[derive(Clone)]
131pub struct IrohEndpoint {
132 pub(crate) inner: Arc<EndpointInner>,
133}
134
135pub(crate) struct EndpointInner {
136 pub ep: Endpoint,
137 pub node_id_str: String,
139 pub pool: ConnectionPool,
141 pub max_header_size: usize,
143 pub server_limits: crate::server::ServerLimits,
145 pub handles: HandleStore,
148 pub serve_handle: std::sync::Mutex<Option<ServeHandle>>,
150 pub serve_done_rx: std::sync::Mutex<Option<tokio::sync::watch::Receiver<bool>>>,
154 pub closed_tx: tokio::sync::watch::Sender<bool>,
157 pub closed_rx: tokio::sync::watch::Receiver<bool>,
158 pub active_connections: Arc<AtomicUsize>,
161 pub active_requests: Arc<AtomicUsize>,
164 #[cfg(feature = "compression")]
166 pub compression: Option<CompressionOptions>,
167}
168
169impl IrohEndpoint {
170 pub async fn bind(opts: NodeOptions) -> Result<Self, crate::CoreError> {
172 if opts.networking.disabled
175 && opts.networking.relay_mode.as_deref()
176 .map_or(false, |m| !matches!(m, "disabled"))
177 {
178 return Err(crate::CoreError::invalid_input(
179 "networking.disabled is true but relay_mode is set to a non-disabled value; \
180 set relay_mode to \"disabled\" or omit it when networking.disabled is true",
181 ));
182 }
183
184 let relay_mode = if opts.networking.disabled {
185 RelayMode::Disabled
186 } else {
187 match opts.networking.relay_mode.as_deref() {
188 None | Some("default") => RelayMode::Default,
189 Some("staging") => RelayMode::Staging,
190 Some("disabled") => RelayMode::Disabled,
191 Some("custom") => {
192 if opts.networking.relays.is_empty() {
193 return Err(crate::CoreError::invalid_input(
194 "relay_mode \"custom\" requires at least one URL in `relays`",
195 ));
196 }
197 let urls = opts
198 .networking
199 .relays
200 .iter()
201 .map(|u| {
202 u.parse::<iroh::RelayUrl>()
203 .map_err(crate::CoreError::invalid_input)
204 })
205 .collect::<Result<Vec<_>, _>>()?;
206 RelayMode::custom(urls)
207 }
208 Some(other) => {
209 return Err(crate::CoreError::invalid_input(format!(
210 "unknown relay_mode: {other}"
211 )))
212 }
213 }
214 };
215
216 let alpns: Vec<Vec<u8>> = if opts.capabilities.is_empty() {
217 vec![ALPN_DUPLEX.to_vec(), ALPN.to_vec()]
219 } else {
220 let mut list: Vec<Vec<u8>> = opts
221 .capabilities
222 .iter()
223 .map(|c| c.as_bytes().to_vec())
224 .collect();
225 if !list.iter().any(|a| a == ALPN) {
227 list.push(ALPN.to_vec());
228 }
229 list
230 };
231
232 let mut builder = Endpoint::empty_builder(relay_mode).alpns(alpns);
233
234 if !opts.networking.disabled && opts.discovery.enabled {
236 if let Some(ref url_str) = opts.discovery.dns_server {
237 let url: url::Url = url_str.parse().map_err(|e| {
238 crate::CoreError::invalid_input(format!("invalid dns_discovery URL: {e}"))
239 })?;
240 builder = builder
241 .address_lookup(PkarrPublisher::builder(url.clone()))
242 .address_lookup(DnsAddressLookup::builder(
243 url.host_str().unwrap_or_default().to_string(),
244 ));
245 } else {
246 builder = builder
247 .address_lookup(PkarrPublisher::n0_dns())
248 .address_lookup(DnsAddressLookup::n0_dns());
249 }
250 }
251
252 if let Some(key_bytes) = opts.key {
253 builder = builder.secret_key(SecretKey::from_bytes(&key_bytes));
254 }
255
256 if let Some(ms) = opts.networking.idle_timeout_ms {
257 let timeout = IdleTimeout::try_from(Duration::from_millis(ms)).map_err(|e| {
258 crate::CoreError::invalid_input(format!("idle_timeout_ms out of range: {e}"))
259 })?;
260 let transport = QuicTransportConfig::builder()
261 .max_idle_timeout(Some(timeout))
262 .build();
263 builder = builder.transport_config(transport);
264 }
265
266 for addr_str in &opts.networking.bind_addrs {
268 let sock: std::net::SocketAddr = addr_str.parse().map_err(|e| {
269 crate::CoreError::invalid_input(format!("invalid bind address \"{addr_str}\": {e}"))
270 })?;
271 builder = builder.bind_addr(sock).map_err(|e| {
272 crate::CoreError::invalid_input(format!("bind address \"{addr_str}\": {e}"))
273 })?;
274 }
275
276 if let Some(ref proxy) = opts.networking.proxy_url {
278 let url: url::Url = proxy
279 .parse()
280 .map_err(|e| crate::CoreError::invalid_input(format!("invalid proxy URL: {e}")))?;
281 builder = builder.proxy_url(url);
282 } else if opts.networking.proxy_from_env {
283 builder = builder.proxy_from_env();
284 }
285
286 if opts.keylog {
288 builder = builder.keylog(true);
289 }
290
291 let ep = builder.bind().await.map_err(classify_bind_error)?;
292
293 let node_id_str = crate::base32_encode(ep.id().as_bytes());
294
295 let store_config = StoreConfig {
296 channel_capacity: opts
297 .streaming
298 .channel_capacity
299 .unwrap_or(crate::stream::DEFAULT_CHANNEL_CAPACITY)
300 .max(1),
301 max_chunk_size: opts
302 .streaming
303 .max_chunk_size_bytes
304 .unwrap_or(crate::stream::DEFAULT_MAX_CHUNK_SIZE)
305 .max(1),
306 drain_timeout: Duration::from_millis(
307 opts.streaming
308 .drain_timeout_ms
309 .unwrap_or(crate::stream::DEFAULT_DRAIN_TIMEOUT_MS),
310 ),
311 max_handles: crate::stream::DEFAULT_MAX_HANDLES,
312 ttl: Duration::from_millis(
313 opts.streaming
314 .handle_ttl_ms
315 .unwrap_or(crate::stream::DEFAULT_SLAB_TTL_MS),
316 ),
317 };
318 let sweep_ttl = store_config.ttl;
319 let (closed_tx, closed_rx) = tokio::sync::watch::channel(false);
320
321 let inner = Arc::new(EndpointInner {
322 ep,
323 node_id_str,
324 pool: ConnectionPool::new(
325 opts.pool.max_connections,
326 opts.pool.idle_timeout_ms
327 .map(std::time::Duration::from_millis),
328 ),
329 max_header_size: match opts.max_header_size {
332 None | Some(0) => 64 * 1024,
333 Some(n) => n,
334 },
335 server_limits: {
336 let mut sl = opts.server_limits.clone();
337 if sl.max_consecutive_errors.is_none() {
338 sl.max_consecutive_errors = Some(5);
339 }
340 sl
341 },
342 handles: HandleStore::new(store_config),
343 serve_handle: std::sync::Mutex::new(None),
344 serve_done_rx: std::sync::Mutex::new(None),
345 closed_tx,
346 closed_rx,
347 active_connections: Arc::new(AtomicUsize::new(0)),
348 active_requests: Arc::new(AtomicUsize::new(0)),
349 #[cfg(feature = "compression")]
350 compression: opts.compression,
351 });
352
353 if !sweep_ttl.is_zero() {
355 let weak = Arc::downgrade(&inner);
356 tokio::spawn(async move {
357 let mut ticker = tokio::time::interval(Duration::from_secs(60));
358 loop {
359 ticker.tick().await;
360 let Some(inner) = weak.upgrade() else {
361 break;
362 };
363 inner.handles.sweep(sweep_ttl);
364 drop(inner); }
366 });
367 }
368
369 Ok(Self { inner })
370 }
371
372 pub fn node_id(&self) -> &str {
374 &self.inner.node_id_str
375 }
376
377 pub fn max_consecutive_errors(&self) -> usize {
379 self.inner.server_limits.max_consecutive_errors.unwrap_or(5)
380 }
381
382 pub fn serve_options(&self) -> crate::server::ServeOptions {
387 self.inner.server_limits.clone()
388 }
389
390 pub fn secret_key_bytes(&self) -> [u8; 32] {
392 self.inner.ep.secret_key().to_bytes()
393 }
394
395 pub async fn close(&self) {
403 let handle = self
407 .inner
408 .serve_handle
409 .lock()
410 .unwrap_or_else(|e| e.into_inner())
411 .take();
412 if let Some(h) = handle {
413 h.drain().await;
414 }
415 self.inner.ep.close().await;
416 let _ = self.inner.closed_tx.send(true);
417 }
418
419 pub async fn close_force(&self) {
422 let handle = self
423 .inner
424 .serve_handle
425 .lock()
426 .unwrap_or_else(|e| e.into_inner())
427 .take();
428 if let Some(h) = handle {
429 h.abort();
430 }
431 self.inner.ep.close().await;
432 let _ = self.inner.closed_tx.send(true);
433 }
434
435 pub async fn wait_closed(&self) {
440 let mut rx = self.inner.closed_rx.clone();
441 let _ = rx.wait_for(|v| *v).await;
442 }
443
444 pub fn set_serve_handle(&self, handle: ServeHandle) {
446 *self
447 .inner
448 .serve_done_rx
449 .lock()
450 .unwrap_or_else(|e| e.into_inner()) = Some(handle.subscribe_done());
451 *self
452 .inner
453 .serve_handle
454 .lock()
455 .unwrap_or_else(|e| e.into_inner()) = Some(handle);
456 }
457
458 pub fn stop_serve(&self) {
463 if let Some(h) = self
464 .inner
465 .serve_handle
466 .lock()
467 .unwrap_or_else(|e| e.into_inner())
468 .as_ref()
469 {
470 h.shutdown();
471 }
472 }
473
474 pub async fn wait_serve_stop(&self) {
478 let rx = self
479 .inner
480 .serve_done_rx
481 .lock()
482 .unwrap_or_else(|e| e.into_inner())
483 .clone();
484 if let Some(mut rx) = rx {
485 let _ = rx.wait_for(|v| *v).await;
488 }
489 }
490
491 pub fn raw(&self) -> &Endpoint {
492 &self.inner.ep
493 }
494
495 pub fn handles(&self) -> &HandleStore {
497 &self.inner.handles
498 }
499
500 pub fn max_header_size(&self) -> usize {
502 self.inner.max_header_size
503 }
504
505 pub(crate) fn pool(&self) -> &ConnectionPool {
507 &self.inner.pool
508 }
509
510 pub fn endpoint_stats(&self) -> EndpointStats {
514 let (active_readers, active_writers, active_sessions, total_handles) =
515 self.inner.handles.count_handles();
516 let pool_size = self.inner.pool.entry_count_approx();
517 let active_connections = self.inner.active_connections.load(Ordering::Relaxed);
518 let active_requests = self.inner.active_requests.load(Ordering::Relaxed);
519 EndpointStats {
520 active_readers,
521 active_writers,
522 active_sessions,
523 total_handles,
524 pool_size,
525 active_connections,
526 active_requests,
527 }
528 }
529
530 #[cfg(feature = "compression")]
532 pub fn compression(&self) -> Option<&CompressionOptions> {
533 self.inner.compression.as_ref()
534 }
535
536 pub fn bound_sockets(&self) -> Vec<std::net::SocketAddr> {
538 self.inner.ep.bound_sockets()
539 }
540
541 pub fn node_addr(&self) -> NodeAddrInfo {
543 let addr = self.inner.ep.addr();
544 let mut addrs = Vec::new();
545 for relay in addr.relay_urls() {
546 addrs.push(relay.to_string());
547 }
548 for da in addr.ip_addrs() {
549 addrs.push(da.to_string());
550 }
551 NodeAddrInfo {
552 id: self.inner.node_id_str.clone(),
553 addrs,
554 }
555 }
556
557 pub fn home_relay(&self) -> Option<String> {
559 self.inner
560 .ep
561 .addr()
562 .relay_urls()
563 .next()
564 .map(|u| u.to_string())
565 }
566
567 pub async fn peer_info(&self, node_id_b32: &str) -> Option<NodeAddrInfo> {
569 let bytes = crate::base32_decode(node_id_b32).ok()?;
570 let arr: [u8; 32] = bytes.try_into().ok()?;
571 let pk = iroh::PublicKey::from_bytes(&arr).ok()?;
572 let info = self.inner.ep.remote_info(pk).await?;
573 let id = crate::base32_encode(info.id().as_bytes());
574 let mut addrs = Vec::new();
575 for a in info.addrs() {
576 match a.addr() {
577 iroh::TransportAddr::Ip(sock) => addrs.push(sock.to_string()),
578 iroh::TransportAddr::Relay(url) => addrs.push(url.to_string()),
579 other => addrs.push(format!("{:?}", other)),
580 }
581 }
582 Some(NodeAddrInfo { id, addrs })
583 }
584
585 pub async fn peer_stats(&self, node_id_b32: &str) -> Option<PeerStats> {
590 let bytes = crate::base32_decode(node_id_b32).ok()?;
591 let arr: [u8; 32] = bytes.try_into().ok()?;
592 let pk = iroh::PublicKey::from_bytes(&arr).ok()?;
593 let info = self.inner.ep.remote_info(pk).await?;
594
595 let mut paths = Vec::new();
596 let mut has_active_relay = false;
597 let mut active_relay_url: Option<String> = None;
598
599 for a in info.addrs() {
600 let is_relay = a.addr().is_relay();
601 let is_active = matches!(a.usage(), TransportAddrUsage::Active);
602
603 let addr_str = match a.addr() {
604 iroh::TransportAddr::Ip(sock) => sock.to_string(),
605 iroh::TransportAddr::Relay(url) => {
606 if is_active {
607 has_active_relay = true;
608 active_relay_url = Some(url.to_string());
609 }
610 url.to_string()
611 }
612 other => format!("{:?}", other),
613 };
614
615 paths.push(PathInfo {
616 relay: is_relay,
617 addr: addr_str,
618 active: is_active,
619 });
620 }
621
622 let (rtt_ms, bytes_sent, bytes_received, lost_packets, sent_packets, congestion_window) =
624 if let Some(pooled) = self.inner.pool.get_existing(pk, crate::ALPN).await {
625 let s = pooled.conn.stats();
626 let rtt = pooled.conn.rtt(iroh::endpoint::PathId::ZERO);
627 (
628 rtt.map(|d| d.as_secs_f64() * 1000.0),
629 Some(s.udp_tx.bytes),
630 Some(s.udp_rx.bytes),
631 None, None, None, )
635 } else {
636 (None, None, None, None, None, None)
637 };
638
639 Some(PeerStats {
640 relay: has_active_relay,
641 relay_url: active_relay_url,
642 paths,
643 rtt_ms,
644 bytes_sent,
645 bytes_received,
646 lost_packets,
647 sent_packets,
648 congestion_window,
649 })
650 }
651}
652
653fn classify_bind_error(e: impl std::fmt::Display) -> crate::CoreError {
657 let msg = e.to_string();
658 crate::CoreError::connection_failed(msg)
659}
660
661#[derive(Debug, Clone, Serialize, Deserialize)]
665pub struct NodeAddrInfo {
666 pub id: String,
668 pub addrs: Vec<String>,
670}
671
672#[derive(Debug, Clone, Serialize, Deserialize, Default)]
679#[serde(rename_all = "camelCase")]
680pub struct EndpointStats {
681 pub active_readers: usize,
683 pub active_writers: usize,
685 pub active_sessions: usize,
687 pub total_handles: usize,
689 pub pool_size: u64,
691 pub active_connections: usize,
693 pub active_requests: usize,
695}
696
697#[derive(Debug, Clone, Serialize, Deserialize)]
699#[serde(rename_all = "camelCase")]
700pub struct ConnectionEvent {
701 pub peer_id: String,
703 pub connected: bool,
705}
706
707#[derive(Debug, Clone, Serialize, Deserialize)]
709pub struct PeerStats {
710 pub relay: bool,
712 pub relay_url: Option<String>,
714 pub paths: Vec<PathInfo>,
716 pub rtt_ms: Option<f64>,
718 pub bytes_sent: Option<u64>,
720 pub bytes_received: Option<u64>,
722 pub lost_packets: Option<u64>,
724 pub sent_packets: Option<u64>,
726 pub congestion_window: Option<u64>,
728}
729
730#[derive(Debug, Clone, Serialize, Deserialize)]
732pub struct PathInfo {
733 pub relay: bool,
735 pub addr: String,
737 pub active: bool,
739}
740
/// Parse an optional list of `ip:port` strings into socket addresses.
///
/// Returns `Ok(None)` when `addrs` is `None`; otherwise every entry is
/// parsed in order.
///
/// # Errors
/// Returns a message naming the first entry that is not a valid
/// `std::net::SocketAddr`.
pub fn parse_direct_addrs(
    addrs: &Option<Vec<String>>,
) -> Result<Option<Vec<std::net::SocketAddr>>, String> {
    let Some(list) = addrs else {
        return Ok(None);
    };
    list.iter()
        .map(|text| {
            text.parse::<std::net::SocketAddr>()
                .map_err(|e| format!("invalid direct address {text:?}: {e}"))
        })
        .collect::<Result<Vec<_>, _>>()
        .map(Some)
}