Skip to main content

eventcore_postgres/
lib.rs

1use std::time::Duration;
2
3use eventcore_types::{
4    CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
5    EventStream, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
6    StreamVersion, StreamWrites,
7};
8use futures::StreamExt;
9use nutype::nutype;
10use serde_json::value::RawValue;
11use serde_json::{Value, json};
12use sqlx::types::Json;
13use sqlx::{Pool, Postgres, QueryBuilder, Row, postgres::PgPoolOptions, query};
14use thiserror::Error;
15use tracing::{error, info, instrument, warn};
16use uuid::Uuid;
17
18#[derive(Debug, Error)]
19pub enum PostgresEventStoreError {
20    #[error("failed to create postgres connection pool")]
21    ConnectionFailed(#[source] sqlx::Error),
22}
23
24/// Maximum number of database connections in the pool.
25///
26/// MaxConnections represents the connection pool size limit. It must be at least 1,
27/// enforced by using NonZeroU32 as the underlying type.
28///
29/// # Examples
30///
31/// ```ignore
32/// use eventcore_postgres::MaxConnections;
33/// use std::num::NonZeroU32;
34///
35/// let small_pool = MaxConnections::new(NonZeroU32::new(5).expect("5 is non-zero"));
36/// let standard = MaxConnections::new(NonZeroU32::new(10).expect("10 is non-zero"));
37/// let large_pool = MaxConnections::new(NonZeroU32::new(50).expect("50 is non-zero"));
38///
39/// // Zero connections not allowed by type system
40/// // let zero = NonZeroU32::new(0); // Returns None
41/// ```
42#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, Display, AsRef, Into))]
43pub struct MaxConnections(std::num::NonZeroU32);
44
45/// Configuration for PostgresEventStore connection pool.
46#[derive(Debug, Clone)]
47pub struct PostgresConfig {
48    /// Maximum number of connections in the pool (default: 10)
49    pub max_connections: MaxConnections,
50    /// Timeout for acquiring a connection from the pool (default: 30 seconds)
51    pub acquire_timeout: Duration,
52    /// Idle timeout for connections in the pool (default: 10 minutes)
53    pub idle_timeout: Duration,
54}
55
56impl Default for PostgresConfig {
57    fn default() -> Self {
58        const DEFAULT_MAX_CONNECTIONS: std::num::NonZeroU32 = match std::num::NonZeroU32::new(10) {
59            Some(v) => v,
60            None => unreachable!(),
61        };
62
63        Self {
64            max_connections: MaxConnections::new(DEFAULT_MAX_CONNECTIONS),
65            acquire_timeout: Duration::from_secs(30),
66            idle_timeout: Duration::from_secs(600), // 10 minutes
67        }
68    }
69}
70
71#[derive(Debug, Clone)]
72pub struct PostgresEventStore {
73    pool: Pool<Postgres>,
74}
75
76impl PostgresEventStore {
77    /// Create a new PostgresEventStore with default configuration.
78    pub async fn new<S: Into<String>>(
79        connection_string: S,
80    ) -> Result<Self, PostgresEventStoreError> {
81        Self::with_config(connection_string, PostgresConfig::default()).await
82    }
83
84    /// Create a new PostgresEventStore with custom configuration.
85    pub async fn with_config<S: Into<String>>(
86        connection_string: S,
87        config: PostgresConfig,
88    ) -> Result<Self, PostgresEventStoreError> {
89        let connection_string = connection_string.into();
90        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
91        let pool = PgPoolOptions::new()
92            .max_connections(max_connections.get())
93            .acquire_timeout(config.acquire_timeout)
94            .idle_timeout(config.idle_timeout)
95            .connect(&connection_string)
96            .await
97            .map_err(PostgresEventStoreError::ConnectionFailed)?;
98        Ok(Self { pool })
99    }
100
101    /// Create a PostgresEventStore from an existing connection pool.
102    ///
103    /// Use this when you need full control over pool configuration or want to
104    /// share a pool across multiple components.
105    pub fn from_pool(pool: Pool<Postgres>) -> Self {
106        Self { pool }
107    }
108
109    #[cfg_attr(test, mutants::skip)] // infallible: panics on failure
110    pub async fn ping(&self) {
111        let _ = query("SELECT 1")
112            .execute(&self.pool)
113            .await
114            .expect("postgres ping failed");
115    }
116
117    #[cfg_attr(test, mutants::skip)] // infallible: panics on failure
118    pub async fn migrate(&self) {
119        sqlx::migrate!("./migrations")
120            .run(&self.pool)
121            .await
122            .expect("postgres migration failed");
123    }
124}
125
126impl EventStore for PostgresEventStore {
127    #[instrument(name = "postgres.read_stream", skip(self))]
128    async fn read_stream<E: Event>(
129        &self,
130        stream_id: StreamId,
131    ) -> Result<EventStream<E>, EventStoreError> {
132        info!(
133            stream = %stream_id,
134            "[postgres.read_stream] reading events from postgres"
135        );
136
137        // Clone the pool (an `Arc` internally) so the returned stream owns its
138        // connection handle and can be `'static`. The query uses sqlx's lazy
139        // `fetch`, which pulls rows from the database incrementally rather than
140        // buffering the entire result set with `fetch_all` — the real memory
141        // win behind #364 for large streams.
142        let pool = self.pool.clone();
143
144        let stream = async_stream::stream! {
145            let mut rows = query(
146                "SELECT event_data FROM eventcore_events WHERE stream_id = $1 ORDER BY stream_version ASC",
147            )
148            .bind(stream_id.as_ref())
149            .fetch(&pool);
150
151            while let Some(row) = rows.next().await {
152                let row = match row {
153                    Ok(row) => row,
154                    Err(error) => {
155                        yield Err(map_sqlx_error(error, Operation::ReadStream));
156                        break;
157                    }
158                };
159
160                let payload: Value = match row.try_get("event_data") {
161                    Ok(payload) => payload,
162                    Err(error) => {
163                        yield Err(map_sqlx_error(error, Operation::ReadStream));
164                        break;
165                    }
166                };
167
168                // A per-row decode failure surfaces as an `Err` item, matching
169                // the previous behavior where a type mismatch failed the read
170                // (see the read_stream_errors_on_type_mismatch contract test).
171                match serde_json::from_value::<E>(payload) {
172                    Ok(event) => yield Ok(event),
173                    Err(error) => {
174                        yield Err(EventStoreError::DeserializationFailed {
175                            stream_id: stream_id.clone(),
176                            detail: error.to_string(),
177                        });
178                        break;
179                    }
180                }
181            }
182        };
183
184        Ok(EventStream::new(stream))
185    }
186
187    #[instrument(name = "postgres.append_events", skip(self, writes))]
188    async fn append_events(
189        &self,
190        writes: StreamWrites,
191    ) -> Result<EventStreamSlice, EventStoreError> {
192        let expected_versions = writes.expected_versions().clone();
193        let entries = writes.into_entries();
194
195        if entries.is_empty() {
196            return Ok(EventStreamSlice);
197        }
198
199        info!(
200            stream_count = expected_versions.len(),
201            event_count = entries.len(),
202            "[postgres.append_events] appending events to postgres"
203        );
204
205        // Build expected versions JSON for the trigger
206        let expected_versions_json: Value = expected_versions
207            .iter()
208            .map(|(stream_id, version)| {
209                (stream_id.as_ref().to_string(), json!(version.into_inner()))
210            })
211            .collect();
212
213        let mut tx = self
214            .pool
215            .begin()
216            .await
217            .map_err(|error| map_sqlx_error(error, Operation::BeginTransaction))?;
218
219        // Set expected versions in session config for trigger validation
220        let _ = query("SELECT set_config('eventcore.expected_versions', $1, true)")
221            .bind(expected_versions_json.to_string())
222            .execute(&mut *tx)
223            .await
224            .map_err(|error| map_sqlx_error(error, Operation::SetExpectedVersions))?;
225
226        // Insert all events with a single multi-row INSERT per chunk. The
227        // BEFORE INSERT trigger still assigns gap-free stream versions and
228        // enforces optimistic concurrency for each row in VALUES order,
229        // exactly as it did for the previous per-event loop — replacing N
230        // round-trips with one statement. Chunking keeps the bound-parameter
231        // count well under Postgres' 65535-parameter limit (5 binds/event).
232        // Drop the type-erased event payload (only the in-memory store needs
233        // it) and keep just the Send + Sync fields used for SQL binding, so the
234        // borrows held by the insert loop stay Send across the awaits.
235        let rows: Vec<(StreamId, &'static str, Box<RawValue>)> = entries
236            .into_iter()
237            .map(|entry| (entry.stream_id, entry.event_type, entry.event_data))
238            .collect();
239
240        const MAX_EVENTS_PER_INSERT: usize = 1000;
241        for chunk in rows.chunks(MAX_EVENTS_PER_INSERT) {
242            let mut builder = QueryBuilder::<Postgres>::new(
243                "INSERT INTO eventcore_events (event_id, stream_id, event_type, event_data, metadata) ",
244            );
245            let _ = builder.push_values(chunk, |mut row, (stream_id, event_type, event_data)| {
246                let _ = row
247                    .push_bind(Uuid::now_v7())
248                    .push_bind(stream_id.as_ref())
249                    .push_bind(*event_type)
250                    .push_bind(Json(event_data))
251                    .push_bind(Json(json!({})));
252            });
253            let _ = builder
254                .build()
255                .execute(&mut *tx)
256                .await
257                .map_err(|error| map_sqlx_error(error, Operation::AppendEvents))?;
258        }
259
260        tx.commit()
261            .await
262            .map_err(|error| map_sqlx_error(error, Operation::CommitTransaction))?;
263
264        Ok(EventStreamSlice)
265    }
266}
267
268impl CheckpointStore for PostgresEventStore {
269    type Error = PostgresCheckpointError;
270
271    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
272        let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
273            .bind(name)
274            .fetch_optional(&self.pool)
275            .await
276            .map_err(PostgresCheckpointError::DatabaseError)?;
277
278        match row {
279            Some(row) => {
280                let position: Uuid = row.get("last_position");
281                Ok(Some(StreamPosition::new(position)))
282            }
283            None => Ok(None),
284        }
285    }
286
287    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
288        let position_uuid: Uuid = position.into_inner();
289        let _ = query(
290            "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
291             VALUES ($1, $2, NOW())
292             ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
293        )
294        .bind(name)
295        .bind(position_uuid)
296        .execute(&self.pool)
297        .await
298        .map_err(PostgresCheckpointError::DatabaseError)?;
299
300        Ok(())
301    }
302}
303
304impl EventReader for PostgresEventStore {
305    type Error = EventStoreError;
306
307    async fn read_events<E: Event>(
308        &self,
309        filter: EventFilter,
310        page: EventPage,
311    ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
312        // Query events ordered by event_id (UUID7, monotonically increasing).
313        // Use event_id directly as the global position - no need for ROW_NUMBER.
314        let after_event_id: Option<Uuid> = page.after_position().map(|p| p.into_inner());
315        let limit: i64 = page.limit().into_inner() as i64;
316
317        // Filter by event_type in SQL so non-matching types don't consume
318        // batch slots (fixes issue #372). Use explicit filter if set,
319        // otherwise derive from E::event_type_name().
320        let type_filter = filter.event_type().unwrap_or_else(|| E::event_type_name());
321
322        // Glob pattern pushdown translates the pattern to an anchored POSIX
323        // regex matched with the `~` operator (ADR-0047). The translation
324        // escapes all regex metacharacters in literal segments so user input
325        // cannot inject regex syntax.
326        let pattern_regex = filter
327            .stream_pattern()
328            .map(|p| glob_to_anchored_regex(p.as_ref()));
329
330        // Build the query dynamically so prefix XOR pattern, the optional
331        // cursor, and the event_type predicate compose without a combinatorial
332        // explosion of hand-written query strings.
333        let mut builder = QueryBuilder::<Postgres>::new(
334            "SELECT event_id, event_data, stream_id FROM eventcore_events WHERE event_type = ",
335        );
336        let _ = builder.push_bind(type_filter);
337
338        if let Some(after_id) = after_event_id {
339            let _ = builder.push(" AND event_id > ").push_bind(after_id);
340        }
341
342        if let Some(prefix) = filter.stream_prefix() {
343            let _ = builder
344                .push(" AND stream_id LIKE ")
345                .push_bind(prefix.as_ref().to_string())
346                .push(" || '%'");
347        } else if let Some(regex) = pattern_regex {
348            let _ = builder.push(" AND stream_id ~ ").push_bind(regex);
349        }
350
351        let _ = builder.push(" ORDER BY event_id LIMIT ").push_bind(limit);
352
353        let rows = builder
354            .build()
355            .fetch_all(&self.pool)
356            .await
357            .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
358
359        let events: Vec<(E, StreamPosition)> = rows
360            .into_iter()
361            .filter_map(|row| {
362                let event_data: Json<Value> = row.get("event_data");
363                let event_id: Uuid = row.get("event_id");
364                serde_json::from_value::<E>(event_data.0)
365                    .ok()
366                    .map(|e| (e, StreamPosition::new(event_id)))
367            })
368            .collect();
369
370        Ok(events)
371    }
372}
373
/// Translate a glob pattern into an anchored regular expression suitable for
/// PostgreSQL's `~` operator (ADR-0047).
///
/// Mapping:
/// - `*` → `.*` (matches any sequence, including the `/` separator)
/// - `?` → `.` (matches exactly one character)
/// - `[...]` / `[!...]` → a regex bracket expression (`[!` becomes the regex
///   negation `[^`). Ranges such as `[0-9]` and `[a-z]` are kept as-is. The
///   first member is always taken literally, so `[]]` matches `]`. Members
///   that carry meaning inside a regex bracket expression but not inside a
///   glob class (`\`, `^`, `[`, `]`) are backslash-escaped, so `[^a]` keeps
///   its glob meaning "`^` or `a`" instead of turning into a negation.
/// - every other character is a literal and is regex-escaped
///
/// The result is anchored with `^...$` so the whole stream ID must match,
/// mirroring `glob::Pattern::matches`. The pattern is expected to have been
/// validated as a `glob::Pattern` already; should an unterminated `[` reach
/// this function anyway, it and everything after it are emitted as literals.
fn glob_to_anchored_regex(glob: &str) -> String {
    let mut regex = String::with_capacity(glob.len() + 2);
    regex.push('^');

    let mut rest = glob;
    while let Some(c) = rest.chars().next() {
        rest = &rest[c.len_utf8()..];
        match c {
            '*' => regex.push_str(".*"),
            '?' => regex.push('.'),
            '[' => match translate_glob_class(rest) {
                Some((class, consumed)) => {
                    regex.push_str(&class);
                    rest = &rest[consumed..];
                }
                None => {
                    // No closing `]`: nothing that follows can be a class, so
                    // the bracket and the remaining input are plain literals.
                    push_regex_escaped(&mut regex, "[");
                    push_regex_escaped(&mut regex, rest);
                    rest = "";
                }
            },
            other => {
                let mut utf8 = [0u8; 4];
                push_regex_escaped(&mut regex, other.encode_utf8(&mut utf8));
            }
        }
    }

    regex.push('$');
    regex
}

