Production infrastructure for AI agents
Website · Documentation · Guides · Core · Template · Discord
systemprompt-events
Event bus, SSE broadcasters, and fan-out routing for systemprompt.io AI governance infrastructure. Provides the A2A, analytics, and context stream wiring for the MCP governance pipeline.
Layer: Infra — infrastructure primitives (database, security, events, etc.) consumed by domain crates. Part of the systemprompt-core workspace.
Overview
This crate provides a type-safe, generic event broadcasting system for real-time communication with connected clients via SSE (Server-Sent Events). It manages connection lifecycles, routes events to appropriate channels, and handles automatic cleanup of disconnected clients.
Architecture
crates/infra/events/
├── Cargo.toml
├── README.md
├── CHANGELOG.md
└── src/
    ├── lib.rs              # Broadcaster trait, EventSender alias, top-level re-exports
    ├── error.rs            # EventError / EventResult (thiserror)
    ├── sse.rs              # ToSse trait and impls for systemprompt-models event types
    └── services/
        ├── mod.rs          # Module re-exports
        ├── broadcaster.rs  # GenericBroadcaster, ConnectionGuard, keep-alive utilities
        └── routing.rs      # EventRouter and global LazyLock broadcasters
lib.rs
Entry point defining core abstractions:
- `Broadcaster` trait: type-safe async broadcasting with connection management.
- `EventSender` type alias: `tokio::sync::mpsc::Sender<Result<Event, Infallible>>`.
- `SSE_BUFFER` constant: default per-connection channel capacity.
error.rs
Public error surface:
- `EventError`: `Serialization(#[from] serde_json::Error)` and `ChannelFull { target }`.
- `EventResult<T>`: alias for `Result<T, EventError>`.
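As a rough illustration of this error surface, the sketch below hand-writes the enum with only the standard library instead of `thiserror`. Several things here are assumptions and not the crate's code: `SerializationFailure` stands in for `serde_json::Error`, the `target` field is assumed to be a `String`, the `Display` messages are invented, and `try_deliver` is a hypothetical helper showing how a full bounded channel could map to `ChannelFull`.

```rust
use std::fmt;
use std::sync::mpsc::{SyncSender, TrySendError};

// Stand-in for serde_json::Error so the sketch needs only std (assumption).
#[derive(Debug)]
pub struct SerializationFailure(pub String);

// Mirrors the two variants named above; the type of `target` is assumed.
#[derive(Debug)]
pub enum EventError {
    Serialization(SerializationFailure),
    ChannelFull { target: String },
}

impl fmt::Display for EventError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            EventError::Serialization(e) => write!(f, "serialization failed: {}", e.0),
            EventError::ChannelFull { target } => write!(f, "channel full for target {}", target),
        }
    }
}

impl std::error::Error for EventError {}

// Plays the role of `#[from]`: lets `?` convert the inner error.
impl From<SerializationFailure> for EventError {
    fn from(e: SerializationFailure) -> Self {
        EventError::Serialization(e)
    }
}

pub type EventResult<T> = Result<T, EventError>;

// Hypothetical helper: non-blocking send on a bounded channel.
// Ok(true)  = delivered
// Ok(false) = receiver is gone (left for connection cleanup)
// Err(..)   = the per-connection buffer is full
pub fn try_deliver(tx: &SyncSender<String>, target: &str, payload: String) -> EventResult<bool> {
    match tx.try_send(payload) {
        Ok(()) => Ok(true),
        Err(TrySendError::Disconnected(_)) => Ok(false),
        Err(TrySendError::Full(_)) => Err(EventError::ChannelFull {
            target: target.to_string(),
        }),
    }
}
```

Treating a disconnected receiver as a non-error is a design choice of this sketch only; the crate may report it differently.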
sse.rs
SSE serialization:
- `ToSse` trait: converts a typed event into an `axum::response::sse::Event`.
- Implementations for `AgUiEvent`, `A2AEvent`, `ContextEvent`, `SystemEvent`, `AnalyticsEvent`, and `CliOutputEvent` (the last is framed as `event: cli`).
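To make the conversion concrete, here is a minimal std-only sketch of what framing an event for SSE involves. It does not use axum: `SseFrame` is a stand-in for `axum::response::sse::Event`, the `to_sse` signature is assumed (the real method may return a `Result`), and `CliOutput` is a hypothetical type playing the role of `CliOutputEvent`.

```rust
// Std-only stand-in for axum::response::sse::Event (assumption for this sketch).
#[derive(Debug, Clone, PartialEq)]
pub struct SseFrame {
    pub event: Option<String>,
    pub data: String,
}

impl SseFrame {
    // Render the frame in the text/event-stream wire format.
    pub fn to_wire(&self) -> String {
        let mut out = String::new();
        if let Some(name) = &self.event {
            out.push_str("event: ");
            out.push_str(name);
            out.push('\n');
        }
        // Each line of the payload gets its own `data:` field.
        for line in self.data.lines() {
            out.push_str("data: ");
            out.push_str(line);
            out.push('\n');
        }
        // A blank line terminates the event.
        out.push('\n');
        out
    }
}

// Assumed shape of the trait: one method that frames the event.
pub trait ToSse {
    fn to_sse(&self) -> SseFrame;
}

// Hypothetical event type standing in for CliOutputEvent.
pub struct CliOutput {
    pub line: String,
}

impl ToSse for CliOutput {
    fn to_sse(&self) -> SseFrame {
        SseFrame {
            event: Some("cli".to_string()),
            data: self.line.clone(),
        }
    }
}
```

A client subscribed to this stream would receive the `cli` frame as a named event and can dispatch on the event name.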
services/broadcaster.rs
Generic broadcaster implementation:
- `GenericBroadcaster<E>`: thread-safe broadcaster backed by `Arc<RwLock<HashMap<UserId, HashMap<ConnId, Sender>>>>`.
- `ConnectionGuard<E>`: RAII guard for automatic connection cleanup on drop.
- Type aliases: `AgUiBroadcaster`, `A2ABroadcaster`, `ContextBroadcaster`, `AnalyticsBroadcaster`.
- Keep-alive utilities: `standard_keep_alive()`, `HEARTBEAT_INTERVAL`, `HEARTBEAT_JSON`.
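The nested-map registry and the RAII guard can be illustrated with a small synchronous sketch. This is not the crate's implementation: it swaps tokio's async channels and locks for `std::sync::mpsc::sync_channel` and `std::sync::RwLock`, uses `String` for both id types, and every name (`MiniBroadcaster`, `remove_conn`, the method signatures) is hypothetical.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, RwLock};

// Stand-ins for the crate's UserId and connection id types (assumptions).
type UserId = String;
type ConnId = String;
type Registry<E> = Arc<RwLock<HashMap<UserId, HashMap<ConnId, SyncSender<E>>>>>;

// Remove one connection and drop the user's entry once it is empty.
fn remove_conn<E>(conns: &Registry<E>, user: &str, conn: &str) {
    let mut map = conns.write().unwrap();
    let now_empty = match map.get_mut(user) {
        Some(user_conns) => {
            user_conns.remove(conn);
            user_conns.is_empty()
        }
        None => false,
    };
    if now_empty {
        map.remove(user);
    }
}

pub struct MiniBroadcaster<E> {
    conns: Registry<E>,
}

impl<E: Clone> MiniBroadcaster<E> {
    pub fn new() -> Self {
        Self { conns: Arc::new(RwLock::new(HashMap::new())) }
    }

    // Register a connection; the returned guard unregisters it when dropped.
    pub fn register(&self, user: &str, conn: &str, capacity: usize) -> (ConnectionGuard<E>, Receiver<E>) {
        let (tx, rx) = sync_channel(capacity);
        self.conns
            .write()
            .unwrap()
            .entry(user.to_string())
            .or_default()
            .insert(conn.to_string(), tx);
        let guard = ConnectionGuard {
            conns: Arc::clone(&self.conns),
            user: user.to_string(),
            conn: conn.to_string(),
        };
        (guard, rx)
    }

    pub fn unregister(&self, user: &str, conn: &str) {
        remove_conn(&self.conns, user, conn);
    }

    // Send to every connection of `user`; returns how many accepted the event.
    pub fn broadcast(&self, user: &str, event: &E) -> usize {
        let map = self.conns.read().unwrap();
        match map.get(user) {
            Some(conns) => conns
                .values()
                .filter(|tx| tx.try_send(event.clone()).is_ok())
                .count(),
            None => 0,
        }
    }

    pub fn connection_count(&self, user: &str) -> usize {
        let map = self.conns.read().unwrap();
        map.get(user).map_or(0, |conns| conns.len())
    }

    pub fn total_connections(&self) -> usize {
        let map = self.conns.read().unwrap();
        map.values().map(|conns| conns.len()).sum()
    }
}

// RAII guard: dropping it removes the connection from the registry.
pub struct ConnectionGuard<E> {
    conns: Registry<E>,
    user: UserId,
    conn: ConnId,
}

impl<E> Drop for ConnectionGuard<E> {
    fn drop(&mut self) {
        remove_conn(&self.conns, &self.user, &self.conn);
    }
}
```

Holding the guard for the lifetime of the SSE response stream means a client disconnect, which drops the stream, also removes the registry entry without any explicit cleanup call.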
services/routing.rs
Event routing and global state:
- `EventRouter`: routes events to the appropriate broadcaster(s); AG-UI and A2A events also fan out to `CONTEXT_BROADCASTER`.
- Global singletons: `AGUI_BROADCASTER`, `A2A_BROADCASTER`, `CONTEXT_BROADCASTER`, `ANALYTICS_BROADCASTER`.
Event Flow
                       ┌─────────────────┐
                       │   EventRouter   │
                       └────────┬────────┘
                                │
         ┌──────────────────────┼────────────────────────┐
         │                      │                        │
         ▼                      ▼                        ▼
┌──────────────────┐   ┌──────────────────┐   ┌─────────────────────┐
│ AGUI_BROADCASTER │   │ A2A_BROADCASTER  │   │ CONTEXT_BROADCASTER │
└────────┬─────────┘   └────────┬─────────┘   └──────────┬──────────┘
         │                      │                        │
         ▼                      ▼                        ▼
    SSE Clients            SSE Clients              SSE Clients
AG-UI and A2A events are routed both to their primary broadcaster and to the context broadcaster, which aggregates them.
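The fan-out rule can be sketched with `std::sync::LazyLock` globals (stable since Rust 1.80). Everything below is illustrative: `RecordingSink` is a stand-in that records payloads instead of pushing them to SSE clients, the `*_SINK` statics and the `RoutedEvent` enum are hypothetical names, and the real `EventRouter` API may look quite different.

```rust
use std::sync::{LazyLock, Mutex};

// Minimal stand-in for a broadcaster: it records what it was asked to send.
pub struct RecordingSink {
    sent: Mutex<Vec<String>>,
}

impl RecordingSink {
    fn new() -> Self {
        Self { sent: Mutex::new(Vec::new()) }
    }

    pub fn broadcast(&self, payload: &str) {
        self.sent.lock().unwrap().push(payload.to_string());
    }

    pub fn snapshot(&self) -> Vec<String> {
        self.sent.lock().unwrap().clone()
    }
}

// Lazily initialised process-wide singletons (hypothetical names).
pub static AGUI_SINK: LazyLock<RecordingSink> = LazyLock::new(RecordingSink::new);
pub static A2A_SINK: LazyLock<RecordingSink> = LazyLock::new(RecordingSink::new);
pub static CONTEXT_SINK: LazyLock<RecordingSink> = LazyLock::new(RecordingSink::new);
pub static ANALYTICS_SINK: LazyLock<RecordingSink> = LazyLock::new(RecordingSink::new);

// Hypothetical event envelope; payloads are plain strings for brevity.
pub enum RoutedEvent {
    AgUi(String),
    A2a(String),
    Context(String),
    Analytics(String),
}

// AG-UI and A2A events go to their own sink and to the context sink;
// context and analytics events go to a single sink each.
pub fn route(event: RoutedEvent) {
    match event {
        RoutedEvent::AgUi(p) => {
            AGUI_SINK.broadcast(&p);
            CONTEXT_SINK.broadcast(&p);
        }
        RoutedEvent::A2a(p) => {
            A2A_SINK.broadcast(&p);
            CONTEXT_SINK.broadcast(&p);
        }
        RoutedEvent::Context(p) => CONTEXT_SINK.broadcast(&p),
        RoutedEvent::Analytics(p) => ANALYTICS_SINK.broadcast(&p),
    }
}
```

With this rule a client subscribed only to the context stream still sees AG-UI and A2A activity, while analytics events stay on their own channel.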
Usage
[dependencies]
systemprompt-events = "0.9.2"
use ;
use systemprompt_identifiers::UserId;
async
Public API
Traits
| Trait | Methods | Purpose |
|---|---|---|
| `Broadcaster` | `register`, `unregister`, `broadcast`, `connection_count`, `total_connections` | Type-safe event broadcasting |
Types
| Type | Description |
|---|---|
| `EventSender` | `tokio::sync::mpsc::Sender<Result<Event, Infallible>>` |
| `EventError` / `EventResult<T>` | `thiserror`-derived error and result alias |
| `GenericBroadcaster<E>` | Generic broadcaster for any `ToSse + Clone + Send + Sync` event |
| `AgUiBroadcaster` | `GenericBroadcaster<AgUiEvent>` |
| `A2ABroadcaster` | `GenericBroadcaster<A2AEvent>` |
| `ContextBroadcaster` | `GenericBroadcaster<ContextEvent>` |
| `AnalyticsBroadcaster` | `GenericBroadcaster<AnalyticsEvent>` |
| `ConnectionGuard<E>` | RAII guard for automatic unregistration |
| `EventRouter` | Routes events to appropriate broadcasters |
Constants
| Constant | Value | Purpose |
|---|---|---|
| `HEARTBEAT_INTERVAL` | 15 seconds | SSE keep-alive interval |
| `HEARTBEAT_JSON` | `{"type":"heartbeat"}` | Keep-alive payload |
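For orientation, the two constants could be declared and used roughly as follows. The types are assumptions (the table gives only the values), framing the payload as a `data:` line is one possible encoding and may not match what `standard_keep_alive()` emits, and `heartbeats_during` is a hypothetical helper included only to show the arithmetic.

```rust
use std::time::Duration;

// Values taken from the table above; the Rust types are assumed.
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(15);
pub const HEARTBEAT_JSON: &str = r#"{"type":"heartbeat"}"#;

// One possible wire encoding of the keep-alive payload as an SSE data frame.
pub fn heartbeat_frame() -> String {
    format!("data: {}\n\n", HEARTBEAT_JSON)
}

// Hypothetical helper: number of heartbeats an otherwise idle connection
// receives over `idle`, at one heartbeat per full interval.
pub fn heartbeats_during(idle: Duration) -> u64 {
    idle.as_secs() / HEARTBEAT_INTERVAL.as_secs()
}
```

At a 15-second interval, a connection idle for one minute receives four heartbeats, which keeps it below the idle timeouts commonly applied by proxies and load balancers.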
Global Singletons
| Static | Type | Purpose |
|---|---|---|
| `AGUI_BROADCASTER` | `LazyLock<AgUiBroadcaster>` | AG-UI event broadcasts |
| `A2A_BROADCASTER` | `LazyLock<A2ABroadcaster>` | Agent-to-agent events |
| `CONTEXT_BROADCASTER` | `LazyLock<ContextBroadcaster>` | Aggregated context events |
| `ANALYTICS_BROADCASTER` | `LazyLock<AnalyticsBroadcaster>` | Analytics event tracking |
Tests
Tests are located in crates/tests/unit/infra/events/ following the project convention of separating tests from source files.
Dependencies
| Crate | Purpose |
|---|---|
| `systemprompt-models` | Event types (`AgUiEvent`, `A2AEvent`, `ContextEvent`, `SystemEvent`, `ToSse` trait) |
| `systemprompt-identifiers` | `UserId` type |
| `tokio` | Async runtime, channels, synchronization |
| `axum` | SSE `Event` and `KeepAlive` types |
| `async-trait` | Async trait support |
| `tracing` | Structured logging |
License
BSL-1.1 (Business Source License). Source-available for evaluation, testing, and non-production use. Production use requires a commercial license. Each version converts to Apache 2.0 four years after publication. See LICENSE.
Infra layer · Own how your organization uses AI.