Net Rust SDK
Ergonomic Rust SDK for the Net mesh network.
The core net crate is the engine. This SDK is what Rust developers import.
Install
Or in Cargo.toml:
[dependencies]
ai2070-net-sdk = "0.18.0"
The crate publishes as ai2070-net-sdk on crates.io but imports as use net_sdk::... (the in-source crate name is preserved via package aliasing).
Features:
- redis
- jetstream
- net (mesh transport)
- nat-traversal (classifier + connect_direct, opt-in)
- port-mapping (NAT-PMP + UPnP, opt-in)
- cortex (event-sourced tasks/memories + NetDb)
- compute (daemons + migration)
- groups (replica / fork / standby)
- meshos (cluster-behavior engine + daemon supervision SDK; implies compute)
- deck (operator command surface; implies meshos)
- local (bundles net, cortex, compute, and groups)
- full (bundles local, redis, jetstream, meshos, and deck)

The NAT features stay opt-in: full does not pull them in.
Quick Start
use ;
use StreamExt;
async
Typed Streams
use Deserialize;
use StreamExt;
let mut stream = node.;
while let Some = stream.next.await
Ingestion Methods
| Method | Input | Speed | Returns |
|---|---|---|---|
| emit(&T) | Any Serialize | Fast | Receipt |
| emit_raw(bytes) | impl Into<Bytes> | Fastest | Receipt |
| emit_str(json) | &str | Fast | Receipt |
| emit_batch(&[T]) | Slice of Serialize | Bulk | usize |
| emit_raw_batch(Vec<Bytes>) | Raw byte vecs | Bulk fastest | usize |
Transports
// In-memory (default, single process)
builder.memory
// Redis Streams
builder.redis
// NATS JetStream
builder.jetstream
// Encrypted UDP mesh
builder.mesh
Persistent producer nonce (cross-restart dedup)
The JetStream and Redis adapters key dedup on a (producer_nonce, shard, sequence_start, i) tuple. Without persistence, the nonce
is fresh per process: a producer that crashes mid-batch and
restarts gets a new nonce, its retransmits look fresh to the
backend, and the already-accepted half of the partial batch is
persisted twice.
Configure EventBusConfig::producer_nonce_path to make the
nonce survive restart:
let cfg = builder
.num_shards
.redis
.producer_nonce_path
.build?;
The bus loads (or creates on first run) a u64 nonce at this
path. JetStream gets server-side dedup automatically (the
existing Nats-Msg-Id format absorbs the persistent nonce);
Redis Streams ships the same id as a dedup_id field on every
XADD, filterable consumer-side via the helper below.
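The load-or-create behaviour can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the bus's implementation: the name load_or_create_nonce, the 8-byte little-endian file format, the temp-file-then-rename write, and the non-cryptographic seeding are all hypothetical.

```rust
use std::collections::hash_map::RandomState;
use std::fs;
use std::hash::{BuildHasher, Hasher};
use std::io;
use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical helper: return the nonce stored at `path`, creating it on first run.
/// Assumed on-disk format: exactly 8 bytes, little-endian u64.
pub fn load_or_create_nonce(path: &Path) -> io::Result<u64> {
    match fs::read(path) {
        Ok(bytes) => {
            let arr: [u8; 8] = bytes.as_slice().try_into().map_err(|_| {
                io::Error::new(io::ErrorKind::InvalidData, "nonce file must be 8 bytes")
            })?;
            Ok(u64::from_le_bytes(arr))
        }
        Err(e) if e.kind() == io::ErrorKind::NotFound => {
            let nonce = fresh_nonce();
            // Write a temp file, then rename, so a crash never leaves a torn nonce.
            let tmp = path.with_extension("tmp");
            fs::write(&tmp, nonce.to_le_bytes())?;
            fs::rename(&tmp, path)?;
            Ok(nonce)
        }
        Err(e) => Err(e),
    }
}

/// Not cryptographic: mixes a randomly keyed std hasher with the clock and pid.
fn fresh_nonce() -> u64 {
    let mut h = RandomState::new().build_hasher();
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    h.write_u128(now);
    h.write_u32(std::process::id());
    h.finish()
}
```

The property that matters is the second call: after a restart the same path yields the same nonce, so retransmitted (producer_nonce, shard, sequence_start, i) tuples collide with the originals and the backend drops them.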
Redis Streams consumer-side dedup helper
use RedisStreamDedup;
// Sizing: ~10k events/sec * 1 min dedup window → ~600,000.
let mut dedup = with_capacity;
// Read entries from your Redis client of choice; pull the
// `dedup_id` field from each XADD entry's field map.
for entry in stream
The helper is transport-agnostic — it answers a test-and-insert
question against an in-memory LRU. The producer-side
MULTI/EXEC-timeout race can otherwise produce duplicate stream
entries with distinct server-generated * ids that consumers
can't dedupe; the dedup_id field is stable across retries
(and across process restart when producer_nonce_path is
configured) so this filter cleanly removes them.
The helper is also re-exported as net_sdk::RedisStreamDedup;
the canonical impl lives in net::adapter::RedisStreamDedup.
Cross-language wrappers (NAPI, PyO3, cgo, C) ship in the
respective bindings.
NAT Traversal (optimization, not correctness)
Two NATed peers already reach each other through the mesh's routed-handshake path. NAT traversal opens a shorter direct path when the NAT shape allows it, cutting the per-packet relay tax. Everything in this section is disabled unless the core is built with --features nat-traversal; without it the routed path keeps working unchanged and the five reader methods below return Unsupported.
// Run a reflex probe + peer-probed classification.
mesh.reclassify_nat.await;
// Read the current classification + public reflex the mesh
// advertises to peers. NatClass is Open | Cone | Symmetric | Unknown.
let class = mesh.nat_type;
let reflex = mesh.reflex_addr; // Option<SocketAddr>
// Directly query any connected peer's reflex.
let observed = mesh.probe_reflex.await?; // -> SocketAddr
// Attempt a direct connection via the pair-type matrix.
// `coordinator` mediates the punch when the matrix picks one.
// Returns Ok regardless of path — inspect stats to learn which.
mesh.connect_direct.await?;
// Cumulative counters partition real activity.
let stats = mesh.traversal_stats;
stats.punches_attempted; // coordinator mediated a PunchRequest + Introduce
stats.punches_succeeded; // ack arrived AND direct handshake landed
stats.relay_fallbacks; // landed on the routed path after skip/fail
Operators with a known-public address (port-forwarded servers, successful UPnP / NAT-PMP installs) can skip the classifier sweep entirely. A runtime override forces "open" and the supplied SocketAddr on every capability announcement from this node; call announce_capabilities afterwards to propagate it to peers (the setter resets the rate-limit floor, so the next announce is guaranteed to broadcast).
mesh.set_reflex_override;
mesh.announce_capabilities.await?;
// ... later, if the mapping drops:
mesh.clear_reflex_override;
mesh.announce_capabilities.await?;
Opt into automatic UPnP-IGD / NAT-PMP port mapping via MeshBuilder::try_port_mapping(true) (requires --features port-mapping). The mesh spawns a task that probes NAT-PMP first, falls back to UPnP, installs a mapping on success, and renews every 30 minutes; on install it calls set_reflex_override(external) for you. A router that doesn't speak either protocol leaves the node on the classifier path — that's fine.
SdkError::Traversal wraps every TraversalError variant with a stable kind discriminator (reflex-timeout | peer-not-reachable | transport | rendezvous-no-relay | rendezvous-rejected | punch-failed | port-map-unavailable | unsupported). None of these is a connectivity failure — the routed path is always available regardless.
Mesh Streams (multi-peer + back-pressure)
For direct peer-to-peer messaging outside the event bus — open a typed stream to a specific peer, send batches, and react to back-pressure:
use ;
use SdkError;
use Bytes;
let node = new?
.build
.await?;
// ... handshake with a peer via node.inner().connect(...) ...
// Open a per-peer stream with explicit reliability + back-pressure window.
let stream = node.open_stream?;
// Three canonical daemon patterns:
// 1. Drop on pressure — best for telemetry / sampled streams.
//
// `SdkError` is `#[non_exhaustive]`. Always include a wildcard arm
// when matching: future variant additions (e.g. `Sampled`, `Unrouted`)
// will be a minor-version change, but a closed match would stop
// compiling. Match by variant where the remediation differs;
// fall through with `Err(e)` for the rest.
match node.send_on_stream.await
// 2. Retry with exponential backoff — best for important events.
node.send_with_retry.await?;
// 3. Block until the network lets up (bounded retry, ~13 min worst case).
node.send_blocking.await?;
// Live stats — per-stream tx/rx seq, in-flight, window, backpressure count.
// Returns `None` if the stream was closed or never opened.
let stats = node.stream_stats;
SdkError::Backpressure is a signal, not a policy — the transport never
retries or buffers on its own behalf. StreamStats.backpressure_events
counts cumulative rejections for observability. See
docs/TRANSPORT.md for the full contract and
docs/STREAM_BACKPRESSURE_PLAN.md
for the design.
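Because back-pressure is only a signal, the retry policy is yours to write. The sketch below is a hypothetical, synchronous illustration of pattern 2: SendError and send_with_backoff are invented names, not SDK items. It retries only on back-pressure with capped exponential backoff and returns every other error immediately through a catch-all arm, the same shape you need when matching the #[non_exhaustive] SdkError from another crate. The sleep function is injected so the logic can be tested without waiting; in async code it would be something like tokio::time::sleep.

```rust
use std::time::Duration;

/// Hypothetical error type standing in for the SDK's SdkError.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum SendError {
    Backpressure,
    NotConnected,
    Transport(String),
}

/// Retry `send` on back-pressure, doubling the delay up to `cap`.
/// Returns the attempt number that succeeded.
pub fn send_with_backoff<F, S>(
    mut send: F,
    mut sleep: S,
    max_attempts: u32,
    base: Duration,
    cap: Duration,
) -> Result<u32, SendError>
where
    F: FnMut() -> Result<(), SendError>,
    S: FnMut(Duration),
{
    let attempts = max_attempts.max(1);
    let mut delay = base;
    for attempt in 1..=attempts {
        match send() {
            Ok(()) => return Ok(attempt),
            Err(SendError::Backpressure) if attempt < attempts => {
                sleep(delay);
                delay = (delay * 2).min(cap);
            }
            // Out of attempts, or an error back-off cannot fix.
            Err(e) => return Err(e),
        }
    }
    unreachable!("the final attempt always returns")
}
```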
Security (identity, tokens, capabilities, subnets)
Identity, capabilities, and subnets ride the net feature as a
single security unit: they share the mesh's subprotocol dispatch
and operate together at runtime (subnet enforcement reuses the
capability broadcast; channel auth threads identity, capabilities,
and subnets together), so --features net gives you the whole
surface:
use Duration;
use ;
use ;
use ChannelName;
# async
What's wired in this release:
- Identity generation / seed round-trip / signing /
  token issuance + verification + install + lookup.
- MeshBuilder::identity(...) pins the keypair used by the mesh's
  Noise handshake, so node_id() is stable.
- Capability announcements: cross-node (direct-peer). See the
  subsection below.
- Re-exports of SubnetId / SubnetPolicy / SubnetRule (builder hook
  and gateway wiring land next).
Treat Identity::to_bytes() as secret material — it's the
32-byte ed25519 seed. The SDK never touches a hardcoded path; where
you put the bytes (disk, vault, enclave, k8s secret) is your call.
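If you do choose a local file, a minimal sketch of handling the 32 bytes from Identity::to_bytes() might look like this. write_seed and read_seed are hypothetical helpers, not SDK functions: the file is created with owner-only permissions on Unix, an existing seed is never overwritten, and a file of the wrong length is rejected on read.

```rust
use std::fs::{self, OpenOptions};
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical helper: persist a 32-byte seed with owner-only permissions.
/// create_new(true) fails if the file exists, so an identity is never clobbered.
pub fn write_seed(path: &Path, seed: &[u8; 32]) -> io::Result<()> {
    let mut opts = OpenOptions::new();
    opts.write(true).create_new(true);
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt;
        opts.mode(0o600); // owner read/write only
    }
    let mut f = opts.open(path)?;
    f.write_all(seed)?;
    f.sync_all()
}

/// Hypothetical helper: read the seed back, rejecting files of the wrong length.
pub fn read_seed(path: &Path) -> io::Result<[u8; 32]> {
    let bytes = fs::read(path)?;
    bytes.as_slice().try_into().map_err(|_| {
        io::Error::new(io::ErrorKind::InvalidData, "seed must be exactly 32 bytes")
    })
}
```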
Capability announcements
Mesh::announce_capabilities(caps) pushes a CapabilityAnnouncement
to every directly-connected peer and self-indexes locally.
Mesh::find_nodes(filter) queries the local index — results include
this node's own id when self matches.
use ;
use MeshBuilder;
# async
Scoped discovery (reserved scope:* tags)
A provider can narrow which queries are able to see it by tagging
its CapabilitySet with reserved scope:* tags. Queries call
find_nodes_scoped(filter, scope) (or find_best_node_scoped)
to filter candidates. The wire format and forwarders are
untouched; enforcement is purely query-side.
use ;
# async
Reserved tag forms: scope:subnet-local (visible only under
ScopeFilter::SameSubnet), scope:tenant:<id>,
scope:region:<name>. Strictest scope wins —
subnet-local dominates tenant/region tags on the same set.
Untagged peers resolve to Global and stay visible under
permissive queries (matches the v1 default; you opt in to
narrowing, never out by accident). Full design:
docs/SCOPED_CAPABILITIES_PLAN.md.
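The "strictest scope wins" rule can be modelled as a small tag-resolution function. This is a hypothetical sketch (ResolvedScope and resolve_scope are not SDK items). It encodes only what the text states: scope:subnet-local dominates, tenant and region tags narrow, and a set with no reserved scope tags resolves to Global. How tenant and region tags combine under a particular ScopeFilter is not described above, so the sketch simply collects both.

```rust
/// Hypothetical effective scope derived from a capability set's tags.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResolvedScope {
    /// `scope:subnet-local` present: strictest, dominates everything else.
    SubnetLocal,
    /// One or more `scope:tenant:<id>` / `scope:region:<name>` tags.
    Narrowed { tenants: Vec<String>, regions: Vec<String> },
    /// No reserved scope tags: visible to permissive queries.
    Global,
}

pub fn resolve_scope<'a>(tags: impl IntoIterator<Item = &'a str>) -> ResolvedScope {
    let mut tenants = Vec::new();
    let mut regions = Vec::new();
    for tag in tags {
        if tag == "scope:subnet-local" {
            return ResolvedScope::SubnetLocal;
        } else if let Some(id) = tag.strip_prefix("scope:tenant:") {
            tenants.push(id.to_owned());
        } else if let Some(name) = tag.strip_prefix("scope:region:") {
            regions.push(name.to_owned());
        }
        // Ordinary tags do not affect scope in this sketch.
    }
    if tenants.is_empty() && regions.is_empty() {
        ResolvedScope::Global
    } else {
        ResolvedScope::Narrowed { tenants, regions }
    }
}
```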
Scope today:
- Multi-hop fan-out bounded by MAX_CAPABILITY_HOPS = 16.
  Forwarders re-broadcast every received announcement to their
  other peers (minus the sender and any split-horizon peer),
  bumping hop_count outside the signed envelope so the origin's
  signature keeps verifying end-to-end. Dedup on (origin, version)
  drops duplicates at diamond-topology converge points. See
  docs/MULTIHOP_CAPABILITY_PLAN.md.
- Origin-side rate limiting: min_announce_interval (default 10 s)
  coalesces rapid announce_capabilities calls into a single
  broadcast, preventing a busy-loop announcer from flooding the
  mesh. The self-index and the late-joiner session-open push still
  reflect the latest caps inside the window.
- TTL + GC eviction: per-announcement ttl_secs drives
  CapabilityIndex::gc() on a configurable tick
  (capability_gc_interval, default 60 s).
- Signatures are advisory. The require_signed_capabilities config
  knob rejects unsigned announcements at the receiver, but
  signature validity is not enforced end-to-end yet; it requires a
  node_id → entity_id binding that lands with channel auth.
Wire-level details and the subprotocol layout live in
docs/CAPABILITY_BROADCAST_PLAN.md.
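The forwarding rule from the multi-hop bullet can be sketched as a pure decision function. Announcement and Forwarder are hypothetical types, and two details are assumptions: dedup happens before the hop check, and an announcement is forwarded only while its bumped hop_count stays at or below MAX_CAPABILITY_HOPS. Node ids are plain u64 here.

```rust
use std::collections::HashSet;

pub const MAX_CAPABILITY_HOPS: u8 = 16;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Announcement {
    pub origin: u64,
    pub version: u64,
    /// Lives outside the signed envelope, so bumping it keeps the signature valid.
    pub hop_count: u8,
}

#[derive(Default)]
pub struct Forwarder {
    seen: HashSet<(u64, u64)>, // (origin, version) pairs already handled
}

impl Forwarder {
    /// Returns the bumped announcement plus the peers to re-broadcast to,
    /// or None for a duplicate or an exhausted hop budget.
    pub fn on_receive(
        &mut self,
        ann: &Announcement,
        from: u64,
        peers: &[u64],
        split_horizon: Option<u64>,
    ) -> Option<(Announcement, Vec<u64>)> {
        // Diamond topologies deliver the same (origin, version) more than once.
        if !self.seen.insert((ann.origin, ann.version)) {
            return None;
        }
        let next_hop = ann.hop_count.checked_add(1)?;
        if next_hop > MAX_CAPABILITY_HOPS {
            return None;
        }
        let targets: Vec<u64> = peers
            .iter()
            .copied()
            .filter(|&p| p != from && Some(p) != split_horizon)
            .collect();
        let mut fwd = ann.clone();
        fwd.hop_count = next_hop;
        Some((fwd, targets))
    }
}
```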
Capability enhancements (typed taxonomy + predicates + validation)
The substrate's CapabilitySet is a { tags, metadata } wire shape
post-Phase A.5.N. Beyond announce_capabilities / find_nodes, the
SDK exposes the caller-local enhancement layer mirroring
CAPABILITY_ENHANCEMENTS_PLAN.md:
use ;
# async
For host-side placement-filter callbacks, implement
PlacementFilter
directly and register the impl with
global_placement_filter_registry();
the TS / Python / Go bindings auto-wrap closures via
placement_filter_from_fn for the same registry.
The wire format is byte-identical across all five bindings (Rust /
TS / Python / Go / C) — pinned by the JSON fixtures under
tests/cross_lang_capability/. A worked-examples guide for each
enhancement API:
CAPABILITY_ENHANCEMENTS_USAGE.md.
Subnets (visibility partitioning)
MeshBuilder::subnet(id) pins a node to one of 2³² possible 4-level
subnet ids; subnet_policy(policy) derives each peer's subnet by
applying a shared tag-matching policy to their inbound
CapabilityAnnouncement. Channel visibility then gates publish
fan-out and subscribe authorization against that geometry.
use Arc;
use CapabilitySet;
use MeshBuilder;
use ;
# async
Visibility semantics (from Visibility enum):
| Variant | Delivery |
|---|---|
| Global | every peer |
| SubnetLocal | peers with an identical SubnetId |
| ParentVisible | same subnet OR either side is an ancestor of the other |
| Exported | per-channel export table (deferred; drops in v1) |
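The table reads naturally as a predicate over two subnet ids. The sketch below assumes a packing that the text only implies (2³² ids over 4 levels): four 8-bit levels with the top level in the most significant byte and 0 meaning "level unused". The SDK's real encoding, and its SubnetId and Visibility types, may differ; everything here is hypothetical.

```rust
/// Hypothetical packing: four 8-bit levels, top level in the high byte, 0 = unused.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct SubnetId(u32);

impl SubnetId {
    pub fn new(levels: [u8; 4]) -> Self {
        SubnetId(u32::from_be_bytes(levels))
    }
    pub fn levels(self) -> [u8; 4] {
        self.0.to_be_bytes()
    }
    /// Number of leading non-zero levels.
    pub fn depth(self) -> usize {
        self.levels().iter().take_while(|&&l| l != 0).count()
    }
    /// True if `self` is a strict prefix of `other` in the hierarchy.
    pub fn is_ancestor_of(self, other: SubnetId) -> bool {
        let (mine, theirs) = (self.levels(), other.levels());
        let d = self.depth();
        d < other.depth() && mine[..d] == theirs[..d]
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Visibility {
    Global,
    SubnetLocal,
    ParentVisible,
    Exported,
}

/// Does a publish on a channel with `vis` reach a subscriber in `sub`?
pub fn delivers(vis: Visibility, publisher: SubnetId, sub: SubnetId) -> bool {
    match vis {
        Visibility::Global => true,
        Visibility::SubnetLocal => publisher == sub,
        Visibility::ParentVisible => {
            publisher == sub || publisher.is_ancestor_of(sub) || sub.is_ancestor_of(publisher)
        }
        // Export tables are deferred: v1 drops these deliveries.
        Visibility::Exported => false,
    }
}
```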
Scope today:
- Enforcement is end-to-end through the publish + subscribe gates.
  Filtered subscribers do not appear in PublishReport.attempted.
- Peer subnets are derived locally from each peer's capability
  announcement via SubnetPolicy::assign. There is no dedicated
  subnet subprotocol; announcements piggyback on the capability
  broadcast from Stage C.
- Multi-hop subnet-aware routing (forwarding filters at the packet
  header) is a follow-up.
Wire-level details and the enforcement matrix live in
docs/SUBNET_ENFORCEMENT_PLAN.md.
Channel authentication
ChannelConfig carries three auth knobs that are now enforced
end-to-end at both the subscribe gate and the publish path:
- publish_caps: CapabilityFilter. The publisher must satisfy it
  before fan-out. Failing publishes return an AdapterError; no
  peers are attempted.
- subscribe_caps: CapabilityFilter. Subscribers must satisfy it
  before being added to the roster. Failures surface as
  SdkError::ChannelRejected(Some(Unauthorized)).
- require_token: bool. Subscribers must present a valid
  PermissionToken whose subject matches their entity id. The token
  rides on the subscribe message; the publisher verifies the
  ed25519 signature on arrival, installs the token in its local
  TokenCache, then runs can_subscribe.
use Arc;
use Duration;
use ;
use MeshBuilder;
use ;
# async
Scope today:
- Full enforcement at subscribe + publish; empty-caps / missing-entity
  defaults fail closed when require_token is set.
- Every publish fan-out consults the AuthGuard fast path (4 KB bloom
  filter + verified-subscribe cache), so revocations apply on the
  next publish without a roster refresh. Single-threaded
  microbenchmark: ~20 ns per check_fast call.
- Periodic token-expiry sweep (default 30 s,
  MeshNodeConfig::with_token_sweep_interval) evicts subscribers whose
  tokens age out of their TTL; they stop receiving events within one
  sweep tick instead of staying on the roster forever.
- Per-peer auth-failure rate limiter (with_auth_failure_limit,
  default 16 failures per 60 s window → 30 s throttle)
  short-circuits bad-token subscribe storms with
  AckReason::RateLimited before ed25519 verification runs.
  Successful subscribes clear the counter.
- CapabilityAnnouncement now carries the sender's entity_id and is
  signed and verified end-to-end (closing the "signatures are
  advisory" caveat from the capability section above).
  node_id → entity_id is pinned on first sight (TOFU); rebind
  attempts in later announcements are silently rejected.
- Any auth-rule denial surfaces as AckReason::Unauthorized;
  throttled bursts surface as AckReason::RateLimited. Sub-reasons
  within the auth rejection (cap-failed vs token-failed vs
  subnet-failed) are not split yet.
Wire-format details and the token presentation flow live in
docs/CHANNEL_AUTH_PLAN.md; the
fast-path / sweep / rate-limit design lives in
docs/CHANNEL_AUTH_GUARD_PLAN.md.
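The rate-limiter bullet (16 failures per 60 s window, then a 30 s throttle, cleared on success) can be modelled with a small per-peer table. AuthFailureLimiter is a hypothetical sketch, not the SDK's implementation. It assumes a fixed window that starts at the first failure, and it takes time as plain seconds so the logic is deterministic to test.

```rust
use std::collections::HashMap;

#[derive(Default)]
struct PeerState {
    window_start: u64,
    failures: u32,
    throttled_until: u64,
}

/// Hypothetical per-peer limiter: `max_failures` within `window_secs`
/// triggers a `throttle_secs` throttle. All times are in seconds.
pub struct AuthFailureLimiter {
    max_failures: u32,
    window_secs: u64,
    throttle_secs: u64,
    peers: HashMap<u64, PeerState>,
}

impl AuthFailureLimiter {
    pub fn new(max_failures: u32, window_secs: u64, throttle_secs: u64) -> Self {
        Self { max_failures, window_secs, throttle_secs, peers: HashMap::new() }
    }

    /// Check before verifying a subscribe: true means reject as rate-limited
    /// without spending an ed25519 verification.
    pub fn is_throttled(&self, peer: u64, now: u64) -> bool {
        self.peers.get(&peer).map_or(false, |s| now < s.throttled_until)
    }

    pub fn record_failure(&mut self, peer: u64, now: u64) {
        let s = self.peers.entry(peer).or_default();
        if now >= s.window_start + self.window_secs {
            // Window expired: start a fresh one at this failure.
            s.window_start = now;
            s.failures = 0;
        }
        s.failures += 1;
        if s.failures >= self.max_failures {
            s.throttled_until = now + self.throttle_secs;
            s.failures = 0;
            s.window_start = now;
        }
    }

    /// A successful subscribe clears the peer's counter.
    pub fn record_success(&mut self, peer: u64) {
        self.peers.remove(&peer);
    }
}
```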
Channels (distributed pub/sub)
Named pub/sub over the encrypted mesh. Publishers register channels
with access policy; subscribers ask to join via a membership
subprotocol with an Ack round-trip. publish / publish_many fan
payloads out to every current subscriber.
use Bytes;
use ;
use ;
# async
register_channel stores into a shared ChannelConfigRegistry
installed on the underlying MeshNode at build time — so multiple
register_channel calls are just inserts and require only &Mesh,
not &mut.
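Why &Mesh is enough: the registry is shared state behind a lock, and the mesh handle and the node each hold a clone of the same handle. The toy below illustrates that pattern only; ToyMesh, this ChannelConfigRegistry, and its two-field ChannelConfig are hypothetical stand-ins for the SDK types of similar names.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

#[derive(Clone, Debug, PartialEq)]
pub struct ChannelConfig {
    pub name: String,
    pub require_token: bool,
}

/// Clones share one map, so inserts need only `&self`.
#[derive(Clone, Default)]
pub struct ChannelConfigRegistry {
    inner: Arc<RwLock<HashMap<String, ChannelConfig>>>,
}

impl ChannelConfigRegistry {
    pub fn insert(&self, cfg: ChannelConfig) {
        self.inner.write().expect("registry lock poisoned").insert(cfg.name.clone(), cfg);
    }
    pub fn get(&self, name: &str) -> Option<ChannelConfig> {
        self.inner.read().expect("registry lock poisoned").get(name).cloned()
    }
}

/// Toy mesh handle; the node side holds a clone of the same registry.
pub struct ToyMesh {
    channels: ChannelConfigRegistry,
}

impl ToyMesh {
    pub fn new(channels: ChannelConfigRegistry) -> Self {
        Self { channels }
    }
    /// `&self`, not `&mut self`: registration is just an insert into shared state.
    pub fn register_channel(&self, cfg: ChannelConfig) {
        self.channels.insert(cfg);
    }
}
```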
Subscribers today receive payloads via the existing recv /
recv_shard surface. A dedicated on_channel(&ChannelName) stream
is a follow-up.
CortEX & NetDb (event-sourced state)
For typed, event-sourced state — tasks and memories with filterable
queries and reactive watches — enable the cortex feature and import
from net_sdk::cortex:
use ;
use StreamExt;
async
Persistence
With redex-disk (pulled in by cortex), point Redex at a directory
and flip persistent(true) on the builder:
let redex = new.with_persistent_dir;
let db = builder
.origin
.persistent
.with_tasks
.build?;
Use RedexFileConfig + FsyncPolicy (both re-exported from
net_sdk::cortex) to tune per-file fsync semantics.
Raw RedEX file
For domain-agnostic persistent logs (no CortEX, no fold, no typed
state), use the Redex manager directly via Redex::open_file. This
unlocks RedexFile::append / tail for custom event pipelines.
Cross-node RedEX replication
RedEX channels can replicate across the mesh. Opt in per channel via
RedexFileConfig::with_replication(Some(ReplicationConfig::new()));
the default None keeps the channel single-node and adds zero wire
traffic. Replicated channels carry N copies of the log; the leader is
the single writer, replicas catch up via pull-based sync. Failover
uses a deterministic nearest-RTT election with NodeId tie-break — no
broadcast / no epoch / no collection window; every node computes the
same winner from the same inputs.
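The election needs no messages because the winner is a pure function of its input. The sketch below is hypothetical: it assumes every node evaluates the same candidate list (node id, an agreed RTT figure, health), and that the lower NodeId wins an RTT tie. The text states that a NodeId tie-break exists but not its direction.

```rust
/// Hypothetical election input; every node is assumed to see the same list.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Candidate {
    pub node_id: u64,
    pub rtt_micros: u64,
    pub healthy: bool,
}

/// Deterministic nearest-RTT election with a NodeId tie-break:
/// the same input yields the same leader on every node, regardless of order.
pub fn elect_leader(candidates: &[Candidate]) -> Option<u64> {
    candidates
        .iter()
        .filter(|c| c.healthy)
        .min_by_key(|c| (c.rtt_micros, c.node_id))
        .map(|c| c.node_id)
}
```

Sorting on the (rtt, node_id) tuple makes the result independent of input order, which is what removes the need for an epoch or a collection window.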
use Arc;
use Mesh;
use ;
async
ReplicationConfig knobs: factor (1–16, default 3), heartbeat_ms
(min 100, default 500), placement (Standard / Pinned(Vec<NodeId>) /
ColocationStrict), leader_pinned: Option<NodeId>,
on_under_capacity (Withdraw drops the replica role on disk
pressure; EvictOldest runs retention sweep and retries — requires
retention_max_* caps), replication_budget_fraction (sync I/O cap
as fraction of measured NIC peak, default 0.5).
Operator surface on Redex:
- enable_replication(mesh): install replication wiring. Required before open_file with replication: Some(_).
- replication_runtime_count() -> usize: registered per-channel runtimes.
- replication_metrics_snapshot() -> Option<ReplicationMetricsSnapshot>: per-channel atomic counters (lag, sync_bytes, leader_changes, under_capacity, skip_ahead, election_thrash, witness_withdrawals).
- replication_status_snapshot() -> Option<Vec<ReplicationChannelStatus>>: per-channel {channel_name, role, tail_seq}.
- replication_prometheus_text() -> String: Prometheus-text render of the metrics snapshot. Returns the empty string when replication isn't enabled, so you can pipe it straight into an HTTP scrape body without branching.
- replication_coordinator_for(name) -> Option<Arc<ReplicationCoordinator>>: per-channel handle for inspection or forced transitions during recovery / debugging.
Failover takes one heartbeat-detection window (3 × heartbeat_ms)
plus the election (microseconds). Disk-pressure replicas withdraw
their causal: capability tag so peers re-route to a healthy holder.
In steady state, append throughput with replication enabled stays at
roughly 1× the single-node rate: the replication runtime task runs on
tokio at the heartbeat cadence, and per-append work does not change.
Dataforts (greedy cache, gravity, blob refs, read-your-writes)
Dataforts is the compositional data plane on top of RedEX / CortEX /
capability-index / proximity-graph. Enable the dataforts feature
on the underlying ai2070-net crate (the SDK is a thin wrapper —
Dataforts surfaces are consumed via net::adapter::net::dataforts::*
and Redex::enable_greedy_dataforts / Redex::enable_gravity_for_greedy
on the Redex handle). Four phases:
- Phase 1: Greedy-LRU caching. Per-node speculative caching
  of in-scope chains observed via the tail-subscription path.
  Five-axis admission (scope, proximity, capability-preference,
  colocation, storage-cap) plus a bandwidth budget gate decide
  whether to admit each inbound event into a per-channel cache
  file. Cold channels evict under cluster-cap pressure and
  withdraw their causal:<hex> advertisement. The runtime also
  observes BlobRef-shaped payloads and asks should_pull_blob; on
  admit, the wired BlobAdapter::prefetch spawns a best-effort pull
  via the per-chunk replication runtime, and the chunk hash bumps
  a refcount table for chain-fold GC.
- Phase 3: BlobRef + BlobAdapter. Two shapes:
  - External-hook variant (v0.15): a 4-byte magic, version,
    32-byte BLAKE3 hash, size, and URI reference whose bytes live
    in the caller's storage (S3 / Ceph / IPFS / local FS). Adapters
    implement fetch / store / delete / stat / prefetch, with default
    fetch_stream / store_stream shims for multi-GB payloads.
  - Substrate-owned variant (v0.2): BlobRef::Manifest for
    multi-chunk blobs (4 MiB fixed chunking). MeshBlobAdapter
    stores each chunk as a content-addressed RedexFile riding the
    existing replication runtime. It wraps a BlobRefcountTable for
    GC and pinning, BlobMetrics for Prometheus, and an optional
    AuthGuard for the *_authorized peer-facing pin / unpin / delete
    variants. Atomic store → wait → publish goes through
    publish_with_blob and BlobDurability::{BestEffort,
    DurableOnLocal, ReplicatedTo(n)}. Operator CLI:
    cargo run --features cli --bin net-blob -- --help.
- Phase 4: Data gravity. Per-chain read-rate counters with
  exponential decay. Threshold-crossing emissions stamp
  heat:<hex>=<rate> onto the chain's existing capability
  announcement; the greedy admission gate weights cache pulls by
  heat × scope-match × proximity-rank. Cold chains evict first;
  hot chains migrate toward the readers that drive the heat. The
  v0.2 blob track adds a parallel BlobHeatRegistry keyed on the
  chunk's BLAKE3 hash (fetch-path bumps via
  MeshBlobAdapter::with_blob_heat), heat:blob:<hex>=<rate>
  reserved-tag emission via the BlobHeatSink trait (MeshNode is
  the production impl), and drive_blob_migration_tick, which
  observes peer-advertised heat, runs should_migrate_blob_to, and
  on admit calls adapter.prefetch on the chosen target. The
  manifest-aware variant
  drive_blob_migration_tick_with_manifest_resolver proactively
  prefetches every sibling chunk when one chunk of a manifest gets
  hot.
- Phase 5: Read-your-writes. Every successful Tasks / Memories
  write returns a WriteToken { origin_hash, seq }. Pass it to
  tasks.wait_for_token(token, deadline) (or the memories
  counterpart) and the call blocks until the local fold has
  actually applied that sequence number. It tracks both
  applied_through_seq and folded_through_seq, so a stalled fold
  surfaces WaitForTokenError::FoldStopped rather than a silent
  Ok(()).
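The read-your-writes wait is a classic "block until a counter reaches N" problem. The sketch below uses std's Mutex and Condvar; FoldProgress, WaitError, and this WriteToken struct are hypothetical stand-ins. It deliberately simplifies the SDK's two counters (applied_through_seq, folded_through_seq) into one folded counter plus a stopped flag, so the point it illustrates is that waiters wake with an error when the fold dies instead of hanging or returning Ok.

```rust
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct WriteToken {
    pub origin_hash: u64,
    pub seq: u64,
}

#[derive(Debug, PartialEq, Eq)]
pub enum WaitError {
    FoldStopped,
    Timeout,
}

#[derive(Default)]
struct FoldState {
    folded_through_seq: u64,
    stopped: bool,
}

/// Hypothetical fold-progress tracker for a single origin.
#[derive(Default)]
pub struct FoldProgress {
    state: Mutex<FoldState>,
    changed: Condvar,
}

impl FoldProgress {
    /// Called by the fold after it applies `seq`.
    pub fn advance(&self, seq: u64) {
        let mut s = self.state.lock().unwrap();
        s.folded_through_seq = s.folded_through_seq.max(seq);
        self.changed.notify_all();
    }

    /// Called if the fold task dies; wakes waiters so they do not hang.
    pub fn stop(&self) {
        self.state.lock().unwrap().stopped = true;
        self.changed.notify_all();
    }

    /// Block until the fold covers `token.seq`, the fold stops, or `timeout` passes.
    pub fn wait_for_token(&self, token: WriteToken, timeout: Duration) -> Result<(), WaitError> {
        let deadline = Instant::now() + timeout;
        let mut s = self.state.lock().unwrap();
        loop {
            if s.folded_through_seq >= token.seq {
                return Ok(());
            }
            if s.stopped {
                return Err(WaitError::FoldStopped);
            }
            let now = Instant::now();
            if now >= deadline {
                return Err(WaitError::Timeout);
            }
            // Spurious wakeups are fine: the loop re-checks every condition.
            s = self.changed.wait_timeout(s, deadline - now).unwrap().0;
        }
    }
}
```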
Capability projections feed admission: BlobCapability /
GreedyCapability / GravityCapability / TopologyScope
types read from CapabilitySet tags. Producer-side typed
setters (CapabilitySet::with_blob_capability(BlobCapability:: storage_participating(100, 50)) + with_greedy_capability /
with_gravity_capability) round-trip back to wire-form tags.
use ;
use ;
use CapabilitySet;
use ;
use Arc;
# async
Phase 3.5 — Active blob overflow (v0.3)
Push-side complement of the pull-driven gravity migration.
Disabled by default; opt in with MeshBlobAdapter::with_overflow(...)
at construction or set_overflow_enabled(true) at runtime.
When active, a node above the configured high-water disk ratio
walks its BlobHeatRegistry coldest-first + pushes to overflow-
enabled peers via the MeshNode::send_overflow_push nRPC.
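The coldest-first walk can be sketched as a selection function over heat-registry rows. This is hypothetical: BlobHeat and select_overflow are invented names, peer choice is left out, and the stopping rule (push until projected usage is back at or under the high-water ratio) is an assumption, since the text only says the walk starts above that ratio.

```rust
/// Hypothetical heat-registry row for one locally stored blob chunk.
#[derive(Clone, Debug, PartialEq)]
pub struct BlobHeat {
    pub hash: [u8; 32],
    pub read_rate: f64,
    pub size_bytes: u64,
}

/// Pick chunks to push, coldest first, until projected usage is at or under
/// `high_water` (a fraction of capacity). Returns nothing if already under it.
pub fn select_overflow(
    entries: &[BlobHeat],
    used_bytes: u64,
    capacity_bytes: u64,
    high_water: f64,
) -> Vec<[u8; 32]> {
    let limit = (capacity_bytes as f64 * high_water) as u64;
    if used_bytes <= limit {
        return Vec::new();
    }
    let mut by_heat: Vec<&BlobHeat> = entries.iter().collect();
    // total_cmp gives a total order even if a rate is NaN.
    by_heat.sort_by(|a, b| a.read_rate.total_cmp(&b.read_rate));
    let mut projected = used_bytes;
    let mut picked = Vec::new();
    for e in by_heat {
        if projected <= limit {
            break;
        }
        picked.push(e.hash);
        projected = projected.saturating_sub(e.size_bytes);
    }
    picked
}
```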
#
# async #
Operators can build dashboards from the new
dataforts_blob_overflow_* counter family in the adapter's
prometheus_text() body (see the release notes for the full
metric list). CLI: net-blob overflow status.
The canonical ChannelHash is u32 substrate-wide for ACL / config
/ storage / RYW; the per-packet wire NetHeader::channel_hash stays
u16 (fast-path filter hint). Wire-bucket collisions are benign —
the substrate disambiguates via the registry's by_wire_hash
reverse index and re-keys on the canonical 32-bit hash for all
non-fast-path decisions. The publisher's wire origin_hash
resolves to the announcement-side node_id via a
CapabilityIndex::get_by_origin_hash side index — the same lookup
the greedy + migration admission gates use for chain_caps.
See docs/misc/DATAFORTS_FEATURES.md
for the original audit and
docs/plans/DATAFORTS_BLOB_STORAGE_PLAN.md
for the v0.2 substrate-owned blob CAS plan + shipping status.
nRPC (request / response over the mesh)
nRPC is the request/response convention layer riding on top of the
pub/sub mesh + CortEX folds. It turns a directed channel pair
(<service>.requests / <service>.replies.<caller_origin>) into a
typed RPC surface with deadlines, queue-group fan-out, response
streaming, and end-to-end cancellation. Enable the cortex feature
(nRPC depends on the CortEX rpc.rs fold).
Typed serve + call
use ;
use CallOptions;
use ;
use Duration;
# async
call_typed and call_service_typed (service-discovery variant)
default to JSON. Use the raw-bytes path (call / call_service)
when you own the encoding.
Streaming responses
use StreamExt;
use CallOptions;
# async
#
#
# ;
RpcStream::grant(amount) issues an explicit credit publish
when batched cadence is preferable to the per-chunk auto-grant
default (no-op on streams that didn't opt into flow control).
Resilience helpers
Mesh::call_with_retry wraps a unary call in exponential backoff
with jitter; the default RetryPolicy::default() retries
no_route + transport and skips terminal errors:
use ;
use CallOptions;
use Bytes;
use Duration;
# async
CircuitBreaker (in mesh_rpc_resilience) tracks consecutive
failures and trips open after a threshold; open breakers reject
calls outright until the cooldown allows a half-open probe.
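The breaker's behaviour is a three-state machine. The following is a hypothetical sketch of that state machine, not the SDK's CircuitBreaker API. It assumes a single half-open probe at a time and that a failed probe re-opens the breaker immediately. Time is passed in explicitly so the transitions are easy to test.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BreakerState {
    Closed,
    Open,
    HalfOpen,
}

/// Hypothetical consecutive-failure circuit breaker.
pub struct Breaker {
    threshold: u32,
    cooldown: Duration,
    consecutive_failures: u32,
    opened_at: Option<Instant>,
    state: BreakerState,
}

impl Breaker {
    pub fn new(threshold: u32, cooldown: Duration) -> Self {
        Self { threshold, cooldown, consecutive_failures: 0, opened_at: None, state: BreakerState::Closed }
    }

    /// Ask before each call. Open rejects until the cooldown elapses,
    /// then admits exactly one half-open probe.
    pub fn allow(&mut self, now: Instant) -> bool {
        match self.state {
            BreakerState::Closed => true,
            BreakerState::HalfOpen => false, // a probe is already in flight
            BreakerState::Open => {
                let opened = self.opened_at.expect("open breaker records when it opened");
                if now.duration_since(opened) >= self.cooldown {
                    self.state = BreakerState::HalfOpen;
                    true
                } else {
                    false
                }
            }
        }
    }

    pub fn on_success(&mut self) {
        self.consecutive_failures = 0;
        self.opened_at = None;
        self.state = BreakerState::Closed;
    }

    pub fn on_failure(&mut self, now: Instant) {
        self.consecutive_failures += 1;
        // A failed probe re-opens at once; otherwise trip at the threshold.
        if self.state == BreakerState::HalfOpen || self.consecutive_failures >= self.threshold {
            self.state = BreakerState::Open;
            self.opened_at = Some(now);
        }
    }

    pub fn state(&self) -> BreakerState {
        self.state
    }
}
```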
Errors
RpcError is the unified failure surface. Variants: NoRoute,
Timeout, ServerError { status, message }, Transport,
Codec { direction, message }. Status codes use u16; the
application-defined band is 0x8000..=0xFFFF. Two stable
constants ship in net_sdk::mesh_rpc:
| Status hex | Constant | Trigger |
|---|---|---|
| 0x0000 | RpcStatus::Ok | Normal response. |
| 0x8000 | NRPC_TYPED_BAD_REQUEST | Typed handler couldn't decode the request body. |
| 0x8001 | NRPC_TYPED_HANDLER_ERROR | Typed handler ran but returned an exception. |
Cross-binding contract spec — including the canonical
cross_lang_echo_sum service used by every binding's wire-format
compat test — lives in ../README.md#nrpc.
MeshDB (federated query layer)
MeshDB is the typed query layer above the RedEX / CortEX / capability-index substrate. Enable the meshdb feature on ai2070-net and import the operators + executor directly from
net::adapter::net::behavior::meshdb. Architectural overview: ../README.md#meshdb.
Local executor
#
# async
Composite operators
OperatorPlan covers the full surface — Filter (synthetic-tag
predicate), Window (tumbling-on-seq), AggregateCount /
AggregateNumeric (sum / avg) / AggregateReduction (min / max /
percentile) / AggregateDistinct, HashJoin (inner / outer; hash-
broadcast + sort-merge strategies), LineageEmit (pre-walked
entries). Compose by nesting OperatorNodes: each composite
operator takes a Box<OperatorNode> input plus its own knobs.
Sentinel rows (aggregate / joined / window) carry postcard-encoded
envelopes — decode via the typed AggregateRowPayload /
JoinedRowPayload / WindowBoundary types re-exported from the
same module.
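For intuition on "tumbling-on-seq", here is the bucketing rule in isolation. tumbling_windows is a hypothetical helper. The real Window operator also emits WindowBoundary sentinel rows, which are not modelled here. A row with sequence seq lands in the window starting at seq - seq % width, so windows are contiguous and never overlap.

```rust
use std::collections::BTreeMap;

/// Hypothetical tumbling-on-seq bucketing, keyed by window start.
pub fn tumbling_windows<T: Clone>(rows: &[(u64, T)], width: u64) -> BTreeMap<u64, Vec<T>> {
    assert!(width > 0, "window width must be non-zero");
    let mut windows: BTreeMap<u64, Vec<T>> = BTreeMap::new();
    for (seq, value) in rows {
        let seq = *seq;
        windows.entry(seq - seq % width).or_default().push(value.clone());
    }
    windows
}
```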
Phase F cache
LocalMeshQueryExecutor::with_cache(reader, cache, version_fn)
wires the bounded LRU result cache. Per-call policy lives on
ExecuteOptions:
use Duration;
use ;
// `executor` is a `LocalMeshQueryExecutor<R>` built via
// `with_cache(reader, cache, version_fn)`; `plan` is an
// ExecutionPlan from the `OperatorPlan` builder above.
let opts = ExecuteOptions ;
let _running = executor.execute_with.await?;
CachePolicy::Permanent skips TTL expiry — use only for queries
whose result is immutable under substrate semantics.
Federated executor
FederatedMeshQueryExecutor<T: MeshDbTransport> fans atomic
operators out to remote nodes via a pluggable transport with
proximity-ordered failover. The in-tree LoopbackTransport
drives in-process N-node integration tests without a real wire;
the production transport rides SUBPROTOCOL_MESHDB on
MeshNode (wire envelopes in
net::adapter::net::behavior::meshdb::protocol).
Errors
MeshError carries planner / executor / transport failures —
PlannerError { detail }, HistoricalRangeUnavailable { origin, requested, available }, BudgetExceeded { metric, limit },
and transport-level variants. Every variant is Clone + PartialEq + Debug so cache keys + diagnostics work without
clone-on-error gymnastics.
Compute (daemons + migration)
Enable the compute feature to run MeshDaemons from your SDK
code. A daemon is a stateful event processor with a deterministic
causal chain; DaemonRuntime owns the factory table, the per-
daemon hosts, the lifecycle gate (Registering → Ready → ShuttingDown), and the migration subprotocol plumbing. The full
staging and design notes live in
docs/SDK_COMPUTE_SURFACE_PLAN.md;
the runtime readiness fence in
docs/DAEMON_RUNTIME_READINESS_PLAN.md.
use Arc;
use Bytes;
use ;
use CapabilityFilter;
use ;
;
# async
The MeshDaemon trait is intentionally minimal:
requirements() feeds the PlacementScheduler — a GPU daemon
advertises require_gpu() and only lands on nodes whose
CapabilityAnnouncement matches. snapshot / restore are opt-in:
leave the defaults for stateless daemons; implement them to enable
live migration of stateful ones.
Migration
Once a daemon is up, start_migration orchestrates the six-phase
cutover to another node: Snapshot → Transfer → Restore → Replay → Cutover → Complete. The source seals the daemon's seed into the
outbound snapshot (sealed with the target's X25519 pubkey); the
target rebuilds the daemon via the factory registered under the same
kind, replays any events that arrived during transfer, then
activates.
use ;
// Caller side: start a migration to `target_node`. Returns as soon
// as the SNAPSHOT phase has begun; `wait()` drives to completion.
let mig: MigrationHandle = rt
.start_migration
.await?;
assert_eq!;
println!; // Some(MigrationPhase::Snapshot)
mig.wait.await?; // blocks to Complete
- start_migration_with(origin, src, dst, MigrationOpts { seal_seed, .. })
  toggles seed-sealing and other advanced knobs.
- On the target side,
  DaemonRuntime::register_migration_target_identity(...) pins the
  X25519 keypair used to unseal inbound seeds. If unset, the runtime
  rejects inbound migrations with
  MigrationFailureReason::SealedSeedMissing.
- Failures from any of the six phases surface as a
  MigrationFailureReason variant on MigrationHandle::wait() (or on
  the receiving expect_migration hook), mirroring the wire-level
  MigrationFailureMessage.
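The six-phase cutover is a strictly linear state machine, which the hypothetical sketch below captures. Phase and drive are invented names (distinct from the SDK's MigrationPhase). Sealing, transfer, and replay are reduced to a caller-supplied step closure, and the driver stops at the first failing phase and reports which one failed, loosely mirroring how a failure reason surfaces on wait().

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Phase {
    Snapshot,
    Transfer,
    Restore,
    Replay,
    Cutover,
    Complete,
}

impl Phase {
    /// Next phase in the fixed six-step order; None after Complete.
    pub fn next(self) -> Option<Phase> {
        use Phase::*;
        match self {
            Snapshot => Some(Transfer),
            Transfer => Some(Restore),
            Restore => Some(Replay),
            Replay => Some(Cutover),
            Cutover => Some(Complete),
            Complete => None,
        }
    }
}

/// Hypothetical driver: run `step` for each phase in order and stop at the
/// first failure, returning the phase that failed with its error.
pub fn drive<E, F>(mut step: F) -> Result<Vec<Phase>, (Phase, E)>
where
    F: FnMut(Phase) -> Result<(), E>,
{
    let mut phase = Phase::Snapshot;
    let mut completed = Vec::new();
    loop {
        step(phase).map_err(|e| (phase, e))?;
        completed.push(phase);
        match phase.next() {
            Some(next) => phase = next,
            None => return Ok(completed),
        }
    }
}
```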
Stop / snapshot / inspect
| Method | Description |
|---|---|
| rt.spawn(kind, identity, cfg) | Launch a daemon from a registered kind |
| rt.spawn_from_snapshot(...) | Bootstrap from a previously captured StateSnapshot |
| rt.stop(origin) | Gracefully stop a local daemon |
| rt.snapshot(origin) | Capture a StateSnapshot for persistence / migration |
| rt.deliver(origin, &event) | Feed the daemon an event (returns produced payloads) |
| rt.daemon_count() / rt.is_ready() | Runtime introspection |
| rt.start_migration(origin, src, dst) | Orchestrate a live migration |
| rt.subscribe_channel(origin, &name, ...) | Attach a daemon to a mesh channel |
| handle.stats() / handle.snapshot() | Per-daemon observability |
Errors surface as ComputeDaemonError (NotReady before start,
FactoryNotFound(kind), FactoryAlreadyRegistered(kind),
ShuttingDown after shutdown, plus Core(_) for the underlying
scheduler / registry failures).
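The lifecycle gate and its error variants fit together as a small state machine. RuntimeGate and GateError are hypothetical stand-ins for DaemonRuntime and ComputeDaemonError. One assumption is flagged in the code: the text does not say whether factories may still be registered once the runtime is Ready, so this sketch keeps registration open until shutdown.

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Lifecycle {
    Registering,
    Ready,
    ShuttingDown,
}

#[derive(Debug, PartialEq, Eq)]
pub enum GateError {
    NotReady,
    ShuttingDown,
    FactoryNotFound(String),
    FactoryAlreadyRegistered(String),
}

/// Hypothetical model of the runtime's lifecycle gate.
pub struct RuntimeGate {
    state: Lifecycle,
    kinds: HashSet<String>,
    live: usize,
}

impl RuntimeGate {
    pub fn new() -> Self {
        Self { state: Lifecycle::Registering, kinds: HashSet::new(), live: 0 }
    }

    /// Assumption: registration stays open until shutdown begins.
    pub fn register_factory(&mut self, kind: &str) -> Result<(), GateError> {
        if self.state == Lifecycle::ShuttingDown {
            return Err(GateError::ShuttingDown);
        }
        if !self.kinds.insert(kind.to_owned()) {
            return Err(GateError::FactoryAlreadyRegistered(kind.to_owned()));
        }
        Ok(())
    }

    /// Registering -> Ready; a no-op in any other state.
    pub fn start(&mut self) {
        if self.state == Lifecycle::Registering {
            self.state = Lifecycle::Ready;
        }
    }

    pub fn spawn(&mut self, kind: &str) -> Result<(), GateError> {
        match self.state {
            Lifecycle::Registering => Err(GateError::NotReady),
            Lifecycle::ShuttingDown => Err(GateError::ShuttingDown),
            Lifecycle::Ready if !self.kinds.contains(kind) => {
                Err(GateError::FactoryNotFound(kind.to_owned()))
            }
            Lifecycle::Ready => {
                self.live += 1;
                Ok(())
            }
        }
    }

    pub fn shutdown(&mut self) {
        self.state = Lifecycle::ShuttingDown;
    }

    pub fn daemon_count(&self) -> usize {
        self.live
    }
}
```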
Groups (replica / fork / standby)
Enable the groups feature (implies compute) to spawn logical
clusters of daemons from a single DaemonRuntime. Three flavours
share one coordination layer:
- ReplicaGroup: N interchangeable copies. Each replica gets a
  deterministic identity from group_seed + index, so a replacement
  respawned on another node has a stable origin_hash. Load-balances
  inbound events across healthy members; auto-replaces on node
  failure.
- ForkGroup: N independent daemons forked from a common parent at
  fork_seq. Unique keypairs, shared ancestry via a verifiable
  ForkRecord (sentinel hash linking each fork to the parent chain).
- StandbyGroup: active-passive replication. One member processes
  events; standbys hold snapshots and catch up via
  sync_standbys(). On active failure, the most-synced standby
  promotes and replays the events buffered since the last sync.
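The StandbyGroup promotion rule ("most-synced standby promotes and replays the events buffered since the last sync") can be sketched as a pure function. Standby and promote are hypothetical. The sketch assumes the buffer is ordered by sequence number and breaks a sync tie in favour of the lower node id, a detail the text does not specify.

```rust
use std::cmp::Reverse;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Standby {
    pub node_id: u64,
    /// Highest event sequence this standby's snapshot covers.
    pub synced_through: u64,
}

/// Hypothetical promotion: pick the most-synced standby and return the
/// buffered events (seq-ordered) it must replay before taking over.
pub fn promote(standbys: &[Standby], buffered: &[(u64, String)]) -> Option<(u64, Vec<String>)> {
    let winner = standbys
        .iter()
        .max_by_key(|s| (s.synced_through, Reverse(s.node_id)))?;
    let replay = buffered
        .iter()
        .filter(|(seq, _)| *seq > winner.synced_through)
        .map(|(_, event)| event.clone())
        .collect();
    Some((winner.node_id, replay))
}
```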
use ;
use ;
use Strategy;
# async
Errors surface as GroupError: NotReady (runtime not started),
FactoryNotFound(kind) (kind was never registered), Core(_)
wrapping InvalidConfig / PlacementFailed / RegistryFailed, and
Daemon(_) for runtime-level failures. Match on the variant to
dispatch — the wire form through the FFI is
daemon: group: <kind>[: detail] and stays consistent across all
language bindings.
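Bindings that only see the FFI string can recover the error kind with a small parser. A minimal sketch, assuming <kind> never contains ": " so the first separator after the daemon: group: prefix splits kind from detail; parse_group_error and GroupWireError are hypothetical names, and the example kind strings are invented.

```rust
/// The parts of a `daemon: group: <kind>[: detail]` error string.
#[derive(Debug, PartialEq)]
struct GroupWireError<'a> {
    kind: &'a str,
    detail: Option<&'a str>,
}

/// Parse the FFI wire form. Assumes `<kind>` never contains ": ", so the
/// first ": " after the prefix separates kind from detail.
fn parse_group_error(msg: &str) -> Option<GroupWireError<'_>> {
    let rest = msg.strip_prefix("daemon: group: ")?;
    let (kind, detail) = match rest.split_once(": ") {
        Some((kind, detail)) => (kind, Some(detail)),
        None => (rest, None),
    };
    Some(GroupWireError { kind, detail })
}

fn main() {
    // Example strings are made up; only the overall shape comes from the docs.
    assert_eq!(
        parse_group_error("daemon: group: placement_failed: no eligible nodes"),
        Some(GroupWireError { kind: "placement_failed", detail: Some("no eligible nodes") })
    );
    assert_eq!(
        parse_group_error("daemon: group: not_ready"),
        Some(GroupWireError { kind: "not_ready", detail: None })
    );
    assert_eq!(parse_group_error("some other error"), None);
}
```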
Full staging, wire formats, and rationale:
docs/SDK_GROUPS_SURFACE_PLAN.md.
Core semantics (placement spread, health aggregation, failure
domains) live in ../README.md#daemons.
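The StandbyGroup failover rule reduces to a selection over sync positions followed by a replay filter. The sketch below is illustrative only: Standby, synced_seq, and promote are hypothetical names, and modelling each standby's sync position as a single event sequence number is an assumption, not the SDK's representation.

```rust
/// Hypothetical view of a standby: the last event sequence its snapshot covers.
struct Standby {
    name: &'static str,
    synced_seq: u64,
}

/// On active failure, pick the most-synced standby and collect the buffered
/// events it still has to replay (everything after its synced sequence).
fn promote<'a>(
    standbys: &[Standby],
    buffered: &'a [(u64, String)],
) -> Option<(&'static str, Vec<&'a str>)> {
    let winner = standbys.iter().max_by_key(|s| s.synced_seq)?;
    let replay = buffered
        .iter()
        .filter(|entry| entry.0 > winner.synced_seq)
        .map(|entry| entry.1.as_str())
        .collect();
    Some((winner.name, replay))
}

fn main() {
    let standbys = [
        Standby { name: "standby-a", synced_seq: 40 },
        Standby { name: "standby-b", synced_seq: 42 },
    ];
    // Events the active processed and buffered since the oldest sync.
    let buffered: Vec<(u64, String)> =
        (41..=44).map(|seq| (seq, format!("event-{seq}"))).collect();
    let (promoted, replay) = promote(&standbys, &buffered).expect("at least one standby");
    assert_eq!(promoted, "standby-b");
    assert_eq!(replay, vec!["event-43", "event-44"]);
}
```

Picking the most-synced standby minimises the replay tail, which is why frequent sync_standbys() calls shorten failover.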
MeshOS (daemon supervision SDK)
Enable the meshos feature (implies compute) to author
daemons that participate in MeshOS — the cluster-behavior
engine that supervises daemons, enforces replica placement,
applies operator intent, and folds the result into a snapshot
the operator UI renders. The SDK is daemon-side only: a
daemon written against this surface receives control events
and publishes capabilities, but it cannot mutate the cluster's
view of itself. Operator-facing surfaces (drain, cordon, ICE)
live in the deck SDK.
use std::sync::Arc;
use std::time::Duration;
use ;
;
# async
The daemon_main! macro collapses the lifecycle boilerplate
into one block that registers the daemon, drains control events,
and shuts down gracefully on Shutdown / DrainFinish:
daemon_main!
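For orientation, the lifecycle that daemon_main! is described as collapsing (drain control events, stop on Shutdown / DrainFinish) looks roughly like the loop below. This is a synchronous, std-only sketch: ControlEvent, ExitReason, and run_control_loop are hypothetical names, the u8 type of level is assumed, it omits registration and capability publishing, and it is not what the macro expands to.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical stand-in for MeshOS control events. Only the variant names
/// come from this README; the payload type of `level` is an assumption.
enum ControlEvent {
    BackpressureOn { level: u8 },
    DrainFinish,
    Shutdown,
}

#[derive(Debug, PartialEq)]
enum ExitReason {
    Drained,
    Shutdown,
    ChannelClosed,
}

/// Drain control events until a terminal one arrives. Registration and
/// async execution, which the real macro also covers, are omitted here.
fn run_control_loop(events: Receiver<ControlEvent>) -> ExitReason {
    for event in events {
        match event {
            // The daemon sees only the level, never the queue depth behind it.
            ControlEvent::BackpressureOn { level } => println!("throttling to level {level}"),
            ControlEvent::DrainFinish => return ExitReason::Drained,
            ControlEvent::Shutdown => return ExitReason::Shutdown,
        }
    }
    // Every sender was dropped without a terminal event.
    ExitReason::ChannelClosed
}

fn main() {
    let (tx, rx) = channel();
    tx.send(ControlEvent::BackpressureOn { level: 2 }).unwrap();
    tx.send(ControlEvent::Shutdown).unwrap();
    tx.send(ControlEvent::DrainFinish).unwrap(); // never reached: Shutdown ends the loop
    assert_eq!(run_control_loop(rx), ExitReason::Shutdown);
}
```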
Locked decisions (daemon-side only)
The SDK refuses every operator-side action by design — these are non-goals, not deferred work:
🚫 No placement / replica / scheduler APIs. Daemons advertise capabilities; MeshOS scores them. The daemon never sees the score and cannot request a placement.
🚫 No admin-event issuance. Drain / cordon / maintenance / ICE force-operations are operator-signed chain commits — the SDK has no path to emit them, in any language.
🚫 No "control MeshOS" surfaces. Avoid lists, backpressure flags, drift signals, maintenance transitions, and action admission are all opaque. The daemon receives BackpressureOn { level }; it cannot read the queue depth that triggered it or override the threshold.
The full plan, including the cross-binding contract that
keeps every language SDK on the same trait shape, lives at
docs/plans/MESHOS_SDK_PLAN.md.
Deck (operator SDK)
Enable the deck feature (implies meshos) for the operator-facing
surface: what the Deck binary (the terminal-UI cyberdeck) and
tenant operator tooling import. Every cluster-control action lives
here, gated by operator-key signing + channel-auth verification +
admin-chain commit.
use std::sync::Arc;
use std::time::Duration;
use ;
use ;
# async
ICE (break-glass surface)
ICE force-operations (ForceDrain, ForceEvictReplica,
ForceRestartDaemon, ForceCutover, KillMigration,
FreezeCluster { ttl }, ThawCluster, FlushAvoidLists) go
through the IceProposal discipline: simulate() returns a
BlastRadius preview against the local snapshot;
commit(signatures) enforces an M-of-N operator-signature
threshold substrate-side. Sub-threshold bundles surface
IceError::InsufficientSignatures without folding the
inner AdminEvent.
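The M-of-N rule that commit(signatures) enforces can be modelled as counting distinct authorized signers. A sketch only: cryptographic verification is assumed to have already happened, the IceError payload fields and check_threshold are hypothetical, and treating a repeated signer as one vote is an assumption about how the substrate counts.

```rust
use std::collections::HashSet;

/// Hypothetical mirror of the documented IceError::InsufficientSignatures;
/// the `have` / `need` fields are invented for illustration.
#[derive(Debug, PartialEq)]
enum IceError {
    InsufficientSignatures { have: usize, need: usize },
}

/// Count distinct, authorized signers. Signature verification is assumed to
/// be done already; this models only the M-of-N counting rule.
fn check_threshold(
    signers: &[&str],
    authorized: &HashSet<&str>,
    threshold: usize,
) -> Result<usize, IceError> {
    let distinct: HashSet<&str> = signers
        .iter()
        .copied()
        .filter(|s| authorized.contains(s))
        .collect();
    if distinct.len() >= threshold {
        Ok(distinct.len())
    } else {
        Err(IceError::InsufficientSignatures { have: distinct.len(), need: threshold })
    }
}

fn main() {
    let authorized = HashSet::from(["alice", "bob", "carol"]);
    // The same operator twice still counts once.
    assert_eq!(
        check_threshold(&["alice", "alice"], &authorized, 2),
        Err(IceError::InsufficientSignatures { have: 1, need: 2 })
    );
    // Unknown signers do not count toward the threshold.
    assert_eq!(
        check_threshold(&["alice", "mallory"], &authorized, 2),
        Err(IceError::InsufficientSignatures { have: 1, need: 2 })
    );
    assert_eq!(check_threshold(&["alice", "bob"], &authorized, 2), Ok(2));
}
```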
let proposal = deck.ice.freeze_cluster;
// 1. Pre-execution preview. Reports affected nodes, replicas,
// daemons + warnings (e.g. `ForcedEvictionBypassesCooldown`).
let blast = proposal.simulate.await?;
println!;
// 2. Collect operator signatures (one per operator, signing
// the proposal's canonical payload).
let sig_a = deck.identity.sign_proposal;
// let sig_b = peer_operator.sign_proposal(proposal.action());
// 3. Commit. Substrate-side verification enforces M-of-N;
// sub-threshold bundles return InsufficientSignatures.
let commit = proposal.commit.await?;
Runtime wiring (production extensions)
Operator deployments wire the chain seams and dispatcher seams
through one constructor. Every extension is an Option<Arc<dyn ...>>,
so test and bootstrap callers leave them None:
use ;
let runtime = start_with_full_extensions;
| Seam | Purpose |
|---|---|
| RedexAdminAuditAppender | Dual-write AdminAuditRecord to a TypedRedexFile so security review replays every admin commit |
| RedexLogAppender | Dual-write LogRecord to a TypedRedexFile for cluster-lifetime log replay |
| RedexFailureAppender | Dual-write FailureRecord for cluster-lifetime failure replay |
| OrchestratorMigrationAborter | Routes KillMigration commits to MigrationOrchestrator::abort_migration |
| OrchestratorMigrationSnapshotSource | Embeds the local orchestrator's in-flight migrations in every published snapshot so the ICE simulator can enumerate affected daemons |
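The Option<Arc<dyn ...>> convention can be sketched with a single hypothetical seam. LogAppender, Extensions, Runtime, and RecordingAppender below are invented for illustration; the SDK's real seam traits and the start_with_full_extensions signature are not reproduced here. The design choice it shows: an absent seam is a no-op branch, so tests and bootstrap need no stub implementations, while production wiring hands the runtime a shared trait object.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical seam trait standing in for something like RedexLogAppender.
trait LogAppender: Send + Sync {
    fn append(&self, line: &str);
}

/// Every seam is optional, so test and bootstrap callers leave it None.
#[derive(Default)]
struct Extensions {
    log_appender: Option<Arc<dyn LogAppender>>,
}

struct Runtime {
    ext: Extensions,
}

impl Runtime {
    fn log(&self, line: &str) {
        // Best-effort dual write: skipped entirely when no seam is wired.
        if let Some(appender) = &self.ext.log_appender {
            appender.append(line);
        }
    }
}

/// Test double that records what was dual-written.
#[derive(Default)]
struct RecordingAppender {
    lines: Mutex<Vec<String>>,
}

impl LogAppender for RecordingAppender {
    fn append(&self, line: &str) {
        self.lines.lock().unwrap().push(line.to_string());
    }
}

fn main() {
    // Bootstrap-style wiring: no seams at all.
    let bare = Runtime { ext: Extensions::default() };
    bare.log("no seams wired, nothing recorded");

    // Production-style wiring: keep a typed handle, give the runtime a trait object.
    let recorder = Arc::new(RecordingAppender::default());
    let seam: Arc<dyn LogAppender> = recorder.clone();
    let wired = Runtime { ext: Extensions { log_appender: Some(seam) } };
    wired.log("admin commit applied");
    assert_eq!(*recorder.lines.lock().unwrap(), vec!["admin commit applied".to_string()]);
}
```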
The error surface uses the <<deck-sdk-kind:KIND>>MSG
discriminator format every cross-language SDK shares — parse
once, route on KIND. The full plan (including the
intentional non-goals — no topology/identity management, no
chain-mutation outside signed admin commits, no UI rendering)
lives at docs/plans/DECK_SDK_PLAN.md.
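Because every binding emits the same <<deck-sdk-kind:KIND>>MSG prefix, a caller can split it once and route on KIND. A minimal std-only sketch: parse_deck_error and route are hypothetical helpers, and the KIND value insufficient_signatures is invented for the example, since this README does not list the KIND vocabulary.

```rust
/// Split a `<<deck-sdk-kind:KIND>>MSG` string into (KIND, MSG).
/// Returns None when the discriminator prefix is missing or never closed.
fn parse_deck_error(raw: &str) -> Option<(&str, &str)> {
    let rest = raw.strip_prefix("<<deck-sdk-kind:")?;
    rest.split_once(">>")
}

/// Route on KIND once instead of pattern-matching message text everywhere.
/// The KIND strings matched here are hypothetical.
fn route(raw: &str) -> &'static str {
    match parse_deck_error(raw) {
        Some(("insufficient_signatures", _)) => "ask another operator to co-sign",
        Some((_, _)) => "surface the message to the operator",
        None => "not a deck SDK error",
    }
}

fn main() {
    let raw = "<<deck-sdk-kind:insufficient_signatures>>2 of 3 required";
    assert_eq!(parse_deck_error(raw), Some(("insufficient_signatures", "2 of 3 required")));
    assert_eq!(route(raw), "ask another operator to co-sign");
    assert_eq!(route("plain text"), "not a deck SDK error");
    assert_eq!(parse_deck_error("<<deck-sdk-kind:unterminated"), None);
}
```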
API
| Method | Description |
|---|---|
| Net::builder() | Create a configuration builder |
| emit(&T) | Emit a serializable event |
| emit_raw(bytes) | Emit raw bytes (fastest) |
| emit_str(json) | Emit a JSON string |
| emit_batch(&[T]) | Batch emit |
| emit_raw_batch(vecs) | Batch emit raw bytes |
| poll(request) | One-shot poll |
| subscribe(opts) | Async event stream |
| subscribe_typed::<T>(opts) | Typed async stream |
| stats() | Ingestion statistics |
| shards() | Number of active shards |
| health() | Check node health |
| flush() | Flush pending batches |
| shutdown() | Graceful shutdown |
| bus() | Access underlying EventBus |
SdkError is #[non_exhaustive]; structured ingestion failures
(Sampled, Unrouted, Backpressure) and stream-side rejections
(ChannelRejected) surface as their own variants rather than being
funnelled through Ingestion(String). Code matching on SdkError from
outside the SDK crate must include a wildcard arm (the compiler
enforces this for non-exhaustive enums), which is what lets a future
variant ship as a minor-version change rather than a breaking one.
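A sketch of matching SdkError with the wildcard arm. It defines a local mirror of the enum so it compiles on its own; the variant payloads (unit variants, String) and the retry policy in is_retryable are assumptions for illustration. Note that #[non_exhaustive] only forces the wildcard on code in other crates; in this single-file sketch the _ arm also happens to cover Ingestion.

```rust
/// Hypothetical local mirror of the documented SdkError variants. The real
/// enum lives in the SDK; payload types here are assumed.
#[non_exhaustive]
#[derive(Debug)]
pub enum SdkError {
    Sampled,
    Unrouted,
    Backpressure,
    ChannelRejected(String),
    Ingestion(String),
}

/// Example retry policy (an assumption, not SDK guidance).
fn is_retryable(err: &SdkError) -> bool {
    match err {
        // Transient load shedding: back off and retry.
        SdkError::Backpressure => true,
        // Dropped by sampling or routing: resending the same event won't help.
        SdkError::Sampled | SdkError::Unrouted => false,
        SdkError::ChannelRejected(reason) => {
            eprintln!("channel rejected: {reason}");
            false
        }
        // Wildcard: required across crates for a #[non_exhaustive] enum,
        // and a safe default for variants added in later releases.
        _ => false,
    }
}

fn main() {
    let errors = [
        SdkError::Backpressure,
        SdkError::Sampled,
        SdkError::Unrouted,
        SdkError::ChannelRejected("not authorized".to_string()),
        SdkError::Ingestion("encode failed".to_string()),
    ];
    let retryable: Vec<bool> = errors.iter().map(is_retryable).collect();
    assert_eq!(retryable, [true, false, false, false, false]);
}
```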
Channel surface (feature net)
| Method | Description |
|---|---|
| mesh.register_channel(config) | Install / replace a channel's access config |
| mesh.subscribe_channel(peer_id, &name) | Ask peer_id to add us as a subscriber |
| mesh.unsubscribe_channel(peer_id, &name) | Leave a channel (idempotent) |
| mesh.publish(&name, bytes, cfg) | Fan one payload to all subscribers |
| mesh.publish_many(&name, &[bytes], cfg) | Fan a batch to all subscribers |
| SdkError::ChannelRejected(reason) | Typed subscribe/unsubscribe rejection |
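The subscribe / unsubscribe / publish semantics in the table above can be pictured as a publisher-side subscriber table. This toy, in-process model is not the SDK's mesh implementation: ChannelTable and its methods are hypothetical, peer ids are plain u64 values, and access control (register_channel) and transport are left out. It does show the two documented properties: publish fans one payload to every current subscriber, and unsubscribing twice is harmless.

```rust
use std::collections::{BTreeSet, HashMap};

/// Toy publisher-side view of channel membership (hypothetical).
#[derive(Default)]
struct ChannelTable {
    subscribers: HashMap<String, BTreeSet<u64>>,
}

impl ChannelTable {
    fn subscribe(&mut self, channel: &str, peer: u64) {
        self.subscribers.entry(channel.to_string()).or_default().insert(peer);
    }

    /// Idempotent: removing an absent peer or channel is not an error.
    fn unsubscribe(&mut self, channel: &str, peer: u64) {
        if let Some(peers) = self.subscribers.get_mut(channel) {
            peers.remove(&peer);
        }
    }

    /// Fan one payload to every current subscriber, returning (peer, payload) deliveries.
    fn publish(&self, channel: &str, payload: &[u8]) -> Vec<(u64, Vec<u8>)> {
        self.subscribers
            .get(channel)
            .map(|peers| peers.iter().map(|&peer| (peer, payload.to_vec())).collect())
            .unwrap_or_default()
    }
}

fn main() {
    let mut table = ChannelTable::default();
    table.subscribe("prices", 1);
    table.subscribe("prices", 2);
    table.unsubscribe("prices", 2);
    table.unsubscribe("prices", 2); // second call is a no-op, not an error
    table.unsubscribe("unknown", 7); // unknown channel is a no-op too
    assert_eq!(table.publish("prices", b"tick"), vec![(1u64, b"tick".to_vec())]);
    assert!(table.publish("unknown", b"tick").is_empty());
}
```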
CortEX surface (feature cortex)
| Entry point | Description |
|---|---|
| cortex::Redex::new() | In-memory event-log manager |
| cortex::Redex::with_persistent_dir(path) | Disk-backed manager |
| cortex::NetDb::builder(redex) | Fluent NetDb construction |
| cortex::TasksAdapter::open(redex, origin) | Open tasks model standalone |
| cortex::MemoriesAdapter::open(redex, origin) | Open memories model standalone |
| db.tasks() / db.memories() | Typed adapter handles on NetDb |
| adapter.snapshot_and_watch(watcher) | Atomic initial-result + delta stream |
| db.snapshot() | NetDbSnapshot bundle for persistence |
| NetDb::builder(...).build_from_snapshot(&bundle) | Restore from bundle |
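Why snapshot_and_watch is atomic can be shown with a toy store: taking the initial result and registering the watcher under one lock means no write can land between them, so the snapshot plus the delta stream covers every item exactly once. This is a hypothetical std-only model, not the CortEX adapter; Store, Inner, and the Receiver-returning signature are assumptions (the real method takes a watcher argument).

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

#[derive(Default)]
struct Inner {
    items: Vec<String>,
    watchers: Vec<Sender<String>>,
}

/// Toy store (hypothetical): items plus registered delta watchers behind one lock.
#[derive(Default)]
struct Store {
    inner: Mutex<Inner>,
}

impl Store {
    fn insert(&self, item: &str) {
        let mut inner = self.inner.lock().unwrap();
        inner.items.push(item.to_string());
        // Notify watchers; drop any whose receiver has gone away.
        inner.watchers.retain(|w| w.send(item.to_string()).is_ok());
    }

    /// Snapshot and watcher registration happen under the same lock, so no
    /// insert can fall into a gap between them.
    fn snapshot_and_watch(&self) -> (Vec<String>, Receiver<String>) {
        let mut inner = self.inner.lock().unwrap();
        let (tx, rx) = channel();
        inner.watchers.push(tx);
        (inner.items.clone(), rx)
    }
}

fn main() {
    let store = Store::default();
    store.insert("task-1");
    let (initial, deltas) = store.snapshot_and_watch();
    store.insert("task-2");
    assert_eq!(initial, vec!["task-1".to_string()]); // pre-existing item: snapshot only
    assert_eq!(deltas.try_recv().unwrap(), "task-2"); // later item: delta only
    assert!(deltas.try_recv().is_err()); // nothing duplicated or missed
}
```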
License
Apache-2.0