1use std::time::Duration;
2
3use eventcore_types::{
4 CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
5 EventStreamReader, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
6 StreamWriteEntry, StreamWrites,
7};
8use nutype::nutype;
9use serde_json::{Value, json};
10use sqlx::types::Json;
11use sqlx::{Pool, Postgres, Row, postgres::PgPoolOptions, query};
12use thiserror::Error;
13use tracing::{error, info, instrument, warn};
14use uuid::Uuid;
15
/// Errors produced while constructing a [`PostgresEventStore`].
#[derive(Debug, Error)]
pub enum PostgresEventStoreError {
    /// The sqlx connection pool could not be established.
    #[error("failed to create postgres connection pool")]
    ConnectionFailed(#[source] sqlx::Error),
}
21
/// Maximum number of pooled Postgres connections; non-zero by construction
/// (nutype newtype over `NonZeroU32`).
#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, Display, AsRef, Into))]
pub struct MaxConnections(std::num::NonZeroU32);
42
/// Tuning knobs for the sqlx Postgres connection pool, shared by the event
/// store, checkpoint store, and projector coordinator constructors.
#[derive(Debug, Clone)]
pub struct PostgresConfig {
    /// Upper bound on the number of pooled connections.
    pub max_connections: MaxConnections,
    /// How long `acquire` waits for a free connection before failing.
    pub acquire_timeout: Duration,
    /// How long an idle connection is retained before being closed.
    pub idle_timeout: Duration,
}
53
54impl Default for PostgresConfig {
55 fn default() -> Self {
56 const DEFAULT_MAX_CONNECTIONS: std::num::NonZeroU32 = match std::num::NonZeroU32::new(10) {
57 Some(v) => v,
58 None => unreachable!(),
59 };
60
61 Self {
62 max_connections: MaxConnections::new(DEFAULT_MAX_CONNECTIONS),
63 acquire_timeout: Duration::from_secs(30),
64 idle_timeout: Duration::from_secs(600), }
66 }
67}
68
/// Event store backed by Postgres via an sqlx connection pool.
///
/// Cloning is cheap: the underlying `Pool` is reference-counted.
#[derive(Debug, Clone)]
pub struct PostgresEventStore {
    // Shared sqlx pool; all queries in this file go through it.
    pool: Pool<Postgres>,
}
73
74impl PostgresEventStore {
75 pub async fn new<S: Into<String>>(
77 connection_string: S,
78 ) -> Result<Self, PostgresEventStoreError> {
79 Self::with_config(connection_string, PostgresConfig::default()).await
80 }
81
82 pub async fn with_config<S: Into<String>>(
84 connection_string: S,
85 config: PostgresConfig,
86 ) -> Result<Self, PostgresEventStoreError> {
87 let connection_string = connection_string.into();
88 let max_connections: std::num::NonZeroU32 = config.max_connections.into();
89 let pool = PgPoolOptions::new()
90 .max_connections(max_connections.get())
91 .acquire_timeout(config.acquire_timeout)
92 .idle_timeout(config.idle_timeout)
93 .connect(&connection_string)
94 .await
95 .map_err(PostgresEventStoreError::ConnectionFailed)?;
96 Ok(Self { pool })
97 }
98
99 pub fn from_pool(pool: Pool<Postgres>) -> Self {
104 Self { pool }
105 }
106
107 #[cfg_attr(test, mutants::skip)] pub async fn ping(&self) {
109 query("SELECT 1")
110 .execute(&self.pool)
111 .await
112 .expect("postgres ping failed");
113 }
114
115 #[cfg_attr(test, mutants::skip)] pub async fn migrate(&self) {
117 sqlx::migrate!("./migrations")
118 .run(&self.pool)
119 .await
120 .expect("postgres migration failed");
121 }
122}
123
impl EventStore for PostgresEventStore {
    /// Reads all events for `stream_id` in ascending stream-version order and
    /// deserializes each JSON payload into `E`.
    ///
    /// # Errors
    /// Returns a mapped store error on query failure, or
    /// [`EventStoreError::DeserializationFailed`] when a payload does not
    /// decode as `E`.
    #[instrument(name = "postgres.read_stream", skip(self))]
    async fn read_stream<E: Event>(
        &self,
        stream_id: StreamId,
    ) -> Result<EventStreamReader<E>, EventStoreError> {
        info!(
            stream = %stream_id,
            "[postgres.read_stream] reading events from postgres"
        );

        let rows = query(
            "SELECT event_data FROM eventcore_events WHERE stream_id = $1 ORDER BY stream_version ASC",
        )
        .bind(stream_id.as_ref())
        .fetch_all(&self.pool)
        .await
        .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;

        let mut events = Vec::with_capacity(rows.len());
        for row in rows {
            let payload: Value = row
                .try_get("event_data")
                .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
            // Unlike `read_events` below, a payload that fails to decode here
            // is a hard error, reported with the stream id for diagnosis.
            let event = serde_json::from_value(payload).map_err(|error| {
                EventStoreError::DeserializationFailed {
                    stream_id: stream_id.clone(),
                    detail: error.to_string(),
                }
            })?;
            events.push(event);
        }

        Ok(EventStreamReader::new(events))
    }

