1use std::time::Duration;
2
3use eventcore_types::{
4 CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
5 EventStream, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
6 StreamVersion, StreamWrites,
7};
8use futures::StreamExt;
9use nutype::nutype;
10use serde_json::value::RawValue;
11use serde_json::{Value, json};
12use sqlx::types::Json;
13use sqlx::{Pool, Postgres, QueryBuilder, Row, postgres::PgPoolOptions, query};
14use thiserror::Error;
15use tracing::{error, info, instrument, warn};
16use uuid::Uuid;
17
18#[derive(Debug, Error)]
19pub enum PostgresEventStoreError {
20 #[error("failed to create postgres connection pool")]
21 ConnectionFailed(#[source] sqlx::Error),
22}
23
24#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, Display, AsRef, Into))]
43pub struct MaxConnections(std::num::NonZeroU32);
44
45#[derive(Debug, Clone)]
47pub struct PostgresConfig {
48 pub max_connections: MaxConnections,
50 pub acquire_timeout: Duration,
52 pub idle_timeout: Duration,
54}
55
56impl Default for PostgresConfig {
57 fn default() -> Self {
58 const DEFAULT_MAX_CONNECTIONS: std::num::NonZeroU32 = match std::num::NonZeroU32::new(10) {
59 Some(v) => v,
60 None => unreachable!(),
61 };
62
63 Self {
64 max_connections: MaxConnections::new(DEFAULT_MAX_CONNECTIONS),
65 acquire_timeout: Duration::from_secs(30),
66 idle_timeout: Duration::from_secs(600), }
68 }
69}
70
71#[derive(Debug, Clone)]
72pub struct PostgresEventStore {
73 pool: Pool<Postgres>,
74}
75
76impl PostgresEventStore {
77 pub async fn new<S: Into<String>>(
79 connection_string: S,
80 ) -> Result<Self, PostgresEventStoreError> {
81 Self::with_config(connection_string, PostgresConfig::default()).await
82 }
83
84 pub async fn with_config<S: Into<String>>(
86 connection_string: S,
87 config: PostgresConfig,
88 ) -> Result<Self, PostgresEventStoreError> {
89 let connection_string = connection_string.into();
90 let max_connections: std::num::NonZeroU32 = config.max_connections.into();
91 let pool = PgPoolOptions::new()
92 .max_connections(max_connections.get())
93 .acquire_timeout(config.acquire_timeout)
94 .idle_timeout(config.idle_timeout)
95 .connect(&connection_string)
96 .await
97 .map_err(PostgresEventStoreError::ConnectionFailed)?;
98 Ok(Self { pool })
99 }
100
101 pub fn from_pool(pool: Pool<Postgres>) -> Self {
106 Self { pool }
107 }
108
109 #[cfg_attr(test, mutants::skip)] pub async fn ping(&self) {
111 let _ = query("SELECT 1")
112 .execute(&self.pool)
113 .await
114 .expect("postgres ping failed");
115 }
116
117 #[cfg_attr(test, mutants::skip)] pub async fn migrate(&self) {
119 sqlx::migrate!("./migrations")
120 .run(&self.pool)
121 .await
122 .expect("postgres migration failed");
123 }
124}
125
126impl EventStore for PostgresEventStore {
127 #[instrument(name = "postgres.read_stream", skip(self))]
128 async fn read_stream<E: Event>(
129 &self,
130 stream_id: StreamId,
131 ) -> Result<EventStream<E>, EventStoreError> {
132 info!(
133 stream = %stream_id,
134 "[postgres.read_stream] reading events from postgres"
135 );
136
137 let pool = self.pool.clone();
143
144 let stream = async_stream::stream! {
145 let mut rows = query(
146 "SELECT event_data FROM eventcore_events WHERE stream_id = $1 ORDER BY stream_version ASC",
147 )
148 .bind(stream_id.as_ref())
149 .fetch(&pool);
150
151 while let Some(row) = rows.next().await {
152 let row = match row {
153 Ok(row) => row,
154 Err(error) => {
155 yield Err(map_sqlx_error(error, Operation::ReadStream));
156 break;
157 }
158 };
159
160 let payload: Value = match row.try_get("event_data") {
161 Ok(payload) => payload,
162 Err(error) => {
163 yield Err(map_sqlx_error(error, Operation::ReadStream));
164 break;
165 }
166 };
167
168 match serde_json::from_value::<E>(payload) {
172 Ok(event) => yield Ok(event),
173 Err(error) => {
174 yield Err(EventStoreError::DeserializationFailed {
175 stream_id: stream_id.clone(),
176 detail: error.to_string(),
177 });
178 break;
179 }
180 }
181 }
182 };
183
184 Ok(EventStream::new(stream))
185 }
186
187 #[instrument(name = "postgres.append_events", skip(self, writes))]
188 async fn append_events(
189 &self,
190 writes: StreamWrites,
191 ) -> Result<EventStreamSlice, EventStoreError> {
192 let expected_versions = writes.expected_versions().clone();
193 let entries = writes.into_entries();
194
195 if entries.is_empty() {
196 return Ok(EventStreamSlice);
197 }
198
199 info!(
200 stream_count = expected_versions.len(),
201 event_count = entries.len(),
202 "[postgres.append_events] appending events to postgres"
203 );
204
205 let expected_versions_json: Value = expected_versions
207 .iter()
208 .map(|(stream_id, version)| {
209 (stream_id.as_ref().to_string(), json!(version.into_inner()))
210 })
211 .collect();
212
213 let mut tx = self
214 .pool
215 .begin()
216 .await
217 .map_err(|error| map_sqlx_error(error, Operation::BeginTransaction))?;
218
219 let _ = query("SELECT set_config('eventcore.expected_versions', $1, true)")
221 .bind(expected_versions_json.to_string())
222 .execute(&mut *tx)
223 .await
224 .map_err(|error| map_sqlx_error(error, Operation::SetExpectedVersions))?;
225
226 let rows: Vec<(StreamId, &'static str, Box<RawValue>)> = entries
236 .into_iter()
237 .map(|entry| (entry.stream_id, entry.event_type, entry.event_data))
238 .collect();
239
240 const MAX_EVENTS_PER_INSERT: usize = 1000;
241 for chunk in rows.chunks(MAX_EVENTS_PER_INSERT) {
242 let mut builder = QueryBuilder::<Postgres>::new(
243 "INSERT INTO eventcore_events (event_id, stream_id, event_type, event_data, metadata) ",
244 );
245 let _ = builder.push_values(chunk, |mut row, (stream_id, event_type, event_data)| {
246 let _ = row
247 .push_bind(Uuid::now_v7())
248 .push_bind(stream_id.as_ref())
249 .push_bind(*event_type)
250 .push_bind(Json(event_data))
251 .push_bind(Json(json!({})));
252 });
253 let _ = builder
254 .build()
255 .execute(&mut *tx)
256 .await
257 .map_err(|error| map_sqlx_error(error, Operation::AppendEvents))?;
258 }
259
260 tx.commit()
261 .await
262 .map_err(|error| map_sqlx_error(error, Operation::CommitTransaction))?;
263
264 Ok(EventStreamSlice)
265 }
266}
267
268impl CheckpointStore for PostgresEventStore {
269 type Error = PostgresCheckpointError;
270
271 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
272 let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
273 .bind(name)
274 .fetch_optional(&self.pool)
275 .await
276 .map_err(PostgresCheckpointError::DatabaseError)?;
277
278 match row {
279 Some(row) => {
280 let position: Uuid = row.get("last_position");
281 Ok(Some(StreamPosition::new(position)))
282 }
283 None => Ok(None),
284 }
285 }
286
287 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
288 let position_uuid: Uuid = position.into_inner();
289 let _ = query(
290 "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
291 VALUES ($1, $2, NOW())
292 ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
293 )
294 .bind(name)
295 .bind(position_uuid)
296 .execute(&self.pool)
297 .await
298 .map_err(PostgresCheckpointError::DatabaseError)?;
299
300 Ok(())
301 }
302}
303
304impl EventReader for PostgresEventStore {
305 type Error = EventStoreError;
306
307 async fn read_events<E: Event>(
308 &self,
309 filter: EventFilter,
310 page: EventPage,
311 ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
312 let after_event_id: Option<Uuid> = page.after_position().map(|p| p.into_inner());
315 let limit: i64 = page.limit().into_inner() as i64;
316
317 let type_filter = filter.event_type().unwrap_or_else(|| E::event_type_name());
321
322 let pattern_regex = filter
327 .stream_pattern()
328 .map(|p| glob_to_anchored_regex(p.as_ref()));
329
330 let mut builder = QueryBuilder::<Postgres>::new(
334 "SELECT event_id, event_data, stream_id FROM eventcore_events WHERE event_type = ",
335 );
336 let _ = builder.push_bind(type_filter);
337
338 if let Some(after_id) = after_event_id {
339 let _ = builder.push(" AND event_id > ").push_bind(after_id);
340 }
341
342 if let Some(prefix) = filter.stream_prefix() {
343 let _ = builder
344 .push(" AND stream_id LIKE ")
345 .push_bind(prefix.as_ref().to_string())
346 .push(" || '%'");
347 } else if let Some(regex) = pattern_regex {
348 let _ = builder.push(" AND stream_id ~ ").push_bind(regex);
349 }
350
351 let _ = builder.push(" ORDER BY event_id LIMIT ").push_bind(limit);
352
353 let rows = builder
354 .build()
355 .fetch_all(&self.pool)
356 .await
357 .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
358
359 let events: Vec<(E, StreamPosition)> = rows
360 .into_iter()
361 .filter_map(|row| {
362 let event_data: Json<Value> = row.get("event_data");
363 let event_id: Uuid = row.get("event_id");
364 serde_json::from_value::<E>(event_data.0)
365 .ok()
366 .map(|e| (e, StreamPosition::new(event_id)))
367 })
368 .collect();
369
370 Ok(events)
371 }
372}
373
374fn glob_to_anchored_regex(glob: &str) -> String {
390 let mut regex = String::with_capacity(glob.len() + 2);
391 regex.push('^');
392
393 let mut chars = glob.chars().peekable();
394 while let Some(c) = chars.next() {
395 match c {
396 '*' => regex.push_str(".*"),
397 '?' => regex.push('.'),
398 '[' => {
399 let mut class = String::new();
401 let mut closed = false;
402 if matches!(chars.peek(), Some('!')) {
403 let _ = chars.next();
404 class.push('^');
405 }
406 for inner in chars.by_ref() {
407 if inner == ']' {
408 closed = true;
409 break;
410 }
411 class.push(inner);
412 }
413 if closed {
414 regex.push('[');
415 regex.push_str(&class);
416 regex.push(']');
417 } else {
418 regex.push_str(®ex_escape("["));
421 regex.push_str(®ex_escape(&class));
422 }
423 }
424 other => regex.push_str(®ex_escape(&other.to_string())),
425 }
426 }
427
428 regex.push('$');
429 regex
430}
431
432fn regex_escape(literal: &str) -> String {
435 const METACHARACTERS: &[char] = &[
436 '.', '^', '$', '*', '+', '?', '(', ')', '[', ']', '{', '}', '|', '\\',
437 ];
438 let mut escaped = String::with_capacity(literal.len());
439 for c in literal.chars() {
440 if METACHARACTERS.contains(&c) {
441 escaped.push('\\');
442 }
443 escaped.push(c);
444 }
445 escaped
446}
447
448fn map_sqlx_error(error: sqlx::Error, operation: Operation) -> EventStoreError {
449 if let sqlx::Error::Database(db_error) = &error {
450 let code = db_error.code();
451 let code_str = code.as_deref();
452 if code_str == Some("P0001") || code_str == Some("23505") {
455 warn!(
456 error = %db_error,
457 "[postgres.version_conflict] optimistic concurrency check failed"
458 );
459 return parse_version_conflict_from_db_error(db_error.message());
460 }
461 }
462
463 error!(
464 error = %error,
465 operation = %operation,
466 "[postgres.database_error] database operation failed"
467 );
468 EventStoreError::StoreFailure { operation }
469}
470
471fn parse_version_conflict_from_db_error(message: &str) -> EventStoreError {
479 if let Some(parsed) = try_parse_conflict_message(message) {
481 return parsed;
482 }
483
484 let fallback_stream_id =
487 StreamId::try_new("unknown-conflict-stream").expect("static stream id is valid");
488 EventStoreError::VersionConflict {
489 stream_id: fallback_stream_id,
490 expected: StreamVersion::new(0),
491 actual: StreamVersion::new(0),
492 }
493}
494
495fn try_parse_conflict_message(message: &str) -> Option<EventStoreError> {
496 let rest = message.strip_prefix("version_conflict: stream \"")?;
497 let stream_end = rest.find('"')?;
498 let stream_id_str = &rest[..stream_end];
499 let after_stream = &rest[stream_end..];
500
501 let expected_str = after_stream
502 .strip_prefix("\" expected version ")?
503 .split(',')
504 .next()?;
505 let actual_str = after_stream.rsplit("actual ").next()?;
506
507 let expected = expected_str.trim().parse::<usize>().ok()?;
508 let actual = actual_str.trim().parse::<usize>().ok()?;
509 let stream_id = StreamId::try_new(stream_id_str).ok()?;
510
511 Some(EventStoreError::VersionConflict {
512 stream_id,
513 expected: StreamVersion::new(expected),
514 actual: StreamVersion::new(actual),
515 })
516}
517
518#[derive(Debug, Error)]
520pub enum PostgresCheckpointError {
521 #[error("failed to create postgres connection pool")]
523 ConnectionFailed(#[source] sqlx::Error),
524
525 #[error("database operation failed: {0}")]
527 DatabaseError(#[source] sqlx::Error),
528}
529
530#[derive(Debug, Clone)]
543pub struct PostgresCheckpointStore {
544 pool: Pool<Postgres>,
545}
546
547impl PostgresCheckpointStore {
548 pub async fn new<S: Into<String>>(
550 connection_string: S,
551 ) -> Result<Self, PostgresCheckpointError> {
552 Self::with_config(connection_string, PostgresConfig::default()).await
553 }
554
555 pub async fn with_config<S: Into<String>>(
557 connection_string: S,
558 config: PostgresConfig,
559 ) -> Result<Self, PostgresCheckpointError> {
560 let connection_string = connection_string.into();
561 let max_connections: std::num::NonZeroU32 = config.max_connections.into();
562 let pool = PgPoolOptions::new()
563 .max_connections(max_connections.get())
564 .acquire_timeout(config.acquire_timeout)
565 .idle_timeout(config.idle_timeout)
566 .connect(&connection_string)
567 .await
568 .map_err(PostgresCheckpointError::ConnectionFailed)?;
569
570 sqlx::migrate!("./migrations")
572 .run(&pool)
573 .await
574 .map_err(|e| {
575 PostgresCheckpointError::DatabaseError(sqlx::Error::Migrate(Box::new(e)))
576 })?;
577
578 Ok(Self { pool })
579 }
580
581 pub fn from_pool(pool: Pool<Postgres>) -> Self {
586 Self { pool }
587 }
588}
589
590impl CheckpointStore for PostgresCheckpointStore {
591 type Error = PostgresCheckpointError;
592
593 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
594 let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
595 .bind(name)
596 .fetch_optional(&self.pool)
597 .await
598 .map_err(PostgresCheckpointError::DatabaseError)?;
599
600 match row {
601 Some(row) => {
602 let position: Uuid = row.get("last_position");
603 Ok(Some(StreamPosition::new(position)))
604 }
605 None => Ok(None),
606 }
607 }
608
609 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
610 let position_uuid: Uuid = position.into_inner();
611 let _ = query(
612 "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
613 VALUES ($1, $2, NOW())
614 ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
615 )
616 .bind(name)
617 .bind(position_uuid)
618 .execute(&self.pool)
619 .await
620 .map_err(PostgresCheckpointError::DatabaseError)?;
621
622 Ok(())
623 }
624}
625
626#[derive(Debug, Error)]
632pub enum CoordinationError {
633 #[error(
635 "leadership not acquired for subscription '{subscription_name}': another instance holds the lock"
636 )]
637 LeadershipNotAcquired { subscription_name: String },
638
639 #[error("database operation failed: {0}")]
641 DatabaseError(#[source] sqlx::Error),
642}
643
644pub struct CoordinationGuard {
674 lock_key: i64,
675 connection: Option<sqlx::pool::PoolConnection<Postgres>>,
678}
679
680impl std::fmt::Debug for CoordinationGuard {
681 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
682 f.debug_struct("CoordinationGuard")
683 .field("lock_key", &self.lock_key)
684 .finish_non_exhaustive()
685 }
686}
687
688impl Drop for CoordinationGuard {
689 fn drop(&mut self) {
690 if let Some(mut connection) = self.connection.take() {
692 let lock_key = self.lock_key;
693
694 let handle = tokio::runtime::Handle::current();
697 let is_multi_thread =
698 handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread;
699
700 if is_multi_thread {
701 tokio::task::block_in_place(|| {
703 handle.block_on(async {
704 if let Err(e) = query("SELECT pg_advisory_unlock($1)")
706 .bind(lock_key)
707 .execute(&mut *connection)
708 .await
709 {
710 warn!(
711 lock_key = lock_key,
712 error = %e,
713 "failed to release advisory lock on drop"
714 );
715 }
716 });
718 });
719 } else {
720 drop(tokio::spawn(async move {
725 if let Err(e) = query("SELECT pg_advisory_unlock($1)")
726 .bind(lock_key)
727 .execute(&mut *connection)
728 .await
729 {
730 warn!(
731 lock_key = lock_key,
732 error = %e,
733 "failed to release advisory lock on drop (async)"
734 );
735 }
736 }));
737 }
738 }
739 }
740}
741
742#[derive(Debug, Clone)]
748pub struct PostgresProjectorCoordinator {
749 pool: Pool<Postgres>,
750}
751
752impl PostgresProjectorCoordinator {
753 pub async fn new<S: Into<String>>(connection_string: S) -> Result<Self, CoordinationError> {
755 Self::with_config(connection_string, PostgresConfig::default()).await
756 }
757
758 pub async fn with_config<S: Into<String>>(
760 connection_string: S,
761 config: PostgresConfig,
762 ) -> Result<Self, CoordinationError> {
763 let connection_string = connection_string.into();
764 let max_connections: std::num::NonZeroU32 = config.max_connections.into();
765 let pool = PgPoolOptions::new()
766 .max_connections(max_connections.get())
767 .acquire_timeout(config.acquire_timeout)
768 .idle_timeout(config.idle_timeout)
769 .connect(&connection_string)
770 .await
771 .map_err(CoordinationError::DatabaseError)?;
772
773 Ok(Self { pool })
774 }
775
776 pub fn from_pool(pool: Pool<Postgres>) -> Self {
778 Self { pool }
779 }
780}
781
782fn advisory_lock_key(subscription_name: &str) -> i64 {
787 const FNV_OFFSET_BASIS: u64 = 0xcbf29ce484222325;
788 const FNV_PRIME: u64 = 0x00000100000001B3;
789
790 let mut hash = FNV_OFFSET_BASIS;
791 for byte in subscription_name.as_bytes() {
792 hash ^= *byte as u64;
793 hash = hash.wrapping_mul(FNV_PRIME);
794 }
795 hash as i64
796}
797
798async fn try_acquire_advisory_lock(
801 pool: &Pool<Postgres>,
802 subscription_name: &str,
803) -> Result<CoordinationGuard, CoordinationError> {
804 let lock_key = advisory_lock_key(subscription_name);
805
806 let mut connection = pool
810 .acquire()
811 .await
812 .map_err(CoordinationError::DatabaseError)?;
813
814 let row = query("SELECT pg_try_advisory_lock($1)")
816 .bind(lock_key)
817 .fetch_one(&mut *connection)
818 .await
819 .map_err(CoordinationError::DatabaseError)?;
820
821 let acquired: bool = row.get(0);
822
823 if acquired {
824 Ok(CoordinationGuard {
825 lock_key,
826 connection: Some(connection),
827 })
828 } else {
829 Err(CoordinationError::LeadershipNotAcquired {
831 subscription_name: subscription_name.to_string(),
832 })
833 }
834}
835
836impl ProjectorCoordinator for PostgresProjectorCoordinator {
837 type Error = CoordinationError;
838 type Guard = CoordinationGuard;
839
840 async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
841 try_acquire_advisory_lock(&self.pool, subscription_name).await
842 }
843}
844
845impl ProjectorCoordinator for PostgresEventStore {
846 type Error = CoordinationError;
847 type Guard = CoordinationGuard;
848
849 async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
850 try_acquire_advisory_lock(&self.pool, subscription_name).await
851 }
852}
853
854#[cfg(test)]
855mod tests {
856 use super::{glob_to_anchored_regex, regex_escape};
857
858 #[test]
859 fn star_translates_to_dot_star_anchored() {
860 assert_eq!(glob_to_anchored_regex("account-*"), "^account-.*$");
862 }
863
864 #[test]
865 fn question_mark_translates_to_dot() {
866 assert_eq!(glob_to_anchored_regex("account-?"), "^account-.$");
867 }
868
869 #[test]
870 fn character_class_is_preserved() {
871 assert_eq!(
872 glob_to_anchored_regex("account-[0-9]*"),
873 "^account-[0-9].*$"
874 );
875 }
876
877 #[test]
878 fn negated_character_class_uses_caret() {
879 assert_eq!(glob_to_anchored_regex("account-[!0-9]"), "^account-[^0-9]$");
880 }
881
882 #[test]
883 fn literal_regex_metacharacters_are_escaped() {
884 assert_eq!(glob_to_anchored_regex("a.c+(d)"), "^a\\.c\\+\\(d\\)$");
887 }
888
889 #[test]
890 fn regex_escape_escapes_all_metacharacters() {
891 assert_eq!(
892 regex_escape(".^$*+?()[]{}|\\"),
893 "\\.\\^\\$\\*\\+\\?\\(\\)\\[\\]\\{\\}\\|\\\\"
894 );
895 }
896}