a3s-event 0.3.0

Pluggable event subscription, dispatch, and persistence for the A3S ecosystem
Documentation

A3S Event


Overview

A3S Event provides a provider-agnostic API for event subscription, dispatch, and persistence across the A3S ecosystem. All backends implement the EventProvider trait, so you can swap between NATS JetStream, in-memory, or any custom provider without changing application code.

Basic Usage

use a3s_event::{EventBus, Event};
use a3s_event::provider::memory::MemoryProvider;

#[tokio::main]
async fn main() -> a3s_event::Result<()> {
    // Create an event bus with any provider
    let bus = EventBus::new(MemoryProvider::default());

    // Publish an event
    let event = bus.publish(
        "market",
        "forex.usd_cny",
        "USD/CNY broke through 7.35",
        "reuters",
        serde_json::json!({"rate": 7.3521}),
    ).await?;

    println!("Published: {}", event.id);

    // Query history
    let events = bus.list_events(Some("market"), 50).await?;
    println!("Market events: {}", events.len());

    Ok(())
}

Features

  • Provider-Agnostic API: EventProvider trait abstracts all backends — publish, subscribe, query with a single interface
  • Pluggable Backends: Swap providers (NATS, in-memory, Redis, Kafka, etc.) without changing application code
  • Publish/Subscribe: Dot-separated subject hierarchy (events.<category>.<topic>)
  • Durable Subscriptions: Consumers survive disconnects and server restarts (provider-dependent)
  • At-Least-Once Delivery: Explicit ack/nak via PendingEvent with automatic redelivery on failure
  • Event History: Query past events with subject filtering from any provider
  • High-Level EventBus: Wraps any provider with subscription management and convenience methods
  • Manual or Auto Ack: Choose between auto-ack or manual ack for precise delivery control
  • Category-Based Routing: Subscribe to all events in a category with wildcard subjects (events.market.>)
  • In-Memory Provider: Zero-dependency provider for testing and single-process deployments
  • NATS JetStream Provider: Distributed, persistent event streaming with configurable retention
  • Payload Encryption: AES-256-GCM encrypt/decrypt with key rotation — protect sensitive payloads at the application layer
  • State Persistence: Subscription filters survive restarts via pluggable StateStore (JSON file or custom)
  • Observability: Lock-free EventMetrics counters for publish/subscribe/error/latency — scrape with metrics() or serialize to JSON
  • CloudEvents v1.0: Standard envelope with lossless EventCloudEvent conversion — A3S-specific fields stored as extensions
  • Broker/Trigger Routing: Knative-inspired event routing — publishers emit to a Broker, Triggers filter by type/source/subject/attributes and deliver to sinks in parallel
  • Event Sinks: Pluggable delivery targets — TopicSink (publish to provider), InProcessSink (call handler), LogSink (debug logging), CollectorSink (testing)
  • Event Sources: CronSource emits events on a schedule; WebhookSource and MetricsSource trait definitions for custom implementations
  • Scaling Events: Typed payloads for Gateway ↔ Box coordination — ScaleUpPayload, ScaleDownPayload, InstanceReadyPayload, InstanceStoppedPayload, InstanceHealthPayload
  • Sink-based DLQ: SinkDlqHandler forwards dead-lettered events through any EventSink with full DLQ metadata (original subject, delivery attempts, first failure timestamp)

Providers

Provider Use Case Persistence Distribution
MemoryProvider Testing, development, single-process In-process only Single process
NatsProvider Production, multi-service JetStream (file/memory) Distributed

Memory Provider

Zero-dependency, in-process event bus using tokio::sync::broadcast. Events are lost on restart.

use a3s_event::provider::memory::{MemoryProvider, MemoryConfig};

let provider = MemoryProvider::new(MemoryConfig {
    subject_prefix: "events".to_string(),
    max_events: 100_000,
    channel_capacity: 10_000,
});

// Or use defaults
let provider = MemoryProvider::default();

NATS JetStream Provider

Distributed event streaming with persistent storage, durable consumers, and at-least-once delivery.

use a3s_event::provider::nats::{NatsProvider, NatsConfig, StorageType};

let provider = NatsProvider::connect(NatsConfig {
    url: "nats://127.0.0.1:4222".to_string(),
    stream_name: "A3S_EVENTS".to_string(),
    subject_prefix: "events".to_string(),
    storage: StorageType::File,
    max_events: 100_000,
    max_age_secs: 604_800,  // 7 days
    ..Default::default()
}).await?;

Architecture

