1use std::time::Duration;
24
25use eventcore_types::{
26 CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
27 EventStream, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
28 StreamVersion, StreamWrites,
29};
30use futures::StreamExt;
31use nutype::nutype;
32use serde_json::value::RawValue;
33use serde_json::{Value, json};
34use sqlx::types::Json;
35use sqlx::{Pool, Postgres, QueryBuilder, Row, postgres::PgPoolOptions, query};
36use thiserror::Error;
37use tracing::{error, info, instrument, warn};
38use uuid::Uuid;
39
/// Errors returned while constructing a [`PostgresEventStore`].
#[derive(Debug, Error)]
pub enum PostgresEventStoreError {
    /// The connection pool could not be created; the underlying `sqlx` error
    /// is preserved as the error source.
    #[error("failed to create postgres connection pool")]
    ConnectionFailed(#[source] sqlx::Error),
}
47
/// Upper bound on the number of pooled connections.
///
/// Wraps a `NonZeroU32`, so a pool size of zero cannot be represented.
#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, Display, AsRef, Into))]
pub struct MaxConnections(std::num::NonZeroU32);
68
/// Connection-pool settings shared by the Postgres-backed stores in this file.
///
/// Each field is forwarded to the `PgPoolOptions` setter of the same name.
#[derive(Debug, Clone)]
pub struct PostgresConfig {
    /// Maximum number of connections the pool may open.
    pub max_connections: MaxConnections,
    /// Passed to `PgPoolOptions::acquire_timeout`.
    pub acquire_timeout: Duration,
    /// Passed to `PgPoolOptions::idle_timeout`.
    pub idle_timeout: Duration,
}
79
80impl Default for PostgresConfig {
81 fn default() -> Self {
82 const DEFAULT_MAX_CONNECTIONS: std::num::NonZeroU32 = match std::num::NonZeroU32::new(10) {
83 Some(v) => v,
84 None => unreachable!(),
85 };
86
87 Self {
88 max_connections: MaxConnections::new(DEFAULT_MAX_CONNECTIONS),
89 acquire_timeout: Duration::from_secs(30),
90 idle_timeout: Duration::from_secs(600), }
92 }
93}
94
/// Event store backed by a PostgreSQL connection pool.
///
/// Cloning clones the `sqlx` pool handle held in `pool`.
#[derive(Debug, Clone)]
pub struct PostgresEventStore {
    /// Connection pool used for every query issued by this store.
    pool: Pool<Postgres>,
}
106
107impl PostgresEventStore {
108 pub async fn new<S: Into<String>>(
110 connection_string: S,
111 ) -> Result<Self, PostgresEventStoreError> {
112 Self::with_config(connection_string, PostgresConfig::default()).await
113 }
114
115 pub async fn with_config<S: Into<String>>(
117 connection_string: S,
118 config: PostgresConfig,
119 ) -> Result<Self, PostgresEventStoreError> {
120 let connection_string = connection_string.into();
121 let max_connections: std::num::NonZeroU32 = config.max_connections.into();
122 let pool = PgPoolOptions::new()
123 .max_connections(max_connections.get())
124 .acquire_timeout(config.acquire_timeout)
125 .idle_timeout(config.idle_timeout)
126 .connect(&connection_string)
127 .await
128 .map_err(PostgresEventStoreError::ConnectionFailed)?;
129 Ok(Self { pool })
130 }
131
132 pub fn from_pool(pool: Pool<Postgres>) -> Self {
137 Self { pool }
138 }
139
140 #[cfg_attr(test, mutants::skip)] pub async fn ping(&self) {
147 let _ = query("SELECT 1")
148 .execute(&self.pool)
149 .await
150 .expect("postgres ping failed");
151 }
152
153 #[cfg_attr(test, mutants::skip)] pub async fn migrate(&self) {
163 sqlx::migrate!("./migrations")
164 .run(&self.pool)
165 .await
166 .expect("postgres migration failed");
167 }
168}
169
170impl EventStore for PostgresEventStore {
171 #[instrument(name = "postgres.read_stream", skip(self))]
172 async fn read_stream<E: Event>(
173 &self,
174 stream_id: StreamId,
175 ) -> Result<EventStream<E>, EventStoreError> {
176 info!(
177 stream = %stream_id,
178 "[postgres.read_stream] reading events from postgres"
179 );
180
181 let pool = self.pool.clone();
187
188 let stream = async_stream::stream! {
189 let mut rows = query(
190 "SELECT event_data FROM eventcore_events WHERE stream_id = $1 ORDER BY stream_version ASC",
191 )
192 .bind(stream_id.as_ref())
193 .fetch(&pool);
194
195 while let Some(row) = rows.next().await {
196 let row = match row {
197 Ok(row) => row,
198 Err(error) => {
199 yield Err(map_sqlx_error(error, Operation::ReadStream));
200 break;
201 }
202 };
203
204 let payload: Value = match row.try_get("event_data") {
205 Ok(payload) => payload,
206 Err(error) => {
207 yield Err(map_sqlx_error(error, Operation::ReadStream));
208 break;
209 }
210 };
211
212 match serde_json::from_value::<E>(payload) {
216 Ok(event) => yield Ok(event),
217 Err(error) => {
218 yield Err(EventStoreError::DeserializationFailed {
219 stream_id: stream_id.clone(),
220 detail: error.to_string(),
221 });
222 break;
223 }
224 }
225 }
226 };
227
228 Ok(EventStream::new(stream))
229 }
230
231 #[instrument(name = "postgres.append_events", skip(self, writes))]
232 async fn append_events(
233 &self,
234 writes: StreamWrites,
235 ) -> Result<EventStreamSlice, EventStoreError> {
236 let expected_versions = writes.expected_versions().clone();
237 let entries = writes.into_entries();
238
239 if entries.is_empty() {
240 return Ok(EventStreamSlice);
241 }
242
243 info!(
244 stream_count = expected_versions.len(),
245 event_count = entries.len(),
246 "[postgres.append_events] appending events to postgres"
247 );
248
249 let expected_versions_json: Value = expected_versions
251 .iter()
252 .map(|(stream_id, version)| {
253 (stream_id.as_ref().to_string(), json!(version.into_inner()))
254 })
255 .collect();
256
257 let mut tx = self
258 .pool
259 .begin()
260 .await
261 .map_err(|error| map_sqlx_error(error, Operation::BeginTransaction))?;
262
263 let _ = query("SELECT set_config('eventcore.expected_versions', $1, true)")
265 .bind(expected_versions_json.to_string())
266 .execute(&mut *tx)
267 .await
268 .map_err(|error| map_sqlx_error(error, Operation::SetExpectedVersions))?;
269
270 let rows: Vec<(StreamId, &'static str, Box<RawValue>)> = entries
280 .into_iter()
281 .map(|entry| (entry.stream_id, entry.event_type, entry.event_data))
282 .collect();
283
284 const MAX_EVENTS_PER_INSERT: usize = 1000;
285 for chunk in rows.chunks(MAX_EVENTS_PER_INSERT) {
286 let mut builder = QueryBuilder::<Postgres>::new(
287 "INSERT INTO eventcore_events (event_id, stream_id, event_type, event_data, metadata) ",
288 );
289 let _ = builder.push_values(chunk, |mut row, (stream_id, event_type, event_data)| {
290 let _ = row
291 .push_bind(Uuid::now_v7())
292 .push_bind(stream_id.as_ref())
293 .push_bind(*event_type)
294 .push_bind(Json(event_data))
295 .push_bind(Json(json!({})));
296 });
297 let _ = builder
298 .build()
299 .execute(&mut *tx)
300 .await
301 .map_err(|error| map_sqlx_error(error, Operation::AppendEvents))?;
302 }
303
304 tx.commit()
305 .await
306 .map_err(|error| map_sqlx_error(error, Operation::CommitTransaction))?;
307
308 Ok(EventStreamSlice)
309 }
310}
311
312impl CheckpointStore for PostgresEventStore {
313 type Error = PostgresCheckpointError;
314
315 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
316 let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
317 .bind(name)
318 .fetch_optional(&self.pool)
319 .await
320 .map_err(PostgresCheckpointError::DatabaseError)?;
321
322 match row {
323 Some(row) => {
324 let position: Uuid = row.get("last_position");
325 Ok(Some(StreamPosition::new(position)))
326 }
327 None => Ok(None),
328 }
329 }
330
331 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
332 let position_uuid: Uuid = position.into_inner();
333 let _ = query(
334 "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
335 VALUES ($1, $2, NOW())
336 ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
337 )
338 .bind(name)
339 .bind(position_uuid)
340 .execute(&self.pool)
341 .await
342 .map_err(PostgresCheckpointError::DatabaseError)?;
343
344 Ok(())
345 }
346}
347
348impl EventReader for PostgresEventStore {
349 type Error = EventStoreError;
350
351 async fn read_events<E: Event>(
352 &self,
353 filter: EventFilter,
354 page: EventPage,
355 ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
356 let after_event_id: Option<Uuid> = page.after_position().map(|p| p.into_inner());
359 let limit: i64 = page.limit().into_inner() as i64;
360
361 let type_filter = filter.event_type().unwrap_or_else(|| E::event_type_name());
365
366 let pattern_regex = filter
371 .stream_pattern()
372 .map(|p| glob_to_anchored_regex(p.as_ref()));
373
374 let mut builder = QueryBuilder::<Postgres>::new(
378 "SELECT event_id, event_data, stream_id FROM eventcore_events WHERE event_type = ",
379 );
380 let _ = builder.push_bind(type_filter);
381
382 if let Some(after_id) = after_event_id {
383 let _ = builder.push(" AND event_id > ").push_bind(after_id);
384 }
385
386 if let Some(prefix) = filter.stream_prefix() {
387 let _ = builder
388 .push(" AND stream_id LIKE ")
389 .push_bind(prefix.as_ref().to_string())
390 .push(" || '%'");
391 } else if let Some(regex) = pattern_regex {
392 let _ = builder.push(" AND stream_id ~ ").push_bind(regex);
393 }
394
395 let _ = builder.push(" ORDER BY event_id LIMIT ").push_bind(limit);
396
397 let rows = builder
398 .build()
399 .fetch_all(&self.pool)
400 .await
401 .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
402
403 let events: Vec<(E, StreamPosition)> = rows
404 .into_iter()
405 .filter_map(|row| {
406 let event_data: Json<Value> = row.get("event_data");
407 let event_id: Uuid = row.get("event_id");
408 serde_json::from_value::<E>(event_data.0)
409 .ok()
410 .map(|e| (e, StreamPosition::new(event_id)))
411 })
412 .collect();
413
414 Ok(events)
415 }
416}
417
/// Translates a shell-style glob into a regular expression anchored with `^`
/// and `$`.
///
/// * `*` becomes `.*` and `?` becomes `.`.
/// * `[...]` becomes a bracket expression; a leading `!` (or `^`) negates it
///   and is emitted as `^`.
/// * A `]` directly after the opening `[` (or after the negation marker) is a
///   member of the class rather than its terminator, so `[]` and `[!]` never
///   produce an empty, unbalanced bracket expression.
/// * A `[` with no closing `]` is matched literally, including the original
///   negation marker.
/// * Every other character is escaped with [`regex_escape`] rules.
///
/// Class members are copied verbatim.
fn glob_to_anchored_regex(glob: &str) -> String {
    let mut regex = String::with_capacity(glob.len() + 2);
    regex.push('^');

    let mut chars = glob.chars().peekable();
    while let Some(c) = chars.next() {
        match c {
            '*' => regex.push_str(".*"),
            '?' => regex.push('.'),
            '[' => {
                // Keep the marker itself so the literal fallback can
                // reproduce exactly what the caller wrote.
                let negation = chars.next_if(|next| matches!(next, '!' | '^'));

                let mut members = String::new();
                if chars.next_if_eq(&']').is_some() {
                    members.push(']');
                }

                let mut closed = false;
                for inner in chars.by_ref() {
                    if inner == ']' {
                        closed = true;
                        break;
                    }
                    members.push(inner);
                }

                if closed {
                    regex.push('[');
                    if negation.is_some() {
                        regex.push('^');
                    }
                    regex.push_str(&members);
                    regex.push(']');
                } else {
                    // Unterminated class: treat everything consumed as
                    // literal text.
                    push_regex_escaped(&mut regex, '[');
                    if let Some(marker) = negation {
                        push_regex_escaped(&mut regex, marker);
                    }
                    regex.push_str(&regex_escape(&members));
                }
            }
            other => push_regex_escaped(&mut regex, other),
        }
    }

    regex.push('$');
    regex
}

