Skip to main content

MeshNode

Struct MeshNode 

Source
pub struct MeshNode { /* private fields */ }
Expand description

Multi-peer mesh node.

Composes NetSession (per-peer encryption), NetRouter (forwarding), and FailureDetector (heartbeat monitoring) behind a single UDP socket.

Implementations§

Source§

impl MeshNode

Source

pub fn public_key(&self) -> &[u8; 32]

Get the Noise static public key (for peers to connect to this node).

Source

pub fn is_shutdown(&self) -> bool

Whether Self::shutdown has been invoked on this node.

Exposed for tests and for FFI callers that want to verify a shutdown actually landed (rather than being a silent no-op because extra Arc references were outstanding, as an earlier net_mesh_shutdown variant did).

Source

pub async fn new( identity: EntityKeypair, config: MeshNodeConfig, ) -> Result<MeshNode, AdapterError>

Create a new mesh node.

Binds a UDP socket but does not connect to any peers yet. Call connect() to establish sessions with peers, then start() to begin the receive loop.

Source

pub fn node_id(&self) -> u64

Get this node’s ID.

Source

pub fn auth_guard(&self) -> &Arc<AuthGuard>

The per-packet authorization fast path. Writes land here on successful subscribe (via AuthGuard::allow_channel) and reads happen on every publish fan-out. Exposed primarily for tests + operator observability; production code should reach for register_channel / subscribe_channel instead.

Source

pub fn token_cache(&self) -> Option<&Arc<TokenCache>>

The shared TokenCache installed on this node, if any. Only populated when a caller registered one via Self::set_token_cache. Exposed for tests that need to assert the cache is not populated as a side effect of a rejected subscribe.

Source

pub fn subscriber_chain_count(&self) -> usize

Number of retained subscribe token chains. A verified chain is stored here only after a subscribe passes the root-anchored gate; the sweep and publish re-check consult it. Exposed for tests that assert a rejected subscribe retains nothing.

Source

pub fn entity_id(&self) -> &EntityId

Get this node’s ed25519 entity id (derived from the keypair handed to MeshNode::new). 32 bytes. Used by CapabilityAnnouncement + channel-auth path.

Source

pub fn peer_entity_id(&self, node_id: u64) -> Option<EntityId>

Look up a peer’s pinned entity_id, if the TOFU binding has been established. Returns None before we’ve received a signature-verified CapabilityAnnouncement from the peer. Exposed primarily for tests + operator observability; the channel-auth subscribe gate consults this map internally.

Source

pub fn peer_addr(&self, node_id: u64) -> Option<SocketAddr>

The peer’s socket address, if we have an active session with them. Used by the migration subprotocol to route orchestrator-originated messages (e.g. TakeSnapshot) to the source node by its node_id.

Source

pub fn migration_identity_context(&self) -> MigrationIdentityContext

Build a MigrationIdentityContext bound to this node.

The context’s closures capture this node’s long-term Noise static private key (for the envelope-open path) and an Arc-clone of the peer map (for the peer-static lookup used by the source-side seal path). The private key is wrapped in a StaticSecret inside the unseal_snapshot closure, which is zeroize-on-drop — the key is never surfaced as a readable field on the returned value.

Used by the SDK’s compute runtime to wire identity-envelope support into the migration dispatcher without handing the key across the crate boundary. The previous shape exposed static_x25519_priv() -> [u8; 32] as a pub method, which leaked long-term secret material to any SDK caller — any code with an Arc<Mesh> could copy the node’s identity key out and impersonate it indefinitely.

Source

pub fn peer_static_x25519(&self, node_id: u64) -> Option<[u8; 32]>

The peer’s Noise static X25519 public key, captured during the handshake that established the session. Load-bearing for daemon migration: the source uses this key as the seal recipient on the IdentityEnvelope, so the only party that can unseal the daemon’s ed25519 seed is the peer whose static private key completed the Noise handshake.

Returns None if we have no session with node_id, or if the underlying handshake produced a zero-filled static pubkey (a sentinel for test-only code paths that construct SessionKeys without running a real handshake).

Source

pub fn peer_subnet(&self, node_id: u64) -> Option<SubnetId>

Look up a peer’s assigned subnet, if one has been recorded. Only populated from signature-verified CapabilityAnnouncements — unsigned announcements do not write here even when a node is running with require_signed_capabilities = false. Exposed for tests + operator observability; subnet_visible consults this map on the publish / subscribe fan-out path.

Source

pub fn local_subnet(&self) -> SubnetId

This node’s own SubnetId — the value supplied via MeshNodeConfig::subnet (or SubnetId::GLOBAL when none was configured). Stable for the node’s lifetime; the substrate doesn’t reassign the local subnet at runtime.

Source

pub fn local_subnet_policy(&self) -> Option<&Arc<SubnetPolicy>>

Read-only handle to the SubnetPolicy that derived this node’s local_subnet, when one was supplied. None when the local subnet came from MeshNodeConfig::subnet directly without going through a policy. Operator tools surface this to explain “why is this node in subnet X.”

Source

pub fn known_subnets(&self) -> Vec<(u64, SubnetId)>

Snapshot of every (node_id, subnet_id) pair the local node has cached from signature-verified capability announcements. Sorted by node_id for stable output. Used by operator tooling (net subnet ls / subnet tree) to enumerate the mesh’s subnet topology from a single vantage point — anything not yet announced (or announced unsigned) is invisible here, matching peer_subnet’s “signed-only” contract.

Source

pub fn local_addr(&self) -> SocketAddr

Get the local bind address.

Source

pub fn router(&self) -> &Arc<NetRouter>

Get the router (for adding routes, checking stats).

Source

pub fn failure_detector(&self) -> &Arc<FailureDetector>

Get the failure detector.

Source

pub fn set_migration_handler(&self, handler: Arc<MigrationSubprotocolHandler>)

Set the migration subprotocol handler.

Can be called before or after start(). When set, inbound packets with subprotocol_id == 0x0500 are dispatched to this handler instead of being queued as events. Idempotent w.r.t. replacing the handler — a second call swaps in the new one atomically.

Use Self::clear_migration_handler to uninstall (returns the mesh to the no-handler state where inbound migration packets hit the ComputeNotSupported fallback). Needed by DaemonRuntime::shutdown and by start’s lost-race cleanup path — the mesh must not hold a live handler pointing at a runtime that is no longer serving daemons.

Source

pub fn clear_migration_handler(&self)

Uninstall the migration subprotocol handler. After this call, inbound migration subprotocol packets hit the no-handler fallback and synthesise ComputeNotSupported for migration-initiating messages (other message types are dropped).

Used by the SDK’s DaemonRuntime::start to clean up after losing the install-vs-CAS race against a concurrent shutdown: if start installed a handler but its CAS to Ready lost to shutdown’s state flip, the mesh would otherwise be left with a live handler owned by a runtime that’s already been torn down.

Source

pub fn has_migration_handler(&self) -> bool

Returns true iff a migration subprotocol handler is currently installed on this mesh. Used primarily by tests that need to observe the ordering of handler installation against other runtime state transitions — the ArcSwap load itself is a public API surface regardless.

Source

pub fn block_peer(&self, addr: SocketAddr)

Block packets from/to a peer address (simulates network partition).

Source

pub fn unblock_peer(&self, addr: &SocketAddr)

Unblock a peer address (simulates partition healing).

Source

pub fn is_blocked(&self, addr: &SocketAddr) -> bool

Check if a peer is blocked.

Source

pub fn proximity_graph(&self) -> &Arc<ProximityGraph>

Get the proximity graph.

Source

pub fn reroute_policy(&self) -> &Arc<ReroutePolicy>

Get the reroute policy (for checking reroute stats in tests).

Source

pub fn peer_count(&self) -> usize

Number of connected peers.

Source

pub async fn connect( &self, peer_addr: SocketAddr, peer_pubkey: &[u8; 32], peer_node_id: u64, ) -> Result<u64, AdapterError>

Connect to a peer. Performs a Noise NKpsk0 handshake as initiator.

The peer must be listening and ready to accept the handshake. Returns the peer’s node ID on success.

Source

pub async fn accept( &self, peer_node_id: u64, ) -> Result<(SocketAddr, u64), AdapterError>

Accept a connection from a peer. Performs Noise NKpsk0 as responder.

Waits for an incoming handshake packet and completes the handshake. Returns the peer’s address and assigns the given node_id.

§Ordering contract

accept() MUST be called before Self::start(). Once start() has spawned the dispatch loop, the dispatcher consumes every inbound UDP datagram from the shared socket; try_handshake_responder polls the same socket directly and races the dispatcher for incoming msg1 packets. Because no per-pending-responder registry exists today (initiator-side handshakes use one — see pending_direct_initiators at mesh.rs:~1216; the responder side is a deferred design item), a start() → accept() ordering produces a swallowed msg1 and a hang. To prevent that hang silently turning into a debugging nightmare, calling accept() after start() now returns an explicit error rather than spinning forever.

