Skip to main content

Mesh

Struct Mesh 

Source
pub struct Mesh { /* private fields */ }
Expand description

A multi-peer mesh node.

Manages encrypted connections to multiple peers over a single UDP socket. Supports direct peer-to-peer sends, routed multi-hop sends, automatic failure detection, and rerouting.

Implementations§

Source§

impl Mesh

Source

pub fn builder(bind_addr: &str, psk: &[u8; 32]) -> Result<MeshBuilder>

Create a builder.

Source

pub fn public_key(&self) -> &[u8; 32]

Get this node’s Noise public key.

Share this with peers so they can connect to this node.

Source

pub fn node_id(&self) -> u64

Get this node’s ID (derived from ed25519 identity).

Source

pub fn local_addr(&self) -> SocketAddr

Get the local bind address.

Source

pub fn set_rpc_observer(&self, observer: Option<RpcObserverHandle>)

Install (or clear with None) the caller-side nRPC observer for this Mesh. Fires on every call_typed completion (success / server error / timeout / transport error) with a typed crate::mesh_rpc::RpcCallEvent. See DECK_DEMO_HARNESS_PLAN.md Missing Item D for the design rationale.

Replaces any previously-installed observer. Observers run inline on the dispatch task; implementations must be cheap (push into a bounded ring / mpsc, not block).

v1 fires only RpcDirection::Outbound; server-side (inbound) firing is a follow-up.

Source

pub async fn connect( &self, peer_addr: &str, peer_pubkey: &[u8; 32], peer_node_id: u64, ) -> Result<()>

Connect to a peer as initiator.

The peer must be listening (call accept() on their side). peer_pubkey is the peer’s Noise public key from public_key().

Source

pub async fn accept(&self, peer_node_id: u64) -> Result<SocketAddr>

Accept an incoming connection as responder.

Waits for a peer to initiate a Noise handshake. Returns the peer’s address.

Source

pub async fn connect_via( &self, relay_addr: &str, peer_pubkey: &[u8; 32], peer_node_id: u64, ) -> Result<()>

Connect to a peer when the responder is already start()ed and hasn’t pre-accept()’d this initiator’s node_id — the standard “remote-attach against a running daemon” case. Mirror of Self::connect for that scenario; the local side must also have start() called before this method (the dispatch loop is what completes the handshake).

relay_addr is the wire address to send msg1 to. The degenerate single-hop case (relay == final destination) is the CLI remote-attach pattern; the multi-hop case (relay forwards to dest) is the same call signature. Either way the destination’s running dispatch loop receives msg1 via the routed-handshake protocol and replies with msg2.

§Why a separate method from connect?

connect uses the direct-handshake protocol, where the responder must pre-register the initiator’s node_id via accept() before its start(). connect_via uses the routed-handshake protocol — the initiator’s full node_id rides inside the Noise msg1 payload, so the responder learns it on demand. No pre-accept needed.

Source

pub fn start(&self)

Start the receive loop, heartbeat sender, and router.

Call this after connecting to peers. Events won’t be received until start() is called.

Source

pub fn peer_count(&self) -> usize

Number of connected peers.

Source

pub async fn send_to( &self, peer_addr: &str, event: &impl Serialize, ) -> Result<()>

Send a serializable event to a direct peer.

Source

pub async fn send( &self, dest_node_id: u64, event: &impl Serialize, ) -> Result<()>

Send a serializable event via the routing table.

The event is encrypted for the destination and forwarded through intermediate nodes if needed. Requires a route to dest_node_id in the routing table and a session with the destination.

Source

pub async fn send_raw_to(&self, peer_addr: &str, data: &[u8]) -> Result<()>

Send raw bytes to a direct peer.

Source

pub async fn recv(&self, limit: usize) -> Result<Vec<StoredEvent>>

Poll for received events.

Returns up to limit events from all shards.

Source

pub async fn recv_shard( &self, shard_id: u16, limit: usize, ) -> Result<Vec<StoredEvent>>

Poll a specific shard for events.

Source

pub fn register_channel(&self, config: ChannelConfig)

Register a channel on this publisher. Subscribers who ask to join are validated against config before being added to the subscriber roster.

config.channel_id must be built from the same canonical name subscribers pass to subscribe_channel. The registry keys on the canonical name (not the u16 hash) to avoid ACL bypass via hash collision.

Idempotent: re-registering the same channel replaces the prior config.

Source

pub async fn subscribe_channel( &self, publisher_node_id: u64, channel: &ChannelName, ) -> Result<()>

Ask publisher_node_id to add this node to channel’s subscriber set. Blocks until the publisher’s Ack arrives or the mesh’s membership-ack timeout elapses.

Returns Ok(()) on acceptance; rejection (unauthorized / unknown channel / rate-limited / too-many-channels) surfaces as SdkError::ChannelRejected(reason). Network-level failures surface as SdkError::Adapter(...).

This bare form presents no credential. On a token-gated channel it is always rejected — the publisher requires a token chain on every subscribe and does not honor a credential presented on a previous subscribe (e.g. before a reconnect or roster eviction). Re-subscribe with Self::subscribe_channel_with carrying the token each time.