┌─────────────────────────────────────────────────────────────┐
│                        EventBus                             │
│  High-level API: publish, subscribe, history, manage subs   │
│                                                             │
│  ┌───────────────────────────────────────────────────────┐  │
│  │              dyn EventProvider                        │  │
│  │  publish() | subscribe() | history() | info()        │  │
│  └───────────────────────────────────────────────────────┘  │
│         │                │                │                  │
│  ┌──────┴──────┐  ┌──────┴──────┐  ┌──────┴──────┐        │
│  │   Memory    │  │    NATS     │  │   Custom    │        │
│  │  Provider   │  │  Provider   │  │  Provider   │        │
│  │ (broadcast) │  │ (JetStream) │  │ (your impl) │        │
│  └─────────────┘  └─────────────┘  └─────────────┘        │
└─────────────────────────────────────────────────────────────┘
        ▲                │               │
        │ publish        │ subscribe     │ subscribe
┌───────┴───────┐  ┌─────▼─────┐  ┌─────▼─────┐
│   SafeClaw    │  │  Persona   │  │  Gateway   │
│   Backend     │  │  Agent     │  │  Monitor   │
└───────────────┘  └───────────┘  └───────────┘

Subject Hierarchy

Events follow a dot-separated naming convention:

events.<category>.<topic>[.<subtopic>...]

Examples:
  events.market.forex.usd_cny     — forex rate change
  events.system.deploy.gateway    — service deployment
  events.task.completed           — task completion
  events.compliance.audit.login   — audit event

Wildcard patterns:

  • events.market.> — all market events (any depth)
  • events.*.forex — forex events from any category

Core Types

Type Description
EventProvider Core trait — all backends implement this
Subscription Async event stream from any provider
PendingEvent Event with ack/nak callbacks for manual acknowledgement
EventBus High-level API with subscription management
Event Provider-agnostic message envelope (id, subject, category, payload)
ReceivedEvent Event with delivery context (sequence, num_delivered, stream)
ProviderInfo Backend status (message count, bytes, consumers)
EventEncryptor Trait for payload encrypt/decrypt
Aes256GcmEncryptor AES-256-GCM encryptor with key rotation
EncryptedPayload Encrypted envelope (key_id, nonce, ciphertext)
StateStore Trait for persisting subscription state
FileStateStore JSON file-based state persistence
EventMetrics Lock-free atomic counters for publish, subscribe, error, latency
MetricsSnapshot Serializable point-in-time view of all metrics
CloudEvent CloudEvents v1.0 envelope with lossless A3S conversion
Broker Event router — evaluates triggers and delivers to matching sinks
Trigger Pairs a TriggerFilter with an EventSink for routing
TriggerFilter Match by event type, source, subject pattern, metadata attributes
EventSink Trait for event delivery targets (topic, handler, log, etc.)
TopicSink Sink that publishes to an EventProvider topic
InProcessSink Sink that calls an async handler closure
EventSource Trait for event generators (cron, webhook, metrics)
CronSource Emits events on a fixed interval with graceful shutdown
ScalingEvent Trait for typed scaling payloads with to_event() conversion
SinkDlqHandler DLQ handler that forwards dead letters through an EventSink

API Reference

EventBus

use a3s_event::{EventBus, SubscriptionFilter};
use a3s_event::provider::memory::MemoryProvider;

// Create bus with any provider
let bus = EventBus::new(MemoryProvider::default());

// Publish with convenience parameters
let event = bus.publish("market", "forex", "Rate change", "reuters", payload).await?;

// Publish a pre-built event
let seq = bus.publish_event(&event).await?;

// List events (optionally filtered by category)
let events = bus.list_events(Some("market"), 50).await?;

// Get event counts by category
let counts = bus.counts(1000).await?;

// Manage subscriptions
bus.update_subscription(SubscriptionFilter {
    subscriber_id: "analyst".to_string(),
    subjects: vec!["events.market.>".to_string()],
    durable: true,
}).await?;

let subs = bus.create_subscriber("analyst").await?;
bus.remove_subscription("analyst").await?;

// Provider info
let info = bus.info().await?;
println!("{}: {} messages", info.provider, info.messages);

EventProvider Trait

use a3s_event::provider::EventProvider;

// All providers implement:
provider.publish(&event).await?;
provider.subscribe("events.market.>").await?;
provider.subscribe_durable("consumer-1", "events.market.>").await?;
provider.history(Some("events.market.>"), 100).await?;
provider.unsubscribe("consumer-1").await?;
provider.info().await?;
provider.build_subject("market", "forex.usd");  // → "events.market.forex.usd"
provider.category_subject("market");             // → "events.market.>"
provider.name();                                 // → "memory" | "nats"

