// systemprompt_events/services/bridge.rs
use std::time::Duration;

use sqlx::PgPool;
use sqlx::postgres::PgListener;
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;
use tracing::{debug, error, info, warn};

use super::repository::EventOutboxRepository;
use super::routing::{EventRouter, OUTBOX_CHANNEL, OutboxChannel};
use systemprompt_identifiers::UserId;
use systemprompt_models::{A2AEvent, AgUiEvent, AnalyticsEvent, SystemEvent};

/// Age threshold for outbox rows: the prune step passes `now - OUTBOX_RETENTION`
/// to the outbox repository as its cutoff timestamp.
const OUTBOX_RETENTION: Duration = Duration::from_secs(3600);
/// Period of the timer that triggers the outbox prune inside the bridge loop.
const PRUNE_INTERVAL: Duration = Duration::from_secs(300);

32#[derive(Debug, Clone)]
35pub struct PostgresEventBridge {
36 pool: PgPool,
37}
38
39impl PostgresEventBridge {
40 #[must_use]
41 pub const fn new(pool: PgPool) -> Self {
42 Self { pool }
43 }
44
45 pub fn start(self) -> JoinHandle<()> {
47 EventRouter::install_relay(self.pool.clone());
48 tokio::spawn(async move {
49 self.run().await;
50 })
51 }
52
53 async fn run(self) {
54 let mut prune_tick = tokio::time::interval(PRUNE_INTERVAL);
55 prune_tick.tick().await;
56
57 loop {
58 let mut listener = match PgListener::connect_with(&self.pool).await {
59 Ok(listener) => listener,
60 Err(e) => {
61 error!(error = %e, "event bridge: failed to open Postgres listener; retrying");
62 tokio::time::sleep(Duration::from_secs(5)).await;
63 continue;
64 },
65 };
66 if let Err(e) = listener.listen(OUTBOX_CHANNEL).await {
67 error!(error = %e, channel = OUTBOX_CHANNEL, "event bridge: LISTEN failed; retrying");
68 tokio::time::sleep(Duration::from_secs(5)).await;
69 continue;
70 }
71 info!(
72 channel = OUTBOX_CHANNEL,
73 "event bridge: listening for cross-replica events"
74 );
75
76 loop {
77 tokio::select! {
78 notification = listener.recv() => match notification {
79 Ok(notification) => {
80 self.deliver(notification.payload()).await;
81 },
82 Err(e) => {
83 warn!(error = %e, "event bridge: listener connection lost; reconnecting");
84 break;
85 },
86 },
87 _ = prune_tick.tick() => {
88 self.prune().await;
89 },
90 }
91 }
92 }
93 }
94
95 async fn deliver(&self, row_id: &str) {
96 let repo = EventOutboxRepository::new(self.pool.clone());
97 let row = match repo.find(row_id).await {
98 Ok(Some(row)) => row,
99 Ok(None) => {
100 debug!(row_id, "event bridge: outbox row already pruned; skipping");
101 return;
102 },
103 Err(e) => {
104 error!(error = %e, row_id, "event bridge: failed to load outbox row");
105 return;
106 },
107 };
108
109 let Some(channel) = OutboxChannel::parse(&row.channel) else {
110 error!(channel = %row.channel, row_id, "event bridge: unknown outbox channel");
111 return;
112 };
113 Self::fan_in(channel, &row.user_id, row.payload).await;
114 }
115
116 pub(super) async fn fan_in(
117 channel: OutboxChannel,
118 user_id: &UserId,
119 payload: serde_json::Value,
122 ) {
123 match channel {
124 OutboxChannel::AgUi => match serde_json::from_value::<AgUiEvent>(payload) {
125 Ok(event) => {
126 EventRouter::route_agui_local(user_id, event).await;
127 },
128 Err(e) => error!(error = %e, "event bridge: failed to decode AG-UI event"),
129 },
130 OutboxChannel::A2A => match serde_json::from_value::<A2AEvent>(payload) {
131 Ok(event) => {
132 EventRouter::route_a2a_local(user_id, event).await;
133 },
134 Err(e) => error!(error = %e, "event bridge: failed to decode A2A event"),
135 },
136 OutboxChannel::System => match serde_json::from_value::<SystemEvent>(payload) {
137 Ok(event) => {
138 EventRouter::route_system_local(user_id, event).await;
139 },
140 Err(e) => error!(error = %e, "event bridge: failed to decode system event"),
141 },
142 OutboxChannel::Analytics => match serde_json::from_value::<AnalyticsEvent>(payload) {
143 Ok(event) => {
144 EventRouter::route_analytics_local(user_id, event).await;
145 },
146 Err(e) => error!(error = %e, "event bridge: failed to decode analytics event"),
147 },
148 }
149 }
150
151 async fn prune(&self) {
152 let cutoff = chrono::Utc::now()
153 - chrono::Duration::from_std(OUTBOX_RETENTION)
154 .unwrap_or_else(|_| chrono::Duration::seconds(3600));
155 let repo = EventOutboxRepository::new(self.pool.clone());
156 match repo.prune(cutoff).await {
157 Ok(deleted) => {
158 if deleted > 0 {
159 debug!(deleted, "event bridge: pruned expired outbox rows");
160 }
161 },
162 Err(e) => error!(error = %e, "event bridge: outbox prune failed"),
163 }
164 }
165}