use sqlx::PgPool;
use systemprompt_identifiers::{Actor, EventOutboxId, UserId};
use super::routing::{OUTBOX_CHANNEL, OutboxChannel};
/// Projection of a single `event_outbox` row, as selected by
/// `EventOutboxRepository::find`.
pub(super) struct OutboxRow {
    /// Contents of the `channel` column.
    pub channel: String,
    /// Contents of the `user_id` column, decoded as a [`UserId`].
    pub user_id: UserId,
    /// Contents of the `payload` column as a JSON value.
    pub payload: serde_json::Value,
}
/// Data-access handle for the `event_outbox` table.
///
/// All queries issued by this type run against the wrapped [`PgPool`].
#[derive(Debug, Clone)]
pub(super) struct EventOutboxRepository {
    /// Connection pool every statement is executed on.
    pool: PgPool,
}
impl EventOutboxRepository {
    /// Builds a repository that executes its queries on `pool`.
    pub(super) const fn new(pool: PgPool) -> Self {
        Self { pool }
    }

    /// Inserts one row into `event_outbox`.
    ///
    /// The `user_id` column is filled from `actor.user_id`, while the
    /// `actor_kind` / `actor_id` columns take the pair returned by
    /// `actor.audit_columns()`.
    ///
    /// # Errors
    ///
    /// Propagates the [`sqlx::Error`] raised while executing the statement.
    pub(super) async fn insert(
        &self,
        id: &EventOutboxId,
        channel: OutboxChannel,
        actor: &Actor,
        payload: &serde_json::Value,
    ) -> Result<(), sqlx::Error> {
        let (audit_kind, audit_id) = actor.audit_columns();

        sqlx::query!(
            "INSERT INTO event_outbox (id, channel, user_id, payload, actor_kind, actor_id) \
             VALUES ($1, $2, $3, $4, $5, $6)",
            id.as_str(),
            channel.as_str(),
            actor.user_id.as_str(),
            payload,
            audit_kind,
            audit_id,
        )
        .execute(&self.pool)
        .await?;

        // The query result carries nothing the caller needs.
        Ok(())
    }

    /// Calls `pg_notify` with [`OUTBOX_CHANNEL`] as the channel and the
    /// string form of `id` as the payload.
    ///
    /// # Errors
    ///
    /// Propagates the [`sqlx::Error`] raised while executing the statement.
    pub(super) async fn notify(&self, id: &EventOutboxId) -> Result<(), sqlx::Error> {
        sqlx::query!("SELECT pg_notify($1, $2)", OUTBOX_CHANNEL, id.as_str())
            .execute(&self.pool)
            .await?;

        Ok(())
    }

    /// Looks up the `event_outbox` row whose `id` column equals `id`.
    ///
    /// Returns `Ok(None)` when no row matches. The `user_id` column is
    /// selected with an explicit sqlx type override so that it decodes
    /// straight into [`UserId`].
    ///
    /// # Errors
    ///
    /// Propagates the [`sqlx::Error`] raised while running the query.
    pub(super) async fn find(&self, id: &str) -> Result<Option<OutboxRow>, sqlx::Error> {
        let maybe_row = sqlx::query_as!(
            OutboxRow,
            r#"SELECT channel, user_id as "user_id!: UserId", payload FROM event_outbox WHERE id = $1"#,
            id,
        )
        .fetch_optional(&self.pool)
        .await?;

        Ok(maybe_row)
    }

    /// Deletes every `event_outbox` row whose `created_at` is strictly
    /// earlier than `cutoff`.
    ///
    /// Returns the number of rows the `DELETE` affected.
    ///
    /// # Errors
    ///
    /// Propagates the [`sqlx::Error`] raised while executing the statement.
    pub(super) async fn prune(
        &self,
        cutoff: chrono::DateTime<chrono::Utc>,
    ) -> Result<u64, sqlx::Error> {
        let outcome = sqlx::query!("DELETE FROM event_outbox WHERE created_at < $1", cutoff)
            .execute(&self.pool)
            .await?;

        Ok(outcome.rows_affected())
    }
}