use std::time::Duration;

use eventcore_types::{
    CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
    EventStreamReader, EventStreamSlice, Operation, StreamId, StreamPosition, StreamWriteEntry,
    StreamWrites,
};
use nutype::nutype;
use serde_json::{Value, json};
use sqlx::types::Json;
use sqlx::{Pool, Postgres, Row, postgres::PgPoolOptions, query};
use thiserror::Error;
use tracing::{error, info, instrument, warn};
use uuid::Uuid;

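/// Errors that can occur while constructing a [`PostgresEventStore`].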
#[derive(Debug, Error)]
pub enum PostgresEventStoreError {
    #[error("failed to create postgres connection pool")]
    ConnectionFailed(#[source] sqlx::Error),
}

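/// Maximum number of pooled connections; non-zero by construction.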
#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, Display, AsRef, Into))]
pub struct MaxConnections(std::num::NonZeroU32);

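/// Connection pool settings used by both [`PostgresEventStore`] and [`PostgresCheckpointStore`].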
#[derive(Debug, Clone)]
pub struct PostgresConfig {
    /// Maximum number of connections held by the pool.
    pub max_connections: MaxConnections,
    /// How long to wait when acquiring a connection before giving up.
    pub acquire_timeout: Duration,
    /// How long an idle connection is kept open before being closed.
    pub idle_timeout: Duration,
}

impl Default for PostgresConfig {
    fn default() -> Self {
        const DEFAULT_MAX_CONNECTIONS: std::num::NonZeroU32 = match std::num::NonZeroU32::new(10) {
            Some(v) => v,
            None => unreachable!(),
        };

        Self {
            max_connections: MaxConnections::new(DEFAULT_MAX_CONNECTIONS),
            acquire_timeout: Duration::from_secs(30),
            idle_timeout: Duration::from_secs(600),
        }
    }
}

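/// An [`EventStore`] implementation backed by PostgreSQL.
///
/// Events are persisted in the `eventcore_events` table; version conflicts detected by
/// the database are surfaced as [`EventStoreError::VersionConflict`].
///
/// A minimal usage sketch (the `eventcore_postgres` crate path and the connection string
/// are assumptions for illustration, not part of this file):
///
/// ```ignore
/// use eventcore_postgres::PostgresEventStore;
///
/// // Connect with the default pool settings, then apply the bundled migrations.
/// let store = PostgresEventStore::new("postgres://localhost/eventcore").await?;
/// store.migrate().await;
/// ```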
#[derive(Debug, Clone)]
pub struct PostgresEventStore {
    pool: Pool<Postgres>,
}

impl PostgresEventStore {
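    /// Connects to PostgreSQL using the default [`PostgresConfig`].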
    pub async fn new<S: Into<String>>(
        connection_string: S,
    ) -> Result<Self, PostgresEventStoreError> {
        Self::with_config(connection_string, PostgresConfig::default()).await
    }

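    /// Connects to PostgreSQL using the supplied [`PostgresConfig`].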
    pub async fn with_config<S: Into<String>>(
        connection_string: S,
        config: PostgresConfig,
    ) -> Result<Self, PostgresEventStoreError> {
        let connection_string = connection_string.into();
        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
        let pool = PgPoolOptions::new()
            .max_connections(max_connections.get())
            .acquire_timeout(config.acquire_timeout)
            .idle_timeout(config.idle_timeout)
            .connect(&connection_string)
            .await
            .map_err(PostgresEventStoreError::ConnectionFailed)?;
        Ok(Self { pool })
    }

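    /// Wraps an existing connection pool; no migrations are run.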
    pub fn from_pool(pool: Pool<Postgres>) -> Self {
        Self { pool }
    }

    /// Verifies connectivity by running `SELECT 1`, panicking if the query fails.
    #[cfg_attr(test, mutants::skip)]
    pub async fn ping(&self) {
        query("SELECT 1")
            .execute(&self.pool)
            .await
            .expect("postgres ping failed");
    }

    /// Runs the bundled SQL migrations from `./migrations`, panicking on failure.
    #[cfg_attr(test, mutants::skip)]
    pub async fn migrate(&self) {
        sqlx::migrate!("./migrations")
            .run(&self.pool)
            .await
            .expect("postgres migration failed");
    }
}

impl EventStore for PostgresEventStore {
    #[instrument(name = "postgres.read_stream", skip(self))]
    async fn read_stream<E: Event>(
        &self,
        stream_id: StreamId,
    ) -> Result<EventStreamReader<E>, EventStoreError> {
        info!(
            stream = %stream_id,
            "[postgres.read_stream] reading events from postgres"
        );

        let rows = query(
            "SELECT event_data FROM eventcore_events WHERE stream_id = $1 ORDER BY stream_version ASC",
        )
        .bind(stream_id.as_ref())
        .fetch_all(&self.pool)
        .await
        .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;

        let mut events = Vec::with_capacity(rows.len());
        for row in rows {
            let payload: Value = row
                .try_get("event_data")
                .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
            let event = serde_json::from_value(payload).map_err(|error| {
                EventStoreError::DeserializationFailed {
                    stream_id: stream_id.clone(),
                    detail: error.to_string(),
                }
            })?;
            events.push(event);
        }

        Ok(EventStreamReader::new(events))
    }

    #[instrument(name = "postgres.append_events", skip(self, writes))]
    async fn append_events(
        &self,
        writes: StreamWrites,
    ) -> Result<EventStreamSlice, EventStoreError> {
        let expected_versions = writes.expected_versions().clone();
        let entries = writes.into_entries();

        if entries.is_empty() {
            return Ok(EventStreamSlice);
        }

        info!(
            stream_count = expected_versions.len(),
            event_count = entries.len(),
            "[postgres.append_events] appending events to postgres"
        );

        // Serialize the expected stream versions as a JSON object keyed by stream id.
        let expected_versions_json: Value = expected_versions
            .iter()
            .map(|(stream_id, version)| {
                (stream_id.as_ref().to_string(), json!(version.into_inner()))
            })
            .collect();

        let mut tx = self
            .pool
            .begin()
            .await
            .map_err(|error| map_sqlx_error(error, Operation::BeginTransaction))?;

        // Hand the expected versions to the database as a transaction-local setting so
        // the optimistic concurrency check can run server-side; a conflict comes back as
        // a database error and is mapped to `EventStoreError::VersionConflict`.
        query("SELECT set_config('eventcore.expected_versions', $1, true)")
            .bind(expected_versions_json.to_string())
            .execute(&mut *tx)
            .await
            .map_err(|error| map_sqlx_error(error, Operation::SetExpectedVersions))?;

        for entry in entries {
            let StreamWriteEntry {
                stream_id,
                event_type,
                event_data,
                ..
            } = entry;

            // UUIDv7 identifiers are time-ordered, so `event_id` doubles as a global
            // ordering key when reading across streams.
            let event_id = Uuid::now_v7();
            query(
                "INSERT INTO eventcore_events (event_id, stream_id, event_type, event_data, metadata)
                 VALUES ($1, $2, $3, $4, $5)",
            )
            .bind(event_id)
            .bind(stream_id.as_ref())
            .bind(event_type)
            .bind(Json(event_data))
            .bind(Json(json!({})))
            .execute(&mut *tx)
            .await
            .map_err(|error| map_sqlx_error(error, Operation::AppendEvents))?;
        }

        tx.commit()
            .await
            .map_err(|error| map_sqlx_error(error, Operation::CommitTransaction))?;

        Ok(EventStreamSlice)
    }
}

impl EventReader for PostgresEventStore {
    type Error = EventStoreError;

    async fn read_events<E: Event>(
        &self,
        filter: EventFilter,
        page: EventPage,
    ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
        let after_event_id: Option<Uuid> = page.after_position().map(|p| p.into_inner());
        let limit: i64 = page.limit().into_inner() as i64;

        let rows = if let Some(prefix) = filter.stream_prefix() {
            let prefix_str = prefix.as_ref();

            if let Some(after_id) = after_event_id {
                let query_str = r#"
                    SELECT event_id, event_data, stream_id
                    FROM eventcore_events
                    WHERE event_id > $1
                      AND stream_id LIKE $2 || '%'
                    ORDER BY event_id
                    LIMIT $3
                "#;
                query(query_str)
                    .bind(after_id)
                    .bind(prefix_str)
                    .bind(limit)
                    .fetch_all(&self.pool)
                    .await
            } else {
                let query_str = r#"
                    SELECT event_id, event_data, stream_id
                    FROM eventcore_events
                    WHERE stream_id LIKE $1 || '%'
                    ORDER BY event_id
                    LIMIT $2
                "#;
                query(query_str)
                    .bind(prefix_str)
                    .bind(limit)
                    .fetch_all(&self.pool)
                    .await
            }
        } else if let Some(after_id) = after_event_id {
            let query_str = r#"
                SELECT event_id, event_data, stream_id
                FROM eventcore_events
                WHERE event_id > $1
                ORDER BY event_id
                LIMIT $2
            "#;
            query(query_str)
                .bind(after_id)
                .bind(limit)
                .fetch_all(&self.pool)
                .await
        } else {
            let query_str = r#"
                SELECT event_id, event_data, stream_id
                FROM eventcore_events
                ORDER BY event_id
                LIMIT $1
            "#;
            query(query_str).bind(limit).fetch_all(&self.pool).await
        }
        .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;

        // Rows whose payload does not deserialize as `E` are skipped rather than failing
        // the whole page.
        let events: Vec<(E, StreamPosition)> = rows
            .into_iter()
            .filter_map(|row| {
                let event_data: Json<Value> = row.get("event_data");
                let event_id: Uuid = row.get("event_id");
                serde_json::from_value::<E>(event_data.0)
                    .ok()
                    .map(|e| (e, StreamPosition::new(event_id)))
            })
            .collect();

        Ok(events)
    }
}

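/// Maps a low-level sqlx error to an [`EventStoreError`], treating PostgreSQL
/// `raise_exception` (P0001) and `unique_violation` (23505) as version conflicts.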
fn map_sqlx_error(error: sqlx::Error, operation: Operation) -> EventStoreError {
    if let sqlx::Error::Database(db_error) = &error {
        let code = db_error.code();
        let code_str = code.as_deref();
        if code_str == Some("P0001") || code_str == Some("23505") {
            warn!(
                error = %db_error,
                "[postgres.version_conflict] optimistic concurrency check failed"
            );
            return EventStoreError::VersionConflict;
        }
    }

    error!(
        error = %error,
        operation = %operation,
        "[postgres.database_error] database operation failed"
    );
    EventStoreError::StoreFailure { operation }
}

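/// Errors that can occur while constructing or using a [`PostgresCheckpointStore`].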
#[derive(Debug, Error)]
pub enum PostgresCheckpointError {
    #[error("failed to create postgres connection pool")]
    ConnectionFailed(#[source] sqlx::Error),

    #[error("database operation failed: {0}")]
    DatabaseError(#[source] sqlx::Error),
}

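/// A [`CheckpointStore`] implementation that persists subscription positions in the
/// `eventcore_subscription_versions` table.
///
/// A minimal sketch (the `eventcore_postgres` crate path is assumed for illustration):
///
/// ```ignore
/// use eventcore_postgres::PostgresCheckpointStore;
///
/// let checkpoints = PostgresCheckpointStore::new("postgres://localhost/eventcore").await?;
/// // `save` upserts the position; `load` returns `None` until a position has been saved.
/// ```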
#[derive(Debug, Clone)]
pub struct PostgresCheckpointStore {
    pool: Pool<Postgres>,
}

impl PostgresCheckpointStore {
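    /// Connects to PostgreSQL using the default [`PostgresConfig`] and runs migrations.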
    pub async fn new<S: Into<String>>(
        connection_string: S,
    ) -> Result<Self, PostgresCheckpointError> {
        Self::with_config(connection_string, PostgresConfig::default()).await
    }

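    /// Connects to PostgreSQL with the supplied [`PostgresConfig`] and runs the bundled
    /// migrations before returning.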
    pub async fn with_config<S: Into<String>>(
        connection_string: S,
        config: PostgresConfig,
    ) -> Result<Self, PostgresCheckpointError> {
        let connection_string = connection_string.into();
        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
        let pool = PgPoolOptions::new()
            .max_connections(max_connections.get())
            .acquire_timeout(config.acquire_timeout)
            .idle_timeout(config.idle_timeout)
            .connect(&connection_string)
            .await
            .map_err(PostgresCheckpointError::ConnectionFailed)?;

        sqlx::migrate!("./migrations")
            .run(&pool)
            .await
            .map_err(|e| {
                PostgresCheckpointError::DatabaseError(sqlx::Error::Migrate(Box::new(e)))
            })?;

        Ok(Self { pool })
    }

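    /// Wraps an existing connection pool; no migrations are run.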
    pub fn from_pool(pool: Pool<Postgres>) -> Self {
        Self { pool }
    }
}

impl CheckpointStore for PostgresCheckpointStore {
    type Error = PostgresCheckpointError;

    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
        let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
            .bind(name)
            .fetch_optional(&self.pool)
            .await
            .map_err(PostgresCheckpointError::DatabaseError)?;

        match row {
            Some(row) => {
                let position: Uuid = row.get("last_position");
                Ok(Some(StreamPosition::new(position)))
            }
            None => Ok(None),
        }
    }

    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
        let position_uuid: Uuid = position.into_inner();
        // Upsert: a subscription's checkpoint is inserted on first save and overwritten
        // on every subsequent save.
        query(
            "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
             VALUES ($1, $2, NOW())
             ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
        )
        .bind(name)
        .bind(position_uuid)
        .execute(&self.pool)
        .await
        .map_err(PostgresCheckpointError::DatabaseError)?;

        Ok(())
    }
}