smith-bus 0.1.2

NATS JetStream helpers for intent execution
Documentation

Smith NATS JetStream Bus Library

This crate provides high-level abstractions over NATS JetStream for the Smith platform, implementing reliable message patterns with automatic retries, work queue semantics, and comprehensive stream management.

Features

  • Type-Safe Subjects: Compile-time validated NATS subject patterns
  • Stream Management: Automatic stream and consumer lifecycle management
  • Work Queue Semantics: Fair distribution of work across multiple consumers
  • Retry Logic: Exponential backoff with jitter for failed operations
  • Health Monitoring: Connection health checks and stream lag monitoring
  • Sharding Support: Domain-based message routing for horizontal scaling

Architecture

Smith uses a Phase 2 JetStream architecture optimized for high throughput and reliability:

Publishers → Raw Streams → Admission → Vetted Streams → Consumers → Results
                     ↓
                Audit Streams (compliance & debugging)

Stream Topology

Stream Purpose Retention Configuration
SDLC_RAW Intent ingestion WorkQueue High-throughput (500MB)
ATOMS_VETTED Approved intents Interest Ordering guarantees (1GB)
ATOMS_RESULTS Execution results 48h limit Performance tracking (2GB)
AUDIT_SECURITY Security events 1 year Compliance retention (10GB)

Basic Usage

use smith_bus::{SmithBus, ConsumerConfig};
use smith_protocol::Intent;

# async fn example() -> anyhow::Result<()> {
// Connect to NATS JetStream
let bus = SmithBus::connect("nats://localhost:4222").await?;

// Publish an intent
let intent = Intent::new(/* ... */);
bus.publish("smith.intents.raw.fs.read.v1".to_string(), &intent).await?;

// Create a consumer for processing results
let config = ConsumerConfig::default();
let consumer = bus.consumer("fs.read.v1", config).await?;

// Process messages with automatic retry and backoff
while let Some(message) = consumer.next_message().await? {
    match process_message(&message.payload) {
        Ok(_) => message.ack().await?,
        Err(_) => message.nack().await?, // Will retry with exponential backoff
    }
}
# Ok(())
# }

Performance Characteristics

  • Throughput: 10,000+ messages/second per stream on commodity hardware
  • Latency: Sub-millisecond message delivery in optimal conditions
  • Reliability: At-least-once delivery with deduplication windows
  • Scalability: Horizontal scaling via consumer groups and domain sharding

Error Handling

The library implements comprehensive error handling with exponential backoff:

  • Connection failures: Automatic reconnection with circuit breaker
  • Message failures: Configurable retry counts with dead letter queues
  • Stream errors: Graceful degradation and health status reporting

For detailed stream configuration and subject patterns, see the [streams] and [subjects] modules.