/// Appends `c` to `out`, preceded by a backslash when `c` is a regex
/// metacharacter.
fn push_regex_escaped(out: &mut String, c: char) {
    const METACHARACTERS: &[char] = &[
        '.', '^', '$', '*', '+', '?', '(', ')', '[', ']', '{', '}', '|', '\\',
    ];
    if METACHARACTERS.contains(&c) {
        out.push('\\');
    }
    out.push(c);
}

/// Returns `literal` with every regex metacharacter
/// (``. ^ $ * + ? ( ) [ ] { } | \``) prefixed by a backslash.
fn regex_escape(literal: &str) -> String {
    let mut escaped = String::with_capacity(literal.len());
    for c in literal.chars() {
        push_regex_escaped(&mut escaped, c);
    }
    escaped
}
491
492fn map_sqlx_error(error: sqlx::Error, operation: Operation) -> EventStoreError {
493 if let sqlx::Error::Database(db_error) = &error {
494 let code = db_error.code();
495 let code_str = code.as_deref();
496 if code_str == Some("P0001") || code_str == Some("23505") {
499 warn!(
500 error = %db_error,
501 "[postgres.version_conflict] optimistic concurrency check failed"
502 );
503 return parse_version_conflict_from_db_error(db_error.message());
504 }
505 }
506
507 error!(
508 error = %error,
509 operation = %operation,
510 "[postgres.database_error] database operation failed"
511 );
512 EventStoreError::StoreFailure { operation }
513}
514
515fn parse_version_conflict_from_db_error(message: &str) -> EventStoreError {
523 if let Some(parsed) = try_parse_conflict_message(message) {
525 return parsed;
526 }
527
528 let fallback_stream_id =
531 StreamId::try_new("unknown-conflict-stream").expect("static stream id is valid");
532 EventStoreError::VersionConflict {
533 stream_id: fallback_stream_id,
534 expected: StreamVersion::new(0),
535 actual: StreamVersion::new(0),
536 }
537}
538
539fn try_parse_conflict_message(message: &str) -> Option<EventStoreError> {
540 let rest = message.strip_prefix("version_conflict: stream \"")?;
541 let stream_end = rest.find('"')?;
542 let stream_id_str = &rest[..stream_end];
543 let after_stream = &rest[stream_end..];
544
545 let expected_str = after_stream
546 .strip_prefix("\" expected version ")?
547 .split(',')
548 .next()?;
549 let actual_str = after_stream.rsplit("actual ").next()?;
550
551 let expected = expected_str.trim().parse::<usize>().ok()?;
552 let actual = actual_str.trim().parse::<usize>().ok()?;
553 let stream_id = StreamId::try_new(stream_id_str).ok()?;
554
555 Some(EventStoreError::VersionConflict {
556 stream_id,
557 expected: StreamVersion::new(expected),
558 actual: StreamVersion::new(actual),
559 })
560}
561
/// Errors returned by the Postgres-backed checkpoint stores in this file.
#[derive(Debug, Error)]
pub enum PostgresCheckpointError {
    /// The connection pool could not be created.
    #[error("failed to create postgres connection pool")]
    ConnectionFailed(#[source] sqlx::Error),

    /// A query or migration failed; the `sqlx` error is included in the
    /// message and kept as the error source.
    #[error("database operation failed: {0}")]
    DatabaseError(#[source] sqlx::Error),
}
573
/// Standalone checkpoint store backed by a PostgreSQL connection pool.
#[derive(Debug, Clone)]
pub struct PostgresCheckpointStore {
    /// Connection pool used for every checkpoint query.
    pool: Pool<Postgres>,
}
590
591impl PostgresCheckpointStore {
592 pub async fn new<S: Into<String>>(
594 connection_string: S,
595 ) -> Result<Self, PostgresCheckpointError> {
596 Self::with_config(connection_string, PostgresConfig::default()).await
597 }
598
599 pub async fn with_config<S: Into<String>>(
601 connection_string: S,
602 config: PostgresConfig,
603 ) -> Result<Self, PostgresCheckpointError> {
604 let connection_string = connection_string.into();
605 let max_connections: std::num::NonZeroU32 = config.max_connections.into();
606 let pool = PgPoolOptions::new()
607 .max_connections(max_connections.get())
608 .acquire_timeout(config.acquire_timeout)
609 .idle_timeout(config.idle_timeout)
610 .connect(&connection_string)
611 .await
612 .map_err(PostgresCheckpointError::ConnectionFailed)?;
613
614 sqlx::migrate!("./migrations")
616 .run(&pool)
617 .await
618 .map_err(|e| {
619 PostgresCheckpointError::DatabaseError(sqlx::Error::Migrate(Box::new(e)))
620 })?;
621
622 Ok(Self { pool })
623 }
624
625 pub fn from_pool(pool: Pool<Postgres>) -> Self {
630 Self { pool }
631 }
632}
633
634impl CheckpointStore for PostgresCheckpointStore {
635 type Error = PostgresCheckpointError;
636
637 async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
638 let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
639 .bind(name)
640 .fetch_optional(&self.pool)
641 .await
642 .map_err(PostgresCheckpointError::DatabaseError)?;
643
644 match row {
645 Some(row) => {
646 let position: Uuid = row.get("last_position");
647 Ok(Some(StreamPosition::new(position)))
648 }
649 None => Ok(None),
650 }
651 }
652
653 async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
654 let position_uuid: Uuid = position.into_inner();
655 let _ = query(
656 "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
657 VALUES ($1, $2, NOW())
658 ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
659 )
660 .bind(name)
661 .bind(position_uuid)
662 .execute(&self.pool)
663 .await
664 .map_err(PostgresCheckpointError::DatabaseError)?;
665
666 Ok(())
667 }
668}
669
/// Errors returned while acquiring projector leadership.
#[derive(Debug, Error)]
pub enum CoordinationError {
    /// `pg_try_advisory_lock` reported that the lock for this subscription
    /// was not acquired.
    #[error(
        "leadership not acquired for subscription '{subscription_name}': another instance holds the lock"
    )]
    LeadershipNotAcquired { subscription_name: String },

    /// Creating the pool, acquiring a connection, or running the lock query
    /// failed.
    #[error("database operation failed: {0}")]
    DatabaseError(#[source] sqlx::Error),
}
687
/// Guard for an advisory lock obtained with `pg_try_advisory_lock`.
///
/// The guard keeps the pooled connection that took the lock; its `Drop`
/// implementation issues `pg_advisory_unlock` on that same connection.
pub struct CoordinationGuard {
    /// Advisory lock key passed to `pg_try_advisory_lock` / `pg_advisory_unlock`.
    lock_key: i64,
    /// Connection that holds the lock. Wrapped in `Option` so `Drop` can take
    /// ownership of it.
    connection: Option<sqlx::pool::PoolConnection<Postgres>>,
}
723
724impl std::fmt::Debug for CoordinationGuard {
725 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
726 f.debug_struct("CoordinationGuard")
727 .field("lock_key", &self.lock_key)
728 .finish_non_exhaustive()
729 }
730}
731
732impl Drop for CoordinationGuard {
733 fn drop(&mut self) {
734 if let Some(mut connection) = self.connection.take() {
736 let lock_key = self.lock_key;
737
738 let handle = tokio::runtime::Handle::current();
741 let is_multi_thread =
742 handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread;
743
744 if is_multi_thread {
745 tokio::task::block_in_place(|| {
747 handle.block_on(async {
748 if let Err(e) = query("SELECT pg_advisory_unlock($1)")
750 .bind(lock_key)
751 .execute(&mut *connection)
752 .await
753 {
754 warn!(
755 lock_key = lock_key,
756 error = %e,
757 "failed to release advisory lock on drop"
758 );
759 }
760 });
762 });
763 } else {
764 drop(tokio::spawn(async move {
769 if let Err(e) = query("SELECT pg_advisory_unlock($1)")
770 .bind(lock_key)
771 .execute(&mut *connection)
772 .await
773 {
774 warn!(
775 lock_key = lock_key,
776 error = %e,
777 "failed to release advisory lock on drop (async)"
778 );
779 }
780 }));
781 }
782 }
783 }
784}
785
/// Projector coordinator that elects a leader per subscription using
/// PostgreSQL advisory locks.
#[derive(Debug, Clone)]
pub struct PostgresProjectorCoordinator {
    /// Pool from which the lock-holding connections are acquired.
    pool: Pool<Postgres>,
}
795
796impl PostgresProjectorCoordinator {
797 pub async fn new<S: Into<String>>(connection_string: S) -> Result<Self, CoordinationError> {
799 Self::with_config(connection_string, PostgresConfig::default()).await
800 }
801
802 pub async fn with_config<S: Into<String>>(
804 connection_string: S,
805 config: PostgresConfig,
806 ) -> Result<Self, CoordinationError> {
807 let connection_string = connection_string.into();
808 let max_connections: std::num::NonZeroU32 = config.max_connections.into();
809 let pool = PgPoolOptions::new()
810 .max_connections(max_connections.get())
811 .acquire_timeout(config.acquire_timeout)
812 .idle_timeout(config.idle_timeout)
813 .connect(&connection_string)
814 .await
815 .map_err(CoordinationError::DatabaseError)?;
816
817 Ok(Self { pool })
818 }
819
820 pub fn from_pool(pool: Pool<Postgres>) -> Self {
822 Self { pool }
823 }
824}
825
/// Derives the advisory-lock key for a subscription name.
///
/// The key is the 64-bit FNV-1a hash of the name's UTF-8 bytes,
/// reinterpreted as `i64` (same bit pattern).
fn advisory_lock_key(subscription_name: &str) -> i64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let digest = subscription_name
        .bytes()
        .fold(FNV_OFFSET_BASIS, |state, byte| {
            (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        });

    // Bit-for-bit reinterpretation; hashes above `i64::MAX` become negative.
    digest as i64
}
841
842async fn try_acquire_advisory_lock(
845 pool: &Pool<Postgres>,
846 subscription_name: &str,
847) -> Result<CoordinationGuard, CoordinationError> {
848 let lock_key = advisory_lock_key(subscription_name);
849
850 let mut connection = pool
854 .acquire()
855 .await
856 .map_err(CoordinationError::DatabaseError)?;
857
858 let row = query("SELECT pg_try_advisory_lock($1)")
860 .bind(lock_key)
861 .fetch_one(&mut *connection)
862 .await
863 .map_err(CoordinationError::DatabaseError)?;
864
865 let acquired: bool = row.get(0);
866
867 if acquired {
868 Ok(CoordinationGuard {
869 lock_key,
870 connection: Some(connection),
871 })
872 } else {
873 Err(CoordinationError::LeadershipNotAcquired {
875 subscription_name: subscription_name.to_string(),
876 })
877 }
878}
879
impl ProjectorCoordinator for PostgresProjectorCoordinator {
    type Error = CoordinationError;
    type Guard = CoordinationGuard;