Source

pub fn start(&self)

Start the receive loop and heartbeat tasks.

Must be called after connect() / accept() to begin processing inbound packets.

Refuses (no-op return) if any accept() call is currently in flight. Symmetric to accept’s contract: either accept observes started=true and bails (in-flight count goes to 0, then start proceeds), or start observes accept_in_flight > 0 and refuses. The SeqCst orderings make this mutually exclusive. Without this counter check, start could fire between accept’s started.load and its handshake_responder poll, after which the dispatcher would race the responder for the inbound msg1.

Note: this does NOT enable the periodic capability re-announce (which keeps the node’s entry alive in its own and peers’ folds past one TTL) — that needs an owned Arc to re-broadcast. Drive the node through Self::start_arc (as the SDK / FFI do) to get it; a bare start is for short-lived / test nodes.

Source

pub fn start_arc(self: &Arc<MeshNode>)

Start the node through its Arc, enabling the periodic capability re-announce on top of everything Self::start does. The re-announce re-broadcasts this node’s capabilities every MeshNodeConfig::capability_reannounce_interval, keeping its entry alive in its own fold (so its callee-side nRPC gate doesn’t expire its own services) AND in every peer’s fold (so it stays discoverable) past one announcement TTL. Production entry points (the SDK, the FFI) call this; a bare Self::start omits the re-announce (fine for short-lived / test nodes). Idempotent.

Source

pub fn spawn_nat_classify_loop(self: &Arc<MeshNode>) -> JoinHandle<()>

Spawn the NAT classification loop. Waits until at least 2 peers are connected, fires the initial sweep, then re-checks periodically so a mid-session NAT rebind (e.g. gateway reboot) gets picked up without operator intervention.

Separate from Self::start because the loop needs an Arc<MeshNode> to call Self::reclassify_nat across .await points. Callers that hold a MeshNode behind an Arc (SDK, FFI, all production paths) can spawn this alongside start to get continuous classification; callers that don’t can call Self::reclassify_nat manually via the &self surface.

The loop is best-effort — it never returns an error surface to the node, and a failed sweep leaves the previous classification intact. Exits on shutdown_notify.

Source

pub async fn send_to_peer( &self, peer_addr: SocketAddr, batch: &Batch, ) -> Result<(), AdapterError>

Send a batch of events to a specific peer by address.

Source

pub async fn send_routed( &self, dest_node_id: u64, batch: &Batch, ) -> Result<(), AdapterError>

Send a batch of events to a destination node via the routing table.

The events are encrypted with the destination’s session key and a routing header is prepended so intermediate nodes can forward without decrypting. The packet is sent to the next hop from the routing table, not directly to the destination.

Requires:

  • A session with dest_node_id (for encryption)
  • A route to dest_node_id in the routing table (for next hop)
Source

pub fn roster(&self) -> &Arc<SubscriberRoster>

Access the per-channel subscriber roster. Used by ChannelPublisher to enumerate subscribers; exposed for diagnostics.

Source

pub fn register_rpc_inbound( &self, channel_hash: u64, dispatcher: Arc<dyn Fn(RpcInboundEvent) + Send + Sync>, ) -> Option<Arc<dyn Fn(RpcInboundEvent) + Send + Sync>>

Register a per-channel-hash inbound dispatcher for nRPC.

When the mesh’s inbound dispatch sees a packet whose NetHeader::channel_hash matches channel_hash, it routes every event in the packet to dispatcher and skips the per-shard inbound queue. Used by Mesh::serve_rpc / Mesh::call to receive RPC events without polling the shard queue.

Returns the previous dispatcher (if any) so callers can detect a slot collision (typically a programming error — two serve_rpc registrations for the same service on the same node, or a hash collision between two different channel names; the latter is bounded at ~1/65536 per pair).

Hot-path cost. One DashMap get per inbound packet. Absent registrations skip the conditional entirely.

Source

pub fn unregister_rpc_inbound( &self, channel_hash: u64, ) -> Option<Arc<dyn Fn(RpcInboundEvent) + Send + Sync>>

Remove the registered dispatcher for channel_hash. Returns the prior dispatcher if one was registered. After removal, inbound events for channel_hash resume landing in the per-shard inbound queue.

Source

pub fn rpc_inbound_dispatcher_registered(&self, channel_hash: u64) -> bool

Cheap probe: is a dispatcher already registered for this canonical channel hash? Used by the caller-side ensure_reply_subscription to skip a redundant registration when multiple targets serve the same service (they share one reply channel + one dispatcher per caller).

Source

pub fn tool_registry(&self) -> &Arc<ToolMetadataRegistry>

Local-only tool-descriptor registry. SDK-side serve_tool inserts here on registration + removes on Drop; the announce_capabilities_with path reads from it to auto-merge ai-tool:<name> tags, the typed ToolCapability, and the description / streaming / tags metadata keys. Also drives the tool.metadata.fetch RPC handler (A-2b) which answers “what’s the full schema for tool X on this node?”.

Source

pub fn list_tools(&self, matcher: Option<&TagMatcher>) -> Vec<ToolDescriptor>

Walk the capability fold for every ToolCapability carried in a published CapabilitySet, reconstruct a ToolDescriptor per (tool_id, version), and return the deduped list with node_count filled in.

One in-memory pass over the fold; no network. The fold is the source-of-truth for cross-node tool discovery (substrate announce merge in A-2a publishes ToolCapability + schema metadata + the ai-tool:<id> tag on every announce).

matcher is the standard TagMatcher — an entry is included if ANY of its tags match. Pass None to skip pre-filtering. The classic use case is region-scoping the discovery (Some(TagMatcher::Prefix { value: "region.eu".into() })).

Schema hydration: schemas live in CapabilitySet::metadata (too large for tag wire-format); the tag-decoded ToolCapability carries input_schema = None / output_schema = None until this method fills them from the membership’s metadata map using the ToolCapability::input_schema_metadata_key / output_schema_metadata_key keys.

Source

pub fn watch_tools( self: &Arc<MeshNode>, matcher: Option<TagMatcher>, interval: Option<Duration>, ) -> ToolListWatch

Subscribe to a stream of ToolListChange events that reflect every dynamic addition / removal / publisher-count change in the local capability fold’s tool view, filtered by matcher (same semantic as Self::list_tools).

The returned ToolListWatch is a futures::Stream<Item = ToolListChange>. The first event fires AFTER the initial snapshot — callers that need the baseline shape should call list_tools first and then start the watch.

Event-driven: the task parks on the capability fold’s change signal (Fold::subscribe_changes) and re-diffs only when the fold actually mutates (a local serve_tool, an inbound peer announcement, an eviction, or a TTL expiry) — no periodic walk on an idle fold. Change-detection latency is bounded by fold-apply latency, not a poll interval.

interval:

  • None — pure event-driven; the task only wakes on a fold change. An idle fold does zero periodic work.
  • Some(d) — event-driven plus a debounce ceiling: a safety-net re-diff fires at least every d even absent a change signal. Use this only if you want a hard upper bound on staleness independent of the signal path.

Lifecycle:

  • Dropping the ToolListWatch stops the task on its next wake (the task observes the closed sender and exits).
  • The watch handle never errors: a dropped fold (impossible while the MeshNode arc is alive) would simply end the stream. Decode-style errors don’t exist here — the underlying walk is in-memory and infallible.
Source

pub fn rpc_metrics_snapshot(&self) -> RpcMetricsSnapshot

Snapshot of caller-side nRPC metrics. Cheap (one DashMap iteration); call on every Prometheus scrape. Format with RpcMetricsSnapshot::prometheus_text.

Source

pub fn set_rpc_observer(&self, observer: Option<Arc<dyn RpcObserver>>)

Install (or clear with None) the caller-side nRPC observer. Replaces any previously-installed observer. Cheap to load on the hot path — the dispatch path checks for an installed observer via one ArcSwap::load and short-circuits when None. Observers run inline on the dispatch task; implementations must be cheap (push into a bounded ring or mpsc, not block).

See cortex::rpc_observer::RpcObserver for the trait shape and the captured event metadata.

Source

pub fn rpc_observer(&self) -> Option<Arc<dyn RpcObserver>>

Hot-path load of the currently-installed nRPC observer, if any. Cheap — one ArcSwap::load. The dispatch path calls this on every completed boundary and short- circuits when None.

Source

pub fn reserve_cancel_token(&self) -> u64

Reserve a fresh cancel token. Pass on a subsequent call via crate::adapter::net::mesh_rpc::CallOptions::cancel_token; later, call Self::cancel from anywhere to abort the in-flight task. Tokens are monotonically-increasing, process-global, never reused. An unused reservation is harmless — the registry only allocates an entry on the first paired register or cancel.

Source

pub fn cancel(&self, token: u64)