/// Translate the text following a glob `[` into a regex bracket expression.
///
/// Returns the bracket expression (including its `[` and `]`) together with
/// the number of bytes of `rest` it covers, or `None` when `rest` holds no
/// terminated class.
fn translate_glob_class(rest: &str) -> Option<(String, usize)> {
    let (negated, members) = match rest.strip_prefix('!') {
        Some(after_bang) => (true, after_bang),
        None => (false, rest),
    };

    // The first member never terminates the class, even when it is `]`, so
    // the search for the closing bracket starts after it.
    let first_len = members.chars().next()?.len_utf8();
    let close = first_len + members[first_len..].find(']')?;

    let mut class = String::with_capacity(close + 4);
    class.push('[');
    if negated {
        class.push('^');
    }
    for member in members[..close].chars() {
        if matches!(member, '\\' | '^' | '[' | ']') {
            class.push('\\');
        }
        class.push(member);
    }
    class.push(']');

    let consumed = (rest.len() - members.len()) + close + 1;
    Some((class, consumed))
}

/// Escape every regex metacharacter in a literal segment so it matches
/// itself, preventing regex injection from stream-pattern input.
fn regex_escape(literal: &str) -> String {
    let mut escaped = String::with_capacity(literal.len());
    push_regex_escaped(&mut escaped, literal);
    escaped
}

