Skip to main content

eventcore_postgres/
lib.rs

1//! PostgreSQL event store backend for EventCore.
2//!
3//! This crate provides a production-grade [`PostgresEventStore`] backed by PostgreSQL,
4//! with ACID transactions, advisory locks, and connection pooling via `sqlx`.
5//!
6//! Event immutability is enforced by PostgreSQL triggers. Stream pattern matching
7//! follows the conventions described in ADR-0047.
8//!
9//! `PostgresEventStore` implements [`EventStore`], [`EventReader`], [`CheckpointStore`],
10//! and [`ProjectorCoordinator`] from `eventcore-types`.
11//!
12//! # Getting Started
13//!
14//! ```no_run
15//! use eventcore_postgres::PostgresEventStore;
16//!
17//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
18//! let store = PostgresEventStore::new("postgres://user:pass@localhost/mydb").await?;
19//! # Ok(())
20//! # }
21//! ```
22
23use std::time::Duration;
24
25use eventcore_types::{
26    CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
27    EventStream, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
28    StreamVersion, StreamWrites,
29};
30use futures::StreamExt;
31use nutype::nutype;
32use serde_json::value::RawValue;
33use serde_json::{Value, json};
34use sqlx::types::Json;
35use sqlx::{Pool, Postgres, QueryBuilder, Row, postgres::PgPoolOptions, query};
36use thiserror::Error;
37use tracing::{error, info, instrument, warn};
38use uuid::Uuid;
39
/// Errors that can occur when creating a [`PostgresEventStore`].
///
/// Returned by [`PostgresEventStore::new`] and [`PostgresEventStore::with_config`].
#[derive(Debug, Error)]
pub enum PostgresEventStoreError {
    /// Failed to create the postgres connection pool.
    ///
    /// Wraps the `sqlx::Error` returned by `PgPoolOptions::connect`, which is
    /// exposed through `std::error::Error::source`.
    #[error("failed to create postgres connection pool")]
    ConnectionFailed(#[source] sqlx::Error),
}
47
/// Maximum number of database connections in the pool.
///
/// `MaxConnections` wraps a [`NonZeroU32`](std::num::NonZeroU32), so a pool
/// size of zero cannot be represented. The guarantee comes from the inner type,
/// not from a runtime check. The value is handed to
/// `PgPoolOptions::max_connections` when a pool is built from a [`PostgresConfig`].
///
/// # Examples
///
/// ```no_run
/// use eventcore_postgres::MaxConnections;
/// use std::num::NonZeroU32;
///
/// let small_pool = MaxConnections::new(NonZeroU32::new(5).expect("5 is non-zero"));
/// let standard = MaxConnections::new(NonZeroU32::new(10).expect("10 is non-zero"));
/// let large_pool = MaxConnections::new(NonZeroU32::new(50).expect("50 is non-zero"));
///
/// // Zero connections not allowed by type system
/// // let zero = NonZeroU32::new(0); // Returns None
/// ```
#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, Display, AsRef, Into))]
pub struct MaxConnections(std::num::NonZeroU32);
68
69/// Configuration for PostgresEventStore connection pool.
70#[derive(Debug, Clone)]
71pub struct PostgresConfig {
72    /// Maximum number of connections in the pool (default: 10)
73    pub max_connections: MaxConnections,
74    /// Timeout for acquiring a connection from the pool (default: 30 seconds)
75    pub acquire_timeout: Duration,
76    /// Idle timeout for connections in the pool (default: 10 minutes)
77    pub idle_timeout: Duration,
78}
79
80impl Default for PostgresConfig {
81    fn default() -> Self {
82        const DEFAULT_MAX_CONNECTIONS: std::num::NonZeroU32 = match std::num::NonZeroU32::new(10) {
83            Some(v) => v,
84            None => unreachable!(),
85        };
86
87        Self {
88            max_connections: MaxConnections::new(DEFAULT_MAX_CONNECTIONS),
89            acquire_timeout: Duration::from_secs(30),
90            idle_timeout: Duration::from_secs(600), // 10 minutes
91        }
92    }
93}
94
/// PostgreSQL-backed event store implementing EventCore's storage traits.
///
/// Provides atomic multi-stream event writes with optimistic concurrency control,
/// advisory locks for projector coordination, and connection pooling.
///
/// Use [`PostgresEventStore::new`] (or [`PostgresEventStore::with_config`]) with a
/// PostgreSQL connection URL to create an instance, or
/// [`PostgresEventStore::from_pool`] to reuse an existing pool. Cloning is cheap:
/// it clones the inner `sqlx` pool handle, so clones share the same connections.
#[derive(Debug, Clone)]
pub struct PostgresEventStore {
    // Shared sqlx connection pool used by every store operation.
    pool: Pool<Postgres>,
}
106
107impl PostgresEventStore {
108    /// Create a new PostgresEventStore with default configuration.
109    pub async fn new<S: Into<String>>(
110        connection_string: S,
111    ) -> Result<Self, PostgresEventStoreError> {
112        Self::with_config(connection_string, PostgresConfig::default()).await
113    }
114
115    /// Create a new PostgresEventStore with custom configuration.
116    pub async fn with_config<S: Into<String>>(
117        connection_string: S,
118        config: PostgresConfig,
119    ) -> Result<Self, PostgresEventStoreError> {
120        let connection_string = connection_string.into();
121        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
122        let pool = PgPoolOptions::new()
123            .max_connections(max_connections.get())
124            .acquire_timeout(config.acquire_timeout)
125            .idle_timeout(config.idle_timeout)
126            .connect(&connection_string)
127            .await
128            .map_err(PostgresEventStoreError::ConnectionFailed)?;
129        Ok(Self { pool })
130    }
131
132    /// Create a PostgresEventStore from an existing connection pool.
133    ///
134    /// Use this when you need full control over pool configuration or want to
135    /// share a pool across multiple components.
136    pub fn from_pool(pool: Pool<Postgres>) -> Self {
137        Self { pool }
138    }
139
140    /// Verify connectivity by issuing a trivial `SELECT 1` query.
141    ///
142    /// # Panics
143    ///
144    /// Panics if the database is unreachable or the query fails.
145    #[cfg_attr(test, mutants::skip)] // infallible: panics on failure
146    pub async fn ping(&self) {
147        let _ = query("SELECT 1")
148            .execute(&self.pool)
149            .await
150            .expect("postgres ping failed");
151    }
152
153    /// Apply the bundled schema migrations via `sqlx::migrate!("./migrations")`.
154    ///
155    /// This creates the tables EventCore requires, including the
156    /// `eventcore_subscription_versions` table used by [`PostgresCheckpointStore`].
157    ///
158    /// # Panics
159    ///
160    /// Panics if applying the migrations fails.
161    #[cfg_attr(test, mutants::skip)] // infallible: panics on failure
162    pub async fn migrate(&self) {
163        sqlx::migrate!("./migrations")
164            .run(&self.pool)
165            .await
166            .expect("postgres migration failed");
167    }
168}
169
/// Stream-level reads and atomic multi-stream appends against `eventcore_events`.
impl EventStore for PostgresEventStore {
    /// Read every event of `stream_id`, in ascending `stream_version` order.
    ///
    /// The returned [`EventStream`] is lazy. The SQL query is issued inside the
    /// `async_stream::stream!` body, so it runs when the stream is polled, and
    /// rows are fetched incrementally through sqlx's `fetch`. Each row's
    /// `event_data` JSON is deserialized into `E`.
    ///
    /// Failures are yielded as `Err` items rather than returned up front:
    /// - row fetch or column decode errors go through `map_sqlx_error`
    ///   (tagged `Operation::ReadStream`);
    /// - a payload that does not deserialize into `E` yields
    ///   [`EventStoreError::DeserializationFailed`].
    ///
    /// After the first `Err` item the stream ends.
    #[instrument(name = "postgres.read_stream", skip(self))]
    async fn read_stream<E: Event>(
        &self,
        stream_id: StreamId,
    ) -> Result<EventStream<E>, EventStoreError> {
        info!(
            stream = %stream_id,
            "[postgres.read_stream] reading events from postgres"
        );

        // Clone the pool (an `Arc` internally) so the returned stream owns its
        // connection handle and can be `'static`. The query uses sqlx's lazy
        // `fetch`, which pulls rows from the database incrementally rather than
        // buffering the entire result set with `fetch_all` — the real memory
        // win behind #364 for large streams.
        let pool = self.pool.clone();

        let stream = async_stream::stream! {
            let mut rows = query(
                "SELECT event_data FROM eventcore_events WHERE stream_id = $1 ORDER BY stream_version ASC",
            )
            .bind(stream_id.as_ref())
            .fetch(&pool);

            while let Some(row) = rows.next().await {
                // A fetch error ends the stream after being reported once.
                let row = match row {
                    Ok(row) => row,
                    Err(error) => {
                        yield Err(map_sqlx_error(error, Operation::ReadStream));
                        break;
                    }
                };

                // Decode the JSON column before the typed deserialization step.
                let payload: Value = match row.try_get("event_data") {
                    Ok(payload) => payload,
                    Err(error) => {
                        yield Err(map_sqlx_error(error, Operation::ReadStream));
                        break;
                    }
                };

                // A per-row decode failure surfaces as an `Err` item, matching
                // the previous behavior where a type mismatch failed the read
                // (see the read_stream_errors_on_type_mismatch contract test).
                match serde_json::from_value::<E>(payload) {
                    Ok(event) => yield Ok(event),
                    Err(error) => {
                        yield Err(EventStoreError::DeserializationFailed {
                            stream_id: stream_id.clone(),
                            detail: error.to_string(),
                        });
                        break;
                    }
                }
            }
        };

        Ok(EventStream::new(stream))
    }

