noesis-ship
Rust NATS communication platform for multi-agent AI systems.
Features
Seven building blocks over NATS:
| Primitive | Transport | Use Case |
|---|---|---|
| PubSub | NATS Core | Fire-and-forget broadcast (heartbeats, status) |
| EventBus | JetStream | Durable event streaming with 24h replay |
| Channels | JetStream | Point-to-point messaging with history |
| KV Store | NATS KV | Shared state with watch, TTL, history |
| Object Store | NATS Object Store | Large blob storage with SHA-256 |
| JobQueue | In-memory (NATS KV planned) | Generic job lifecycle (queued → running → complete/failed) |
| NatsServiceBuilder | NATS Core | Build a request-reply service in ~20 lines |
Add to your Cargo.toml:
[dependencies]
noesis-ship = "0.14"
Quick Start — ConnectionManager
All primitives start with a connection:
use ConnectionManager;
use NatsConfig;
let config = new;
let mut conn = new;
conn.connect.await?;
let client = conn.client?; // NATS Core client
let js = conn.jetstream?; // JetStream context
conn.ensure_stream.await?; // Create stream if missing
PubSub — Fire-and-Forget
Raw NATS Core publish/subscribe. No persistence — if nobody is listening, the message is lost.
use ;
use NatsConfig;
// Publisher
let mut pub_ = new;
pub_.connect.await?;
pub_.emit.await?;
pub_.emit.await?;
// Subscriber
let mut sub = new;
sub.connect.await?;
sub.subscribe;
sub.run.await?;
EventBus — Durable Events (JetStream)
Events are persisted to a JetStream stream. Late-joining consumers replay
recent history. Default: SHIP_EVENTS stream, 24h retention, 100k messages max.
use EventBus;
use NatsConfig;
let bus = new
.with_source;
bus.connect.await?;
// Emit — persisted to JetStream
bus.emit_event.await?;
// Subscribe with durable consumer — replays missed events
bus.subscribe.await?;
Custom stream
use StreamConfig;
let stream = new
.with_max_age // 1 hour retention
.with_max_msgs
.with_memory_storage;
let bus = with_stream;
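To make the retention limits concrete, here is a toy, std-only model of a stream bounded by age and message count. It is a sketch, not the crate's EventBus and not JetStream itself: `RetainedLog` and its methods are hypothetical names, timestamps are plain `Duration` offsets from an arbitrary start, and it assumes the oldest entries are discarded first when a limit is hit.

```rust
use std::collections::VecDeque;
use std::time::Duration;

/// Hypothetical toy model of a stream limited by age and message count.
struct RetainedLog {
    max_age: Duration,
    max_msgs: usize,
    /// (timestamp as an offset from an arbitrary start, payload)
    entries: VecDeque<(Duration, String)>,
}

impl RetainedLog {
    fn new(max_age: Duration, max_msgs: usize) -> Self {
        Self { max_age, max_msgs, entries: VecDeque::new() }
    }

    /// Append an event stamped `now`, then enforce both limits.
    fn append(&mut self, now: Duration, payload: &str) {
        self.entries.push_back((now, payload.to_string()));
        self.prune(now);
    }

    /// Drop oldest entries while the front is too old or the log is too long.
    fn prune(&mut self, now: Duration) {
        loop {
            let drop_front = match self.entries.front() {
                Some((ts, _)) => {
                    now.saturating_sub(*ts) > self.max_age
                        || self.entries.len() > self.max_msgs
                }
                None => false,
            };
            if !drop_front {
                break;
            }
            self.entries.pop_front();
        }
    }

    /// What a consumer joining at `now` would be able to replay.
    fn replay(&mut self, now: Duration) -> Vec<String> {
        self.prune(now);
        self.entries.iter().map(|(_, p)| p.clone()).collect()
    }
}
```

With a one-hour `max_age` and `max_msgs` of 2, appending three events leaves only the last two available for replay, and a consumer joining more than an hour after an event no longer sees it.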
Channels — Point-to-Point Messaging
JetStream-backed channels with history replay and own-message filtering.
use ChannelService;
use NatsConfig;
let mut ch = new;
ch.connect.await?;
// Send
ch.send_message.await?;
ch.send_message.await?;
// Subscribe (replay_history = true to get past messages)
ch.subscribe.await?;
// Fetch history
let history = ch.get_channel_history.await?;
KV Store — Shared State
NATS KV buckets with watch, TTL, and history. The generic API comes first; three built-in specializations (being registry, ship config, health metrics) follow it:
use KvStore;
use ;
// Generic KV
let config = new
.with_history
.with_ttl_secs;
let kv = new;
kv.connect.await?;
kv.put.await?;
let val = kv.get.await?; // Some({"count": 42})
let keys = kv.keys.await?; // ["key1"]
kv.delete.await?;
// Watch for changes (real-time)
let mut stream = kv.watch.await?;
while let Some = stream.next.await
Built-in specializations
use ;
// Being registry — track agent status
let registry = new;
registry.connect.await?;
registry.register.await?;
registry.update_status.await?;
registry.heartbeat.await?;
let online = registry.get_online.await?;
// Ship config — shared configuration
let config = new;
config.connect.await?;
config.set.await?;
let level = config.get.await?;
// Health metrics — TTL-based health reporting
let health = new;
health.connect.await?;
health.report.await?;
Object Store — Large Blobs
NATS Object Store for files and snapshots with SHA-256 integrity.
use ShipObjectStore;
use NatsConfig;
let store = new;
store.connect.await?;
// Store bytes
let meta = store.put.await?;
println!;
// Store file
let meta = store.put_file.await?;
// Retrieve
let data = store.get.await?; // Option<Vec<u8>>
// List
let objects = store.list.await?;
Built-in specializations
use ;
// Being snapshots
let snaps = new;
snaps.connect.await?;
let id = snaps.take.await?;
let state = snaps.restore.await?;
// Artifact store (code, logs, docs)
let arts = new;
arts.connect.await?;
arts.store_artifact.await?;
JobQueue — Generic Job Lifecycle
Track work items through queued → running → complete | failed. Workers claim
jobs atomically (filtered by worker name). Any serde-able payload type works.
use ;
use ;
let mut queue = new;
// Submit a job targeted at a specific worker
let id = queue.submit;
// Worker claims next available job
let job = queue.claim.unwrap;
let job_id = job.id.clone;
// Complete with result (or fail with error)
queue.complete;
// queue.fail(&job_id, "compilation error");
// List and filter
let queued = queue.list;
let = queue.counts;
Real-world usage: NuSy's training queue uses JobQueue<TrainingPayload> to
coordinate GPU training runs across a fleet of machines — any agent queues jobs,
DGX claims and executes them.
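The lifecycle described in this section (queued → running → complete | failed, with claims filtered by worker name) can be sketched with the standard library alone. This is an illustration under stated assumptions, not the crate's JobQueue: `SketchQueue`, `Job`, `JobState`, and the `finish` method are hypothetical names, and job IDs are plain counters.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq)]
enum JobState {
    Queued,
    Running,
    Complete(String),
    Failed(String),
}

#[derive(Debug, Clone)]
struct Job<P> {
    id: u64,
    worker: String,
    payload: P,
    state: JobState,
}

/// Hypothetical in-memory queue; BTreeMap keeps jobs in submission order.
struct SketchQueue<P> {
    next_id: u64,
    jobs: BTreeMap<u64, Job<P>>,
}

impl<P: Clone> SketchQueue<P> {
    fn new() -> Self {
        Self { next_id: 1, jobs: BTreeMap::new() }
    }

    /// Queue a job addressed to one named worker and return its ID.
    fn submit(&mut self, worker: &str, payload: P) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        let job = Job { id, worker: worker.to_string(), payload, state: JobState::Queued };
        self.jobs.insert(id, job);
        id
    }

    /// Claim the oldest queued job for `worker`, moving it to Running.
    fn claim(&mut self, worker: &str) -> Option<Job<P>> {
        let job = self
            .jobs
            .values_mut()
            .find(|j| j.worker == worker && j.state == JobState::Queued)?;
        job.state = JobState::Running;
        Some(job.clone())
    }

    /// Finish a running job; returns false if it is missing or not running.
    fn finish(&mut self, id: u64, outcome: Result<String, String>) -> bool {
        match self.jobs.get_mut(&id) {
            Some(job) if job.state == JobState::Running => {
                job.state = match outcome {
                    Ok(result) => JobState::Complete(result),
                    Err(error) => JobState::Failed(error),
                };
                true
            }
            _ => false,
        }
    }

    /// Counts as (queued, running, complete, failed).
    fn counts(&self) -> (usize, usize, usize, usize) {
        let mut c = (0, 0, 0, 0);
        for job in self.jobs.values() {
            match job.state {
                JobState::Queued => c.0 += 1,
                JobState::Running => c.1 += 1,
                JobState::Complete(_) => c.2 += 1,
                JobState::Failed(_) => c.3 += 1,
            }
        }
        c
    }
}
```

Because `claim` takes `&mut self`, finding a job and marking it running happen as one step in a single process. A version backed by NATS KV would need a compare-and-set style update to keep that guarantee across machines.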
NatsServiceBuilder — Request-Reply Services
Build a NATS service with routing, mutation events, and graceful shutdown:
use NatsServiceBuilder;
new
.nats_url
.handler
.handler
.on_shutdown
.run
.await?;
Catch-all dispatch + JetStream events
For services with many commands, use default_handler() to delegate to an
existing dispatch function. Add event_bus_stream() for durable event publishing:
use NatsServiceBuilder;
use StreamConfig;
let events = new;
new
.nats_url
.default_handler
.mutation_callback
.event_prefix
.event_bus_stream
.on_shutdown
.run
.await?;
Named handlers take precedence over the default handler. Events published via
event_bus_stream use JetStream with ShipEvent envelopes; without it, events
use fire-and-forget PubSub.
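The precedence rule (named handlers first, then the default handler) can be modelled with a small std-only router. This is a hypothetical sketch, not NatsServiceBuilder: the `Router` type, its `dispatch` method, and the handler signature (`&str` body in, `String` reply out) are assumptions made for illustration.

```rust
use std::collections::HashMap;

/// Assumed handler shape: request body in, reply out.
type Handler = Box<dyn Fn(&str) -> String>;

/// Hypothetical router that mimics "named first, default second".
#[derive(Default)]
struct Router {
    named: HashMap<String, Handler>,
    fallback: Option<Handler>,
}

impl Router {
    /// Register a handler for one specific command name.
    fn handler(mut self, command: &str, f: impl Fn(&str) -> String + 'static) -> Self {
        self.named.insert(command.to_string(), Box::new(f));
        self
    }

    /// Register the catch-all used when no named handler matches.
    fn default_handler(mut self, f: impl Fn(&str) -> String + 'static) -> Self {
        self.fallback = Some(Box::new(f));
        self
    }

    /// Look up a named handler first; fall back to the default if there is one.
    fn dispatch(&self, command: &str, body: &str) -> Option<String> {
        self.named
            .get(command)
            .or(self.fallback.as_ref())
            .map(|h| h(body))
    }
}
```

In this sketch a command with no named handler and no default yields `None`; how the real service replies in that case is not stated in this README.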
ServiceArgs
Standard CLI arguments for services:
use ServiceArgs;
use Parser;
let args = parse;
// args.data_dir — PathBuf, default "."
// args.nats_url — String, default "nats://localhost:4222"
Error Handling
All operations return noesis_ship::types::Result<T>:
use Error;
match result
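The general pattern of a crate-wide result alias matched by error variant looks roughly like the sketch below. Every name here is hypothetical: `SketchError`, its variants, `SketchResult`, `lookup`, and `describe` are stand-ins, and the crate's real Error variants are not listed in this README.

```rust
use std::fmt;

/// Hypothetical error type; the variants are illustrative only.
#[derive(Debug)]
enum SketchError {
    Connection(String),
    Timeout,
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SketchError::Connection(msg) => write!(f, "connection error: {msg}"),
            SketchError::Timeout => write!(f, "request timed out"),
        }
    }
}

impl std::error::Error for SketchError {}

/// Crate-wide alias so signatures only spell out the success type.
type SketchResult<T> = std::result::Result<T, SketchError>;

/// Stand-in operation that fails in two different ways.
fn lookup(key: &str) -> SketchResult<String> {
    match key {
        "slow" => Err(SketchError::Timeout),
        "down" => Err(SketchError::Connection("no server".to_string())),
        other => Ok(format!("value-of-{other}")),
    }
}

/// Handle one variant specially and report the rest through Display.
fn describe(result: SketchResult<String>) -> String {
    match result {
        Ok(value) => format!("ok: {value}"),
        Err(SketchError::Timeout) => "retry later".to_string(),
        Err(other) => format!("giving up: {other}"),
    }
}
```

Matching a specific variant before a catch-all arm lets callers retry transient failures while still surfacing everything else.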
Requirements
- Rust 1.85+ (edition 2024)
- NATS server 2.10+ (with JetStream enabled for EventBus, Channels, KV, Object Store)
License
MIT — Copyright (c) Hank Head / Congruent Systems PBC