Skip to main content

seesaw_postgres/
event_store.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use seesaw_core::es::{ConcurrencyError, EventStore, NewEvent, StoredEvent};
5use sqlx::{FromRow, PgPool};
6use uuid::Uuid;
7
8/// PostgreSQL-backed event store with optimistic concurrency.
9pub struct PostgresEventStore {
10    pool: PgPool,
11}
12
13impl PostgresEventStore {
14    pub fn new(pool: PgPool) -> Self {
15        Self { pool }
16    }
17
18    pub fn pool(&self) -> &PgPool {
19        &self.pool
20    }
21}
22
23#[derive(FromRow)]
24struct EventRow {
25    id: Uuid,
26    position: i64,
27    aggregate_id: Uuid,
28    sequence: i64,
29    event_type: String,
30    event_data: serde_json::Value,
31    metadata: serde_json::Value,
32    schema_version: i32,
33    caused_by: Option<Uuid>,
34    created_at: DateTime<Utc>,
35}
36
37impl From<EventRow> for StoredEvent {
38    fn from(row: EventRow) -> Self {
39        StoredEvent {
40            id: row.id,
41            position: row.position as u64,
42            aggregate_id: row.aggregate_id,
43            sequence: row.sequence as u64,
44            event_type: row.event_type,
45            data: row.event_data,
46            metadata: row.metadata,
47            schema_version: row.schema_version as u32,
48            caused_by: row.caused_by,
49            created_at: row.created_at,
50        }
51    }
52}
53
54#[async_trait]
55impl EventStore for PostgresEventStore {
56    async fn load_events(
57        &self,
58        aggregate_id: Uuid,
59        from_version: u64,
60    ) -> Result<Vec<StoredEvent>> {
61        let rows: Vec<EventRow> = sqlx::query_as(
62            "SELECT id, position, aggregate_id, sequence, event_type, \
63                    event_data, metadata, schema_version, caused_by, created_at \
64             FROM seesaw_event_store \
65             WHERE aggregate_id = $1 AND sequence > $2 \
66             ORDER BY sequence ASC",
67        )
68        .bind(aggregate_id)
69        .bind(from_version as i64)
70        .fetch_all(&self.pool)
71        .await?;
72
73        Ok(rows.into_iter().map(StoredEvent::from).collect())
74    }
75
76    async fn append(
77        &self,
78        aggregate_id: Uuid,
79        aggregate_type: &str,
80        expected_version: u64,
81        events: Vec<NewEvent>,
82    ) -> Result<u64> {
83        if events.is_empty() {
84            return Ok(expected_version);
85        }
86
87        let mut tx = self.pool.begin().await?;
88
89        // Lock the aggregate row and check current version
90        let current_version: i64 = sqlx::query_scalar(
91            "SELECT COALESCE(MAX(sequence), 0) \
92             FROM seesaw_event_store \
93             WHERE aggregate_id = $1 \
94             FOR UPDATE",
95        )
96        .bind(aggregate_id)
97        .fetch_one(&mut *tx)
98        .await?;
99
100        if current_version as u64 != expected_version {
101            return Err(ConcurrencyError {
102                aggregate_id,
103                expected: expected_version,
104                actual: current_version as u64,
105            }
106            .into());
107        }
108
109        let mut seq = expected_version;
110        for event in &events {
111            seq += 1;
112            sqlx::query(
113                "INSERT INTO seesaw_event_store \
114                    (aggregate_id, aggregate_type, sequence, event_type, \
115                     event_data, metadata, schema_version, caused_by) \
116                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
117            )
118            .bind(aggregate_id)
119            .bind(aggregate_type)
120            .bind(seq as i64)
121            .bind(&event.event_type)
122            .bind(&event.data)
123            .bind(event.metadata.as_ref().unwrap_or(&serde_json::json!({})))
124            .bind(event.schema_version as i32)
125            .bind(event.caused_by)
126            .execute(&mut *tx)
127            .await?;
128        }
129
130        tx.commit().await?;
131        Ok(seq)
132    }
133
134    async fn exists(&self, aggregate_id: Uuid) -> Result<bool> {
135        let exists: bool = sqlx::query_scalar(
136            "SELECT EXISTS(SELECT 1 FROM seesaw_event_store WHERE aggregate_id = $1)",
137        )
138        .bind(aggregate_id)
139        .fetch_one(&self.pool)
140        .await?;
141
142        Ok(exists)
143    }
144}