systemprompt-events 0.14.2


Event bus, SSE broadcasters, and fan-out routing for the systemprompt.io AI governance infrastructure, including the A2A, analytics, and context stream wiring for the MCP governance pipeline.

Layer: Infra — infrastructure primitives (database, security, events, etc.) consumed by domain crates. Part of the systemprompt-core workspace.

Overview

This crate provides a type-safe, generic event broadcasting system for real-time communication with connected clients via SSE (Server-Sent Events). It manages connection lifecycles, routes events to appropriate channels, and handles automatic cleanup of disconnected clients.

Architecture

crates/infra/events/
├── Cargo.toml
├── README.md
├── CHANGELOG.md
└── src/
    ├── lib.rs              # Broadcaster trait, EventSender alias, top-level re-exports
    ├── error.rs            # EventError / EventResult (thiserror)
    ├── sse.rs              # ToSse trait and impls for systemprompt-models event types
    └── services/
        ├── mod.rs          # Module re-exports
        ├── broadcaster.rs  # GenericBroadcaster, ConnectionGuard, keep-alive utilities
        └── routing.rs      # EventRouter and global LazyLock broadcasters

lib.rs

Entry point defining core abstractions:

  • Broadcaster trait — type-safe async broadcasting with connection management.
  • EventSender type alias — tokio::sync::mpsc::Sender<Result<Event, Infallible>>.
  • SSE_BUFFER constant — default per-connection channel capacity.

error.rs

Public error surface:

  • EventError: error enum with the variants Serialization (#[from] serde_json::Error) and ChannelFull { target }.
  • EventResult<T>: alias for Result<T, EventError>.
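
The error surface above can be approximated with the standard library alone. The following is a minimal sketch, not the crate's actual definition: JsonError is a hypothetical stand-in for serde_json::Error, the target field is assumed to be a String, the Display and From impls are written by hand where the crate derives them with thiserror, and encode and prepare are illustrative helpers that do not exist in the crate.

```rust
use std::fmt;

/// Hypothetical stand-in for `serde_json::Error`, so the sketch needs no external crates.
#[derive(Debug)]
pub struct JsonError(pub String);

impl fmt::Display for JsonError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl std::error::Error for JsonError {}

/// Mirrors the two variants named above. The type of `target` is an assumption.
#[derive(Debug)]
pub enum EventError {
    Serialization(JsonError),
    ChannelFull { target: String },
}

impl fmt::Display for EventError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Serialization(e) => write!(f, "serialization failed: {e}"),
            Self::ChannelFull { target } => write!(f, "channel full for {target}"),
        }
    }
}

impl std::error::Error for EventError {}

/// Hand-written equivalent of what `#[from]` generates: enables `?` conversion.
impl From<JsonError> for EventError {
    fn from(e: JsonError) -> Self {
        Self::Serialization(e)
    }
}

pub type EventResult<T> = Result<T, EventError>;

/// Illustrative helper (not part of the crate): pretend to serialize a payload.
pub fn encode(payload: &str) -> Result<String, JsonError> {
    if payload.is_empty() {
        Err(JsonError("empty payload".to_owned()))
    } else {
        Ok(format!("{{\"data\":\"{payload}\"}}"))
    }
}

/// Illustrative helper (not part of the crate): serialize, then check channel capacity.
pub fn prepare(payload: &str, queued: usize, capacity: usize, target: &str) -> EventResult<String> {
    let json = encode(payload)?; // JsonError converts to EventError via From
    if queued >= capacity {
        return Err(EventError::ChannelFull { target: target.to_owned() });
    }
    Ok(json)
}
```

Callers can then match on the variant to decide whether to drop the event (ChannelFull) or report a bug in the payload (Serialization).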

sse.rs

SSE serialization:

  • ToSse trait — converts a typed event into an axum::response::sse::Event.
  • Implementations for AgUiEvent, A2AEvent, ContextEvent, SystemEvent, AnalyticsEvent, and CliOutputEvent (the last frames as event: cli).
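
To illustrate what such a conversion produces on the wire, here is a small std-only sketch. It is an assumption-laden stand-in: the real ToSse trait returns an axum::response::sse::Event, whereas the hypothetical ToSseFrame trait below renders the SSE text frame directly, and CliOutput is an invented type standing in for CliOutputEvent.

```rust
/// Hypothetical stand-in for `ToSse`: renders the SSE text frame as a String
/// instead of building an axum `Event`.
pub trait ToSseFrame {
    /// Value of the `event:` field.
    fn event_name(&self) -> &'static str;
    /// Payload carried in the `data:` field(s).
    fn payload(&self) -> String;

    /// Builds one SSE frame: an `event:` line, one `data:` line per payload
    /// line, and a terminating blank line.
    fn to_sse_frame(&self) -> String {
        let mut frame = format!("event: {}\n", self.event_name());
        for line in self.payload().lines() {
            frame.push_str("data: ");
            frame.push_str(line);
            frame.push('\n');
        }
        frame.push('\n');
        frame
    }
}

/// Invented event type; only the `cli` event name comes from the text above.
pub struct CliOutput {
    pub line: String,
}

impl ToSseFrame for CliOutput {
    fn event_name(&self) -> &'static str {
        "cli"
    }

    fn payload(&self) -> String {
        self.line.clone()
    }
}
```

A multi-line payload becomes several data: lines in the same frame, which is how the SSE format carries embedded newlines.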

services/broadcaster.rs

Generic broadcaster implementation:

  • GenericBroadcaster<E> — thread-safe broadcaster backed by Arc<RwLock<HashMap<UserId, HashMap<ConnId, Sender>>>>.
  • ConnectionGuard<E> — RAII guard for automatic connection cleanup on drop.
  • Type aliases: AgUiBroadcaster, A2ABroadcaster, ContextBroadcaster, AnalyticsBroadcaster.
  • Keep-alive utilities: standard_keep_alive(), HEARTBEAT_INTERVAL, HEARTBEAT_JSON.
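
The nested-map layout and the RAII guard can be sketched with the standard library only. This is a simplified model rather than the crate's code: MiniBroadcaster is a hypothetical name, it is synchronous (std RwLock and std::sync::mpsc instead of tokio), UserId and ConnId are plain String aliases, frames are plain strings, and the method signatures are assumptions that merely echo the method names of the Broadcaster trait.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::sync::{Arc, RwLock};

// Assumed stand-ins for the crate's identifier types.
type UserId = String;
type ConnId = String;

/// Hypothetical, synchronous model of a per-user connection registry.
#[derive(Clone, Default)]
pub struct MiniBroadcaster {
    connections: Arc<RwLock<HashMap<UserId, HashMap<ConnId, SyncSender<String>>>>>,
}

impl MiniBroadcaster {
    /// Registers a connection and returns a guard plus the receiving end.
    pub fn register(&self, user: &str, conn: &str, capacity: usize) -> (ConnectionGuard, Receiver<String>) {
        let (tx, rx) = sync_channel(capacity);
        self.connections
            .write()
            .unwrap()
            .entry(user.to_owned())
            .or_default()
            .insert(conn.to_owned(), tx);
        let guard = ConnectionGuard {
            broadcaster: self.clone(),
            user: user.to_owned(),
            conn: conn.to_owned(),
        };
        (guard, rx)
    }

    /// Removes one connection and drops the user entry once it is empty.
    pub fn unregister(&self, user: &str, conn: &str) {
        let mut map = self.connections.write().unwrap();
        if let Some(conns) = map.get_mut(user) {
            conns.remove(conn);
            if conns.is_empty() {
                map.remove(user);
            }
        }
    }

    /// Sends a frame to every connection of `user`, prunes disconnected
    /// receivers, and returns how many connections accepted the frame.
    pub fn broadcast(&self, user: &str, frame: &str) -> usize {
        let mut map = self.connections.write().unwrap();
        let Some(conns) = map.get_mut(user) else { return 0 };
        let mut delivered = 0;
        conns.retain(|_, tx| match tx.try_send(frame.to_owned()) {
            Ok(()) => {
                delivered += 1;
                true
            }
            Err(TrySendError::Full(_)) => true, // slow client: skip this frame, keep the connection
            Err(TrySendError::Disconnected(_)) => false, // receiver gone: prune
        });
        if conns.is_empty() {
            map.remove(user);
        }
        delivered
    }

    pub fn connection_count(&self, user: &str) -> usize {
        self.connections.read().unwrap().get(user).map_or(0, |c| c.len())
    }
}

/// RAII guard: dropping it unregisters the connection.
pub struct ConnectionGuard {
    broadcaster: MiniBroadcaster,
    user: UserId,
    conn: ConnId,
}

impl Drop for ConnectionGuard {
    fn drop(&mut self) {
        self.broadcaster.unregister(&self.user, &self.conn);
    }
}
```

Holding the guard for the lifetime of the SSE response means a dropped stream cleans up its registry entry without an explicit unregister call.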

services/routing.rs

Event routing and global state:

  • EventRouter — routes events to the appropriate broadcaster(s); AG-UI and A2A events also fan out to CONTEXT_BROADCASTER.
  • Global singletons: AGUI_BROADCASTER, A2A_BROADCASTER, CONTEXT_BROADCASTER, ANALYTICS_BROADCASTER.

Event Flow

                           ┌─────────────────┐
                           │   EventRouter   │
                           └────────┬────────┘
                                    │
          ┌─────────────────────────┼─────────────────────────┐
          │                         │                         │
          ▼                         ▼                         ▼
┌───────────────────┐     ┌───────────────────┐     ┌───────────────────┐
│ AGUI_BROADCASTER  │     │  A2A_BROADCASTER  │     │CONTEXT_BROADCASTER│
└─────────┬─────────┘     └─────────┬─────────┘     └─────────┬─────────┘
          │                         │                         │
          ▼                         ▼                         ▼
     SSE Clients               SSE Clients               SSE Clients

AG-UI and A2A events are routed to both their primary broadcaster and the context broadcaster, which aggregates them.
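
The fan-out rule can be modelled in a few lines. The sketch below is illustrative only: RecordingBroadcaster, RoutedEvent, and route are hypothetical names, the statics reuse the documented singleton names but hold a recording stand-in instead of a real broadcaster, and the handling of context and analytics events is an assumption. It requires Rust 1.80 or later for std::sync::LazyLock.

```rust
use std::sync::{LazyLock, Mutex};

/// Hypothetical stand-in broadcaster that only records the frames it is given.
#[derive(Default)]
pub struct RecordingBroadcaster {
    frames: Mutex<Vec<String>>,
}

impl RecordingBroadcaster {
    pub fn broadcast(&self, frame: &str) {
        self.frames.lock().unwrap().push(frame.to_owned());
    }

    /// Number of frames received so far.
    pub fn sent(&self) -> usize {
        self.frames.lock().unwrap().len()
    }
}

// Lazily initialised globals, named after the documented singletons.
// Their real types are LazyLock<...Broadcaster>, not this stand-in.
pub static AGUI_BROADCASTER: LazyLock<RecordingBroadcaster> =
    LazyLock::new(|| RecordingBroadcaster::default());
pub static A2A_BROADCASTER: LazyLock<RecordingBroadcaster> =
    LazyLock::new(|| RecordingBroadcaster::default());
pub static CONTEXT_BROADCASTER: LazyLock<RecordingBroadcaster> =
    LazyLock::new(|| RecordingBroadcaster::default());
pub static ANALYTICS_BROADCASTER: LazyLock<RecordingBroadcaster> =
    LazyLock::new(|| RecordingBroadcaster::default());

/// Hypothetical event envelope; the crate routes typed events instead.
pub enum RoutedEvent {
    AgUi(String),
    A2A(String),
    Context(String),
    Analytics(String),
}

/// AG-UI and A2A events go to their primary broadcaster and to the context
/// broadcaster; the other two kinds go to a single broadcaster (assumed).
pub fn route(event: &RoutedEvent) {
    match event {
        RoutedEvent::AgUi(frame) => {
            AGUI_BROADCASTER.broadcast(frame);
            CONTEXT_BROADCASTER.broadcast(frame);
        }
        RoutedEvent::A2A(frame) => {
            A2A_BROADCASTER.broadcast(frame);
            CONTEXT_BROADCASTER.broadcast(frame);
        }
        RoutedEvent::Context(frame) => CONTEXT_BROADCASTER.broadcast(frame),
        RoutedEvent::Analytics(frame) => ANALYTICS_BROADCASTER.broadcast(frame),
    }
}
```

Routing one AG-UI event therefore increases the frame count of both AGUI_BROADCASTER and CONTEXT_BROADCASTER by one, while an analytics event touches only ANALYTICS_BROADCASTER.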

Usage

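Add the crate to your Cargo.toml:

[dependencies]
systemprompt-events = "0.14.0"

Then query a global broadcaster, for example to count a user's active AG-UI connections:

use systemprompt_events::{AGUI_BROADCASTER, Broadcaster};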
use systemprompt_identifiers::UserId;

async fn active_listeners(user_id: &UserId) -> usize {
    AGUI_BROADCASTER.connection_count(user_id).await
}

Public API

Traits

Trait        Methods                                                                Purpose
Broadcaster  register, unregister, broadcast, connection_count, total_connections  Type-safe event broadcasting

Types

Type                         Description
EventSender                  tokio::sync::mpsc::Sender<Result<Event, Infallible>>
EventError / EventResult<T>  thiserror-derived error and result alias
GenericBroadcaster<E>        Generic broadcaster for any ToSse + Clone + Send + Sync event
AgUiBroadcaster              GenericBroadcaster<AgUiEvent>
A2ABroadcaster               GenericBroadcaster<A2AEvent>
ContextBroadcaster           GenericBroadcaster<ContextEvent>
AnalyticsBroadcaster         GenericBroadcaster<AnalyticsEvent>
ConnectionGuard<E>           RAII guard for automatic unregistration
EventRouter                  Routes events to appropriate broadcasters

Constants

Constant            Value                 Purpose
HEARTBEAT_INTERVAL  15 seconds            SSE keep-alive interval
HEARTBEAT_JSON      {"type":"heartbeat"}  Keep-alive payload

Global Singletons

Static                 Type                            Purpose
AGUI_BROADCASTER       LazyLock<AgUiBroadcaster>       AG-UI event broadcasts
A2A_BROADCASTER        LazyLock<A2ABroadcaster>        Agent-to-agent events
CONTEXT_BROADCASTER    LazyLock<ContextBroadcaster>    Aggregated context events
ANALYTICS_BROADCASTER  LazyLock<AnalyticsBroadcaster>  Analytics event tracking

Tests

Tests are located in crates/tests/unit/infra/events/ following the project convention of separating tests from source files.

Dependencies

Crate                     Purpose
systemprompt-models       Event types (AgUiEvent, A2AEvent, ContextEvent, SystemEvent, ToSse trait)
systemprompt-identifiers  UserId type
tokio                     Async runtime, channels, synchronization
axum                      SSE Event and KeepAlive types
async-trait               Async trait support
tracing                   Structured logging

License

BSL-1.1 (Business Source License). Source-available for evaluation, testing, and non-production use. Production use requires a commercial license. Each version converts to Apache 2.0 four years after publication. See LICENSE.

