1use iroh::address_lookup::{DnsAddressLookup, PkarrPublisher};
4use iroh::endpoint::{IdleTimeout, QuicTransportConfig, TransportAddrUsage};
5use iroh::{Endpoint, RelayMode, SecretKey};
6use serde::{Deserialize, Serialize};
7use std::sync::atomic::{AtomicUsize, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use crate::pool::ConnectionPool;
12use crate::server::ServeHandle;
13use crate::stream::{HandleStore, StoreConfig};
14use crate::{ALPN, ALPN_DUPLEX};
15
/// Low-level networking knobs applied when binding the endpoint.
#[derive(Debug, Clone, Default)]
pub struct NetworkingOptions {
    /// Relay selection: `"default"`, `"staging"`, `"disabled"`, or `"custom"`.
    /// `None` behaves like `"default"` (see `IrohEndpoint::bind`).
    pub relay_mode: Option<String>,
    /// Relay URLs; required (non-empty) when `relay_mode` is `"custom"`.
    pub relays: Vec<String>,
    /// Extra socket addresses to bind, parsed as `std::net::SocketAddr`.
    pub bind_addrs: Vec<String>,
    /// QUIC max idle timeout in milliseconds (applied via the transport config).
    pub idle_timeout_ms: Option<u64>,
    /// Explicit proxy URL; takes precedence over `proxy_from_env`.
    pub proxy_url: Option<String>,
    /// If set (and `proxy_url` is `None`), read proxy settings from the environment.
    pub proxy_from_env: bool,
    /// Fully disable networking: forces relays off and skips discovery setup.
    /// Setting `relay_mode` to a non-"disabled" value alongside this is rejected.
    pub disabled: bool,
}
35
/// Controls how this node publishes and resolves peer addresses.
#[derive(Debug, Clone)]
pub struct DiscoveryOptions {
    /// Custom DNS/pkarr server URL; `None` selects the n0 defaults.
    pub dns_server: Option<String>,
    /// Whether address discovery is configured at all (defaults to `true`).
    pub enabled: bool,
}

impl Default for DiscoveryOptions {
    /// Discovery on, pointed at the default (n0) servers.
    fn default() -> Self {
        DiscoveryOptions {
            enabled: true,
            dns_server: None,
        }
    }
}
53
/// Configuration for the outbound connection pool.
#[derive(Debug, Clone, Default)]
pub struct PoolOptions {
    /// Maximum pooled connections; `None` uses the pool's built-in default.
    pub max_connections: Option<usize>,
    /// Idle timeout (ms) after which a pooled connection is dropped;
    /// `None` uses the pool's built-in default.
    pub idle_timeout_ms: Option<u64>,
}
62
/// Tuning for the streaming handle store (see `crate::stream::StoreConfig`).
/// `None` fields fall back to the `crate::stream::DEFAULT_*` constants;
/// capacities are clamped to a minimum of 1 when applied.
#[derive(Debug, Clone, Default)]
pub struct StreamingOptions {
    /// Per-stream channel capacity (messages buffered in flight).
    pub channel_capacity: Option<usize>,
    /// Maximum chunk size in bytes for stream payloads.
    pub max_chunk_size_bytes: Option<usize>,
    /// How long (ms) to wait for a stream to drain on shutdown.
    pub drain_timeout_ms: Option<u64>,
    /// TTL (ms) for idle stream handles; also the sweep interval's target.
    pub handle_ttl_ms: Option<u64>,
}
75
/// Everything needed to construct an [`IrohEndpoint`] via `bind`.
#[derive(Debug, Clone, Default)]
pub struct NodeOptions {
    /// Fixed 32-byte secret key; `None` lets the endpoint generate one.
    pub key: Option<[u8; 32]>,
    /// Relay / bind / proxy / timeout configuration.
    pub networking: NetworkingOptions,
    /// Address publication and resolution configuration.
    pub discovery: DiscoveryOptions,
    /// Connection-pool sizing.
    pub pool: PoolOptions,
    /// Stream handle-store sizing and timeouts.
    pub streaming: StreamingOptions,
    /// Extra ALPN strings to accept; the crate's `ALPN` is always appended.
    /// Empty means the default pair (`ALPN_DUPLEX`, `ALPN`).
    pub capabilities: Vec<String>,
    /// Enable TLS keylogging on the endpoint (for debugging with e.g. Wireshark).
    pub keylog: bool,
    /// Cap on request header size; `None` or `Some(0)` becomes 64 KiB.
    pub max_header_size: Option<usize>,
    /// Server-side limits; `max_consecutive_errors` defaults to 5 when unset.
    pub server_limits: crate::server::ServerLimits,
    /// Optional response-compression settings (feature-gated).
    #[cfg(feature = "compression")]
    pub compression: Option<CompressionOptions>,
}
100
/// Response-compression settings (only with the `compression` feature).
#[cfg(feature = "compression")]
#[derive(Debug, Clone)]
pub struct CompressionOptions {
    // Threshold below which bodies are left uncompressed.
    // NOTE(review): consumers of this value are outside this file — confirm
    // it is compared against the encoded body length in bytes.
    pub min_body_bytes: usize,
    /// Compression level; `None` presumably selects the codec's default —
    /// verify against the compression call site.
    pub level: Option<u32>,
}
111
/// Cheaply clonable handle to a bound iroh endpoint and its shared state.
///
/// All clones share the same [`EndpointInner`] via `Arc`, so cloning is a
/// refcount bump and every clone observes the same pool, handles, and
/// shutdown signals.
#[derive(Clone)]
pub struct IrohEndpoint {
    pub(crate) inner: Arc<EndpointInner>,
}
120
/// Shared state behind [`IrohEndpoint`]; one instance per bound endpoint.
pub(crate) struct EndpointInner {
    /// The underlying iroh endpoint.
    pub ep: Endpoint,
    /// Base32-encoded node id, precomputed at bind time.
    pub node_id_str: String,
    /// Pool of outbound connections, keyed by peer + ALPN.
    pub pool: ConnectionPool,
    /// Maximum accepted request-header size (bytes); defaulted at bind.
    pub max_header_size: usize,
    /// Server limits, with `max_consecutive_errors` defaulted at bind.
    pub server_limits: crate::server::ServerLimits,
    /// Store of active stream reader/writer/session handles.
    pub handles: HandleStore,
    /// Handle to the running serve loop, if one was started.
    pub serve_handle: std::sync::Mutex<Option<ServeHandle>>,
    /// Completion receiver for the serve loop (set alongside `serve_handle`).
    pub serve_done_rx: std::sync::Mutex<Option<tokio::sync::watch::Receiver<bool>>>,
    /// Broadcast that flips to `true` once the endpoint is closed.
    pub closed_tx: tokio::sync::watch::Sender<bool>,
    /// Template receiver cloned by `wait_closed`.
    pub closed_rx: tokio::sync::watch::Receiver<bool>,
    /// Live inbound-connection counter (relaxed loads for stats only).
    pub active_connections: Arc<AtomicUsize>,
    /// Live in-flight request counter (relaxed loads for stats only).
    pub active_requests: Arc<AtomicUsize>,
    /// Compression settings captured from `NodeOptions` (feature-gated).
    #[cfg(feature = "compression")]
    pub compression: Option<CompressionOptions>,
}
154
155impl IrohEndpoint {
    /// Create and bind a new iroh endpoint configured by `opts`.
    ///
    /// Validates the networking options, configures the endpoint builder
    /// (relay mode, discovery, secret key, transport idle timeout, extra
    /// bind addresses, proxy, keylog), binds it, then assembles the shared
    /// [`EndpointInner`] and spawns a background task that periodically
    /// sweeps expired stream handles.
    ///
    /// # Errors
    /// Returns `CoreError::invalid_input` for inconsistent or malformed
    /// options, and a connection error (via `classify_bind_error`) if the
    /// bind itself fails.
    pub async fn bind(opts: NodeOptions) -> Result<Self, crate::CoreError> {
        // Reject contradictory config: networking disabled but relays requested.
        if opts.networking.disabled
            && opts
                .networking
                .relay_mode
                .as_deref()
                .is_some_and(|m| !matches!(m, "disabled"))
        {
            return Err(crate::CoreError::invalid_input(
                "networking.disabled is true but relay_mode is set to a non-disabled value; \
                 set relay_mode to \"disabled\" or omit it when networking.disabled is true",
            ));
        }

        // Map the string option onto iroh's RelayMode; `disabled` networking
        // short-circuits everything else.
        let relay_mode = if opts.networking.disabled {
            RelayMode::Disabled
        } else {
            match opts.networking.relay_mode.as_deref() {
                None | Some("default") => RelayMode::Default,
                Some("staging") => RelayMode::Staging,
                Some("disabled") => RelayMode::Disabled,
                Some("custom") => {
                    if opts.networking.relays.is_empty() {
                        return Err(crate::CoreError::invalid_input(
                            "relay_mode \"custom\" requires at least one URL in `relays`",
                        ));
                    }
                    // Parse every relay URL up front; first failure aborts.
                    let urls = opts
                        .networking
                        .relays
                        .iter()
                        .map(|u| {
                            u.parse::<iroh::RelayUrl>()
                                .map_err(crate::CoreError::invalid_input)
                        })
                        .collect::<Result<Vec<_>, _>>()?;
                    RelayMode::custom(urls)
                }
                Some(other) => {
                    return Err(crate::CoreError::invalid_input(format!(
                        "unknown relay_mode: {other}"
                    )))
                }
            }
        };

        // ALPN list: either the default pair, or the caller's capabilities
        // with the crate ALPN appended so the base protocol always works.
        let alpns: Vec<Vec<u8>> = if opts.capabilities.is_empty() {
            vec![ALPN_DUPLEX.to_vec(), ALPN.to_vec()]
        } else {
            let mut list: Vec<Vec<u8>> = opts
                .capabilities
                .iter()
                .map(|c| c.as_bytes().to_vec())
                .collect();
            if !list.iter().any(|a| a == ALPN) {
                list.push(ALPN.to_vec());
            }
            list
        };

        let mut builder = Endpoint::empty_builder(relay_mode).alpns(alpns);

        // Discovery: custom DNS/pkarr server if given, otherwise n0 defaults.
        // Skipped entirely when networking is disabled or discovery is off.
        if !opts.networking.disabled && opts.discovery.enabled {
            if let Some(ref url_str) = opts.discovery.dns_server {
                let url: url::Url = url_str.parse().map_err(|e| {
                    crate::CoreError::invalid_input(format!("invalid dns_discovery URL: {e}"))
                })?;
                builder = builder
                    .address_lookup(PkarrPublisher::builder(url.clone()))
                    .address_lookup(DnsAddressLookup::builder(
                        url.host_str().unwrap_or_default().to_string(),
                    ));
            } else {
                builder = builder
                    .address_lookup(PkarrPublisher::n0_dns())
                    .address_lookup(DnsAddressLookup::n0_dns());
            }
        }

        // Deterministic identity when a key is supplied.
        if let Some(key_bytes) = opts.key {
            builder = builder.secret_key(SecretKey::from_bytes(&key_bytes));
        }

        // QUIC max idle timeout, validated against iroh's accepted range.
        if let Some(ms) = opts.networking.idle_timeout_ms {
            let timeout = IdleTimeout::try_from(Duration::from_millis(ms)).map_err(|e| {
                crate::CoreError::invalid_input(format!("idle_timeout_ms out of range: {e}"))
            })?;
            let transport = QuicTransportConfig::builder()
                .max_idle_timeout(Some(timeout))
                .build();
            builder = builder.transport_config(transport);
        }

        // Additional explicit bind addresses.
        for addr_str in &opts.networking.bind_addrs {
            let sock: std::net::SocketAddr = addr_str.parse().map_err(|e| {
                crate::CoreError::invalid_input(format!("invalid bind address \"{addr_str}\": {e}"))
            })?;
            builder = builder.bind_addr(sock).map_err(|e| {
                crate::CoreError::invalid_input(format!("bind address \"{addr_str}\": {e}"))
            })?;
        }

        // Explicit proxy URL wins over environment-derived settings.
        if let Some(ref proxy) = opts.networking.proxy_url {
            let url: url::Url = proxy
                .parse()
                .map_err(|e| crate::CoreError::invalid_input(format!("invalid proxy URL: {e}")))?;
            builder = builder.proxy_url(url);
        } else if opts.networking.proxy_from_env {
            builder = builder.proxy_from_env();
        }

        if opts.keylog {
            builder = builder.keylog(true);
        }

        let ep = builder.bind().await.map_err(classify_bind_error)?;

        let node_id_str = crate::base32_encode(ep.id().as_bytes());

        // Handle-store config: explicit options with DEFAULT_* fallbacks;
        // capacities clamped to >= 1 so a zero never disables the store.
        let store_config = StoreConfig {
            channel_capacity: opts
                .streaming
                .channel_capacity
                .unwrap_or(crate::stream::DEFAULT_CHANNEL_CAPACITY)
                .max(1),
            max_chunk_size: opts
                .streaming
                .max_chunk_size_bytes
                .unwrap_or(crate::stream::DEFAULT_MAX_CHUNK_SIZE)
                .max(1),
            drain_timeout: Duration::from_millis(
                opts.streaming
                    .drain_timeout_ms
                    .unwrap_or(crate::stream::DEFAULT_DRAIN_TIMEOUT_MS),
            ),
            max_handles: crate::stream::DEFAULT_MAX_HANDLES,
            ttl: Duration::from_millis(
                opts.streaming
                    .handle_ttl_ms
                    .unwrap_or(crate::stream::DEFAULT_SLAB_TTL_MS),
            ),
        };
        let sweep_ttl = store_config.ttl;
        let (closed_tx, closed_rx) = tokio::sync::watch::channel(false);

        let inner = Arc::new(EndpointInner {
            ep,
            node_id_str,
            pool: ConnectionPool::new(
                opts.pool.max_connections,
                opts.pool
                    .idle_timeout_ms
                    .map(std::time::Duration::from_millis),
            ),
            // None or 0 both mean "use the 64 KiB default".
            max_header_size: match opts.max_header_size {
                None | Some(0) => 64 * 1024,
                Some(n) => n,
            },
            // Default max_consecutive_errors to 5 when the caller left it unset.
            server_limits: {
                let mut sl = opts.server_limits.clone();
                if sl.max_consecutive_errors.is_none() {
                    sl.max_consecutive_errors = Some(5);
                }
                sl
            },
            handles: HandleStore::new(store_config),
            serve_handle: std::sync::Mutex::new(None),
            serve_done_rx: std::sync::Mutex::new(None),
            closed_tx,
            closed_rx,
            active_connections: Arc::new(AtomicUsize::new(0)),
            active_requests: Arc::new(AtomicUsize::new(0)),
            #[cfg(feature = "compression")]
            compression: opts.compression,
        });

        // Background sweeper: holds only a Weak so it cannot keep the
        // endpoint alive; exits once the last strong reference is dropped.
        if !sweep_ttl.is_zero() {
            let weak = Arc::downgrade(&inner);
            tokio::spawn(async move {
                let mut ticker = tokio::time::interval(Duration::from_secs(60));
                loop {
                    ticker.tick().await;
                    let Some(inner) = weak.upgrade() else {
                        break;
                    };
                    inner.handles.sweep(sweep_ttl);
                    drop(inner); }
            });
        }

        Ok(Self { inner })
    }
361
362 pub fn node_id(&self) -> &str {
364 &self.inner.node_id_str
365 }
366
367 pub fn max_consecutive_errors(&self) -> usize {
369 self.inner.server_limits.max_consecutive_errors.unwrap_or(5)
370 }
371
372 pub fn serve_options(&self) -> crate::server::ServeOptions {
377 self.inner.server_limits.clone()
378 }
379
380 pub fn secret_key_bytes(&self) -> [u8; 32] {
382 self.inner.ep.secret_key().to_bytes()
383 }
384
385 pub async fn close(&self) {
393 let handle = self
397 .inner
398 .serve_handle
399 .lock()
400 .unwrap_or_else(|e| e.into_inner())
401 .take();
402 if let Some(h) = handle {
403 h.drain().await;
404 }
405 self.inner.ep.close().await;
406 let _ = self.inner.closed_tx.send(true);
407 }
408
409 pub async fn close_force(&self) {
412 let handle = self
413 .inner
414 .serve_handle
415 .lock()
416 .unwrap_or_else(|e| e.into_inner())
417 .take();
418 if let Some(h) = handle {
419 h.abort();
420 }
421 self.inner.ep.close().await;
422 let _ = self.inner.closed_tx.send(true);
423 }
424
425 pub async fn wait_closed(&self) {
430 let mut rx = self.inner.closed_rx.clone();
431 let _ = rx.wait_for(|v| *v).await;
432 }
433
434 pub fn set_serve_handle(&self, handle: ServeHandle) {
436 *self
437 .inner
438 .serve_done_rx
439 .lock()
440 .unwrap_or_else(|e| e.into_inner()) = Some(handle.subscribe_done());
441 *self
442 .inner
443 .serve_handle
444 .lock()
445 .unwrap_or_else(|e| e.into_inner()) = Some(handle);
446 }
447
448 pub fn stop_serve(&self) {
453 if let Some(h) = self
454 .inner
455 .serve_handle
456 .lock()
457 .unwrap_or_else(|e| e.into_inner())
458 .as_ref()
459 {
460 h.shutdown();
461 }
462 }
463
464 pub async fn wait_serve_stop(&self) {
468 let rx = self
469 .inner
470 .serve_done_rx
471 .lock()
472 .unwrap_or_else(|e| e.into_inner())
473 .clone();
474 if let Some(mut rx) = rx {
475 let _ = rx.wait_for(|v| *v).await;
478 }
479 }
480
481 pub fn raw(&self) -> &Endpoint {
482 &self.inner.ep
483 }
484
485 pub fn handles(&self) -> &HandleStore {
487 &self.inner.handles
488 }
489
490 pub fn max_header_size(&self) -> usize {
492 self.inner.max_header_size
493 }
494
495 pub(crate) fn pool(&self) -> &ConnectionPool {
497 &self.inner.pool
498 }
499
500 pub fn endpoint_stats(&self) -> EndpointStats {
504 let (active_readers, active_writers, active_sessions, total_handles) =
505 self.inner.handles.count_handles();
506 let pool_size = self.inner.pool.entry_count_approx();
507 let active_connections = self.inner.active_connections.load(Ordering::Relaxed);
508 let active_requests = self.inner.active_requests.load(Ordering::Relaxed);
509 EndpointStats {
510 active_readers,
511 active_writers,
512 active_sessions,
513 total_handles,
514 pool_size,
515 active_connections,
516 active_requests,
517 }
518 }
519
520 #[cfg(feature = "compression")]
522 pub fn compression(&self) -> Option<&CompressionOptions> {
523 self.inner.compression.as_ref()
524 }
525
526 pub fn bound_sockets(&self) -> Vec<std::net::SocketAddr> {
528 self.inner.ep.bound_sockets()
529 }
530
531 pub fn node_addr(&self) -> NodeAddrInfo {
533 let addr = self.inner.ep.addr();
534 let mut addrs = Vec::new();
535 for relay in addr.relay_urls() {
536 addrs.push(relay.to_string());
537 }
538 for da in addr.ip_addrs() {
539 addrs.push(da.to_string());
540 }
541 NodeAddrInfo {
542 id: self.inner.node_id_str.clone(),
543 addrs,
544 }
545 }
546
547 pub fn home_relay(&self) -> Option<String> {
549 self.inner
550 .ep
551 .addr()
552 .relay_urls()
553 .next()
554 .map(|u| u.to_string())
555 }
556
557 pub async fn peer_info(&self, node_id_b32: &str) -> Option<NodeAddrInfo> {
559 let bytes = crate::base32_decode(node_id_b32).ok()?;
560 let arr: [u8; 32] = bytes.try_into().ok()?;
561 let pk = iroh::PublicKey::from_bytes(&arr).ok()?;
562 let info = self.inner.ep.remote_info(pk).await?;
563 let id = crate::base32_encode(info.id().as_bytes());
564 let mut addrs = Vec::new();
565 for a in info.addrs() {
566 match a.addr() {
567 iroh::TransportAddr::Ip(sock) => addrs.push(sock.to_string()),
568 iroh::TransportAddr::Relay(url) => addrs.push(url.to_string()),
569 other => addrs.push(format!("{:?}", other)),
570 }
571 }
572 Some(NodeAddrInfo { id, addrs })
573 }
574
    /// Detailed path and transfer statistics for a remote peer.
    ///
    /// Returns `None` when the id fails to decode or the peer is unknown.
    /// Transfer stats (rtt, byte counts) are only available when a pooled
    /// connection to the peer already exists; otherwise those fields are
    /// `None`. `lost_packets`, `sent_packets`, and `congestion_window` are
    /// currently never populated.
    pub async fn peer_stats(&self, node_id_b32: &str) -> Option<PeerStats> {
        let bytes = crate::base32_decode(node_id_b32).ok()?;
        let arr: [u8; 32] = bytes.try_into().ok()?;
        let pk = iroh::PublicKey::from_bytes(&arr).ok()?;
        let info = self.inner.ep.remote_info(pk).await?;

        let mut paths = Vec::new();
        let mut has_active_relay = false;
        let mut active_relay_url: Option<String> = None;

        // Render each known transport address, remembering whether any
        // actively-used path goes through a relay (and which one).
        for a in info.addrs() {
            let is_relay = a.addr().is_relay();
            let is_active = matches!(a.usage(), TransportAddrUsage::Active);

            let addr_str = match a.addr() {
                iroh::TransportAddr::Ip(sock) => sock.to_string(),
                iroh::TransportAddr::Relay(url) => {
                    if is_active {
                        has_active_relay = true;
                        active_relay_url = Some(url.to_string());
                    }
                    url.to_string()
                }
                // Fall back to Debug for transports added in the future.
                other => format!("{:?}", other),
            };

            paths.push(PathInfo {
                relay: is_relay,
                addr: addr_str,
                active: is_active,
            });
        }

        // Transfer stats come from an existing pooled connection only; we
        // deliberately do not dial the peer just to gather stats.
        let (rtt_ms, bytes_sent, bytes_received, lost_packets, sent_packets, congestion_window) =
            if let Some(pooled) = self.inner.pool.get_existing(pk, crate::ALPN).await {
                let s = pooled.conn.stats();
                let rtt = pooled.conn.rtt(iroh::endpoint::PathId::ZERO);
                (
                    rtt.map(|d| d.as_secs_f64() * 1000.0),
                    Some(s.udp_tx.bytes),
                    Some(s.udp_rx.bytes),
                    None, None, None, )
            } else {
                (None, None, None, None, None, None)
            };

        Some(PeerStats {
            relay: has_active_relay,
            relay_url: active_relay_url,
            paths,
            rtt_ms,
            bytes_sent,
            bytes_received,
            lost_packets,
            sent_packets,
            congestion_window,
        })
    }
641}
642
643fn classify_bind_error(e: impl std::fmt::Display) -> crate::CoreError {
647 let msg = e.to_string();
648 crate::CoreError::connection_failed(msg)
649}
650
/// A node's identity plus its known addresses, rendered as strings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeAddrInfo {
    /// Base32-encoded node id.
    pub id: String,
    /// Relay URLs and/or direct socket addresses, stringified.
    pub addrs: Vec<String>,
}
661
/// Live counters for an endpoint, as reported by `endpoint_stats`.
/// Serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct EndpointStats {
    /// Open stream reader handles.
    pub active_readers: usize,
    /// Open stream writer handles.
    pub active_writers: usize,
    /// Open duplex session handles.
    pub active_sessions: usize,
    /// Total handles currently tracked by the store.
    pub total_handles: usize,
    /// Approximate number of pooled connections.
    pub pool_size: u64,
    /// Live inbound connections (relaxed counter; may lag slightly).
    pub active_connections: usize,
    /// In-flight requests (relaxed counter; may lag slightly).
    pub active_requests: usize,
}
686
/// A peer connect/disconnect notification. Serialized with camelCase names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConnectionEvent {
    /// Base32-encoded id of the peer involved.
    pub peer_id: String,
    /// `true` on connect, `false` on disconnect.
    pub connected: bool,
}
696
/// Per-peer path and transfer statistics (see `IrohEndpoint::peer_stats`).
// NOTE(review): unlike EndpointStats/ConnectionEvent, this struct has no
// #[serde(rename_all = "camelCase")], so multi-word fields serialize as
// snake_case — confirm this asymmetry is intentional before changing it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerStats {
    /// Whether any actively-used path goes through a relay.
    pub relay: bool,
    /// URL of the active relay, if one exists.
    pub relay_url: Option<String>,
    /// All known paths to the peer.
    pub paths: Vec<PathInfo>,
    /// Round-trip time in milliseconds (only with a pooled connection).
    pub rtt_ms: Option<f64>,
    /// UDP bytes sent on the pooled connection, if any.
    pub bytes_sent: Option<u64>,
    /// UDP bytes received on the pooled connection, if any.
    pub bytes_received: Option<u64>,
    /// Currently never populated by `peer_stats`.
    pub lost_packets: Option<u64>,
    /// Currently never populated by `peer_stats`.
    pub sent_packets: Option<u64>,
    /// Currently never populated by `peer_stats`.
    pub congestion_window: Option<u64>,
}
719
/// One transport path to a peer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PathInfo {
    /// `true` when this path goes through a relay (vs a direct IP path).
    pub relay: bool,
    /// The path's address: a relay URL or a socket address, stringified.
    pub addr: String,
    /// Whether this path is actively in use.
    pub active: bool,
}
730
/// Parse an optional list of direct address strings into socket addresses.
///
/// `None` passes through as `Ok(None)`. The first unparsable entry aborts
/// with a descriptive error naming the offending string.
pub fn parse_direct_addrs(
    addrs: &Option<Vec<String>>,
) -> Result<Option<Vec<std::net::SocketAddr>>, String> {
    addrs
        .as_ref()
        .map(|list| {
            list.iter()
                .map(|s| {
                    s.parse::<std::net::SocketAddr>()
                        .map_err(|e| format!("invalid direct address {s:?}: {e}"))
                })
                .collect::<Result<Vec<_>, String>>()
        })
        .transpose()
}