Subscription

// Auto-ack mode
let mut sub = provider.subscribe("events.>").await?;
while let Some(received) = sub.next().await? {
    println!("{}: {}", received.event.id, received.event.summary);
}

// Manual ack mode
while let Some(pending) = sub.next_manual_ack().await? {
    match process(&pending.received.event) {
        Ok(_) => pending.ack().await?,
        Err(_) => pending.nak().await?,  // request redelivery
    }
}

NATS Configuration

let config = NatsConfig {
    url: "nats://127.0.0.1:4222".to_string(),
    token: None,
    credentials_path: None,
    stream_name: "A3S_EVENTS".to_string(),
    subject_prefix: "events".to_string(),
    storage: StorageType::File,
    max_events: 100_000,
    max_age_secs: 604_800,  // 7 days
    max_bytes: 0,            // unlimited
    connect_timeout_secs: 5,
    request_timeout_secs: 10,
};

Custom Providers

Implement EventProvider and Subscription to add any backend:

use a3s_event::provider::{EventProvider, Subscription, PendingEvent, ProviderInfo};
use a3s_event::types::{Event, ReceivedEvent};
use a3s_event::Result;
use async_trait::async_trait;

pub struct RedisProvider { /* ... */ }

#[async_trait]
impl EventProvider for RedisProvider {
    async fn publish(&self, event: &Event) -> Result<u64> {
        // Publish to Redis Streams
        todo!()
    }

    async fn subscribe_durable(
        &self,
        consumer_name: &str,
        filter_subject: &str,
    ) -> Result<Box<dyn Subscription>> {
        // Create Redis consumer group
        todo!()
    }

    async fn subscribe(&self, filter_subject: &str) -> Result<Box<dyn Subscription>> {
        todo!()
    }

    async fn history(
        &self,
        filter_subject: Option<&str>,
        limit: usize,
    ) -> Result<Vec<Event>> {
        todo!()
    }

    async fn unsubscribe(&self, consumer_name: &str) -> Result<()> {
        todo!()
    }

    async fn info(&self) -> Result<ProviderInfo> {
        todo!()
    }

    fn build_subject(&self, category: &str, topic: &str) -> String {
        format!("events.{}.{}", category, topic)
    }

    fn category_subject(&self, category: &str) -> String {
        format!("events.{}.>", category)
    }

    fn name(&self) -> &str {
        "redis"
    }
}

Then use it like any other provider:

let bus = EventBus::new(RedisProvider::new(config));
bus.publish("market", "forex", "Rate change", "source", payload).await?;

Development

Prerequisites

  • Rust 1.75+
  • NATS Server with JetStream enabled (for NATS provider tests: nats-server -js)
  • cargo-llvm-cov for coverage (cargo install cargo-llvm-cov)
  • lcov for coverage reports (brew install lcov)

Commands

just build              # Build the project
just test               # Run all tests with progress display
just test-v             # Run tests with verbose output
just test-one NAME      # Run a specific test
just test-integration   # NATS integration tests (requires nats-server -js)
just bench              # Run performance benchmarks
just bench-one NAME     # Run specific benchmark
just test-cov           # Run tests with coverage report
just cov                # Coverage with lcov summary
just cov-html           # Coverage with HTML report (opens browser)
just cov-table          # Coverage with file-by-file table
just cov-ci             # Generate lcov.info for CI
just lint               # Run clippy
just fmt                # Format code
just ci                 # Full CI check (fmt + lint + test)
just doc                # Generate and open docs

Test Modules

Module Description
types Event creation, serialization, metadata
error Error type construction and display
schema Schema registry, validation, compatibility
dlq Dead letter queue handler
provider::memory In-memory provider: publish, subscribe, history, wildcards
provider::nats NATS provider: client, config, subscriber (requires NATS)
store EventBus high-level operations
metrics Lock-free counters, latency tracking, concurrent access
cloudevents CloudEvent creation, serialization, Event ↔ CloudEvent conversion
sink EventSink trait: TopicSink, InProcessSink, LogSink, CollectorSink, FailingSink
broker TriggerFilter matching, Broker routing, parallel delivery, error handling
source CronSource interval emission, graceful shutdown
scaling Scaling payload serialization, ScalingEvent trait, into_event conversion
subject Subject wildcard matching (shared utility)
memory_integration End-to-end memory provider: publish, subscribe, encryption, schema, DLQ, state, metrics, concurrency, broker/trigger, CloudEvents, scaling, sink DLQ
nats_integration End-to-end NATS tests: publish, dedup, durable sub, manual ack