    /// Append all events in `writes` atomically, in one transaction.
    ///
    /// Steps:
    /// 1. If `writes` has no entries, return immediately without touching the database.
    /// 2. Serialize `writes.expected_versions()` into a JSON object
    ///    `{stream_id: version}` and store it with
    ///    `set_config('eventcore.expected_versions', …, true)`. The `true`
    ///    argument makes the setting transaction-local. The database trigger
    ///    reads it to enforce optimistic concurrency.
    /// 3. Insert the events with multi-row `INSERT`s of at most
    ///    `MAX_EVENTS_PER_INSERT` rows each. Every row gets a fresh client-side
    ///    `Uuid::now_v7()` id and empty `{}` metadata.
    /// 4. Commit.
    ///
    /// Dropping the transaction on an early `?` return leaves nothing committed.
    ///
    /// # Errors
    ///
    /// All database errors pass through `map_sqlx_error`. Trigger or
    /// unique-violation failures become [`EventStoreError::VersionConflict`];
    /// anything else becomes [`EventStoreError::StoreFailure`], tagged with the
    /// failing [`Operation`].
    ///
    /// NOTE(review): with no entries, step 1 returns `Ok` even when
    /// `expected_versions` is non-empty, so no version check happens. Confirm
    /// this is intended.
    ///
    /// NOTE(review): `event_id` (used as the global position by `read_events`)
    /// is generated before commit. Ids from concurrently committing transactions
    /// may become visible out of id order. Confirm that readers paging with
    /// `event_id >` tolerate this.
    #[instrument(name = "postgres.append_events", skip(self, writes))]
    async fn append_events(
        &self,
        writes: StreamWrites,
    ) -> Result<EventStreamSlice, EventStoreError> {
        let expected_versions = writes.expected_versions().clone();
        let entries = writes.into_entries();

        if entries.is_empty() {
            return Ok(EventStreamSlice);
        }

        info!(
            stream_count = expected_versions.len(),
            event_count = entries.len(),
            "[postgres.append_events] appending events to postgres"
        );

        // Build expected versions JSON for the trigger: {"<stream_id>": <version>, ...}
        let expected_versions_json: Value = expected_versions
            .iter()
            .map(|(stream_id, version)| {
                (stream_id.as_ref().to_string(), json!(version.into_inner()))
            })
            .collect();

        let mut tx = self
            .pool
            .begin()
            .await
            .map_err(|error| map_sqlx_error(error, Operation::BeginTransaction))?;

        // Publish expected versions as a transaction-local setting (is_local =
        // true) so the insert trigger can validate them.
        let _ = query("SELECT set_config('eventcore.expected_versions', $1, true)")
            .bind(expected_versions_json.to_string())
            .execute(&mut *tx)
            .await
            .map_err(|error| map_sqlx_error(error, Operation::SetExpectedVersions))?;

        // Insert all events with a single multi-row INSERT per chunk. The
        // BEFORE INSERT trigger still assigns gap-free stream versions and
        // enforces optimistic concurrency for each row in VALUES order,
        // exactly as it did for the previous per-event loop — replacing N
        // round-trips with one statement. Chunking keeps the bound-parameter
        // count well under Postgres' 65535-parameter limit (5 binds/event).
        // Drop the type-erased event payload (only the in-memory store needs
        // it) and keep just the Send + Sync fields used for SQL binding, so the
        // borrows held by the insert loop stay Send across the awaits.
        let rows: Vec<(StreamId, &'static str, Box<RawValue>)> = entries
            .into_iter()
            .map(|entry| (entry.stream_id, entry.event_type, entry.event_data))
            .collect();

        const MAX_EVENTS_PER_INSERT: usize = 1000;
        for chunk in rows.chunks(MAX_EVENTS_PER_INSERT) {
            let mut builder = QueryBuilder::<Postgres>::new(
                "INSERT INTO eventcore_events (event_id, stream_id, event_type, event_data, metadata) ",
            );
            // One VALUES tuple per event; binds are in column order of the INSERT above.
            let _ = builder.push_values(chunk, |mut row, (stream_id, event_type, event_data)| {
                let _ = row
                    .push_bind(Uuid::now_v7())
                    .push_bind(stream_id.as_ref())
                    .push_bind(*event_type)
                    .push_bind(Json(event_data))
                    .push_bind(Json(json!({})));
            });
            let _ = builder
                .build()
                .execute(&mut *tx)
                .await
                .map_err(|error| map_sqlx_error(error, Operation::AppendEvents))?;
        }

        tx.commit()
            .await
            .map_err(|error| map_sqlx_error(error, Operation::CommitTransaction))?;

        Ok(EventStreamSlice)
    }
}
311
312impl CheckpointStore for PostgresEventStore {
313    type Error = PostgresCheckpointError;
314
315    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
316        let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
317            .bind(name)
318            .fetch_optional(&self.pool)
319            .await
320            .map_err(PostgresCheckpointError::DatabaseError)?;
321
322        match row {
323            Some(row) => {
324                let position: Uuid = row.get("last_position");
325                Ok(Some(StreamPosition::new(position)))
326            }
327            None => Ok(None),
328        }
329    }
330
331    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
332        let position_uuid: Uuid = position.into_inner();
333        let _ = query(
334            "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
335             VALUES ($1, $2, NOW())
336             ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
337        )
338        .bind(name)
339        .bind(position_uuid)
340        .execute(&self.pool)
341        .await
342        .map_err(PostgresCheckpointError::DatabaseError)?;
343
344        Ok(())
345    }
346}
347
348impl EventReader for PostgresEventStore {
349    type Error = EventStoreError;
350
351    async fn read_events<E: Event>(
352        &self,
353        filter: EventFilter,
354        page: EventPage,
355    ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
356        // Query events ordered by event_id (UUID7, monotonically increasing).
357        // Use event_id directly as the global position - no need for ROW_NUMBER.
358        let after_event_id: Option<Uuid> = page.after_position().map(|p| p.into_inner());
359        let limit: i64 = page.limit().into_inner() as i64;
360
361        // Filter by event_type in SQL so non-matching types don't consume
362        // batch slots (fixes issue #372). Use explicit filter if set,
363        // otherwise derive from E::event_type_name().
364        let type_filter = filter.event_type().unwrap_or_else(|| E::event_type_name());
365
366        // Glob pattern pushdown translates the pattern to an anchored POSIX
367        // regex matched with the `~` operator (ADR-0047). The translation
368        // escapes all regex metacharacters in literal segments so user input
369        // cannot inject regex syntax.
370        let pattern_regex = filter
371            .stream_pattern()
372            .map(|p| glob_to_anchored_regex(p.as_ref()));
373
374        // Build the query dynamically so prefix XOR pattern, the optional
375        // cursor, and the event_type predicate compose without a combinatorial
376        // explosion of hand-written query strings.
377        let mut builder = QueryBuilder::<Postgres>::new(
378            "SELECT event_id, event_data, stream_id FROM eventcore_events WHERE event_type = ",
379        );
380        let _ = builder.push_bind(type_filter);
381
382        if let Some(after_id) = after_event_id {
383            let _ = builder.push(" AND event_id > ").push_bind(after_id);
384        }
385
386        if let Some(prefix) = filter.stream_prefix() {
387            let _ = builder
388                .push(" AND stream_id LIKE ")
389                .push_bind(prefix.as_ref().to_string())
390                .push(" || '%'");
391        } else if let Some(regex) = pattern_regex {
392            let _ = builder.push(" AND stream_id ~ ").push_bind(regex);
393        }
394
395        let _ = builder.push(" ORDER BY event_id LIMIT ").push_bind(limit);
396
397        let rows = builder
398            .build()
399            .fetch_all(&self.pool)
400            .await
401            .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
402
403        let events: Vec<(E, StreamPosition)> = rows
404            .into_iter()
405            .filter_map(|row| {
406                let event_data: Json<Value> = row.get("event_data");
407                let event_id: Uuid = row.get("event_id");
408                serde_json::from_value::<E>(event_data.0)
409                    .ok()
410                    .map(|e| (e, StreamPosition::new(event_id)))
411            })
412            .collect();
413
414        Ok(events)
415    }
416}
417
/// Translate a glob pattern (as accepted by `glob::Pattern`) into an anchored
/// POSIX regular expression suitable for PostgreSQL's `~` operator (ADR-0047).
///
/// Mapping:
/// - `*` → `.*` (matches any sequence, including the `/` separator)
/// - `?` → `.` (matches exactly one character)
/// - `[...]` / `[!...]` → a regex bracket expression (`[!` becomes the regex
///   negation `[^`). Ranges such as `[0-9]` and `[a-z]` are kept as ranges.
///   - As in `glob::Pattern`, a `]` immediately after `[` or `[!` is a member
///     of the set, not its terminator. So `[]]` matches `]` and `[!]]` matches
///     anything except `]`.
///   - Inside the set, `\`, `[`, `]` and `^` are backslash-escaped. In
///     PostgreSQL AREs a backslash stays special inside brackets, and a leading
///     `^` would otherwise turn a glob literal into a negation.
/// - every other character is a literal and is regex-escaped via [`regex_escape`]
///
/// The result is anchored with `^...$` so the whole stream ID must match,
/// mirroring `glob::Pattern::matches`. Because the pattern has already been
/// validated as a compilable `glob::Pattern`, brackets are balanced. An
/// unterminated `[` (impossible for a valid pattern) is emitted as literal
/// text, including any `!` that followed it.
fn glob_to_anchored_regex(glob: &str) -> String {
    // Characters that must be escaped to be taken literally inside a
    // PostgreSQL bracket expression. `-` is deliberately absent so ranges work.
    const BRACKET_SPECIALS: [char; 4] = ['\\', '[', ']', '^'];

    let mut regex = String::with_capacity(glob.len() + 2);
    regex.push('^');

    let mut chars = glob.chars().peekable();
    while let Some(c) = chars.next() {
        match c {
            '*' => regex.push_str(".*"),
            '?' => regex.push('.'),
            '[' => {
                let negated = chars.next_if_eq(&'!').is_some();
                // A `]` in the first member position is a literal member,
                // not the end of the set (glob::Pattern semantics).
                let mut members: Vec<char> = chars.next_if_eq(&']').into_iter().collect();
                let mut closed = false;
                for inner in chars.by_ref() {
                    if inner == ']' {
                        closed = true;
                        break;
                    }
                    members.push(inner);
                }
                if closed {
                    regex.push('[');
                    if negated {
                        regex.push('^');
                    }
                    for member in members {
                        if BRACKET_SPECIALS.contains(&member) {
                            regex.push('\\');
                        }
                        regex.push(member);
                    }
                    regex.push(']');
                } else {
                    // Not a real class (a validated glob never reaches here);
                    // reproduce the consumed text as literals.
                    regex.push_str(&regex_escape("["));
                    if negated {
                        regex.push('!');
                    }
                    let rest: String = members.into_iter().collect();
                    regex.push_str(&regex_escape(&rest));
                }
            }
            other => regex.push_str(&regex_escape(&other.to_string())),
        }
    }

    regex.push('$');
    regex
}

