Skip to main content

systemprompt_events/services/
bridge.rs

//! Cross-replica event relay over Postgres `LISTEN`/`NOTIFY`.
//!
//! In a multi-replica deployment the in-process [`crate::EventRouter`]
//! broadcasters only reach SSE connections held by the current process.
//! [`PostgresEventBridge`] closes that gap: every replica runs one bridge
//! task that `LISTEN`s on [`OUTBOX_CHANNEL`]. When any replica routes an
//! event, it appends a row to `event_outbox` and emits a `NOTIFY` carrying
//! that row's id. Each bridge receives the notification, loads the row,
//! deserializes the payload according to its `channel`, and re-injects the
//! event through the router's *local-only* path, which deliberately does
//! **not** touch the outbox, so the relay cannot loop.
//!
//! The notification payload is only the row id (a UUID string), which keeps
//! it far below Postgres's default 8000-byte `NOTIFY` payload limit; the
//! event body itself lives in the `jsonb` column.
16
17use std::time::Duration;
18
19use sqlx::PgPool;
20use sqlx::postgres::PgListener;
21use tokio::task::JoinHandle;
22use tracing::{debug, error, info, warn};
23
24use super::repository::EventOutboxRepository;
25use super::routing::{EventRouter, OUTBOX_CHANNEL, OutboxChannel};
26use systemprompt_identifiers::UserId;
27use systemprompt_models::{A2AEvent, AgUiEvent, AnalyticsEvent, SystemEvent};
28
/// Age after which the bridge's periodic prune removes `event_outbox` rows (one hour).
const OUTBOX_RETENTION: Duration = Duration::from_secs(60 * 60);
/// Period between two outbox prune passes (five minutes).
const PRUNE_INTERVAL: Duration = Duration::from_secs(5 * 60);
31
32/// Background relay that mirrors `event_outbox` rows into the local
33/// broadcasters of the replica it runs on.
34#[derive(Debug, Clone)]
35pub struct PostgresEventBridge {
36    pool: PgPool,
37}
38
39impl PostgresEventBridge {
40    #[must_use]
41    pub const fn new(pool: PgPool) -> Self {
42        Self { pool }
43    }
44
45    /// Abort the returned handle to stop the relay.
46    pub fn start(self) -> JoinHandle<()> {
47        EventRouter::install_relay(self.pool.clone());
48        tokio::spawn(async move {
49            self.run().await;
50        })
51    }
52
53    async fn run(self) {
54        let mut prune_tick = tokio::time::interval(PRUNE_INTERVAL);
55        prune_tick.tick().await;
56
57        loop {
58            let mut listener = match PgListener::connect_with(&self.pool).await {
59                Ok(listener) => listener,
60                Err(e) => {
61                    error!(error = %e, "event bridge: failed to open Postgres listener; retrying");
62                    tokio::time::sleep(Duration::from_secs(5)).await;
63                    continue;
64                },
65            };
66            if let Err(e) = listener.listen(OUTBOX_CHANNEL).await {
67                error!(error = %e, channel = OUTBOX_CHANNEL, "event bridge: LISTEN failed; retrying");
68                tokio::time::sleep(Duration::from_secs(5)).await;
69                continue;
70            }
71            info!(
72                channel = OUTBOX_CHANNEL,
73                "event bridge: listening for cross-replica events"
74            );
75
76            loop {
77                tokio::select! {
78                    notification = listener.recv() => match notification {
79                        Ok(notification) => {
80                            self.deliver(notification.payload()).await;
81                        },
82                        Err(e) => {
83                            warn!(error = %e, "event bridge: listener connection lost; reconnecting");
84                            break;
85                        },
86                    },
87                    _ = prune_tick.tick() => {
88                        self.prune().await;
89                    },
90                }
91            }
92        }
93    }
94
95    async fn deliver(&self, row_id: &str) {
96        let repo = EventOutboxRepository::new(self.pool.clone());
97        let row = match repo.find(row_id).await {
98            Ok(Some(row)) => row,
99            Ok(None) => {
100                debug!(row_id, "event bridge: outbox row already pruned; skipping");
101                return;
102            },
103            Err(e) => {
104                error!(error = %e, row_id, "event bridge: failed to load outbox row");
105                return;
106            },
107        };
108
109        let Some(channel) = OutboxChannel::parse(&row.channel) else {
110            error!(channel = %row.channel, row_id, "event bridge: unknown outbox channel");
111            return;
112        };
113        Self::fan_in(channel, &row.user_id, row.payload).await;
114    }
115
116    pub(super) async fn fan_in(
117        channel: OutboxChannel,
118        user_id: &UserId,
119        // JSON: outbox payload is polymorphic by channel; decoded into the
120        // matching typed event immediately below.
121        payload: serde_json::Value,
122    ) {
123        match channel {
124            OutboxChannel::AgUi => match serde_json::from_value::<AgUiEvent>(payload) {
125                Ok(event) => {
126                    EventRouter::route_agui_local(user_id, event).await;
127                },
128                Err(e) => error!(error = %e, "event bridge: failed to decode AG-UI event"),
129            },
130            OutboxChannel::A2A => match serde_json::from_value::<A2AEvent>(payload) {
131                Ok(event) => {
132                    EventRouter::route_a2a_local(user_id, event).await;
133                },
134                Err(e) => error!(error = %e, "event bridge: failed to decode A2A event"),
135            },
136            OutboxChannel::System => match serde_json::from_value::<SystemEvent>(payload) {
137                Ok(event) => {
138                    EventRouter::route_system_local(user_id, event).await;
139                },
140                Err(e) => error!(error = %e, "event bridge: failed to decode system event"),
141            },
142            OutboxChannel::Analytics => match serde_json::from_value::<AnalyticsEvent>(payload) {
143                Ok(event) => {
144                    EventRouter::route_analytics_local(user_id, event).await;
145                },
146                Err(e) => error!(error = %e, "event bridge: failed to decode analytics event"),
147            },
148        }
149    }
150
151    async fn prune(&self) {
152        let cutoff = chrono::Utc::now()
153            - chrono::Duration::from_std(OUTBOX_RETENTION)
154                .unwrap_or_else(|_| chrono::Duration::seconds(3600));
155        let repo = EventOutboxRepository::new(self.pool.clone());
156        match repo.prune(cutoff).await {
157            Ok(deleted) => {
158                if deleted > 0 {
159                    debug!(deleted, "event bridge: pruned expired outbox rows");
160                }
161            },
162            Err(e) => error!(error = %e, "event bridge: outbox prune failed"),
163        }
164    }
165}