Running Tests

# Unit tests (no external dependencies)
just test

# NATS integration tests (requires running NATS server)
nats-server -js
just test-integration

# Performance benchmarks
just bench

# Test specific modules
just test-memory     # In-memory provider tests
just test-nats       # NATS provider tests
just test-types      # Event type tests
just test-store      # EventBus tests

Roadmap

A3S Event is the application-level event abstraction. It does NOT re-implement capabilities that providers (NATS, Kafka, etc.) already offer natively. The roadmap focuses on what only the abstraction layer should own.

Responsibility Boundary

Capability Owner Notes
Retry / backoff Provider NATS: MaxDeliver + BackOff. Kafka: consumer retry topic.
Backpressure Provider NATS: pull consumer + MaxAckPending. Kafka: consumer poll.
Connection resilience Provider NATS: async-nats auto-reconnect. Kafka: librdkafka reconnect.
Consumer lag monitoring Provider NATS: consumer.info().num_pending. Kafka: consumer group lag.
Event replay by timestamp Provider NATS: DeliverPolicy::ByStartTime. Kafka: offsetsForTimes.
Exactly-once delivery Provider NATS: Nats-Msg-Id dedup + double ack. Kafka: idempotent producer + transactions.
Partitioning / sharding Provider NATS: subject-based routing. Kafka: partition key.
Stream mirroring Provider NATS: Mirror/Source config. Kafka: MirrorMaker.
Metrics (server-side) Provider NATS: /metrics endpoint. Kafka: JMX metrics.
Transport encryption Provider NATS/Kafka: TLS configuration.
Event versioning / schema A3S Event Provider-agnostic, application-level concern.
Payload encryption A3S Event Application-level encrypt/decrypt before publish.
Dead letter queue A3S Event Unified DLQ abstraction across providers.
EventBus state persistence A3S Event Subscription filter durability across restarts.
Observability integration A3S Event Bridge provider metrics into app-level tracing/metrics.
Provider config passthrough A3S Event Expose provider-native knobs (MaxDeliver, BackOff, etc.)
Integration tests A3S Event End-to-end verification with real providers.

Phase 1: Provider Config Passthrough ✅

Expose provider-native capabilities through the abstraction layer without re-implementing them.

  • SubscribeOptions struct — max_deliver, backoff, max_ack_pending, deliver_policy, ack_wait
  • PublishOptions struct — msg_id (dedup), expected_sequence, timeout
  • DeliverPolicy enum — All, Last, New, ByStartSequence, ByStartTime, LastPerSubject
  • EventProvider trait extended with publish_with_options(), subscribe_with_options(), subscribe_durable_with_options() (default impls for backward compatibility)
  • NatsProvider maps options to JetStream consumer/publish config (headers, backoff, max_deliver, deliver_policy, etc.)
  • MemoryProvider uses default impls (ignores unsupported options gracefully)
  • SubscriptionFilter carries optional SubscribeOptions
  • EventBus threads options through create_subscriber()

Phase 2: Event Versioning & Schema ✅

Application-level schema management that no provider handles.

  • Add event_type and version fields to Event struct (backward-compatible defaults)
  • Event::typed() constructor for versioned events
  • SchemaRegistry trait — register, validate, query schemas
  • MemorySchemaRegistry — in-memory registry for development
  • EventSchema — required fields validation per event type + version
  • Publish-time validation (optional, via EventBus::with_schema_registry())
  • Compatibility enum — Backward, Forward, Full, None
  • Schema evolution checks (check_compatibility()) between versions

Phase 3: Operational Hardening ✅

Production reliability features that live above the provider layer.

  • Dead Letter Queue — DlqHandler trait + MemoryDlqHandler impl
  • DeadLetterEvent with reason and timestamp, should_dead_letter() helper
  • EventBus::set_dlq_handler() for DLQ integration
  • Observability — tracing::info_span! on publish and subscribe lifecycle in EventBus
  • Health check API — EventProvider::health() with default impl, EventBus::health()

Phase 4: Testing & Documentation ✅

Confidence and onboarding.

  • Integration tests with real NATS (9 tests — publish, history, dedup, durable subscription, options, concurrent, manual ack, health, info)
  • EventBus unit tests (publish, subscribe, lifecycle, schema validation, DLQ integration — 17 tests)
  • Concurrent publish/subscribe stress tests (50 concurrent publishers)
  • Error path tests (all error variants display, From conversion, not-found, schema validation — 7 tests)
  • Performance benchmarks (criterion — event creation, serialization, publish throughput, history query)
  • Deployment guide and configuration reference (docs/deployment.md)
  • Provider implementation guide (docs/custom-providers.md)