/// Escape every POSIX-regex metacharacter in a literal segment so it matches
/// itself, preventing regex injection from stream-pattern input.
///
/// Intended for text outside bracket expressions.
fn regex_escape(literal: &str) -> String {
    const METACHARACTERS: &[char] = &[
        '.', '^', '$', '*', '+', '?', '(', ')', '[', ']', '{', '}', '|', '\\',
    ];
    let mut escaped = String::with_capacity(literal.len());
    for c in literal.chars() {
        if METACHARACTERS.contains(&c) {
            escaped.push('\\');
        }
        escaped.push(c);
    }
    escaped
}
491
492fn map_sqlx_error(error: sqlx::Error, operation: Operation) -> EventStoreError {
493    if let sqlx::Error::Database(db_error) = &error {
494        let code = db_error.code();
495        let code_str = code.as_deref();
496        // P0001: Custom error from trigger (version_conflict)
497        // 23505: Unique constraint violation (fallback for version conflict)
498        if code_str == Some("P0001") || code_str == Some("23505") {
499            warn!(
500                error = %db_error,
501                "[postgres.version_conflict] optimistic concurrency check failed"
502            );
503            return parse_version_conflict_from_db_error(db_error.message());
504        }
505    }
506
507    error!(
508        error = %error,
509        operation = %operation,
510        "[postgres.database_error] database operation failed"
511    );
512    EventStoreError::StoreFailure { operation }
513}
514
515/// Parse version conflict details from the PostgreSQL trigger error message.
516///
517/// The trigger produces messages like:
518///   `version_conflict: stream "my-stream" expected version 0, actual 1`
519///
520/// If parsing fails, falls back to a VersionConflict with a sentinel stream_id
521/// indicating the details could not be extracted.
522fn parse_version_conflict_from_db_error(message: &str) -> EventStoreError {
523    // Pattern: version_conflict: stream "STREAM_ID" expected version EXPECTED, actual ACTUAL
524    if let Some(parsed) = try_parse_conflict_message(message) {
525        return parsed;
526    }
527
528    // Fallback: unique constraint violation (23505) or unparseable trigger message.
529    // Use a sentinel stream_id since we don't have the details.
530    let fallback_stream_id =
531        StreamId::try_new("unknown-conflict-stream").expect("static stream id is valid");
532    EventStoreError::VersionConflict {
533        stream_id: fallback_stream_id,
534        expected: StreamVersion::new(0),
535        actual: StreamVersion::new(0),
536    }
537}
538
539fn try_parse_conflict_message(message: &str) -> Option<EventStoreError> {
540    let rest = message.strip_prefix("version_conflict: stream \"")?;
541    let stream_end = rest.find('"')?;
542    let stream_id_str = &rest[..stream_end];
543    let after_stream = &rest[stream_end..];
544
545    let expected_str = after_stream
546        .strip_prefix("\" expected version ")?
547        .split(',')
548        .next()?;
549    let actual_str = after_stream.rsplit("actual ").next()?;
550
551    let expected = expected_str.trim().parse::<usize>().ok()?;
552    let actual = actual_str.trim().parse::<usize>().ok()?;
553    let stream_id = StreamId::try_new(stream_id_str).ok()?;
554
555    Some(EventStoreError::VersionConflict {
556        stream_id,
557        expected: StreamVersion::new(expected),
558        actual: StreamVersion::new(actual),
559    })
560}
561
/// Error type for PostgresCheckpointStore operations.
///
/// Also used as the `CheckpointStore::Error` of [`PostgresEventStore`].
#[derive(Debug, Error)]
pub enum PostgresCheckpointError {
    /// Failed to create connection pool.
    ///
    /// Produced by `PostgresCheckpointStore::new` / `with_config` when
    /// `PgPoolOptions::connect` fails.
    #[error("failed to create postgres connection pool")]
    ConnectionFailed(#[source] sqlx::Error),

    /// Database operation failed.
    ///
    /// Covers checkpoint queries and, in `PostgresCheckpointStore::with_config`,
    /// migration failures wrapped as `sqlx::Error::Migrate`.
    // NOTE(review): the message embeds `{0}` and the same error is also exposed
    // via `#[source]`, so error-chain reporters will print it twice.
    #[error("database operation failed: {0}")]
    DatabaseError(#[source] sqlx::Error),
}
573
/// Postgres-backed checkpoint store for tracking projection progress.
///
/// `PostgresCheckpointStore` stores checkpoint positions in a PostgreSQL table,
/// providing durability across process restarts. It implements the `CheckpointStore`
/// trait from eventcore-types.
///
/// # Schema
///
/// The store uses the `eventcore_subscription_versions` table with:
/// - `subscription_name`: Unique identifier for each projector/subscription
///   (the upsert's `ON CONFLICT` target)
/// - `last_position`: UUID7 representing the global stream position
/// - `updated_at`: Timestamp of the last checkpoint update
///
/// `new` / `with_config` apply the bundled migrations; `from_pool` does not.
#[derive(Debug, Clone)]
pub struct PostgresCheckpointStore {
    // Connection pool used for checkpoint reads and upserts.
    pool: Pool<Postgres>,
}
590
591impl PostgresCheckpointStore {
592    /// Create a new PostgresCheckpointStore with default configuration.
593    pub async fn new<S: Into<String>>(
594        connection_string: S,
595    ) -> Result<Self, PostgresCheckpointError> {
596        Self::with_config(connection_string, PostgresConfig::default()).await
597    }
598
599    /// Create a new PostgresCheckpointStore with custom configuration.
600    pub async fn with_config<S: Into<String>>(
601        connection_string: S,
602        config: PostgresConfig,
603    ) -> Result<Self, PostgresCheckpointError> {
604        let connection_string = connection_string.into();
605        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
606        let pool = PgPoolOptions::new()
607            .max_connections(max_connections.get())
608            .acquire_timeout(config.acquire_timeout)
609            .idle_timeout(config.idle_timeout)
610            .connect(&connection_string)
611            .await
612            .map_err(PostgresCheckpointError::ConnectionFailed)?;
613
614        // Run migrations to ensure table exists
615        sqlx::migrate!("./migrations")
616            .run(&pool)
617            .await
618            .map_err(|e| {
619                PostgresCheckpointError::DatabaseError(sqlx::Error::Migrate(Box::new(e)))
620            })?;
621
622        Ok(Self { pool })
623    }
624
625    /// Create a PostgresCheckpointStore from an existing connection pool.
626    ///
627    /// Use this when you need full control over pool configuration or want to
628    /// share a pool across multiple components.
629    pub fn from_pool(pool: Pool<Postgres>) -> Self {
630        Self { pool }
631    }
632}
633
634impl CheckpointStore for PostgresCheckpointStore {
635    type Error = PostgresCheckpointError;
636
637    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
638        let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
639            .bind(name)
640            .fetch_optional(&self.pool)
641            .await
642            .map_err(PostgresCheckpointError::DatabaseError)?;
643
644        match row {
645            Some(row) => {
646                let position: Uuid = row.get("last_position");
647                Ok(Some(StreamPosition::new(position)))
648            }
649            None => Ok(None),
650        }
651    }
652
653    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
654        let position_uuid: Uuid = position.into_inner();
655        let _ = query(
656            "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
657             VALUES ($1, $2, NOW())
658             ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
659        )
660        .bind(name)
661        .bind(position_uuid)
662        .execute(&self.pool)
663        .await
664        .map_err(PostgresCheckpointError::DatabaseError)?;
665
666        Ok(())
667    }
668}
669
// ============================================================================
// PostgresProjectorCoordinator - Distributed projector coordination via Postgres
// ============================================================================