    /// Appends `writes` atomically (single transaction) across one or more
    /// streams.
    ///
    /// Expected stream versions are published via the transaction-local
    /// setting `eventcore.expected_versions` before the inserts; presumably a
    /// database-side trigger enforces the optimistic-concurrency check against
    /// that setting — TODO confirm against the migrations. Violations surface
    /// as [`EventStoreError::VersionConflict`] through `map_sqlx_error`.
    #[instrument(name = "postgres.append_events", skip(self, writes))]
    async fn append_events(
        &self,
        writes: StreamWrites,
    ) -> Result<EventStreamSlice, EventStoreError> {
        let expected_versions = writes.expected_versions().clone();
        let entries = writes.into_entries();

        // Nothing to write: skip opening a transaction entirely.
        if entries.is_empty() {
            return Ok(EventStreamSlice);
        }

        info!(
            stream_count = expected_versions.len(),
            event_count = entries.len(),
            "[postgres.append_events] appending events to postgres"
        );

        // Serialize {stream_id: expected_version} as a JSON object for the
        // set_config call below.
        let expected_versions_json: Value = expected_versions
            .iter()
            .map(|(stream_id, version)| {
                (stream_id.as_ref().to_string(), json!(version.into_inner()))
            })
            .collect();

        let mut tx = self
            .pool
            .begin()
            .await
            .map_err(|error| map_sqlx_error(error, Operation::BeginTransaction))?;

        // The third argument (`true`) makes the setting transaction-local, so
        // it is discarded automatically at commit or rollback.
        query("SELECT set_config('eventcore.expected_versions', $1, true)")
            .bind(expected_versions_json.to_string())
            .execute(&mut *tx)
            .await
            .map_err(|error| map_sqlx_error(error, Operation::SetExpectedVersions))?;

        for entry in entries {
            let StreamWriteEntry {
                stream_id,
                event_type,
                event_data,
                ..
            } = entry;

            // UUIDv7 ids are time-ordered, so global reads ordered by
            // event_id follow insertion order.
            let event_id = Uuid::now_v7();
            query(
                "INSERT INTO eventcore_events (event_id, stream_id, event_type, event_data, metadata)
                 VALUES ($1, $2, $3, $4, $5)",
            )
            .bind(event_id)
            .bind(stream_id.as_ref())
            .bind(event_type)
            .bind(Json(event_data))
            // Metadata is currently always an empty JSON object.
            .bind(Json(json!({})))
            .execute(&mut *tx)
            .await
            .map_err(|error| map_sqlx_error(error, Operation::AppendEvents))?;
        }

        tx.commit()
            .await
            .map_err(|error| map_sqlx_error(error, Operation::CommitTransaction))?;

        Ok(EventStreamSlice)
    }
}
230
231impl CheckpointStore for PostgresEventStore {
232 type Error = PostgresCheckpointError;
233
234 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
235 let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
236 .bind(name)
237 .fetch_optional(&self.pool)
238 .await
239 .map_err(PostgresCheckpointError::DatabaseError)?;
240
241 match row {
242 Some(row) => {
243 let position: Uuid = row.get("last_position");
244 Ok(Some(StreamPosition::new(position)))
245 }
246 None => Ok(None),
247 }
248 }
249
250 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
251 let position_uuid: Uuid = position.into_inner();
252 query(
253 "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
254 VALUES ($1, $2, NOW())
255 ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
256 )
257 .bind(name)
258 .bind(position_uuid)
259 .execute(&self.pool)
260 .await
261 .map_err(PostgresCheckpointError::DatabaseError)?;
262
263 Ok(())
264 }
265}
266
267impl EventReader for PostgresEventStore {
268 type Error = EventStoreError;
269
270 async fn read_events<E: Event>(
271 &self,
272 filter: EventFilter,
273 page: EventPage,
274 ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
275 let after_event_id: Option<Uuid> = page.after_position().map(|p| p.into_inner());
278 let limit: i64 = page.limit().into_inner() as i64;
279
280 let rows = if let Some(prefix) = filter.stream_prefix() {
281 let prefix_str = prefix.as_ref();
282
283 if let Some(after_id) = after_event_id {
284 let query_str = r#"
285 SELECT event_id, event_data, stream_id
286 FROM eventcore_events
287 WHERE event_id > $1
288 AND stream_id LIKE $2 || '%'
289 ORDER BY event_id
290 LIMIT $3
291 "#;
292 query(query_str)
293 .bind(after_id)
294 .bind(prefix_str)
295 .bind(limit)
296 .fetch_all(&self.pool)
297 .await
298 } else {
299 let query_str = r#"
300 SELECT event_id, event_data, stream_id
301 FROM eventcore_events
302 WHERE stream_id LIKE $1 || '%'
303 ORDER BY event_id
304 LIMIT $2
305 "#;
306 query(query_str)
307 .bind(prefix_str)
308 .bind(limit)
309 .fetch_all(&self.pool)
310 .await
311 }
312 } else if let Some(after_id) = after_event_id {
313 let query_str = r#"
314 SELECT event_id, event_data, stream_id
315 FROM eventcore_events
316 WHERE event_id > $1
317 ORDER BY event_id
318 LIMIT $2
319 "#;
320 query(query_str)
321 .bind(after_id)
322 .bind(limit)
323 .fetch_all(&self.pool)
324 .await
325 } else {
326 let query_str = r#"
327 SELECT event_id, event_data, stream_id
328 FROM eventcore_events
329 ORDER BY event_id
330 LIMIT $1
331 "#;
332 query(query_str).bind(limit).fetch_all(&self.pool).await
333 }
334 .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
335
336 let events: Vec<(E, StreamPosition)> = rows
337 .into_iter()
338 .filter_map(|row| {
339 let event_data: Json<Value> = row.get("event_data");
340 let event_id: Uuid = row.get("event_id");
341 serde_json::from_value::<E>(event_data.0)
342 .ok()
343 .map(|e| (e, StreamPosition::new(event_id)))
344 })
345 .collect();
346
347 Ok(events)
348 }
349}
350
351fn map_sqlx_error(error: sqlx::Error, operation: Operation) -> EventStoreError {
352 if let sqlx::Error::Database(db_error) = &error {
353 let code = db_error.code();
354 let code_str = code.as_deref();
355 if code_str == Some("P0001") || code_str == Some("23505") {
358 warn!(
359 error = %db_error,
360 "[postgres.version_conflict] optimistic concurrency check failed"
361 );
362 return EventStoreError::VersionConflict;
363 }
364 }
365
366 error!(
367 error = %error,
368 operation = %operation,
369 "[postgres.database_error] database operation failed"
370 );
371 EventStoreError::StoreFailure { operation }
372}
373
/// Errors produced by the checkpoint store and its constructors.
#[derive(Debug, Error)]
pub enum PostgresCheckpointError {
    /// The sqlx connection pool could not be established.
    // NOTE(review): message mirrors PostgresEventStoreError::ConnectionFailed.
    #[error("failed to create postgres connection pool")]
    ConnectionFailed(#[source] sqlx::Error),

    /// A query or migration against the database failed.
    #[error("database operation failed: {0}")]
    DatabaseError(#[source] sqlx::Error),
}
385
/// Standalone checkpoint store backed by Postgres.
///
/// Cloning is cheap: the underlying `Pool` is reference-counted.
#[derive(Debug, Clone)]
pub struct PostgresCheckpointStore {
    // Shared sqlx pool used for all checkpoint queries.
    pool: Pool<Postgres>,
}
402
403impl PostgresCheckpointStore {
404 pub async fn new<S: Into<String>>(
406 connection_string: S,
407 ) -> Result<Self, PostgresCheckpointError> {
408 Self::with_config(connection_string, PostgresConfig::default()).await
409 }
410
411 pub async fn with_config<S: Into<String>>(
413 connection_string: S,
414 config: PostgresConfig,
415 ) -> Result<Self, PostgresCheckpointError> {
416 let connection_string = connection_string.into();
417 let max_connections: std::num::NonZeroU32 = config.max_connections.into();
418 let pool = PgPoolOptions::new()
419 .max_connections(max_connections.get())
420 .acquire_timeout(config.acquire_timeout)
421 .idle_timeout(config.idle_timeout)
422 .connect(&connection_string)
423 .await
424 .map_err(PostgresCheckpointError::ConnectionFailed)?;
425
426 sqlx::migrate!("./migrations")
428 .run(&pool)
429 .await
430 .map_err(|e| {
431 PostgresCheckpointError::DatabaseError(sqlx::Error::Migrate(Box::new(e)))
432 })?;
433
434 Ok(Self { pool })
435 }
436
437 pub fn from_pool(pool: Pool<Postgres>) -> Self {
442 Self { pool }
443 }
444}
445
446impl CheckpointStore for PostgresCheckpointStore {
447 type Error = PostgresCheckpointError;
448
449 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
450 let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
451 .bind(name)
452 .fetch_optional(&self.pool)
453 .await
454 .map_err(PostgresCheckpointError::DatabaseError)?;
455
456 match row {
457 Some(row) => {
458 let position: Uuid = row.get("last_position");
459 Ok(Some(StreamPosition::new(position)))
460 }
461 None => Ok(None),
462 }
463 }
464
465 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
466 let position_uuid: Uuid = position.into_inner();
467 query(
468 "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
469 VALUES ($1, $2, NOW())
470 ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
471 )
472 .bind(name)
473 .bind(position_uuid)
474 .execute(&self.pool)
475 .await
476 .map_err(PostgresCheckpointError::DatabaseError)?;
477
478 Ok(())
479 }
480}
481
/// Errors produced while acquiring projector leadership.
#[derive(Debug, Error)]
pub enum CoordinationError {
    /// Another instance already holds the advisory lock; not fatal — callers
    /// may retry later.
    #[error("leadership not acquired: another instance holds the lock")]
    LeadershipNotAcquired,

