pub struct MeshNode { /* private fields */ }
Multi-peer mesh node.
Composes NetSession (per-peer encryption), NetRouter (forwarding),
and FailureDetector (heartbeat monitoring) behind a single UDP socket.
Implementations
impl MeshNode
pub fn public_key(&self) -> &[u8; 32]
Get the Noise static public key (for peers to connect to this node).
pub fn is_shutdown(&self) -> bool
Whether Self::shutdown has been invoked on this node.
Exposed for tests and for FFI callers that want to verify a
shutdown actually landed (rather than being a silent no-op
because extra Arc references were outstanding, as an earlier
net_mesh_shutdown variant did).
pub async fn new(
    identity: EntityKeypair,
    config: MeshNodeConfig,
) -> Result<Self, AdapterError>
Create a new mesh node.
Binds a UDP socket but does not connect to any peers yet.
Call connect() to establish sessions with peers, then
start() to begin the receive loop.
pub fn auth_guard(&self) -> &Arc<AuthGuard>
The per-packet authorization fast path. Writes land here on
successful subscribe (via AuthGuard::allow_channel) and
reads happen on every publish fan-out. Exposed primarily for
tests + operator observability; production code should reach
for register_channel / subscribe_channel instead.
pub fn token_cache(&self) -> Option<&Arc<TokenCache>>
The shared TokenCache installed on this node, if any. Only
populated when a caller registered one via
Self::set_token_cache. Exposed for tests that need to
assert the cache is not populated as a side effect of a
rejected subscribe.
pub fn subscriber_chain_count(&self) -> usize
Number of retained subscribe token chains. A verified chain is stored here only after a subscribe passes the root-anchored gate; the sweep and publish re-check consult it. Exposed for tests that assert a rejected subscribe retains nothing.
pub fn entity_id(&self) -> &EntityId
Get this node’s ed25519 entity id (derived from the
keypair handed to MeshNode::new). 32 bytes. Used by
CapabilityAnnouncement + channel-auth path.
pub fn peer_entity_id(&self, node_id: u64) -> Option<EntityId>
Look up a peer’s pinned entity_id, if the TOFU binding
has been established. Returns None before we’ve received
a signature-verified CapabilityAnnouncement from the peer.
Exposed primarily for tests + operator observability; the
channel-auth subscribe gate consults this map internally.
pub fn peer_addr(&self, node_id: u64) -> Option<SocketAddr>
The peer’s socket address, if we have an active session
with them. Used by the migration subprotocol to route
orchestrator-originated messages (e.g. TakeSnapshot) to
the source node by its node_id.
pub fn migration_identity_context(&self) -> MigrationIdentityContext
Build a MigrationIdentityContext
bound to this node.
The context’s closures capture this node’s long-term Noise
static private key (for the envelope-open path) and an
Arc-clone of the peer map (for the peer-static lookup used
by the source-side seal path). The private key is wrapped in
a StaticSecret inside the unseal_snapshot closure, which
is zeroize-on-drop — the key is never surfaced as a
readable field on the returned value.
Used by the SDK’s compute runtime to wire identity-envelope
support into the migration dispatcher without handing the
key across the crate boundary. The previous shape exposed
static_x25519_priv() -> [u8; 32] as a pub method, which
leaked long-term secret material to any SDK caller — any
code with an Arc<Mesh> could copy the node’s identity key
out and impersonate it indefinitely.
pub fn peer_static_x25519(&self, node_id: u64) -> Option<[u8; 32]>
The peer’s Noise static X25519 public key, captured during
the handshake that established the session. Load-bearing for
daemon migration: the source uses this key as the seal
recipient on the IdentityEnvelope, so the only party that
can unseal the daemon’s ed25519 seed is the peer whose
static private key completed the Noise handshake.
Returns None if we have no session with node_id, or if
the underlying handshake produced a zero-filled static
pubkey (a sentinel for test-only code paths that construct
SessionKeys without running a real handshake).
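The two `None` cases can be pictured with a small stand-in. This is only a sketch under assumptions: `PeerKeys` and `demo_peer_keys` are hypothetical names, and a plain `HashMap` replaces whatever the node actually uses as its peer map.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the peer map: node_id -> static key captured
/// during the handshake. Not the crate's real type.
struct PeerKeys {
    by_node: HashMap<u64, [u8; 32]>,
}

impl PeerKeys {
    /// Returns `None` when there is no session for `node_id`, or when the
    /// stored key is the all-zero sentinel left by test-only code paths.
    fn peer_static_x25519(&self, node_id: u64) -> Option<[u8; 32]> {
        self.by_node
            .get(&node_id)
            .copied()
            .filter(|key| key.iter().any(|&byte| byte != 0))
    }
}

/// Builds a map with one real-looking key and one zero-filled sentinel.
fn demo_peer_keys() -> PeerKeys {
    let mut by_node = HashMap::new();
    by_node.insert(1, [7u8; 32]); // stands in for a key from a real handshake
    by_node.insert(2, [0u8; 32]); // test-only sentinel: treated as "no key"
    PeerKeys { by_node }
}
```

A caller that seals to the returned key can therefore treat `None` as "do not seal", without separately checking for a zeroed key.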
pub fn peer_subnet(&self, node_id: u64) -> Option<SubnetId>
Look up a peer’s assigned subnet, if one has been recorded.
Only populated from signature-verified
CapabilityAnnouncements — unsigned announcements do not
write here even when a node is running with
require_signed_capabilities = false. Exposed for tests +
operator observability; subnet_visible consults this map
on the publish / subscribe fan-out path.
pub fn local_subnet(&self) -> SubnetId
This node’s own SubnetId — the value supplied via
MeshNodeConfig::subnet (or SubnetId::GLOBAL when none was
configured). Stable for the node’s lifetime; the substrate
doesn’t reassign the local subnet at runtime.
pub fn local_subnet_policy(&self) -> Option<&Arc<SubnetPolicy>>
Read-only handle to the SubnetPolicy that derived this
node’s local_subnet, when one was supplied. None when
the local subnet came from MeshNodeConfig::subnet
directly without going through a policy. Operator tools
surface this to explain “why is this node in subnet X.”
pub fn known_subnets(&self) -> Vec<(u64, SubnetId)>
Snapshot of every (node_id, subnet_id) pair the local
node has cached from signature-verified capability
announcements. Sorted by node_id for stable output. Used
by operator tooling (net subnet ls / subnet tree) to
enumerate the mesh’s subnet topology from a single
vantage point — anything not yet announced (or announced
unsigned) is invisible here, matching peer_subnet’s
“signed-only” contract.
pub fn local_addr(&self) -> SocketAddr
Get the local bind address.
pub fn failure_detector(&self) -> &Arc<FailureDetector>
Get the failure detector.
pub fn set_migration_handler(&self, handler: Arc<MigrationSubprotocolHandler>)
Set the migration subprotocol handler.
Can be called before or after start(). When set, inbound
packets with subprotocol_id == 0x0500 are dispatched to
this handler instead of being queued as events. Idempotent
w.r.t. replacing the handler — a second call swaps in the
new one atomically.
Use Self::clear_migration_handler to uninstall (returns
the mesh to the no-handler state where inbound migration
packets hit the ComputeNotSupported fallback). Needed by
DaemonRuntime::shutdown and by start’s lost-race
cleanup path — the mesh must not hold a live handler
pointing at a runtime that is no longer serving daemons.
pub fn clear_migration_handler(&self)
Uninstall the migration subprotocol handler. After this
call, inbound migration subprotocol packets hit the
no-handler fallback and synthesise ComputeNotSupported
for migration-initiating messages (other message types are
dropped).
Used by the SDK’s DaemonRuntime::start to clean up after
losing the install-vs-CAS race against a concurrent
shutdown: if start installed a handler but its CAS to
Ready lost to shutdown’s state flip, the mesh would
otherwise be left with a live handler owned by a runtime
that’s already been torn down.
pub fn has_migration_handler(&self) -> bool
Returns true iff a migration subprotocol handler is
currently installed on this mesh. Used primarily by tests
that need to observe the ordering of handler installation
against other runtime state transitions — the ArcSwap load
itself is a public API surface regardless.
pub fn block_peer(&self, addr: SocketAddr)
Block packets from/to a peer address (simulates network partition).
pub fn unblock_peer(&self, addr: &SocketAddr)
Unblock a peer address (simulates partition healing).
pub fn is_blocked(&self, addr: &SocketAddr) -> bool
Check if a peer is blocked.
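As a rough illustration of how the three partition helpers relate, the sketch below models the block list as a set of addresses. `PartitionFilter` and `should_deliver` are hypothetical names; the node's internal storage and the exact drop point on the send and receive paths are assumptions.

```rust
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::Mutex;

/// Hypothetical partition filter keyed by peer address.
#[derive(Default)]
struct PartitionFilter {
    blocked: Mutex<HashSet<SocketAddr>>,
}

impl PartitionFilter {
    /// Start dropping traffic for `addr` (simulated partition).
    fn block_peer(&self, addr: SocketAddr) {
        self.blocked.lock().unwrap().insert(addr);
    }

    /// Stop dropping traffic for `addr` (simulated healing).
    fn unblock_peer(&self, addr: &SocketAddr) {
        self.blocked.lock().unwrap().remove(addr);
    }

    /// Whether `addr` is currently on the block list.
    fn is_blocked(&self, addr: &SocketAddr) -> bool {
        self.blocked.lock().unwrap().contains(addr)
    }

    /// A send or receive path would consult this before touching the packet.
    fn should_deliver(&self, addr: &SocketAddr) -> bool {
        !self.is_blocked(addr)
    }
}
```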
pub fn proximity_graph(&self) -> &Arc<ProximityGraph>
Get the proximity graph.
pub fn reroute_policy(&self) -> &Arc<ReroutePolicy>
Get the reroute policy (for checking reroute stats in tests).
pub fn peer_count(&self) -> usize
Number of connected peers.
pub async fn connect(
    &self,
    peer_addr: SocketAddr,
    peer_pubkey: &[u8; 32],
    peer_node_id: u64,
) -> Result<u64, AdapterError>
Connect to a peer. Performs a Noise NKpsk0 handshake as initiator.
The peer must be listening and ready to accept the handshake. Returns the peer’s node ID on success.
pub async fn accept(
    &self,
    peer_node_id: u64,
) -> Result<(SocketAddr, u64), AdapterError>
Accept a connection from a peer. Performs Noise NKpsk0 as responder.
Waits for an incoming handshake packet and completes the handshake. Returns the peer’s address and assigns the given node_id.
Ordering contract
accept() MUST be called before Self::start(). Once
start() has spawned the dispatch loop, the dispatcher
consumes every inbound UDP datagram from the shared socket;
try_handshake_responder polls the same socket directly and
races the dispatcher for incoming msg1 packets. Because no
per-pending-responder registry exists today (initiator-side
handshakes use one — see pending_direct_initiators at
mesh.rs:~1216; the responder side is a deferred design item),
a start() → accept() ordering produces a swallowed msg1 and
a hang. To prevent that hang silently turning into a debugging
nightmare, calling accept() after start() now returns an
explicit error rather than spinning forever.
pub fn start(&self)
Start the receive loop and heartbeat tasks.
Must be called after connect() / accept() to begin processing
inbound packets.
Refuses (no-op return) if any accept() call is currently
in flight. Symmetric to accept’s contract: either
accept observes started=true and bails (in-flight
count goes to 0, then start proceeds), or start
observes accept_in_flight > 0 and refuses. The SeqCst
orderings make this mutually exclusive. Without this
counter check, start could fire between accept’s
started.load and its handshake_responder poll, after
which the dispatcher would race the responder for the
inbound msg1.
Note: this does NOT enable the periodic capability re-announce
(which keeps the node’s entry alive in its own and peers’ folds
past one TTL) — that needs an owned Arc to re-broadcast. Drive
the node through Self::start_arc (as the SDK / FFI do) to get
it; a bare start is for short-lived / test nodes.
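The mutual exclusion between `accept` and `start` can be sketched with two SeqCst atomics. This is a minimal model, not the node's implementation: `StartGate`, `AcceptGuard`, `begin_accept`, and `try_start` are hypothetical names, the store-then-load order on each side is an assumption about how the flag and counter are combined, and the sketch assumes only one caller runs `try_start` at a time.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

/// Hypothetical gate modelling the `started` flag and the
/// `accept_in_flight` counter named in the text above.
#[derive(Default)]
struct StartGate {
    started: AtomicBool,
    accept_in_flight: AtomicUsize,
}

/// Decrements the in-flight counter when an accept attempt ends.
struct AcceptGuard<'a>(&'a StartGate);

impl Drop for AcceptGuard<'_> {
    fn drop(&mut self) {
        self.0.accept_in_flight.fetch_sub(1, Ordering::SeqCst);
    }
}

impl StartGate {
    /// Accept side: publish the in-flight count first, then read `started`.
    fn begin_accept(&self) -> Result<AcceptGuard<'_>, &'static str> {
        self.accept_in_flight.fetch_add(1, Ordering::SeqCst);
        let guard = AcceptGuard(self);
        if self.started.load(Ordering::SeqCst) {
            // `guard` drops here, so the count returns to its prior value.
            return Err("accept() called after start()");
        }
        Ok(guard)
    }

    /// Start side: publish `started` first, then read the in-flight count.
    /// Returns false, after rolling the flag back, if an accept is in flight.
    fn try_start(&self) -> bool {
        if self.started.swap(true, Ordering::SeqCst) {
            return true; // already started
        }
        if self.accept_in_flight.load(Ordering::SeqCst) > 0 {
            self.started.store(false, Ordering::SeqCst);
            return false;
        }
        true
    }
}
```

Because each side writes its own variable before reading the other's, and every access is SeqCst, at least one side is guaranteed to observe the other, so the two can never both proceed.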
pub fn start_arc(self: &Arc<Self>)
Start the node through its Arc, enabling the periodic capability
re-announce on top of everything Self::start does. The
re-announce re-broadcasts this node’s capabilities every
MeshNodeConfig::capability_reannounce_interval, keeping its
entry alive in its own fold (so its callee-side nRPC gate doesn’t
expire its own services) AND in every peer’s fold (so it stays
discoverable) past one announcement TTL. Production entry points
(the SDK, the FFI) call this; a bare Self::start omits the
re-announce (fine for short-lived / test nodes). Idempotent.
pub fn spawn_nat_classify_loop(self: &Arc<Self>) -> JoinHandle<()>
Spawn the NAT classification loop. Waits until at least 2 peers are connected, fires the initial sweep, then re-checks periodically so a mid-session NAT rebind (e.g. gateway reboot) gets picked up without operator intervention.
Separate from Self::start because the loop needs an
Arc<MeshNode> to call Self::reclassify_nat across
.await points. Callers that hold a MeshNode behind an
Arc (SDK, FFI, all production paths) can spawn this
alongside start to get continuous classification; callers
that don’t can call Self::reclassify_nat manually via
the &self surface.
The loop is best-effort — it never returns an error surface
to the node, and a failed sweep leaves the previous
classification intact. Exits on shutdown_notify.
pub async fn send_to_peer(
    &self,
    peer_addr: SocketAddr,
    batch: &Batch,
) -> Result<(), AdapterError>
Send a batch of events to a specific peer by address.
pub async fn send_routed(
    &self,
    dest_node_id: u64,
    batch: &Batch,
) -> Result<(), AdapterError>
Send a batch of events to a destination node via the routing table.
The events are encrypted with the destination’s session key and a routing header is prepended so intermediate nodes can forward without decrypting. The packet is sent to the next hop from the routing table, not directly to the destination.
Requires:
- A session with dest_node_id (for encryption)
- A route to dest_node_id in the routing table (for next hop)
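The two prerequisites can be read as a pair of lookups that must both succeed before anything is sent. The sketch below assumes hypothetical `RoutingView` and `RouteError` types; the real method reports failures through `AdapterError`, whose variants are not shown in this page.

```rust
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;

/// Hypothetical error type for the sketch only.
#[derive(Debug, PartialEq)]
enum RouteError {
    NoSession(u64),
    NoRoute(u64),
}

/// Hypothetical view of the two tables a routed send depends on.
struct RoutingView {
    /// Node ids we hold a session with (needed to encrypt).
    sessions: HashSet<u64>,
    /// Destination node id -> next-hop address (needed to forward).
    routes: HashMap<u64, SocketAddr>,
}

impl RoutingView {
    /// Resolve where a packet for `dest_node_id` would be sent.
    /// The next hop may differ from the destination's own address.
    fn next_hop_for(&self, dest_node_id: u64) -> Result<SocketAddr, RouteError> {
        if !self.sessions.contains(&dest_node_id) {
            return Err(RouteError::NoSession(dest_node_id));
        }
        self.routes
            .get(&dest_node_id)
            .copied()
            .ok_or(RouteError::NoRoute(dest_node_id))
    }
}
```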
pub fn roster(&self) -> &Arc<SubscriberRoster>
Access the per-channel subscriber roster. Used by ChannelPublisher
to enumerate subscribers; exposed for diagnostics.
pub fn register_rpc_inbound(
    &self,
    channel_hash: ChannelHash,
    dispatcher: RpcInboundDispatcher,
) -> Option<RpcInboundDispatcher>
Register a per-channel-hash inbound dispatcher for nRPC.
When the mesh’s inbound dispatch sees a packet whose
NetHeader::channel_hash matches channel_hash, it routes
every event in the packet to dispatcher and skips the
per-shard inbound queue. Used by Mesh::serve_rpc /
Mesh::call to receive RPC events without polling the
shard queue.
Returns the previous dispatcher (if any) so callers can
detect a slot collision (typically a programming error —
two serve_rpc registrations for the same service on the
same node, or a hash collision between two different
channel names; the latter is bounded at ~1/65536 per pair).
Hot-path cost. One DashMap get per inbound packet. Absent registrations skip the conditional entirely.
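The register, route, and collision behaviour can be sketched with a plain map. Everything here is a stand-in: `InboundRegistry`, `ChannelHash16`, and the `Dispatcher` function-pointer type are hypothetical, the 16-bit hash width is inferred from the ~1/65536 collision figure, and a `HashMap` replaces the DashMap mentioned above.

```rust
use std::collections::HashMap;

/// Assumed 16-bit channel hash (inferred from the ~1/65536 collision bound).
type ChannelHash16 = u16;
/// Hypothetical dispatcher: handles one event, returns a count for the demo.
type Dispatcher = fn(&[u8]) -> usize;

#[derive(Default)]
struct InboundRegistry {
    slots: HashMap<ChannelHash16, Dispatcher>,
}

impl InboundRegistry {
    /// Install a dispatcher; a `Some` return signals a slot collision.
    fn register(&mut self, hash: ChannelHash16, dispatcher: Dispatcher) -> Option<Dispatcher> {
        self.slots.insert(hash, dispatcher)
    }

    /// Remove the dispatcher, returning it if one was installed.
    fn unregister(&mut self, hash: ChannelHash16) -> Option<Dispatcher> {
        self.slots.remove(&hash)
    }

    /// One map lookup per packet. `None` means no dispatcher claimed the
    /// packet, so it would fall through to the per-shard inbound queue.
    fn route(&self, hash: ChannelHash16, events: &[&[u8]]) -> Option<usize> {
        let dispatcher = self.slots.get(&hash)?;
        Some(events.iter().map(|event| dispatcher(*event)).sum())
    }
}

/// Demo dispatchers used to tell the old and new registration apart.
fn count_bytes(event: &[u8]) -> usize {
    event.len()
}

fn count_one(_event: &[u8]) -> usize {
    1
}
```

Checking the return value of `register` is what lets a caller notice that a second registration displaced an earlier one rather than silently replacing it.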
pub fn unregister_rpc_inbound(
    &self,
    channel_hash: ChannelHash,
) -> Option<RpcInboundDispatcher>
Remove the registered dispatcher for channel_hash. Returns
the prior dispatcher if one was registered. After removal,
inbound events for channel_hash resume landing in the
per-shard inbound queue.
pub fn rpc_inbound_dispatcher_registered(
    &self,
    channel_hash: ChannelHash,
) -> bool
Cheap probe: is a dispatcher already registered for this
canonical channel hash? Used by the caller-side
ensure_reply_subscription to skip a redundant registration
when multiple targets serve the same service (they share
one reply channel + one dispatcher per caller).
pub fn rpc_metrics_snapshot(&self) -> RpcMetricsSnapshot
Snapshot of caller-side nRPC metrics. Cheap (one DashMap
iteration); call on every Prometheus scrape. Format with
RpcMetricsSnapshot::prometheus_text.
pub fn set_rpc_observer(&self, observer: Option<RpcObserverHandle>)
Install (or clear with None) the caller-side nRPC
observer. Replaces any previously-installed observer.
Cheap to load on the hot path — the dispatch path checks
for an installed observer via one ArcSwap::load and
short-circuits when None. Observers run inline on the
dispatch task; implementations must be cheap (push into
a bounded ring or mpsc, not block).
See cortex::rpc_observer::RpcObserver for the trait
shape and the captured event metadata.
pub fn rpc_observer(&self) -> Option<RpcObserverHandle>
Hot-path load of the currently-installed nRPC observer,
if any. Cheap — one ArcSwap::load. The dispatch path
calls this on every completed boundary and short-
circuits when None.
pub fn reserve_cancel_token(&self) -> u64
Reserve a fresh cancel token. Pass on a subsequent call
via crate::adapter::net::mesh_rpc::CallOptions::cancel_token;
later, call Self::cancel from anywhere to abort the
in-flight task. Tokens are monotonically-increasing,
process-global, never reused. An unused reservation is
harmless — the registry only allocates an entry on the
first paired register or cancel.
pub fn cancel(&self, token: u64)
Abort the in-flight call associated with token.
Idempotent — no-op if the token was never used, the call
already resolved, or token == 0 (the “no token”
sentinel).
Race-safe: a cancel that arrives BEFORE the call’s
register runs (the gap between
Self::reserve_cancel_token and call construction)
latches a pre-cancel flag on the orphan entry; the
subsequent register pre-arms the cancel signal so the
call short-circuits to
crate::adapter::net::mesh_rpc::RpcError::Cancelled
without ever publishing the REQUEST.
Triggers a Drop-on-cancel CANCEL frame on the wire via
the call-shape-specific guards (UnaryCallGuard /
ClientStreamCallRaw::Drop / DuplexCallRaw::Drop). See
crate::adapter::net::cancel_registry for the registry
implementation.
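The reserve, register, and cancel interplay, including the pre-cancel latch, can be modelled as below. This is a sketch under assumptions: `CancelSketch` and `Slot` are hypothetical, a `Mutex<HashMap>` stands in for the real registry, the cancel signal is reduced to an `AtomicBool`, and the ageing-out of orphan entries is not modelled.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical registry entry.
enum Slot {
    /// A cancel arrived before the call registered (orphan entry).
    PreCancelled,
    /// The call registered; the flag is its cancel signal.
    Registered(Arc<AtomicBool>),
}

struct CancelSketch {
    next_token: AtomicU64,
    slots: Mutex<HashMap<u64, Slot>>,
}

impl CancelSketch {
    fn new() -> Self {
        // Tokens start at 1 because 0 is the "no token" sentinel.
        Self { next_token: AtomicU64::new(1), slots: Mutex::new(HashMap::new()) }
    }

    /// Monotonically increasing, never reused; allocates no registry entry.
    fn reserve_cancel_token(&self) -> u64 {
        self.next_token.fetch_add(1, Ordering::Relaxed)
    }

    /// Called when the call is constructed. The returned signal is already
    /// set if a cancel for this token arrived first.
    fn register(&self, token: u64) -> Arc<AtomicBool> {
        let mut slots = self.slots.lock().unwrap();
        let pre_cancelled = matches!(slots.get(&token), Some(Slot::PreCancelled));
        let signal = Arc::new(AtomicBool::new(pre_cancelled));
        slots.insert(token, Slot::Registered(Arc::clone(&signal)));
        signal
    }

    /// Idempotent: sets the signal of a registered call, or latches a
    /// pre-cancel marker when the call has not registered yet.
    fn cancel(&self, token: u64) {
        if token == 0 {
            return;
        }
        let mut slots = self.slots.lock().unwrap();
        match slots.get(&token) {
            Some(Slot::Registered(signal)) => signal.store(true, Ordering::SeqCst),
            Some(Slot::PreCancelled) => {}
            None => {
                slots.insert(token, Slot::PreCancelled);
            }
        }
    }
}
```

A call that finds its signal pre-armed at registration can return its cancelled error immediately, which is how the request is never published in the early-cancel case.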
pub fn cancel_registry_len(&self) -> usize
Number of in-flight calls currently registered with the
cancel registry. Diagnostic — exposed so integration tests
can deterministically poll for call setup completion
instead of guessing with sleep. Includes orphan
cancel-only entries that haven’t aged out yet.
pub fn set_channel_configs(&mut self, configs: Arc<ChannelConfigRegistry>)
Install a ChannelConfigRegistry whose can_subscribe /
can_publish rules are consulted for incoming Subscribe
messages. Also constructs a SubnetGateway over the
same registry + this node’s local_subnet; the gateway
is read by MeshNode::gateway() and powers the
net gateway operator surface.
When unset (the default), all subscribes are accepted, the
subnet-visibility gate is skipped, and gateway() returns
None. Full capability/token-based authorization additionally
requires a TokenCache — see Self::set_token_cache.
Call this BEFORE Self::start — the dispatch context is
captured at start time, so a registry installed after start
won’t be visible to the receive loop.
pub fn gateway(&self) -> Option<&Arc<SubnetGateway>>
Read-only handle to this node’s SubnetGateway, or
None when no ChannelConfigRegistry has been installed
(see Self::set_channel_configs). Operator tooling
(net gateway stats|exports) consults this to read the
forwarded/dropped counters and export table.
pub fn channel_configs(&self) -> Option<&Arc<ChannelConfigRegistry>>
Read-only handle to this node’s installed
ChannelConfigRegistry, or None when no registry has
been installed via Self::set_channel_configs.
Operator tooling (net channel ls|visibility) reads
this to enumerate configured channels.
pub fn set_aggregator_registry(&mut self, registry: Arc<AggregatorRegistry>)
Install a shared AggregatorRegistry on the node. Once
installed, operator CLI verbs
(net aggregator spawn / ls / scale) and the Deck
AGGREGATORS panel can read + mutate live aggregator
groups through it. Call this BEFORE Self::start —
installing after the receive loop is live races against
channel-publish initialization. The debug_assert! makes
the constraint observable in tests; release builds carry
the doc-comment contract only.
pub fn aggregator_registry(&self) -> Option<&Arc<AggregatorRegistry>>
Read-only handle to this node’s installed
AggregatorRegistry,
or None when no registry has been installed via
Self::set_aggregator_registry. Nodes that don’t run
aggregators leave this empty — callers should treat the
None case as “no aggregators registered” and skip
rendering / acting.
pub fn set_token_cache(&mut self, cache: Arc<TokenCache>)
Install a shared TokenCache used by the channel-auth path.
When set, authorize_subscribe and publish_many consult
it via ChannelConfig::can_subscribe / can_publish.
Subscribers that present a token on the wire have their
token installed into this cache (after signature
verification) before the ACL check runs.
When unset, require_token channels always reject —
without a cache there’s no way to validate presented tokens
or find pre-cached ones.
pub async fn subscribe_channel(
    &self,
    publisher_node_id: u64,
    channel: ChannelName,
) -> Result<(), AdapterError>
Ask publisher_node_id to add this node to channel’s subscriber set.
Blocks until the publisher’s Ack arrives or
membership_ack_timeout elapses. Returns Ok(()) iff the publisher
accepted the subscribe; AckReason failures surface as
AdapterError::Connection. No token is presented — use
Self::subscribe_channel_with_token for channels with
require_token set. Mode defaults to Broadcast (every published
event delivered to this subscriber); use
Self::subscribe_channel_in_queue_group for work-distribution.
pub async fn subscribe_channel_with_token(
    &self,
    publisher_node_id: u64,
    channel: ChannelName,
    token: PermissionToken,
) -> Result<(), AdapterError>
Subscribe with a single pre-issued PermissionToken — the
common case where the channel owner granted the subscriber
directly (a one-link chain). For a delegated credential
(owner → … → subscriber) use
Self::subscribe_channel_with_chain.
Before admitting the subscribe, the publisher verifies that the
chain is rooted at one of the channel’s token_roots and that its
leaf is bound to the subscriber’s entity.
pub async fn subscribe_channel_with_chain(
    &self,
    publisher_node_id: u64,
    channel: ChannelName,
    chain: TokenChain,
) -> Result<(), AdapterError>
Subscribe presenting a full delegation TokenChain
(root-to-leaf). Use when the subscriber’s grant was delegated
rather than issued directly by the channel owner: the chain’s
root link must be signed by one of the channel’s token_roots
and the leaf must be bound to this node’s entity. See
TokenChain::verify_authorizes for the full contract the
publisher applies.
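The structural part of that contract (trusted root, bound leaf) can be sketched without any cryptography. This is an illustration only: `Link`, `ChainError`, `entity`, and `check_chain_shape` are hypothetical, signature, expiry, scope, and revocation checks are omitted, and the link-to-link continuity rule (each link's subject issues the next link) is an assumption about how delegation is threaded.

```rust
/// Hypothetical delegation link: `issuer` grants to `subject`.
/// Real tokens also carry signatures, scopes, and expiry.
#[derive(Clone, Debug, PartialEq)]
struct Link {
    issuer: [u8; 32],
    subject: [u8; 32],
}

#[derive(Debug, PartialEq)]
enum ChainError {
    Empty,
    UntrustedRoot,
    /// Index of the link whose issuer does not match the previous subject.
    BrokenDelegation(usize),
    WrongLeaf,
}

/// Helper to build a recognisable 32-byte id for demos.
fn entity(tag: u8) -> [u8; 32] {
    [tag; 32]
}

/// Checks chain shape only, ordered root-to-leaf.
fn check_chain_shape(
    chain: &[Link],
    token_roots: &[[u8; 32]],
    subscriber: &[u8; 32],
) -> Result<(), ChainError> {
    let (root, leaf) = match (chain.first(), chain.last()) {
        (Some(root), Some(leaf)) => (root, leaf),
        _ => return Err(ChainError::Empty),
    };
    // The root link must be issued by one of the channel's trusted roots.
    if !token_roots.contains(&root.issuer) {
        return Err(ChainError::UntrustedRoot);
    }
    // Assumed continuity rule: each delegate issues the next link.
    for (index, pair) in chain.windows(2).enumerate() {
        if pair[0].subject != pair[1].issuer {
            return Err(ChainError::BrokenDelegation(index + 1));
        }
    }
    // The leaf must be bound to the entity presenting the chain.
    if &leaf.subject != subscriber {
        return Err(ChainError::WrongLeaf);
    }
    Ok(())
}
```

A one-link chain passes the same checks, which matches the description of a direct grant as the single-link case.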
pub fn set_publish_chain(&self, channel: &ChannelName, chain: TokenChain)
Install this node’s own PUBLISH credential for channel as a
(possibly delegated, multi-link) TokenChain.
Use this when the node’s right to publish was delegated
(owner → … → this node) rather than granted directly by the
channel owner: the publish-side ACL builds a single-link chain
from the local TokenCache, whose issuer is the delegator (not
a channel root), so a delegated grant would otherwise fail the
root-anchor check and the node couldn’t publish. The held chain
is preferred over the cache fallback on the publish path; its
leaf must be bound to this node’s entity. Re-verified against the
current clock + revocation on every publish, so an expired or
revoked held chain fails closed like any other.
pub async fn subscribe_channel_in_queue_group(
    &self,
    publisher_node_id: u64,
    channel: ChannelName,
    queue_group: String,
) -> Result<(), AdapterError>
Subscribe in the named queue group: every published event is
delivered to exactly ONE member of the group, distributed
round-robin across members. The publisher’s roster carries
the mode; the same (channel, queue_group) pair across
multiple subscribers forms one work-distribution pool. Used
by request/response patterns (nRPC) and any one-of-N
job-distribution shape.
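The exactly-one, round-robin delivery rule can be sketched as a cursor over the group's members. `QueueGroup` and `next_recipient` are hypothetical names; how the publisher's roster actually stores members and handles joins or departures mid-rotation is not covered here.

```rust
/// Hypothetical roster entry for one (channel, queue_group) pair.
struct QueueGroup {
    /// Subscriber node ids in the group.
    members: Vec<u64>,
    /// Index of the member that receives the next event.
    cursor: usize,
}

impl QueueGroup {
    fn new(members: Vec<u64>) -> Self {
        Self { members, cursor: 0 }
    }

    /// Picks the single member that receives the next published event,
    /// or `None` when the group is empty.
    fn next_recipient(&mut self) -> Option<u64> {
        if self.members.is_empty() {
            return None;
        }
        let picked = self.members[self.cursor % self.members.len()];
        self.cursor = (self.cursor + 1) % self.members.len();
        Some(picked)
    }
}
```

By contrast, a Broadcast subscription would deliver each event to every member instead of advancing a cursor.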
pub async fn subscribe_channel_in_queue_group_with_token(
    &self,
    publisher_node_id: u64,
    channel: ChannelName,
    queue_group: String,
    token: PermissionToken,
) -> Result<(), AdapterError>
Queue-group subscribe with a pre-issued
PermissionToken. Same auth flow as
Self::subscribe_channel_with_token, queue-group
semantics from Self::subscribe_channel_in_queue_group.
pub async fn unsubscribe_channel(
    &self,
    publisher_node_id: u64,
    channel: ChannelName,
) -> Result<(), AdapterError>
Ask publisher_node_id to remove this node from channel’s
subscriber set. Mirror of subscribe_channel. Mode-agnostic
— unsubscribe finds the peer in whichever mode they’re in.
pub fn channel_publisher(
    &self,
    channel: ChannelName,
    config: PublishConfig,
) -> ChannelPublisher
Build a ChannelPublisher recipe. Does NOT talk to the wire —
combine with publish or
publish_many to actually fan out.
pub async fn publish(
    &self,
    publisher: &ChannelPublisher,
    payload: Bytes,
) -> Result<PublishReport, AdapterError>
Fan payload out to every subscriber of the publisher’s channel.
One per-peer unicast per subscriber — no multicast primitive, no
group crypto. Per-peer concurrency is bounded by
PublishConfig::max_inflight. The failure policy controls whether
per-peer errors short-circuit the fan-out (see OnFailure).
pub async fn publish_many(
    &self,
    publisher: &ChannelPublisher,
    events: &[Bytes],
) -> Result<PublishReport, AdapterError>
Fan multiple payloads out to every subscriber of the publisher’s
channel. Semantics are the same as publish; the
whole events slice is delivered as one batch per subscriber.
pub async fn publish_fold_to_peer<P>(
    &self,
    peer_addr: SocketAddr,
    ann: &SignedAnnouncement<P>,
) -> Result<usize, AdapterError>
where
    P: Serialize + DeserializeOwned,
Encode a super::behavior::fold::SignedAnnouncement and
send it to one peer as a
super::behavior::fold::SUBPROTOCOL_FOLD frame.
The receiver’s dispatch_packet arm decodes + verifies +
routes to the right typed super::behavior::fold::Fold<K>
via the installed
super::behavior::fold::FoldChannelRouter.
Returns the encoded byte count for metrics / diagnostics.
pub async fn publish_fold_broadcast<P>(
    &self,
    ann: &SignedAnnouncement<P>,
) -> Result<usize, AdapterError>
where
    P: Serialize + DeserializeOwned,
Best-effort fan-out of a fold announcement to every
currently-connected peer. Per-peer send failures are
logged and skipped rather than short-circuiting the rest
of the fan-out. The N peer sends run concurrently via
futures::future::join_all so a chatty publisher (e.g.
ReservationFold against a busy resource) doesn’t block
its own task on serial encryption + UDP-send latency.
Returns the number of peers the announcement was successfully shipped to.
pub async fn publish_capability_membership(
    &self,
    membership: CapabilityMembership,
) -> Result<usize, AdapterError>
Publish a super::behavior::fold::CapabilityFold
membership announcement against this node’s identity.
The counter is sharded per-class_hash so concurrent
publishes to different classes don’t contend.
pub async fn publish_route(
    &self,
    destination: NodeId,
    next_hop: SocketAddr,
    metric: u32,
) -> Result<usize, AdapterError>
Publish a super::behavior::fold::RoutingFold route
announcement. The fold doesn’t shard by class, so both
the counter and the envelope use class 0. via is
auto-stamped to this node’s node_id.
pub async fn publish_reservation(
    &self,
    resource_id: ResourceId,
    state: ReservationState,
) -> Result<usize, AdapterError>
Publish a super::behavior::fold::ReservationFold
state transition for resource_id. Generation is
per-resource (counter sharded by resource_id) so
concurrent transitions against different resources don’t
contend; the envelope’s class field is unused for this
fold and stays 0.
pub async fn send_subprotocol(
    &self,
    peer_addr: SocketAddr,
    subprotocol_id: u16,
    payload: &[u8],
) -> Result<(), AdapterError>
Send a raw subprotocol message to a peer.
The payload is sent as a single event frame with the specified
subprotocol_id set in the Net header (included in AEAD AAD).
pub async fn announce_capabilities(
    &self,
    caps: CapabilitySet,
) -> Result<(), AdapterError>
Announce this node’s capabilities to every directly-connected
peer. Also self-indexes so single-node find_nodes_by_filter
queries return us too.
TTL defaults to 5 minutes. The announcement is sent unsigned
(signatures tie in with Stage E channel auth). For explicit control
over TTL or signing, see Self::announce_capabilities_with.
pub async fn announce_capabilities_with(
    &self,
    caps: CapabilitySet,
    ttl: Duration,
    sign: bool,
) -> Result<(), AdapterError>
Extended announce with explicit TTL and signing opt-in.
sign = true signs the announcement with the node’s
EntityKeypair so receivers can validate end-to-end.
sign = false broadcasts unsigned — useful in trusted
environments where the wire signature adds no value.
Receivers with require_signed_capabilities = true drop
unsigned announcements regardless.
pub async fn announce_chain( &self, origin_hash: u64, tip_seq: u64, ) -> Result<(), AdapterError>
Advertise that this node holds the causal chain identified
by origin_hash up to and including tip_seq. Emits the
causal:<hex>:<tip_seq> reserved tag and re-broadcasts the
node’s capability announcement.
Idempotent on the chain identity — every prior tip / range
advertisement for the same origin_hash is replaced. The
most recent call wins.
Capability Phase B per CAPABILITY_SYSTEM_PLAN.md §B and a
hard prerequisite for REDEX_DISTRIBUTED_PLAN.md Phase C/D/E.
pub async fn announce_chain_range( &self, origin_hash: u64, start_seq: u64, end_seq: u64, ) -> Result<(), AdapterError>
Advertise that this node holds the half-open range
[start_seq, end_seq) of the chain identified by
origin_hash. Emits the
causal:<hex>[<start>..<end>] reserved tag and re-broadcasts.
start_seq >= end_seq is a no-op — a degenerate range
would advertise nothing meaningful. Idempotent on the chain
identity; the most recent call replaces every prior tip /
range / presence form for the same origin_hash.
pub async fn withdraw_chain(&self, origin_hash: u64) -> Result<(), AdapterError>
Withdraw every causal:<hex>* advertisement for
origin_hash and re-broadcast. Idempotent — repeated calls
converge to the same view (the chain tag absent from the
announced set).
pub async fn announce_heat( &self, origin_hash: u64, rate: f64, ) -> Result<(), AdapterError>
Annotate the local capability set with a heat:<hex>=<rate>
reserved tag for origin_hash and re-broadcast. Replaces
any prior heat tag for the same chain — the most recent
call wins.
rate is clamped to [0.0, 1.0] by the substrate’s
CapabilitySet::heat_level builder; the wire emits the
clamped value with two decimal places. Data-gravity
callers normalize their unbounded read-rate to that range
before calling.
Rebel Yell Phase 4. See
docs/misc/DATAFORTS_PLAN.md § Phase 4.
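The clamp and two-decimal wire form described above can be pictured with a small sketch. The function name and the 16-digit zero-padded hex rendering are assumptions for illustration; the real builder is CapabilitySet::heat_level, whose signature is not shown here.

```rust
/// Render a `heat:<hex>=<rate>` tag with the rate clamped to [0.0, 1.0]
/// and printed with two decimal places.
/// ASSUMPTIONS: hypothetical helper; hex is 16 zero-padded lowercase digits;
/// callers pass a finite rate (non-finite input is outside this sketch).
fn heat_tag(origin_hash: u64, rate: f64) -> String {
    let clamped = rate.clamp(0.0, 1.0);
    format!("heat:{origin_hash:016x}={clamped:.2}")
}
```

Because the clamp happens at emission time, a caller that forgets to normalize only loses resolution at the top of the range; it cannot put an out-of-range value on the wire.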
pub async fn withdraw_heat(&self, origin_hash: u64) -> Result<(), AdapterError>
Withdraw every heat:<hex>=* tag for origin_hash and
re-broadcast. Peers drop the heat annotation; the
chain’s causal: advertisements are untouched.
pub async fn announce_heat_batch( &self, updates: &[(u64, Option<f64>)], ) -> Result<(), AdapterError>
Apply a batch of heat emissions in a single
announce_capabilities round-trip. Each update is one of:
- Some(rate): replace this chain’s heat: tag with heat:<hex>=<clamped rate>.
- None: withdraw every heat: annotation for this chain.
Used by gravity_tick to coalesce per-chain emissions. The
previous single-shot per-chain announce_heat loop rebroadcast
the full capability set N times per tick — O(n_chains² × n_tags)
wire work on a busy node. This batch path mutates the snapshot
once and rebroadcasts once.
Non-finite rates are skipped silently (with a trace log).
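A minimal sketch of the coalescing idea follows: every update mutates one local snapshot, and a single rebroadcast happens per batch. The `HeatSnapshot` type, its fields, and the unconditional rebroadcast are assumptions for the example, not the node's actual state layout.

```rust
use std::collections::BTreeMap;

/// Hypothetical local snapshot: chain origin_hash -> clamped heat rate.
#[derive(Default)]
struct HeatSnapshot {
    rates: BTreeMap<u64, f64>,
    /// Counts simulated rebroadcasts so the coalescing is observable.
    rebroadcasts: usize,
}

impl HeatSnapshot {
    /// Apply a whole batch, then rebroadcast once.
    /// ASSUMPTION: the rebroadcast happens even when nothing changed.
    fn apply_batch(&mut self, updates: &[(u64, Option<f64>)]) {
        for &(origin_hash, update) in updates {
            match update {
                // Replace the chain's heat entry with the clamped rate.
                Some(rate) if rate.is_finite() => {
                    self.rates.insert(origin_hash, rate.clamp(0.0, 1.0));
                }
                // Non-finite rates are skipped (the real path logs a trace).
                Some(_) => {}
                // None withdraws the chain's heat annotation.
                None => {
                    self.rates.remove(&origin_hash);
                }
            }
        }
        self.rebroadcasts += 1;
    }
}
```

A tick that touches N chains therefore costs one rebroadcast instead of N.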
impl MeshNode
pub async fn announce_blob_heat( &self, hash: [u8; 32], rate: f64, ) -> Result<(), AdapterError>
Advertise the heat:blob:<hex64>=<rate> reserved tag for
chunk hash. rate is clamped to [0.0, 1.0] to match
the chain-heat wire shape. PR-5j-c: operators wire this
via the BlobHeatSink impl below; the
MeshBlobAdapter::tick_blob_heat loop is the driver.
pub async fn withdraw_blob_heat( &self, hash: [u8; 32], ) -> Result<(), AdapterError>
Withdraw every heat:blob:<hex>=* tag for hash and
re-broadcast. Peers drop the blob-heat annotation.
pub async fn announce_blob_heat_batch( &self, updates: &[([u8; 32], Option<f64>)], ) -> Result<(), AdapterError>
Apply a batch of blob-heat emissions in a single
announce_capabilities round-trip. Each update is either
Some(rate) (replace this hash’s heat:blob: tag) or
None (withdraw every blob-heat annotation for the hash).
Coalescing matches the chain-heat batch path so a busy
tick doesn’t spawn N rebroadcasts.
impl MeshNode
pub async fn send_overflow_push( self: &Arc<Self>, target_node_id: u64, blob_hash: [u8; 32], size_bytes: u64, ) -> Result<OverflowPushAck, BlobError>
Send an overflow push nudge to target_node_id. The
chunk bytes themselves don’t ride this RPC — the
nudge tells the receiver to open the chunk channel
against its local Redex with replication armed; the
existing per-chunk replication runtime pulls the
bytes from any holder advertising causal:<hash>
(typically this node, since we’re shedding it).
Ok(OverflowPushAck::Accepted) means the receiver
accepted + opened the chunk channel. The sender still
has to observe the durability watermark (the target’s
causal:<hash> advertisement) before deleting the
local copy — that’s the safe-delete gate. Per the
overflow plan, this two-phase pattern lets a failed
open on the receive side (network blip during chunk
pull) keep the bytes alive on the sender.
Maps non-Ok results to typed BlobError so the
controller’s push_errors counter bumps uniformly.
See super::dataforts::blob::overflow::OverflowPushAck
for the variant breakdown.
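The two-phase safe-delete gate can be summarized as a single predicate. This is a sketch under stated assumptions: `PushAck` is a simplified, hypothetical stand-in for OverflowPushAck (only the Accepted variant is named in the text above), and the boolean stands for "the target's causal:<hash> advertisement has been observed".

```rust
/// Hypothetical two-variant stand-in for OverflowPushAck.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PushAck {
    Accepted,
    Rejected,
}

/// The sender may delete its local copy only when both phases have landed:
/// the receiver accepted the nudge AND the durability watermark (the
/// target's causal:<hash> advertisement) has been observed.
fn may_delete_local(ack: PushAck, target_advertises_chunk: bool) -> bool {
    ack == PushAck::Accepted && target_advertises_chunk
}
```

An Accepted ack alone is not sufficient, which is what keeps the bytes alive on the sender when the receiver's chunk pull fails after the ack.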
pub fn serve_overflow_push( self: &Arc<Self>, adapter: Arc<MeshBlobAdapter>, ) -> Result<ServeHandle, ServeError>
Register the receive-side overflow-push handler on
this node. The handler reads live local caps + the
capability index on each request, runs admission,
and on Admit opens the chunk channel against
adapter via super::dataforts::blob::adapter::BlobAdapter::prefetch.
Returns the super::mesh_rpc::ServeHandle the
operator drops to deregister the handler. Multiple
calls on the same node would conflict on the service
name; the second call returns
super::mesh_rpc::ServeError::AlreadyServing.
pub async fn announce_blob_overflow_state( &self, adapter: &MeshBlobAdapter, ) -> Result<(), AdapterError>
Rebroadcast the local capability set with the
dataforts.blob.overflow tag set to match adapter’s
current overflow_enabled() state. Convenience for
operators who flip super::dataforts::blob::MeshBlobAdapter::set_overflow_enabled
at runtime: the boolean lives on the adapter, but the
capability index reads from this node’s announced caps,
so peers only observe the change after the next
announce. Without this helper, every toggle path needs a
matching announce_capabilities call — easy to forget,
and the symptom (sender keeps round-tripping pushes that
reject SenderNotOverflowing) is non-obvious.
Snapshots the current user caps, sets / clears the
presence tag based on adapter.overflow_enabled(), and
announces the updated set. Returns the
AdapterError from the inner announce_capabilities
if the announce fails (rate-limited, transport down,
etc.).
impl MeshNode
pub fn serve_blob_transfer( self: &Arc<Self>, adapter: Arc<MeshBlobAdapter>, ) -> Arc<BlobTransferEngine>
Install the blob-transfer engine over adapter (FairScheduler
transport plan). After this, the node serves on-demand chunk
fetches (the SUBPROTOCOL_BLOB_TRANSFER control branch + the
transfer-stream data divert route to the engine) and can issue
them via Self::transfer_fetch_chunk. Idempotent — re-install
replaces the engine.
Returns the installed engine handle — the same one stored on the
node — so callers (e.g. the blob.transfers RPC install) can serve
introspection over the exact registry that’s doing the fetches.
pub async fn transfer_fetch_chunk( self: &Arc<Self>, holder: u64, hash: [u8; 32], ) -> Result<Bytes, BlobError>
Fetch the chunk addressed by hash from holder over a
reliable, scheduled transfer stream — bytes move over the
FairScheduler-managed stream transport, not RedEX replication or
nRPC. Returns the BLAKE3-verified bytes, or
BlobError::NotFound on a holder miss / timeout (so the caller
can fail over to another advertised holder).
pub async fn transfer_fetch_chunk_discovered( self: &Arc<Self>, hash: [u8; 32], ) -> Result<Bytes, BlobError>
Fetch the chunk addressed by hash from whichever connected
peer holds it, without the caller naming a holder. Probes peers
in turn over the transfer transport: a peer that lacks the chunk
replies NotFound promptly (so misses are cheap), and the first
peer that serves the BLAKE3-verified bytes wins. Returns
BlobError::NotFound if no connected peer has it (the caller
may then fall back to its own backend or fail the read).
This is the usable “fetch by content hash” entry point that
Self::transfer_fetch_chunk (which requires a known holder)
can’t be on its own.
Discovery model (deliberate). v1 probes connected peers
directly rather than consulting the capability fold for
causal:<hash> advertisers. The per-chunk causal:<hex64> tag
is a single-datagram advertisement that caps at ~15-20
chunks/node, so it does not scale as a per-chunk holder index —
which is exactly why the directory transfer
(super::dataforts::dir) pulls from a known source instead.
Probing sidesteps the ceiling entirely: the holder’s prompt
NotFound is the membership test. A future optimization can use
a node-level “serves blobs” capability (one tag, no ceiling) or a
dedicated zero-byte probe frame to ORDER / prune candidates and
to stay robust against a silent peer; today each probe is bounded
by DISCOVERY_PROBE_TIMEOUT so one unresponsive peer can’t
stall the whole search.
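The probing model described above amounts to a first-hit-wins loop. The sketch below is synchronous and uses a caller-supplied closure in place of the real transfer transport; `ProbeOutcome` and `fetch_discovered` are hypothetical names, and the per-probe timeout is represented only as an outcome variant.

```rust
/// Hypothetical result of probing one peer for a chunk.
#[derive(Debug, PartialEq, Eq)]
enum ProbeOutcome {
    /// The peer served the (already verified) bytes.
    Found(Vec<u8>),
    /// The peer promptly reported that it lacks the chunk.
    NotFound,
    /// The probe hit its per-peer timeout.
    TimedOut,
}

/// Probe peers in turn; the first peer that serves the bytes wins.
/// A miss or a timeout just moves on to the next candidate, so one
/// silent peer cannot stall the whole search.
fn fetch_discovered<F>(peers: &[u64], mut probe: F) -> Option<(u64, Vec<u8>)>
where
    F: FnMut(u64) -> ProbeOutcome,
{
    for &peer in peers {
        match probe(peer) {
            ProbeOutcome::Found(bytes) => return Some((peer, bytes)),
            ProbeOutcome::NotFound | ProbeOutcome::TimedOut => continue,
        }
    }
    // No connected peer had the chunk: the caller sees a NotFound-style miss.
    None
}
```

Peers after the first hit are never contacted, so the cost of a lookup is bounded by the position of the first holder in the probe order.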
impl MeshNode
pub fn set_replication_inbound_router( &self, router: Option<Arc<dyn ReplicationInboundRouter>>, )
Install (or replace) the SUBPROTOCOL_REDEX inbound
router. Used by Redex to register a per-node router
that owns the per-channel runtime registry; the mesh
dispatch hot-path consults this router on every inbound
SUBPROTOCOL_REDEX frame.
Passing None un-installs the router — every subsequent
inbound replication frame is dropped silently until a new
router is installed.
pub fn set_meshdb_inbound_router( &self, router: Option<Arc<dyn MeshDbInboundRouter>>, )
Install (or replace) the SUBPROTOCOL_MESHDB inbound
router. The MeshDB transport installs one of these to
receive federated-query traffic — both inbound requests
(routed to the server-side query handler) and inbound
responses (routed to the matching in-flight caller).
Passing None un-installs the router — subsequent
inbound MeshDB frames are dropped silently until a new
router is installed.
Idempotent — re-installing replaces the previous handle.
Hot-path cost is one parking_lot::RwLock read per
inbound SUBPROTOCOL_MESHDB frame.
pub fn has_meshdb_inbound_router(&self) -> bool
True iff a MeshDB inbound router is currently installed. Useful for tests and the operator surface to confirm the install landed.
pub fn set_fold_router(&self, router: Option<Arc<dyn FoldChannelRouter>>)
Install (or uninstall) the fold-framework channel router. With no router installed, fold packets are dropped silently. Idempotent; re-installing replaces the previous handle.
pub fn has_fold_router(&self) -> bool
True iff a fold-framework channel router is currently installed. Useful for tests and the operator surface to confirm the install landed.
pub fn fold_stats(&self) -> Vec<FoldStats>
Aggregated super::behavior::fold::FoldStats for every
fold the installed router addresses. The
net-mesh fold list CLI command and the Deck FOLDS panel
call this once per scrape tick. Returns an empty Vec
when no router is installed.
pub fn set_greedy_observer(&self, observer: Option<Arc<dyn GreedyObserver>>)
Install (or uninstall) the greedy-LRU observer. Redex
calls this from
Redex::enable_greedy_dataforts
to wire the inbound dispatch fanout. None uninstalls;
subsequent standard-event packets fall through the
observer hook untouched.
Idempotent — re-installing replaces the previous handle. Hot-path cost is unchanged when called with the same Arc.
pub fn has_greedy_observer(&self) -> bool
True iff a greedy observer is currently installed. Useful for tests and for the operator surface to confirm the install landed.
pub fn find_chain_holders(&self, origin_hash: u64) -> Vec<u64>
Return every node currently advertising any causal:<hex>*
variant for origin_hash. Includes this node when self-indexed.
Sorted by ascending RTT from this node where the proximity graph
has measurements, with self at the front and unmeasured peers
(no recent ping) at the back; among equally-measured peers, ties
are broken by ascending NodeId.
Reads the capability fold directly; no broadcast.
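The ordering rule above can be expressed as a three-part sort key. This sketch is illustrative only: `order_holders` and the `(node_id, Option<rtt>)` input shape are assumptions, and ordering unmeasured peers by NodeId is a choice made for determinism that the text does not confirm.

```rust
/// Order chain holders: self first, then measured peers by ascending RTT
/// (ties broken by ascending node id), then unmeasured peers.
/// `holders` pairs each node id with its RTT, `None` meaning "no recent ping".
/// ASSUMPTION: unmeasured peers are ordered by node id among themselves.
fn order_holders(self_id: u64, mut holders: Vec<(u64, Option<u32>)>) -> Vec<u64> {
    holders.sort_by_key(|&(id, rtt)| {
        // Rank 0 = self, 1 = measured peer, 2 = unmeasured peer.
        let rank = if id == self_id {
            0u8
        } else if rtt.is_some() {
            1
        } else {
            2
        };
        // Self ignores its RTT; unmeasured peers share a sentinel value.
        let rtt_key = if id == self_id { 0 } else { rtt.unwrap_or(u32::MAX) };
        (rank, rtt_key, id)
    });
    holders.into_iter().map(|(id, _)| id).collect()
}
```

A caller that walks the returned list front to back therefore tries the cheapest known holder first and only falls through to unmeasured peers last.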
pub fn find_nodes_by_filter(&self, filter: &CapabilityFilter) -> Vec<u64>
Query the capability fold. Returns node ids (including our
own node_id) whose latest announcement matches filter.
Routes through capability_bridge::find_nodes_matching so
post-query predicates (memory, vram, GPU) execute in-memory
alongside the fold’s tag-based intersection.
pub fn find_nodes_by_filter_scoped( &self, filter: &CapabilityFilter, scope: &ScopeFilter<'_>, ) -> Vec<u64>
Scoped variant of Self::find_nodes_by_filter. Filters
candidates through scope (derived from each peer’s
scope:* reserved tags) on top of the capability filter.
SubnetLocal peers and the ScopeFilter::SameSubnet
filter resolve same-subnet membership against
peer_subnets.
Warm-up rule. When a peer’s subnet is unknown:
- With a local_subnet_policy, the candidate is admitted (a fresh peer’s announcement may not have landed yet; the policy will resolve it on receipt).
- Without a local_subnet_policy, peer_subnets stays permanently empty (the dispatch handler only writes it when a policy is installed), so “unknown” means “will never resolve”; admitting unknowns there leaks every peer through SameSubnet. The candidate is excluded.
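The warm-up rule reduces to a small decision function. The sketch below uses a plain `u32` as a hypothetical subnet identifier and a boolean for "a local_subnet_policy is installed"; neither reflects the crate's actual types.

```rust
/// Decide whether a candidate passes a SameSubnet-style filter.
/// `peer_subnet` is `None` when the peer's subnet is not (yet) known.
/// ASSUMPTION: subnets are modelled as bare u32 ids for illustration.
fn admit_same_subnet(peer_subnet: Option<u32>, local_subnet: u32, policy_installed: bool) -> bool {
    match peer_subnet {
        // Known subnet: ordinary membership test.
        Some(subnet) => subnet == local_subnet,
        // Unknown subnet: admit only while a policy exists that can still
        // resolve it; without a policy "unknown" never resolves, so exclude.
        None => policy_installed,
    }
}
```

The asymmetry is deliberate: admitting unknowns is a temporary grace period only when something can later correct the answer.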
pub fn peer_reflex_addr(&self, peer_node_id: u64) -> Option<SocketAddr>
Read a peer’s most recently advertised public reflex
SocketAddr from the capability index. None before the
peer has sent a stage-2 announcement, or when the peer was
compiled without nat-traversal.
Stage 3 (rendezvous) reads this field to resolve the punch target’s public address. Exposed for observability and for tests that want to verify capability-announcement propagation of the reflex field.
Requires the nat-traversal cargo feature.
pub fn peer_nat_class(&self, peer_node_id: u64) -> NatClass
Read a peer’s most recently advertised NAT classification
from the capability index. Parses the nat:* tag on the
peer’s announcement. Returns NatClass::Unknown when the
peer has not been indexed (we’ve never received an announcement)
or when the announcement carried no nat:* tag (the peer was
compiled without nat-traversal, or hasn’t classified yet).
Consumed by the pair-type matrix (plan §3) — connect_direct
reads this to decide whether to attempt a punch or
short-circuit to the routed path.
Requires the nat-traversal cargo feature.
pub fn traversal_stats(&self) -> TraversalStatsSnapshot
Cumulative traversal counters — punch attempts, successes, and relay fallbacks. Returns a consistent point-in-time snapshot.
See super::traversal::TraversalStatsSnapshot for the
field semantics. Counters are monotonic and never reset;
callers that want deltas should subtract snapshots.
Requires the nat-traversal cargo feature.
pub async fn connect_direct( &self, peer_node_id: u64, peer_pubkey: &[u8; 32], coordinator: u64, ) -> Result<u64, TraversalError>
Establish a direct session to peer_node_id, using the
pair-type matrix (plan §3) to decide between a direct
handshake, a rendezvous-coordinated single-shot punch, or
a routed-only fallback.
§Flow
- Read local + remote NAT classifications (self nat_class() + peer_nat_class). Unknown sides are handled per the matrix, never treated as “don’t attempt” (plan decision 8).
- Resolve the peer’s reflex address from the local capability index. Fails with super::traversal::TraversalError::PeerNotReachable if no reflex is cached (peer hasn’t announced yet).
- Apply the matrix:
  - Direct → connect via the routing table’s first-hop; coordinator is not consulted and its reachability is irrelevant. relay_fallbacks increments (we didn’t attempt a punch).
  - SkipPunch → connect via coordinator as the relay; symmetric pairs have no better option. Fails with PeerNotReachable if coordinator isn’t a live peer.
  - SinglePunch → ask coordinator to mediate via Self::request_punch. On successful introduction, increment punches_attempted + punches_succeeded and connect to peer_reflex. On failure, increment punches_attempted + relay_fallbacks and fall back to connecting via the coordinator; the plan’s framing treats punch-failed as “optimization missed,” not a connectivity failure.
§Scope note
Stage 3c wires the orchestration + stats end-to-end but
always establishes the session via the routed handshake
through coordinator — the framing “traffic rides the
relay until a direct punch upgrades it” matches the plan’s
“optimization, not correctness” contract. Stage 3d lands
the keep-alive train + PunchAck round-trip, at which
point a successful SinglePunch outcome upgrades to a
direct session; failed punches (or matrix-skipped pairs)
continue to resolve on the routed path as they do today.
Stats already follow the stage-3c semantics:
punches_attempted increments when the matrix picks
SinglePunch and the coordinator mediates; stage 3d
refines punches_succeeded / relay_fallbacks against
the real keep-alive outcome.
Requires the nat-traversal cargo feature.
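To make the counter bookkeeping in the flow concrete, here is a sketch of how each matrix outcome could update the three counters. `PairAction`, `Counters`, and `record` are hypothetical names; the mapping from NAT classes to actions is deliberately left out because the matrix itself is defined in the plan, not here.

```rust
/// Hypothetical outcome of the pair-type matrix.
#[derive(Debug, Clone, Copy)]
enum PairAction {
    Direct,
    SkipPunch,
    SinglePunch,
}

/// Stand-in for the traversal counters named in the flow above.
#[derive(Debug, Default, PartialEq, Eq)]
struct Counters {
    punches_attempted: u64,
    punches_succeeded: u64,
    relay_fallbacks: u64,
}

/// Update counters for one connect_direct-style attempt.
/// `punch_ok` only matters for SinglePunch.
fn record(c: &mut Counters, action: PairAction, punch_ok: bool) {
    match action {
        // No punch was attempted, so this counts as a relay fallback.
        PairAction::Direct => c.relay_fallbacks += 1,
        // ASSUMPTION: the text does not say which counter SkipPunch
        // touches, so this sketch leaves all counters unchanged.
        PairAction::SkipPunch => {}
        PairAction::SinglePunch => {
            c.punches_attempted += 1;
            if punch_ok {
                c.punches_succeeded += 1;
            } else {
                // Punch failed: "optimization missed", fall back to relay.
                c.relay_fallbacks += 1;
            }
        }
    }
}
```

Since the counters are monotonic, a caller interested in a time window can subtract two snapshots field by field.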
pub fn find_best_node(&self, req: &CapabilityRequirement) -> Option<u64>
Rank peers for a scored requirement. Returns the
best-scoring node’s id, or None if no peer matches.
Phase 3b note: scoring runs against a tag-only CapabilitySet
synthesized from the fold (the fold’s CapabilityMembership
doesn’t carry the full legacy hardware/models projection).
Hardware- and model-based preference weights (memory,
vram, tokens/sec, loaded) read zero, so this method
degrades to “any matching candidate, lex-sorted by
node_id.” That’s the same shape as the
cap-propagation-race fallback; production has no rich-scoring
caller per the Phase 3b survey.
pub fn find_best_node_scoped( &self, req: &CapabilityRequirement, scope: &ScopeFilter<'_>, ) -> Option<u64>
Scoped variant of Self::find_best_node. See
Self::find_nodes_by_filter_scoped for the scope
resolution semantics; selection picks the highest-scoring
candidate within the scoped set.
Phase 3b note: same scoring caveat as
Self::find_best_node — the fold’s
CapabilityMembership
doesn’t carry the legacy hardware/models projection, so
scoring degrades to “any matching candidate, lex-sorted.”
pub fn capability_fold(&self) -> &Arc<Fold<CapabilityFold>>
Shared reference to the capability fold — the canonical capability-state surface. Used by operator dashboards, the rendezvous coordinator (reflex lookup), and the dataforts-blob overflow handler.
pub fn reservation_fold(&self) -> &Arc<Fold<ReservationFold>>
Shared reference to the reservation fold. Always present (allocated at construction even when the node never publishes a reservation) so the aggregator surface + future scheduler callers don’t have to discriminate on presence.
pub fn get_node_by_origin_hash(&self, origin_hash: u64) -> Option<u64>
Resolve a wire origin_hash to its publisher’s node_id,
or None when no publisher has claimed it yet.
Post-WIRE_ORIGIN_HASH_64BIT the wire hash is the full
EntityId::origin_hash() u64; accidental collisions are
2^-32 (effectively impossible). The map is populated
first-write-wins, so an adversary grinding a colliding
keypair (~2^32 work) cannot displace an established
claimant.
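First-write-wins population maps naturally onto the standard library's entry API. The sketch below uses a plain `HashMap<u64, u64>` and a hypothetical `claim_origin` helper; the node's real map type and locking are not shown in this documentation.

```rust
use std::collections::HashMap;

/// Record `node_id` as the claimant of `origin_hash` unless someone has
/// already claimed it, and return whoever holds the claim afterwards.
/// `or_insert` never overwrites an existing entry, which is what makes
/// the map first-write-wins.
fn claim_origin(map: &mut HashMap<u64, u64>, origin_hash: u64, node_id: u64) -> u64 {
    *map.entry(origin_hash).or_insert(node_id)
}
```

A later claim with a colliding hash is simply ignored, so the lookup result for an established publisher is stable.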
pub fn open_stream( &self, peer_node_id: u64, stream_id: u64, config: StreamConfig, ) -> Result<Stream, AdapterError>
Open (or look up) a logical stream to a connected peer.
A stream is one ordered, independently reliability-configured
channel inside the encrypted session to peer_node_id. Multiple
streams share one session, one cipher, and one UDP socket, but
have independent sequence numbers and reliability state. See
Stream for the full contract.
Idempotent: repeated calls for the same (peer_node_id, stream_id) return handles backed by the same underlying state;
a config argument that differs from the first call’s is ignored
with a warning log. Close + re-open to change a stream’s config.
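The idempotent open described above can be modelled with a keyed table in which the first config wins. `StreamCfg` and `StreamTable` are hypothetical stand-ins for StreamConfig and the session's stream registry, and the warning log is modelled as a counter.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Hypothetical stand-in for StreamConfig.
#[derive(Debug, Clone, PartialEq, Eq)]
struct StreamCfg {
    reliable: bool,
    tx_window: u32,
}

/// Hypothetical registry keyed by (peer_node_id, stream_id).
#[derive(Default)]
struct StreamTable {
    streams: HashMap<(u64, u64), StreamCfg>,
    /// Number of times a differing config was ignored (stands in for the log).
    warnings: usize,
}

impl StreamTable {
    /// Open or look up a stream; a differing config on re-open is ignored.
    fn open(&mut self, peer: u64, stream_id: u64, cfg: StreamCfg) -> &StreamCfg {
        match self.streams.entry((peer, stream_id)) {
            Entry::Occupied(existing) => {
                if *existing.get() != cfg {
                    self.warnings += 1;
                }
                existing.into_mut()
            }
            Entry::Vacant(slot) => slot.insert(cfg),
        }
    }

    /// Closing drops the state, so a later open may use a new config.
    fn close(&mut self, peer: u64, stream_id: u64) {
        self.streams.remove(&(peer, stream_id));
    }
}
```

Changing a stream's config is therefore a close followed by a fresh open, never an in-place update.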
pub fn close_stream(&self, peer_node_id: u64, stream_id: u64)
Close a stream: drop its StreamState from the session, ending
delivery of any buffered inbound events for the stream and
dropping outbound packets that haven’t hit the wire yet.
Idempotent. CloseBehavior::DrainThenClose is honored only to
the extent the router’s scheduler has already flushed; there is
no wire “drain-then-close” signal in v1.
pub async fn close_stream_graceful( &self, peer_node_id: u64, stream_id: u64, timeout: Duration, )
Close a reliable stream gracefully (H-7, DrainThenClose):
wait until the reliability layer has no unacked packets — i.e. the
receiver has acked everything (with H-9 ack-pruning, pending
empties as grants arrive) — or timeout elapses, then close. Use
after the last bytes of a reliable send so retransmit can still
fill gaps before teardown; closing eagerly (close_stream) drops
the retransmit window and can strand a lost tail packet on a lossy
link. A fire-and-forget stream (nothing tracked) drains instantly.
pub async fn send_on_stream( &self, stream: &Stream, events: &[Bytes], ) -> Result<(), StreamError>
Send a batch of events on an explicit stream.
Uses the stream’s reliability mode from its original open_stream
config. Returns Backpressure when the stream’s in-flight count
(tx_inflight) would exceed its configured tx_window; the event
was not enqueued — the caller decides what to do (drop, retry,
or buffer at the app layer). tx_window == 0 disables the check
and preserves pre-backpressure behavior. Transport is returned
for underlying socket send failures.
Returns NotConnected when the stream was never opened or has
been closed since (close_stream, idle eviction, cap-exceeded
LRU). A previously-closed Stream handle is inert by design —
reusing it does NOT silently re-create the stream with default
config; the caller must explicitly re-open.
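The window check can be sketched as a pure function. This is an illustration under assumptions: `admit` and `SendError` are hypothetical, the in-flight count is assumed to be in the same unit as the batch length, and the whole batch is assumed to be admitted or rejected as a unit.

```rust
/// Hypothetical error mirroring the Backpressure outcome described above.
#[derive(Debug, PartialEq, Eq)]
enum SendError {
    Backpressure,
}

/// Admit a batch if it fits in the window; returns the new in-flight count.
/// `tx_window == 0` disables the check entirely.
/// ASSUMPTION: the batch is admitted or rejected as a whole.
fn admit(tx_inflight: u32, batch_len: u32, tx_window: u32) -> Result<u32, SendError> {
    let next = tx_inflight.saturating_add(batch_len);
    if tx_window != 0 && next > tx_window {
        // Nothing was enqueued; the caller decides to drop, retry, or buffer.
        return Err(SendError::Backpressure);
    }
    Ok(next)
}
```

Because a rejected batch leaves the in-flight count untouched, the caller can retry the same batch later without any cleanup.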
pub async fn send_with_retry( &self, stream: &Stream, events: &[Bytes], max_retries: usize, ) -> Result<(), StreamError>
Send events on stream, retrying on Backpressure with
exponential backoff (5 ms → 200 ms, doubling) up to max_retries
times. Transport failures are returned immediately — they’re a
real error, not a pressure signal, and retrying would just mask
them. Returns the final Backpressure error if the stream stays
saturated across every attempt.
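The backoff schedule implied by "5 ms → 200 ms, doubling" might look like the sketch below. The helper name is hypothetical, and holding the delay at 200 ms once reached is an assumption about how the cap is applied.

```rust
use std::time::Duration;

/// Delays slept before each retry: start at 5 ms, double each time,
/// and never exceed 200 ms.
/// ASSUMPTION: once the cap is reached, every later retry waits 200 ms.
fn backoff_delays(max_retries: usize) -> Vec<Duration> {
    let cap = Duration::from_millis(200);
    let mut delay = Duration::from_millis(5);
    let mut delays = Vec::with_capacity(max_retries);
    for _ in 0..max_retries {
        delays.push(delay);
        delay = (delay * 2).min(cap);
    }
    delays
}
```

Under these assumptions the total wait grows linearly after the sixth retry, which is why a bounded max_retries gives a concrete upper bound on time spent retrying.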
pub async fn send_blocking( &self, stream: &Stream, events: &[Bytes], ) -> Result<(), StreamError>
Convenience wrapper around send_with_retry
with a generous retry count. Blocks the calling task until the
send succeeds or a transport error occurs. Use when you’d rather
wait than drop; prefer send_with_retry if you need a concrete
upper bound on retry attempts.
pub fn stream_stats( &self, peer_node_id: u64, stream_id: u64, ) -> Option<StreamStats>
Snapshot of per-stream stats for a single stream.
Returns None if either the peer or the stream doesn’t exist.
pub fn all_stream_stats(&self, peer_node_id: u64) -> Vec<(u64, StreamStats)>
Snapshot of per-stream stats for every stream in the session to
peer_node_id. Empty vec if the peer doesn’t exist.
pub async fn connect_via( &self, relay_addr: SocketAddr, dest_pubkey: &[u8; 32], dest_node_id: u64, ) -> Result<u64, AdapterError>
Connect to dest_node_id via a routed handshake through
relay_addr. Unlike Self::connect (which requires the
responder to pre-accept() this initiator’s node_id before
start()), this path embeds the initiator’s full node_id
inside the Noise msg1 payload and routes the packet through
the dispatch loop’s handle_routed_handshake Case 2, so a
daemon that’s already start()ed accepts msg1 from a
brand-new initiator without prior coordination.
When relay_addr is the final destination (the CLI remote-attach
case), the routed path degenerates to a single hop. The
post-handshake peer install populates addr_to_node[relay_addr]
via entry().or_insert(...) so address-keyed sends resolve
the peer; in a true multi-hop scenario the relay’s prior
mapping stays intact.
Retries up to MeshNodeConfig::handshake_retries (default
3) — a single UDP drop on msg1 or msg2 should not surface
as a typed error.
pub async fn connect_routed( &self, dest_pubkey: &[u8; 32], dest_node_id: u64, ) -> Result<u64, AdapterError>
Connect to a peer by node id, using the routing table to pick the
first hop. Fails with Connection("no route to ...") if the
routing table doesn’t have a route to the destination yet — in
which case the caller can retry once pingwaves have propagated.
pub async fn probe_reflex( &self, peer_node_id: u64, ) -> Result<SocketAddr, TraversalError>
Send one reflex probe to peer_node_id and return the
public SocketAddr the peer observed on the probe’s UDP
envelope.
Waits up to super::traversal::TraversalConfig::reflex_timeout (default
3 s) for the response. Fails with super::traversal::TraversalError::ReflexTimeout
on timeout, super::traversal::TraversalError::PeerNotReachable if the peer
has no active session, or super::traversal::TraversalError::Transport on
a socket-level send failure.
Requires the nat-traversal cargo feature.
pub fn nat_class(&self) -> NatClass
The current NAT classification for this node. Unknown
until the classification sweep has run; updated atomically
by the sweep and by Self::reclassify_nat. Read-only for
external callers — the sweep is the only writer.
Requires the nat-traversal cargo feature.
pub fn reflex_addr(&self) -> Option<SocketAddr>
This node’s public-facing SocketAddr as observed by a
remote peer during the classification sweep. None before
the first sweep has produced an observation. Exposed
primarily for tests + observability; the
announce-capabilities path piggybacks this value onto every
signed CapabilityAnnouncement.
Requires the nat-traversal cargo feature.
pub fn set_reflex_override(&self, external: SocketAddr)
Install a runtime reflex override. Forces nat_class = Open and reflex_addr = Some(external) immediately, and
short-circuits any further classifier sweeps until
Self::clear_reflex_override is called.
§Publishing to peers
This method updates only local state. To propagate the
change to peers, call Self::announce_capabilities
afterward. The setter resets the announce rate-limit
floor so the next announce is guaranteed to broadcast
rather than coalesce against the previous send — cubic
P2 pinned this, after flagging that callers who set an
override within min_announce_interval of a prior
announce would find peers still seeing the old reflex.
Optimization, not correctness. A node with no override still reaches every peer through the routed-handshake path; the override just pins the publicly-advertised address when it’s already known (port-forwarded server, a successful stage-4 port-mapping install, etc).
Safe to call concurrently with announce_capabilities —
the triple-write runs under traversal_publish_mu
(alongside the announce’s multi-field read), so a
concurrent announce either sees the pre-override state
or the fully-installed override, never a torn mix.
Requires the nat-traversal cargo feature.
pub fn clear_reflex_override(&self)
Drop a previously-installed runtime reflex override. The
classifier sweep resumes on its normal cadence; the next
sweep repopulates reflex_addr and nat_class from real
probe observations. reflex_addr is cleared to None
immediately so a between-sweep read doesn’t return a stale
override value as “still current.”
§Publishing to peers
Mirrors Self::set_reflex_override: only local state
changes here. Call Self::announce_capabilities after
this to tell peers. The rate-limit floor is reset so that
call broadcasts unconditionally.
No-op when no override is active — safe to call unconditionally during shutdown / port-mapping revoke paths.
Requires the nat-traversal cargo feature.
pub async fn request_punch( &self, relay: u64, target: u64, self_reflex: SocketAddr, ) -> Result<PunchIntroduce, TraversalError>
Send a PunchRequest to a coordinator peer relay, asking
it to mediate a hole-punch to target. Returns the
PunchIntroduce produced by the coordinator (the one
arriving on this node’s side of the introduction — carrying
target’s reflex and the shared fire_at).
This is a stage-3b primitive: it exercises the coordinator
fan-out end-to-end but does not itself schedule the
keep-alive train or finalize the punched session. The
full connect_direct flow lands in stage 3c.
Fails with:
- super::traversal::TraversalError::PeerNotReachable if relay has no active session.
- super::traversal::TraversalError::Transport on a socket-level send failure.
- super::traversal::TraversalError::PunchFailed if the coordinator doesn’t introduce within super::traversal::TraversalConfig::punch_deadline (likely because relay has no cached reflex for target).
Requires the nat-traversal cargo feature.
pub async fn await_punch_introduce( &self, counterpart: u64, coordinator: u64, ) -> Result<PunchIntroduce, TraversalError>
Install a waiter for an incoming PunchIntroduce from
counterpart, brokered by coordinator. The dispatch arm
admits the introduce only when the session peer matches
coordinator. The returned future resolves when the
dispatcher decodes a matching introduce, or with
super::traversal::TraversalError::PunchFailed after
super::traversal::TraversalConfig::punch_deadline.
Stage-3b responder-side primitive: the peer being punched
into uses this to observe the introduce without
initiating the flow itself. The responder must know which
coordinator will forward the introduce — supply that node’s
id as coordinator. Stage 3c wires the keep-alive train
onto the returned introduce.
Requires the nat-traversal cargo feature.
pub async fn await_punch_ack( &self, counterpart: u64, coordinator: u64, ) -> Result<PunchAck, TraversalError>
Install a waiter for an incoming PunchAck whose
from_peer matches counterpart. Stage-3d correlation
surface — the SinglePunch path in connect_direct
registers the waiter before firing request_punch and
awaits it afterward. Times out with
super::traversal::TraversalError::PunchFailed after
super::traversal::TraversalConfig::punch_deadline.
Note: the caller inserts the oneshot sender into
pending_punch_acks before issuing the request so the ack
can’t arrive and be dropped before the await call is
entered.
Requires the nat-traversal cargo feature.
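The register-before-request ordering described in the note can be illustrated with a small synchronous model. This is a sketch under assumptions: PendingAcks, dispatch, and wait_for_ack are hypothetical names, a std mpsc channel stands in for the oneshot sender, and the error is a plain string rather than TraversalError::PunchFailed.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Mutex;
use std::time::Duration;

#[derive(Debug, PartialEq)]
struct PunchAck {
    from_peer: u64,
}

// Hypothetical model of the `pending_punch_acks` table, keyed by counterpart.
#[derive(Default)]
struct PendingAcks {
    waiters: Mutex<HashMap<u64, Sender<PunchAck>>>,
}

impl PendingAcks {
    // Step 1: install the waiter BEFORE the request goes out.
    fn register(&self, counterpart: u64) -> Receiver<PunchAck> {
        let (tx, rx) = mpsc::channel();
        self.waiters.lock().unwrap().insert(counterpart, tx);
        rx
    }

    // Dispatcher side: hand the ack to a matching waiter.
    // Returns false when nobody registered, i.e. the ack is dropped.
    fn dispatch(&self, ack: PunchAck) -> bool {
        let waiter = self.waiters.lock().unwrap().remove(&ack.from_peer);
        match waiter {
            Some(tx) => tx.send(ack).is_ok(),
            None => false,
        }
    }
}

// Step 3: await the ack, bounded by a deadline (models `punch_deadline`).
fn wait_for_ack(rx: &Receiver<PunchAck>, deadline: Duration) -> Result<PunchAck, &'static str> {
    rx.recv_timeout(deadline).map_err(|_| "PunchFailed")
}
```

An ack dispatched before any waiter is registered is lost in this model, which is the race the real ordering is meant to close.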
pub async fn reclassify_nat(&self)
Fire the classification sweep. Picks up to two currently-
connected peers, runs Self::probe_reflex against each in
parallel, feeds the observations to
super::traversal::classify::ClassifyFsm, and updates
nat_class + reflex_addr with the result.
Runs at most one sweep at a time — a second call while a
sweep is in flight is a no-op. Exits early if fewer than 2
peers are currently connected; callers should check
Self::nat_class after the returned future completes to
see whether classification produced a definite verdict or
stayed at Unknown.
Bounded by super::traversal::TraversalConfig::classify_deadline — even if
probes hang, the sweep returns within that window with
whatever observations arrived.
Requires the nat-traversal cargo feature.
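The "at most one sweep at a time" rule is commonly implemented with an atomic flag. The sketch below shows that pattern only: Sweeper, SweepOutcome, and try_sweep are hypothetical, and the order of the in-flight and peer-count checks is an assumption, not taken from the source.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Debug, PartialEq)]
enum SweepOutcome {
    Ran,
    AlreadyInFlight,
    TooFewPeers,
}

// Hypothetical single-flight guard; not the crate's actual sweep state.
struct Sweeper {
    in_flight: AtomicBool,
}

impl Sweeper {
    fn try_sweep(&self, connected_peers: usize, probe: impl FnOnce()) -> SweepOutcome {
        // Only the caller that flips false -> true proceeds; any other
        // caller returns immediately without doing work.
        if self
            .in_flight
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            return SweepOutcome::AlreadyInFlight;
        }
        let outcome = if connected_peers < 2 {
            // Too few vantage points to compare observations.
            SweepOutcome::TooFewPeers
        } else {
            probe();
            SweepOutcome::Ran
        };
        // Release the slot so a later sweep can run.
        self.in_flight.store(false, Ordering::Release);
        outcome
    }
}
```

A production version would also need to clear the flag if the probe panics or is cancelled, for example with a drop guard; that is omitted here for brevity.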
impl MeshNode
pub fn serve_rpc<H: RpcHandler>( self: &Arc<Self>, service: &str, handler: Arc<H>, ) -> Result<ServeHandle, ServeError>
Register an nRPC handler for service on this node.
Subscribes this node to <service>.requests (so the local
register_rpc_inbound dispatcher feeds inbound REQUEST
events into the RpcServerFold) and wires the fold’s
RESPONSE-emit callback to publish on
<service>.replies.<caller_origin> via the existing
pub/sub path.
Local-only registration (Phase 1). Multi-instance
services that load-balance via SubscriptionMode::QueueGroup
require each replica to call serve_rpc on its own node;
the mesh-level subscriber roster + dispatch_recipients
then routes one-of-N as designed. Each replica’s local
serve_rpc must use the same service name (which becomes
the queue-group identifier).
Returns a ServeHandle whose Drop tears down the
registration. Concurrent registrations for the same service
on one node return Err(ServeError::AlreadyServing).
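The handle semantics (teardown on Drop, AlreadyServing on a duplicate registration) follow a familiar RAII shape. The sketch below models only that shape: Registry, serve, and request_channel are hypothetical, and the local ServeHandle and ServeError types are simplified stand-ins for the real ones.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

#[derive(Debug, PartialEq)]
enum ServeError {
    AlreadyServing,
}

// Hypothetical per-node table of registered service names.
#[derive(Default)]
struct Registry {
    services: Mutex<HashSet<String>>,
}

// Simplified stand-in for the real handle: owns the registration.
struct ServeHandle {
    registry: Arc<Registry>,
    service: String,
}

impl Drop for ServeHandle {
    fn drop(&mut self) {
        // Dropping the handle tears the registration down.
        self.registry.services.lock().unwrap().remove(&self.service);
    }
}

fn serve(registry: &Arc<Registry>, service: &str) -> Result<ServeHandle, ServeError> {
    let mut services = registry.services.lock().unwrap();
    // `insert` returns false when the name is already present.
    if !services.insert(service.to_string()) {
        return Err(ServeError::AlreadyServing);
    }
    Ok(ServeHandle {
        registry: Arc::clone(registry),
        service: service.to_string(),
    })
}

// Channel the server subscribes to, following the `<service>.requests` shape.
fn request_channel(service: &str) -> String {
    format!("{service}.requests")
}
```

Holding the handle for the lifetime of the service and dropping it on shutdown keeps registration and teardown paired without an explicit unregister call.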
pub fn serve_rpc_streaming<H: RpcStreamingHandler>( self: &Arc<Self>, service: &str, handler: Arc<H>, ) -> Result<ServeHandle, ServeError>
Streaming variant of Self::serve_rpc. The handler
receives an RpcResponseSink and writes chunks to it via
sink.send(body); returning Ok(()) closes the stream
cleanly, while Err(_) closes it with an error frame.
Wire-level identical to the unary path apart from the
per-chunk nrpc-streaming header markers
(continue / end). Performs the same auto-registration of
<service>.requests and the <service>.replies. prefix.
pub fn serve_rpc_client_stream<H: RpcClientStreamingHandler>( self: &Arc<Self>, service: &str, handler: Arc<H>, ) -> Result<ServeHandle, ServeError>
Register a client-streaming nRPC handler for service.
Mirror of Self::serve_rpc_streaming but using the
request-side fold (RpcStreamingRequestFold) — the
handler receives one stream of REQUEST_CHUNK bodies and
emits one terminal RESPONSE.
Wires two emit callbacks:
- A sync RpcResponseEmitter for the terminal RESPONSE (single emit per call, no ordering concern).
- An RpcRequestGrantEmitter for upload-direction credit grants, which publishes DISPATCH_RPC_REQUEST_GRANT events on the caller’s reply channel.
Bidi streaming plan (Phase C).
pub async fn call_client_stream( self: &Arc<Self>, target_node_id: u64, service: &str, opts: CallOptions, ) -> Result<ClientStreamCallRaw, RpcError>
Client-streaming variant of Self::call. Returns a
ClientStreamCallRaw handle the caller pushes N items
into via send, then finish to await the terminal
RESPONSE.
Lazy initial REQUEST. This method does NOT publish a
REQUEST to the wire. It only ensures the caller’s reply
subscription is set up and registers the pending entry; the
initial REQUEST is emitted by the first send (or by
finish for the zero-item degenerate path).
Sets FLAG_RPC_CLIENT_STREAMING_REQUEST on the initial
REQUEST so the server’s request-streaming fold knows to
open a request-side stream. Optional request_window_initial
header opts into upload-direction flow control.
Bidi streaming plan (Phase C).
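The lazy-publish behavior can be modeled as a tiny state machine that records what would reach the wire. This is an illustrative sketch only: LazyClientStream and the Wire enum are hypothetical, and the assumption that the initial REQUEST is a separate frame from the first chunk is made purely for the model, not taken from the wire format.

```rust
// Hypothetical record of frames that would be published, in order.
#[derive(Debug, PartialEq, Clone)]
enum Wire {
    Request,
    Chunk(Vec<u8>),
    Finish,
}

// Hypothetical model of a client-stream call handle.
struct LazyClientStream {
    request_sent: bool,
    wire: Vec<Wire>,
}

impl LazyClientStream {
    // Opening the call publishes nothing.
    fn open() -> Self {
        Self { request_sent: false, wire: Vec::new() }
    }

    // Emit the initial REQUEST exactly once, on first use.
    fn ensure_request(&mut self) {
        if !self.request_sent {
            self.wire.push(Wire::Request);
            self.request_sent = true;
        }
    }

    fn send(&mut self, item: &[u8]) {
        self.ensure_request();
        self.wire.push(Wire::Chunk(item.to_vec()));
    }

    // Zero-item path: `finish` alone still emits the REQUEST first.
    fn finish(mut self) -> Vec<Wire> {
        self.ensure_request();
        self.wire.push(Wire::Finish);
        self.wire
    }
}
```

One consequence of this shape is that a handle that is opened and then dropped without send or finish never touches the wire in the model.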
pub fn serve_rpc_duplex<H: RpcDuplexHandler>( self: &Arc<Self>, service: &str, handler: Arc<H>, ) -> Result<ServeHandle, ServeError>
Register a duplex nRPC handler for service. Composes
Self::serve_rpc_client_stream (request-side stream)
with Self::serve_rpc_streaming (response-side multi-
fire emit) via RpcDuplexFold.
Wires THREE emit callbacks:
- Async RpcAsyncResponseEmitter for response chunks + the terminal frame (per-call ordering required because the response side is multi-fire).
- RpcRequestGrantEmitter for upload-direction credit grants (one per consumed request chunk when flow control is opted into).
Bidi streaming plan (Phase D).
pub async fn call_duplex( self: &Arc<Self>, target_node_id: u64, service: &str, opts: CallOptions, ) -> Result<DuplexCallRaw, RpcError>
Duplex variant of Self::call. Returns a
DuplexCallRaw handle with both upload (send,
finish_sending) and download (next, or impl
futures::Stream) surfaces. Use into_split to peel off
the two halves for the “encoder task + decoder task”
shape.
Initial REQUEST sets BOTH FLAG_RPC_CLIENT_STREAMING_REQUEST
AND FLAG_RPC_STREAMING_RESPONSE. Lazy publish — the
initial REQUEST flies on the first send (or on
finish_sending for the zero-item degenerate path).
Bidi streaming plan (Phase D).
pub async fn call_streaming( self: &Arc<Self>, target_node_id: u64, service: &str, payload: Bytes, opts: CallOptions, ) -> Result<RpcStream, RpcError>
Streaming variant of Self::call. Returns an
RpcStream that yields chunks (as Result<Bytes, RpcError>)
until the server closes the stream.
Sets FLAG_RPC_STREAMING_RESPONSE on the request so the
server’s streaming fold knows to expect multi-fire emits.
Same lazy reply-subscription + direct-unicast REQUEST
as the unary call path.
Cancellation: dropping the returned RpcStream emits a
CANCEL to the server (best-effort) and discards any
in-flight chunks.
pub fn find_service_nodes(&self, service: &str) -> Vec<u64>
Find every node currently advertising service via the
nrpc:<service> capability tag. Returns node IDs in
roster order; the caller picks one (or use Self::call_service
for the round-robin shortcut).
Pre-Phase 2: requires the target nodes to have called
serve_rpc AND announce_capabilities so the
nrpc:<service> tag has propagated through capability
announcements. The local node’s own services are NOT
automatically included (callers don’t typically invoke
themselves via the network — for in-process invocation,
the user has the handler directly).
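Tag-based lookup followed by a caller-side pick can be sketched as follows. The roster representation (a slice of node id and tag list pairs) and the pick_round_robin helper are assumptions made for illustration; only the nrpc:<service> tag shape comes from the text above.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Capability tag advertised by a node serving `service`.
fn service_tag(service: &str) -> String {
    format!("nrpc:{service}")
}

// Hypothetical roster: (node id, advertised tags), in roster order.
fn find_service_nodes(roster: &[(u64, Vec<String>)], service: &str) -> Vec<u64> {
    let tag = service_tag(service);
    roster
        .iter()
        .filter(|(_, tags)| tags.iter().any(|t| t == &tag))
        .map(|(id, _)| *id)
        .collect()
}

// One simple way for a caller to pick a target: rotate through candidates.
fn pick_round_robin(candidates: &[u64], cursor: &AtomicUsize) -> Option<u64> {
    if candidates.is_empty() {
        return None;
    }
    let i = cursor.fetch_add(1, Ordering::Relaxed) % candidates.len();
    Some(candidates[i])
}
```

Because the result preserves roster order, a shared cursor spreads successive calls across the advertising nodes as long as the roster is stable between calls.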
pub async fn call_service( self: &Arc<Self>, service: &str, payload: Bytes, opts: CallOptions, ) -> Result<RpcReply, RpcError>
Issue an RPC call to service, picking one node from
those advertising the nrpc:<service> tag in the local
capability index according to opts.routing_policy.
Returns RpcError::NoRoute if no nodes advertise the
service (or if opts.filter_unhealthy is set and every
candidate is unavailable per the local ProximityGraph).
pub async fn call_service_streaming( self: &Arc<Self>, service: &str, payload: Bytes, opts: CallOptions, ) -> Result<RpcStream, RpcError>
Capability-routed server-streaming call. Same routing as
call_service — capability-fold lookup, health filter,
routing-policy sort, capability-auth gate, target selection —
but the terminal step is call_streaming instead of
call. Returns the substrate’s RpcStream so callers can
drive an async for chunk in stream: loop.
Use cases: an agent invoking a long-running tool that emits
progress + a terminal result, a fan-out subscriber that wants
streaming chunks from whatever node currently advertises the
service, any consumer that today reaches for
find_service_nodes → manual target selection → call_streaming
and ends up re-implementing the cap-auth gate call_service
already enforces.
Honors CallOptions::cancel_token (v3) and
CallOptions::deadline exactly like call_streaming.
pub async fn call( self: &Arc<Self>, target_node_id: u64, service: &str, payload: Bytes, opts: CallOptions, ) -> Result<RpcReply, RpcError>
Issue an RPC call to target_node_id for service.
Phase 1 — direct entity-to-entity addressing. The caller specifies which target to send to; service discovery (the “find me a healthy instance of X” lookup) is Phase 2.
Lazily subscribes the local node’s RpcClientFold to
<service>.replies.<self_origin> from target_node_id on
the first call to that (target, service) pair. The
subscription is reused across subsequent calls.
On opts.deadline expiring OR the future being dropped,
emits a CANCEL event so the server can drop the in-flight
handler.
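Cancel-on-drop is typically built from a guard whose Drop impl fires unless the call completed. The sketch below shows that pattern in isolation: CallGuard, Event, and the call_id field are hypothetical, and a std mpsc channel stands in for the publish path.

```rust
use std::sync::mpsc::Sender;

// Hypothetical wire event; the real CANCEL encoding is not shown here.
#[derive(Debug, PartialEq)]
enum Event {
    Cancel { call_id: u64 },
}

// Hypothetical guard held for the lifetime of an in-flight call.
struct CallGuard {
    call_id: u64,
    completed: bool,
    wire: Sender<Event>,
}

impl CallGuard {
    // Called once the reply has arrived; suppresses the CANCEL.
    fn complete(mut self) {
        self.completed = true;
    }
}

impl Drop for CallGuard {
    fn drop(&mut self) {
        if !self.completed {
            // Best-effort: ignore the error if the receiver is gone.
            let _ = self.wire.send(Event::Cancel { call_id: self.call_id });
        }
    }
}
```

A deadline expiry and a dropped future both end up dropping the guard without calling complete, so one code path can cover both cases.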
Trait Implementations§
impl Adapter for MeshNode
fn init<'life0, 'async_trait>(
&'life0 mut self,
) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn on_batch<'life0, 'async_trait>(
&'life0 self,
batch: Arc<Batch>,
) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn flush<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn shutdown<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn poll_shard<'life0, 'life1, 'async_trait>(
&'life0 self,
shard_id: u16,
from_id: Option<&'life1 str>,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ShardPollResult, AdapterError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
impl BlobHeatSink for MeshNode
fn announce_blob_heat<'life0, 'async_trait>(
&'life0 self,
hash: [u8; 32],
rate: f64,
) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
heat:blob:<hex>=<rate> reserved
tag for chunk hash. Idempotent: most recent call wins.
fn withdraw_blob_heat<'life0, 'async_trait>(
&'life0 self,
hash: [u8; 32],
) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
heat:blob:<hex>=* tag for chunk hash.
Idempotent; mirrors HeatSink::withdraw_heat.
fn announce_blob_heat_batch<'life0, 'life1, 'async_trait>(
&'life0 self,
updates: &'life1 [([u8; 32], Option<f64>)],
) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
impl ChainTagSink for MeshNode
Substrate impl: route through MeshNode::announce_chain /
MeshNode::withdraw_chain. This is the production sink the
ReplicationCoordinator uses when a real MeshNode is
wired in.
fn announce_chain<'life0, 'async_trait>(
&'life0 self,
origin_hash: u64,
tip_seq: u64,
) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
origin_hash up to tip_seq.
Idempotent: repeated calls with the same origin_hash
replace the prior advertisement.
impl HeatSink for MeshNode
fn announce_heat<'life0, 'async_trait>(
&'life0 self,
origin_hash: u64,
rate: f64,
) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
heat:<hex>=<rate> reserved tag for
origin_hash. Idempotent: the most recent call wins.
Rate clamping (substrate enforces [0.0, 1.0]) happens
inside the impl.
fn withdraw_heat<'life0, 'async_trait>(
&'life0 self,
origin_hash: u64,
) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
heat:<hex>=* tag for origin_hash.
Idempotent. Peers drop the heat annotation; the chain’s
causal: advertisements stay.
fn announce_heat_batch<'life0, 'life1, 'async_trait>(
&'life0 self,
updates: &'life1 [(u64, Option<f64>)],
) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
(origin_hash, Option<rate>) updates in a
single round-trip. Some(rate) is emit/replace; None is
withdraw. The gravity tick uses this to coalesce per-chain
emissions into one capability rebroadcast; without it the
per-tick wire cost was O(n_chains × n_tags).
impl ReplicationDispatcher for MeshNode
fn send_heartbeat<'life0, 'async_trait>(
&'life0 self,
target: NodeId,
msg: SyncHeartbeat,
) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn send_sync_request<'life0, 'async_trait>(
&'life0 self,
target: NodeId,
msg: SyncRequest,
) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn send_sync_response<'life0, 'async_trait>(
&'life0 self,
target: NodeId,
msg: SyncResponse,
) -> Pin<Box<dyn Future<Output = Result<(), AdapterError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
SyncResponse to target (typically a replica
catching up). See trait-level note on Ok(()) semantics.