/// Error type for projector coordination operations.
#[derive(Debug, Error)]
pub enum CoordinationError {
    /// Leadership could not be acquired (another instance holds the lock).
    #[error(
        "leadership not acquired for subscription '{subscription_name}': another instance holds the lock"
    )]
    LeadershipNotAcquired {
        /// Name of the subscription whose leadership lock was unavailable.
        subscription_name: String,
    },

    /// Database operation failed.
    // NOTE(review): the message embeds `{0}` and the same error is also exposed
    // via `#[source]`, so error-chain reporters will print it twice.
    #[error("database operation failed: {0}")]
    DatabaseError(#[source] sqlx::Error),
}
687
/// Guard type that releases leadership when dropped.
///
/// Holds the advisory lock key and the pooled connection that acquired the lock.
/// PostgreSQL session-level advisory locks belong to the session that took them,
/// so the unlock must be issued on that same connection. For this reason the
/// guard keeps the connection checked out for its whole lifetime.
///
/// # Lock Release Behavior
///
/// On drop, the guard runs `SELECT pg_advisory_unlock($1)` on its connection:
///
/// - **Multi-threaded runtime**: uses `block_in_place` + `Handle::block_on` to
///   run the unlock synchronously inside `drop`.
///
/// - **Any other runtime flavor (e.g. current-thread)**: spawns a detached task
///   to run the unlock asynchronously. That task may not execute before process
///   shutdown. In that case the lock is released when the PostgreSQL session
///   ends (the connection closes).
///
/// A failed unlock is logged with `warn!` and otherwise ignored.
///
/// # PostgreSQL Session-Scoped Locks
///
/// Locks acquired with `pg_try_advisory_lock` are session-scoped. They are held
/// until explicitly unlocked or until the database session ends (the connection
/// closes).
///
/// Returning a connection to the pool does not by itself end its session. If
/// the explicit unlock fails, the pooled connection keeps holding the lock until
/// the connection itself is closed, for example when:
/// - the pool closes it after its idle timeout
/// - the connection pool is shut down
/// - the database connection times out or is dropped
///
/// For production deployments, configure appropriate connection pool idle timeouts
/// to bound how long an unreleased lock can linger after an ungraceful shutdown.
pub struct CoordinationGuard {
    // Advisory lock key passed to `pg_advisory_unlock` on drop.
    lock_key: i64,
    /// The actual connection that holds the advisory lock.
    /// Must be Option so we can take ownership in Drop.
    connection: Option<sqlx::pool::PoolConnection<Postgres>>,
}
723
724impl std::fmt::Debug for CoordinationGuard {
725    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
726        f.debug_struct("CoordinationGuard")
727            .field("lock_key", &self.lock_key)
728            .finish_non_exhaustive()
729    }
730}
731
/// Releases the advisory lock on the same connection that acquired it.
///
/// NOTE(review): `tokio::runtime::Handle::current()` panics when called outside
/// a Tokio runtime context. Dropping a guard after the runtime has shut down, or
/// on a non-Tokio thread, will therefore panic. Confirm guards never outlive
/// the runtime.
///
/// NOTE(review): if the unlock query fails, the connection is still dropped
/// back into the pool with the session lock held. Consider closing the
/// connection instead so the lock cannot linger on a pooled session.
impl Drop for CoordinationGuard {
    fn drop(&mut self) {
        // Take ownership of the connection - we need the same connection that acquired the lock
        if let Some(mut connection) = self.connection.take() {
            let lock_key = self.lock_key;

            // Check runtime flavor to determine the appropriate unlock strategy.
            // block_in_place panics on single-threaded runtimes, so we must check first.
            let handle = tokio::runtime::Handle::current();
            let is_multi_thread =
                handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread;

            if is_multi_thread {
                // Multi-threaded runtime: use block_in_place for synchronous unlock
                tokio::task::block_in_place(|| {
                    handle.block_on(async {
                        // Unlock on the SAME connection that acquired the lock
                        if let Err(e) = query("SELECT pg_advisory_unlock($1)")
                            .bind(lock_key)
                            .execute(&mut *connection)
                            .await
                        {
                            warn!(
                                lock_key = lock_key,
                                error = %e,
                                "failed to release advisory lock on drop"
                            );
                        }
                        // Connection is returned to pool when dropped here
                    });
                });
            } else {
                // Single-threaded (any non-MultiThread) runtime: spawn a detached
                // task for async unlock; the JoinHandle is dropped immediately.
                // Note: This task may not execute before process shutdown. In that case,
                // the advisory lock is released when the PostgreSQL session ends (the
                // connection closes). See struct-level documentation for details.
                drop(tokio::spawn(async move {
                    if let Err(e) = query("SELECT pg_advisory_unlock($1)")
                        .bind(lock_key)
                        .execute(&mut *connection)
                        .await
                    {
                        warn!(
                            lock_key = lock_key,
                            error = %e,
                            "failed to release advisory lock on drop (async)"
                        );
                    }
                }));
            }
        }
    }
}
785
786/// Postgres-backed projector coordinator for distributed leadership.
787///
788/// `PostgresProjectorCoordinator` uses PostgreSQL advisory locks to ensure
789/// only one projector instance processes events for a given subscription
790/// at a time, preventing duplicate processing in distributed deployments.
791#[derive(Debug, Clone)]
792pub struct PostgresProjectorCoordinator {
793    pool: Pool<Postgres>,
794}
795
796impl PostgresProjectorCoordinator {
797    /// Create a new PostgresProjectorCoordinator with default configuration.
798    pub async fn new<S: Into<String>>(connection_string: S) -> Result<Self, CoordinationError> {
799        Self::with_config(connection_string, PostgresConfig::default()).await
800    }
801
802    /// Create a new PostgresProjectorCoordinator with custom configuration.
803    pub async fn with_config<S: Into<String>>(
804        connection_string: S,
805        config: PostgresConfig,
806    ) -> Result<Self, CoordinationError> {
807        let connection_string = connection_string.into();
808        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
809        let pool = PgPoolOptions::new()
810            .max_connections(max_connections.get())
811            .acquire_timeout(config.acquire_timeout)
812            .idle_timeout(config.idle_timeout)
813            .connect(&connection_string)
814            .await
815            .map_err(CoordinationError::DatabaseError)?;
816
817        Ok(Self { pool })
818    }
819
820    /// Create a PostgresProjectorCoordinator from an existing connection pool.
821    pub fn from_pool(pool: Pool<Postgres>) -> Self {
822        Self { pool }
823    }
824}
825
/// Derives the PostgreSQL advisory lock key for a subscription name.
///
/// The key is the 64-bit FNV-1a hash of the name's UTF-8 bytes, reinterpreted as a
/// signed 64-bit integer. FNV-1a is specified bit-for-bit, so the key is identical
/// across Rust versions. By contrast, `DefaultHasher` output is explicitly not stable.
fn advisory_lock_key(subscription_name: &str) -> i64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    let digest = subscription_name
        .bytes()
        .fold(OFFSET_BASIS, |acc, byte| (acc ^ u64::from(byte)).wrapping_mul(PRIME));

    // Bit-for-bit reinterpretation, not a range conversion: values above i64::MAX
    // become negative keys, which PostgreSQL accepts.
    digest as i64
}
841
842/// Try to acquire a PostgreSQL advisory lock for the given subscription name
843/// using the provided connection pool.
844async fn try_acquire_advisory_lock(
845    pool: &Pool<Postgres>,
846    subscription_name: &str,
847) -> Result<CoordinationGuard, CoordinationError> {
848    let lock_key = advisory_lock_key(subscription_name);
849
850    // Acquire a dedicated connection from the pool.
851    // This connection MUST be kept for the lifetime of the guard because
852    // PostgreSQL advisory locks are session-scoped.
853    let mut connection = pool
854        .acquire()
855        .await
856        .map_err(CoordinationError::DatabaseError)?;
857
858    // Attempt to acquire advisory lock (non-blocking) on this specific connection
859    let row = query("SELECT pg_try_advisory_lock($1)")
860        .bind(lock_key)
861        .fetch_one(&mut *connection)
862        .await
863        .map_err(CoordinationError::DatabaseError)?;
864
865    let acquired: bool = row.get(0);
866
867    if acquired {
868        Ok(CoordinationGuard {
869            lock_key,
870            connection: Some(connection),
871        })
872    } else {
873        // Lock not acquired - connection will be returned to pool here
874        Err(CoordinationError::LeadershipNotAcquired {
875            subscription_name: subscription_name.to_string(),
876        })
877    }
878}
879
880impl ProjectorCoordinator for PostgresProjectorCoordinator {
881    type Error = CoordinationError;
882    type Guard = CoordinationGuard;
883
884    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
885        try_acquire_advisory_lock(&self.pool, subscription_name).await
886    }
887}
888
889impl ProjectorCoordinator for PostgresEventStore {
890    type Error = CoordinationError;
891    type Guard = CoordinationGuard;
892
893    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
894        try_acquire_advisory_lock(&self.pool, subscription_name).await
895    }
896}
897
#[cfg(test)]
mod tests {
    use super::{advisory_lock_key, glob_to_anchored_regex, regex_escape};