Source

pub async fn subscribe_channel_with( &self, publisher_node_id: u64, channel: &ChannelName, opts: SubscribeOptions, ) -> Result<()>

Subscribe with options — optionally presenting a PermissionToken.

Use this when the publisher registered the channel with token_roots (token enforcement) and/or a subscribe_caps filter that your node’s capabilities alone don’t satisfy. The publisher verifies the presented token chain on arrival — it must root at one of the channel’s token_roots, bind at its leaf to the subscribing peer’s EntityId, and authorize SUBSCRIBE at every link — then retains it to re-check expiry and revocation while the subscription lives.

The credential must be presented on every subscribe: a previously-accepted chain is not reused for a later bare Self::subscribe_channel, so after a reconnect or roster eviction you must call this again with the token.

Source

pub async fn unsubscribe_channel( &self, publisher_node_id: u64, channel: &ChannelName, ) -> Result<()>

Mirror of Self::subscribe_channel. Idempotent on the publisher side — unsubscribing a non-subscriber still returns Ok(()).

Source

pub async fn publish( &self, channel: &ChannelName, payload: Bytes, config: PublishConfig, ) -> Result<PublishReport>

Publish one payload to every subscriber of channel. config.on_failure controls whether per-peer errors short-circuit the fan-out. Returns a PublishReport describing per-peer outcomes.

Source

pub async fn publish_many( &self, channel: &ChannelName, payloads: &[Bytes], config: PublishConfig, ) -> Result<PublishReport>

Fan multiple payloads to every subscriber of channel as one batch per subscriber. Semantics match Self::publish.

Source

pub fn add_route(&self, dest_node_id: u64, next_hop_addr: &str) -> Result<()>

Add a route to a destination node.

Packets sent to dest_node_id via send() will be forwarded through next_hop_addr.

Source

pub fn remove_route(&self, dest_node_id: u64)

Remove a route.

Source

pub fn block_peer(&self, peer_addr: &str) -> Result<()>

Block a peer (simulate network partition).

Source

pub fn unblock_peer(&self, peer_addr: &str) -> Result<()>

Unblock a peer.

Source

pub fn discovered_nodes(&self) -> usize

Number of nodes discovered via pingwave propagation.

Source

pub fn active_reroutes(&self) -> usize

Number of active reroutes (routes using alternates after failure).

Source

pub fn open_stream( &self, peer_node_id: u64, stream_id: u64, config: StreamConfig, ) -> Result<Stream>

Open (or look up) a logical stream to a peer. See net::adapter::net::MeshNode::open_stream for the full contract. Repeated calls for the same (peer, stream_id) are idempotent; the first open wins and subsequent configs are logged and ignored.

Source

pub fn close_stream(&self, peer_node_id: u64, stream_id: u64)

Close a stream: drop its StreamState and free the window. Idempotent.

Source

pub async fn send_on_stream( &self, stream: &Stream, events: &[Bytes], ) -> Result<()>

Send a batch of events on an explicit stream.

Returns SdkError::Backpressure when the stream’s per-stream in-flight window is full (no events were sent — the caller decides whether to drop, retry, or buffer). SdkError::NotConnected when the peer session is gone. All other failures surface as SdkError::Adapter.

Source

pub async fn send_with_retry( &self, stream: &Stream, events: &[Bytes], max_retries: usize, ) -> Result<()>

Send events, retrying on Backpressure with exponential backoff (5 ms → 200 ms, doubling) up to max_retries times. Transport errors and NotConnected are returned immediately.

Source

pub async fn send_blocking( &self, stream: &Stream, events: &[Bytes], ) -> Result<()>

Block the calling task until the send succeeds or a transport error occurs. See Mesh::send_with_retry for finer control.

Source

pub fn stream_stats( &self, peer_node_id: u64, stream_id: u64, ) -> Option<StreamStats>

Snapshot of per-stream stats (tx/rx seq, window, in-flight, backpressure count, activity).

Source

pub fn all_stream_stats(&self, peer_node_id: u64) -> Vec<(u64, StreamStats)>

Snapshot stats for every stream in the session to peer_node_id.

Source

pub async fn announce_capabilities(&self, caps: CapabilitySet) -> Result<()>

Announce this node’s capabilities to every directly-connected peer. Self-indexes too, so find_nodes called from this same node matches on the announcement. Multi-hop propagation is deferred — peers more than one hop away will not see the announcement.

Default TTL is 5 minutes; use Self::announce_capabilities_with to override.

Source

pub async fn announce_capabilities_with( &self, caps: CapabilitySet, ttl: Duration, sign: bool, ) -> Result<()>

Extended announce with explicit TTL and signing opt-in. sign = true is accepted but currently a no-op; signatures tie in with Stage E (channel auth), once node_idEntityId binding is wired.

Source

pub fn find_nodes(&self, filter: &CapabilityFilter) -> Vec<u64>

Query the capability index. Returns node ids whose latest announcement matches filter; includes our own node_id if our own announcement matches.

