seesaw_postgres/
event_store.rs1use anyhow::Result;
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use seesaw_core::es::{ConcurrencyError, EventStore, NewEvent, StoredEvent};
5use sqlx::{FromRow, PgPool};
6use uuid::Uuid;
7
8pub struct PostgresEventStore {
10 pool: PgPool,
11}
12
13impl PostgresEventStore {
14 pub fn new(pool: PgPool) -> Self {
15 Self { pool }
16 }
17
18 pub fn pool(&self) -> &PgPool {
19 &self.pool
20 }
21}
22
23#[derive(FromRow)]
24struct EventRow {
25 id: Uuid,
26 position: i64,
27 aggregate_id: Uuid,
28 sequence: i64,
29 event_type: String,
30 event_data: serde_json::Value,
31 metadata: serde_json::Value,
32 schema_version: i32,
33 caused_by: Option<Uuid>,
34 created_at: DateTime<Utc>,
35}
36
37impl From<EventRow> for StoredEvent {
38 fn from(row: EventRow) -> Self {
39 StoredEvent {
40 id: row.id,
41 position: row.position as u64,
42 aggregate_id: row.aggregate_id,
43 sequence: row.sequence as u64,
44 event_type: row.event_type,
45 data: row.event_data,
46 metadata: row.metadata,
47 schema_version: row.schema_version as u32,
48 caused_by: row.caused_by,
49 created_at: row.created_at,
50 }
51 }
52}
53
54#[async_trait]
55impl EventStore for PostgresEventStore {
56 async fn load_events(
57 &self,
58 aggregate_id: Uuid,
59 from_version: u64,
60 ) -> Result<Vec<StoredEvent>> {
61 let rows: Vec<EventRow> = sqlx::query_as(
62 "SELECT id, position, aggregate_id, sequence, event_type, \
63 event_data, metadata, schema_version, caused_by, created_at \
64 FROM seesaw_event_store \
65 WHERE aggregate_id = $1 AND sequence > $2 \
66 ORDER BY sequence ASC",
67 )
68 .bind(aggregate_id)
69 .bind(from_version as i64)
70 .fetch_all(&self.pool)
71 .await?;
72
73 Ok(rows.into_iter().map(StoredEvent::from).collect())
74 }
75
76 async fn append(
77 &self,
78 aggregate_id: Uuid,
79 aggregate_type: &str,
80 expected_version: u64,
81 events: Vec<NewEvent>,
82 ) -> Result<u64> {
83 if events.is_empty() {
84 return Ok(expected_version);
85 }
86
87 let mut tx = self.pool.begin().await?;
88
89 let current_version: i64 = sqlx::query_scalar(
91 "SELECT COALESCE(MAX(sequence), 0) \
92 FROM seesaw_event_store \
93 WHERE aggregate_id = $1 \
94 FOR UPDATE",
95 )
96 .bind(aggregate_id)
97 .fetch_one(&mut *tx)
98 .await?;
99
100 if current_version as u64 != expected_version {
101 return Err(ConcurrencyError {
102 aggregate_id,
103 expected: expected_version,
104 actual: current_version as u64,
105 }
106 .into());
107 }
108
109 let mut seq = expected_version;
110 for event in &events {
111 seq += 1;
112 sqlx::query(
113 "INSERT INTO seesaw_event_store \
114 (aggregate_id, aggregate_type, sequence, event_type, \
115 event_data, metadata, schema_version, caused_by) \
116 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
117 )
118 .bind(aggregate_id)
119 .bind(aggregate_type)
120 .bind(seq as i64)
121 .bind(&event.event_type)
122 .bind(&event.data)
123 .bind(event.metadata.as_ref().unwrap_or(&serde_json::json!({})))
124 .bind(event.schema_version as i32)
125 .bind(event.caused_by)
126 .execute(&mut *tx)
127 .await?;
128 }
129
130 tx.commit().await?;
131 Ok(seq)
132 }
133
134 async fn exists(&self, aggregate_id: Uuid) -> Result<bool> {
135 let exists: bool = sqlx::query_scalar(
136 "SELECT EXISTS(SELECT 1 FROM seesaw_event_store WHERE aggregate_id = $1)",
137 )
138 .bind(aggregate_id)
139 .fetch_one(&self.pool)
140 .await?;
141
142 Ok(exists)
143 }
144}