Abort the in-flight call associated with token. Idempotent — no-op if the token was never used, the call already resolved, or token == 0 (the “no token” sentinel).

Race-safe: a cancel that arrives BEFORE the call’s register runs (the gap between Self::reserve_cancel_token and call construction) latches a pre-cancel flag on the orphan entry; the subsequent register pre-arms the cancel signal so the call short-circuits to crate::adapter::net::mesh_rpc::RpcError::Cancelled without ever publishing the REQUEST.

Triggers a Drop-on-cancel CANCEL frame on the wire via the call-shape-specific guards (UnaryCallGuard / ClientStreamCallRaw::Drop / DuplexCallRaw::Drop). See crate::adapter::net::cancel_registry for the registry implementation.

Source

pub fn cancel_registry_len(&self) -> usize

Number of in-flight calls currently registered with the cancel registry. Diagnostic — exposed so integration tests can deterministically poll for call setup completion instead of guessing with sleep. Includes orphan cancel-only entries that haven’t aged out yet.

Source

pub fn set_channel_configs(&mut self, configs: Arc<ChannelConfigRegistry>)

Install a ChannelConfigRegistry whose can_subscribe / can_publish rules are consulted for incoming Subscribe messages. Also constructs a SubnetGateway over the same registry + this node’s local_subnet; the gateway is read by MeshNode::gateway() and powers the net gateway operator surface.

When unset (the default), all subscribes are accepted, the subnet-visibility gate is skipped, and gateway() returns None. Full capability/token-based authorization additionally requires a TokenCache — see Self::set_token_cache.

Call this BEFORE Self::start — the dispatch context is captured at start time, so a registry installed after start won’t be visible to the receive loop.

Source

pub fn gateway(&self) -> Option<&Arc<SubnetGateway>>

Read-only handle to this node’s SubnetGateway, or None when no ChannelConfigRegistry has been installed (see Self::set_channel_configs). Operator tooling (net gateway stats|exports) consults this to read the forwarded/dropped counters and export table.

Source

pub fn channel_configs(&self) -> Option<&Arc<ChannelConfigRegistry>>

Read-only handle to this node’s installed ChannelConfigRegistry, or None when no registry has been installed via Self::set_channel_configs. Operator tooling (net channel ls|visibility) reads this to enumerate configured channels.

Source

pub fn set_aggregator_registry(&mut self, registry: Arc<AggregatorRegistry>)

Install a shared AggregatorRegistry on the node. Once installed, operator CLI verbs (net aggregator spawn / ls / scale) and the Deck AGGREGATORS panel can read + mutate live aggregator groups through it. Call this BEFORE Self::start — installing after the receive loop is live races against channel-publish initialization. The debug_assert! makes the constraint observable in tests; release builds carry the doc-comment contract only.

Source

pub fn aggregator_registry(&self) -> Option<&Arc<AggregatorRegistry>>

Read-only handle to this node’s installed AggregatorRegistry, or None when no registry has been installed via Self::set_aggregator_registry. Nodes that don’t run aggregators leave this empty — callers should treat the None case as “no aggregators registered” and skip rendering / acting.

Source

pub fn set_token_cache(&mut self, cache: Arc<TokenCache>)

Install a shared TokenCache used by the channel-auth path. When set, authorize_subscribe and publish_many consult it via ChannelConfig::can_subscribe / can_publish. Subscribers that present a token on the wire have their token installed into this cache (after signature verification) before the ACL check runs.

When unset, require_token channels always reject — without a cache there’s no way to validate presented tokens or find pre-cached ones.

Source

pub async fn subscribe_channel( &self, publisher_node_id: u64, channel: ChannelName, ) -> Result<(), AdapterError>

Ask publisher_node_id to add this node to channel’s subscriber set.

Blocks until the publisher’s Ack arrives or membership_ack_timeout elapses. Returns Ok(()) iff the publisher accepted the subscribe; AckReason failures surface as AdapterError::Connection. No token is presented — use Self::subscribe_channel_with_token for channels with require_token set. Mode defaults to Broadcast (every published event delivered to this subscriber); use Self::subscribe_channel_in_queue_group for work-distribution.

Source

pub async fn subscribe_channel_with_token( &self, publisher_node_id: u64, channel: ChannelName, token: PermissionToken, ) -> Result<(), AdapterError>

Subscribe with a single pre-issued PermissionToken — the common case where the channel owner granted the subscriber directly (a one-link chain). For a delegated credential (owner → … → subscriber) use Self::subscribe_channel_with_chain.

The publisher verifies the chain roots at one of the channel’s token_roots and binds to the subscriber’s entity before admitting the subscribe.

Source

pub async fn subscribe_channel_with_chain( &self, publisher_node_id: u64, channel: ChannelName, chain: TokenChain, ) -> Result<(), AdapterError>

Subscribe presenting a full delegation TokenChain (root-to-leaf). Use when the subscriber’s grant was delegated rather than issued directly by the channel owner: the chain’s root link must be signed by one of the channel’s token_roots and the leaf must be bound to this node’s entity. See TokenChain::verify_authorizes for the full contract the publisher applies.

Source

pub fn set_publish_chain(&self, channel: &ChannelName, chain: TokenChain)

Install this node’s own PUBLISH credential for channel as a (possibly delegated, multi-link) TokenChain.

Use this when the node’s right to publish was delegated (owner → … → this node) rather than granted directly by the channel owner: the publish-side ACL builds a single-link chain from the local TokenCache, whose issuer is the delegator (not a channel root), so a delegated grant would otherwise fail the root-anchor check and the node couldn’t publish. The held chain is preferred over the cache fallback on the publish path; its leaf must be bound to this node’s entity. Re-verified against the current clock + revocation on every publish, so an expired or revoked held chain fails closed like any other.

Source

pub async fn subscribe_channel_in_queue_group( &self, publisher_node_id: u64, channel: ChannelName, queue_group: String, ) -> Result<(), AdapterError>

Subscribe in the named queue group: every published event is delivered to exactly ONE member of the group, distributed round-robin across members. The publisher’s roster carries the mode; the same (channel, queue_group) pair across multiple subscribers forms one work-distribution pool. Used by request/response patterns (nRPC) and any one-of-N job-distribution shape.

Source

pub async fn subscribe_channel_in_queue_group_with_token( &self, publisher_node_id: u64, channel: ChannelName, queue_group: String, token: PermissionToken, ) -> Result<(), AdapterError>

Queue-group subscribe with a pre-issued PermissionToken. Same auth flow as Self::subscribe_channel_with_token, queue-group semantics from Self::subscribe_channel_in_queue_group.

Source

pub async fn unsubscribe_channel( &self, publisher_node_id: u64, channel: ChannelName, ) -> Result<(), AdapterError>

Ask publisher_node_id to remove this node from channel’s subscriber set. Mirror of subscribe_channel. Mode-agnostic — unsubscribe finds the peer in whichever mode they’re in.

Source

pub fn channel_publisher( &self, channel: ChannelName, config: PublishConfig, ) -> ChannelPublisher

Build a ChannelPublisher recipe. Does NOT talk to the wire — combine with publish or publish_many to actually fan out.

Source

pub async fn publish( &self, publisher: &ChannelPublisher, payload: Bytes, ) -> Result<PublishReport, AdapterError>

Fan payload out to every subscriber of the publisher’s channel.

One per-peer unicast per subscriber — no multicast primitive, no group crypto. Per-peer concurrency is bounded by PublishConfig::max_inflight. The failure policy controls whether per-peer errors short-circuit the fan-out (see OnFailure).

Source

pub async fn publish_many( &self, publisher: &ChannelPublisher, events: &[Bytes], ) -> Result<PublishReport, AdapterError>

Fan multiple payloads out to every subscriber of the publisher’s channel. Semantics are the same as publish; the whole events slice is delivered as one batch per subscriber.

Source

pub async fn publish_fold_to_peer<P>( &self, peer_addr: SocketAddr, ann: &SignedAnnouncement<P>, ) -> Result<usize, AdapterError>

Encode a super::behavior::fold::SignedAnnouncement and send it to one peer as a super::behavior::fold::SUBPROTOCOL_FOLD frame. The receiver’s dispatch_packet arm decodes + verifies + routes to the right typed super::behavior::fold::Fold<K> via the installed super::behavior::fold::FoldChannelRouter.

Returns the encoded byte count for metrics / diagnostics.

Source

pub async fn publish_fold_broadcast<P>( &self, ann: &SignedAnnouncement<P>, ) -> Result<usize, AdapterError>

Best-effort fan-out of a fold announcement to every currently-connected peer. Per-peer send failures are logged and skipped rather than short-circuiting the rest of the fan-out. The N peer sends run concurrently via futures::future::join_all so a chatty publisher (e.g. ReservationFold against a busy resource) doesn’t block its own task on serial encryption + UDP-send latency.

