net_sdk/mesh.rs
1//! Multi-peer mesh handle for the SDK.
2//!
3//! `Mesh` wraps [`MeshNode`] with ergonomic methods for connecting to
4//! peers, sending events, and polling received events. Unlike
5//! [`crate::Net`], which is backed by an `EventBus` + adapter, `Mesh`
6//! manages its own encrypted peer sessions and routing.
7//!
8//! # Example
9//!
10//! ```rust,no_run
11//! use net_sdk::mesh::{Mesh, MeshBuilder};
12//!
13//! # async fn example() -> net_sdk::error::Result<()> {
14//! let mut node = Mesh::builder("127.0.0.1:9000", b"my-32-byte-preshared-key-here!!!")?
15//! .heartbeat_ms(200)
16//! .session_timeout_ms(5000)
17//! .build()
18//! .await?;
19//!
20//! // Connect to a peer
21//! let peer_pubkey = [0u8; 32]; // get from peer.public_key()
22//! node.connect("127.0.0.1:9001", &peer_pubkey, 0x2222).await?;
23//! node.start();
24//!
25//! // Send events
26//! node.send(0x2222, &serde_json::json!({"token": "hello"})).await?;
27//!
28//! // Poll received events
29//! let events = node.recv(100).await?;
30//!
31//! node.shutdown().await?;
32//! # Ok(())
33//! # }
34//! ```
35
36use std::net::SocketAddr;
37use std::sync::Arc;
38use std::time::Duration;
39
40use bytes::Bytes;
41use serde::Serialize;
42
43use net::adapter::net::{
44 AckReason, ChannelConfig, ChannelConfigRegistry, ChannelName, ChannelPublisher, EntityKeypair,
45 MeshNode, MeshNodeConfig, MigrationSubprotocolHandler, PublishConfig, PublishReport, Stream,
46 StreamConfig, StreamStats,
47};
48use net::adapter::Adapter;
49use net::event::StoredEvent;
50
51use crate::error::{Result, SdkError};
52
53/// Options passed to [`Mesh::subscribe_channel_with`].
54///
55/// Today the only knob is a presented
56/// [`PermissionToken`](crate::identity::PermissionToken) — the
57/// shape is a struct rather than a bare `Option<Token>` so future
58/// additions (request-side timeout override, subscribe priority,
59/// etc.) don't break callers.
60///
61/// # Round-trip shape
62///
63/// ```no_run
64/// # use std::time::Duration;
65/// # use net_sdk::{ChannelName, Identity, SubscribeOptions, TokenScope};
66/// # use net_sdk::mesh::MeshBuilder;
67/// # async fn example(
68/// # publisher: &net_sdk::Mesh,
69/// # publisher_identity: &Identity,
70/// # subscriber_entity_id: net::adapter::net::identity::EntityId,
71/// # ) -> net_sdk::error::Result<()> {
72/// // Publisher issues a SUBSCRIBE-scope token for the subscriber.
73/// // The publisher's own `Mesh` is bound to an `Identity`, so the
74/// // token lands in its local cache when `issue_token` is called.
75/// let channel = ChannelName::new("events/trades").unwrap();
76/// let token = publisher_identity.issue_token(
77/// subscriber_entity_id,
78/// TokenScope::SUBSCRIBE,
79/// &channel,
80/// Duration::from_secs(600),
81/// 0, // no further delegation
82/// );
83///
84/// // Subscriber (another `Mesh`) calls `subscribe_channel_with`,
85/// // attaching the same token bytes they received from the
86/// // publisher out of band. The publisher verifies the signature,
87/// // checks `subject == subscriber_entity_id`, installs it in its
88/// // cache, then runs `can_subscribe`.
89/// let subscriber: &net_sdk::Mesh = unimplemented!();
90/// subscriber
91/// .subscribe_channel_with(
92/// publisher.node_id(),
93/// &channel,
94/// SubscribeOptions { token: Some(token) },
95/// )
96/// .await?;
97/// # Ok(())
98/// # }
99/// ```
100#[derive(Default, Debug, Clone)]
101pub struct SubscribeOptions {
102 /// Token to attach to the subscribe request. The publisher
103 /// verifies + installs it before running
104 /// `ChannelConfig::can_subscribe`, so a matching token
105 /// satisfies `require_token` channels end-to-end.
106 pub token: Option<net::adapter::net::PermissionToken>,
107}
108
109/// Builder for configuring a [`Mesh`] node.
110pub struct MeshBuilder {
111 bind_addr: SocketAddr,
112 psk: [u8; 32],
113 heartbeat_interval: Duration,
114 session_timeout: Duration,
115 num_shards: u16,
116 identity: Option<crate::identity::Identity>,
117 subnet: Option<net::adapter::net::SubnetId>,
118 subnet_policy: Option<Arc<net::adapter::net::SubnetPolicy>>,
119 #[cfg(feature = "nat-traversal")]
120 reflex_override: Option<SocketAddr>,
121 #[cfg(feature = "port-mapping")]
122 try_port_mapping: bool,
123}
124
125impl MeshBuilder {
126 /// Create a new builder.
127 pub fn new(bind_addr: &str, psk: &[u8; 32]) -> Result<Self> {
128 let addr: SocketAddr = bind_addr
129 .parse()
130 .map_err(|e| SdkError::Config(format!("invalid bind address: {}", e)))?;
131 Ok(Self {
132 bind_addr: addr,
133 psk: *psk,
134 heartbeat_interval: Duration::from_secs(5),
135 session_timeout: Duration::from_secs(30),
136 num_shards: 4,
137 identity: None,
138 subnet: None,
139 subnet_policy: None,
140 #[cfg(feature = "nat-traversal")]
141 reflex_override: None,
142 #[cfg(feature = "port-mapping")]
143 try_port_mapping: false,
144 })
145 }
146
147 /// Pin this node to a caller-owned [`Identity`](crate::Identity).
148 ///
149 /// Without this call, `build()` generates an ephemeral keypair —
150 /// fine for one-off sessions, but every restart produces a new
151 /// entity id (and therefore a new node id). Provide an identity
152 /// loaded from disk / vault / enclave to keep stable addressing.
153 ///
154 /// The identity's [`TokenCache`](crate::identity::TokenCache) is
155 /// also bound to this mesh; tokens installed via
156 /// [`Identity::install_token`](crate::identity::Identity::install_token)
157 /// become available to the channel auth path at subscribe time.
158 pub fn identity(mut self, identity: crate::identity::Identity) -> Self {
159 self.identity = Some(identity);
160 self
161 }
162
163 /// Set heartbeat interval in milliseconds.
164 pub fn heartbeat_ms(mut self, ms: u64) -> Self {
165 self.heartbeat_interval = Duration::from_millis(ms);
166 self
167 }
168
169 /// Set session timeout in milliseconds.
170 pub fn session_timeout_ms(mut self, ms: u64) -> Self {
171 self.session_timeout = Duration::from_millis(ms);
172 self
173 }
174
175 /// Set number of inbound shards.
176 pub fn shards(mut self, n: u16) -> Self {
177 self.num_shards = n;
178 self
179 }
180
181 /// Pin this node to a specific subnet. Defaults to
182 /// [`SubnetId::GLOBAL`](crate::subnets::SubnetId) — no
183 /// restriction. Visibility checks on the publish + subscribe
184 /// paths compare against this value.
185 pub fn subnet(mut self, id: net::adapter::net::SubnetId) -> Self {
186 self.subnet = Some(id);
187 self
188 }
189
190 /// Install a subnet policy that derives each peer's subnet from
191 /// their capability announcement. Mesh-wide policy consistency
192 /// is assumed — mismatched policies across nodes lead to
193 /// asymmetric views of peer subnets.
194 ///
195 /// Accepts either an owned `SubnetPolicy` or an `Arc<SubnetPolicy>`
196 /// via blanket `Into` support — useful when several builders
197 /// share one policy at node construction time.
198 pub fn subnet_policy(
199 mut self,
200 policy: impl Into<Arc<net::adapter::net::SubnetPolicy>>,
201 ) -> Self {
202 self.subnet_policy = Some(policy.into());
203 self
204 }
205
206 /// Pin this mesh's publicly-advertised reflex `SocketAddr` to
207 /// the supplied external address. The classifier's background
208 /// sweep is skipped; the node starts in `NatClass::Open` with
209 /// `reflex_addr = Some(external)` on outbound capability
210 /// announcements.
211 ///
212 /// Intended for:
213 ///
214 /// - **Port-forwarded servers.** An operator who has manually
215 /// configured a port forward knows the external address and
216 /// shouldn't wait on peer-probing to discover it.
217 /// - **Stage-4 port mapping (UPnP / NAT-PMP / PCP).** A
218 /// successful mapping installation will set this on behalf
219 /// of the caller.
220 ///
221 /// **Optimization, not correctness.** Nodes without an
222 /// override still reach every peer through the routed-
223 /// handshake path — the override just cuts the classifier
224 /// round-trip when the answer is already known.
225 ///
226 /// Requires the `nat-traversal` cargo feature.
227 #[cfg(feature = "nat-traversal")]
228 pub fn reflex_override(mut self, external: SocketAddr) -> Self {
229 self.reflex_override = Some(external);
230 self
231 }
232
233 /// Opt into opportunistic UPnP-IGD / NAT-PMP / PCP port
234 /// mapping at startup. When set, the mesh spawns a
235 /// [`PortMapperTask`](net::adapter::net::traversal::portmap)
236 /// during `start()` that:
237 ///
238 /// 1. Probes NAT-PMP against the OS-discovered default
239 /// gateway (1 s deadline).
240 /// 2. Falls back to UPnP via SSDP discovery (2 s deadline).
241 /// 3. On install success, pins the reflex override to the
242 /// mapped external address — the mesh advertises itself
243 /// as `Open` to peers without a classifier round-trip.
244 /// 4. Renews every 30 min; revokes on shutdown; falls back
245 /// to the classifier on three consecutive renewal
246 /// failures.
247 ///
248 /// **Optimization, not correctness.** A node whose router
249 /// doesn't speak UPnP / NAT-PMP still reaches every peer
250 /// through the routed-handshake path. Off by default
251 /// because port mapping modifies state on the operator's
252 /// router.
253 ///
254 /// Requires the `port-mapping` cargo feature. The flag is
255 /// silently ignored when the feature is off.
256 #[cfg(feature = "port-mapping")]
257 pub fn try_port_mapping(mut self, enabled: bool) -> Self {
258 self.try_port_mapping = enabled;
259 self
260 }
261
262 /// Build the mesh node.
263 pub async fn build(self) -> Result<Mesh> {
264 // Use the caller's identity if one was set, otherwise mint an
265 // ephemeral one. `MeshNode::new` takes the keypair by value,
266 // so clone it out of the Arc when we have a shared identity.
267 let (keypair, sdk_identity) = match self.identity {
268 Some(id) => (id.keypair().as_ref().clone(), Some(id)),
269 None => (EntityKeypair::generate(), None),
270 };
271
272 let mut config = MeshNodeConfig::new(self.bind_addr, self.psk)
273 .with_heartbeat_interval(self.heartbeat_interval)
274 .with_session_timeout(self.session_timeout)
275 .with_num_shards(self.num_shards)
276 .with_handshake(3, Duration::from_secs(5));
277 if let Some(id) = self.subnet {
278 config = config.with_subnet(id);
279 }
280 if let Some(policy) = self.subnet_policy {
281 config = config.with_subnet_policy(policy);
282 }
283 #[cfg(feature = "nat-traversal")]
284 if let Some(external) = self.reflex_override {
285 config = config.with_reflex_override(external);
286 }
287 #[cfg(feature = "port-mapping")]
288 if self.try_port_mapping {
289 config = config.with_try_port_mapping(true);
290 }
291
292 let mut node = MeshNode::new(keypair, config).await?;
293 // Install a shared ChannelConfigRegistry so `register_channel`
294 // can add entries without needing `&mut Mesh` — the registry
295 // itself uses interior mutability (DashMap).
296 let channel_configs = Arc::new(ChannelConfigRegistry::new());
297 node.set_channel_configs(channel_configs.clone());
298 // Hand the caller's TokenCache to the mesh so channel auth
299 // (`require_token` / `can_subscribe` / `can_publish`) has a
300 // cache to consult + install incoming tokens into. Without
301 // an identity, no cache is installed and `require_token`
302 // channels will reject.
303 if let Some(id) = sdk_identity.as_ref() {
304 node.set_token_cache(id.token_cache().clone());
305 }
306 Ok(Mesh {
307 node: Arc::new(node),
308 channel_configs,
309 identity: sdk_identity,
310 #[cfg(feature = "tool")]
311 tool_metadata_fetch: Arc::new(parking_lot::Mutex::new(None)),
312 })
313 }
314}
315
316/// A multi-peer mesh node.
317///
318/// Manages encrypted connections to multiple peers over a single UDP
319/// socket. Supports direct peer-to-peer sends, routed multi-hop sends,
320/// automatic failure detection, and rerouting.
321pub struct Mesh {
322 /// Shared `MeshNode`. `Arc` rather than by-value so NAPI /
323 /// FFI bindings can hand the same live node to multiple
324 /// wrappers (e.g. a `DaemonRuntime` alongside the existing
325 /// `NetMesh` class) without double-owning the underlying
326 /// socket. All public methods go through `.inner()` (Arc
327 /// deref), so holding the `Arc` changes no existing call
328 /// sites.
329 node: Arc<MeshNode>,
330 /// Channel config registry shared with the underlying `MeshNode`
331 /// so `register_channel` / subscriber ACL checks operate on the
332 /// same live data.
333 channel_configs: Arc<ChannelConfigRegistry>,
334 /// Held onto so the caller's `TokenCache` (and future capability
335 /// announcement state) stays alive for the mesh's lifetime —
336 /// `MeshNode` was already handed a clone of the keypair, so this
337 /// is purely for the auxiliary state that rides alongside.
338 identity: Option<crate::identity::Identity>,
339 /// Lazy auto-install state for the `tool.metadata.fetch` nRPC
340 /// service. The first `Mesh::serve_tool` call locks this
341 /// mutex, sees `None`, installs the handler, and stores
342 /// `Some(handle)`. Subsequent `serve_tool` calls see `Some(_)`
343 /// and skip the install. The handle lives for the lifetime of
344 /// the `Mesh`; the service stays registered even after every
345 /// individual tool ServeHandle is dropped (low cost — the
346 /// handler just answers `NotFound` for every name once the
347 /// registry is empty again).
348 ///
349 /// `pub(crate)` so the SDK's `tool` module — which lives in a
350 /// separate file but the same crate — can reach this slot
351 /// without an accessor stub.
352 #[cfg(feature = "tool")]
353 pub(crate) tool_metadata_fetch: Arc<parking_lot::Mutex<Option<crate::mesh_rpc::ServeHandle>>>,
354}
355
356impl Mesh {
357 /// Create a builder.
358 pub fn builder(bind_addr: &str, psk: &[u8; 32]) -> Result<MeshBuilder> {
359 MeshBuilder::new(bind_addr, psk)
360 }
361
362 /// Get this node's Noise public key.
363 ///
364 /// Share this with peers so they can connect to this node.
365 pub fn public_key(&self) -> &[u8; 32] {
366 self.node.public_key()
367 }
368
369 /// Get this node's ID (derived from ed25519 identity).
370 pub fn node_id(&self) -> u64 {
371 self.node.node_id()
372 }
373
374 /// Get the local bind address.
375 pub fn local_addr(&self) -> SocketAddr {
376 self.node.local_addr()
377 }
378
379 /// Install (or clear with `None`) the caller-side nRPC
380 /// observer for this `Mesh`. Fires on every `call_typed`
381 /// completion (success / server error / timeout / transport
382 /// error) with a typed [`crate::mesh_rpc::RpcCallEvent`].
383 /// See `DECK_DEMO_HARNESS_PLAN.md` Missing Item D for the
384 /// design rationale.
385 ///
386 /// Replaces any previously-installed observer.
387 /// Observers run inline on the dispatch task; implementations
388 /// must be cheap (push into a bounded ring / mpsc, not
389 /// block).
390 ///
391 /// v1 fires only `RpcDirection::Outbound`; server-side
392 /// (inbound) firing is a follow-up.
393 #[cfg(feature = "cortex")]
394 pub fn set_rpc_observer(&self, observer: Option<crate::mesh_rpc::RpcObserverHandle>) {
395 self.node.set_rpc_observer(observer);
396 }
397
398 /// Crate-internal accessor for the underlying `MeshNode`.
399 /// Used by `mesh_rpc` to delegate the typed RPC API; not
400 /// intended for downstream consumers (the public surface
401 /// stays on `Mesh` itself). Gated on the same feature
402 /// combination as its sole consumer (`mesh_rpc` /
403 /// `mesh_rpc_resilience`) so feature combinations that
404 /// exclude either don't trip dead-code lints.
405 #[cfg(all(feature = "net", feature = "cortex"))]
406 pub(crate) fn node(&self) -> &Arc<MeshNode> {
407 &self.node
408 }
409
410 /// Crate-internal accessor for the SDK's
411 /// `ChannelConfigRegistry`. Used by `mesh_rpc` to
412 /// auto-register the request channel + reply prefix on
413 /// `serve_rpc`.
414 #[cfg(all(feature = "net", feature = "cortex"))]
415 pub(crate) fn channel_configs_arc(&self) -> &Arc<ChannelConfigRegistry> {
416 &self.channel_configs
417 }
418
419 /// Connect to a peer as initiator.
420 ///
421 /// The peer must be listening (call `accept()` on their side).
422 /// `peer_pubkey` is the peer's Noise public key from `public_key()`.
423 pub async fn connect(
424 &self,
425 peer_addr: &str,
426 peer_pubkey: &[u8; 32],
427 peer_node_id: u64,
428 ) -> Result<()> {
429 let addr: SocketAddr = peer_addr
430 .parse()
431 .map_err(|e| SdkError::Config(format!("invalid peer address: {}", e)))?;
432 self.node.connect(addr, peer_pubkey, peer_node_id).await?;
433 Ok(())
434 }
435
436 /// Accept an incoming connection as responder.
437 ///
438 /// Waits for a peer to initiate a Noise handshake.
439 /// Returns the peer's address.
440 pub async fn accept(&self, peer_node_id: u64) -> Result<SocketAddr> {
441 let (addr, _) = self.node.accept(peer_node_id).await?;
442 Ok(addr)
443 }
444
445 /// Connect to a peer when the responder is already
446 /// `start()`ed and hasn't pre-`accept()`'d this initiator's
447 /// `node_id` — the standard "remote-attach against a running
448 /// daemon" case. Mirror of [`Self::connect`] for that
449 /// scenario; the local side must also have `start()` called
450 /// before this method (the dispatch loop is what completes
451 /// the handshake).
452 ///
453 /// `relay_addr` is the wire address to send msg1 to. The
454 /// degenerate single-hop case (relay == final destination)
455 /// is the CLI remote-attach pattern; the multi-hop case
456 /// (relay forwards to dest) is the same call signature.
457 /// Either way the destination's running dispatch loop
458 /// receives msg1 via the routed-handshake protocol and
459 /// replies with msg2.
460 ///
461 /// # Why a separate method from `connect`?
462 ///
463 /// `connect` uses the direct-handshake protocol, where the
464 /// responder must pre-register the initiator's `node_id`
465 /// via `accept()` before its `start()`. `connect_via` uses
466 /// the routed-handshake protocol — the initiator's full
467 /// `node_id` rides inside the Noise msg1 payload, so the
468 /// responder learns it on demand. No pre-`accept` needed.
469 pub async fn connect_via(
470 &self,
471 relay_addr: &str,
472 peer_pubkey: &[u8; 32],
473 peer_node_id: u64,
474 ) -> Result<()> {
475 let addr: SocketAddr = relay_addr
476 .parse()
477 .map_err(|e| SdkError::Config(format!("invalid relay address: {}", e)))?;
478 self.node
479 .connect_via(addr, peer_pubkey, peer_node_id)
480 .await?;
481 Ok(())
482 }
483
484 /// Start the receive loop, heartbeat sender, and router.
485 ///
486 /// Call this after connecting to peers. Events won't be received
487 /// until `start()` is called.
488 pub fn start(&self) {
489 // `start_arc` (vs bare `start`) enables the periodic capability
490 // re-announce, keeping this node's entry alive in its own and
491 // peers' folds past one announcement TTL.
492 self.node.start_arc();
493 }
494
495 /// Number of connected peers.
496 pub fn peer_count(&self) -> usize {
497 self.node.peer_count()
498 }
499
500 // ---- Sending ----
501
502 /// Send a serializable event to a direct peer.
503 pub async fn send_to(&self, peer_addr: &str, event: &impl Serialize) -> Result<()> {
504 let addr: SocketAddr = peer_addr
505 .parse()
506 .map_err(|e| SdkError::Config(format!("invalid address: {}", e)))?;
507 let json = serde_json::to_vec(event)?;
508 let batch = net::event::Batch {
509 shard_id: 0,
510 events: vec![net::event::InternalEvent::new(Bytes::from(json), 0, 0)],
511 sequence_start: 0,
512 process_nonce: net::event::batch_process_nonce(),
513 };
514 self.node.send_to_peer(addr, &batch).await?;
515 Ok(())
516 }
517
518 /// Send a serializable event via the routing table.
519 ///
520 /// The event is encrypted for the destination and forwarded through
521 /// intermediate nodes if needed. Requires a route to `dest_node_id`
522 /// in the routing table and a session with the destination.
523 pub async fn send(&self, dest_node_id: u64, event: &impl Serialize) -> Result<()> {
524 let json = serde_json::to_vec(event)?;
525 let batch = net::event::Batch {
526 shard_id: 0,
527 events: vec![net::event::InternalEvent::new(Bytes::from(json), 0, 0)],
528 sequence_start: 0,
529 process_nonce: net::event::batch_process_nonce(),
530 };
531 self.node.send_routed(dest_node_id, &batch).await?;
532 Ok(())
533 }
534
535 /// Send raw bytes to a direct peer.
536 pub async fn send_raw_to(&self, peer_addr: &str, data: &[u8]) -> Result<()> {
537 let addr: SocketAddr = peer_addr
538 .parse()
539 .map_err(|e| SdkError::Config(format!("invalid address: {}", e)))?;
540 let batch = net::event::Batch {
541 shard_id: 0,
542 events: vec![net::event::InternalEvent::new(
543 Bytes::copy_from_slice(data),
544 0,
545 0,
546 )],
547 sequence_start: 0,
548 process_nonce: net::event::batch_process_nonce(),
549 };
550 self.node.send_to_peer(addr, &batch).await?;
551 Ok(())
552 }
553
554 // ---- Receiving ----
555
556 /// Poll for received events.
557 ///
558 /// Returns up to `limit` events from all shards.
559 pub async fn recv(&self, limit: usize) -> Result<Vec<StoredEvent>> {
560 // Poll shard 0 (most events land here for single-stream sends)
561 let result = self.node.poll_shard(0, None, limit).await?;
562 Ok(result.events)
563 }
564
565 /// Poll a specific shard for events.
566 pub async fn recv_shard(&self, shard_id: u16, limit: usize) -> Result<Vec<StoredEvent>> {
567 let result = self.node.poll_shard(shard_id, None, limit).await?;
568 Ok(result.events)
569 }
570
571 // ---- Channels (distributed pub/sub) ----
572
573 /// Register a channel on this publisher. Subscribers who ask to
574 /// join are validated against `config` before being added to the
575 /// subscriber roster.
576 ///
577 /// `config.channel_id` must be built from the same canonical name
578 /// subscribers pass to `subscribe_channel`. The registry keys on
579 /// the canonical name (not the u16 hash) to avoid ACL bypass via
580 /// hash collision.
581 ///
582 /// Idempotent: re-registering the same channel replaces the prior
583 /// config.
584 pub fn register_channel(&self, config: ChannelConfig) {
585 self.channel_configs.insert(config);
586 }
587
588 /// Ask `publisher_node_id` to add this node to `channel`'s
589 /// subscriber set. Blocks until the publisher's `Ack` arrives or
590 /// the mesh's membership-ack timeout elapses.
591 ///
592 /// Returns `Ok(())` on acceptance; rejection (unauthorized /
593 /// unknown channel / rate-limited / too-many-channels) surfaces
594 /// as `SdkError::ChannelRejected(reason)`. Network-level failures
595 /// surface as `SdkError::Adapter(...)`.
596 ///
597 /// This bare form presents no credential. On a **token-gated**
598 /// channel it is always rejected — the publisher requires a token
599 /// chain on *every* subscribe and does not honor a credential
600 /// presented on a previous subscribe (e.g. before a reconnect or
601 /// roster eviction). Re-subscribe with
602 /// [`Self::subscribe_channel_with`] carrying the token each time.
603 pub async fn subscribe_channel(
604 &self,
605 publisher_node_id: u64,
606 channel: &ChannelName,
607 ) -> Result<()> {
608 self.subscribe_channel_with(publisher_node_id, channel, SubscribeOptions::default())
609 .await
610 }
611
612 /// Subscribe with options — optionally presenting a
613 /// [`PermissionToken`](crate::identity::PermissionToken).
614 ///
615 /// Use this when the publisher registered the channel with
616 /// `token_roots` (token enforcement) and/or a `subscribe_caps`
617 /// filter that your node's capabilities alone don't satisfy. The
618 /// publisher verifies the presented token chain on arrival — it
619 /// must root at one of the channel's `token_roots`, bind at its
620 /// leaf to the subscribing peer's `EntityId`, and authorize
621 /// `SUBSCRIBE` at every link — then retains it to re-check expiry
622 /// and revocation while the subscription lives.
623 ///
624 /// The credential must be presented on **every** subscribe: a
625 /// previously-accepted chain is not reused for a later bare
626 /// [`Self::subscribe_channel`], so after a reconnect or roster
627 /// eviction you must call this again with the token.
628 pub async fn subscribe_channel_with(
629 &self,
630 publisher_node_id: u64,
631 channel: &ChannelName,
632 opts: SubscribeOptions,
633 ) -> Result<()> {
634 let result = match opts.token {
635 Some(token) => {
636 self.node
637 .subscribe_channel_with_token(publisher_node_id, channel.clone(), token)
638 .await
639 }
640 None => {
641 self.node
642 .subscribe_channel(publisher_node_id, channel.clone())
643 .await
644 }
645 };
646 match result {
647 Ok(()) => Ok(()),
648 Err(e) => Err(adapter_to_channel_error(e)),
649 }
650 }
651
652 /// Mirror of [`Self::subscribe_channel`]. Idempotent on the
653 /// publisher side — unsubscribing a non-subscriber still returns
654 /// `Ok(())`.
655 pub async fn unsubscribe_channel(
656 &self,
657 publisher_node_id: u64,
658 channel: &ChannelName,
659 ) -> Result<()> {
660 match self
661 .node
662 .unsubscribe_channel(publisher_node_id, channel.clone())
663 .await
664 {
665 Ok(()) => Ok(()),
666 Err(e) => Err(adapter_to_channel_error(e)),
667 }
668 }
669
670 /// Publish one payload to every subscriber of `channel`.
671 /// `config.on_failure` controls whether per-peer errors
672 /// short-circuit the fan-out. Returns a [`PublishReport`]
673 /// describing per-peer outcomes.
674 pub async fn publish(
675 &self,
676 channel: &ChannelName,
677 payload: Bytes,
678 config: PublishConfig,
679 ) -> Result<PublishReport> {
680 let publisher = ChannelPublisher::new(channel.clone(), config);
681 Ok(self.node.publish(&publisher, payload).await?)
682 }
683
684 /// Fan multiple payloads to every subscriber of `channel` as one
685 /// batch per subscriber. Semantics match [`Self::publish`].
686 pub async fn publish_many(
687 &self,
688 channel: &ChannelName,
689 payloads: &[Bytes],
690 config: PublishConfig,
691 ) -> Result<PublishReport> {
692 let publisher = ChannelPublisher::new(channel.clone(), config);
693 Ok(self.node.publish_many(&publisher, payloads).await?)
694 }
695
696 // ---- Routing ----
697
698 /// Add a route to a destination node.
699 ///
700 /// Packets sent to `dest_node_id` via `send()` will be forwarded
701 /// through `next_hop_addr`.
702 pub fn add_route(&self, dest_node_id: u64, next_hop_addr: &str) -> Result<()> {
703 let addr: SocketAddr = next_hop_addr
704 .parse()
705 .map_err(|e| SdkError::Config(format!("invalid address: {}", e)))?;
706 self.node.router().add_route(dest_node_id, addr);
707 Ok(())
708 }
709
710 /// Remove a route.
711 pub fn remove_route(&self, dest_node_id: u64) {
712 self.node.router().remove_route(dest_node_id);
713 }
714
715 // ---- Mesh topology ----
716
717 /// Block a peer (simulate network partition).
718 pub fn block_peer(&self, peer_addr: &str) -> Result<()> {
719 let addr: SocketAddr = peer_addr
720 .parse()
721 .map_err(|e| SdkError::Config(format!("invalid address: {}", e)))?;
722 self.node.block_peer(addr);
723 Ok(())
724 }
725
726 /// Unblock a peer.
727 pub fn unblock_peer(&self, peer_addr: &str) -> Result<()> {
728 let addr: SocketAddr = peer_addr
729 .parse()
730 .map_err(|e| SdkError::Config(format!("invalid address: {}", e)))?;
731 self.node.unblock_peer(&addr);
732 Ok(())
733 }
734
735 /// Number of nodes discovered via pingwave propagation.
736 pub fn discovered_nodes(&self) -> usize {
737 self.node.proximity_graph().node_count()
738 }
739
740 /// Number of active reroutes (routes using alternates after failure).
741 pub fn active_reroutes(&self) -> usize {
742 self.node.reroute_policy().active_reroutes()
743 }
744
745 // ---- Streams ----
746
747 /// Open (or look up) a logical stream to a peer. See
748 /// [`net::adapter::net::MeshNode::open_stream`] for the full contract.
749 /// Repeated calls for the same `(peer, stream_id)` are idempotent;
750 /// the first open wins and subsequent configs are logged and
751 /// ignored.
752 pub fn open_stream(
753 &self,
754 peer_node_id: u64,
755 stream_id: u64,
756 config: StreamConfig,
757 ) -> Result<Stream> {
758 self.node
759 .open_stream(peer_node_id, stream_id, config)
760 .map_err(SdkError::from)
761 }
762
763 /// Close a stream: drop its `StreamState` and free the window. Idempotent.
764 pub fn close_stream(&self, peer_node_id: u64, stream_id: u64) {
765 self.node.close_stream(peer_node_id, stream_id);
766 }
767
768 /// Send a batch of events on an explicit stream.
769 ///
770 /// Returns [`SdkError::Backpressure`] when the stream's per-stream
771 /// in-flight window is full (no events were sent — the caller
772 /// decides whether to drop, retry, or buffer). [`SdkError::NotConnected`]
773 /// when the peer session is gone. All other failures surface as
774 /// [`SdkError::Adapter`].
775 pub async fn send_on_stream(&self, stream: &Stream, events: &[Bytes]) -> Result<()> {
776 self.node
777 .send_on_stream(stream, events)
778 .await
779 .map_err(SdkError::from)
780 }
781
782 /// Send events, retrying on `Backpressure` with exponential backoff
783 /// (5 ms → 200 ms, doubling) up to `max_retries` times. Transport
784 /// errors and `NotConnected` are returned immediately.
785 pub async fn send_with_retry(
786 &self,
787 stream: &Stream,
788 events: &[Bytes],
789 max_retries: usize,
790 ) -> Result<()> {
791 self.node
792 .send_with_retry(stream, events, max_retries)
793 .await
794 .map_err(SdkError::from)
795 }
796
797 /// Block the calling task until the send succeeds or a transport
798 /// error occurs. See [`Mesh::send_with_retry`] for finer control.
799 pub async fn send_blocking(&self, stream: &Stream, events: &[Bytes]) -> Result<()> {
800 self.node
801 .send_blocking(stream, events)
802 .await
803 .map_err(SdkError::from)
804 }
805
806 /// Snapshot of per-stream stats (tx/rx seq, window, in-flight,
807 /// backpressure count, activity).
808 pub fn stream_stats(&self, peer_node_id: u64, stream_id: u64) -> Option<StreamStats> {
809 self.node.stream_stats(peer_node_id, stream_id)
810 }
811
812 /// Snapshot stats for every stream in the session to `peer_node_id`.
813 pub fn all_stream_stats(&self, peer_node_id: u64) -> Vec<(u64, StreamStats)> {
814 self.node.all_stream_stats(peer_node_id)
815 }
816
817 // ---- Capability announcements ----
818
819 /// Announce this node's capabilities to every directly-connected
820 /// peer. Self-indexes too, so `find_nodes` called from this same
821 /// node matches on the announcement. Multi-hop propagation is
822 /// deferred — peers more than one hop away will not see the
823 /// announcement.
824 ///
825 /// Default TTL is 5 minutes; use
826 /// [`Self::announce_capabilities_with`] to override.
827 pub async fn announce_capabilities(
828 &self,
829 caps: crate::capabilities::CapabilitySet,
830 ) -> Result<()> {
831 self.node.announce_capabilities(caps).await?;
832 Ok(())
833 }
834
835 /// Extended announce with explicit TTL and signing opt-in.
836 /// `sign = true` is accepted but currently a no-op; signatures
837 /// tie in with Stage E (channel auth), once `node_id` →
838 /// `EntityId` binding is wired.
839 pub async fn announce_capabilities_with(
840 &self,
841 caps: crate::capabilities::CapabilitySet,
842 ttl: std::time::Duration,
843 sign: bool,
844 ) -> Result<()> {
845 self.node
846 .announce_capabilities_with(caps, ttl, sign)
847 .await?;
848 Ok(())
849 }
850
851 /// Query the capability index. Returns node ids whose latest
852 /// announcement matches `filter`; includes our own `node_id` if
853 /// our own announcement matches.
854 pub fn find_nodes(&self, filter: &crate::capabilities::CapabilityFilter) -> Vec<u64> {
855 self.node.find_nodes_by_filter(filter)
856 }
857
858 /// Scoped variant of [`Self::find_nodes`]. Filters candidates
859 /// through a [`crate::capabilities::ScopeFilter`] derived from
860 /// each node's `scope:*` reserved tags. Untagged nodes resolve
861 /// to `Global` and remain visible under most filters by design;
862 /// nodes tagged `scope:subnet-local` only show up under
863 /// [`crate::capabilities::ScopeFilter::SameSubnet`]. See
864 /// `docs/SCOPED_CAPABILITIES_PLAN.md` for the full table.
865 pub fn find_nodes_scoped(
866 &self,
867 filter: &crate::capabilities::CapabilityFilter,
868 scope: &crate::capabilities::ScopeFilter<'_>,
869 ) -> Vec<u64> {
870 self.node.find_nodes_by_filter_scoped(filter, scope)
871 }
872
873 /// Pick the single best-scoring node for a placement
874 /// requirement. Returns the winning node's id, or `None` if no
875 /// node matches.
876 pub fn find_best_node(&self, req: &crate::capabilities::CapabilityRequirement) -> Option<u64> {
877 self.node.find_best_node(req)
878 }
879
880 /// Scoped variant of [`Self::find_best_node`]. Picks the highest-
881 /// scoring node within the scope-filtered candidate set.
882 pub fn find_best_node_scoped(
883 &self,
884 req: &crate::capabilities::CapabilityRequirement,
885 scope: &crate::capabilities::ScopeFilter<'_>,
886 ) -> Option<u64> {
887 self.node.find_best_node_scoped(req, scope)
888 }
889
890 /// Bucketed aggregation over the local capability fold. Composes
891 /// [`TagMatcher`](crate::capabilities::TagMatcher) ×
892 /// [`GroupBy`](crate::capabilities::GroupBy) ×
893 /// [`Aggregation`](crate::capabilities::Aggregation) into a
894 /// `Vec<(bucket, value)>` sorted lex by bucket key. Phase 6c-A
895 /// of `MULTIFOLD_PHASE_6C_CAPACITY_AGGREGATION.md`.
896 ///
897 /// `matcher = None` walks every entry. Returns an empty vec when
898 /// no entries match.
899 pub fn capability_aggregate(
900 &self,
901 matcher: Option<crate::capabilities::TagMatcher>,
902 group_by: crate::capabilities::GroupBy,
903 agg: crate::capabilities::Aggregation,
904 ) -> Vec<(String, u64)> {
905 self.node
906 .capability_fold()
907 .aggregate(matcher, group_by, agg)
908 }
909
910 /// Capacity-ranked materialized view. Wraps
911 /// [`Self::capability_aggregate`] with per-bucket state
912 /// breakdown (`idle` / `busy` / `reserved`), an RTT gate, and
913 /// optional summed numeric capacity. Phase 6c-B of
914 /// `MULTIFOLD_PHASE_6C_CAPACITY_AGGREGATION.md`.
915 ///
916 /// `rtt_lookup` maps a publisher's `node_id` to current RTT in
917 /// milliseconds. When `query.max_rtt_ms` is `None`, the closure
918 /// is never invoked; when set, publishers whose lookup returns
919 /// `None` are dropped (fail-closed — never-pinged nodes don't
920 /// ride a "fastest available" filter as zero).
921 ///
922 /// Faulty entries are always excluded. Rows return sorted by
923 /// `available` descending; ties broken by bucket key ascending.
924 /// Truncated to `query.limit` (0 = no truncation).
925 ///
926 /// # Example
927 ///
928 /// ```
929 /// # async fn doc() -> net_sdk::error::Result<()> {
930 /// use net_sdk::capabilities::{
931 /// CapabilitySet, CapacityQuery, GroupBy, TagMatcher,
932 /// };
933 /// use net_sdk::mesh::MeshBuilder;
934 ///
935 /// let node = MeshBuilder::new("127.0.0.1:0", &[0x42u8; 32])?
936 /// .build()
937 /// .await?;
938 /// node.announce_capabilities(
939 /// CapabilitySet::new()
940 /// .add_tag("hardware.gpu")
941 /// .add_tag("hardware.gpu.h100")
942 /// .add_tag("hardware.gpu.count=8"),
943 /// )
944 /// .await?;
945 ///
946 /// // Top GPU types by available capacity, no RTT filter,
947 /// // summed count column populated.
948 /// let view = node.capability_capacity_ranking(
949 /// CapacityQuery {
950 /// matcher: Some(TagMatcher::Prefix { value: "hardware.gpu".into() }),
951 /// group_by: GroupBy::TagStem { prefix: "hardware.gpu".into() },
952 /// max_rtt_ms: None,
953 /// sum_axis_key: Some("hardware.gpu.count".into()),
954 /// limit: 5,
955 /// },
956 /// |_node_id| None,
957 /// );
958 /// // Self-match: one bucket per stem this node carries.
959 /// assert!(view.iter().any(|row| row.bucket == "h100"));
960 /// # Ok(())
961 /// # }
962 /// ```
963 pub fn capability_capacity_ranking<R>(
964 &self,
965 query: crate::capabilities::CapacityQuery,
966 rtt_lookup: R,
967 ) -> Vec<crate::capabilities::CapacityRow>
968 where
969 R: Fn(u64) -> Option<u32>,
970 {
971 self.node
972 .capability_fold()
973 .capacity_ranking(query, rtt_lookup)
974 }
975
976 // ---- Lifecycle ----
977
978 /// Set a migration handler (for Mikoshi daemon migration).
979 pub fn set_migration_handler(&mut self, handler: Arc<MigrationSubprotocolHandler>) {
980 self.node.set_migration_handler(handler);
981 }
982
983 /// Gracefully shut down.
984 pub async fn shutdown(self) -> Result<()> {
985 self.node.shutdown().await?;
986 Ok(())
987 }
988
989 /// Get a reference to the underlying `MeshNode`.
990 pub fn inner(&self) -> &MeshNode {
991 &self.node
992 }
993
994 /// Clone the `Arc`-shared `MeshNode` handle out of the mesh.
995 ///
996 /// Used by FFI bindings (currently: NAPI) that need to hand
997 /// the same live node to the `net-sdk::compute::DaemonRuntime`
998 /// **and** to their own wrapper class without constructing a
999 /// second UDP socket. All public `MeshNode` operations go
1000 /// through `&MeshNode`, so two Arc holders observe exactly
1001 /// the same state.
1002 pub fn node_arc(&self) -> Arc<MeshNode> {
1003 self.node.clone()
1004 }
1005
1006 /// Construct a `Mesh` that shares an existing `MeshNode` with
1007 /// another owner. Used by FFI bindings that already hold an
1008 /// `Arc<MeshNode>` (e.g. NAPI's `NetMesh`) and need a `Mesh`
1009 /// wrapper so the SDK's `DaemonRuntime` can be built against
1010 /// the same live node.
1011 ///
1012 /// Does not re-install `channel_configs` or a `TokenCache` —
1013 /// the owner of the original `MeshNode` is responsible for
1014 /// that wiring. Supplied `channel_configs` / `identity`
1015 /// arguments are held onto here purely so the `Mesh`'s own
1016 /// helpers (channel registration lookup, identity getter)
1017 /// have data to return.
1018 pub fn from_node_arc(
1019 node: Arc<MeshNode>,
1020 channel_configs: Arc<ChannelConfigRegistry>,
1021 identity: Option<crate::identity::Identity>,
1022 ) -> Self {
1023 Self {
1024 node,
1025 channel_configs,
1026 identity,
1027 #[cfg(feature = "tool")]
1028 tool_metadata_fetch: Arc::new(parking_lot::Mutex::new(None)),
1029 }
1030 }
1031
1032 /// Caller-owned identity bound to this mesh, if any. Returns
1033 /// `None` for meshes built without `.identity(...)` (ephemeral
1034 /// keypair).
1035 pub fn identity(&self) -> Option<&crate::identity::Identity> {
1036 self.identity.as_ref()
1037 }
1038
1039 // ── NAT traversal ──────────────────────────────────────────
1040 //
1041 // Framing (load-bearing — see `docs/NAT_TRAVERSAL_PLAN.md`
1042 // stage 5): every user-visible docstring here must position
1043 // NAT traversal as **optimization, not correctness**. Nodes
1044 // behind NAT can always talk through the mesh's routed-
1045 // handshake path. These APIs let the mesh upgrade to a
1046 // *direct* path when the underlying NATs allow it, cutting
1047 // relay hops out of the data plane. A `nat_type` of
1048 // `symmetric` or a `PunchFailed` error is not a
1049 // connectivity failure — it just means traffic keeps
1050 // riding the relay.
1051 //
1052 // Anti-phrasings to avoid: "required for NATed peers",
1053 // "enables cross-NAT connectivity", "fixes NAT issues."
1054 // Each of these implies the mesh otherwise can't reach
1055 // NATed peers, which is false.
1056
1057 /// Current NAT classification for this mesh's public face,
1058 /// as observed against other peers during the classification
1059 /// sweep. One of `Open`, `Cone`, `Symmetric`, or `Unknown`
1060 /// (pre-sweep or insufficient data).
1061 ///
1062 /// **Optimization, not correctness.** A `Symmetric`
1063 /// classification doesn't prevent this mesh from
1064 /// communicating with any peer — it just means the direct-
1065 /// punch optimization is unlikely to succeed against some
1066 /// peers, and traffic will keep riding the routed path.
1067 ///
1068 /// Requires the `nat-traversal` cargo feature.
1069 #[cfg(feature = "nat-traversal")]
1070 pub fn nat_type(&self) -> net::adapter::net::traversal::classify::NatClass {
1071 self.node.nat_class()
1072 }
1073
1074 /// This mesh's public-facing `SocketAddr` as observed by a
1075 /// remote peer, or `None` before the first classification
1076 /// sweep has produced an observation.
1077 ///
1078 /// Piggybacks on outbound `CapabilityAnnouncement`s so peers
1079 /// can attempt a direct-connect without a separate
1080 /// discovery round-trip. Read by peers implementing the
1081 /// `connect_direct` rendezvous path.
1082 ///
1083 /// Requires the `nat-traversal` cargo feature.
1084 #[cfg(feature = "nat-traversal")]
1085 pub fn reflex_addr(&self) -> Option<SocketAddr> {
1086 self.node.reflex_addr()
1087 }
1088
1089 /// The NAT classification most recently advertised by
1090 /// `peer_node_id` (parsed from the `nat:*` tag on their
1091 /// capability announcement). Returns `NatClass::Unknown`
1092 /// when the peer hasn't announced or was compiled without
1093 /// NAT traversal — the pair-type matrix treats Unknown as
1094 /// "attempt direct, fall back on failure," not as
1095 /// "don't attempt."
1096 ///
1097 /// Requires the `nat-traversal` cargo feature.
1098 #[cfg(feature = "nat-traversal")]
1099 pub fn peer_nat_type(
1100 &self,
1101 peer_node_id: u64,
1102 ) -> net::adapter::net::traversal::classify::NatClass {
1103 self.node.peer_nat_class(peer_node_id)
1104 }
1105
1106 /// Send one reflex probe to `peer_node_id` and return the
1107 /// public `SocketAddr` the peer observed on the probe's UDP
1108 /// envelope. Useful for tests and for operators diagnosing a
1109 /// NAT-type classification that seems off.
1110 ///
1111 /// Times out after `TraversalConfig::reflex_timeout` (3 s
1112 /// default) on network delays, and fast-fails with
1113 /// `peer-not-reachable` on an unknown peer.
1114 ///
1115 /// Requires the `nat-traversal` cargo feature.
1116 #[cfg(feature = "nat-traversal")]
1117 pub async fn probe_reflex(&self, peer_node_id: u64) -> Result<SocketAddr> {
1118 Ok(self.node.probe_reflex(peer_node_id).await?)
1119 }
1120
1121 /// Explicitly re-run the NAT classification sweep against
1122 /// this node's currently-connected peers. Normally the
1123 /// background loop (spawned by `start()`) takes care of
1124 /// this; call this after a suspected NAT rebind (e.g. a
1125 /// gateway reboot) to accelerate the re-classification.
1126 ///
1127 /// No-op when fewer than 2 peers are connected — the
1128 /// two-probe rule needs two distinct targets to produce a
1129 /// classification. Never returns an error; a failed sweep
1130 /// leaves the previous classification intact.
1131 ///
1132 /// Requires the `nat-traversal` cargo feature.
1133 #[cfg(feature = "nat-traversal")]
1134 pub async fn reclassify_nat(&self) {
1135 self.node.reclassify_nat().await
1136 }
1137
1138 /// Establish a session to `peer_node_id` via the rendezvous
1139 /// path, using the pair-type matrix to decide between a
1140 /// direct handshake and a relay-coordinated punch. The
1141 /// returned session is equivalent in correctness to
1142 /// `connect()` — the *only* difference is that a
1143 /// `connect_direct` that lands on the punched path cuts
1144 /// relay hops out of the data plane.
1145 ///
1146 /// **Optimization, not correctness.** `connect_direct`
1147 /// always resolves: on a punch-failed outcome, the session
1148 /// is established via the routed-handshake fallback.
1149 /// Inspect `traversal_stats()` afterward to distinguish a
1150 /// successful punch from a relay fallback.
1151 ///
1152 /// `coordinator` names a peer we already have a session
1153 /// with — typically a stable relay-capable node. The
1154 /// coordinator mediates the introduction; it doesn't carry
1155 /// user-plane traffic once the punch succeeds.
1156 ///
1157 /// Fails with an `SdkError::Traversal` variant whose `kind`
1158 /// is `peer-not-reachable` (no cached reflex for `peer`),
1159 /// `transport` (socket-level error on the final handshake),
1160 /// or (internal, retried on fallback) `punch-failed`.
1161 ///
1162 /// Requires the `nat-traversal` cargo feature.
1163 #[cfg(feature = "nat-traversal")]
1164 pub async fn connect_direct(
1165 &self,
1166 peer_node_id: u64,
1167 peer_pubkey: &[u8; 32],
1168 coordinator: u64,
1169 ) -> Result<()> {
1170 self.node
1171 .connect_direct(peer_node_id, peer_pubkey, coordinator)
1172 .await?;
1173 Ok(())
1174 }
1175
1176 /// Cumulative counters for this mesh's NAT-traversal
1177 /// activity: punch attempts, successful punches, and relay
1178 /// fallbacks. Monotonic — counters never reset. Useful for
1179 /// diagnostics + telemetry (success rate, relay load
1180 /// trends).
1181 ///
1182 /// Requires the `nat-traversal` cargo feature.
1183 #[cfg(feature = "nat-traversal")]
1184 pub fn traversal_stats(&self) -> net::adapter::net::traversal::TraversalStatsSnapshot {
1185 self.node.traversal_stats()
1186 }
1187
1188 /// Install a runtime reflex override. Forces `nat_type() =
1189 /// "open"` and `reflex_addr() = Some(external)` immediately,
1190 /// short-circuiting any further classifier sweeps.
1191 ///
1192 /// Intended for operator-driven updates — a port-forward
1193 /// that went live mid-session, or a stage-4 port-mapping
1194 /// task that just installed a UPnP / NAT-PMP mapping.
1195 /// Builder-level [`MeshBuilder::reflex_override`] covers the
1196 /// startup-time case; this is the runtime equivalent.
1197 ///
1198 /// **Optimization, not correctness.** Nodes without an
1199 /// override still reach every peer via the routed-handshake
1200 /// path. The override pins the publicly-advertised address
1201 /// when it's already known.
1202 ///
1203 /// Requires the `nat-traversal` cargo feature.
1204 #[cfg(feature = "nat-traversal")]
1205 pub fn set_reflex_override(&self, external: SocketAddr) {
1206 self.node.set_reflex_override(external);
1207 }
1208
1209 /// Drop a previously-installed reflex override. The
1210 /// classifier resumes on its normal cadence; the next sweep
1211 /// repopulates `reflex_addr` and `nat_type` from real probe
1212 /// observations. `reflex_addr` clears to `None` immediately
1213 /// so a between-sweep read doesn't return a stale override.
1214 ///
1215 /// No-op when no override is active — safe to call
1216 /// unconditionally during shutdown or a port-mapper revoke
1217 /// path.
1218 ///
1219 /// Requires the `nat-traversal` cargo feature.
1220 #[cfg(feature = "nat-traversal")]
1221 pub fn clear_reflex_override(&self) {
1222 self.node.clear_reflex_override();
1223 }
1224}
1225
1226/// Map an `AdapterError` from a subscribe / unsubscribe / publish
1227/// call into the channel-aware `SdkError` variant. Rejection acks
1228/// come through as `AdapterError::Connection("membership request
1229/// rejected: Some(<reason>)")`; parse that into
1230/// [`SdkError::ChannelRejected`].
1231fn adapter_to_channel_error(err: net::error::AdapterError) -> SdkError {
1232 use net::error::AdapterError;
1233 if let AdapterError::Connection(ref msg) = err {
1234 let prefix = "membership request rejected: ";
1235 if let Some(tail) = msg.strip_prefix(prefix) {
1236 let reason = parse_ack_reason(tail);
1237 return SdkError::ChannelRejected(reason);
1238 }
1239 }
1240 SdkError::from(err)
1241}
1242
1243fn parse_ack_reason(s: &str) -> Option<AckReason> {
1244 // `{:?}` of `Option<AckReason>` produces `Some(Unauthorized)` etc.
1245 let inside = s.trim().strip_prefix("Some(")?.strip_suffix(')')?;
1246 match inside {
1247 "Unauthorized" => Some(AckReason::Unauthorized),
1248 "UnknownChannel" => Some(AckReason::UnknownChannel),
1249 "RateLimited" => Some(AckReason::RateLimited),
1250 "TooManyChannels" => Some(AckReason::TooManyChannels),
1251 _ => None,
1252 }
1253}