Production infrastructure for AI agents
Website · Documentation · Guides · Core · Template · Discord
systemprompt-events
Event bus, SSE broadcasters, and fan-out routing for systemprompt.io AI governance infrastructure. A2A, analytics, and context stream wiring for the MCP governance pipeline.
Layer: Infra — infrastructure primitives (database, security, events, etc.) consumed by domain crates. Part of the systemprompt-core workspace.
Overview
This crate provides a type-safe, generic event broadcasting system for real-time communication with connected clients via SSE (Server-Sent Events). It manages connection lifecycles, routes events to appropriate channels, and handles automatic cleanup of disconnected clients.
Architecture
crates/infra/events/
├── Cargo.toml
├── README.md
├── status.md
└── src/
    ├── lib.rs              # 27 lines - Trait definitions, type aliases, re-exports
    └── services/
        ├── mod.rs          # 10 lines - Module re-exports
        ├── broadcaster.rs  # 191 lines - GenericBroadcaster implementation
        └── routing.rs      # 51 lines - EventRouter, global singletons
lib.rs
Entry point defining core abstractions:
- Broadcaster trait - Type-safe async broadcasting with connection management
- EventSender type alias - Channel sender for SSE events (UnboundedSender<Result<Event, Infallible>>)
services/broadcaster.rs
Generic broadcaster implementation:
- GenericBroadcaster<E> - Thread-safe broadcaster using Arc<RwLock<HashMap<UserId, HashMap<ConnId, Sender>>>>
- ConnectionGuard<E> - RAII guard for automatic connection cleanup on drop
- Type aliases: AgUiBroadcaster, A2ABroadcaster, ContextBroadcaster, AnalyticsBroadcaster
- Keep-alive utilities: standard_keep_alive(), HEARTBEAT_INTERVAL, HEARTBEAT_JSON
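The nested-map layout and the RAII guard described above can be illustrated with a small, synchronous sketch. This is not the crate's implementation: it uses only the standard library (std::sync::mpsc and std::sync::RwLock stand in for the tokio channel and lock), and the names SketchBroadcaster and SketchGuard, the String-based ids, and every method signature are assumptions made for illustration.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock};

// Hypothetical stand-ins for the crate's UserId and connection id types.
type UserId = String;
type ConnId = String;

/// Simplified, synchronous stand-in for a generic broadcaster.
pub struct SketchBroadcaster<E> {
    connections: Arc<RwLock<HashMap<UserId, HashMap<ConnId, Sender<E>>>>>,
}

impl<E> Clone for SketchBroadcaster<E> {
    fn clone(&self) -> Self {
        Self { connections: Arc::clone(&self.connections) }
    }
}

impl<E> SketchBroadcaster<E> {
    pub fn new() -> Self {
        Self { connections: Arc::new(RwLock::new(HashMap::new())) }
    }

    /// Store a sender under (user, conn) and return a guard that removes it on drop.
    pub fn register(&self, user: &str, conn: &str) -> (SketchGuard<E>, Receiver<E>) {
        let (tx, rx) = channel();
        self.connections
            .write()
            .unwrap()
            .entry(user.to_string())
            .or_default()
            .insert(conn.to_string(), tx);
        let guard = SketchGuard {
            broadcaster: self.clone(),
            user: user.to_string(),
            conn: conn.to_string(),
        };
        (guard, rx)
    }

    /// Remove one connection; drop the user's entry once it has no connections left.
    pub fn unregister(&self, user: &str, conn: &str) {
        let mut map = self.connections.write().unwrap();
        let now_empty = match map.get_mut(user) {
            Some(conns) => {
                conns.remove(conn);
                conns.is_empty()
            }
            None => false,
        };
        if now_empty {
            map.remove(user);
        }
    }

    pub fn connection_count(&self, user: &str) -> usize {
        self.connections.read().unwrap().get(user).map_or(0, HashMap::len)
    }

    pub fn total_connections(&self) -> usize {
        self.connections.read().unwrap().values().map(HashMap::len).sum()
    }
}

impl<E: Clone> SketchBroadcaster<E> {
    /// Send a clone of the event to each of the user's connections.
    /// Returns how many sends succeeded.
    pub fn broadcast(&self, user: &str, event: E) -> usize {
        let map = self.connections.read().unwrap();
        match map.get(user) {
            Some(conns) => conns.values().filter(|tx| tx.send(event.clone()).is_ok()).count(),
            None => 0,
        }
    }
}

/// RAII guard: dropping it unregisters the connection it was created for.
pub struct SketchGuard<E> {
    broadcaster: SketchBroadcaster<E>,
    user: UserId,
    conn: ConnId,
}

impl<E> Drop for SketchGuard<E> {
    fn drop(&mut self) {
        self.broadcaster.unregister(&self.user, &self.conn);
    }
}
```

In this sketch, a handler would keep the guard alive for as long as the client stream is open; when the stream ends and the guard goes out of scope, the connection disappears from the map without an explicit unregister call.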
services/routing.rs
Event routing and global state:
- EventRouter - Routes events to appropriate broadcaster(s)
- Global singletons: AGUI_BROADCASTER, A2A_BROADCASTER, CONTEXT_BROADCASTER, ANALYTICS_BROADCASTER
Event Flow
                     ┌─────────────────┐
                     │   EventRouter   │
                     └────────┬────────┘
                              │
        ┌─────────────────────┼──────────────────────┐
        │                     │                      │
        ▼                     ▼                      ▼
┌────────────────┐    ┌───────────────┐    ┌───────────────────┐
│AGUI_BROADCASTER│    │A2A_BROADCASTER│    │CONTEXT_BROADCASTER│
└───────┬────────┘    └───────┬───────┘    └─────────┬─────────┘
        │                     │                      │
        ▼                     ▼                      ▼
   SSE Clients           SSE Clients            SSE Clients
AgUI and A2A events are routed to both their primary broadcaster AND the context broadcaster for aggregation.
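The fan-out rule above can be written as a small routing table. The sketch below is illustrative only: EventKind, Channel, and targets are hypothetical names, and the handling of context and analytics events (each delivered only to its own broadcaster) is an assumption, since the text above only specifies the AgUI and A2A cases.

```rust
/// Hypothetical event kinds; the real crate routes typed events from systemprompt-models.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventKind {
    AgUi,
    A2A,
    Context,
    Analytics,
}

/// Hypothetical labels for the four broadcasters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Channel {
    AgUi,
    A2A,
    Context,
    Analytics,
}

/// Return every channel that should receive an event of the given kind.
pub fn targets(kind: EventKind) -> Vec<Channel> {
    match kind {
        // AgUI and A2A events go to their primary channel and to the context channel.
        EventKind::AgUi => vec![Channel::AgUi, Channel::Context],
        EventKind::A2A => vec![Channel::A2A, Channel::Context],
        // Assumption: these two kinds are delivered only to their own channel.
        EventKind::Context => vec![Channel::Context],
        EventKind::Analytics => vec![Channel::Analytics],
    }
}
```

Keeping the rule in one function like this means a new event kind forces an explicit routing decision, because the match must stay exhaustive.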
Usage
[dependencies]
systemprompt-events = "0.2.1"
use ;
use UserId;
async
Public API
Traits
| Trait | Methods | Purpose |
|---|---|---|
| Broadcaster | register, unregister, broadcast, connection_count, total_connections | Type-safe event broadcasting |
Types
| Type | Description |
|---|---|
| EventSender | UnboundedSender<Result<Event, Infallible>> |
| GenericBroadcaster<E> | Generic broadcaster for any ToSse + Clone + Send + Sync event |
| AgUiBroadcaster | GenericBroadcaster<AgUiEvent> |
| A2ABroadcaster | GenericBroadcaster<A2AEvent> |
| ContextBroadcaster | GenericBroadcaster<ContextEvent> |
| AnalyticsBroadcaster | GenericBroadcaster<AnalyticsEvent> |
| ConnectionGuard<E> | RAII guard for automatic unregistration |
| EventRouter | Routes events to appropriate broadcasters |
Constants
| Constant | Value | Purpose |
|---|---|---|
| HEARTBEAT_INTERVAL | 15 seconds | SSE keep-alive interval |
| HEARTBEAT_JSON | {"type":"heartbeat"} | Keep-alive payload |
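To make the two constants concrete, here is a minimal sketch of how a heartbeat payload could be framed on the wire as an SSE data event. It defines its own copies of the constants with the values from the table, and sse_data_frame is a hypothetical helper. How standard_keep_alive() actually emits the payload is not shown in this README; it may send it as an SSE comment through axum's KeepAlive instead.

```rust
use std::time::Duration;

// Local copies of the values listed in the constants table.
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(15);
pub const HEARTBEAT_JSON: &str = r#"{"type":"heartbeat"}"#;

/// Hypothetical helper: frame a payload as an SSE data event.
/// Each payload line gets its own `data: ` prefix, and a blank line ends the event.
pub fn sse_data_frame(payload: &str) -> String {
    let mut frame = String::new();
    for line in payload.lines() {
        frame.push_str("data: ");
        frame.push_str(line);
        frame.push('\n');
    }
    frame.push('\n');
    frame
}
```

A server loop built on this sketch would write sse_data_frame(HEARTBEAT_JSON) to each idle connection once per HEARTBEAT_INTERVAL so that proxies and clients do not time the stream out.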
Global Singletons
| Static | Type | Purpose |
|---|---|---|
| AGUI_BROADCASTER | LazyLock<AgUiBroadcaster> | AG-UI event broadcasts |
| A2A_BROADCASTER | LazyLock<A2ABroadcaster> | Agent-to-agent events |
| CONTEXT_BROADCASTER | LazyLock<ContextBroadcaster> | Aggregated context events |
| ANALYTICS_BROADCASTER | LazyLock<AnalyticsBroadcaster> | Analytics event tracking |
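The LazyLock pattern behind these statics can be shown with a standard-library-only sketch. SketchRegistry and SKETCH_REGISTRY are hypothetical stand-ins for a broadcaster and its global; the point is only that the value is built on first access and then shared by every caller.

```rust
use std::collections::HashMap;
use std::sync::{LazyLock, Mutex};

/// Hypothetical registry standing in for a broadcaster.
pub struct SketchRegistry {
    connections: Mutex<HashMap<String, usize>>,
}

impl SketchRegistry {
    fn new() -> Self {
        Self { connections: Mutex::new(HashMap::new()) }
    }

    /// Record one more connection for the user and return the new count.
    pub fn connect(&self, user: &str) -> usize {
        let mut map = self.connections.lock().unwrap();
        let count = map.entry(user.to_string()).or_insert(0);
        *count += 1;
        *count
    }

    /// Current number of recorded connections for the user.
    pub fn count(&self, user: &str) -> usize {
        self.connections.lock().unwrap().get(user).copied().unwrap_or(0)
    }
}

/// Initialized on first use; later accesses reuse the same instance.
pub static SKETCH_REGISTRY: LazyLock<SketchRegistry> =
    LazyLock::new(|| SketchRegistry::new());
```

std::sync::LazyLock requires Rust 1.80 or later. A global like this is convenient for handlers that have no shared state passed in, at the cost of making tests share one instance unless they use distinct keys.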
Tests
Tests are located in crates/tests/unit/infra/events/ following the project convention of separating tests from source files.
Dependencies
| Crate | Purpose |
|---|---|
| systemprompt-models | Event types (AgUiEvent, A2AEvent, ContextEvent, SystemEvent, ToSse trait) |
| systemprompt-identifiers | UserId type |
| tokio | Async runtime, channels, synchronization |
| axum | SSE Event and KeepAlive types |
| async-trait | Async trait support |
| tracing | Structured logging |
License
BSL-1.1 (Business Source License). Source-available for evaluation, testing, and non-production use. Production use requires a commercial license. Each version converts to Apache 2.0 four years after publication. See LICENSE.
Infra layer · Own how your organization uses AI.