Returns the number of peers the announcement was successfully shipped to.

Source

pub async fn publish_capability_membership( &self, membership: CapabilityMembership, ) -> Result<usize, AdapterError>

Publish a super::behavior::fold::CapabilityFold membership announcement against this node’s identity. The counter is sharded per-class_hash so concurrent publishes to different classes don’t contend.

Source

pub async fn publish_route( &self, destination: u64, next_hop: SocketAddr, metric: u32, ) -> Result<usize, AdapterError>

Publish a super::behavior::fold::RoutingFold route announcement. The fold doesn’t shard by class, so both the counter and the envelope use class 0. via is auto-stamped to this node’s node_id.

Source

pub async fn publish_reservation( &self, resource_id: u64, state: ReservationState, ) -> Result<usize, AdapterError>

Publish a super::behavior::fold::ReservationFold state transition for resource_id. Generation is per-resource (counter sharded by resource_id) so concurrent transitions against different resources don’t contend; the envelope’s class field is unused for this fold and stays 0.

Source

pub async fn send_subprotocol( &self, peer_addr: SocketAddr, subprotocol_id: u16, payload: &[u8], ) -> Result<(), AdapterError>

Send a raw subprotocol message to a peer.

The payload is sent as a single event frame with the specified subprotocol_id set in the Net header (included in AEAD AAD).

Source

pub async fn announce_capabilities( &self, caps: CapabilitySet, ) -> Result<(), AdapterError>

Announce this node’s capabilities to every directly-connected peer. Also self-indexes so single-node find_nodes_by_filter queries return us too.

TTL defaults to 5 minutes. Unsigned (signatures tie in with Stage E channel auth). For explicit control over TTL or signing, see Self::announce_capabilities_with.

Source

pub async fn announce_capabilities_with( &self, caps: CapabilitySet, ttl: Duration, sign: bool, ) -> Result<(), AdapterError>

Extended announce with explicit TTL and signing opt-in.

sign = true signs the announcement with the node’s EntityKeypair so receivers can validate end-to-end. sign = false broadcasts unsigned — useful in trusted environments where the wire signature adds no value. Receivers with require_signed_capabilities = true drop unsigned announcements regardless.

Source

pub async fn announce_chain( &self, origin_hash: u64, tip_seq: u64, ) -> Result<(), AdapterError>

Advertise that this node holds the causal chain identified by origin_hash up to and including tip_seq. Emits the causal:<hex>:<tip_seq> reserved tag and re-broadcasts the node’s capability announcement.

Idempotent on the chain identity — every prior tip / range advertisement for the same origin_hash is replaced. The most recent call wins.

Capability Phase B per CAPABILITY_SYSTEM_PLAN.md §B and a hard prerequisite for REDEX_DISTRIBUTED_PLAN.md Phase C/D/E.

Source

pub async fn announce_chain_range( &self, origin_hash: u64, start_seq: u64, end_seq: u64, ) -> Result<(), AdapterError>

Advertise that this node holds the half-open range [start_seq, end_seq) of the chain identified by origin_hash. Emits the causal:<hex>[<start>..<end>] reserved tag and re-broadcasts.

start_seq >= end_seq is a no-op — a degenerate range would advertise nothing meaningful. Idempotent on the chain identity; the most recent call replaces every prior tip / range / presence form for the same origin_hash.

Source

pub async fn withdraw_chain(&self, origin_hash: u64) -> Result<(), AdapterError>

Withdraw every causal:<hex>* advertisement for origin_hash and re-broadcast. Idempotent — repeated calls converge to the same view (the chain tag absent from the announced set).

Source

pub async fn announce_heat( &self, origin_hash: u64, rate: f64, ) -> Result<(), AdapterError>

Annotate the local capability set with a heat:<hex>=<rate> reserved tag for origin_hash and re-broadcast. Replaces any prior heat tag for the same chain — the most recent call wins.

rate is clamped to [0.0, 1.0] by the substrate’s CapabilitySet::heat_level builder; the wire emits the clamped value with two decimal places. Data-gravity callers normalize their unbounded read-rate to that range before calling.

Rebel Yell Phase 4. See docs/misc/DATAFORTS_PLAN.md § Phase 4.

Source

pub async fn withdraw_heat(&self, origin_hash: u64) -> Result<(), AdapterError>

Withdraw every heat:<hex>=* tag for origin_hash and re-broadcast. Peers drop the heat annotation; the chain’s causal: advertisements are untouched.

Source

pub async fn announce_heat_batch( &self, updates: &[(u64, Option<f64>)], ) -> Result<(), AdapterError>

Apply a batch of heat emissions in a single announce_capabilities round-trip. Each update is one of:

  • Some(rate): replace this chain’s heat: tag with heat:<hex>=<clamped rate>.
  • None: withdraw every heat: annotation for this chain.

Used by gravity_tick to coalesce per-chain emissions. The previous single-shot per-chain announce_heat loop rebroadcast the full capability set N times per tick — O(n_chains² × n_tags) wire work on a busy node. This batch path mutates the snapshot once and rebroadcasts once.

Non-finite rates skip silently (with a trace log).

Source§

impl MeshNode

Source

pub async fn announce_blob_heat( &self, hash: [u8; 32], rate: f64, ) -> Result<(), AdapterError>

Advertise the heat:blob:<hex64>=<rate> reserved tag for chunk hash. rate is clamped to [0.0, 1.0] to match the chain-heat wire shape. PR-5j-c — operators wire this via the BlobHeatSink impl below; the MeshBlobAdapter::tick_blob_heat loop is the driver.

Source

pub async fn withdraw_blob_heat( &self, hash: [u8; 32], ) -> Result<(), AdapterError>

Withdraw every heat:blob:<hex>=* tag for hash and re-broadcast. Peers drop the blob-heat annotation.

Source

pub async fn announce_blob_heat_batch( &self, updates: &[([u8; 32], Option<f64>)], ) -> Result<(), AdapterError>

Apply a batch of blob-heat emissions in a single announce_capabilities round-trip. Each update is either Some(rate) (replace this hash’s heat:blob: tag) or None (withdraw every blob-heat annotation for the hash). Coalescing matches the chain-heat batch path so a busy tick doesn’t spawn N rebroadcasts.

Source§

impl MeshNode

Source

pub async fn send_overflow_push( self: &Arc<MeshNode>, target_node_id: u64, blob_hash: [u8; 32], size_bytes: u64, ) -> Result<OverflowPushAck, BlobError>

Send an overflow push nudge to target_node_id. The chunk bytes themselves don’t ride this RPC — the nudge tells the receiver to open the chunk channel against its local Redex with replication armed; the existing per-chunk replication runtime pulls the bytes from any holder advertising causal:<hash> (typically this node, since we’re shedding it).

Ok(OverflowPushAck::Accepted) means the receiver accepted + opened the chunk channel. The sender still has to observe the durability watermark (the target’s causal:<hash> advertisement) before deleting the local copy — that’s the safe-delete gate. Per the overflow plan, this two-phase pattern lets a failed open on the receive side (network blip during chunk pull) keep the bytes alive on the sender.

Maps non-Ok results to typed BlobError so the controller’s push_errors counter bumps uniformly. See super::dataforts::blob::overflow::OverflowPushAck for the variant breakdown.

Source

pub fn serve_overflow_push( self: &Arc<MeshNode>, adapter: Arc<MeshBlobAdapter>, ) -> Result<ServeHandle, ServeError>

Register the receive-side overflow-push handler on this node. The handler reads live local caps + the capability index on each request, runs admission, and on Admit opens the chunk channel against adapter via super::dataforts::blob::adapter::BlobAdapter::prefetch.

Returns the super::mesh_rpc::ServeHandle the operator drops to deregister the handler. Multiple calls on the same node would conflict on the service name; the second call returns super::mesh_rpc::ServeError::AlreadyServing.

Source

pub async fn announce_blob_overflow_state( &self, adapter: &MeshBlobAdapter, ) -> Result<(), AdapterError>

Rebroadcast the local capability set with the dataforts.blob.overflow tag set to match adapter’s current overflow_enabled() state. Convenience for operators who flip super::dataforts::blob::MeshBlobAdapter::set_overflow_enabled at runtime: the boolean lives on the adapter, but the capability index reads from this node’s announced caps, so peers only observe the change after the next announce. Without this helper, every toggle path needs a matching announce_capabilities call — easy to forget, and the symptom (sender keeps round-tripping pushes that reject SenderNotOverflowing) is non-obvious.

Snapshots the current user caps, sets / clears the presence tag based on adapter.overflow_enabled(), and announces the updated set. Returns the AdapterError from the inner announce_capabilities if the announce fails (rate-limited, transport down, etc.).

Source§

impl MeshNode

Source

pub fn serve_blob_transfer( self: &Arc<MeshNode>, adapter: Arc<MeshBlobAdapter>, ) -> Arc<BlobTransferEngine>

