§Smith NATS JetStream Bus Library
This crate provides high-level abstractions over NATS JetStream for the Smith platform, implementing reliable message patterns with automatic retries, work queue semantics, and comprehensive stream management.
§Features
- Type-Safe Subjects: Compile-time validated NATS subject patterns
- Stream Management: Automatic stream and consumer lifecycle management
- Work Queue Semantics: Fair distribution of work across multiple consumers
- Retry Logic: Exponential backoff with jitter for failed operations
- Health Monitoring: Connection health checks and stream lag monitoring
- Sharding Support: Domain-based message routing for horizontal scaling
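To illustrate the idea behind type-safe subjects, here is a minimal sketch that assembles a subject such as `smith.intents.raw.fs.read.v1` from typed parts instead of a hand-written string. The `Stage` enum, the `vetted` token, `SubjectError`, and `intent_subject` are hypothetical names chosen for this example; the crate's actual builders live in the `subjects` module and may look quite different. The sketch also validates at runtime, whereas the crate advertises compile-time validation.

```rust
/// Hypothetical pipeline stage a subject belongs to (not the crate's type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Stage {
    Raw,
    Vetted, // assumed token name, for illustration only
}

impl Stage {
    fn token(self) -> &'static str {
        match self {
            Stage::Raw => "raw",
            Stage::Vetted => "vetted",
        }
    }
}

/// Reasons a subject could not be built.
#[derive(Debug, PartialEq, Eq)]
pub enum SubjectError {
    EmptyToken,
    InvalidToken(String),
}

/// Builds `smith.intents.<stage>.<capability>.v<version>`.
/// `capability` is a dot-separated name such as `fs.read`.
pub fn intent_subject(
    stage: Stage,
    capability: &str,
    version: u32,
) -> Result<String, SubjectError> {
    for token in capability.split('.') {
        if token.is_empty() {
            return Err(SubjectError::EmptyToken);
        }
        // Reject NATS wildcards and whitespace inside a publish subject.
        if token.chars().any(|c| c == '*' || c == '>' || c.is_whitespace()) {
            return Err(SubjectError::InvalidToken(token.to_string()));
        }
    }
    Ok(format!(
        "smith.intents.{}.{}.v{}",
        stage.token(),
        capability,
        version
    ))
}
```

Calling `intent_subject(Stage::Raw, "fs.read", 1)` yields the subject used in the usage example, while a capability containing `*`, `>`, whitespace, or an empty token is rejected before anything is published.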
§Architecture
Smith uses a Phase 2 JetStream architecture optimized for high throughput and reliability:
Publishers → Raw Streams → Admission → Vetted Streams → Consumers → Results
↓
Audit Streams (compliance & debugging)
§Stream Topology
| Stream | Purpose | Retention | Configuration |
|---|---|---|---|
| SDLC_RAW | Intent ingestion | WorkQueue | High-throughput (500MB) |
| ATOMS_VETTED | Approved intents | Interest | Ordering guarantees (1GB) |
| ATOMS_RESULTS | Execution results | 48h limit | Performance tracking (2GB) |
| AUDIT_SECURITY | Security events | 1 year | Compliance retention (10GB) |
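The table can also be read as data. The sketch below restates it using plain Rust types so the retention and size limits are explicit. `Retention`, `StreamSpec`, and `stream_topology` are hypothetical names for this example and are not the crate's `StreamManager` API. It additionally assumes that sizes are binary units (1 MB = 1024 × 1024 bytes), that "1 year" means 365 days, and that the two time-limited streams use limits-based retention with a maximum message age.

```rust
use std::time::Duration;

/// Retention modes as listed in the table (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Retention {
    /// A message is removed once a consumer acknowledges it.
    WorkQueue,
    /// A message is kept while some consumer still has interest in it.
    Interest,
    /// A message is kept until it exceeds the given age (assumed mapping).
    Limits { max_age: Duration },
}

/// One row of the stream topology table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamSpec {
    pub name: &'static str,
    pub purpose: &'static str,
    pub retention: Retention,
    pub max_bytes: u64,
}

// Assumption: the table's MB/GB are binary units.
const MIB: u64 = 1024 * 1024;
const GIB: u64 = 1024 * MIB;
const HOUR: u64 = 60 * 60;
const DAY: u64 = 24 * HOUR;

/// Returns the four streams in the order they appear in the table.
pub fn stream_topology() -> Vec<StreamSpec> {
    vec![
        StreamSpec {
            name: "SDLC_RAW",
            purpose: "Intent ingestion",
            retention: Retention::WorkQueue,
            max_bytes: 500 * MIB,
        },
        StreamSpec {
            name: "ATOMS_VETTED",
            purpose: "Approved intents",
            retention: Retention::Interest,
            max_bytes: GIB,
        },
        StreamSpec {
            name: "ATOMS_RESULTS",
            purpose: "Execution results",
            retention: Retention::Limits { max_age: Duration::from_secs(48 * HOUR) },
            max_bytes: 2 * GIB,
        },
        StreamSpec {
            name: "AUDIT_SECURITY",
            purpose: "Security events",
            // Assumption: "1 year" is taken as 365 days.
            retention: Retention::Limits { max_age: Duration::from_secs(365 * DAY) },
            max_bytes: 10 * GIB,
        },
    ]
}
```

Keeping the topology in one declarative list like this makes it straightforward to compare against whatever the server reports during startup checks.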
§Basic Usage
use smith_bus::{SmithBus, ConsumerConfig};
use smith_protocol::Intent;

// Connect to NATS JetStream
let bus = SmithBus::connect("nats://localhost:4222").await?;

// Publish an intent
let intent = Intent::new(/* ... */);
bus.publish("smith.intents.raw.fs.read.v1".to_string(), &intent).await?;

// Create a consumer for processing results
let config = ConsumerConfig::default();
let consumer = bus.consumer("fs.read.v1", config).await?;

// Process messages with automatic retry and backoff
while let Some(message) = consumer.next_message().await? {
    match process_message(&message.payload) {
        Ok(_) => message.ack().await?,
        Err(_) => message.nack().await?, // Will retry with exponential backoff
    }
}
§Performance Characteristics
- Throughput: 10,000+ messages/second per stream on commodity hardware
- Latency: Sub-millisecond message delivery in optimal conditions
- Reliability: At-least-once delivery with deduplication windows
- Scalability: Horizontal scaling via consumer groups and domain sharding
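As a rough illustration of how sharding can preserve ordering while scaling out, the sketch below maps a key to a fixed shard with a stable hash, so every message for the same key lands on the same shard subject. All of it is assumed for the example: the `.shard<N>` suffix, the function names, and the choice of FNV-1a are not taken from the crate's `sharding` module.

```rust
/// 64-bit FNV-1a hash. A fixed algorithm is used so that every process
/// maps the same key to the same shard (std's default hasher is seeded).
fn fnv1a_64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    let mut hash = OFFSET_BASIS;
    for &b in bytes {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(PRIME);
    }
    hash
}

/// Picks a shard index in `0..shard_count` for the given key.
/// A `shard_count` of 0 is treated as 1 to avoid dividing by zero.
pub fn shard_for(episode_id: &str, shard_count: u32) -> u32 {
    let count = u64::from(shard_count.max(1));
    (fnv1a_64(episode_id.as_bytes()) % count) as u32
}

/// Appends a hypothetical `.shard<N>` token to a base subject.
pub fn sharded_subject(base: &str, episode_id: &str, shard_count: u32) -> String {
    format!("{}.shard{}", base, shard_for(episode_id, shard_count))
}
```

Because the mapping depends only on the key and the shard count, one consumer per shard sees each key's messages in publish order. Changing the shard count remaps keys, so a real deployment would need a migration plan for that case.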
§Error Handling
The library implements comprehensive error handling with exponential backoff:
- Connection failures: Automatic reconnection with circuit breaker
- Message failures: Configurable retry counts with dead letter queues
- Stream errors: Graceful degradation and health status reporting
For detailed stream configuration and subject patterns, see the streams and subjects modules.
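The retry behavior described above can be sketched as a capped exponential delay with jitter. This is an illustration only: `BackoffSketch` and its fields are hypothetical and are not the crate's `BackoffConfig`, and the "equal jitter" variant shown here (half of the delay fixed, half randomized) is one common choice rather than a statement of what the library does. The caller supplies the random value so the function stays deterministic; real code would typically draw it from a random number generator.

```rust
use std::time::Duration;

/// Hypothetical backoff parameters for this sketch.
#[derive(Debug, Clone, Copy)]
pub struct BackoffSketch {
    /// Delay before the first retry, prior to jitter.
    pub base: Duration,
    /// Upper bound on any single delay.
    pub max: Duration,
}

impl BackoffSketch {
    /// Delay for the zero-based `attempt`, using `random` as the jitter source.
    /// The result lies in `[capped / 2, capped]`, where
    /// `capped = min(base * 2^attempt, max)`.
    pub fn delay(&self, attempt: u32, random: u64) -> Duration {
        // 2^attempt, saturating instead of overflowing for large attempts.
        let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
        let capped = self
            .base
            .checked_mul(factor)
            .unwrap_or(self.max)
            .min(self.max);

        // Equal jitter: keep half the delay, randomize the other half
        // at millisecond granularity.
        let half = capped / 2;
        let span_ms = half.as_millis() as u64;
        let extra = Duration::from_millis(random % (span_ms + 1));
        half + extra
    }
}
```

With `base` at 100 ms and `max` at 10 s, the un-jittered delays grow as 100 ms, 200 ms, 400 ms and so on until they reach the 10 s cap. Jitter spreads retries from many consumers so they do not all hit the server at the same instant.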
§Re-exports
pub use consumer::Consumer;
pub use publisher::Publisher;
pub use streams::StreamManager;
pub use lag_monitor::*;
pub use sharding::*;
pub use subjects::*;
§Modules
- consumer
- lag_monitor: Consumer lag monitoring and backpressure management
- publisher
- sharding: Episode-based subject sharding for ordering guarantees
- streams
- subjects: Canonical NATS subject patterns and builders for Smith platform
§Macros
- validate_no_raw_subjects: Compile-time validation macros for preventing raw strings
§Structs
- BackoffConfig: Backoff strategy for handling failures and retries
- ConsumerConfig: Configuration for creating consumers
- HealthStatus: Health status of NATS/JetStream connectivity
- Message: Message wrapper for JetStream messages with retry support
- SmithBus: High-level NATS JetStream client for Smith intent processing
- WorkQueue: Work queue semantics helper for fair distribution of work
§Enums
- ConsumerStartSequence: Where to start consuming messages from
§Functions
- create_backoff_strategy: Create an exponential backoff strategy from config