A3S Event
Overview
A3S Event provides a provider-agnostic API for event subscription, dispatch, and persistence across the A3S ecosystem. All backends implement the EventProvider trait, so you can swap between NATS JetStream, in-memory, or any custom provider without changing application code.
Basic Usage
use ;
use MemoryProvider;
async
Features
- Provider-Agnostic API: The EventProvider trait abstracts all backends, so publish, subscribe, and query share a single interface
- Pluggable Backends: Swap providers (NATS, in-memory, Redis, Kafka, etc.) without changing application code
- Publish/Subscribe: Dot-separated subject hierarchy (events.<category>.<topic>)
- Durable Subscriptions: Consumers survive disconnects and server restarts (provider-dependent)
- At-Least-Once Delivery: Explicit ack/nak via PendingEvent with automatic redelivery on failure
- Event History: Query past events with subject filtering from any provider
- High-Level EventBus: Wraps any provider with subscription management and convenience methods
- Manual or Auto Ack: Choose between auto-ack or manual ack for precise delivery control
- Category-Based Routing: Subscribe to all events in a category with wildcard subjects (events.market.>)
- In-Memory Provider: Zero-dependency provider for testing and single-process deployments
- NATS JetStream Provider: Distributed, persistent event streaming with configurable retention
- Payload Encryption: AES-256-GCM encrypt/decrypt with key rotation, protecting sensitive payloads at the application layer
- State Persistence: Subscription filters survive restarts via a pluggable StateStore (JSON file or custom)
- Observability: Lock-free EventMetrics counters for publish/subscribe/error/latency; scrape with metrics() or serialize to JSON
- CloudEvents v1.0: Standard envelope with lossless Event ↔ CloudEvent conversion; A3S-specific fields are stored as extensions
- Broker/Trigger Routing: Knative-inspired event routing; publishers emit to a Broker, and Triggers filter by type/source/subject/attributes and deliver to sinks in parallel
- Event Sinks: Pluggable delivery targets: TopicSink (publish to provider), InProcessSink (call handler), LogSink (debug logging), CollectorSink (testing)
- Event Sources: CronSource emits events on a schedule; WebhookSource and MetricsSource are trait definitions for custom implementations
- Scaling Events: Typed payloads for Gateway ↔ Box coordination: ScaleUpPayload, ScaleDownPayload, InstanceReadyPayload, InstanceStoppedPayload, InstanceHealthPayload
- Sink-based DLQ: SinkDlqHandler forwards dead-lettered events through any EventSink with full DLQ metadata (original subject, delivery attempts, first failure timestamp)
Providers
| Provider | Use Case | Persistence | Distribution |
|---|---|---|---|
| MemoryProvider | Testing, development, single-process | In-process only | Single process |
| NatsProvider | Production, multi-service | JetStream (file/memory) | Distributed |
Memory Provider
Zero-dependency, in-process event bus using tokio::sync::broadcast. Events are lost on restart.
use ;
let provider = new;
// Or use defaults
let provider = default;
NATS JetStream Provider
Distributed event streaming with persistent storage, durable consumers, and at-least-once delivery.
use ;
let provider = connect.await?;
Architecture
┌─────────────────────────────────────────────────────────────┐
│                          EventBus                           │
│  High-level API: publish, subscribe, history, manage subs   │
│                                                             │
│  ┌───────────────────────────────────────────────────────┐  │
│  │                   dyn EventProvider                   │  │
│  │     publish() | subscribe() | history() | info()      │  │
│  └───────────────────────────────────────────────────────┘  │
│         │                    │                    │         │
│  ┌──────┴──────┐      ┌──────┴──────┐      ┌──────┴──────┐  │
│  │   Memory    │      │    NATS     │      │   Custom    │  │
│  │  Provider   │      │  Provider   │      │  Provider   │  │
│  │ (broadcast) │      │ (JetStream) │      │ (your impl) │  │
│  └─────────────┘      └─────────────┘      └─────────────┘  │
└─────────────────────────────────────────────────────────────┘
          ▲                    │                    │
          │ publish            │ subscribe          │ subscribe
  ┌───────┴───────┐      ┌─────▼─────┐        ┌─────▼─────┐
  │   SafeClaw    │      │  Persona  │        │  Gateway  │
  │    Backend    │      │   Agent   │        │  Monitor  │
  └───────────────┘      └───────────┘        └───────────┘
Subject Hierarchy
Events follow a dot-separated naming convention:
events.<category>.<topic>[.<subtopic>...]
Examples:
events.market.forex.usd_cny — forex rate change
events.system.deploy.gateway — service deployment
events.task.completed — task completion
events.compliance.audit.login — audit event
Wildcard patterns:
events.market.> matches all market events (any depth)
events.*.forex matches forex events from any category
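The wildcard rules above can be sketched as a small token-by-token matcher. This is a minimal illustration that assumes NATS-style semantics (`*` matches exactly one token, `>` matches one or more trailing tokens); the function name `subject_matches` is hypothetical and is not taken from the crate.

```rust
/// Hypothetical helper: returns true if `subject` matches `pattern`.
/// Assumes `*` matches exactly one token and `>` matches one or more
/// trailing tokens, and that `>` only appears as the last pattern token.
pub fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut sub = subject.split('.');
    loop {
        match (pat.next(), sub.next()) {
            // `>` needs at least one remaining subject token and must be last.
            (Some(">"), Some(_)) => return pat.next().is_none(),
            // `*` consumes exactly one subject token.
            (Some("*"), Some(_)) => continue,
            // Literal tokens must be equal.
            (Some(p), Some(s)) if p == s => continue,
            // Both sides exhausted at the same time: full match.
            (None, None) => return true,
            // Length mismatch or differing literal tokens.
            _ => return false,
        }
    }
}
```

With this sketch, `subject_matches("events.market.>", "events.market.forex.usd_cny")` is true, while `subject_matches("events.*.forex", "events.market.forex.usd_cny")` is false because `*` covers only a single token.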
Core Types
| Type | Description |
|---|---|
| EventProvider | Core trait; all backends implement this |
| Subscription | Async event stream from any provider |
| PendingEvent | Event with ack/nak callbacks for manual acknowledgement |
| EventBus | High-level API with subscription management |
| Event | Provider-agnostic message envelope (id, subject, category, payload) |
| ReceivedEvent | Event with delivery context (sequence, num_delivered, stream) |
| ProviderInfo | Backend status (message count, bytes, consumers) |
| EventEncryptor | Trait for payload encrypt/decrypt |
| Aes256GcmEncryptor | AES-256-GCM encryptor with key rotation |
| EncryptedPayload | Encrypted envelope (key_id, nonce, ciphertext) |
| StateStore | Trait for persisting subscription state |
| FileStateStore | JSON file-based state persistence |
| EventMetrics | Lock-free atomic counters for publish, subscribe, error, latency |
| MetricsSnapshot | Serializable point-in-time view of all metrics |
| CloudEvent | CloudEvents v1.0 envelope with lossless A3S conversion |
| Broker | Event router that evaluates triggers and delivers to matching sinks |
| Trigger | Pairs a TriggerFilter with an EventSink for routing |
| TriggerFilter | Match by event type, source, subject pattern, metadata attributes |
| EventSink | Trait for event delivery targets (topic, handler, log, etc.) |
| TopicSink | Sink that publishes to an EventProvider topic |
| InProcessSink | Sink that calls an async handler closure |
| EventSource | Trait for event generators (cron, webhook, metrics) |
| CronSource | Emits events on a fixed interval with graceful shutdown |
| ScalingEvent | Trait for typed scaling payloads with to_event() conversion |
| SinkDlqHandler | DLQ handler that forwards dead letters through an EventSink |
API Reference
EventBus
use ;
use MemoryProvider;
// Create bus with any provider
let bus = new;
// Publish with convenience parameters
let event = bus.publish.await?;
// Publish a pre-built event
let seq = bus.publish_event.await?;
// List events (optionally filtered by category)
let events = bus.list_events.await?;
// Get event counts by category
let counts = bus.counts.await?;
// Manage subscriptions
bus.update_subscription.await?;
let subs = bus.create_subscriber.await?;
bus.remove_subscription.await?;
// Provider info
let info = bus.info.await?;
println!;
EventProvider Trait
use EventProvider;
// All providers implement:
provider.publish.await?;
provider.subscribe.await?;
provider.subscribe_durable.await?;
provider.history.await?;
provider.unsubscribe.await?;
provider.info.await?;
provider.build_subject; // → "events.market.forex.usd"
provider.category_subject; // → "events.market.>"
provider.name; // → "memory" | "nats"
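As a rough illustration of the two subject helpers listed above, the sketch below builds subjects by joining tokens under the `events` prefix. The free functions and their parameters are assumptions for illustration only; the real methods live on the provider and their signatures are not shown in this document.

```rust
/// Prefix shared by all subjects, per the naming convention in this README.
const PREFIX: &str = "events";

/// Hypothetical free-function version of `build_subject`:
/// joins the prefix, category, and topic with dots.
pub fn build_subject(category: &str, topic: &str) -> String {
    format!("{}.{}.{}", PREFIX, category, topic)
}

/// Hypothetical free-function version of `category_subject`:
/// a wildcard subject covering every event in one category.
pub fn category_subject(category: &str) -> String {
    format!("{}.{}.>", PREFIX, category)
}
```

Under these assumptions, `build_subject("market", "forex.usd")` yields the same subject shown in the comment above, and `category_subject("market")` yields the category wildcard.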
Subscription
// Auto-ack mode
let mut sub = provider.subscribe.await?;
while let Some = sub.next.await?
// Manual ack mode
while let Some = sub.next_manual_ack.await?
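To make the difference between ack and nak concrete, here is a minimal, synchronous sketch of at-least-once delivery using an in-memory queue. The types `RedeliveryQueue` and `Delivery` are hypothetical and are not the crate's `Subscription` or `PendingEvent`; a real provider would also redeliver when an ack times out, which this sketch omits.

```rust
use std::collections::VecDeque;

/// A message plus the number of times it has been handed to a consumer.
#[derive(Debug, Clone, PartialEq)]
pub struct Delivery {
    pub payload: String,
    pub num_delivered: u32,
}

/// Hypothetical in-memory queue illustrating ack/nak semantics.
#[derive(Default)]
pub struct RedeliveryQueue {
    pending: VecDeque<Delivery>,
}

impl RedeliveryQueue {
    /// Enqueues a new message that has not been delivered yet.
    pub fn publish(&mut self, payload: &str) {
        self.pending.push_back(Delivery {
            payload: payload.to_string(),
            num_delivered: 0,
        });
    }

    /// Hands out the next message and bumps its delivery count.
    pub fn next(&mut self) -> Option<Delivery> {
        let mut delivery = self.pending.pop_front()?;
        delivery.num_delivered += 1;
        Some(delivery)
    }

    /// Ack: processing succeeded, so the message is dropped for good.
    pub fn ack(&mut self, _delivery: Delivery) {}

    /// Nak: processing failed, so the message is queued for redelivery.
    pub fn nak(&mut self, delivery: Delivery) {
        self.pending.push_back(delivery);
    }
}
```

A consumer that calls `nak` sees the same payload again with `num_delivered` incremented, which is why handlers under at-least-once delivery should be idempotent.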
NATS Configuration
let config = NatsConfig ;
Custom Providers
Implement EventProvider and Subscription to add any backend:
use ;
use ;
use Result;
use async_trait;
Then use it like any other provider:
let bus = new;
bus.publish.await?;
Development
Prerequisites
- Rust 1.75+
- NATS Server with JetStream enabled (for NATS provider tests: nats-server -js)
- cargo-llvm-cov for coverage (cargo install cargo-llvm-cov)
- lcov for coverage reports (brew install lcov)
Commands
Test Modules
| Module | Description |
|---|---|
| types | Event creation, serialization, metadata |
| error | Error type construction and display |
| schema | Schema registry, validation, compatibility |
| dlq | Dead letter queue handler |
| provider::memory | In-memory provider: publish, subscribe, history, wildcards |
| provider::nats | NATS provider: client, config, subscriber (requires NATS) |
| store | EventBus high-level operations |
| metrics | Lock-free counters, latency tracking, concurrent access |
| cloudevents | CloudEvent creation, serialization, Event ↔ CloudEvent conversion |
| sink | EventSink trait: TopicSink, InProcessSink, LogSink, CollectorSink, FailingSink |
| broker | TriggerFilter matching, Broker routing, parallel delivery, error handling |
| source | CronSource interval emission, graceful shutdown |
| scaling | Scaling payload serialization, ScalingEvent trait, into_event conversion |
| subject | Subject wildcard matching (shared utility) |
| memory_integration | End-to-end memory provider: publish, subscribe, encryption, schema, DLQ, state, metrics, concurrency, broker/trigger, CloudEvents, scaling, sink DLQ |
| nats_integration | End-to-end NATS tests: publish, dedup, durable sub, manual ack |
Running Tests
# Unit tests (no external dependencies)
# NATS integration tests (requires running NATS server)
# Performance benchmarks
# Test specific modules
Roadmap
A3S Event is the application-level event abstraction. It does NOT re-implement capabilities that providers (NATS, Kafka, etc.) already offer natively. The roadmap focuses on what only the abstraction layer should own.
Responsibility Boundary
| Capability | Owner | Notes |
|---|---|---|
| Retry / backoff | Provider | NATS: MaxDeliver + BackOff. Kafka: consumer retry topic. |
| Backpressure | Provider | NATS: pull consumer + MaxAckPending. Kafka: consumer poll. |
| Connection resilience | Provider | NATS: async-nats auto-reconnect. Kafka: librdkafka reconnect. |
| Consumer lag monitoring | Provider | NATS: consumer.info().num_pending. Kafka: consumer group lag. |
| Event replay by timestamp | Provider | NATS: DeliverPolicy::ByStartTime. Kafka: offsetsForTimes. |
| Exactly-once delivery | Provider | NATS: Nats-Msg-Id dedup + double ack. Kafka: idempotent producer + transactions. |
| Partitioning / sharding | Provider | NATS: subject-based routing. Kafka: partition key. |
| Stream mirroring | Provider | NATS: Mirror/Source config. Kafka: MirrorMaker. |
| Metrics (server-side) | Provider | NATS: /metrics endpoint. Kafka: JMX metrics. |
| Transport encryption | Provider | NATS/Kafka: TLS configuration. |
| Event versioning / schema | A3S Event | Provider-agnostic, application-level concern. |
| Payload encryption | A3S Event | Application-level encrypt/decrypt before publish. |
| Dead letter queue | A3S Event | Unified DLQ abstraction across providers. |
| EventBus state persistence | A3S Event | Subscription filter durability across restarts. |
| Observability integration | A3S Event | Bridge provider metrics into app-level tracing/metrics. |
| Provider config passthrough | A3S Event | Expose provider-native knobs (MaxDeliver, BackOff, etc.) |
| Integration tests | A3S Event | End-to-end verification with real providers. |
Phase 1: Provider Config Passthrough ✅
Expose provider-native capabilities through the abstraction layer without re-implementing them.
- SubscribeOptions struct: max_deliver, backoff, max_ack_pending, deliver_policy, ack_wait
- PublishOptions struct: msg_id (dedup), expected_sequence, timeout
- DeliverPolicy enum: All, Last, New, ByStartSequence, ByStartTime, LastPerSubject
- EventProvider trait extended with publish_with_options(), subscribe_with_options(), subscribe_durable_with_options() (default impls for backward compatibility)
- NatsProvider maps options to JetStream consumer/publish config (headers, backoff, max_deliver, deliver_policy, etc.)
- MemoryProvider uses default impls (ignores unsupported options gracefully)
- SubscriptionFilter carries optional SubscribeOptions
- EventBus threads options through create_subscriber()
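The "default impls for backward compatibility" idea can be sketched with a simplified, synchronous trait. Everything here is an assumption for illustration: the trait `Provider`, the two provider structs, and the single-field `PublishOptions` are stand-ins, and the crate's real trait is async with a richer signature.

```rust
use std::collections::HashMap;

/// Simplified stand-in for publish options; only the dedup id is modeled.
#[derive(Debug, Clone, Default)]
pub struct PublishOptions {
    pub msg_id: Option<String>,
}

/// Simplified, synchronous stand-in for a provider trait.
pub trait Provider {
    /// Publishes a payload and returns its sequence number.
    fn publish(&mut self, subject: &str, payload: &str) -> u64;

    /// Default impl: providers that do not support options ignore them,
    /// so existing implementations keep compiling unchanged.
    fn publish_with_options(&mut self, subject: &str, payload: &str, _opts: &PublishOptions) -> u64 {
        self.publish(subject, payload)
    }
}

/// Provider that relies on the default impl (options are ignored).
#[derive(Default)]
pub struct PlainProvider {
    seq: u64,
}

impl Provider for PlainProvider {
    fn publish(&mut self, _subject: &str, _payload: &str) -> u64 {
        self.seq += 1;
        self.seq
    }
}

/// Provider that overrides the default to honor `msg_id` deduplication.
#[derive(Default)]
pub struct DedupProvider {
    seq: u64,
    seen: HashMap<String, u64>,
}

impl Provider for DedupProvider {
    fn publish(&mut self, _subject: &str, _payload: &str) -> u64 {
        self.seq += 1;
        self.seq
    }

    fn publish_with_options(&mut self, subject: &str, payload: &str, opts: &PublishOptions) -> u64 {
        if let Some(id) = &opts.msg_id {
            // A repeated id returns the sequence of the first publish.
            if let Some(&seq) = self.seen.get(id) {
                return seq;
            }
            let seq = self.publish(subject, payload);
            self.seen.insert(id.clone(), seq);
            return seq;
        }
        self.publish(subject, payload)
    }
}
```

Publishing twice with the same `msg_id` advances the sequence on `PlainProvider` but not on `DedupProvider`, which mirrors how an in-memory backend can ignore an option that a JetStream-backed one maps to native behavior.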
Phase 2: Event Versioning & Schema ✅
Application-level schema management that no provider handles.
- Add event_type and version fields to Event struct (backward-compatible defaults)
- Event::typed() constructor for versioned events
- SchemaRegistry trait: register, validate, query schemas
- MemorySchemaRegistry: in-memory registry for development
- EventSchema: required fields validation per event type + version
- Publish-time validation (optional, via EventBus::with_schema_registry())
- Compatibility enum: Backward, Forward, Full, None
- Schema evolution checks (check_compatibility()) between versions
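Required-field validation per event type and version, as described in the list above, might look roughly like the following. The struct layout, the `validate` function, and the use of a flat string map as the payload are assumptions made to keep the sketch dependency-free; the crate's own schema types are not reproduced here.

```rust
use std::collections::{BTreeSet, HashMap};

/// Hypothetical schema: the fields a payload must contain
/// for a given event type and version.
pub struct EventSchema {
    pub event_type: String,
    pub version: u32,
    pub required_fields: BTreeSet<String>,
}

/// Checks a flat payload against the schema.
/// Returns the sorted list of missing fields on failure.
pub fn validate(schema: &EventSchema, payload: &HashMap<String, String>) -> Result<(), Vec<String>> {
    let missing: Vec<String> = schema
        .required_fields
        .iter()
        .filter(|field| !payload.contains_key(field.as_str()))
        .cloned()
        .collect();
    if missing.is_empty() {
        Ok(())
    } else {
        Err(missing)
    }
}
```

Running this check at publish time, before any encryption step, lets a bus reject malformed events early and report exactly which fields are absent.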
Phase 3: Operational Hardening ✅
Production reliability features that live above the provider layer.
- Dead Letter Queue: DlqHandler trait + MemoryDlqHandler impl
- DeadLetterEvent with reason and timestamp, should_dead_letter() helper
- EventBus::set_dlq_handler() for DLQ integration
- Observability: tracing::info_span! on publish and subscribe lifecycle in EventBus
- Health check API: EventProvider::health() with default impl, EventBus::health()
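One plausible shape for the dead-letter decision is sketched below. The policy (dead-letter once the delivery count reaches a configured limit), the `DeadLetter` struct, and both function signatures are assumptions for illustration; the document does not specify how the crate's should_dead_letter() helper decides.

```rust
/// Hypothetical stand-in for a dead-letter record.
#[derive(Debug, Clone, PartialEq)]
pub struct DeadLetter {
    pub original_subject: String,
    pub reason: String,
    pub delivery_attempts: u32,
}

/// Assumed policy: give up once the delivery count reaches the limit.
pub fn should_dead_letter(num_delivered: u32, max_deliver: u32) -> bool {
    num_delivered >= max_deliver
}

/// Returns Some(record) when the event should go to the DLQ,
/// or None when the caller should nak and wait for redelivery.
pub fn on_failure(subject: &str, num_delivered: u32, max_deliver: u32, reason: &str) -> Option<DeadLetter> {
    if should_dead_letter(num_delivered, max_deliver) {
        Some(DeadLetter {
            original_subject: subject.to_string(),
            reason: reason.to_string(),
            delivery_attempts: num_delivered,
        })
    } else {
        None
    }
}
```

Keeping the decision in a pure function makes the retry-versus-DLQ boundary easy to unit test independently of any provider.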
Phase 4: Testing & Documentation ✅
Confidence and onboarding.
- Integration tests with real NATS (9 tests — publish, history, dedup, durable subscription, options, concurrent, manual ack, health, info)
- EventBus unit tests (publish, subscribe, lifecycle, schema validation, DLQ integration — 17 tests)
- Concurrent publish/subscribe stress tests (50 concurrent publishers)
- Error path tests (all error variants display, From conversion, not-found, schema validation — 7 tests)
- Performance benchmarks (criterion: event creation, serialization, publish throughput, history query)
- Deployment guide and configuration reference (docs/deployment.md)
- Provider implementation guide (docs/custom-providers.md)
Test summary: 190 unit tests + 30 memory integration tests + 9 NATS integration tests across 16 modules
Phase 5: Payload Encryption ✅
Application-level encrypt/decrypt for sensitive event payloads.
- EventEncryptor trait: encrypt(payload) → Value, decrypt(encrypted) → Value
- Aes256GcmEncryptor: AES-256-GCM with random nonce per message
- EncryptedPayload envelope: key_id, nonce, ciphertext (base64), encrypted marker
- Key rotation: add_key(), rotate_to(), decrypt with any registered key
- EventBus::set_encryptor(): transparent encrypt on publish, decrypt on list_events()
- EncryptedPayload::is_encrypted(): detect encrypted payloads for selective decryption
- Schema validation runs on plaintext before encryption
- 10 crypto tests + 4 EventBus encryption integration tests
Phase 6: EventBus State Persistence ✅
Subscription filter durability across restarts.
- StateStore trait: save/load subscription filters
- FileStateStore: JSON file persistence with atomic writes (temp + rename)
- MemoryStateStore: in-memory store for testing
- EventBus::set_state_store(): auto-loads persisted subscriptions on setup
- Auto-save on update_subscription() and remove_subscription()
- 7 state store tests + 5 EventBus persistence integration tests
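The "atomic writes (temp + rename)" technique mentioned above can be sketched with the standard library alone. The function name `write_atomic` and the `.tmp` sibling naming are assumptions; this is not the crate's FileStateStore code.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Writes `contents` to `path` so that readers see either the old file
/// or the complete new file, never a partially written one.
pub fn write_atomic(path: &Path, contents: &[u8]) -> io::Result<()> {
    // Sibling temp file in the same directory, so the rename stays
    // on one filesystem.
    let tmp = path.with_extension("tmp");
    {
        let mut file = File::create(&tmp)?;
        file.write_all(contents)?;
        // Flush file data to disk before publishing it under the real name.
        file.sync_all()?;
    }
    // Replaces the destination in one step if it already exists.
    fs::rename(&tmp, path)
}
```

If the process crashes mid-write, only the temp file is affected and the previously saved subscription state stays readable.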
Phase 7: Observability Integration ✅
Bridge provider metrics into application-level tracing/metrics.
- EventMetrics struct: lock-free atomic counters for publish, subscribe, error, DLQ, encrypt/decrypt, latency (cumulative + max)
- MetricsSnapshot: serializable point-in-time view of all counters (#[derive(Serialize)] with camelCase)
- EventBus emits metrics on publish (timing + errors), subscribe/unsubscribe, validation errors, encrypt/decrypt counts, DLQ
- EventBus::metrics() accessor for scraping
- Lock-free CAS loop for max latency tracking
- reset() to zero all counters
- Integration with tracing spans on publish and subscribe lifecycle
- 10 metrics unit tests + 6 EventBus metrics integration tests
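A lock-free CAS loop for tracking the maximum latency can be written with `AtomicU64::compare_exchange_weak`. The struct `LatencyStats` and its field names are hypothetical and only illustrate the technique; they are not the crate's EventMetrics.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical latency tracker: cumulative total, sample count, and max.
#[derive(Default)]
pub struct LatencyStats {
    total_us: AtomicU64,
    samples: AtomicU64,
    max_us: AtomicU64,
}

impl LatencyStats {
    /// Records one latency sample without taking a lock.
    pub fn record(&self, latency_us: u64) {
        self.total_us.fetch_add(latency_us, Ordering::Relaxed);
        self.samples.fetch_add(1, Ordering::Relaxed);

        // CAS loop: retry until our value is stored or another thread
        // has already stored something at least as large.
        let mut current = self.max_us.load(Ordering::Relaxed);
        while latency_us > current {
            match self.max_us.compare_exchange_weak(
                current,
                latency_us,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                // Lost the race (or spurious failure): re-check against
                // the value that is actually stored.
                Err(observed) => current = observed,
            }
        }
    }

    pub fn max(&self) -> u64 {
        self.max_us.load(Ordering::Relaxed)
    }

    pub fn count(&self) -> u64 {
        self.samples.load(Ordering::Relaxed)
    }
}
```

The standard library also offers `AtomicU64::fetch_max`, which performs the same update in a single call; the explicit loop is shown here because it matches the CAS approach named in the list.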
Phase 8: Knative Eventing — Event Nervous System ✅
A3S Event acts as the "nervous system" connecting Gateway (traffic brain) and Box (instance executor). It standardizes event-driven communication across the ecosystem using CloudEvents. In standalone mode, events are the primary coordination channel between Gateway and Box. In K8s mode, events complement K8s-native mechanisms (Endpoints watch, HPA) with richer application-level signals.
- CloudEvents envelope: CloudEvent struct with CloudEvents v1.0 required + optional attributes, lossless From<Event> and TryFrom<CloudEvent> conversion with A3S-specific fields stored as a3s-prefixed extensions
- Scaling events (standalone mode): Typed payloads with ScalingEvent trait for Gateway ↔ Box autoscaler coordination:
  - a3s.gateway.scale.up: ScaleUpPayload (service, desired_replicas, reason)
  - a3s.gateway.scale.down: ScaleDownPayload (service, target_replicas, drain_timeout_secs)
  - a3s.box.instance.ready: InstanceReadyPayload (service, instance_id, endpoint)
  - a3s.box.instance.stopped: InstanceStoppedPayload (service, instance_id)
  - a3s.box.instance.health: InstanceHealthPayload (instance_id, cpu_percent, memory_bytes, in_flight)
- Broker/Trigger pattern: Broker receives events and evaluates Trigger filters (by type, source, subject pattern, metadata attributes). Matching events are delivered to EventSink targets in parallel. Delivery is fire-and-forget: errors are logged but don't block publishers.
- Event source adapters: EventSource trait + CronSource (tokio interval + Notify-based shutdown). WebhookSource and MetricsSource are trait-only definitions for application-level implementations.
- Event sink interface: EventSink trait with implementations: TopicSink (publish to provider), InProcessSink (async handler closure), LogSink (tracing), CollectorSink (testing), FailingSink (error path testing)
- Dead letter routing: SinkDlqHandler wraps an EventSink and forwards dead-lettered events with DLQ metadata (original_subject, delivery_attempts, first_failure_at). DeadLetterEvent extended with optional fields (backward-compatible). EventBus stores Arc<dyn EventProvider>, enabling TopicSink provider sharing.
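The Broker/Trigger pattern described above can be reduced to a small synchronous sketch: each trigger pairs a filter with a sink, and the broker delivers an event to every trigger whose filter matches. All types below are simplified stand-ins that reuse names from this README for readability; they are not the crate's definitions, only type and source filtering is modeled, and delivery is sequential rather than parallel.

```rust
/// Simplified event carrying only the attributes used for routing.
#[derive(Debug, Clone)]
pub struct Event {
    pub event_type: String,
    pub source: String,
    pub subject: String,
}

/// Filter where `None` means "match anything" for that attribute.
#[derive(Default)]
pub struct TriggerFilter {
    pub event_type: Option<String>,
    pub source: Option<String>,
}

impl TriggerFilter {
    pub fn matches(&self, event: &Event) -> bool {
        self.event_type.as_deref().map_or(true, |t| t == event.event_type.as_str())
            && self.source.as_deref().map_or(true, |s| s == event.source.as_str())
    }
}

/// Simplified, synchronous delivery target.
pub trait EventSink {
    fn deliver(&mut self, event: &Event) -> Result<(), String>;
}

/// Sink that stores every event it receives.
#[derive(Default)]
pub struct CollectorSink {
    pub received: Vec<Event>,
}

impl EventSink for CollectorSink {
    fn deliver(&mut self, event: &Event) -> Result<(), String> {
        self.received.push(event.clone());
        Ok(())
    }
}

/// Sink that always fails, for exercising the error path.
pub struct FailingSink;

impl EventSink for FailingSink {
    fn deliver(&mut self, _event: &Event) -> Result<(), String> {
        Err("sink unavailable".to_string())
    }
}

pub struct Trigger {
    filter: TriggerFilter,
    sink: Box<dyn EventSink>,
}

#[derive(Default)]
pub struct Broker {
    triggers: Vec<Trigger>,
}

impl Broker {
    pub fn add_trigger(&mut self, filter: TriggerFilter, sink: Box<dyn EventSink>) {
        self.triggers.push(Trigger { filter, sink });
    }

    /// Delivers to every matching trigger and returns the number of
    /// successful deliveries. Failures are logged and otherwise ignored,
    /// so one bad sink never blocks the publisher.
    pub fn route(&mut self, event: &Event) -> usize {
        let mut delivered = 0;
        for trigger in self.triggers.iter_mut().filter(|t| t.filter.matches(event)) {
            match trigger.sink.deliver(event) {
                Ok(()) => delivered += 1,
                Err(err) => eprintln!("sink delivery failed: {err}"),
            }
        }
        delivered
    }
}
```

A catch-all trigger uses `TriggerFilter::default()`, while a trigger for scale-up events sets `event_type` to `a3s.gateway.scale.up` and leaves the source unset.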
License
MIT License — see LICENSE for details.