/// Append `literal` to `out`, prefixing each regex metacharacter with a
/// backslash.
fn push_regex_escaped(out: &mut String, literal: &str) {
    const METACHARACTERS: &[char] = &[
        '.', '^', '$', '*', '+', '?', '(', ')', '[', ']', '{', '}', '|', '\\',
    ];
    for c in literal.chars() {
        if METACHARACTERS.contains(&c) {
            out.push('\\');
        }
        out.push(c);
    }
}
447
448fn map_sqlx_error(error: sqlx::Error, operation: Operation) -> EventStoreError {
449    if let sqlx::Error::Database(db_error) = &error {
450        let code = db_error.code();
451        let code_str = code.as_deref();
452        // P0001: Custom error from trigger (version_conflict)
453        // 23505: Unique constraint violation (fallback for version conflict)
454        if code_str == Some("P0001") || code_str == Some("23505") {
455            warn!(
456                error = %db_error,
457                "[postgres.version_conflict] optimistic concurrency check failed"
458            );
459            return parse_version_conflict_from_db_error(db_error.message());
460        }
461    }
462
463    error!(
464        error = %error,
465        operation = %operation,
466        "[postgres.database_error] database operation failed"
467    );
468    EventStoreError::StoreFailure { operation }
469}
470
471/// Parse version conflict details from the PostgreSQL trigger error message.
472///
473/// The trigger produces messages like:
474///   `version_conflict: stream "my-stream" expected version 0, actual 1`
475///
476/// If parsing fails, falls back to a VersionConflict with a sentinel stream_id
477/// indicating the details could not be extracted.
478fn parse_version_conflict_from_db_error(message: &str) -> EventStoreError {
479    // Pattern: version_conflict: stream "STREAM_ID" expected version EXPECTED, actual ACTUAL
480    if let Some(parsed) = try_parse_conflict_message(message) {
481        return parsed;
482    }
483
484    // Fallback: unique constraint violation (23505) or unparseable trigger message.
485    // Use a sentinel stream_id since we don't have the details.
486    let fallback_stream_id =
487        StreamId::try_new("unknown-conflict-stream").expect("static stream id is valid");
488    EventStoreError::VersionConflict {
489        stream_id: fallback_stream_id,
490        expected: StreamVersion::new(0),
491        actual: StreamVersion::new(0),
492    }
493}
494
495fn try_parse_conflict_message(message: &str) -> Option<EventStoreError> {
496    let rest = message.strip_prefix("version_conflict: stream \"")?;
497    let stream_end = rest.find('"')?;
498    let stream_id_str = &rest[..stream_end];
499    let after_stream = &rest[stream_end..];
500
501    let expected_str = after_stream
502        .strip_prefix("\" expected version ")?
503        .split(',')
504        .next()?;
505    let actual_str = after_stream.rsplit("actual ").next()?;
506
507    let expected = expected_str.trim().parse::<usize>().ok()?;
508    let actual = actual_str.trim().parse::<usize>().ok()?;
509    let stream_id = StreamId::try_new(stream_id_str).ok()?;
510
511    Some(EventStoreError::VersionConflict {
512        stream_id,
513        expected: StreamVersion::new(expected),
514        actual: StreamVersion::new(actual),
515    })
516}
517
518/// Error type for PostgresCheckpointStore operations.
519#[derive(Debug, Error)]
520pub enum PostgresCheckpointError {
521    /// Failed to create connection pool.
522    #[error("failed to create postgres connection pool")]
523    ConnectionFailed(#[source] sqlx::Error),
524
525    /// Database operation failed.
526    #[error("database operation failed: {0}")]
527    DatabaseError(#[source] sqlx::Error),
528}
529
530/// Postgres-backed checkpoint store for tracking projection progress.
531///
532/// `PostgresCheckpointStore` stores checkpoint positions in a PostgreSQL table,
533/// providing durability across process restarts. It implements the `CheckpointStore`
534/// trait from eventcore-types.
535///
536/// # Schema
537///
538/// The store uses the `eventcore_subscription_versions` table with:
539/// - `subscription_name`: Unique identifier for each projector/subscription
540/// - `last_position`: UUID7 representing the global stream position
541/// - `updated_at`: Timestamp of the last checkpoint update
542#[derive(Debug, Clone)]
543pub struct PostgresCheckpointStore {
544    pool: Pool<Postgres>,
545}
546
547impl PostgresCheckpointStore {
548    /// Create a new PostgresCheckpointStore with default configuration.
549    pub async fn new<S: Into<String>>(
550        connection_string: S,
551    ) -> Result<Self, PostgresCheckpointError> {
552        Self::with_config(connection_string, PostgresConfig::default()).await
553    }
554
555    /// Create a new PostgresCheckpointStore with custom configuration.
556    pub async fn with_config<S: Into<String>>(
557        connection_string: S,
558        config: PostgresConfig,
559    ) -> Result<Self, PostgresCheckpointError> {
560        let connection_string = connection_string.into();
561        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
562        let pool = PgPoolOptions::new()
563            .max_connections(max_connections.get())
564            .acquire_timeout(config.acquire_timeout)
565            .idle_timeout(config.idle_timeout)
566            .connect(&connection_string)
567            .await
568            .map_err(PostgresCheckpointError::ConnectionFailed)?;
569
570        // Run migrations to ensure table exists
571        sqlx::migrate!("./migrations")
572            .run(&pool)
573            .await
574            .map_err(|e| {
575                PostgresCheckpointError::DatabaseError(sqlx::Error::Migrate(Box::new(e)))
576            })?;
577
578        Ok(Self { pool })
579    }
580
581    /// Create a PostgresCheckpointStore from an existing connection pool.
582    ///
583    /// Use this when you need full control over pool configuration or want to
584    /// share a pool across multiple components.
585    pub fn from_pool(pool: Pool<Postgres>) -> Self {
586        Self { pool }
587    }
588}
589
590impl CheckpointStore for PostgresCheckpointStore {
591    type Error = PostgresCheckpointError;
592
593    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
594        let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
595            .bind(name)
596            .fetch_optional(&self.pool)
597            .await
598            .map_err(PostgresCheckpointError::DatabaseError)?;
599
600        match row {
601            Some(row) => {
602                let position: Uuid = row.get("last_position");
603                Ok(Some(StreamPosition::new(position)))
604            }
605            None => Ok(None),
606        }
607    }
608
609    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
610        let position_uuid: Uuid = position.into_inner();
611        let _ = query(
612            "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
613             VALUES ($1, $2, NOW())
614             ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
615        )
616        .bind(name)
617        .bind(position_uuid)
618        .execute(&self.pool)
619        .await
620        .map_err(PostgresCheckpointError::DatabaseError)?;
621
622        Ok(())
623    }
624}
625
626// ============================================================================
627// PostgresProjectorCoordinator - Distributed projector coordination via Postgres
628// ============================================================================
629
630/// Error type for projector coordination operations.
631#[derive(Debug, Error)]
632pub enum CoordinationError {
633    /// Leadership could not be acquired (another instance holds the lock).
634    #[error(
635        "leadership not acquired for subscription '{subscription_name}': another instance holds the lock"
636    )]
637    LeadershipNotAcquired { subscription_name: String },
638
639    /// Database operation failed.
640    #[error("database operation failed: {0}")]
641    DatabaseError(#[source] sqlx::Error),
642}
643
644/// Guard type that releases leadership when dropped.
645///
646/// Holds the advisory lock key and the actual database connection that acquired
647/// the lock. This is critical because PostgreSQL advisory locks are session-scoped:
648/// the unlock must happen on the same connection that acquired the lock.
649///
650/// # Lock Release Behavior
651///
652/// The guard attempts to explicitly release the advisory lock when dropped:
653///
654/// - **Multi-threaded runtime**: Uses `block_in_place` to synchronously release
655///   the lock before the guard is fully dropped.
656///
657/// - **Single-threaded runtime**: Spawns a task to release the lock asynchronously.
658///   This task may not execute before process shutdown, in which case the lock is
659///   released when the PostgreSQL session ends (connection closes).
660///
661/// # PostgreSQL Session-Scoped Locks
662///
663/// PostgreSQL advisory locks acquired with `pg_try_advisory_lock` are session-scoped
664/// and automatically released when the database connection closes. This provides a
665/// safety net: even if explicit unlock fails or is skipped, the lock will be released
666/// when:
667/// - The connection is returned to the pool and recycled
668/// - The connection pool is shut down
669/// - The database connection times out
670///
671/// For production deployments, configure appropriate connection pool idle timeouts
672/// to ensure timely lock release on ungraceful shutdown.
673pub struct CoordinationGuard {
674    lock_key: i64,
675    /// The actual connection that holds the advisory lock.
676    /// Must be Option so we can take ownership in Drop.
677    connection: Option<sqlx::pool::PoolConnection<Postgres>>,
678}
679
680impl std::fmt::Debug for CoordinationGuard {
681    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
682        f.debug_struct("CoordinationGuard")
683            .field("lock_key", &self.lock_key)
684            .finish_non_exhaustive()
685    }
686}
687
688impl Drop for CoordinationGuard {
689    fn drop(&mut self) {
690        // Take ownership of the connection - we need the same connection that acquired the lock
691        if let Some(mut connection) = self.connection.take() {
692            let lock_key = self.lock_key;
693
694            // Check runtime flavor to determine the appropriate unlock strategy.
695            // block_in_place panics on single-threaded runtimes, so we must check first.
696            let handle = tokio::runtime::Handle::current();
697            let is_multi_thread =
698                handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread;
699
700            if is_multi_thread {
701                // Multi-threaded runtime: use block_in_place for synchronous unlock
702                tokio::task::block_in_place(|| {
703                    handle.block_on(async {
704                        // Unlock on the SAME connection that acquired the lock
705                        if let Err(e) = query("SELECT pg_advisory_unlock($1)")
706                            .bind(lock_key)
707                            .execute(&mut *connection)
708                            .await
709                        {
710                            warn!(
711                                lock_key = lock_key,
712                                error = %e,
713                                "failed to release advisory lock on drop"
714                            );
715                        }
716                        // Connection is returned to pool when dropped here
717                    });
718                });
719            } else {
720                // Single-threaded runtime: spawn a task for async unlock.
721                // Note: This task may not execute before process shutdown. In that case,
722                // the advisory lock is released when the PostgreSQL session ends (the
723                // connection closes). See struct-level documentation for details.
724                drop(tokio::spawn(async move {
725                    if let Err(e) = query("SELECT pg_advisory_unlock($1)")
726                        .bind(lock_key)
727                        .execute(&mut *connection)
728                        .await
729                    {
730                        warn!(
731                            lock_key = lock_key,
732                            error = %e,
733                            "failed to release advisory lock on drop (async)"
734                        );
735                    }
736                }));
737            }
738        }
739    }
740}
741
742/// Postgres-backed projector coordinator for distributed leadership.
743///
744/// `PostgresProjectorCoordinator` uses PostgreSQL advisory locks to ensure
745/// only one projector instance processes events for a given subscription
746/// at a time, preventing duplicate processing in distributed deployments.
747#[derive(Debug, Clone)]
748pub struct PostgresProjectorCoordinator {
749    pool: Pool<Postgres>,
750}
751
752impl PostgresProjectorCoordinator {
753    /// Create a new PostgresProjectorCoordinator with default configuration.
754    pub async fn new<S: Into<String>>(connection_string: S) -> Result<Self, CoordinationError> {
755        Self::with_config(connection_string, PostgresConfig::default()).await
756    }
757
758    /// Create a new PostgresProjectorCoordinator with custom configuration.
759    pub async fn with_config<S: Into<String>>(
760        connection_string: S,
761        config: PostgresConfig,
762    ) -> Result<Self, CoordinationError> {
763        let connection_string = connection_string.into();
764        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
765        let pool = PgPoolOptions::new()
766            .max_connections(max_connections.get())
767            .acquire_timeout(config.acquire_timeout)
768            .idle_timeout(config.idle_timeout)
769            .connect(&connection_string)
770            .await
771            .map_err(CoordinationError::DatabaseError)?;
772
773        Ok(Self { pool })
774    }
775
776    /// Create a PostgresProjectorCoordinator from an existing connection pool.
777    pub fn from_pool(pool: Pool<Postgres>) -> Self {
778        Self { pool }
779    }
780}
781
/// Derive the advisory lock key for a subscription from its name.
///
/// The key is the 64-bit FNV-1a hash of the name's UTF-8 bytes, reinterpreted
/// as a signed integer. FNV-1a is fully specified, so the key is identical
/// across Rust versions and processes; `DefaultHasher` gives no such
/// stability guarantee and must not be used here.
fn advisory_lock_key(subscription_name: &str) -> i64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    // FNV-1a: XOR the next byte into the state first, then multiply by the prime.
    let digest = subscription_name
        .bytes()
        .fold(FNV_OFFSET_BASIS, |state, octet| {
            (state ^ u64::from(octet)).wrapping_mul(FNV_PRIME)
        });

    // Bit-for-bit reinterpretation: hashes with the top bit set become negative keys.
    i64::from_ne_bytes(digest.to_ne_bytes())
}
797
798/// Try to acquire a PostgreSQL advisory lock for the given subscription name
799/// using the provided connection pool.
800async fn try_acquire_advisory_lock(
801    pool: &Pool<Postgres>,
802    subscription_name: &str,
803) -> Result<CoordinationGuard, CoordinationError> {
804    let lock_key = advisory_lock_key(subscription_name);
805
806    // Acquire a dedicated connection from the pool.
807    // This connection MUST be kept for the lifetime of the guard because
808    // PostgreSQL advisory locks are session-scoped.
809    let mut connection = pool
810        .acquire()
811        .await
812        .map_err(CoordinationError::DatabaseError)?;
813
814    // Attempt to acquire advisory lock (non-blocking) on this specific connection
815    let row = query("SELECT pg_try_advisory_lock($1)")
816        .bind(lock_key)
817        .fetch_one(&mut *connection)
818        .await
819        .map_err(CoordinationError::DatabaseError)?;
820
821    let acquired: bool = row.get(0);
822
823    if acquired {
824        Ok(CoordinationGuard {
825            lock_key,
826            connection: Some(connection),
827        })
828    } else {
829        // Lock not acquired - connection will be returned to pool here
830        Err(CoordinationError::LeadershipNotAcquired {
831            subscription_name: subscription_name.to_string(),
832        })
833    }
834}
835
836impl ProjectorCoordinator for PostgresProjectorCoordinator {
837    type Error = CoordinationError;
838    type Guard = CoordinationGuard;
839
840    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
841        try_acquire_advisory_lock(&self.pool, subscription_name).await
842    }
843}
844
845impl ProjectorCoordinator for PostgresEventStore {
846    type Error = CoordinationError;
847    type Guard = CoordinationGuard;
848
849    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
850        try_acquire_advisory_lock(&self.pool, subscription_name).await
851    }
852}
853
#[cfg(test)]
mod tests {
    use super::{glob_to_anchored_regex, regex_escape};