Install the blob-transfer engine over adapter (FairScheduler transport plan). After this, the node serves on-demand chunk fetches (the SUBPROTOCOL_BLOB_TRANSFER control branch + the transfer-stream data divert route to the engine) and can issue them via Self::transfer_fetch_chunk. Idempotent — re-install replaces the engine.

Returns the installed engine handle — the same one stored on the node — so callers (e.g. the blob.transfers RPC install) can serve introspection over the exact registry that’s doing the fetches.

Source

pub async fn transfer_fetch_chunk( self: &Arc<MeshNode>, holder: u64, hash: [u8; 32], ) -> Result<Bytes, BlobError>

Fetch the chunk addressed by hash from holder over a reliable, scheduled transfer stream — bytes move over the FairScheduler-managed stream transport, not RedEX replication or nRPC. Returns the BLAKE3-verified bytes, or BlobError::NotFound on a holder miss / timeout (so the caller can fail over to another advertised holder).

Source

pub async fn transfer_fetch_chunk_discovered( self: &Arc<MeshNode>, hash: [u8; 32], ) -> Result<Bytes, BlobError>

Fetch the chunk addressed by hash from whichever connected peer holds it, without the caller naming a holder. Probes peers in turn over the transfer transport: a peer that lacks the chunk replies NotFound promptly (so misses are cheap), and the first peer that serves the BLAKE3-verified bytes wins. Returns BlobError::NotFound if no connected peer has it (the caller may then fall back to its own backend or fail the read).

This is the usable “fetch by content hash” entry point that Self::transfer_fetch_chunk (which requires a known holder) can’t be on its own.

Discovery model (deliberate). v1 probes connected peers directly rather than consulting the capability fold for causal:<hash> advertisers. The per-chunk causal:<hex64> tag is a single-datagram advertisement that caps at ~15-20 chunks/node, so it does not scale as a per-chunk holder index — which is exactly why the directory transfer (super::dataforts::dir) pulls from a known source instead. Probing sidesteps the ceiling entirely: the holder’s prompt NotFound is the membership test. A future optimization can use a node-level “serves blobs” capability (one tag, no ceiling) or a dedicated zero-byte probe frame to ORDER / prune candidates and to stay robust against a silent peer; today each probe is bounded by DISCOVERY_PROBE_TIMEOUT so one unresponsive peer can’t stall the whole search.

Source§

impl MeshNode

Source

pub fn set_replication_inbound_router( &self, router: Option<Arc<dyn ReplicationInboundRouter>>, )

Install (or replace) the SUBPROTOCOL_REDEX inbound router. Used by Redex to register a per-node router that owns the per-channel runtime registry; the mesh dispatch hot-path consults this router on every inbound SUBPROTOCOL_REDEX frame.

Passing None un-installs the router — every subsequent inbound replication frame is dropped silently until a new router is installed.

Source

pub fn set_meshdb_inbound_router( &self, router: Option<Arc<dyn MeshDbInboundRouter>>, )

Install (or replace) the SUBPROTOCOL_MESHDB inbound router. The MeshDB transport installs one of these to receive federated-query traffic — both inbound requests (routed to the server-side query handler) and inbound responses (routed to the matching in-flight caller).

Passing None un-installs the router — subsequent inbound MeshDB frames are dropped silently until a new router is installed.

Idempotent — re-installing replaces the previous handle. Hot-path cost is one parking_lot::RwLock read per inbound SUBPROTOCOL_MESHDB frame.

Source

pub fn has_meshdb_inbound_router(&self) -> bool

True iff a MeshDB inbound router is currently installed. Useful for tests and the operator surface to confirm the install landed.

Source

pub fn set_fold_router(&self, router: Option<Arc<dyn FoldChannelRouter>>)

Install (or uninstall) the fold-framework channel router. Absent router = fold packets dropped silently. Idempotent; re-installing replaces the previous handle.

Source

pub fn has_fold_router(&self) -> bool

True iff a fold-framework channel router is currently installed. Useful for tests and the operator surface to confirm the install landed.

Source

pub fn fold_stats(&self) -> Vec<FoldStats>

Aggregated super::behavior::fold::FoldStats for every fold the installed router addresses. The net-mesh fold list CLI command and the Deck FOLDS panel call this once per scrape tick. Returns an empty Vec when no router is installed.

Source

pub fn set_greedy_observer(&self, observer: Option<Arc<dyn GreedyObserver>>)

Install (or uninstall) the greedy-LRU observer. Redex calls this from Redex::enable_greedy_dataforts to wire the inbound dispatch fanout. None uninstalls; subsequent standard-event packets fall through the observer hook untouched.

Idempotent — re-installing replaces the previous handle. Hot-path cost is unchanged when called with the same Arc.

Source

pub fn has_greedy_observer(&self) -> bool

True iff a greedy observer is currently installed. Useful for tests and for the operator surface to confirm the install landed.

Source

pub fn find_chain_holders(&self, origin_hash: u64) -> Vec<u64>

Return every node currently advertising any causal:<hex>* variant for origin_hash. Includes this node when self- indexed. Sorted by ascending RTT from this node where the proximity graph has measurements, with self at the front and unmeasured peers (no recent ping) at the back; among equally-measured peers, ties broken by ascending NodeId.

Reads the capability fold directly; no broadcast.

Source

pub fn find_nodes_by_filter(&self, filter: &CapabilityFilter) -> Vec<u64>

Query the capability fold. Returns node ids (including our own node_id) whose latest announcement matches filter. Routes through capability_bridge::find_nodes_matching so post-query predicates (memory, vram, GPU) execute in-memory alongside the fold’s tag-based intersection.

Source

