Skip to main content

net_sdk/
mesh.rs

1//! Multi-peer mesh handle for the SDK.
2//!
3//! `Mesh` wraps [`MeshNode`] with ergonomic methods for connecting to
4//! peers, sending events, and polling received events. Unlike
5//! [`crate::Net`], which is backed by an `EventBus` + adapter, `Mesh`
6//! manages its own encrypted peer sessions and routing.
7//!
8//! # Example
9//!
10//! ```rust,no_run
11//! use net_sdk::mesh::{Mesh, MeshBuilder};
12//!
13//! # async fn example() -> net_sdk::error::Result<()> {
14//! let mut node = Mesh::builder("127.0.0.1:9000", b"my-32-byte-preshared-key-here!!!")?
15//!     .heartbeat_ms(200)
16//!     .session_timeout_ms(5000)
17//!     .build()
18//!     .await?;
19//!
20//! // Connect to a peer
21//! let peer_pubkey = [0u8; 32]; // get from peer.public_key()
22//! node.connect("127.0.0.1:9001", &peer_pubkey, 0x2222).await?;
23//! node.start();
24//!
25//! // Send events
26//! node.send(0x2222, &serde_json::json!({"token": "hello"})).await?;
27//!
28//! // Poll received events
29//! let events = node.recv(100).await?;
30//!
31//! node.shutdown().await?;
32//! # Ok(())
33//! # }
34//! ```
35
36use std::net::SocketAddr;
37use std::sync::Arc;
38use std::time::Duration;
39
40use bytes::Bytes;
41use serde::Serialize;
42
43use net::adapter::net::{
44    AckReason, ChannelConfig, ChannelConfigRegistry, ChannelName, ChannelPublisher, EntityKeypair,
45    MeshNode, MeshNodeConfig, MigrationSubprotocolHandler, PublishConfig, PublishReport, Stream,
46    StreamConfig, StreamStats,
47};
48use net::adapter::Adapter;
49use net::event::StoredEvent;
50
51use crate::error::{Result, SdkError};
52
53/// Options passed to [`Mesh::subscribe_channel_with`].
54///
55/// Today the only knob is a presented
56/// [`PermissionToken`](crate::identity::PermissionToken) — the
57/// shape is a struct rather than a bare `Option<Token>` so future
58/// additions (request-side timeout override, subscribe priority,
59/// etc.) don't break callers.
60///
61/// # Round-trip shape
62///
63/// ```no_run
64/// # use std::time::Duration;
65/// # use net_sdk::{ChannelName, Identity, SubscribeOptions, TokenScope};
66/// # use net_sdk::mesh::MeshBuilder;
67/// # async fn example(
68/// #     publisher: &net_sdk::Mesh,
69/// #     publisher_identity: &Identity,
70/// #     subscriber_entity_id: net::adapter::net::identity::EntityId,
71/// # ) -> net_sdk::error::Result<()> {
72/// // Publisher issues a SUBSCRIBE-scope token for the subscriber.
73/// // The publisher's own `Mesh` is bound to an `Identity`, so the
74/// // token lands in its local cache when `issue_token` is called.
75/// let channel = ChannelName::new("events/trades").unwrap();
76/// let token = publisher_identity.issue_token(
77///     subscriber_entity_id,
78///     TokenScope::SUBSCRIBE,
79///     &channel,
80///     Duration::from_secs(600),
81///     0, // no further delegation
82/// );
83///
84/// // Subscriber (another `Mesh`) calls `subscribe_channel_with`,
85/// // attaching the same token bytes they received from the
86/// // publisher out of band. The publisher verifies the signature,
87/// // checks `subject == subscriber_entity_id`, installs it in its
88/// // cache, then runs `can_subscribe`.
89/// let subscriber: &net_sdk::Mesh = unimplemented!();
90/// subscriber
91///     .subscribe_channel_with(
92///         publisher.node_id(),
93///         &channel,
94///         SubscribeOptions { token: Some(token) },
95///     )
96///     .await?;
97/// # Ok(())
98/// # }
99/// ```
#[derive(Default, Debug, Clone)]
pub struct SubscribeOptions {
    /// Token to attach to the subscribe request. The publisher
    /// verifies + installs it before running
    /// `ChannelConfig::can_subscribe`, so a matching token
    /// satisfies `require_token` channels end-to-end.
    ///
    /// `None` (the derived `Default`) presents no credential; this is
    /// what the bare [`Mesh::subscribe_channel`] form passes along.
    pub token: Option<net::adapter::net::PermissionToken>,
}
108
/// Builder for configuring a [`Mesh`] node.
///
/// Obtained from [`MeshBuilder::new`] or [`Mesh::builder`]. Each setter
/// consumes the builder and returns it, and [`MeshBuilder::build`]
/// produces the [`Mesh`].
pub struct MeshBuilder {
    /// Local socket address for the node (parsed from a string in `new`).
    bind_addr: SocketAddr,
    /// 32-byte pre-shared key, copied from the caller in `new`.
    psk: [u8; 32],
    /// Heartbeat interval; `new` initialises it to 5 seconds.
    heartbeat_interval: Duration,
    /// Session timeout; `new` initialises it to 30 seconds.
    session_timeout: Duration,
    /// Number of inbound shards; `new` initialises it to 4.
    num_shards: u16,
    /// Caller-owned identity; when `None`, `build` generates an
    /// ephemeral keypair instead.
    identity: Option<crate::identity::Identity>,
    /// Subnet to pin the node to; applied to the node config only when set.
    subnet: Option<net::adapter::net::SubnetId>,
    /// Shared subnet policy; applied to the node config only when set.
    subnet_policy: Option<Arc<net::adapter::net::SubnetPolicy>>,
    /// External reflex address override (only with the `nat-traversal` feature).
    #[cfg(feature = "nat-traversal")]
    reflex_override: Option<SocketAddr>,
    /// Whether to request port mapping (only with the `port-mapping` feature).
    #[cfg(feature = "port-mapping")]
    try_port_mapping: bool,
}
124
125impl MeshBuilder {
126    /// Create a new builder.
127    pub fn new(bind_addr: &str, psk: &[u8; 32]) -> Result<Self> {
128        let addr: SocketAddr = bind_addr
129            .parse()
130            .map_err(|e| SdkError::Config(format!("invalid bind address: {}", e)))?;
131        Ok(Self {
132            bind_addr: addr,
133            psk: *psk,
134            heartbeat_interval: Duration::from_secs(5),
135            session_timeout: Duration::from_secs(30),
136            num_shards: 4,
137            identity: None,
138            subnet: None,
139            subnet_policy: None,
140            #[cfg(feature = "nat-traversal")]
141            reflex_override: None,
142            #[cfg(feature = "port-mapping")]
143            try_port_mapping: false,
144        })
145    }
146
147    /// Pin this node to a caller-owned [`Identity`](crate::Identity).
148    ///
149    /// Without this call, `build()` generates an ephemeral keypair —
150    /// fine for one-off sessions, but every restart produces a new
151    /// entity id (and therefore a new node id). Provide an identity
152    /// loaded from disk / vault / enclave to keep stable addressing.
153    ///
154    /// The identity's [`TokenCache`](crate::identity::TokenCache) is
155    /// also bound to this mesh; tokens installed via
156    /// [`Identity::install_token`](crate::identity::Identity::install_token)
157    /// become available to the channel auth path at subscribe time.
158    pub fn identity(mut self, identity: crate::identity::Identity) -> Self {
159        self.identity = Some(identity);
160        self
161    }
162
163    /// Set heartbeat interval in milliseconds.
164    pub fn heartbeat_ms(mut self, ms: u64) -> Self {
165        self.heartbeat_interval = Duration::from_millis(ms);
166        self
167    }
168
169    /// Set session timeout in milliseconds.
170    pub fn session_timeout_ms(mut self, ms: u64) -> Self {
171        self.session_timeout = Duration::from_millis(ms);
172        self
173    }
174
175    /// Set number of inbound shards.
176    pub fn shards(mut self, n: u16) -> Self {
177        self.num_shards = n;
178        self
179    }
180
181    /// Pin this node to a specific subnet. Defaults to
182    /// [`SubnetId::GLOBAL`](crate::subnets::SubnetId) — no
183    /// restriction. Visibility checks on the publish + subscribe
184    /// paths compare against this value.
185    pub fn subnet(mut self, id: net::adapter::net::SubnetId) -> Self {
186        self.subnet = Some(id);
187        self
188    }
189
190    /// Install a subnet policy that derives each peer's subnet from
191    /// their capability announcement. Mesh-wide policy consistency
192    /// is assumed — mismatched policies across nodes lead to
193    /// asymmetric views of peer subnets.
194    ///
195    /// Accepts either an owned `SubnetPolicy` or an `Arc<SubnetPolicy>`
196    /// via blanket `Into` support — useful when several builders
197    /// share one policy at node construction time.
198    pub fn subnet_policy(
199        mut self,
200        policy: impl Into<Arc<net::adapter::net::SubnetPolicy>>,
201    ) -> Self {
202        self.subnet_policy = Some(policy.into());
203        self
204    }
205
206    /// Pin this mesh's publicly-advertised reflex `SocketAddr` to
207    /// the supplied external address. The classifier's background
208    /// sweep is skipped; the node starts in `NatClass::Open` with
209    /// `reflex_addr = Some(external)` on outbound capability
210    /// announcements.
211    ///
212    /// Intended for:
213    ///
214    /// - **Port-forwarded servers.** An operator who has manually
215    ///   configured a port forward knows the external address and
216    ///   shouldn't wait on peer-probing to discover it.
217    /// - **Stage-4 port mapping (UPnP / NAT-PMP / PCP).** A
218    ///   successful mapping installation will set this on behalf
219    ///   of the caller.
220    ///
221    /// **Optimization, not correctness.** Nodes without an
222    /// override still reach every peer through the routed-
223    /// handshake path — the override just cuts the classifier
224    /// round-trip when the answer is already known.
225    ///
226    /// Requires the `nat-traversal` cargo feature.
227    #[cfg(feature = "nat-traversal")]
228    pub fn reflex_override(mut self, external: SocketAddr) -> Self {
229        self.reflex_override = Some(external);
230        self
231    }
232
233    /// Opt into opportunistic UPnP-IGD / NAT-PMP / PCP port
234    /// mapping at startup. When set, the mesh spawns a
235    /// [`PortMapperTask`](net::adapter::net::traversal::portmap)
236    /// during `start()` that:
237    ///
238    /// 1. Probes NAT-PMP against the OS-discovered default
239    ///    gateway (1 s deadline).
240    /// 2. Falls back to UPnP via SSDP discovery (2 s deadline).
241    /// 3. On install success, pins the reflex override to the
242    ///    mapped external address — the mesh advertises itself
243    ///    as `Open` to peers without a classifier round-trip.
244    /// 4. Renews every 30 min; revokes on shutdown; falls back
245    ///    to the classifier on three consecutive renewal
246    ///    failures.
247    ///
248    /// **Optimization, not correctness.** A node whose router
249    /// doesn't speak UPnP / NAT-PMP still reaches every peer
250    /// through the routed-handshake path. Off by default
251    /// because port mapping modifies state on the operator's
252    /// router.
253    ///
254    /// Requires the `port-mapping` cargo feature. The flag is
255    /// silently ignored when the feature is off.
256    #[cfg(feature = "port-mapping")]
257    pub fn try_port_mapping(mut self, enabled: bool) -> Self {
258        self.try_port_mapping = enabled;
259        self
260    }
261
262    /// Build the mesh node.
263    pub async fn build(self) -> Result<Mesh> {
264        // Use the caller's identity if one was set, otherwise mint an
265        // ephemeral one. `MeshNode::new` takes the keypair by value,
266        // so clone it out of the Arc when we have a shared identity.
267        let (keypair, sdk_identity) = match self.identity {
268            Some(id) => (id.keypair().as_ref().clone(), Some(id)),
269            None => (EntityKeypair::generate(), None),
270        };
271
272        let mut config = MeshNodeConfig::new(self.bind_addr, self.psk)
273            .with_heartbeat_interval(self.heartbeat_interval)
274            .with_session_timeout(self.session_timeout)
275            .with_num_shards(self.num_shards)
276            .with_handshake(3, Duration::from_secs(5));
277        if let Some(id) = self.subnet {
278            config = config.with_subnet(id);
279        }
280        if let Some(policy) = self.subnet_policy {
281            config = config.with_subnet_policy(policy);
282        }
283        #[cfg(feature = "nat-traversal")]
284        if let Some(external) = self.reflex_override {
285            config = config.with_reflex_override(external);
286        }
287        #[cfg(feature = "port-mapping")]
288        if self.try_port_mapping {
289            config = config.with_try_port_mapping(true);
290        }
291
292        let mut node = MeshNode::new(keypair, config).await?;
293        // Install a shared ChannelConfigRegistry so `register_channel`
294        // can add entries without needing `&mut Mesh` — the registry
295        // itself uses interior mutability (DashMap).
296        let channel_configs = Arc::new(ChannelConfigRegistry::new());
297        node.set_channel_configs(channel_configs.clone());
298        // Hand the caller's TokenCache to the mesh so channel auth
299        // (`require_token` / `can_subscribe` / `can_publish`) has a
300        // cache to consult + install incoming tokens into. Without
301        // an identity, no cache is installed and `require_token`
302        // channels will reject.
303        if let Some(id) = sdk_identity.as_ref() {
304            node.set_token_cache(id.token_cache().clone());
305        }
306        Ok(Mesh {
307            node: Arc::new(node),
308            channel_configs,
309            identity: sdk_identity,
310            #[cfg(feature = "tool")]
311            tool_metadata_fetch: Arc::new(parking_lot::Mutex::new(None)),
312        })
313    }
314}
315
/// A multi-peer mesh node.
///
/// Manages encrypted connections to multiple peers over a single UDP
/// socket. Supports direct peer-to-peer sends, routed multi-hop sends,
/// automatic failure detection, and rerouting.
///
/// Construct one through [`Mesh::builder`] / [`MeshBuilder::build`].
pub struct Mesh {
    /// Shared `MeshNode`. `Arc` rather than by-value so NAPI /
    /// FFI bindings can hand the same live node to multiple
    /// wrappers (e.g. a `DaemonRuntime` alongside the existing
    /// `NetMesh` class) without double-owning the underlying
    /// socket. All public methods go through `.inner()` (Arc
    /// deref), so holding the `Arc` changes no existing call
    /// sites.
    ///
    /// NOTE(review): the methods visible in this part of the file call
    /// `self.node.…` directly; no `.inner()` accessor is visible here —
    /// confirm whether that wording is still current.
    node: Arc<MeshNode>,
    /// Channel config registry shared with the underlying `MeshNode`
    /// so `register_channel` / subscriber ACL checks operate on the
    /// same live data.
    channel_configs: Arc<ChannelConfigRegistry>,
    /// Held onto so the caller's `TokenCache` (and future capability
    /// announcement state) stays alive for the mesh's lifetime —
    /// `MeshNode` was already handed a clone of the keypair, so this
    /// is purely for the auxiliary state that rides alongside.
    /// `None` when the builder generated an ephemeral keypair.
    identity: Option<crate::identity::Identity>,
    /// Lazy auto-install state for the `tool.metadata.fetch` nRPC
    /// service. The first `Mesh::serve_tool` call locks this
    /// mutex, sees `None`, installs the handler, and stores
    /// `Some(handle)`. Subsequent `serve_tool` calls see `Some(_)`
    /// and skip the install. The handle lives for the lifetime of
    /// the `Mesh`; the service stays registered even after every
    /// individual tool ServeHandle is dropped (low cost — the
    /// handler just answers `NotFound` for every name once the
    /// registry is empty again).
    ///
    /// `pub(crate)` so the SDK's `tool` module — which lives in a
    /// separate file but the same crate — can reach this slot
    /// without an accessor stub.
    #[cfg(feature = "tool")]
    pub(crate) tool_metadata_fetch: Arc<parking_lot::Mutex<Option<crate::mesh_rpc::ServeHandle>>>,
}
355
356impl Mesh {
    /// Create a builder.
    ///
    /// Convenience wrapper around [`MeshBuilder::new`]: `bind_addr` is
    /// the local socket address as a string and `psk` is the 32-byte
    /// pre-shared key.
    ///
    /// # Errors
    ///
    /// Returns whatever [`MeshBuilder::new`] returns — an
    /// [`SdkError::Config`] when `bind_addr` does not parse.
    pub fn builder(bind_addr: &str, psk: &[u8; 32]) -> Result<MeshBuilder> {
        MeshBuilder::new(bind_addr, psk)
    }
361
    /// Get this node's Noise public key.
    ///
    /// Share this with peers so they can connect to this node; it is
    /// the value they pass as `peer_pubkey` to [`Self::connect`] or
    /// [`Self::connect_via`].
    ///
    /// Returns a borrow of the 32-byte key reported by the underlying
    /// `MeshNode`.
    pub fn public_key(&self) -> &[u8; 32] {
        self.node.public_key()
    }
368
    /// Get this node's ID (derived from ed25519 identity).
    ///
    /// Returns the `u64` reported by the underlying `MeshNode`. Peers
    /// use this value as `peer_node_id` / `dest_node_id` /
    /// `publisher_node_id` when addressing this node.
    pub fn node_id(&self) -> u64 {
        self.node.node_id()
    }
373
    /// Get the local bind address.
    ///
    /// Returns the socket address reported by the underlying
    /// `MeshNode`.
    pub fn local_addr(&self) -> SocketAddr {
        self.node.local_addr()
    }
378
    /// Install (or clear with `None`) the caller-side nRPC
    /// observer for this `Mesh`. Fires on every `call_typed`
    /// completion (success / server error / timeout / transport
    /// error) with a typed [`crate::mesh_rpc::RpcCallEvent`].
    /// See `DECK_DEMO_HARNESS_PLAN.md` Missing Item D for the
    /// design rationale.
    ///
    /// Replaces any previously-installed observer.
    /// Observers run inline on the dispatch task; implementations
    /// must be cheap (push into a bounded ring / mpsc, not
    /// block).
    ///
    /// v1 fires only `RpcDirection::Outbound`; server-side
    /// (inbound) firing is a follow-up.
    ///
    /// Only compiled with the `cortex` cargo feature. The call is
    /// forwarded unchanged to the underlying `MeshNode`.
    #[cfg(feature = "cortex")]
    pub fn set_rpc_observer(&self, observer: Option<crate::mesh_rpc::RpcObserverHandle>) {
        self.node.set_rpc_observer(observer);
    }
397
    /// Crate-internal accessor for the underlying `MeshNode`.
    /// Used by `mesh_rpc` to delegate the typed RPC API; not
    /// intended for downstream consumers (the public surface
    /// stays on `Mesh` itself). Gated on the same feature
    /// combination as its sole consumer (`mesh_rpc` /
    /// `mesh_rpc_resilience`) so feature combinations that
    /// exclude either don't trip dead-code lints.
    ///
    /// Returns a borrow of the shared `Arc`; callers that need their
    /// own handle can clone it.
    #[cfg(all(feature = "net", feature = "cortex"))]
    pub(crate) fn node(&self) -> &Arc<MeshNode> {
        &self.node
    }
409
    /// Crate-internal accessor for the SDK's
    /// `ChannelConfigRegistry`. Used by `mesh_rpc` to
    /// auto-register the request channel + reply prefix on
    /// `serve_rpc`.
    ///
    /// Returns a borrow of the same `Arc` that `build()` handed to the
    /// underlying `MeshNode`. Compiled only when both the `net` and
    /// `cortex` features are enabled.
    #[cfg(all(feature = "net", feature = "cortex"))]
    pub(crate) fn channel_configs_arc(&self) -> &Arc<ChannelConfigRegistry> {
        &self.channel_configs
    }
418
419    /// Connect to a peer as initiator.
420    ///
421    /// The peer must be listening (call `accept()` on their side).
422    /// `peer_pubkey` is the peer's Noise public key from `public_key()`.
423    pub async fn connect(
424        &self,
425        peer_addr: &str,
426        peer_pubkey: &[u8; 32],
427        peer_node_id: u64,
428    ) -> Result<()> {
429        let addr: SocketAddr = peer_addr
430            .parse()
431            .map_err(|e| SdkError::Config(format!("invalid peer address: {}", e)))?;
432        self.node.connect(addr, peer_pubkey, peer_node_id).await?;
433        Ok(())
434    }
435
436    /// Accept an incoming connection as responder.
437    ///
438    /// Waits for a peer to initiate a Noise handshake.
439    /// Returns the peer's address.
440    pub async fn accept(&self, peer_node_id: u64) -> Result<SocketAddr> {
441        let (addr, _) = self.node.accept(peer_node_id).await?;
442        Ok(addr)
443    }
444
445    /// Connect to a peer when the responder is already
446    /// `start()`ed and hasn't pre-`accept()`'d this initiator's
447    /// `node_id` — the standard "remote-attach against a running
448    /// daemon" case. Mirror of [`Self::connect`] for that
449    /// scenario; the local side must also have `start()` called
450    /// before this method (the dispatch loop is what completes
451    /// the handshake).
452    ///
453    /// `relay_addr` is the wire address to send msg1 to. The
454    /// degenerate single-hop case (relay == final destination)
455    /// is the CLI remote-attach pattern; the multi-hop case
456    /// (relay forwards to dest) is the same call signature.
457    /// Either way the destination's running dispatch loop
458    /// receives msg1 via the routed-handshake protocol and
459    /// replies with msg2.
460    ///
461    /// # Why a separate method from `connect`?
462    ///
463    /// `connect` uses the direct-handshake protocol, where the
464    /// responder must pre-register the initiator's `node_id`
465    /// via `accept()` before its `start()`. `connect_via` uses
466    /// the routed-handshake protocol — the initiator's full
467    /// `node_id` rides inside the Noise msg1 payload, so the
468    /// responder learns it on demand. No pre-`accept` needed.
469    pub async fn connect_via(
470        &self,
471        relay_addr: &str,
472        peer_pubkey: &[u8; 32],
473        peer_node_id: u64,
474    ) -> Result<()> {
475        let addr: SocketAddr = relay_addr
476            .parse()
477            .map_err(|e| SdkError::Config(format!("invalid relay address: {}", e)))?;
478        self.node
479            .connect_via(addr, peer_pubkey, peer_node_id)
480            .await?;
481        Ok(())
482    }
483
    /// Start the receive loop, heartbeat sender, and router.
    ///
    /// Call this after connecting to peers. Events won't be received
    /// until `start()` is called.
    ///
    /// Delegates to `start_arc` on the shared `MeshNode`; see the
    /// comment in the body for why that variant is used.
    pub fn start(&self) {
        // `start_arc` (vs bare `start`) enables the periodic capability
        // re-announce, keeping this node's entry alive in its own and
        // peers' folds past one announcement TTL.
        self.node.start_arc();
    }
494
    /// Number of connected peers.
    ///
    /// Returns the count reported by the underlying `MeshNode` at the
    /// time of the call.
    pub fn peer_count(&self) -> usize {
        self.node.peer_count()
    }
499
500    // ---- Sending ----
501
502    /// Send a serializable event to a direct peer.
503    pub async fn send_to(&self, peer_addr: &str, event: &impl Serialize) -> Result<()> {
504        let addr: SocketAddr = peer_addr
505            .parse()
506            .map_err(|e| SdkError::Config(format!("invalid address: {}", e)))?;
507        let json = serde_json::to_vec(event)?;
508        let batch = net::event::Batch {
509            shard_id: 0,
510            events: vec![net::event::InternalEvent::new(Bytes::from(json), 0, 0)],
511            sequence_start: 0,
512            process_nonce: net::event::batch_process_nonce(),
513        };
514        self.node.send_to_peer(addr, &batch).await?;
515        Ok(())
516    }
517
518    /// Send a serializable event via the routing table.
519    ///
520    /// The event is encrypted for the destination and forwarded through
521    /// intermediate nodes if needed. Requires a route to `dest_node_id`
522    /// in the routing table and a session with the destination.
523    pub async fn send(&self, dest_node_id: u64, event: &impl Serialize) -> Result<()> {
524        let json = serde_json::to_vec(event)?;
525        let batch = net::event::Batch {
526            shard_id: 0,
527            events: vec![net::event::InternalEvent::new(Bytes::from(json), 0, 0)],
528            sequence_start: 0,
529            process_nonce: net::event::batch_process_nonce(),
530        };
531        self.node.send_routed(dest_node_id, &batch).await?;
532        Ok(())
533    }
534
535    /// Send raw bytes to a direct peer.
536    pub async fn send_raw_to(&self, peer_addr: &str, data: &[u8]) -> Result<()> {
537        let addr: SocketAddr = peer_addr
538            .parse()
539            .map_err(|e| SdkError::Config(format!("invalid address: {}", e)))?;
540        let batch = net::event::Batch {
541            shard_id: 0,
542            events: vec![net::event::InternalEvent::new(
543                Bytes::copy_from_slice(data),
544                0,
545                0,
546            )],
547            sequence_start: 0,
548            process_nonce: net::event::batch_process_nonce(),
549        };
550        self.node.send_to_peer(addr, &batch).await?;
551        Ok(())
552    }
553
554    // ---- Receiving ----
555
556    /// Poll for received events.
557    ///
558    /// Returns up to `limit` events from all shards.
559    pub async fn recv(&self, limit: usize) -> Result<Vec<StoredEvent>> {
560        // Poll shard 0 (most events land here for single-stream sends)
561        let result = self.node.poll_shard(0, None, limit).await?;
562        Ok(result.events)
563    }
564
565    /// Poll a specific shard for events.
566    pub async fn recv_shard(&self, shard_id: u16, limit: usize) -> Result<Vec<StoredEvent>> {
567        let result = self.node.poll_shard(shard_id, None, limit).await?;
568        Ok(result.events)
569    }
570
571    // ---- Channels (distributed pub/sub) ----
572
    /// Register a channel on this publisher. Subscribers who ask to
    /// join are validated against `config` before being added to the
    /// subscriber roster.
    ///
    /// `config.channel_id` must be built from the same canonical name
    /// subscribers pass to `subscribe_channel`. The registry keys on
    /// the canonical name (not the u16 hash) to avoid ACL bypass via
    /// hash collision.
    ///
    /// Idempotent: re-registering the same channel replaces the prior
    /// config.
    ///
    /// The entry goes into the registry shared with the underlying
    /// `MeshNode`, which is why `&self` is sufficient.
    pub fn register_channel(&self, config: ChannelConfig) {
        self.channel_configs.insert(config);
    }
587
588    /// Ask `publisher_node_id` to add this node to `channel`'s
589    /// subscriber set. Blocks until the publisher's `Ack` arrives or
590    /// the mesh's membership-ack timeout elapses.
591    ///
592    /// Returns `Ok(())` on acceptance; rejection (unauthorized /
593    /// unknown channel / rate-limited / too-many-channels) surfaces
594    /// as `SdkError::ChannelRejected(reason)`. Network-level failures
595    /// surface as `SdkError::Adapter(...)`.
596    ///
597    /// This bare form presents no credential. On a **token-gated**
598    /// channel it is always rejected — the publisher requires a token
599    /// chain on *every* subscribe and does not honor a credential
600    /// presented on a previous subscribe (e.g. before a reconnect or
601    /// roster eviction). Re-subscribe with
602    /// [`Self::subscribe_channel_with`] carrying the token each time.
603    pub async fn subscribe_channel(
604        &self,
605        publisher_node_id: u64,
606        channel: &ChannelName,
607    ) -> Result<()> {
608        self.subscribe_channel_with(publisher_node_id, channel, SubscribeOptions::default())
609            .await
610    }
611
612    /// Subscribe with options — optionally presenting a
613    /// [`PermissionToken`](crate::identity::PermissionToken).
614    ///
615    /// Use this when the publisher registered the channel with
616    /// `token_roots` (token enforcement) and/or a `subscribe_caps`
617    /// filter that your node's capabilities alone don't satisfy. The
618    /// publisher verifies the presented token chain on arrival — it
619    /// must root at one of the channel's `token_roots`, bind at its
620    /// leaf to the subscribing peer's `EntityId`, and authorize
621    /// `SUBSCRIBE` at every link — then retains it to re-check expiry
622    /// and revocation while the subscription lives.
623    ///
624    /// The credential must be presented on **every** subscribe: a
625    /// previously-accepted chain is not reused for a later bare
626    /// [`Self::subscribe_channel`], so after a reconnect or roster
627    /// eviction you must call this again with the token.
628    pub async fn subscribe_channel_with(
629        &self,
630        publisher_node_id: u64,
631        channel: &ChannelName,
632        opts: SubscribeOptions,
633    ) -> Result<()> {
634        let result = match opts.token {
635            Some(token) => {
636                self.node
637                    .subscribe_channel_with_token(publisher_node_id, channel.clone(), token)
638                    .await
639            }
640            None => {
641                self.node
642                    .subscribe_channel(publisher_node_id, channel.clone())
643                    .await
644            }
645        };
646        match result {
647            Ok(()) => Ok(()),
648            Err(e) => Err(adapter_to_channel_error(e)),
649        }
650    }
651
652    /// Mirror of [`Self::subscribe_channel`]. Idempotent on the
653    /// publisher side — unsubscribing a non-subscriber still returns
654    /// `Ok(())`.
655    pub async fn unsubscribe_channel(
656        &self,
657        publisher_node_id: u64,
658        channel: &ChannelName,
659    ) -> Result<()> {
660        match self
661            .node
662            .unsubscribe_channel(publisher_node_id, channel.clone())
663            .await
664        {
665            Ok(()) => Ok(()),
666            Err(e) => Err(adapter_to_channel_error(e)),
667        }
668    }
669
670    /// Publish one payload to every subscriber of `channel`.
671    /// `config.on_failure` controls whether per-peer errors
672    /// short-circuit the fan-out. Returns a [`PublishReport`]
673    /// describing per-peer outcomes.
674    pub async fn publish(
675        &self,
676        channel: &ChannelName,
677        payload: Bytes,
678        config: PublishConfig,
679    ) -> Result<PublishReport> {
680        let publisher = ChannelPublisher::new(channel.clone(), config);
681        Ok(self.node.publish(&publisher, payload).await?)
682    }
683
684    /// Fan multiple payloads to every subscriber of `channel` as one
685    /// batch per subscriber. Semantics match [`Self::publish`].
686    pub async fn publish_many(
687        &self,
688        channel: &ChannelName,
689        payloads: &[Bytes],
690        config: PublishConfig,
691    ) -> Result<PublishReport> {
692        let publisher = ChannelPublisher::new(channel.clone(), config);
693        Ok(self.node.publish_many(&publisher, payloads).await?)
694    }
695
696    // ---- Routing ----
697
698    /// Add a route to a destination node.
699    ///
700    /// Packets sent to `dest_node_id` via `send()` will be forwarded
701    /// through `next_hop_addr`.
702    pub fn add_route(&self, dest_node_id: u64, next_hop_addr: &str) -> Result<()> {
703        let addr: SocketAddr = next_hop_addr
704            .parse()
705            .map_err(|e| SdkError::Config(format!("invalid address: {}", e)))?;
706        self.node.router().add_route(dest_node_id, addr);
707        Ok(())
708    }
709
    /// Remove a route.
    ///
    /// Forwards `dest_node_id` to the router's `remove_route`; any
    /// value the router returns is discarded.
    pub fn remove_route(&self, dest_node_id: u64) {
        self.node.router().remove_route(dest_node_id);
    }
714
715    // ---- Mesh topology ----
716
717    /// Block a peer (simulate network partition).
718    pub fn block_peer(&self, peer_addr: &str) -> Result<()> {
719        let addr: SocketAddr = peer_addr
720            .parse()
721            .map_err(|e| SdkError::Config(format!("invalid address: {}", e)))?;
722        self.node.block_peer(addr);
723        Ok(())
724    }
725
726    /// Unblock a peer.
727    pub fn unblock_peer(&self, peer_addr: &str) -> Result<()> {
728        let addr: SocketAddr = peer_addr
729            .parse()
730            .map_err(|e| SdkError::Config(format!("invalid address: {}", e)))?;
731        self.node.unblock_peer(&addr);
732        Ok(())
733    }
734
    /// Number of nodes discovered via pingwave propagation.
    ///
    /// Returns the node count of the underlying node's proximity
    /// graph at the time of the call.
    pub fn discovered_nodes(&self) -> usize {
        self.node.proximity_graph().node_count()
    }
739
    /// Number of active reroutes (routes using alternates after failure).
    ///
    /// Returns the count reported by the underlying node's reroute
    /// policy at the time of the call.
    pub fn active_reroutes(&self) -> usize {
        self.node.reroute_policy().active_reroutes()
    }
744
745    // ---- Streams ----
746
747    /// Open (or look up) a logical stream to a peer. See
748    /// [`net::adapter::net::MeshNode::open_stream`] for the full contract.
749    /// Repeated calls for the same `(peer, stream_id)` are idempotent;
750    /// the first open wins and subsequent configs are logged and
751    /// ignored.
752    pub fn open_stream(
753        &self,
754        peer_node_id: u64,
755        stream_id: u64,
756        config: StreamConfig,
757    ) -> Result<Stream> {
758        self.node
759            .open_stream(peer_node_id, stream_id, config)
760            .map_err(SdkError::from)
761    }
762
    /// Close a stream: drop its `StreamState` and free the window. Idempotent.
    ///
    /// The stream is identified by the `(peer_node_id, stream_id)` pair;
    /// the call is forwarded unchanged to the underlying `MeshNode`.
    pub fn close_stream(&self, peer_node_id: u64, stream_id: u64) {
        self.node.close_stream(peer_node_id, stream_id);
    }
767
768    /// Send a batch of events on an explicit stream.
769    ///
770    /// Returns [`SdkError::Backpressure`] when the stream's per-stream
771    /// in-flight window is full (no events were sent — the caller
772    /// decides whether to drop, retry, or buffer). [`SdkError::NotConnected`]
773    /// when the peer session is gone. All other failures surface as
774    /// [`SdkError::Adapter`].
775    pub async fn send_on_stream(&self, stream: &Stream, events: &[Bytes]) -> Result<()> {
776        self.node
777            .send_on_stream(stream, events)
778            .await
779            .map_err(SdkError::from)
780    }
781
782    /// Send events, retrying on `Backpressure` with exponential backoff
783    /// (5 ms → 200 ms, doubling) up to `max_retries` times. Transport
784    /// errors and `NotConnected` are returned immediately.
785    pub async fn send_with_retry(
786        &self,
787        stream: &Stream,
788        events: &[Bytes],
789        max_retries: usize,
790    ) -> Result<()> {
791        self.node
792            .send_with_retry(stream, events, max_retries)
793            .await
794            .map_err(SdkError::from)
795    }
796
797    /// Block the calling task until the send succeeds or a transport
798    /// error occurs. See [`Mesh::send_with_retry`] for finer control.
799    pub async fn send_blocking(&self, stream: &Stream, events: &[Bytes]) -> Result<()> {
800        self.node
801            .send_blocking(stream, events)
802            .await
803            .map_err(SdkError::from)
804    }
805
806    /// Snapshot of per-stream stats (tx/rx seq, window, in-flight,
807    /// backpressure count, activity).
808    pub fn stream_stats(&self, peer_node_id: u64, stream_id: u64) -> Option<StreamStats> {
809        self.node.stream_stats(peer_node_id, stream_id)
810    }
811
812    /// Snapshot stats for every stream in the session to `peer_node_id`.
813    pub fn all_stream_stats(&self, peer_node_id: u64) -> Vec<(u64, StreamStats)> {
814        self.node.all_stream_stats(peer_node_id)
815    }
816
817    // ---- Capability announcements ----
818
819    /// Announce this node's capabilities to every directly-connected
820    /// peer. Self-indexes too, so `find_nodes` called from this same
821    /// node matches on the announcement. Multi-hop propagation is
822    /// deferred — peers more than one hop away will not see the
823    /// announcement.
824    ///
825    /// Default TTL is 5 minutes; use
826    /// [`Self::announce_capabilities_with`] to override.
827    pub async fn announce_capabilities(
828        &self,
829        caps: crate::capabilities::CapabilitySet,
830    ) -> Result<()> {
831        self.node.announce_capabilities(caps).await?;
832        Ok(())
833    }
834
835    /// Extended announce with explicit TTL and signing opt-in.
836    /// `sign = true` is accepted but currently a no-op; signatures
837    /// tie in with Stage E (channel auth), once `node_id` →
838    /// `EntityId` binding is wired.
839    pub async fn announce_capabilities_with(
840        &self,
841        caps: crate::capabilities::CapabilitySet,
842        ttl: std::time::Duration,
843        sign: bool,
844    ) -> Result<()> {
845        self.node
846            .announce_capabilities_with(caps, ttl, sign)
847            .await?;
848        Ok(())
849    }
850
851    /// Query the capability index. Returns node ids whose latest
852    /// announcement matches `filter`; includes our own `node_id` if
853    /// our own announcement matches.
854    pub fn find_nodes(&self, filter: &crate::capabilities::CapabilityFilter) -> Vec<u64> {
855        self.node.find_nodes_by_filter(filter)
856    }
857
858    /// Scoped variant of [`Self::find_nodes`]. Filters candidates
859    /// through a [`crate::capabilities::ScopeFilter`] derived from
860    /// each node's `scope:*` reserved tags. Untagged nodes resolve
861    /// to `Global` and remain visible under most filters by design;
862    /// nodes tagged `scope:subnet-local` only show up under
863    /// [`crate::capabilities::ScopeFilter::SameSubnet`]. See
864    /// `docs/SCOPED_CAPABILITIES_PLAN.md` for the full table.
865    pub fn find_nodes_scoped(
866        &self,
867        filter: &crate::capabilities::CapabilityFilter,
868        scope: &crate::capabilities::ScopeFilter<'_>,
869    ) -> Vec<u64> {
870        self.node.find_nodes_by_filter_scoped(filter, scope)
871    }
872
873    /// Pick the single best-scoring node for a placement
874    /// requirement. Returns the winning node's id, or `None` if no
875    /// node matches.
876    pub fn find_best_node(&self, req: &crate::capabilities::CapabilityRequirement) -> Option<u64> {
877        self.node.find_best_node(req)
878    }
879
880    /// Scoped variant of [`Self::find_best_node`]. Picks the highest-
881    /// scoring node within the scope-filtered candidate set.
882    pub fn find_best_node_scoped(
883        &self,
884        req: &crate::capabilities::CapabilityRequirement,
885        scope: &crate::capabilities::ScopeFilter<'_>,
886    ) -> Option<u64> {
887        self.node.find_best_node_scoped(req, scope)
888    }
889
890    /// Bucketed aggregation over the local capability fold. Composes
891    /// [`TagMatcher`](crate::capabilities::TagMatcher) ×
892    /// [`GroupBy`](crate::capabilities::GroupBy) ×
893    /// [`Aggregation`](crate::capabilities::Aggregation) into a
894    /// `Vec<(bucket, value)>` sorted lex by bucket key. Phase 6c-A
895    /// of `MULTIFOLD_PHASE_6C_CAPACITY_AGGREGATION.md`.
896    ///
897    /// `matcher = None` walks every entry. Returns an empty vec when
898    /// no entries match.
899    pub fn capability_aggregate(
900        &self,
901        matcher: Option<crate::capabilities::TagMatcher>,
902        group_by: crate::capabilities::GroupBy,
903        agg: crate::capabilities::Aggregation,
904    ) -> Vec<(String, u64)> {
905        self.node
906            .capability_fold()
907            .aggregate(matcher, group_by, agg)
908    }
909
910    /// Capacity-ranked materialized view. Wraps
911    /// [`Self::capability_aggregate`] with per-bucket state
912    /// breakdown (`idle` / `busy` / `reserved`), an RTT gate, and
913    /// optional summed numeric capacity. Phase 6c-B of
914    /// `MULTIFOLD_PHASE_6C_CAPACITY_AGGREGATION.md`.
915    ///
916    /// `rtt_lookup` maps a publisher's `node_id` to current RTT in
917    /// milliseconds. When `query.max_rtt_ms` is `None`, the closure
918    /// is never invoked; when set, publishers whose lookup returns
919    /// `None` are dropped (fail-closed — never-pinged nodes don't
920    /// ride a "fastest available" filter as zero).
921    ///
922    /// Faulty entries are always excluded. Rows return sorted by
923    /// `available` descending; ties broken by bucket key ascending.
924    /// Truncated to `query.limit` (0 = no truncation).
925    ///
926    /// # Example
927    ///
928    /// ```
929    /// # async fn doc() -> net_sdk::error::Result<()> {
930    /// use net_sdk::capabilities::{
931    ///     CapabilitySet, CapacityQuery, GroupBy, TagMatcher,
932    /// };
933    /// use net_sdk::mesh::MeshBuilder;
934    ///
935    /// let node = MeshBuilder::new("127.0.0.1:0", &[0x42u8; 32])?
936    ///     .build()
937    ///     .await?;
938    /// node.announce_capabilities(
939    ///     CapabilitySet::new()
940    ///         .add_tag("hardware.gpu")
941    ///         .add_tag("hardware.gpu.h100")
942    ///         .add_tag("hardware.gpu.count=8"),
943    /// )
944    /// .await?;
945    ///
946    /// // Top GPU types by available capacity, no RTT filter,
947    /// // summed count column populated.
948    /// let view = node.capability_capacity_ranking(
949    ///     CapacityQuery {
950    ///         matcher: Some(TagMatcher::Prefix { value: "hardware.gpu".into() }),
951    ///         group_by: GroupBy::TagStem { prefix: "hardware.gpu".into() },
952    ///         max_rtt_ms: None,
953    ///         sum_axis_key: Some("hardware.gpu.count".into()),
954    ///         limit: 5,
955    ///     },
956    ///     |_node_id| None,
957    /// );
958    /// // Self-match: one bucket per stem this node carries.
959    /// assert!(view.iter().any(|row| row.bucket == "h100"));
960    /// # Ok(())
961    /// # }
962    /// ```
963    pub fn capability_capacity_ranking<R>(
964        &self,
965        query: crate::capabilities::CapacityQuery,
966        rtt_lookup: R,
967    ) -> Vec<crate::capabilities::CapacityRow>
968    where
969        R: Fn(u64) -> Option<u32>,
970    {
971        self.node
972            .capability_fold()
973            .capacity_ranking(query, rtt_lookup)
974    }
975
976    // ---- Lifecycle ----
977
978    /// Set a migration handler (for Mikoshi daemon migration).
979    pub fn set_migration_handler(&mut self, handler: Arc<MigrationSubprotocolHandler>) {
980        self.node.set_migration_handler(handler);
981    }
982
983    /// Gracefully shut down.
984    pub async fn shutdown(self) -> Result<()> {
985        self.node.shutdown().await?;
986        Ok(())
987    }
988
989    /// Get a reference to the underlying `MeshNode`.
990    pub fn inner(&self) -> &MeshNode {
991        &self.node
992    }
993
994    /// Clone the `Arc`-shared `MeshNode` handle out of the mesh.
995    ///
996    /// Used by FFI bindings (currently: NAPI) that need to hand
997    /// the same live node to the `net-sdk::compute::DaemonRuntime`
998    /// **and** to their own wrapper class without constructing a
999    /// second UDP socket. All public `MeshNode` operations go
1000    /// through `&MeshNode`, so two Arc holders observe exactly
1001    /// the same state.
1002    pub fn node_arc(&self) -> Arc<MeshNode> {
1003        self.node.clone()
1004    }
1005
1006    /// Construct a `Mesh` that shares an existing `MeshNode` with
1007    /// another owner. Used by FFI bindings that already hold an
1008    /// `Arc<MeshNode>` (e.g. NAPI's `NetMesh`) and need a `Mesh`
1009    /// wrapper so the SDK's `DaemonRuntime` can be built against
1010    /// the same live node.
1011    ///
1012    /// Does not re-install `channel_configs` or a `TokenCache` —
1013    /// the owner of the original `MeshNode` is responsible for
1014    /// that wiring. Supplied `channel_configs` / `identity`
1015    /// arguments are held onto here purely so the `Mesh`'s own
1016    /// helpers (channel registration lookup, identity getter)
1017    /// have data to return.
1018    pub fn from_node_arc(
1019        node: Arc<MeshNode>,
1020        channel_configs: Arc<ChannelConfigRegistry>,
1021        identity: Option<crate::identity::Identity>,
1022    ) -> Self {
1023        Self {
1024            node,
1025            channel_configs,
1026            identity,
1027            #[cfg(feature = "tool")]
1028            tool_metadata_fetch: Arc::new(parking_lot::Mutex::new(None)),
1029        }
1030    }
1031
1032    /// Caller-owned identity bound to this mesh, if any. Returns
1033    /// `None` for meshes built without `.identity(...)` (ephemeral
1034    /// keypair).
1035    pub fn identity(&self) -> Option<&crate::identity::Identity> {
1036        self.identity.as_ref()
1037    }
1038
1039    // ── NAT traversal ──────────────────────────────────────────
1040    //
1041    // Framing (load-bearing — see `docs/NAT_TRAVERSAL_PLAN.md`
1042    // stage 5): every user-visible docstring here must position
1043    // NAT traversal as **optimization, not correctness**. Nodes
1044    // behind NAT can always talk through the mesh's routed-
1045    // handshake path. These APIs let the mesh upgrade to a
1046    // *direct* path when the underlying NATs allow it, cutting
1047    // relay hops out of the data plane. A `nat_type` of
1048    // `symmetric` or a `PunchFailed` error is not a
1049    // connectivity failure — it just means traffic keeps
1050    // riding the relay.
1051    //
1052    // Anti-phrasings to avoid: "required for NATed peers",
1053    // "enables cross-NAT connectivity", "fixes NAT issues."
1054    // Each of these implies the mesh otherwise can't reach
1055    // NATed peers, which is false.
1056
1057    /// Current NAT classification for this mesh's public face,
1058    /// as observed against other peers during the classification
1059    /// sweep. One of `Open`, `Cone`, `Symmetric`, or `Unknown`
1060    /// (pre-sweep or insufficient data).
1061    ///
1062    /// **Optimization, not correctness.** A `Symmetric`
1063    /// classification doesn't prevent this mesh from
1064    /// communicating with any peer — it just means the direct-
1065    /// punch optimization is unlikely to succeed against some
1066    /// peers, and traffic will keep riding the routed path.
1067    ///
1068    /// Requires the `nat-traversal` cargo feature.
1069    #[cfg(feature = "nat-traversal")]
1070    pub fn nat_type(&self) -> net::adapter::net::traversal::classify::NatClass {
1071        self.node.nat_class()
1072    }
1073
1074    /// This mesh's public-facing `SocketAddr` as observed by a
1075    /// remote peer, or `None` before the first classification
1076    /// sweep has produced an observation.
1077    ///
1078    /// Piggybacks on outbound `CapabilityAnnouncement`s so peers
1079    /// can attempt a direct-connect without a separate
1080    /// discovery round-trip. Read by peers implementing the
1081    /// `connect_direct` rendezvous path.
1082    ///
1083    /// Requires the `nat-traversal` cargo feature.
1084    #[cfg(feature = "nat-traversal")]
1085    pub fn reflex_addr(&self) -> Option<SocketAddr> {
1086        self.node.reflex_addr()
1087    }
1088
1089    /// The NAT classification most recently advertised by
1090    /// `peer_node_id` (parsed from the `nat:*` tag on their
1091    /// capability announcement). Returns `NatClass::Unknown`
1092    /// when the peer hasn't announced or was compiled without
1093    /// NAT traversal — the pair-type matrix treats Unknown as
1094    /// "attempt direct, fall back on failure," not as
1095    /// "don't attempt."
1096    ///
1097    /// Requires the `nat-traversal` cargo feature.
1098    #[cfg(feature = "nat-traversal")]
1099    pub fn peer_nat_type(
1100        &self,
1101        peer_node_id: u64,
1102    ) -> net::adapter::net::traversal::classify::NatClass {
1103        self.node.peer_nat_class(peer_node_id)
1104    }
1105
1106    /// Send one reflex probe to `peer_node_id` and return the
1107    /// public `SocketAddr` the peer observed on the probe's UDP
1108    /// envelope. Useful for tests and for operators diagnosing a
1109    /// NAT-type classification that seems off.
1110    ///
1111    /// Times out after `TraversalConfig::reflex_timeout` (3 s
1112    /// default) on network delays, and fast-fails with
1113    /// `peer-not-reachable` on an unknown peer.
1114    ///
1115    /// Requires the `nat-traversal` cargo feature.
1116    #[cfg(feature = "nat-traversal")]
1117    pub async fn probe_reflex(&self, peer_node_id: u64) -> Result<SocketAddr> {
1118        Ok(self.node.probe_reflex(peer_node_id).await?)
1119    }
1120
1121    /// Explicitly re-run the NAT classification sweep against
1122    /// this node's currently-connected peers. Normally the
1123    /// background loop (spawned by `start()`) takes care of
1124    /// this; call this after a suspected NAT rebind (e.g. a
1125    /// gateway reboot) to accelerate the re-classification.
1126    ///
1127    /// No-op when fewer than 2 peers are connected — the
1128    /// two-probe rule needs two distinct targets to produce a
1129    /// classification. Never returns an error; a failed sweep
1130    /// leaves the previous classification intact.
1131    ///
1132    /// Requires the `nat-traversal` cargo feature.
1133    #[cfg(feature = "nat-traversal")]
1134    pub async fn reclassify_nat(&self) {
1135        self.node.reclassify_nat().await
1136    }
1137
1138    /// Establish a session to `peer_node_id` via the rendezvous
1139    /// path, using the pair-type matrix to decide between a
1140    /// direct handshake and a relay-coordinated punch. The
1141    /// returned session is equivalent in correctness to
1142    /// `connect()` — the *only* difference is that a
1143    /// `connect_direct` that lands on the punched path cuts
1144    /// relay hops out of the data plane.
1145    ///
1146    /// **Optimization, not correctness.** `connect_direct`
1147    /// always resolves: on a punch-failed outcome, the session
1148    /// is established via the routed-handshake fallback.
1149    /// Inspect `traversal_stats()` afterward to distinguish a
1150    /// successful punch from a relay fallback.
1151    ///
1152    /// `coordinator` names a peer we already have a session
1153    /// with — typically a stable relay-capable node. The
1154    /// coordinator mediates the introduction; it doesn't carry
1155    /// user-plane traffic once the punch succeeds.
1156    ///
1157    /// Fails with an `SdkError::Traversal` variant whose `kind`
1158    /// is `peer-not-reachable` (no cached reflex for `peer`),
1159    /// `transport` (socket-level error on the final handshake),
1160    /// or (internal, retried on fallback) `punch-failed`.
1161    ///
1162    /// Requires the `nat-traversal` cargo feature.
1163    #[cfg(feature = "nat-traversal")]
1164    pub async fn connect_direct(
1165        &self,
1166        peer_node_id: u64,
1167        peer_pubkey: &[u8; 32],
1168        coordinator: u64,
1169    ) -> Result<()> {
1170        self.node
1171            .connect_direct(peer_node_id, peer_pubkey, coordinator)
1172            .await?;
1173        Ok(())
1174    }
1175
1176    /// Cumulative counters for this mesh's NAT-traversal
1177    /// activity: punch attempts, successful punches, and relay
1178    /// fallbacks. Monotonic — counters never reset. Useful for
1179    /// diagnostics + telemetry (success rate, relay load
1180    /// trends).
1181    ///
1182    /// Requires the `nat-traversal` cargo feature.
1183    #[cfg(feature = "nat-traversal")]
1184    pub fn traversal_stats(&self) -> net::adapter::net::traversal::TraversalStatsSnapshot {
1185        self.node.traversal_stats()
1186    }
1187
1188    /// Install a runtime reflex override. Forces `nat_type() =
1189    /// "open"` and `reflex_addr() = Some(external)` immediately,
1190    /// short-circuiting any further classifier sweeps.
1191    ///
1192    /// Intended for operator-driven updates — a port-forward
1193    /// that went live mid-session, or a stage-4 port-mapping
1194    /// task that just installed a UPnP / NAT-PMP mapping.
1195    /// Builder-level [`MeshBuilder::reflex_override`] covers the
1196    /// startup-time case; this is the runtime equivalent.
1197    ///
1198    /// **Optimization, not correctness.** Nodes without an
1199    /// override still reach every peer via the routed-handshake
1200    /// path. The override pins the publicly-advertised address
1201    /// when it's already known.
1202    ///
1203    /// Requires the `nat-traversal` cargo feature.
1204    #[cfg(feature = "nat-traversal")]
1205    pub fn set_reflex_override(&self, external: SocketAddr) {
1206        self.node.set_reflex_override(external);
1207    }
1208
1209    /// Drop a previously-installed reflex override. The
1210    /// classifier resumes on its normal cadence; the next sweep
1211    /// repopulates `reflex_addr` and `nat_type` from real probe
1212    /// observations. `reflex_addr` clears to `None` immediately
1213    /// so a between-sweep read doesn't return a stale override.
1214    ///
1215    /// No-op when no override is active — safe to call
1216    /// unconditionally during shutdown or a port-mapper revoke
1217    /// path.
1218    ///
1219    /// Requires the `nat-traversal` cargo feature.
1220    #[cfg(feature = "nat-traversal")]
1221    pub fn clear_reflex_override(&self) {
1222        self.node.clear_reflex_override();
1223    }
1224}
1225
1226/// Map an `AdapterError` from a subscribe / unsubscribe / publish
1227/// call into the channel-aware `SdkError` variant. Rejection acks
1228/// come through as `AdapterError::Connection("membership request
1229/// rejected: Some(<reason>)")`; parse that into
1230/// [`SdkError::ChannelRejected`].
1231fn adapter_to_channel_error(err: net::error::AdapterError) -> SdkError {
1232    use net::error::AdapterError;
1233    if let AdapterError::Connection(ref msg) = err {
1234        let prefix = "membership request rejected: ";
1235        if let Some(tail) = msg.strip_prefix(prefix) {
1236            let reason = parse_ack_reason(tail);
1237            return SdkError::ChannelRejected(reason);
1238        }
1239    }
1240    SdkError::from(err)
1241}
1242
1243fn parse_ack_reason(s: &str) -> Option<AckReason> {
1244    // `{:?}` of `Option<AckReason>` produces `Some(Unauthorized)` etc.
1245    let inside = s.trim().strip_prefix("Some(")?.strip_suffix(')')?;
1246    match inside {
1247        "Unauthorized" => Some(AckReason::Unauthorized),
1248        "UnknownChannel" => Some(AckReason::UnknownChannel),
1249        "RateLimited" => Some(AckReason::RateLimited),
1250        "TooManyChannels" => Some(AckReason::TooManyChannels),
1251        _ => None,
1252    }
1253}