Crate smith_bus

§Smith NATS JetStream Bus Library

This crate provides high-level abstractions over NATS JetStream for the Smith platform, implementing reliable message patterns with automatic retries, work queue semantics, and comprehensive stream management.

§Features

  • Type-Safe Subjects: Compile-time validated NATS subject patterns
  • Stream Management: Automatic stream and consumer lifecycle management
  • Work Queue Semantics: Fair distribution of work across multiple consumers
  • Retry Logic: Exponential backoff with jitter for failed operations
  • Health Monitoring: Connection health checks and stream lag monitoring
  • Sharding Support: Domain-based message routing for horizontal scaling
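The idea behind type-safe subjects can be sketched as follows: subjects are assembled from typed parts, so a misspelled token cannot reach the wire. This is a minimal illustration, not the crate's actual API. The `Capability` enum, its `HttpFetch` variant, and `raw_intent_subject` are hypothetical names, and the subject layout is inferred from the `smith.intents.raw.fs.read.v1` subject used in Basic Usage.

```rust
/// Hypothetical capability enum; the real crate may model subjects differently.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Capability {
    FsRead,
    /// Illustrative second variant, not taken from the crate.
    HttpFetch,
}

impl Capability {
    /// Subject token for this capability.
    fn token(self) -> &'static str {
        match self {
            Capability::FsRead => "fs.read",
            Capability::HttpFetch => "http.fetch",
        }
    }
}

/// Builds a raw-intent subject of the assumed form
/// `smith.intents.raw.<capability>.v<version>`.
pub fn raw_intent_subject(capability: Capability, version: u32) -> String {
    format!("smith.intents.raw.{}.v{}", capability.token(), version)
}
```

Because callers can only pass a `Capability` variant, the set of valid subject tokens is fixed at compile time, while the string itself is still built at run time.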

§Architecture

Smith uses a Phase 2 JetStream architecture optimized for high throughput and reliability:

Publishers → Raw Streams → Admission → Vetted Streams → Consumers → Results
                     ↓
                Audit Streams (compliance & debugging)

§Stream Topology

Stream          Purpose            Retention  Configuration
SDLC_RAW        Intent ingestion   WorkQueue  High-throughput (500MB)
ATOMS_VETTED    Approved intents   Interest   Ordering guarantees (1GB)
ATOMS_RESULTS   Execution results  48h limit  Performance tracking (2GB)
AUDIT_SECURITY  Security events    1 year     Compliance retention (10GB)
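The topology above can also be written down as plain data, which is convenient for validation or tests. The sketch below only mirrors the table. The `StreamSpec` and `Retention` types and the `find_stream` helper are hypothetical and are not the crate's `streams` API. Two assumptions: sizes use decimal units (1 MB = 1,000,000 bytes), and "1 year" means 365 days.

```rust
/// Retention column of the topology table (labels copied from the table).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Retention {
    WorkQueue,
    Interest,
    /// Age-limited retention, expressed in hours.
    MaxAgeHours(u32),
}

/// One row of the topology table (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamSpec {
    pub name: &'static str,
    pub purpose: &'static str,
    pub retention: Retention,
    pub max_bytes: u64,
}

// Assumption: decimal units; the table does not say MB vs MiB.
const MB: u64 = 1_000_000;
const GB: u64 = 1_000 * MB;

pub static TOPOLOGY: [StreamSpec; 4] = [
    StreamSpec { name: "SDLC_RAW", purpose: "Intent ingestion", retention: Retention::WorkQueue, max_bytes: 500 * MB },
    StreamSpec { name: "ATOMS_VETTED", purpose: "Approved intents", retention: Retention::Interest, max_bytes: GB },
    StreamSpec { name: "ATOMS_RESULTS", purpose: "Execution results", retention: Retention::MaxAgeHours(48), max_bytes: 2 * GB },
    // Assumption: "1 year" is taken as 365 days.
    StreamSpec { name: "AUDIT_SECURITY", purpose: "Security events", retention: Retention::MaxAgeHours(365 * 24), max_bytes: 10 * GB },
];

/// Looks up a stream row by name.
pub fn find_stream(name: &str) -> Option<&'static StreamSpec> {
    TOPOLOGY.iter().find(|s| s.name == name)
}
```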

§Basic Usage

use smith_bus::{SmithBus, ConsumerConfig};
use smith_protocol::Intent;

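// The statements below use `.await` and `?`, so they belong inside an
// async function that returns a `Result`.
// Connect to NATS JetStream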
let bus = SmithBus::connect("nats://localhost:4222").await?;

// Publish an intent
let intent = Intent::new(/* ... */);
bus.publish("smith.intents.raw.fs.read.v1".to_string(), &intent).await?;

// Create a consumer for processing results
let config = ConsumerConfig::default();
let consumer = bus.consumer("fs.read.v1", config).await?;

// Process messages with automatic retry and backoff.
// `process_message` is an application-defined handler, not an item exported by this crate.
while let Some(message) = consumer.next_message().await? {
    match process_message(&message.payload) {
        Ok(_) => message.ack().await?,
        Err(_) => message.nack().await?, // Will retry with exponential backoff
    }
}

§Performance Characteristics

  • Throughput: 10,000+ messages/second per stream on commodity hardware
  • Latency: Sub-millisecond message delivery in optimal conditions
  • Reliability: At-least-once delivery with deduplication windows
  • Scalability: Horizontal scaling via consumer groups and domain sharding
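At-least-once delivery means a message can arrive more than once, so a deduplication window is what keeps a redelivery from being processed twice. The sketch below models the idea on the client side: an ID seen within the window is treated as a duplicate. The `DedupWindow` type is hypothetical and is not how this crate or the JetStream server implements deduplication. The caller passes the current time in, which keeps the logic deterministic.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Remembers message IDs for a fixed window of time (hypothetical helper).
pub struct DedupWindow {
    window: Duration,
    seen: HashMap<String, Instant>,
}

impl DedupWindow {
    pub fn new(window: Duration) -> Self {
        DedupWindow { window, seen: HashMap::new() }
    }

    /// Returns `true` if `msg_id` has not been seen within the window
    /// (process it), or `false` if it is a duplicate (skip it).
    pub fn first_seen(&mut self, msg_id: &str, now: Instant) -> bool {
        // Forget IDs whose window has expired.
        let window = self.window;
        self.seen.retain(|_, seen_at| now.duration_since(*seen_at) < window);

        if self.seen.contains_key(msg_id) {
            false
        } else {
            self.seen.insert(msg_id.to_string(), now);
            true
        }
    }
}
```

With a 120-second window, a redelivery of the same ID after 60 seconds is rejected, while the same ID arriving after the window has elapsed is accepted again.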

§Error Handling

The library implements comprehensive error handling with exponential backoff:

  • Connection failures: Automatic reconnection with circuit breaker
  • Message failures: Configurable retry counts with dead letter queues
  • Stream errors: Graceful degradation and health status reporting
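Exponential backoff with jitter, as mentioned above, can be sketched in a few lines. The delay doubles with each attempt, is capped at a maximum, and is then spread by a random factor so that many clients do not retry at the same instant. The `BackoffSketch` type and its field names are hypothetical and are not taken from `BackoffConfig` or `create_backoff_strategy`. To stay dependency-free, the random value is supplied by the caller as `rand_unit` in `[0, 1)`.

```rust
use std::time::Duration;

/// Hypothetical backoff settings for illustration only.
pub struct BackoffSketch {
    /// Delay before the first retry.
    pub base: Duration,
    /// Upper bound on the un-jittered delay.
    pub max: Duration,
    /// Jitter as a fraction of the delay, e.g. 0.2 for +/- 20%.
    pub jitter_fraction: f64,
}

impl BackoffSketch {
    /// Delay before retry number `attempt` (0-based).
    /// `rand_unit` is a caller-supplied random value in `[0, 1)`.
    pub fn delay(&self, attempt: u32, rand_unit: f64) -> Duration {
        // base * 2^attempt, saturating on overflow, capped at `max`.
        let factor = 2u32.saturating_pow(attempt);
        let capped = self.base.saturating_mul(factor).min(self.max);

        // Map rand_unit to a scale in [1 - jitter, 1 + jitter).
        let scale = 1.0 + self.jitter_fraction * (2.0 * rand_unit - 1.0);
        let millis = (capped.as_millis() as f64 * scale).round();
        Duration::from_millis(millis as u64)
    }
}
```

In real use, `rand_unit` would come from a random number generator. Passing it in keeps the function pure and easy to test.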

For detailed stream configuration and subject patterns, see the streams and subjects modules.

Re-exports§

pub use consumer::Consumer;
pub use publisher::Publisher;
pub use streams::StreamManager;
pub use lag_monitor::*;
pub use sharding::*;
pub use subjects::*;

Modules§

consumer
lag_monitor
    Consumer lag monitoring and backpressure management
publisher
sharding
    Episode-based subject sharding for ordering guarantees
streams
subjects
    Canonical NATS subject patterns and builders for Smith platform
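Episode-based sharding preserves ordering by sending every message of one episode to the same shard. A minimal sketch of that idea follows, assuming a stable hash of the episode ID taken modulo the shard count. The function names, the `.shard.<n>` subject suffix, and the choice of FNV-1a are all assumptions for illustration and are not the `sharding` module's actual scheme.

```rust
/// 64-bit FNV-1a hash. A fixed algorithm is used here so that shard
/// assignment does not change between processes or Rust versions.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325; // FNV offset basis
    for b in bytes {
        hash ^= u64::from(*b);
        hash = hash.wrapping_mul(0x100000001b3); // FNV prime
    }
    hash
}

/// Maps an episode ID to a shard index in `0..shard_count`.
pub fn shard_for_episode(episode_id: &str, shard_count: u32) -> u32 {
    assert!(shard_count > 0, "shard_count must be non-zero");
    (fnv1a_64(episode_id.as_bytes()) % u64::from(shard_count)) as u32
}

/// Appends a hypothetical `.shard.<n>` suffix to a base subject.
pub fn sharded_subject(base: &str, episode_id: &str, shard_count: u32) -> String {
    format!("{}.shard.{}", base, shard_for_episode(episode_id, shard_count))
}
```

Since the shard depends only on the episode ID, all messages of an episode land on one subject and are consumed in order, while different episodes spread across shards.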

Macros§

validate_no_raw_subjects
    Compile-time validation macros for preventing raw strings

Structs§

BackoffConfig
    Backoff strategy for handling failures and retries
ConsumerConfig
    Configuration for creating consumers
HealthStatus
    Health status of NATS/JetStream connectivity
Message
    Message wrapper for JetStream messages with retry support
SmithBus
    High-level NATS JetStream client for Smith intent processing.
WorkQueue
    Work queue semantics helper for fair distribution of work

Enums§

ConsumerStartSequence
    Where to start consuming messages from

Functions§

create_backoff_strategy
    Create an exponential backoff strategy from config