pub fn find_nodes_by_filter_scoped( &self, filter: &CapabilityFilter, scope: &ScopeFilter<'_>, ) -> Vec<u64>

Scoped variant of Self::find_nodes_by_filter. Filters candidates through scope (derived from each peer’s scope:* reserved tags) on top of the capability filter. SubnetLocal peers and the ScopeFilter::SameSubnet filter resolve same-subnet membership against peer_subnets.

Warm-up rule. When a peer’s subnet is unknown:

  • With a local_subnet_policy, the candidate is admitted (a fresh peer’s announcement may not have landed yet — the policy will resolve it on receipt).
  • Without a local_subnet_policy, peer_subnets stays permanently empty (the dispatch handler only writes it when a policy is installed), so “unknown” means “will never resolve” — admitting unknowns there leaks every peer through SameSubnet. The candidate is excluded.
Source

pub fn peer_reflex_addr(&self, peer_node_id: u64) -> Option<SocketAddr>

Read a peer’s most recently advertised public reflex SocketAddr from the capability index. None before the peer has sent a stage-2 announcement, or when the peer was compiled without nat-traversal.

Stage 3 (rendezvous) reads this field to resolve the punch target’s public address. Exposed for observability and for tests that want to verify capability-announcement propagation of the reflex field.

Requires the nat-traversal cargo feature.

Source

pub fn peer_nat_class(&self, peer_node_id: u64) -> NatClass

Read a peer’s most recently advertised NAT classification from the capability index. Parses the nat:* tag on the peer’s announcement. Returns NatClass::Unknown when the peer has not indexed (we’ve never received an announcement), or the announcement carried no nat:* tag (peer was compiled without nat-traversal, or hasn’t classified yet).

Consumed by the pair-type matrix (plan §3) — connect_direct reads this to decide whether to attempt a punch or short-circuit to the routed path.

Requires the nat-traversal cargo feature.

Source

pub fn traversal_stats(&self) -> TraversalStatsSnapshot

Cumulative traversal counters — punch attempts, successes, and relay fallbacks. Returns a consistent point-in-time snapshot.

See super::traversal::TraversalStatsSnapshot for the field semantics. Counters are monotonic and never reset; callers that want deltas should subtract snapshots.

Requires the nat-traversal cargo feature.

Source

pub async fn connect_direct( &self, peer_node_id: u64, peer_pubkey: &[u8; 32], coordinator: u64, ) -> Result<u64, TraversalError>

Establish a direct session to peer_node_id, using the pair-type matrix (plan §3) to decide between a direct handshake, a rendezvous-coordinated single-shot punch, or a routed-only fallback.

§Flow
  1. Read local + remote NAT classifications (self nat_class() + peer_nat_class). Unknown sides are handled per the matrix — never treated as “don’t attempt” (plan decision 8).
  2. Resolve the peer’s reflex address from the local capability index. Fails with super::traversal::TraversalError::PeerNotReachable if no reflex is cached (peer hasn’t announced yet).
  3. Apply the matrix:
    • Direct → connect via the routing table’s first-hop; coordinator is not consulted and its reachability is irrelevant. relay_fallbacks increments (we didn’t attempt a punch).
    • SkipPunch → connect via coordinator as the relay; symmetric pairs have no better option. Fails with PeerNotReachable if coordinator isn’t a live peer.
    • SinglePunch → ask coordinator to mediate via Self::request_punch. On successful introduction, increment punches_attempted + punches_succeeded and connect to peer_reflex. On failure, increment punches_attempted + relay_fallbacks and fall back to connecting via the coordinator — the plan’s framing treats punch-failed as “optimization missed,” not a connectivity failure.
§Scope note

Stage 3c wires the orchestration + stats end-to-end but always establishes the session via the routed handshake through coordinator — the framing “traffic rides the relay until a direct punch upgrades it” matches the plan’s “optimization, not correctness” contract. Stage 3d lands the keep-alive train + PunchAck round-trip, at which point a successful SinglePunch outcome upgrades to a direct session; failed punches (or matrix-skipped pairs) continue to resolve on the routed path as they do today.

Stats are set on the stage-3c semantics already: punches_attempted increments when the matrix picks SinglePunch and the coordinator mediates; stage 3d refines punches_succeeded / relay_fallbacks against the real keep-alive outcome.

Requires the nat-traversal cargo feature.

Source

pub fn find_best_node(&self, req: &CapabilityRequirement) -> Option<u64>

Rank peers for a scored requirement. Returns the best- scoring node’s id, or None if no peer matches.

Phase 3b note: scoring runs against a tag-only CapabilitySet synthesized from the fold (the fold’s CapabilityMembership doesn’t carry the full legacy hardware/models projection). Hardware- and model-based preference weights (memory, vram, tokens/sec, loaded) read zero, so this method degrades to “any matching candidate, lex-sorted by node_id.” That’s the same shape as the cap-propagation- race fallback; production has no rich-scoring caller per the Phase 3b survey.

Source

pub fn find_best_node_scoped( &self, req: &CapabilityRequirement, scope: &ScopeFilter<'_>, ) -> Option<u64>

Scoped variant of Self::find_best_node. See Self::find_nodes_by_filter_scoped for the scope resolution semantics; selection picks the highest-scoring candidate within the scoped set.

Phase 3b note: same scoring caveat as Self::find_best_node — the fold’s CapabilityMembership doesn’t carry the legacy hardware/models projection, so scoring degrades to “any matching candidate, lex-sorted.”

Source

pub fn capability_fold(&self) -> &Arc<Fold<CapabilityFold>>

Shared reference to the capability fold — the canonical capability-state surface. Used by operator dashboards, the rendezvous coordinator (reflex lookup), and the dataforts-blob overflow handler.

Source

pub fn reservation_fold(&self) -> &Arc<Fold<ReservationFold>>

Shared reference to the reservation fold. Always present (allocated at construction even when the node never publishes a reservation) so the aggregator surface + future scheduler callers don’t have to discriminate on presence.

Source

pub fn get_node_by_origin_hash(&self, origin_hash: u64) -> Option<u64>

Resolve a wire origin_hash to its publisher’s node_id, or None when no publisher has claimed it yet. Post- WIRE_ORIGIN_HASH_64BIT the wire hash is the full EntityId::origin_hash() u64; accidental collisions are 2^-32 (effectively impossible). The map is populated first-write-wins, so an adversary grinding a colliding keypair (~2^32 work) cannot displace an established claimant.

Source

pub fn open_stream( &self, peer_node_id: u64, stream_id: u64, config: StreamConfig, ) -> Result<Stream, AdapterError>

Open (or look up) a logical stream to a connected peer.

A stream is one ordered, independently reliability-configured channel inside the encrypted session to peer_node_id. Multiple streams share one session, one cipher, and one UDP socket, but have independent sequence numbers and reliability state. See Stream for the full contract.

Idempotent: repeated calls for the same (peer_node_id, stream_id) return handles backed by the same underlying state; a config argument that differs from the first call’s is ignored with a warning log. Close + re-open to change a stream’s config.

Source

pub fn close_stream(&self, peer_node_id: u64, stream_id: u64)

Close a stream: drop its StreamState from the session, ending delivery of any buffered inbound events for the stream and dropping outbound packets that haven’t hit the wire yet. Idempotent. CloseBehavior::DrainThenClose is honored only to the extent the router’s scheduler has already flushed; there is no wire “drain-then-close” signal in v1.

Source

pub async fn close_stream_graceful( &self, peer_node_id: u64, stream_id: u64, timeout: Duration, )

Close a reliable stream gracefully (H-7, DrainThenClose): wait until the reliability layer has no unacked packets — i.e. the receiver has acked everything (with H-9 ack-pruning, pending empties as grants arrive) — or timeout elapses, then close. Use after the last bytes of a reliable send so retransmit can still fill gaps before teardown; closing eagerly (close_stream) drops the retransmit window and can strand a lost tail packet on a lossy link. A fire-and-forget stream (nothing tracked) drains instantly.

Source

pub async fn send_on_stream( &self, stream: &Stream, events: &[Bytes], ) -> Result<(), StreamError>

Send a batch of events on an explicit stream.

Uses the stream’s reliability mode from its original open_stream config. Returns Backpressure when the stream’s in-flight count (tx_inflight) would exceed its configured tx_window; the event was not enqueued — the caller decides what to do (drop, retry, or buffer at the app layer). tx_window == 0 disables the check and preserves pre-backpressure behavior. Transport is returned for underlying socket send failures.

Returns NotConnected when the stream was never opened or has been closed since (close_stream, idle eviction, cap-exceeded LRU). A previously-closed Stream handle is inert by design — reusing it does NOT silently re-create the stream with default config; the caller must explicitly re-open.

Source

pub async fn send_with_retry( &self, stream: &Stream, events: &[Bytes], max_retries: usize, ) -> Result<(), StreamError>

Send events on stream, retrying on Backpressure with exponential backoff (5 ms → 200 ms, doubling) up to max_retries times. Transport failures are returned immediately — they’re a real error, not a pressure signal, and retrying would just mask them. Returns the final Backpressure error if the stream stays saturated across every attempt.

Source

pub async fn send_blocking( &self, stream: &Stream, events: &[Bytes], ) -> Result<(), StreamError>

Convenience wrapper around send_with_retry with a generous retry count. Blocks the calling task until the send succeeds or a transport error occurs. Use when you’d rather wait than drop; prefer send_with_retry if you need a concrete upper bound on retry attempts.

Source

pub fn stream_stats( &self, peer_node_id: u64, stream_id: u64, ) -> Option<StreamStats>

Snapshot of per-stream stats for a single stream.

Returns None if either the peer or the stream doesn’t exist.

Source

pub fn all_stream_stats(&self, peer_node_id: u64) -> Vec<(u64, StreamStats)>

Snapshot of per-stream stats for every stream in the session to peer_node_id. Empty vec if the peer doesn’t exist.

Source

pub async fn connect_via( &self, relay_addr: SocketAddr, dest_pubkey: &[u8; 32], dest_node_id: u64, ) -> Result<u64, AdapterError>

Connect to dest_node_id via a routed handshake through relay_addr. Unlike Self::connect (which requires the responder to pre-accept() this initiator’s node_id before start()), this path embeds the initiator’s full node_id inside the Noise msg1 payload and routes the packet through the dispatch loop’s handle_routed_handshake Case 2 — so a daemon that’s already start()ed accepts msg1 from a brand- new initiator without prior coordination.

When relay_addr == final destination (the CLI remote-attach case), the routed path is degenerate one-hop. The post- handshake peer install populates addr_to_node[relay_addr] via entry().or_insert(...) so address-keyed sends resolve the peer; in a true multi-hop scenario the relay’s prior mapping stays intact.

Retries up to MeshNodeConfig::handshake_retries (default 3) — a single UDP drop on msg1 or msg2 should not surface as a typed error.

Source

pub async fn connect_routed( &self, dest_pubkey: &[u8; 32], dest_node_id: u64, ) -> Result<u64, AdapterError>

Connect to a peer by node id, using the routing table to pick the first hop. Fails with Connection("no route to ...") if the routing table doesn’t have a route to the destination yet — in which case the caller can retry once pingwaves have propagated.

Source

pub async fn probe_reflex( &self, peer_node_id: u64, ) -> Result<SocketAddr, TraversalError>

Send one reflex probe to peer_node_id and return the public SocketAddr the peer observed on the probe’s UDP envelope.

Waits up to super::traversal::TraversalConfig::reflex_timeout (default 3 s) for the response. Fails with super::traversal::TraversalError::ReflexTimeout on timeout, super::traversal::TraversalError::PeerNotReachable if the peer has no active session, or super::traversal::TraversalError::Transport on a socket-level send failure.

Requires the nat-traversal cargo feature.

Source

pub fn nat_class(&self) -> NatClass

The current NAT classification for this node. Unknown until the classification sweep has run; updated atomically by the sweep and by Self::reclassify_nat. Read-only for external callers — the sweep is the only writer.

Requires the nat-traversal cargo feature.

Source

pub fn reflex_addr(&self) -> Option<SocketAddr>

This node’s public-facing SocketAddr as observed by a remote peer during the classification sweep. None before the first sweep has produced an observation. Exposed primarily for tests + observability; the announce- capabilities path piggybacks this value onto every signed CapabilityAnnouncement.

Requires the nat-traversal cargo feature.

Source

pub fn set_reflex_override(&self, external: SocketAddr)

Install a runtime reflex override. Forces nat_class = Open and reflex_addr = Some(external) immediately, and short-circuits any further classifier sweeps until Self::clear_reflex_override is called.

§Publishing to peers

This method updates only local state. To propagate the change to peers, call Self::announce_capabilities afterward. The setter resets the announce rate-limit floor so the next announce is guaranteed to broadcast rather than coalesce against the previous send — cubic P2 pinned this, after flagging that callers who set an override within min_announce_interval of a prior announce would find peers still seeing the old reflex.

Optimization, not correctness. A node with no override still reaches every peer through the routed-handshake path; the override just pins the publicly-advertised address when it’s already known (port-forwarded server, a successful stage-4 port-mapping install, etc).

Safe to call concurrently with announce_capabilities — the triple-write runs under traversal_publish_mu (alongside the announce’s multi-field read), so a concurrent announce either sees the pre-override state or the fully-installed override, never a torn mix.

Requires the nat-traversal cargo feature.

Source

pub fn clear_reflex_override(&self)

Drop a previously-installed runtime reflex override. The classifier sweep resumes on its normal cadence; the next sweep repopulates reflex_addr and nat_class from real probe observations. reflex_addr is cleared to None immediately so a between-sweep read doesn’t return a stale override value as “still current.”

§Publishing to peers

Mirrors Self::set_reflex_override: only local state changes here. Call Self::announce_capabilities after this to tell peers. The rate-limit floor is reset so that call broadcasts unconditionally.

No-op when no override is active — safe to call unconditionally during shutdown / port-mapping revoke paths.

Requires the nat-traversal cargo feature.

Source

pub async fn request_punch( &self, relay: u64, target: u64, self_reflex: SocketAddr, ) -> Result<PunchIntroduce, TraversalError>

Send a PunchRequest to a coordinator peer relay, asking it to mediate a hole-punch to target. Returns the PunchIntroduce produced by the coordinator (the one arriving on this node’s side of the introduction — carrying target’s reflex and the shared fire_at).

This is a stage-3b primitive: it exercises the coordinator fan-out end-to-end but does not itself schedule the keep-alive train or finalize the punched session. The full connect_direct flow lands in stage 3c.

Fails with:

Requires the nat-traversal cargo feature.

Source

pub async fn await_punch_introduce( &self, counterpart: u64, coordinator: u64, ) -> Result<PunchIntroduce, TraversalError>

Install a waiter for an incoming PunchIntroduce from counterpart, brokered by coordinator. The dispatch arm admits the introduce only when the session peer matches coordinator. The returned future resolves when the dispatcher decodes a matching introduce, or with super::traversal::TraversalError::PunchFailed after super::traversal::TraversalConfig::punch_deadline.

Stage-3b responder-side primitive: the peer being punched into uses this to observe the introduce without initiating the flow itself. The responder must know which coordinator will forward the introduce — supply that node’s id as coordinator. Stage 3c wires the keep-alive train onto the returned introduce.

Requires the nat-traversal cargo feature.

Source

pub async fn await_punch_ack( &self, counterpart: u64, coordinator: u64, ) -> Result<PunchAck, TraversalError>

Install a waiter for an incoming PunchAck whose from_peer matches counterpart. Stage-3d correlation surface — the SinglePunch path in connect_direct registers the waiter before firing request_punch and awaits it afterward. Times out with super::traversal::TraversalError::PunchFailed after super::traversal::TraversalConfig::punch_deadline.

Note: the caller inserts the oneshot sender into pending_punch_acks before issuing the request so the ack can’t arrive and be dropped before the await call is entered.

Requires the nat-traversal cargo feature.

Source

pub async fn reclassify_nat(&self)

Fire the classification sweep. Picks up to two currently- connected peers, runs Self::probe_reflex against each in parallel, feeds the observations to super::traversal::classify::ClassifyFsm, and updates nat_class + reflex_addr with the result.

Runs at most one sweep at a time — a second call while a sweep is in flight is a no-op. Exits early if fewer than 2 peers are currently connected; callers should check Self::nat_class after the returned future completes to see whether classification produced a definite verdict or stayed at Unknown.

Bounded by super::traversal::TraversalConfig::classify_deadline — even if probes hang, the sweep returns within that window with whatever observations arrived.

Requires the nat-traversal cargo feature.

Source§

impl MeshNode

Source

pub fn serve_rpc<H>( self: &Arc<MeshNode>, service: &str, handler: Arc<H>, ) -> Result<ServeHandle, ServeError>
where H: RpcHandler,

Register an nRPC handler for service on this node.

Subscribes this node to <service>.requests (so the local register_rpc_inbound dispatcher feeds inbound REQUEST events into the RpcServerFold) and wires the fold’s RESPONSE-emit callback to publish on <service>.replies.<caller_origin> via the existing pub/sub path.

Local-only registration (Phase 1). Multi-instance services that load-balance via SubscriptionMode::QueueGroup require each replica to call serve_rpc on its own node; the mesh-level subscriber roster + dispatch_recipients then routes one-of-N as designed. Each replica’s local serve_rpc must use the same service name (which becomes the queue-group identifier).

Returns a ServeHandle whose Drop tears down the registration. Concurrent registrations for the same service on one node return Err(ServeError::AlreadyServing).

Source

pub fn serve_rpc_streaming<H>( self: &Arc<MeshNode>, service: &str, handler: Arc<H>, ) -> Result<ServeHandle, ServeError>

Streaming variant of Self::serve_rpc. The handler receives an RpcResponseSink it writes chunks to via sink.send(body); returning Ok(()) closes the stream cleanly, Err(_) closes with an error frame.

Wire-level identical to the unary path apart from the per-chunk nrpc-streaming header markers (continue / end). Same auto-registration of <service>.requests + <service>.replies. prefix.

Source

pub fn serve_rpc_client_stream<H>( self: &Arc<MeshNode>, service: &str, handler: Arc<H>, ) -> Result<ServeHandle, ServeError>

Register a client-streaming nRPC handler for service. Mirror of Self::serve_rpc_streaming but using the request-side fold (RpcStreamingRequestFold) — the handler receives one stream of REQUEST_CHUNK bodies and emits one terminal RESPONSE.

Wires two emit callbacks:

Bidi streaming plan (Phase C).

Source

pub async fn call_client_stream( self: &Arc<MeshNode>, target_node_id: u64, service: &str, opts: CallOptions, ) -> Result<ClientStreamCallRaw, RpcError>

Client-streaming variant of Self::call. Returns a ClientStreamCallRaw handle the caller pushes N items into via send, then finish to await the terminal RESPONSE.

Lazy initial REQUEST. This method does NOT publish a REQUEST to the wire. It only ensures the caller’s reply subscription is set up and registers the pending entry; the initial REQUEST is emitted by the first send (or by finish for the zero-item degenerate path).

Sets FLAG_RPC_CLIENT_STREAMING_REQUEST on the initial REQUEST so the server’s request-streaming fold knows to open a request-side stream. Optional request_window_initial header opts into upload-direction flow control.

Bidi streaming plan (Phase C).

Source

pub fn serve_rpc_duplex<H>( self: &Arc<MeshNode>, service: &str, handler: Arc<H>, ) -> Result<ServeHandle, ServeError>

Register a duplex nRPC handler for service. Composes Self::serve_rpc_client_stream (request-side stream) with Self::serve_rpc_streaming (response-side multi- fire emit) via RpcDuplexFold.

Wires THREE emit callbacks:

  • Async RpcAsyncResponseEmitter for response chunks + the terminal frame (per-call ordering required because the response side is multi-fire).
  • RpcRequestGrantEmitter for upload-direction credit grants (one per consumed request chunk when flow control is opted into).

Bidi streaming plan (Phase D).

Source

pub async fn call_duplex( self: &Arc<MeshNode>, target_node_id: u64, service: &str, opts: CallOptions, ) -> Result<DuplexCallRaw, RpcError>

Duplex variant of Self::call. Returns a DuplexCallRaw handle with both upload (send, finish_sending) and download (next, or impl futures::Stream) surfaces. Use into_split to peel off the two halves for the “encoder task + decoder task” shape.

Initial REQUEST sets BOTH FLAG_RPC_CLIENT_STREAMING_REQUEST AND FLAG_RPC_STREAMING_RESPONSE. Lazy publish — the initial REQUEST flies on the first send (or on finish_sending for the zero-item degenerate path).

Bidi streaming plan (Phase D).

Source

pub async fn call_streaming( self: &Arc<MeshNode>, target_node_id: u64, service: &str, payload: Bytes, opts: CallOptions, ) -> Result<RpcStream, RpcError>

Streaming variant of Self::call. Returns an RpcStream that yields chunks (as Result<Bytes, RpcError>) until the server closes the stream.

Sets FLAG_RPC_STREAMING_RESPONSE on the request so the server’s streaming fold knows to expect multi-fire emits. Same lazy reply-subscription + direct-unicast REQUEST as the unary call path.

Cancellation: dropping the returned RpcStream emits a CANCEL to the server (best-effort) and discards any in-flight chunks.

Source

pub fn find_service_nodes(&self, service: &str) -> Vec<u64>

Find every node currently advertising service via the nrpc:<service> capability tag. Returns node IDs in roster order; the caller picks one (or use Self::call_service for the round-robin shortcut).

Pre-Phase 2: requires the target nodes to have called serve_rpc AND announce_capabilities so the nrpc:<service> tag has propagated through capability announcements. The local node’s own services are NOT automatically included (callers don’t typically invoke themselves via the network — for in-process invocation, the user has the handler directly).

Source

pub async fn call_service( self: &Arc<MeshNode>, service: &str, payload: Bytes, opts: CallOptions, ) -> Result<RpcReply, RpcError>

Issue an RPC call to service, picking one node from those advertising the nrpc:<service> tag in the local capability index according to opts.routing_policy.

Returns RpcError::NoRoute if no nodes advertise the service (or if opts.filter_unhealthy is set and every candidate is unavailable per the local ProximityGraph).

Source

pub async fn call_service_streaming( self: &Arc<MeshNode>, service: &str, payload: Bytes, opts: CallOptions, ) -> Result<RpcStream, RpcError>

Capability-routed server-streaming call. Same routing as call_service — capability-fold lookup, health filter, routing-policy sort, capability-auth gate, target selection — but the terminal step is call_streaming instead of call. Returns the substrate’s RpcStream so callers can drive an async for chunk in stream: loop.

Use cases: an agent invoking a long-running tool that emits progress + a terminal result, a fan-out subscriber that wants streaming chunks from whatever node currently advertises the service, any consumer that today reaches for find_service_nodes → manual target selection → call_streaming and ends up re-implementing the cap-auth gate call_service already enforces.

Honors CallOptions::cancel_token (v3) and CallOptions::deadline exactly like call_streaming.

Source

pub async fn call( self: &Arc<MeshNode>, target_node_id: u64, service: &str, payload: Bytes, opts: CallOptions, ) -> Result<RpcReply, RpcError>

Issue an RPC call to target_node_id for service.

Phase 1 — direct entity-to-entity addressing. The caller specifies which target to send to; service discovery (the “find me a healthy instance of X” lookup) is Phase 2.

Lazily subscribes the local node’s RpcClientFold to <service>.replies.<self_origin> from target_node_id on the first call to that (target, service) pair. The subscription is reused across subsequent calls.

On opts.deadline expiring OR the future being dropped, emits a CANCEL event so the server can drop the in-flight handler.

Trait Implementations§

Source§

impl Adapter for MeshNode

Source§

fn init<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>
where 'life0: 'async_trait, MeshNode: 'async_trait,

Initialize the adapter. Read more
Source§

fn on_batch<'life0, 'async_trait>( &'life0 self, batch: Arc<Batch>, ) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>
where 'life0: 'async_trait, MeshNode: 'async_trait,

Process a batch of events. Read more
Source§

fn flush<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>
where 'life0: 'async_trait, MeshNode: 'async_trait,

Force flush any buffered data. Read more
Source§

fn shutdown<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>
where 'life0: 'async_trait, MeshNode: 'async_trait,

Gracefully shut down the adapter. Read more
Source§

fn poll_shard<'life0, 'life1, 'async_trait>( &'life0 self, shard_id: u16, from_id: Option<&'life1 str>, limit: usize, ) -> Pin<Box<dyn Future<Output = Result<ShardPollResult, AdapterError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, MeshNode: 'async_trait,

Poll events from a single shard. Read more
Source§

fn name(&self) -> &'static str

Get the adapter name (for logging/metrics).
Source§

fn is_healthy<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = bool> + Send + 'async_trait>>
where 'life0: 'async_trait, MeshNode: 'async_trait,

Check if the adapter is healthy. Read more
Source§

impl BlobHeatSink for MeshNode

Source§

fn announce_blob_heat<'life0, 'async_trait>( &'life0 self, hash: [u8; 32], rate: f64, ) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>
where 'life0: 'async_trait, MeshNode: 'async_trait,

Emit (or replace) the heat:blob:<hex>=<rate> reserved tag for chunk hash. Idempotent — most recent call wins.
Source§

fn withdraw_blob_heat<'life0, 'async_trait>( &'life0 self, hash: [u8; 32], ) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>
where 'life0: 'async_trait, MeshNode: 'async_trait,

Withdraw every heat:blob:<hex>=* tag for chunk hash. Idempotent; mirrors HeatSink::withdraw_heat.
Source§

fn announce_blob_heat_batch<'life0, 'life1, 'async_trait>( &'life0 self, updates: &'life1 [([u8; 32], Option<f64>)], ) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, MeshNode: 'async_trait,

Batched form for coalescing a tick’s worth of emissions into one capability rebroadcast. Default impl falls back to the per-hash methods; production impls override to hold the rebroadcast until every update has landed.
Source§

impl ChainTagSink for MeshNode

Substrate impl: route through MeshNode::announce_chain / MeshNode::withdraw_chain. This is the production sink the ReplicationCoordinator uses when a real MeshNode is wired in.

Source§

fn announce_chain<'life0, 'async_trait>( &'life0 self, origin_hash: u64, tip_seq: u64, ) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>
where 'life0: 'async_trait, MeshNode: 'async_trait,

Advertise this node holds origin_hash up to tip_seq. Idempotent — repeated calls with the same origin_hash replace the prior advertisement.
Source§

fn withdraw_chain<'life0, 'async_trait>( &'life0 self, origin_hash: u64, ) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>
where 'life0: 'async_trait, MeshNode: 'async_trait,

Withdraw every advertisement for origin_hash. Idempotent.
Source§

impl Drop for MeshNode

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more
Source§

fn pin_drop(self: Pin<&mut Self>)

🔬This is a nightly-only experimental API. (pin_ergonomics)
Execute the destructor for this type, but different to Drop::drop, it requires self to be pinned. Read more
Source§

impl HeatSink for MeshNode

Source§

fn announce_heat<'life0, 'async_trait>( &'life0 self, origin_hash: u64, rate: f64, ) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>
where 'life0: 'async_trait, MeshNode: 'async_trait,

Emit (or replace) the heat:<hex>=<rate> reserved tag for origin_hash. Idempotent — the most recent call wins. Rate clamping (substrate enforces [0.0, 1.0]) happens inside the impl.
Source§

fn withdraw_heat<'life0, 'async_trait>( &'life0 self, origin_hash: u64, ) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>
where 'life0: 'async_trait, MeshNode: 'async_trait,

Withdraw every heat:<hex>=* tag for origin_hash. Idempotent. Peers drop the heat annotation; the chain’s causal: advertisements stay.
Source§

fn announce_heat_batch<'life0, 'life1, 'async_trait>( &'life0 self, updates: &'life1 [(u64, Option<f64>)], ) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, MeshNode: 'async_trait,

Apply a batch of (origin_hash, Option<rate>) updates in a single round-trip. Some(rate) is emit/replace; None is withdraw. The gravity tick uses this to coalesce per-chain emissions into one capability rebroadcast — without it the per-tick wire cost was O(n_chains × n_tags). Read more
Source§

impl ReplicationDispatcher for MeshNode

Source§

fn send_heartbeat<'life0, 'async_trait>( &'life0 self, target: u64, msg: SyncHeartbeat, ) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>
where 'life0: 'async_trait, MeshNode: 'async_trait,

Send a SyncHeartbeat to target. See trait-level note on Ok(()) semantics.
Source§

fn send_sync_request<'life0, 'async_trait>( &'life0 self, target: u64, msg: SyncRequest, ) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>
where 'life0: 'async_trait, MeshNode: 'async_trait,

Send a SyncRequest to target (typically a leader). See trait-level note on Ok(()) semantics.
Source§

fn send_sync_response<'life0, 'async_trait>( &'life0 self, target: u64, msg: SyncResponse, ) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>
where 'life0: 'async_trait, MeshNode: 'async_trait,

Send a SyncResponse to target (typically a replica catching up). See trait-level note on Ok(()) semantics.
Source§

fn send_sync_nack<'life0, 'async_trait>( &'life0 self, target: u64, msg: SyncNack, ) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>
where 'life0: 'async_trait, MeshNode: 'async_trait,

Send a SyncNack to target. See trait-level note on Ok(()) semantics.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more