    /// Delegates to `try_acquire_advisory_lock` using this coordinator's pool.
    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
        try_acquire_advisory_lock(&self.pool, subscription_name).await
    }
}
888
impl ProjectorCoordinator for PostgresEventStore {
    type Error = CoordinationError;
    type Guard = CoordinationGuard;

    /// Delegates to `try_acquire_advisory_lock` using the event store's pool.
    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
        try_acquire_advisory_lock(&self.pool, subscription_name).await
    }
}
897
// Unit tests for the glob-to-regex translation; they need no database.
#[cfg(test)]
mod tests {
    use super::{glob_to_anchored_regex, regex_escape};

    // `*` matches any run of characters.
    #[test]
    fn star_translates_to_dot_star_anchored() {
        assert_eq!(glob_to_anchored_regex("account-*"), "^account-.*$");
    }

    // `?` matches exactly one character.
    #[test]
    fn question_mark_translates_to_dot() {
        assert_eq!(glob_to_anchored_regex("account-?"), "^account-.$");
    }

    // A closed bracket class is passed through unchanged.
    #[test]
    fn character_class_is_preserved() {
        assert_eq!(
            glob_to_anchored_regex("account-[0-9]*"),
            "^account-[0-9].*$"
        );
    }

    // Glob negation `[!...]` becomes regex negation `[^...]`.
    #[test]
    fn negated_character_class_uses_caret() {
        assert_eq!(glob_to_anchored_regex("account-[!0-9]"), "^account-[^0-9]$");
    }

    // Characters that are special in a regex but not in a glob are escaped.
    #[test]
    fn literal_regex_metacharacters_are_escaped() {
        assert_eq!(glob_to_anchored_regex("a.c+(d)"), "^a\\.c\\+\\(d\\)$");
    }

    // Every metacharacter known to `regex_escape` gets a backslash prefix.
    #[test]
    fn regex_escape_escapes_all_metacharacters() {
        assert_eq!(
            regex_escape(".^$*+?()[]{}|\\"),
            "\\.\\^\\$\\*\\+\\?\\(\\)\\[\\]\\{\\}\\|\\\\"
        );
    }
}