    /// A query against the database failed.
    #[error("database operation failed: {0}")]
    DatabaseError(#[source] sqlx::Error),
}
497
/// Guard representing held projector leadership; the Postgres advisory lock
/// is released when this guard is dropped (see the `Drop` impl below).
pub struct CoordinationGuard {
    // Advisory-lock key derived from the subscription name.
    lock_key: i64,
    // Dedicated connection that owns the session-scoped lock; taken (set to
    // None) by Drop when releasing.
    connection: Option<sqlx::pool::PoolConnection<Postgres>>,
}
533
impl std::fmt::Debug for CoordinationGuard {
    // Manual impl: only lock_key is shown; the pooled connection is omitted
    // (finish_non_exhaustive renders a trailing `..`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CoordinationGuard")
            .field("lock_key", &self.lock_key)
            .finish_non_exhaustive()
    }
}
541
542impl Drop for CoordinationGuard {
543 fn drop(&mut self) {
544 if let Some(mut connection) = self.connection.take() {
546 let lock_key = self.lock_key;
547
548 let handle = tokio::runtime::Handle::current();
551 let is_multi_thread =
552 handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread;
553
554 if is_multi_thread {
555 tokio::task::block_in_place(|| {
557 handle.block_on(async {
558 let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
560 .bind(lock_key)
561 .execute(&mut *connection)
562 .await;
563 });
565 });
566 } else {
567 tokio::spawn(async move {
572 let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
573 .bind(lock_key)
574 .execute(&mut *connection)
575 .await;
576 });
577 }
578 }
579 }
580}
581
/// Projector coordinator that elects a single leader per subscription using
/// Postgres advisory locks.
///
/// Cloning is cheap: the underlying `Pool` is reference-counted.
#[derive(Debug, Clone)]
pub struct PostgresProjectorCoordinator {
    // Shared sqlx pool; try_acquire checks out a dedicated connection from it.
    pool: Pool<Postgres>,
}
591
592impl PostgresProjectorCoordinator {
593 pub async fn new<S: Into<String>>(connection_string: S) -> Result<Self, CoordinationError> {
595 Self::with_config(connection_string, PostgresConfig::default()).await
596 }
597
598 pub async fn with_config<S: Into<String>>(
600 connection_string: S,
601 config: PostgresConfig,
602 ) -> Result<Self, CoordinationError> {
603 let connection_string = connection_string.into();
604 let max_connections: std::num::NonZeroU32 = config.max_connections.into();
605 let pool = PgPoolOptions::new()
606 .max_connections(max_connections.get())
607 .acquire_timeout(config.acquire_timeout)
608 .idle_timeout(config.idle_timeout)
609 .connect(&connection_string)
610 .await
611 .map_err(CoordinationError::DatabaseError)?;
612
613 Ok(Self { pool })
614 }
615
616 pub fn from_pool(pool: Pool<Postgres>) -> Self {
618 Self { pool }
619 }
620}
621
622impl ProjectorCoordinator for PostgresProjectorCoordinator {
623 type Error = CoordinationError;
624 type Guard = CoordinationGuard;
625
626 async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
627 use std::collections::hash_map::DefaultHasher;
628 use std::hash::{Hash, Hasher};
629
630 let mut hasher = DefaultHasher::new();
632 subscription_name.hash(&mut hasher);
633 let lock_key = hasher.finish() as i64;
634
635 let mut connection = self
639 .pool
640 .acquire()
641 .await
642 .map_err(CoordinationError::DatabaseError)?;
643
644 let row = sqlx::query("SELECT pg_try_advisory_lock($1)")
646 .bind(lock_key)
647 .fetch_one(&mut *connection)
648 .await
649 .map_err(CoordinationError::DatabaseError)?;
650
651 let acquired: bool = row.get(0);
652
653 if acquired {
654 Ok(CoordinationGuard {
655 lock_key,
656 connection: Some(connection),
657 })
658 } else {
659 Err(CoordinationError::LeadershipNotAcquired)
661 }
662 }
663}
664
665impl ProjectorCoordinator for PostgresEventStore {
666 type Error = CoordinationError;
667 type Guard = CoordinationGuard;
668
669 async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
670 use std::collections::hash_map::DefaultHasher;
671 use std::hash::{Hash, Hasher};
672
673 let mut hasher = DefaultHasher::new();
675 subscription_name.hash(&mut hasher);
676 let lock_key = hasher.finish() as i64;
677
678 let mut connection = self
682 .pool
683 .acquire()
684 .await
685 .map_err(CoordinationError::DatabaseError)?;
686
687 let row = sqlx::query("SELECT pg_try_advisory_lock($1)")
689 .bind(lock_key)
690 .fetch_one(&mut *connection)
691 .await
692 .map_err(CoordinationError::DatabaseError)?;
693
694 let acquired: bool = row.get(0);
695
696 if acquired {
697 Ok(CoordinationGuard {
698 lock_key,
699 connection: Some(connection),
700 })
701 } else {
702 Err(CoordinationError::LeadershipNotAcquired)
704 }
705 }
706}