Test summary: 190 unit tests + 30 memory integration tests + 9 NATS integration tests across 16 modules

Phase 5: Payload Encryption ✅

Application-level encrypt/decrypt for sensitive event payloads.

  • EventEncryptor trait — encrypt(payload) → Value, decrypt(encrypted) → Value
  • Aes256GcmEncryptor — AES-256-GCM with random nonce per message
  • EncryptedPayload envelope — key_id, nonce, ciphertext (base64), encrypted marker
  • Key rotation — add_key(), rotate_to(), decrypt with any registered key
  • EventBus::set_encryptor() — transparent encrypt on publish, decrypt on list_events()
  • EncryptedPayload::is_encrypted() — detect encrypted payloads for selective decryption
  • Schema validation runs on plaintext before encryption
  • 10 crypto tests + 4 EventBus encryption integration tests

Phase 6: EventBus State Persistence ✅

Subscription filter durability across restarts.

  • StateStore trait — save/load subscription filters
  • FileStateStore — JSON file persistence with atomic writes (temp + rename)
  • MemoryStateStore — in-memory store for testing
  • EventBus::set_state_store() — auto-loads persisted subscriptions on setup
  • Auto-save on update_subscription() and remove_subscription()
  • 7 state store tests + 5 EventBus persistence integration tests

Phase 7: Observability Integration ✅

Bridge provider metrics into application-level tracing/metrics.

  • EventMetrics struct — lock-free atomic counters for publish, subscribe, error, DLQ, encrypt/decrypt, latency (cumulative + max)
  • MetricsSnapshot — serializable point-in-time view of all counters (#[derive(Serialize)] with camelCase)
  • EventBus emits metrics on publish (timing + errors), subscribe/unsubscribe, validation errors, encrypt/decrypt counts, DLQ
  • EventBus::metrics() accessor for scraping
  • Lock-free CAS loop for max latency tracking
  • reset() to zero all counters
  • Integration with tracing spans on publish and subscribe lifecycle
  • 10 metrics unit tests + 6 EventBus metrics integration tests

Phase 8: Knative Eventing — Event Nervous System ✅

A3S Event acts as the "nervous system" connecting Gateway (traffic brain) and Box (instance executor). It standardizes event-driven communication across the ecosystem using CloudEvents. In standalone mode, events are the primary coordination channel between Gateway and Box. In K8s mode, events complement K8s-native mechanisms (Endpoints watch, HPA) with richer application-level signals.

  • CloudEvents envelope: CloudEvent struct with CloudEvents v1.0 required + optional attributes, lossless From<Event> and TryFrom<CloudEvent> conversion with A3S-specific fields stored as a3s-prefixed extensions
  • Scaling events (standalone mode): Typed payloads with ScalingEvent trait for Gateway ↔ Box autoscaler coordination:
    • a3s.gateway.scale.upScaleUpPayload (service, desired_replicas, reason)
    • a3s.gateway.scale.downScaleDownPayload (service, target_replicas, drain_timeout_secs)
    • a3s.box.instance.readyInstanceReadyPayload (service, instance_id, endpoint)
    • a3s.box.instance.stoppedInstanceStoppedPayload (service, instance_id)
    • a3s.box.instance.healthInstanceHealthPayload (instance_id, cpu_percent, memory_bytes, in_flight)
  • Broker/Trigger pattern: Broker receives events and evaluates Trigger filters (by type, source, subject pattern, metadata attributes). Matching events delivered to EventSink targets in parallel. Fire-and-forget — delivery errors logged but don't block publishers.
  • Event source adapters: EventSource trait + CronSource (tokio interval + Notify-based shutdown). WebhookSource and MetricsSource trait-only definitions for application-level implementations.
  • Event sink interface: EventSink trait with implementations — TopicSink (publish to provider), InProcessSink (async handler closure), LogSink (tracing), CollectorSink (testing), FailingSink (error path testing)
  • Dead letter routing: SinkDlqHandler wraps an EventSink, forwards dead-lettered events with DLQ metadata (original_subject, delivery_attempts, first_failure_at). DeadLetterEvent extended with optional fields (backward-compatible). EventBus stores Arc<dyn EventProvider> enabling TopicSink provider sharing.

License

MIT License — see LICENSE for details.