    #[test]
    fn star_translates_to_dot_star_anchored() {
        // '-' is not a regex metacharacter outside a class, so it stays literal.
        assert_eq!(glob_to_anchored_regex("account-*"), "^account-.*$");
    }

    #[test]
    fn question_mark_translates_to_dot() {
        assert_eq!(glob_to_anchored_regex("account-?"), "^account-.$");
    }

    #[test]
    fn character_class_is_preserved() {
        assert_eq!(
            glob_to_anchored_regex("account-[0-9]*"),
            "^account-[0-9].*$"
        );
    }

    #[test]
    fn negated_character_class_uses_caret() {
        assert_eq!(glob_to_anchored_regex("account-[!0-9]"), "^account-[^0-9]$");
    }

    #[test]
    fn literal_regex_metacharacters_are_escaped() {
        // A literal '.' in the glob must not become a regex wildcard, and other
        // regex metacharacters must be escaped to prevent injection.
        assert_eq!(glob_to_anchored_regex("a.c+(d)"), "^a\\.c\\+\\(d\\)$");
    }

    #[test]
    fn regex_escape_escapes_all_metacharacters() {
        assert_eq!(
            regex_escape(".^$*+?()[]{}|\\"),
            "\\.\\^\\$\\*\\+\\?\\(\\)\\[\\]\\{\\}\\|\\\\"
        );
    }

    #[test]
    fn advisory_lock_key_matches_fnv1a_64_reference_vectors() {
        // Lock keys must never change between releases: instances running different
        // versions must derive the same key for a subscription, or they would stop
        // excluding each other. Pin the function to published FNV-1a 64-bit vectors.
        assert_eq!(advisory_lock_key(""), 0xcbf2_9ce4_8422_2325_u64 as i64);
        assert_eq!(advisory_lock_key("a"), 0xaf63_dc4c_8601_ec8c_u64 as i64);
        assert_eq!(advisory_lock_key("abc"), 0xe71f_a219_0541_574b_u64 as i64);
    }

    #[test]
    fn advisory_lock_key_differs_for_different_subscriptions() {
        assert_ne!(
            advisory_lock_key("projector-a"),
            advisory_lock_key("projector-b")
        );
    }
}