Source

pub fn find_nodes_scoped( &self, filter: &CapabilityFilter, scope: &ScopeFilter<'_>, ) -> Vec<u64>

Scoped variant of Self::find_nodes. Filters candidates through a crate::capabilities::ScopeFilter derived from each node’s scope:* reserved tags. Untagged nodes resolve to Global and remain visible under most filters by design; nodes tagged scope:subnet-local only show up under crate::capabilities::ScopeFilter::SameSubnet. See docs/SCOPED_CAPABILITIES_PLAN.md for the full table.

Source

pub fn find_best_node(&self, req: &CapabilityRequirement) -> Option<u64>

Pick the single best-scoring node for a placement requirement. Returns the winning node’s id, or None if no node matches.

Source

pub fn find_best_node_scoped( &self, req: &CapabilityRequirement, scope: &ScopeFilter<'_>, ) -> Option<u64>

Scoped variant of Self::find_best_node. Picks the highest- scoring node within the scope-filtered candidate set.

Source

pub fn capability_aggregate( &self, matcher: Option<TagMatcher>, group_by: GroupBy, agg: Aggregation, ) -> Vec<(String, u64)>

Bucketed aggregation over the local capability fold. Composes TagMatcher × GroupBy × Aggregation into a Vec<(bucket, value)> sorted lex by bucket key. Phase 6c-A of MULTIFOLD_PHASE_6C_CAPACITY_AGGREGATION.md.

matcher = None walks every entry. Returns an empty vec when no entries match.

Source

pub fn capability_capacity_ranking<R>( &self, query: CapacityQuery, rtt_lookup: R, ) -> Vec<CapacityRow>
where R: Fn(u64) -> Option<u32>,

Capacity-ranked materialized view. Wraps Self::capability_aggregate with per-bucket state breakdown (idle / busy / reserved), an RTT gate, and optional summed numeric capacity. Phase 6c-B of MULTIFOLD_PHASE_6C_CAPACITY_AGGREGATION.md.

rtt_lookup maps a publisher’s node_id to current RTT in milliseconds. When query.max_rtt_ms is None, the closure is never invoked; when set, publishers whose lookup returns None are dropped (fail-closed — never-pinged nodes don’t ride a “fastest available” filter as zero).

Faulty entries are always excluded. Rows return sorted by available descending; ties broken by bucket key ascending. Truncated to query.limit (0 = no truncation).

§Example
use net_sdk::capabilities::{
    CapabilitySet, CapacityQuery, GroupBy, TagMatcher,
};
use net_sdk::mesh::MeshBuilder;

let node = MeshBuilder::new("127.0.0.1:0", &[0x42u8; 32])?
    .build()
    .await?;
node.announce_capabilities(
    CapabilitySet::new()
        .add_tag("hardware.gpu")
        .add_tag("hardware.gpu.h100")
        .add_tag("hardware.gpu.count=8"),
)
.await?;

// Top GPU types by available capacity, no RTT filter,
// summed count column populated.
let view = node.capability_capacity_ranking(
    CapacityQuery {
        matcher: Some(TagMatcher::Prefix { value: "hardware.gpu".into() }),
        group_by: GroupBy::TagStem { prefix: "hardware.gpu".into() },
        max_rtt_ms: None,
        sum_axis_key: Some("hardware.gpu.count".into()),
        limit: 5,
    },
    |_node_id| None,
);
// Self-match: one bucket per stem this node carries.
assert!(view.iter().any(|row| row.bucket == "h100"));
Source

pub fn set_migration_handler( &mut self, handler: Arc<MigrationSubprotocolHandler>, )

Set a migration handler (for Mikoshi daemon migration).

Source

pub async fn shutdown(self) -> Result<()>

Gracefully shut down.

Source

pub fn inner(&self) -> &MeshNode

Get a reference to the underlying MeshNode.

Source

pub fn node_arc(&self) -> Arc<MeshNode>

Clone the Arc-shared MeshNode handle out of the mesh.

Used by FFI bindings (currently: NAPI) that need to hand the same live node to the net-sdk::compute::DaemonRuntime and to their own wrapper class without constructing a second UDP socket. All public MeshNode operations go through &MeshNode, so two Arc holders observe exactly the same state.

Source

pub fn from_node_arc( node: Arc<MeshNode>, channel_configs: Arc<ChannelConfigRegistry>, identity: Option<Identity>, ) -> Self

Construct a Mesh that shares an existing MeshNode with another owner. Used by FFI bindings that already hold an Arc<MeshNode> (e.g. NAPI’s NetMesh) and need a Mesh wrapper so the SDK’s DaemonRuntime can be built against the same live node.

Does not re-install channel_configs or a TokenCache — the owner of the original MeshNode is responsible for that wiring. Supplied channel_configs / identity arguments are held onto here purely so the Mesh’s own helpers (channel registration lookup, identity getter) have data to return.

Source

pub fn identity(&self) -> Option<&Identity>

Caller-owned identity bound to this mesh, if any. Returns None for meshes built without .identity(...) (ephemeral keypair).

Source

pub fn nat_type(&self) -> NatClass

Current NAT classification for this mesh’s public face, as observed against other peers during the classification sweep. One of Open, Cone, Symmetric, or Unknown (pre-sweep or insufficient data).

Optimization, not correctness. A Symmetric classification doesn’t prevent this mesh from communicating with any peer — it just means the direct- punch optimization is unlikely to succeed against some peers, and traffic will keep riding the routed path.

Requires the nat-traversal cargo feature.

Source

pub fn reflex_addr(&self) -> Option<SocketAddr>

This mesh’s public-facing SocketAddr as observed by a remote peer, or None before the first classification sweep has produced an observation.

Piggybacks on outbound CapabilityAnnouncements so peers can attempt a direct-connect without a separate discovery round-trip. Read by peers implementing the connect_direct rendezvous path.

Requires the nat-traversal cargo feature.

Source

pub fn peer_nat_type(&self, peer_node_id: u64) -> NatClass

The NAT classification most recently advertised by peer_node_id (parsed from the nat:* tag on their capability announcement). Returns NatClass::Unknown when the peer hasn’t announced or was compiled without NAT traversal — the pair-type matrix treats Unknown as “attempt direct, fall back on failure,” not as “don’t attempt.”

Requires the nat-traversal cargo feature.

Source

pub async fn probe_reflex(&self, peer_node_id: u64) -> Result<SocketAddr>

Send one reflex probe to peer_node_id and return the public SocketAddr the peer observed on the probe’s UDP envelope. Useful for tests and for operators diagnosing a NAT-type classification that seems off.

Times out after TraversalConfig::reflex_timeout (3 s default) on network delays, and fast-fails with peer-not-reachable on an unknown peer.

Requires the nat-traversal cargo feature.

Source

pub async fn reclassify_nat(&self)

Explicitly re-run the NAT classification sweep against this node’s currently-connected peers. Normally the background loop (spawned by start()) takes care of this; call this after a suspected NAT rebind (e.g. a gateway reboot) to accelerate the re-classification.

No-op when fewer than 2 peers are connected — the two-probe rule needs two distinct targets to produce a classification. Never returns an error; a failed sweep leaves the previous classification intact.

Requires the nat-traversal cargo feature.

Source

pub async fn connect_direct( &self, peer_node_id: u64, peer_pubkey: &[u8; 32], coordinator: u64, ) -> Result<()>

Establish a session to peer_node_id via the rendezvous path, using the pair-type matrix to decide between a direct handshake and a relay-coordinated punch. The returned session is equivalent in correctness to connect() — the only difference is that a connect_direct that lands on the punched path cuts relay hops out of the data plane.

Optimization, not correctness. connect_direct always resolves: on a punch-failed outcome, the session is established via the routed-handshake fallback. Inspect traversal_stats() afterward to distinguish a successful punch from a relay fallback.

coordinator names a peer we already have a session with — typically a stable relay-capable node. The coordinator mediates the introduction; it doesn’t carry user-plane traffic once the punch succeeds.

Fails with an SdkError::Traversal variant whose kind is peer-not-reachable (no cached reflex for peer), transport (socket-level error on the final handshake), or (internal, retried on fallback) punch-failed.

Requires the nat-traversal cargo feature.

Source

pub fn traversal_stats(&self) -> TraversalStatsSnapshot

Cumulative counters for this mesh’s NAT-traversal activity: punch attempts, successful punches, and relay fallbacks. Monotonic — counters never reset. Useful for diagnostics + telemetry (success rate, relay load trends).

Requires the nat-traversal cargo feature.

Source

pub fn set_reflex_override(&self, external: SocketAddr)

Install a runtime reflex override. Forces nat_type() = "open" and reflex_addr() = Some(external) immediately, short-circuiting any further classifier sweeps.

Intended for operator-driven updates — a port-forward that went live mid-session, or a stage-4 port-mapping task that just installed a UPnP / NAT-PMP mapping. Builder-level MeshBuilder::reflex_override covers the startup-time case; this is the runtime equivalent.

Optimization, not correctness. Nodes without an override still reach every peer via the routed-handshake path. The override pins the publicly-advertised address when it’s already known.

Requires the nat-traversal cargo feature.

Source

pub fn clear_reflex_override(&self)

Drop a previously-installed reflex override. The classifier resumes on its normal cadence; the next sweep repopulates reflex_addr and nat_type from real probe observations. reflex_addr clears to None immediately so a between-sweep read doesn’t return a stale override.

No-op when no override is active — safe to call unconditionally during shutdown or a port-mapper revoke path.

Requires the nat-traversal cargo feature.

Source§

impl Mesh

Source

pub fn serve_rpc<H: RpcHandler>( &self, service: &str, handler: Arc<H>, ) -> Result<ServeHandle, ServeError>

Register a raw-bytes RPC handler on service. The user handler receives the request body as Bytes and returns the response body as Bytes. Wire codec is the user’s concern.

Auto-registers two ChannelConfig entries so the per-caller subscribe + per-call publish work under the SDK’s default ChannelConfigRegistry (which fail-closes on unknown channels):

  1. Exact-match <service>.requests — the channel callers publish REQUESTs onto.
  2. Prefix-match <service>.replies. — admits every <service>.replies.<caller_origin> subscribe that arrives, no per-caller pre-registration needed.

Both entries default to permissive (no publish_caps, no require_token) — channel-level ACLs on RPC traffic are a Phase 3 concern (alongside the per-service token allowlist). Operators who need RPC ACLs today can call register_channel / register_channel_prefix themselves before serve_rpc to override.

For typed handlers (auto serde), use Self::serve_rpc_typed.

Source

pub async fn call( &self, target_node_id: u64, service: &str, payload: Bytes, opts: CallOptions, ) -> Result<RpcReply, RpcError>

Direct-addressed call. Caller specifies target_node_id; the SDK does NOT consult the capability index.

Source

pub async fn call_service( &self, service: &str, payload: Bytes, opts: CallOptions, ) -> Result<RpcReply, RpcError>

Service-name call. Consults the capability index for nodes advertising nrpc:<service>, picks one per opts.routing_policy, calls.

Source

pub fn find_service_nodes(&self, service: &str) -> Vec<u64>

All node ids currently advertising nrpc:<service> in the local capability index. Useful for diagnostics + custom caller-side routing logic.

Source

pub fn rpc_metrics_snapshot(&self) -> RpcMetricsSnapshot

Snapshot of caller-side nRPC metrics for this Mesh. Cheap (one DashMap iteration); call on every Prometheus scrape. Use RpcMetricsSnapshot::prometheus_text to format as text/plain; version=0.0.4 for a /metrics endpoint.

Source

pub fn serve_rpc_typed<Req, Resp, F, Fut>( &self, service: &str, codec: Codec, handler: F, ) -> Result<ServeHandle, ServeError>
where Req: DeserializeOwned + Send + Sync + 'static, Resp: Serialize + Send + Sync + 'static, F: Fn(Req) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<Resp, String>> + Send + 'static,

Register a typed RPC handler on service. The handler receives a deserialized Req and returns either an Ok(Resp) (encoded as the response body) or an Err(message) (surfaced as RpcStatus::Internal with the message as the body).

Codec is the Codec passed to the handler factory; the same codec must be used by the caller.

Source

pub async fn call_typed<Req, Resp>( &self, target_node_id: u64, service: &str, request: &Req, opts: CallOptionsTyped, ) -> Result<Resp, RpcError>
where Req: Serialize, Resp: DeserializeOwned,

Direct-addressed typed call. Encodes request via opts.codec, calls the underlying raw call, decodes the reply body into Resp.

Source

pub async fn call_service_typed<Req, Resp>( &self, service: &str, request: &Req, opts: CallOptionsTyped, ) -> Result<Resp, RpcError>
where Req: Serialize, Resp: DeserializeOwned,

Service-name typed call. Same as Self::call_typed but uses the capability index to pick the target.

Source

pub fn serve_rpc_streaming<H: RpcStreamingHandler>( &self, service: &str, handler: Arc<H>, ) -> Result<ServeHandle, ServeError>

Register a raw-bytes streaming RPC handler on service. The handler receives the request body plus an RpcResponseSink it writes raw chunks to via sink.send(body). Wire codec is the user’s concern.

Same auto-registration as Self::serve_rpc (request channel plus reply prefix). For typed handlers (auto serde), use Self::serve_rpc_streaming_typed instead.

Source

pub async fn call_streaming( &self, target_node_id: u64, service: &str, payload: Bytes, opts: CallOptions, ) -> Result<RpcStream, RpcError>

Direct-addressed streaming call. Returns an RpcStream that yields raw chunks as Result<Bytes, RpcError>. Dropping the stream emits CANCEL to the server.

Source

pub async fn call_service_streaming( &self, service: &str, payload: Bytes, opts: CallOptions, ) -> Result<RpcStream, RpcError>

Service-name streaming call. Consults the capability index for nodes advertising nrpc:<service>, picks one per opts.routing_policy, opens a streaming call. Mirror of Self::call_service for the streaming response shape.

Source

pub fn serve_rpc_streaming_typed<Req, Resp, F, Fut>( &self, service: &str, codec: Codec, handler: F, ) -> Result<ServeHandle, ServeError>
where Req: DeserializeOwned + Send + Sync + 'static, Resp: Serialize + Send + Sync + 'static, F: Fn(Req, ResponseSinkTyped<Resp>) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<(), String>> + Send + 'static,

Register a typed streaming RPC handler. The handler receives a deserialized Req plus a ResponseSinkTyped<Resp> that auto-encodes each send(&value) per the codec. Returning Ok(()) closes the stream cleanly; Err(message) closes it with RpcStatus::Application(NRPC_TYPED_HANDLER_ERROR) and the message in the terminal frame’s body.

Source

pub async fn call_streaming_typed<Req, Resp>( &self, target_node_id: u64, service: &str, request: &Req, opts: CallOptionsTyped, ) -> Result<RpcStreamTyped<Resp>, RpcError>
where Req: Serialize, Resp: DeserializeOwned,

Direct-addressed typed streaming call. Encodes request via opts.codec, opens the streaming call, returns an RpcStreamTyped<Resp> that decodes each chunk on the fly. Decode failures terminate the stream with a single RpcError::ServerError(Internal) carrying the decode diagnostic.

Source

pub async fn call_service_streaming_typed<Req, Resp>( &self, service: &str, request: &Req, opts: CallOptionsTyped, ) -> Result<RpcStreamTyped<Resp>, RpcError>
where Req: Serialize, Resp: DeserializeOwned,

Service-name typed streaming call. Mirror of Self::call_streaming_typed but routes via the capability index. Riding Self::call_service_streaming underneath.

Source

pub fn serve_rpc_client_stream<H: RpcClientStreamingHandler>( &self, service: &str, handler: Arc<H>, ) -> Result<ServeHandle, ServeError>

Register a raw-bytes client-streaming RPC handler on service. The handler receives the request stream (raw chunk bodies) and returns one terminal response payload. Wire codec is the user’s concern; for typed handlers use Self::serve_rpc_client_stream_typed.

Source

pub async fn call_client_stream( &self, target_node_id: u64, service: &str, opts: CallOptions, ) -> Result<ClientStreamCallRaw, RpcError>

Direct-addressed raw client-streaming call. Returns a ClientStreamCallRaw handle. Push chunks via send, then finish to await the terminal RESPONSE.

Source

pub fn serve_rpc_client_stream_typed<Req, Resp, F, Fut>( &self, service: &str, codec: Codec, handler: F, ) -> Result<ServeHandle, ServeError>
where Req: DeserializeOwned + Send + Sync + Unpin + 'static, Resp: Serialize + Send + Sync + 'static, F: Fn(RequestStreamTyped<Req>) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<Resp, String>> + Send + 'static,

Register a typed client-streaming handler. Mirror of Self::serve_rpc_typed for the multi-request shape. Receives a RequestStreamTyped<Req> (auto-decodes each inbound chunk via codec), returns one terminal Resp (auto-encoded). Err(String) surfaces as RpcError::ServerError(Application(NRPC_TYPED_HANDLER_ERROR)).

Source

pub async fn call_client_stream_typed<Req, Resp>( &self, target_node_id: u64, service: &str, opts: CallOptionsTyped, ) -> Result<ClientStreamCallTyped<Req, Resp>, RpcError>
where Req: Serialize, Resp: DeserializeOwned,

Direct-addressed typed client-streaming call. Returns a ClientStreamCallTyped<Req, Resp> handle.

Source

pub fn serve_rpc_duplex<H: RpcDuplexHandler>( &self, service: &str, handler: Arc<H>, ) -> Result<ServeHandle, ServeError>

Register a raw-bytes duplex RPC handler on service. The handler receives both a request stream AND a response sink for emitting multi-fire response chunks. Returns Ok(()) to close cleanly; Err(RpcHandlerError) for failure mapping. For typed handlers use Self::serve_rpc_duplex_typed.

Source

pub async fn call_duplex( &self, target_node_id: u64, service: &str, opts: CallOptions, ) -> Result<DuplexCallRaw, RpcError>

Direct-addressed raw duplex call. Returns a DuplexCallRaw handle with both send and receive surfaces. Use into_split to peel off the two halves.

Source

pub fn serve_rpc_duplex_typed<Req, Resp, F, Fut>( &self, service: &str, codec: Codec, handler: F, ) -> Result<ServeHandle, ServeError>
where Req: DeserializeOwned + Send + Sync + Unpin + 'static, Resp: Serialize + Send + Sync + 'static, F: Fn(RequestStreamTyped<Req>, ResponseSinkTyped<Resp>) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<(), String>> + Send + 'static,

Register a typed duplex handler. Receives a RequestStreamTyped<Req> (auto-decodes inbound chunks) and a ResponseSinkTyped<Resp> (auto-encodes outbound chunks). Returns Ok(()) for clean close.

Source

pub async fn call_duplex_typed<Req, Resp>( &self, target_node_id: u64, service: &str, opts: CallOptionsTyped, ) -> Result<DuplexCallTyped<Req, Resp>, RpcError>
where Req: Serialize, Resp: DeserializeOwned,

Direct-addressed typed duplex call. Returns a DuplexCallTyped<Req, Resp> handle.

Source§

impl Mesh

Source

pub async fn call_with_retry( &self, target_node_id: u64, service: &str, payload: Bytes, opts: CallOptions, policy: &RetryPolicy, ) -> Result<RpcReply, RpcError>

Direct-addressed raw call with retry. Re-issues on transient failures per policy; the last error from the final attempt is returned on exhaustion. The underlying CallOptions is re-used for every attempt — note that opts.deadline is an absolute Instant and does NOT advance across retries, so the total wall-clock window is bounded by the initial deadline plus the sum of backoffs.

Source

pub async fn call_service_with_retry( &self, service: &str, payload: Bytes, opts: CallOptions, policy: &RetryPolicy, ) -> Result<RpcReply, RpcError>

Service-name raw call with retry. Each attempt re-runs the capability-index lookup + routing-policy selection — useful when a server failover happens mid-retry-window: the next attempt naturally lands on a different node. To pin a single target across retries, use Self::call_with_retry.

Source

pub async fn call_typed_with_retry<Req, Resp>( &self, target_node_id: u64, service: &str, request: &Req, opts: CallOptionsTyped, policy: &RetryPolicy, ) -> Result<Resp, RpcError>
where Req: Serialize, Resp: DeserializeOwned,

Direct-addressed typed call with retry. Encodes once (the request bytes are reused across attempts), retries per policy, decodes the final reply.

Source

pub async fn call_service_typed_with_retry<Req, Resp>( &self, service: &str, request: &Req, opts: CallOptionsTyped, policy: &RetryPolicy, ) -> Result<Resp, RpcError>
where Req: Serialize, Resp: DeserializeOwned,

Service-name typed call with retry. Same caveat as Self::call_service_with_retry — each attempt re-resolves the candidate set, so failover is automatic.

Source§

impl Mesh

Source

pub async fn call_with_hedge_to( &self, targets: &[u64], service: &str, payload: Bytes, opts: CallOptions, policy: &HedgePolicy, ) -> Result<RpcReply, RpcError>

Hedge across an explicit set of target node ids. The first element of targets is the primary (fired immediately); subsequent elements are hedges fired at policy.delay * idx. Whichever call resolves first wins (Ok or Err). Losing in-flight calls are dropped on the caller side.

Returns RpcError::NoRoute if targets is empty. If every candidate fails, returns the LAST observed error (after all hedges have been awaited).

Source

pub async fn call_service_with_hedge( &self, service: &str, payload: Bytes, opts: CallOptions, policy: &HedgePolicy, ) -> Result<RpcReply, RpcError>

Hedge across 1 + policy.hedges candidates picked from the service registry. Candidates are sorted (so the picks are deterministic for a stable registry) and the prefix is taken. If fewer candidates exist than requested, hedges degrade to whatever’s available (no error if hedges=2 but only 1 candidate exists — you just get a straight call).

opts.routing_policy is ignored (hedge picks its own candidates from the service registry). opts.filter_unhealthy is also ignored: hedge’s whole premise is “be robust to per-node slowness” — filtering unhealthy candidates reduces the redundancy that hedge buys you. If you want health-aware single-target dispatch, use call_service directly with a routing policy.

Source

pub async fn call_typed_with_hedge_to<Req, Resp>( &self, targets: &[u64], service: &str, request: &Req, opts: CallOptionsTyped, policy: &HedgePolicy, ) -> Result<Resp, RpcError>
where Req: Serialize, Resp: DeserializeOwned,

Typed counterpart of Self::call_with_hedge_to. Encodes once, hedges, decodes the winner’s reply.

Source

pub async fn call_service_typed_with_hedge<Req, Resp>( &self, service: &str, request: &Req, opts: CallOptionsTyped, policy: &HedgePolicy, ) -> Result<Resp, RpcError>
where Req: Serialize, Resp: DeserializeOwned,

Typed counterpart of Self::call_service_with_hedge.

Source§

impl Mesh

Source

pub fn serve_tool<Req, Resp, F, Fut>( &self, descriptor: ToolDescriptor, handler: F, ) -> Result<ToolServeHandle, ServeError>
where Req: DeserializeOwned + Send + Sync + 'static, Resp: Serialize + Send + Sync + 'static, F: Fn(Req) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<Resp, String>> + Send + 'static,

Atomically register handler as an AI tool:

  1. The descriptor is inserted into the local tool_registry — subsequent announce_capabilities calls auto-emit the ai-tool:<name> tag, the typed ToolCapability, and the description / streaming / tags metadata keys (see A-2a).
  2. The handler is registered as an nRPC service at descriptor.tool_id via serve_rpc_typed — the substrate also tracks the service in rpc_local_services so subsequent announces include the nrpc:<name> tag.
  3. The first serve_tool call on this Mesh lazily installs the tool.metadata.fetch server handler so agents can pull the full descriptor for tools whose schemas were too large for the capability-fold payload budget. The install handle lives for the lifetime of the Mesh; subsequent serve_tool calls skip it.

If step 2 fails, step 1 is rolled back — the registry insert is paired-removed before the error returns, and the auto-install (if it happened in this call) stays in place (low cost; cleaning it up would race with concurrent serve_tool calls).

The returned ToolServeHandle reverses both registry insert (step 1) and handler registration (step 2) on Drop.

JSON codec is used unconditionally for AI tools — every provider (OpenAI, Anthropic, Gemini, MCP) consumes JSON for tool input/output. Wire-format consistency lets the adapter packages in M-* lower descriptors and dispatched tool-calls without per-tool codec negotiation.

Source

pub fn serve_tool_streaming<Req, F, Fut, St>( &self, descriptor: ToolDescriptor, handler: F, ) -> Result<ToolServeHandle, ServeError>
where Req: DeserializeOwned + Send + Sync + 'static, F: Fn(Req) -> Fut + Send + Sync + 'static, Fut: Future<Output = St> + Send + 'static, St: Stream<Item = ToolEvent> + Send + 'static,

Streaming variant of Self::serve_tool. The handler returns a futures::Stream of ToolEvents; the SDK serializes each item as one JSON-encoded chunk on the underlying serve_rpc_streaming_typed path.

Contract for handlers:

  • Emit one terminal event (ToolEvent::Result or ToolEvent::Error) to close the stream cleanly. The SDK stops driving the user’s stream the moment a terminal event is emitted — any items the handler tries to yield after a terminal are not transmitted.
  • If the stream ends without a terminal event, the SDK synthesizes ToolEvent::Error with code = "missing_terminal" so callers can rely on every stream ending with a terminal envelope.

descriptor.streaming is forced to true on registration — the tool::<id>::streaming metadata key emitted by the announce merge (A-2a) reflects the actual register path the host took, not the value the caller built into the descriptor.

Atomicity, Drop-reverses, and lazy tool.metadata.fetch install all behave the same as Self::serve_tool.

Source

pub async fn call_tool<Req, Resp>( &self, tool_id: &str, request: &Req, ) -> Result<Resp, RpcError>
where Req: Serialize, Resp: DeserializeOwned,

Capability-routed unary tool call. Encodes request as JSON, resolves a target node from nrpc:<tool_id> in the local capability fold (via net::adapter::net::MeshNode::call_service), awaits the typed Resp.

Codec is JSON unconditionally — every AI provider (OpenAI, Anthropic, Gemini, MCP) consumes JSON for tool input/output, so the substrate enforces one codec for the whole tool surface. Adapters can lower descriptors and dispatched calls without per-tool codec negotiation.

Returns RpcError::NoRoute if no host currently serves the tool. Bubbles handler errors as RpcError::ServerError with status NRPC_TYPED_HANDLER_ERROR carrying the handler’s error message.

Source

pub async fn call_tool_streaming<Req>( &self, tool_id: &str, request: &Req, ) -> Result<RpcStreamTyped<ToolEvent>, RpcError>
where Req: Serialize,

Capability-routed streaming tool call. Encodes request as JSON, opens a streaming call against nrpc:<tool_id> via the substrate’s call_service_streaming (S-1), returns an crate::mesh_rpc::RpcStreamTyped<ToolEvent> that decodes each chunk as a ToolEvent.

Stream lifecycle:

  • Server emits zero or more Start / Progress / Delta envelopes, then exactly one terminal Result or Error. The SDK does NOT enforce this contract on the caller side — it surfaces the wire events verbatim. Adapters (formats/anthropic, formats/openai, etc.) own the contract enforcement.
  • If the handler ends without a terminal event, the server- side wrapper synthesizes ToolEvent::Error { code: "missing_terminal", ... } — see Self::serve_tool_streaming.
  • Dropping the returned stream emits CANCEL to the server (substrate cancel-token contract).
Source

pub fn list_tools(&self, matcher: Option<&TagMatcher>) -> Vec<ToolDescriptor>

Walk the capability fold for every published AI tool and return one ToolDescriptor per (tool_id, version) with node_count filled in. One in-memory pass; no network.

matcher is the standard substrate TagMatcher — an entry is included if ANY of its tags match. Common shapes:

  • None — every tool the local fold has seen.
  • Some(TagMatcher::Prefix { value: "ai-tool:".into() }) — “every node advertising AT LEAST ONE AI tool” (filters out peers that don’t publish any tool but otherwise pass the fold).
  • Some(TagMatcher::Prefix { value: "region.eu".into() }) — tools served by EU-region hosts.

Delegates to net::adapter::net::MeshNode::list_tools.

Source

pub fn watch_tools( &self, matcher: Option<TagMatcher>, interval: Option<Duration>, ) -> ToolListWatch

Subscribe to a stream of ToolListChange events for every dynamic addition / removal / publisher-count change in the local capability fold’s tool view, filtered by matcher.

Event-driven: a change is delivered the moment the capability fold mutates (latency is bounded by fold-apply, not a timer), and an idle fold does zero periodic work.

interval is a debounce ceiling, not a poll cadence:

  • None — pure event-driven; the watch only wakes on a real mutation.
  • Some(d) — additionally guarantees a re-diff at least every d as a safety net, independent of the change signal.

The returned ToolListWatch implements futures::Stream<Item = ToolListChange>. Dropping it — or calling ToolListWatch::cancel — ends the stream and stops the underlying substrate task.

First event fires AFTER the initial baseline snapshot — call Self::list_tools first if you need the starting shape.

Delegates to net::adapter::net::MeshNode::watch_tools.

Auto Trait Implementations§

§

impl !RefUnwindSafe for Mesh

§

impl !UnwindSafe for Mesh

§

impl Freeze for Mesh

§

impl Send for Mesh

§

impl Sync for Mesh

§

impl Unpin for Mesh

§

impl UnsafeUnpin for Mesh

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more