1use std::time::Duration;
2
3use eventcore_types::{
4 CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
5 EventStreamReader, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
6 StreamVersion, StreamWriteEntry, StreamWrites,
7};
8use nutype::nutype;
9use serde_json::{Value, json};
10use sqlx::types::Json;
11use sqlx::{Pool, Postgres, Row, postgres::PgPoolOptions, query};
12use thiserror::Error;
13use tracing::{error, info, instrument, warn};
14use uuid::Uuid;
15
/// Errors returned while constructing a [`PostgresEventStore`].
#[derive(Debug, Error)]
pub enum PostgresEventStoreError {
    /// The connection pool could not be established; carries the underlying `sqlx` error
    /// as its source.
    #[error("failed to create postgres connection pool")]
    ConnectionFailed(#[source] sqlx::Error),
}
21
/// Maximum number of connections the Postgres pool may open.
///
/// Wraps a [`std::num::NonZeroU32`], so a pool size of zero cannot be expressed.
#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, Display, AsRef, Into))]
pub struct MaxConnections(std::num::NonZeroU32);
42
/// Connection-pool settings shared by the Postgres-backed stores in this file.
///
/// The [`Default`] implementation uses 10 connections, a 30 second acquire timeout and a
/// 600 second idle timeout.
#[derive(Debug, Clone)]
pub struct PostgresConfig {
    /// Upper bound passed to `PgPoolOptions::max_connections`.
    pub max_connections: MaxConnections,
    /// Value passed to `PgPoolOptions::acquire_timeout`.
    pub acquire_timeout: Duration,
    /// Value passed to `PgPoolOptions::idle_timeout`.
    pub idle_timeout: Duration,
}
53
54impl Default for PostgresConfig {
55 fn default() -> Self {
56 const DEFAULT_MAX_CONNECTIONS: std::num::NonZeroU32 = match std::num::NonZeroU32::new(10) {
57 Some(v) => v,
58 None => unreachable!(),
59 };
60
61 Self {
62 max_connections: MaxConnections::new(DEFAULT_MAX_CONNECTIONS),
63 acquire_timeout: Duration::from_secs(30),
64 idle_timeout: Duration::from_secs(600), }
66 }
67}
68
/// Event store backed by a Postgres connection pool.
///
/// In this file the type implements `EventStore`, `EventReader`, `CheckpointStore` and
/// `ProjectorCoordinator`, all sharing the same pool.
#[derive(Debug, Clone)]
pub struct PostgresEventStore {
    /// Pool used for every query issued by this store.
    pool: Pool<Postgres>,
}
73
74impl PostgresEventStore {
75 pub async fn new<S: Into<String>>(
77 connection_string: S,
78 ) -> Result<Self, PostgresEventStoreError> {
79 Self::with_config(connection_string, PostgresConfig::default()).await
80 }
81
82 pub async fn with_config<S: Into<String>>(
84 connection_string: S,
85 config: PostgresConfig,
86 ) -> Result<Self, PostgresEventStoreError> {
87 let connection_string = connection_string.into();
88 let max_connections: std::num::NonZeroU32 = config.max_connections.into();
89 let pool = PgPoolOptions::new()
90 .max_connections(max_connections.get())
91 .acquire_timeout(config.acquire_timeout)
92 .idle_timeout(config.idle_timeout)
93 .connect(&connection_string)
94 .await
95 .map_err(PostgresEventStoreError::ConnectionFailed)?;
96 Ok(Self { pool })
97 }
98
99 pub fn from_pool(pool: Pool<Postgres>) -> Self {
104 Self { pool }
105 }
106
107 #[cfg_attr(test, mutants::skip)] pub async fn ping(&self) {
109 let _ = query("SELECT 1")
110 .execute(&self.pool)
111 .await
112 .expect("postgres ping failed");
113 }
114
115 #[cfg_attr(test, mutants::skip)] pub async fn migrate(&self) {
117 sqlx::migrate!("./migrations")
118 .run(&self.pool)
119 .await
120 .expect("postgres migration failed");
121 }
122}
123
124impl EventStore for PostgresEventStore {
125 #[instrument(name = "postgres.read_stream", skip(self))]
126 async fn read_stream<E: Event>(
127 &self,
128 stream_id: StreamId,
129 ) -> Result<EventStreamReader<E>, EventStoreError> {
130 info!(
131 stream = %stream_id,
132 "[postgres.read_stream] reading events from postgres"
133 );
134
135 let rows = query(
136 "SELECT event_data FROM eventcore_events WHERE stream_id = $1 ORDER BY stream_version ASC",
137 )
138 .bind(stream_id.as_ref())
139 .fetch_all(&self.pool)
140 .await
141 .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
142
143 let mut events = Vec::with_capacity(rows.len());
144 for row in rows {
145 let payload: Value = row
146 .try_get("event_data")
147 .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
148 let event = serde_json::from_value(payload).map_err(|error| {
149 EventStoreError::DeserializationFailed {
150 stream_id: stream_id.clone(),
151 detail: error.to_string(),
152 }
153 })?;
154 events.push(event);
155 }
156
157 Ok(EventStreamReader::new(events))
158 }
159
160 #[instrument(name = "postgres.append_events", skip(self, writes))]
161 async fn append_events(
162 &self,
163 writes: StreamWrites,
164 ) -> Result<EventStreamSlice, EventStoreError> {
165 let expected_versions = writes.expected_versions().clone();
166 let entries = writes.into_entries();
167
168 if entries.is_empty() {
169 return Ok(EventStreamSlice);
170 }
171
172 info!(
173 stream_count = expected_versions.len(),
174 event_count = entries.len(),
175 "[postgres.append_events] appending events to postgres"
176 );
177
178 let expected_versions_json: Value = expected_versions
180 .iter()
181 .map(|(stream_id, version)| {
182 (stream_id.as_ref().to_string(), json!(version.into_inner()))
183 })
184 .collect();
185
186 let mut tx = self
187 .pool
188 .begin()
189 .await
190 .map_err(|error| map_sqlx_error(error, Operation::BeginTransaction))?;
191
192 let _ = query("SELECT set_config('eventcore.expected_versions', $1, true)")
194 .bind(expected_versions_json.to_string())
195 .execute(&mut *tx)
196 .await
197 .map_err(|error| map_sqlx_error(error, Operation::SetExpectedVersions))?;
198
199 for entry in entries {
201 let StreamWriteEntry {
202 stream_id,
203 event_type,
204 event_data,
205 ..
206 } = entry;
207
208 let event_id = Uuid::now_v7();
209 let _ = query(
210 "INSERT INTO eventcore_events (event_id, stream_id, event_type, event_data, metadata)
211 VALUES ($1, $2, $3, $4, $5)",
212 )
213 .bind(event_id)
214 .bind(stream_id.as_ref())
215 .bind(event_type)
216 .bind(Json(event_data))
217 .bind(Json(json!({})))
218 .execute(&mut *tx)
219 .await
220 .map_err(|error| map_sqlx_error(error, Operation::AppendEvents))?;
221 }
222
223 tx.commit()
224 .await
225 .map_err(|error| map_sqlx_error(error, Operation::CommitTransaction))?;
226
227 Ok(EventStreamSlice)
228 }
229}
230
231impl CheckpointStore for PostgresEventStore {
232 type Error = PostgresCheckpointError;
233
234 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
235 let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
236 .bind(name)
237 .fetch_optional(&self.pool)
238 .await
239 .map_err(PostgresCheckpointError::DatabaseError)?;
240
241 match row {
242 Some(row) => {
243 let position: Uuid = row.get("last_position");
244 Ok(Some(StreamPosition::new(position)))
245 }
246 None => Ok(None),
247 }
248 }
249
250 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
251 let position_uuid: Uuid = position.into_inner();
252 let _ = query(
253 "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
254 VALUES ($1, $2, NOW())
255 ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
256 )
257 .bind(name)
258 .bind(position_uuid)
259 .execute(&self.pool)
260 .await
261 .map_err(PostgresCheckpointError::DatabaseError)?;
262
263 Ok(())
264 }
265}
266
267impl EventReader for PostgresEventStore {
268 type Error = EventStoreError;
269
270 async fn read_events<E: Event>(
271 &self,
272 filter: EventFilter,
273 page: EventPage,
274 ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
275 let after_event_id: Option<Uuid> = page.after_position().map(|p| p.into_inner());
278 let limit: i64 = page.limit().into_inner() as i64;
279
280 let rows = if let Some(prefix) = filter.stream_prefix() {
281 let prefix_str = prefix.as_ref();
282
283 if let Some(after_id) = after_event_id {
284 let query_str = r#"
285 SELECT event_id, event_data, stream_id
286 FROM eventcore_events
287 WHERE event_id > $1
288 AND stream_id LIKE $2 || '%'
289 ORDER BY event_id
290 LIMIT $3
291 "#;
292 query(query_str)
293 .bind(after_id)
294 .bind(prefix_str)
295 .bind(limit)
296 .fetch_all(&self.pool)
297 .await
298 } else {
299 let query_str = r#"
300 SELECT event_id, event_data, stream_id
301 FROM eventcore_events
302 WHERE stream_id LIKE $1 || '%'
303 ORDER BY event_id
304 LIMIT $2
305 "#;
306 query(query_str)
307 .bind(prefix_str)
308 .bind(limit)
309 .fetch_all(&self.pool)
310 .await
311 }
312 } else if let Some(after_id) = after_event_id {
313 let query_str = r#"
314 SELECT event_id, event_data, stream_id
315 FROM eventcore_events
316 WHERE event_id > $1
317 ORDER BY event_id
318 LIMIT $2
319 "#;
320 query(query_str)
321 .bind(after_id)
322 .bind(limit)
323 .fetch_all(&self.pool)
324 .await
325 } else {
326 let query_str = r#"
327 SELECT event_id, event_data, stream_id
328 FROM eventcore_events
329 ORDER BY event_id
330 LIMIT $1
331 "#;
332 query(query_str).bind(limit).fetch_all(&self.pool).await
333 }
334 .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
335
336 let events: Vec<(E, StreamPosition)> = rows
337 .into_iter()
338 .filter_map(|row| {
339 let event_data: Json<Value> = row.get("event_data");
340 let event_id: Uuid = row.get("event_id");
341 serde_json::from_value::<E>(event_data.0)
342 .ok()
343 .map(|e| (e, StreamPosition::new(event_id)))
344 })
345 .collect();
346
347 Ok(events)
348 }
349}
350
351fn map_sqlx_error(error: sqlx::Error, operation: Operation) -> EventStoreError {
352 if let sqlx::Error::Database(db_error) = &error {
353 let code = db_error.code();
354 let code_str = code.as_deref();
355 if code_str == Some("P0001") || code_str == Some("23505") {
358 warn!(
359 error = %db_error,
360 "[postgres.version_conflict] optimistic concurrency check failed"
361 );
362 return parse_version_conflict_from_db_error(db_error.message());
363 }
364 }
365
366 error!(
367 error = %error,
368 operation = %operation,
369 "[postgres.database_error] database operation failed"
370 );
371 EventStoreError::StoreFailure { operation }
372}
373
374fn parse_version_conflict_from_db_error(message: &str) -> EventStoreError {
382 if let Some(parsed) = try_parse_conflict_message(message) {
384 return parsed;
385 }
386
387 let fallback_stream_id =
390 StreamId::try_new("unknown-conflict-stream").expect("static stream id is valid");
391 EventStoreError::VersionConflict {
392 stream_id: fallback_stream_id,
393 expected: StreamVersion::new(0),
394 actual: StreamVersion::new(0),
395 }
396}
397
398fn try_parse_conflict_message(message: &str) -> Option<EventStoreError> {
399 let rest = message.strip_prefix("version_conflict: stream \"")?;
400 let stream_end = rest.find('"')?;
401 let stream_id_str = &rest[..stream_end];
402 let after_stream = &rest[stream_end..];
403
404 let expected_str = after_stream
405 .strip_prefix("\" expected version ")?
406 .split(',')
407 .next()?;
408 let actual_str = after_stream.rsplit("actual ").next()?;
409
410 let expected = expected_str.trim().parse::<usize>().ok()?;
411 let actual = actual_str.trim().parse::<usize>().ok()?;
412 let stream_id = StreamId::try_new(stream_id_str).ok()?;
413
414 Some(EventStoreError::VersionConflict {
415 stream_id,
416 expected: StreamVersion::new(expected),
417 actual: StreamVersion::new(actual),
418 })
419}
420
/// Errors returned by the Postgres checkpoint store implementations in this file.
#[derive(Debug, Error)]
pub enum PostgresCheckpointError {
    /// The connection pool could not be established.
    #[error("failed to create postgres connection pool")]
    ConnectionFailed(#[source] sqlx::Error),

    /// A query or migration failed; the underlying `sqlx` error is kept as the source.
    #[error("database operation failed: {0}")]
    DatabaseError(#[source] sqlx::Error),
}
432
/// Standalone checkpoint store that keeps subscription positions in the
/// `eventcore_subscription_versions` table.
#[derive(Debug, Clone)]
pub struct PostgresCheckpointStore {
    /// Pool used for every checkpoint query.
    pool: Pool<Postgres>,
}
449
450impl PostgresCheckpointStore {
451 pub async fn new<S: Into<String>>(
453 connection_string: S,
454 ) -> Result<Self, PostgresCheckpointError> {
455 Self::with_config(connection_string, PostgresConfig::default()).await
456 }
457
458 pub async fn with_config<S: Into<String>>(
460 connection_string: S,
461 config: PostgresConfig,
462 ) -> Result<Self, PostgresCheckpointError> {
463 let connection_string = connection_string.into();
464 let max_connections: std::num::NonZeroU32 = config.max_connections.into();
465 let pool = PgPoolOptions::new()
466 .max_connections(max_connections.get())
467 .acquire_timeout(config.acquire_timeout)
468 .idle_timeout(config.idle_timeout)
469 .connect(&connection_string)
470 .await
471 .map_err(PostgresCheckpointError::ConnectionFailed)?;
472
473 sqlx::migrate!("./migrations")
475 .run(&pool)
476 .await
477 .map_err(|e| {
478 PostgresCheckpointError::DatabaseError(sqlx::Error::Migrate(Box::new(e)))
479 })?;
480
481 Ok(Self { pool })
482 }
483
484 pub fn from_pool(pool: Pool<Postgres>) -> Self {
489 Self { pool }
490 }
491}
492
493impl CheckpointStore for PostgresCheckpointStore {
494 type Error = PostgresCheckpointError;
495
496 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
497 let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
498 .bind(name)
499 .fetch_optional(&self.pool)
500 .await
501 .map_err(PostgresCheckpointError::DatabaseError)?;
502
503 match row {
504 Some(row) => {
505 let position: Uuid = row.get("last_position");
506 Ok(Some(StreamPosition::new(position)))
507 }
508 None => Ok(None),
509 }
510 }
511
512 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
513 let position_uuid: Uuid = position.into_inner();
514 let _ = query(
515 "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
516 VALUES ($1, $2, NOW())
517 ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
518 )
519 .bind(name)
520 .bind(position_uuid)
521 .execute(&self.pool)
522 .await
523 .map_err(PostgresCheckpointError::DatabaseError)?;
524
525 Ok(())
526 }
527}
528
/// Errors returned when trying to acquire projector leadership.
#[derive(Debug, Error)]
pub enum CoordinationError {
    /// `pg_try_advisory_lock` returned false for the key derived from `subscription_name`.
    #[error(
        "leadership not acquired for subscription '{subscription_name}': another instance holds the lock"
    )]
    LeadershipNotAcquired { subscription_name: String },

    /// Connecting, acquiring a pooled connection or running the lock query failed.
    #[error("database operation failed: {0}")]
    DatabaseError(#[source] sqlx::Error),
}
546
/// Guard for an advisory lock acquired by `try_acquire_advisory_lock`.
///
/// Dropping the guard issues `pg_advisory_unlock` on the stored connection.
pub struct CoordinationGuard {
    /// Key passed to `pg_try_advisory_lock` / `pg_advisory_unlock`.
    lock_key: i64,
    /// Connection on which the lock was taken; `Option` so `Drop` can move it out.
    connection: Option<sqlx::pool::PoolConnection<Postgres>>,
}
582
583impl std::fmt::Debug for CoordinationGuard {
584 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
585 f.debug_struct("CoordinationGuard")
586 .field("lock_key", &self.lock_key)
587 .finish_non_exhaustive()
588 }
589}
590
impl Drop for CoordinationGuard {
    /// Releases the advisory lock by running `pg_advisory_unlock` on the guard's connection.
    ///
    /// On a multi-thread Tokio runtime the unlock runs synchronously before `drop` returns.
    /// On any other runtime flavor it is handed to a spawned task and `drop` returns without
    /// waiting for it. An unlock failure is only logged.
    ///
    /// NOTE(review): `Handle::current()` is documented to panic outside a Tokio runtime
    /// context, so dropping a guard there would panic — confirm this cannot happen in callers.
    fn drop(&mut self) {
        // `take` leaves `None` behind, so the unlock is attempted at most once.
        if let Some(mut connection) = self.connection.take() {
            let lock_key = self.lock_key;

            let handle = tokio::runtime::Handle::current();
            let is_multi_thread =
                handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread;

            if is_multi_thread {
                // Run the unlock to completion on this thread before `drop` returns.
                tokio::task::block_in_place(|| {
                    handle.block_on(async {
                        if let Err(e) = query("SELECT pg_advisory_unlock($1)")
                            .bind(lock_key)
                            .execute(&mut *connection)
                            .await
                        {
                            warn!(
                                lock_key = lock_key,
                                error = %e,
                                "failed to release advisory lock on drop"
                            );
                        }
                    });
                });
            } else {
                // Fire-and-forget: the task owns the connection and its handle is discarded.
                drop(tokio::spawn(async move {
                    if let Err(e) = query("SELECT pg_advisory_unlock($1)")
                        .bind(lock_key)
                        .execute(&mut *connection)
                        .await
                    {
                        warn!(
                            lock_key = lock_key,
                            error = %e,
                            "failed to release advisory lock on drop (async)"
                        );
                    }
                }));
            }
        }
    }
}
644
/// Projector coordinator that uses Postgres advisory locks to grant leadership per
/// subscription name.
#[derive(Debug, Clone)]
pub struct PostgresProjectorCoordinator {
    /// Pool from which lock-holding connections are acquired.
    pool: Pool<Postgres>,
}
654
655impl PostgresProjectorCoordinator {
656 pub async fn new<S: Into<String>>(connection_string: S) -> Result<Self, CoordinationError> {
658 Self::with_config(connection_string, PostgresConfig::default()).await
659 }
660
661 pub async fn with_config<S: Into<String>>(
663 connection_string: S,
664 config: PostgresConfig,
665 ) -> Result<Self, CoordinationError> {
666 let connection_string = connection_string.into();
667 let max_connections: std::num::NonZeroU32 = config.max_connections.into();
668 let pool = PgPoolOptions::new()
669 .max_connections(max_connections.get())
670 .acquire_timeout(config.acquire_timeout)
671 .idle_timeout(config.idle_timeout)
672 .connect(&connection_string)
673 .await
674 .map_err(CoordinationError::DatabaseError)?;
675
676 Ok(Self { pool })
677 }
678
679 pub fn from_pool(pool: Pool<Postgres>) -> Self {
681 Self { pool }
682 }
683}
684
/// Derives the advisory-lock key for a subscription name.
///
/// The name's bytes are hashed with 64-bit FNV-1a (xor the byte, then multiply by the prime)
/// and the result is reinterpreted as `i64`, so the same name always yields the same key.
fn advisory_lock_key(subscription_name: &str) -> i64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    let digest = subscription_name
        .bytes()
        .fold(OFFSET_BASIS, |acc, byte| {
            (acc ^ u64::from(byte)).wrapping_mul(PRIME)
        });

    // Bit-for-bit reinterpretation; values above `i64::MAX` become negative keys.
    digest as i64
}
700
701async fn try_acquire_advisory_lock(
704 pool: &Pool<Postgres>,
705 subscription_name: &str,
706) -> Result<CoordinationGuard, CoordinationError> {
707 let lock_key = advisory_lock_key(subscription_name);
708
709 let mut connection = pool
713 .acquire()
714 .await
715 .map_err(CoordinationError::DatabaseError)?;
716
717 let row = query("SELECT pg_try_advisory_lock($1)")
719 .bind(lock_key)
720 .fetch_one(&mut *connection)
721 .await
722 .map_err(CoordinationError::DatabaseError)?;
723
724 let acquired: bool = row.get(0);
725
726 if acquired {
727 Ok(CoordinationGuard {
728 lock_key,
729 connection: Some(connection),
730 })
731 } else {
732 Err(CoordinationError::LeadershipNotAcquired {
734 subscription_name: subscription_name.to_string(),
735 })
736 }
737}
738
// Leadership coordination for the standalone coordinator type.
impl ProjectorCoordinator for PostgresProjectorCoordinator {
    type Error = CoordinationError;
    type Guard = CoordinationGuard;

    /// Delegates to `try_acquire_advisory_lock` using this coordinator's pool.
    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
        try_acquire_advisory_lock(&self.pool, subscription_name).await
    }
}
747
// Lets the event store itself act as a projector coordinator over its own pool.
impl ProjectorCoordinator for PostgresEventStore {
    type Error = CoordinationError;
    type Guard = CoordinationGuard;

    /// Delegates to `try_acquire_advisory_lock` using the event store's pool.
    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
        try_acquire_advisory_lock(&self.pool, subscription_name).await
    }
}