use std::time::Duration;
use sqlx::PgPool;
use sqlx::postgres::PgListener;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
use super::repository::EventOutboxRepository;
use super::routing::{EventRouter, OUTBOX_CHANNEL, OutboxChannel};
use systemprompt_identifiers::UserId;
use systemprompt_models::{A2AEvent, AgUiEvent, AnalyticsEvent, SystemEvent};
/// Age threshold for outbox rows (one hour): the prune pass subtracts this
/// from the current UTC time and hands the result to the repository as cutoff.
const OUTBOX_RETENTION: Duration = Duration::from_secs(60 * 60);
/// Period of the prune timer driven by the bridge's listen loop (five minutes).
const PRUNE_INTERVAL: Duration = Duration::from_secs(5 * 60);
/// Bridges events between processes through Postgres.
///
/// Once started, the bridge issues `LISTEN` on `OUTBOX_CHANNEL`, loads the
/// outbox row named by each notification payload, and routes the decoded
/// event to the local `EventRouter`. While listening it also periodically
/// prunes outbox rows older than the retention window.
#[derive(Debug, Clone)]
pub struct PostgresEventBridge {
/// Pool used for the listener connection and for all outbox repository calls.
pool: PgPool,
}
impl PostgresEventBridge {
/// Creates a bridge backed by `pool`.
///
/// Nothing is connected or spawned here; call [`Self::start`] to begin
/// listening.
#[must_use]
pub const fn new(pool: PgPool) -> Self {
Self { pool }
}
/// Installs the relay on [`EventRouter`] with a clone of this bridge's pool,
/// then spawns the listen/prune loop as a background tokio task.
///
/// Returns the handle of the spawned task. The bridge is consumed; the task
/// owns it from here on.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a tokio runtime.
pub fn start(self) -> JoinHandle<()> {
    let relay_pool = self.pool.clone();
    EventRouter::install_relay(relay_pool);
    // The future returned by `run` owns `self`, so it can be handed to the
    // runtime directly.
    tokio::spawn(self.run())
}
/// Drives the bridge until the connection pool is closed.
///
/// The outer loop (re)establishes a `PgListener` on `OUTBOX_CHANNEL`,
/// waiting a fixed delay after each failed attempt. The inner loop then
/// multiplexes two event sources:
///
/// * notifications, whose payload is handed to [`Self::deliver`];
/// * the prune timer, which triggers [`Self::prune`].
///
/// A receive error drops the listener and falls back to the outer loop.
///
/// NOTE(review): nothing here replays outbox rows that were written while
/// no listener was connected; such events are only seen by the prune pass.
async fn run(self) {
    /// Pause before retrying after a failed connect or `LISTEN`.
    const RECONNECT_DELAY: Duration = Duration::from_secs(5);

    let mut prune_tick = tokio::time::interval(PRUNE_INTERVAL);
    // Ticks are only consumed while a listener is connected. With the default
    // `Burst` behaviour every tick missed during a reconnect outage would fire
    // back-to-back afterwards, producing a run of redundant prune queries.
    // `Delay` collapses them into a single late tick.
    prune_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    // The first tick of an interval completes immediately; consume it so the
    // first prune happens one full period after startup.
    prune_tick.tick().await;

    loop {
        // A closed pool can never hand out a connection again, so retrying
        // would only log an error every `RECONNECT_DELAY` forever.
        if self.pool.is_closed() {
            info!("event bridge: connection pool closed; stopping");
            return;
        }

        let mut listener = match PgListener::connect_with(&self.pool).await {
            Ok(listener) => listener,
            Err(e) => {
                error!(error = %e, "event bridge: failed to open Postgres listener; retrying");
                tokio::time::sleep(RECONNECT_DELAY).await;
                continue;
            },
        };
        if let Err(e) = listener.listen(OUTBOX_CHANNEL).await {
            error!(error = %e, channel = OUTBOX_CHANNEL, "event bridge: LISTEN failed; retrying");
            tokio::time::sleep(RECONNECT_DELAY).await;
            continue;
        }
        info!(
            channel = OUTBOX_CHANNEL,
            "event bridge: listening for cross-replica events"
        );

        loop {
            // NOTE(review): the `recv()` future is dropped whenever the prune
            // tick wins the select; this assumes `PgListener::recv` is
            // cancel-safe for the sqlx version in use — confirm.
            tokio::select! {
                notification = listener.recv() => match notification {
                    Ok(notification) => {
                        self.deliver(notification.payload()).await;
                    },
                    Err(e) => {
                        warn!(error = %e, "event bridge: listener connection lost; reconnecting");
                        break;
                    },
                },
                _ = prune_tick.tick() => {
                    self.prune().await;
                },
            }
        }
    }
}
async fn deliver(&self, row_id: &str) {
let repo = EventOutboxRepository::new(self.pool.clone());
let row = match repo.find(row_id).await {
Ok(Some(row)) => row,
Ok(None) => {
debug!(row_id, "event bridge: outbox row already pruned; skipping");
return;
},
Err(e) => {
error!(error = %e, row_id, "event bridge: failed to load outbox row");
return;
},
};
let Some(channel) = OutboxChannel::parse(&row.channel) else {
error!(channel = %row.channel, row_id, "event bridge: unknown outbox channel");
return;
};
Self::fan_in(channel, &row.user_id, row.payload).await;
}
pub(super) async fn fan_in(
channel: OutboxChannel,
user_id: &UserId,
payload: serde_json::Value,
) {
match channel {
OutboxChannel::AgUi => match serde_json::from_value::<AgUiEvent>(payload) {
Ok(event) => {
EventRouter::route_agui_local(user_id, event).await;
},
Err(e) => error!(error = %e, "event bridge: failed to decode AG-UI event"),
},
OutboxChannel::A2A => match serde_json::from_value::<A2AEvent>(payload) {
Ok(event) => {
EventRouter::route_a2a_local(user_id, event).await;
},
Err(e) => error!(error = %e, "event bridge: failed to decode A2A event"),
},
OutboxChannel::System => match serde_json::from_value::<SystemEvent>(payload) {
Ok(event) => {
EventRouter::route_system_local(user_id, event).await;
},
Err(e) => error!(error = %e, "event bridge: failed to decode system event"),
},
OutboxChannel::Analytics => match serde_json::from_value::<AnalyticsEvent>(payload) {
Ok(event) => {
EventRouter::route_analytics_local(user_id, event).await;
},
Err(e) => error!(error = %e, "event bridge: failed to decode analytics event"),
},
}
}
async fn prune(&self) {
let cutoff = chrono::Utc::now()
- chrono::Duration::from_std(OUTBOX_RETENTION)
.unwrap_or_else(|_| chrono::Duration::seconds(3600));
let repo = EventOutboxRepository::new(self.pool.clone());
match repo.prune(cutoff).await {
Ok(deleted) => {
if deleted > 0 {
debug!(deleted, "event bridge: pruned expired outbox rows");
}
},
Err(e) => error!(error = %e, "event bridge: outbox prune failed"),
}
}
}