1use std::time::Duration;
2
3use eventcore_types::{
4 CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
5 EventStreamReader, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
6 StreamVersion, StreamWriteEntry, StreamWrites,
7};
8use nutype::nutype;
9use serde_json::{Value, json};
10use sqlx::types::Json;
11use sqlx::{Pool, Postgres, Row, postgres::PgPoolOptions, query};
12use thiserror::Error;
13use tracing::{error, info, instrument, warn};
14use uuid::Uuid;
15
/// Errors raised while constructing a [`PostgresEventStore`].
#[derive(Debug, Error)]
pub enum PostgresEventStoreError {
    /// `PgPoolOptions::connect` failed while `PostgresEventStore::new` /
    /// `PostgresEventStore::with_config` was building the connection pool.
    #[error("failed to create postgres connection pool")]
    ConnectionFailed(#[source] sqlx::Error),
}
21
#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, Display, AsRef, Into))]
/// Upper bound on the number of connections in a PostgreSQL pool.
///
/// Wraps a `NonZeroU32`, so a zero-sized pool cannot be represented. The value is passed
/// to `PgPoolOptions::max_connections` by the `with_config` constructors in this module.
pub struct MaxConnections(std::num::NonZeroU32);
42
/// Connection-pool settings shared by the PostgreSQL store, checkpoint store and
/// projector coordinator constructors.
///
/// `PostgresConfig::default()` yields 10 connections, a 30 second acquire timeout and a
/// 600 second idle timeout.
#[derive(Debug, Clone)]
pub struct PostgresConfig {
    /// Maximum pool size, forwarded to `PgPoolOptions::max_connections`.
    pub max_connections: MaxConnections,
    /// How long to wait for a free pooled connection, forwarded to
    /// `PgPoolOptions::acquire_timeout`.
    pub acquire_timeout: Duration,
    /// How long a connection may sit idle in the pool, forwarded to
    /// `PgPoolOptions::idle_timeout`.
    pub idle_timeout: Duration,
}
53
54impl Default for PostgresConfig {
55 fn default() -> Self {
56 const DEFAULT_MAX_CONNECTIONS: std::num::NonZeroU32 = match std::num::NonZeroU32::new(10) {
57 Some(v) => v,
58 None => unreachable!(),
59 };
60
61 Self {
62 max_connections: MaxConnections::new(DEFAULT_MAX_CONNECTIONS),
63 acquire_timeout: Duration::from_secs(30),
64 idle_timeout: Duration::from_secs(600), }
66 }
67}
68
/// Event store backed by a PostgreSQL connection pool.
///
/// The same type also implements `CheckpointStore`, `EventReader` and
/// `ProjectorCoordinator` over this pool.
#[derive(Debug, Clone)]
pub struct PostgresEventStore {
    /// Shared pool used by every operation on this store.
    pool: Pool<Postgres>,
}
73
74impl PostgresEventStore {
75 pub async fn new<S: Into<String>>(
77 connection_string: S,
78 ) -> Result<Self, PostgresEventStoreError> {
79 Self::with_config(connection_string, PostgresConfig::default()).await
80 }
81
82 pub async fn with_config<S: Into<String>>(
84 connection_string: S,
85 config: PostgresConfig,
86 ) -> Result<Self, PostgresEventStoreError> {
87 let connection_string = connection_string.into();
88 let max_connections: std::num::NonZeroU32 = config.max_connections.into();
89 let pool = PgPoolOptions::new()
90 .max_connections(max_connections.get())
91 .acquire_timeout(config.acquire_timeout)
92 .idle_timeout(config.idle_timeout)
93 .connect(&connection_string)
94 .await
95 .map_err(PostgresEventStoreError::ConnectionFailed)?;
96 Ok(Self { pool })
97 }
98
99 pub fn from_pool(pool: Pool<Postgres>) -> Self {
104 Self { pool }
105 }
106
107 #[cfg_attr(test, mutants::skip)] pub async fn ping(&self) {
109 let _ = query("SELECT 1")
110 .execute(&self.pool)
111 .await
112 .expect("postgres ping failed");
113 }
114
115 #[cfg_attr(test, mutants::skip)] pub async fn migrate(&self) {
117 sqlx::migrate!("./migrations")
118 .run(&self.pool)
119 .await
120 .expect("postgres migration failed");
121 }
122}
123
124impl EventStore for PostgresEventStore {
125 #[instrument(name = "postgres.read_stream", skip(self))]
126 async fn read_stream<E: Event>(
127 &self,
128 stream_id: StreamId,
129 ) -> Result<EventStreamReader<E>, EventStoreError> {
130 info!(
131 stream = %stream_id,
132 "[postgres.read_stream] reading events from postgres"
133 );
134
135 let rows = query(
136 "SELECT event_data FROM eventcore_events WHERE stream_id = $1 ORDER BY stream_version ASC",
137 )
138 .bind(stream_id.as_ref())
139 .fetch_all(&self.pool)
140 .await
141 .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
142
143 let mut events = Vec::with_capacity(rows.len());
144 for row in rows {
145 let payload: Value = row
146 .try_get("event_data")
147 .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
148 let event = serde_json::from_value(payload).map_err(|error| {
149 EventStoreError::DeserializationFailed {
150 stream_id: stream_id.clone(),
151 detail: error.to_string(),
152 }
153 })?;
154 events.push(event);
155 }
156
157 Ok(EventStreamReader::new(events))
158 }
159
160 #[instrument(name = "postgres.append_events", skip(self, writes))]
161 async fn append_events(
162 &self,
163 writes: StreamWrites,
164 ) -> Result<EventStreamSlice, EventStoreError> {
165 let expected_versions = writes.expected_versions().clone();
166 let entries = writes.into_entries();
167
168 if entries.is_empty() {
169 return Ok(EventStreamSlice);
170 }
171
172 info!(
173 stream_count = expected_versions.len(),
174 event_count = entries.len(),
175 "[postgres.append_events] appending events to postgres"
176 );
177
178 let expected_versions_json: Value = expected_versions
180 .iter()
181 .map(|(stream_id, version)| {
182 (stream_id.as_ref().to_string(), json!(version.into_inner()))
183 })
184 .collect();
185
186 let mut tx = self
187 .pool
188 .begin()
189 .await
190 .map_err(|error| map_sqlx_error(error, Operation::BeginTransaction))?;
191
192 let _ = query("SELECT set_config('eventcore.expected_versions', $1, true)")
194 .bind(expected_versions_json.to_string())
195 .execute(&mut *tx)
196 .await
197 .map_err(|error| map_sqlx_error(error, Operation::SetExpectedVersions))?;
198
199 for entry in entries {
201 let StreamWriteEntry {
202 stream_id,
203 event_type,
204 event_data,
205 ..
206 } = entry;
207
208 let event_id = Uuid::now_v7();
209 let _ = query(
210 "INSERT INTO eventcore_events (event_id, stream_id, event_type, event_data, metadata)
211 VALUES ($1, $2, $3, $4, $5)",
212 )
213 .bind(event_id)
214 .bind(stream_id.as_ref())
215 .bind(event_type)
216 .bind(Json(event_data))
217 .bind(Json(json!({})))
218 .execute(&mut *tx)
219 .await
220 .map_err(|error| map_sqlx_error(error, Operation::AppendEvents))?;
221 }
222
223 tx.commit()
224 .await
225 .map_err(|error| map_sqlx_error(error, Operation::CommitTransaction))?;
226
227 Ok(EventStreamSlice)
228 }
229}
230
231impl CheckpointStore for PostgresEventStore {
232 type Error = PostgresCheckpointError;
233
234 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
235 let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
236 .bind(name)
237 .fetch_optional(&self.pool)
238 .await
239 .map_err(PostgresCheckpointError::DatabaseError)?;
240
241 match row {
242 Some(row) => {
243 let position: Uuid = row.get("last_position");
244 Ok(Some(StreamPosition::new(position)))
245 }
246 None => Ok(None),
247 }
248 }
249
250 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
251 let position_uuid: Uuid = position.into_inner();
252 let _ = query(
253 "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
254 VALUES ($1, $2, NOW())
255 ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
256 )
257 .bind(name)
258 .bind(position_uuid)
259 .execute(&self.pool)
260 .await
261 .map_err(PostgresCheckpointError::DatabaseError)?;
262
263 Ok(())
264 }
265}
266
267impl EventReader for PostgresEventStore {
268 type Error = EventStoreError;
269
270 async fn read_events<E: Event>(
271 &self,
272 filter: EventFilter,
273 page: EventPage,
274 ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
275 let after_event_id: Option<Uuid> = page.after_position().map(|p| p.into_inner());
278 let limit: i64 = page.limit().into_inner() as i64;
279
280 let type_filter = filter.event_type().unwrap_or_else(|| E::event_type_name());
284
285 let rows = if let Some(prefix) = filter.stream_prefix() {
286 let prefix_str = prefix.as_ref();
287
288 if let Some(after_id) = after_event_id {
289 let query_str = r#"
290 SELECT event_id, event_data, stream_id
291 FROM eventcore_events
292 WHERE event_type = $1
293 AND event_id > $2
294 AND stream_id LIKE $3 || '%'
295 ORDER BY event_id
296 LIMIT $4
297 "#;
298 query(query_str)
299 .bind(type_filter)
300 .bind(after_id)
301 .bind(prefix_str)
302 .bind(limit)
303 .fetch_all(&self.pool)
304 .await
305 } else {
306 let query_str = r#"
307 SELECT event_id, event_data, stream_id
308 FROM eventcore_events
309 WHERE event_type = $1
310 AND stream_id LIKE $2 || '%'
311 ORDER BY event_id
312 LIMIT $3
313 "#;
314 query(query_str)
315 .bind(type_filter)
316 .bind(prefix_str)
317 .bind(limit)
318 .fetch_all(&self.pool)
319 .await
320 }
321 } else if let Some(after_id) = after_event_id {
322 let query_str = r#"
323 SELECT event_id, event_data, stream_id
324 FROM eventcore_events
325 WHERE event_type = $1
326 AND event_id > $2
327 ORDER BY event_id
328 LIMIT $3
329 "#;
330 query(query_str)
331 .bind(type_filter)
332 .bind(after_id)
333 .bind(limit)
334 .fetch_all(&self.pool)
335 .await
336 } else {
337 let query_str = r#"
338 SELECT event_id, event_data, stream_id
339 FROM eventcore_events
340 WHERE event_type = $1
341 ORDER BY event_id
342 LIMIT $2
343 "#;
344 query(query_str)
345 .bind(type_filter)
346 .bind(limit)
347 .fetch_all(&self.pool)
348 .await
349 }
350 .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
351
352 let events: Vec<(E, StreamPosition)> = rows
353 .into_iter()
354 .filter_map(|row| {
355 let event_data: Json<Value> = row.get("event_data");
356 let event_id: Uuid = row.get("event_id");
357 serde_json::from_value::<E>(event_data.0)
358 .ok()
359 .map(|e| (e, StreamPosition::new(event_id)))
360 })
361 .collect();
362
363 Ok(events)
364 }
365}
366
367fn map_sqlx_error(error: sqlx::Error, operation: Operation) -> EventStoreError {
368 if let sqlx::Error::Database(db_error) = &error {
369 let code = db_error.code();
370 let code_str = code.as_deref();
371 if code_str == Some("P0001") || code_str == Some("23505") {
374 warn!(
375 error = %db_error,
376 "[postgres.version_conflict] optimistic concurrency check failed"
377 );
378 return parse_version_conflict_from_db_error(db_error.message());
379 }
380 }
381
382 error!(
383 error = %error,
384 operation = %operation,
385 "[postgres.database_error] database operation failed"
386 );
387 EventStoreError::StoreFailure { operation }
388}
389
390fn parse_version_conflict_from_db_error(message: &str) -> EventStoreError {
398 if let Some(parsed) = try_parse_conflict_message(message) {
400 return parsed;
401 }
402
403 let fallback_stream_id =
406 StreamId::try_new("unknown-conflict-stream").expect("static stream id is valid");
407 EventStoreError::VersionConflict {
408 stream_id: fallback_stream_id,
409 expected: StreamVersion::new(0),
410 actual: StreamVersion::new(0),
411 }
412}
413
414fn try_parse_conflict_message(message: &str) -> Option<EventStoreError> {
415 let rest = message.strip_prefix("version_conflict: stream \"")?;
416 let stream_end = rest.find('"')?;
417 let stream_id_str = &rest[..stream_end];
418 let after_stream = &rest[stream_end..];
419
420 let expected_str = after_stream
421 .strip_prefix("\" expected version ")?
422 .split(',')
423 .next()?;
424 let actual_str = after_stream.rsplit("actual ").next()?;
425
426 let expected = expected_str.trim().parse::<usize>().ok()?;
427 let actual = actual_str.trim().parse::<usize>().ok()?;
428 let stream_id = StreamId::try_new(stream_id_str).ok()?;
429
430 Some(EventStoreError::VersionConflict {
431 stream_id,
432 expected: StreamVersion::new(expected),
433 actual: StreamVersion::new(actual),
434 })
435}
436
/// Errors returned by the PostgreSQL `CheckpointStore` implementations.
#[derive(Debug, Error)]
pub enum PostgresCheckpointError {
    /// `PgPoolOptions::connect` failed in `PostgresCheckpointStore::with_config`.
    #[error("failed to create postgres connection pool")]
    ConnectionFailed(#[source] sqlx::Error),

    /// A checkpoint query failed, or migrations failed during
    /// `PostgresCheckpointStore::with_config` (wrapped as `sqlx::Error::Migrate`).
    #[error("database operation failed: {0}")]
    DatabaseError(#[source] sqlx::Error),
}
448
/// Standalone checkpoint store that persists subscription positions in the
/// `eventcore_subscription_versions` table.
#[derive(Debug, Clone)]
pub struct PostgresCheckpointStore {
    /// Pool used for checkpoint reads and writes.
    pool: Pool<Postgres>,
}
465
466impl PostgresCheckpointStore {
467 pub async fn new<S: Into<String>>(
469 connection_string: S,
470 ) -> Result<Self, PostgresCheckpointError> {
471 Self::with_config(connection_string, PostgresConfig::default()).await
472 }
473
474 pub async fn with_config<S: Into<String>>(
476 connection_string: S,
477 config: PostgresConfig,
478 ) -> Result<Self, PostgresCheckpointError> {
479 let connection_string = connection_string.into();
480 let max_connections: std::num::NonZeroU32 = config.max_connections.into();
481 let pool = PgPoolOptions::new()
482 .max_connections(max_connections.get())
483 .acquire_timeout(config.acquire_timeout)
484 .idle_timeout(config.idle_timeout)
485 .connect(&connection_string)
486 .await
487 .map_err(PostgresCheckpointError::ConnectionFailed)?;
488
489 sqlx::migrate!("./migrations")
491 .run(&pool)
492 .await
493 .map_err(|e| {
494 PostgresCheckpointError::DatabaseError(sqlx::Error::Migrate(Box::new(e)))
495 })?;
496
497 Ok(Self { pool })
498 }
499
500 pub fn from_pool(pool: Pool<Postgres>) -> Self {
505 Self { pool }
506 }
507}
508
509impl CheckpointStore for PostgresCheckpointStore {
510 type Error = PostgresCheckpointError;
511
512 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
513 let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
514 .bind(name)
515 .fetch_optional(&self.pool)
516 .await
517 .map_err(PostgresCheckpointError::DatabaseError)?;
518
519 match row {
520 Some(row) => {
521 let position: Uuid = row.get("last_position");
522 Ok(Some(StreamPosition::new(position)))
523 }
524 None => Ok(None),
525 }
526 }
527
528 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
529 let position_uuid: Uuid = position.into_inner();
530 let _ = query(
531 "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
532 VALUES ($1, $2, NOW())
533 ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
534 )
535 .bind(name)
536 .bind(position_uuid)
537 .execute(&self.pool)
538 .await
539 .map_err(PostgresCheckpointError::DatabaseError)?;
540
541 Ok(())
542 }
543}
544
/// Errors returned by the PostgreSQL `ProjectorCoordinator` implementations.
#[derive(Debug, Error)]
pub enum CoordinationError {
    /// `pg_try_advisory_lock` returned `false`: the key derived from
    /// `subscription_name` is already locked by some session.
    #[error(
        "leadership not acquired for subscription '{subscription_name}': another instance holds the lock"
    )]
    LeadershipNotAcquired { subscription_name: String },

    /// Pool creation, connection acquisition, or the lock query failed.
    #[error("database operation failed: {0}")]
    DatabaseError(#[source] sqlx::Error),
}
562
563pub struct CoordinationGuard {
593 lock_key: i64,
594 connection: Option<sqlx::pool::PoolConnection<Postgres>>,
597}
598
599impl std::fmt::Debug for CoordinationGuard {
600 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
601 f.debug_struct("CoordinationGuard")
602 .field("lock_key", &self.lock_key)
603 .finish_non_exhaustive()
604 }
605}
606
607impl Drop for CoordinationGuard {
608 fn drop(&mut self) {
609 if let Some(mut connection) = self.connection.take() {
611 let lock_key = self.lock_key;
612
613 let handle = tokio::runtime::Handle::current();
616 let is_multi_thread =
617 handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread;
618
619 if is_multi_thread {
620 tokio::task::block_in_place(|| {
622 handle.block_on(async {
623 if let Err(e) = query("SELECT pg_advisory_unlock($1)")
625 .bind(lock_key)
626 .execute(&mut *connection)
627 .await
628 {
629 warn!(
630 lock_key = lock_key,
631 error = %e,
632 "failed to release advisory lock on drop"
633 );
634 }
635 });
637 });
638 } else {
639 drop(tokio::spawn(async move {
644 if let Err(e) = query("SELECT pg_advisory_unlock($1)")
645 .bind(lock_key)
646 .execute(&mut *connection)
647 .await
648 {
649 warn!(
650 lock_key = lock_key,
651 error = %e,
652 "failed to release advisory lock on drop (async)"
653 );
654 }
655 }));
656 }
657 }
658 }
659}
660
/// Projector leader election backed by PostgreSQL advisory locks.
#[derive(Debug, Clone)]
pub struct PostgresProjectorCoordinator {
    /// Pool from which lock-holding connections are acquired.
    pool: Pool<Postgres>,
}
670
671impl PostgresProjectorCoordinator {
672 pub async fn new<S: Into<String>>(connection_string: S) -> Result<Self, CoordinationError> {
674 Self::with_config(connection_string, PostgresConfig::default()).await
675 }
676
677 pub async fn with_config<S: Into<String>>(
679 connection_string: S,
680 config: PostgresConfig,
681 ) -> Result<Self, CoordinationError> {
682 let connection_string = connection_string.into();
683 let max_connections: std::num::NonZeroU32 = config.max_connections.into();
684 let pool = PgPoolOptions::new()
685 .max_connections(max_connections.get())
686 .acquire_timeout(config.acquire_timeout)
687 .idle_timeout(config.idle_timeout)
688 .connect(&connection_string)
689 .await
690 .map_err(CoordinationError::DatabaseError)?;
691
692 Ok(Self { pool })
693 }
694
695 pub fn from_pool(pool: Pool<Postgres>) -> Self {
697 Self { pool }
698 }
699}
700
/// Derives the advisory-lock key for `subscription_name`.
///
/// Computes the 64-bit FNV-1a hash of the name's UTF-8 bytes and reinterprets its bits as
/// `i64`, the key type taken by `pg_try_advisory_lock(bigint)`. The key is deterministic,
/// so every instance maps the same subscription to the same lock.
fn advisory_lock_key(subscription_name: &str) -> i64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let digest = subscription_name
        .bytes()
        .fold(FNV_OFFSET_BASIS, |state, byte| {
            (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        });

    // Bit-for-bit reinterpretation; large hashes become negative keys.
    digest as i64
}
716
717async fn try_acquire_advisory_lock(
720 pool: &Pool<Postgres>,
721 subscription_name: &str,
722) -> Result<CoordinationGuard, CoordinationError> {
723 let lock_key = advisory_lock_key(subscription_name);
724
725 let mut connection = pool
729 .acquire()
730 .await
731 .map_err(CoordinationError::DatabaseError)?;
732
733 let row = query("SELECT pg_try_advisory_lock($1)")
735 .bind(lock_key)
736 .fetch_one(&mut *connection)
737 .await
738 .map_err(CoordinationError::DatabaseError)?;
739
740 let acquired: bool = row.get(0);
741
742 if acquired {
743 Ok(CoordinationGuard {
744 lock_key,
745 connection: Some(connection),
746 })
747 } else {
748 Err(CoordinationError::LeadershipNotAcquired {
750 subscription_name: subscription_name.to_string(),
751 })
752 }
753}
754
755impl ProjectorCoordinator for PostgresProjectorCoordinator {
756 type Error = CoordinationError;
757 type Guard = CoordinationGuard;
758
759 async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
760 try_acquire_advisory_lock(&self.pool, subscription_name).await
761 }
762}
763
764impl ProjectorCoordinator for PostgresEventStore {
765 type Error = CoordinationError;
766 type Guard = CoordinationGuard;
767
768 async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
769 try_acquire_advisory_lock(&self.pool, subscription_name).await
770 }
771}