    #[test]
    fn star_translates_to_dot_star_anchored() {
        // Outside a character class '-' has no special regex meaning, so it
        // is expected to pass through unescaped.
        let translated = glob_to_anchored_regex("account-*");
        assert_eq!(translated, "^account-.*$");
    }

    #[test]
    fn question_mark_translates_to_dot() {
        let translated = glob_to_anchored_regex("account-?");
        assert_eq!(translated, "^account-.$");
    }

    #[test]
    fn character_class_is_preserved() {
        let translated = glob_to_anchored_regex("account-[0-9]*");
        assert_eq!(translated, "^account-[0-9].*$");
    }

    #[test]
    fn negated_character_class_uses_caret() {
        let translated = glob_to_anchored_regex("account-[!0-9]");
        assert_eq!(translated, "^account-[^0-9]$");
    }

    #[test]
    fn literal_regex_metacharacters_are_escaped() {
        // Literal glob characters that are regex metacharacters ('.', '+',
        // '(' and ')') must come out escaped: a bare '.' would act as a
        // wildcard, and unescaped input would allow regex injection.
        let translated = glob_to_anchored_regex("a.c+(d)");
        assert_eq!(translated, "^a\\.c\\+\\(d\\)$");
    }

    #[test]
    fn regex_escape_escapes_all_metacharacters() {
        // Every metacharacter in the input must be prefixed with a backslash.
        let escaped = regex_escape(".^$*+?()[]{}|\\");
        assert_eq!(escaped, "\\.\\^\\$\\*\\+\\?\\(\\)\\[\\]\\{\\}\\|\\\\");
    }
}