Skip to main content

eventcore_postgres/
lib.rs

1use std::time::Duration;
2
3use eventcore_types::{
4    CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
5    EventStreamReader, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
6    StreamVersion, StreamWriteEntry, StreamWrites,
7};
8use nutype::nutype;
9use serde_json::{Value, json};
10use sqlx::types::Json;
11use sqlx::{Pool, Postgres, Row, postgres::PgPoolOptions, query};
12use thiserror::Error;
13use tracing::{error, info, instrument, warn};
14use uuid::Uuid;
15
16#[derive(Debug, Error)]
17pub enum PostgresEventStoreError {
18    #[error("failed to create postgres connection pool")]
19    ConnectionFailed(#[source] sqlx::Error),
20}
21
22/// Maximum number of database connections in the pool.
23///
24/// MaxConnections represents the connection pool size limit. It must be at least 1,
25/// enforced by using NonZeroU32 as the underlying type.
26///
27/// # Examples
28///
29/// ```ignore
30/// use eventcore_postgres::MaxConnections;
31/// use std::num::NonZeroU32;
32///
33/// let small_pool = MaxConnections::new(NonZeroU32::new(5).expect("5 is non-zero"));
34/// let standard = MaxConnections::new(NonZeroU32::new(10).expect("10 is non-zero"));
35/// let large_pool = MaxConnections::new(NonZeroU32::new(50).expect("50 is non-zero"));
36///
37/// // Zero connections not allowed by type system
38/// // let zero = NonZeroU32::new(0); // Returns None
39/// ```
40#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, Display, AsRef, Into))]
41pub struct MaxConnections(std::num::NonZeroU32);
42
43/// Configuration for PostgresEventStore connection pool.
44#[derive(Debug, Clone)]
45pub struct PostgresConfig {
46    /// Maximum number of connections in the pool (default: 10)
47    pub max_connections: MaxConnections,
48    /// Timeout for acquiring a connection from the pool (default: 30 seconds)
49    pub acquire_timeout: Duration,
50    /// Idle timeout for connections in the pool (default: 10 minutes)
51    pub idle_timeout: Duration,
52}
53
54impl Default for PostgresConfig {
55    fn default() -> Self {
56        const DEFAULT_MAX_CONNECTIONS: std::num::NonZeroU32 = match std::num::NonZeroU32::new(10) {
57            Some(v) => v,
58            None => unreachable!(),
59        };
60
61        Self {
62            max_connections: MaxConnections::new(DEFAULT_MAX_CONNECTIONS),
63            acquire_timeout: Duration::from_secs(30),
64            idle_timeout: Duration::from_secs(600), // 10 minutes
65        }
66    }
67}
68
69#[derive(Debug, Clone)]
70pub struct PostgresEventStore {
71    pool: Pool<Postgres>,
72}
73
74impl PostgresEventStore {
75    /// Create a new PostgresEventStore with default configuration.
76    pub async fn new<S: Into<String>>(
77        connection_string: S,
78    ) -> Result<Self, PostgresEventStoreError> {
79        Self::with_config(connection_string, PostgresConfig::default()).await
80    }
81
82    /// Create a new PostgresEventStore with custom configuration.
83    pub async fn with_config<S: Into<String>>(
84        connection_string: S,
85        config: PostgresConfig,
86    ) -> Result<Self, PostgresEventStoreError> {
87        let connection_string = connection_string.into();
88        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
89        let pool = PgPoolOptions::new()
90            .max_connections(max_connections.get())
91            .acquire_timeout(config.acquire_timeout)
92            .idle_timeout(config.idle_timeout)
93            .connect(&connection_string)
94            .await
95            .map_err(PostgresEventStoreError::ConnectionFailed)?;
96        Ok(Self { pool })
97    }
98
99    /// Create a PostgresEventStore from an existing connection pool.
100    ///
101    /// Use this when you need full control over pool configuration or want to
102    /// share a pool across multiple components.
103    pub fn from_pool(pool: Pool<Postgres>) -> Self {
104        Self { pool }
105    }
106
107    #[cfg_attr(test, mutants::skip)] // infallible: panics on failure
108    pub async fn ping(&self) {
109        let _ = query("SELECT 1")
110            .execute(&self.pool)
111            .await
112            .expect("postgres ping failed");
113    }
114
115    #[cfg_attr(test, mutants::skip)] // infallible: panics on failure
116    pub async fn migrate(&self) {
117        sqlx::migrate!("./migrations")
118            .run(&self.pool)
119            .await
120            .expect("postgres migration failed");
121    }
122}
123
124impl EventStore for PostgresEventStore {
125    #[instrument(name = "postgres.read_stream", skip(self))]
126    async fn read_stream<E: Event>(
127        &self,
128        stream_id: StreamId,
129    ) -> Result<EventStreamReader<E>, EventStoreError> {
130        info!(
131            stream = %stream_id,
132            "[postgres.read_stream] reading events from postgres"
133        );
134
135        let rows = query(
136            "SELECT event_data FROM eventcore_events WHERE stream_id = $1 ORDER BY stream_version ASC",
137        )
138        .bind(stream_id.as_ref())
139        .fetch_all(&self.pool)
140        .await
141        .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
142
143        let mut events = Vec::with_capacity(rows.len());
144        for row in rows {
145            let payload: Value = row
146                .try_get("event_data")
147                .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
148            let event = serde_json::from_value(payload).map_err(|error| {
149                EventStoreError::DeserializationFailed {
150                    stream_id: stream_id.clone(),
151                    detail: error.to_string(),
152                }
153            })?;
154            events.push(event);
155        }
156
157        Ok(EventStreamReader::new(events))
158    }
159
160    #[instrument(name = "postgres.append_events", skip(self, writes))]
161    async fn append_events(
162        &self,
163        writes: StreamWrites,
164    ) -> Result<EventStreamSlice, EventStoreError> {
165        let expected_versions = writes.expected_versions().clone();
166        let entries = writes.into_entries();
167
168        if entries.is_empty() {
169            return Ok(EventStreamSlice);
170        }
171
172        info!(
173            stream_count = expected_versions.len(),
174            event_count = entries.len(),
175            "[postgres.append_events] appending events to postgres"
176        );
177
178        // Build expected versions JSON for the trigger
179        let expected_versions_json: Value = expected_versions
180            .iter()
181            .map(|(stream_id, version)| {
182                (stream_id.as_ref().to_string(), json!(version.into_inner()))
183            })
184            .collect();
185
186        let mut tx = self
187            .pool
188            .begin()
189            .await
190            .map_err(|error| map_sqlx_error(error, Operation::BeginTransaction))?;
191
192        // Set expected versions in session config for trigger validation
193        let _ = query("SELECT set_config('eventcore.expected_versions', $1, true)")
194            .bind(expected_versions_json.to_string())
195            .execute(&mut *tx)
196            .await
197            .map_err(|error| map_sqlx_error(error, Operation::SetExpectedVersions))?;
198
199        // Insert all events - trigger handles version assignment and validation
200        for entry in entries {
201            let StreamWriteEntry {
202                stream_id,
203                event_type,
204                event_data,
205                ..
206            } = entry;
207
208            let event_id = Uuid::now_v7();
209            let _ = query(
210                "INSERT INTO eventcore_events (event_id, stream_id, event_type, event_data, metadata)
211                 VALUES ($1, $2, $3, $4, $5)",
212            )
213            .bind(event_id)
214            .bind(stream_id.as_ref())
215            .bind(event_type)
216            .bind(Json(event_data))
217            .bind(Json(json!({})))
218            .execute(&mut *tx)
219            .await
220            .map_err(|error| map_sqlx_error(error, Operation::AppendEvents))?;
221        }
222
223        tx.commit()
224            .await
225            .map_err(|error| map_sqlx_error(error, Operation::CommitTransaction))?;
226
227        Ok(EventStreamSlice)
228    }
229}
230
231impl CheckpointStore for PostgresEventStore {
232    type Error = PostgresCheckpointError;
233
234    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
235        let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
236            .bind(name)
237            .fetch_optional(&self.pool)
238            .await
239            .map_err(PostgresCheckpointError::DatabaseError)?;
240
241        match row {
242            Some(row) => {
243                let position: Uuid = row.get("last_position");
244                Ok(Some(StreamPosition::new(position)))
245            }
246            None => Ok(None),
247        }
248    }
249
250    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
251        let position_uuid: Uuid = position.into_inner();
252        let _ = query(
253            "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
254             VALUES ($1, $2, NOW())
255             ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
256        )
257        .bind(name)
258        .bind(position_uuid)
259        .execute(&self.pool)
260        .await
261        .map_err(PostgresCheckpointError::DatabaseError)?;
262
263        Ok(())
264    }
265}
266
267impl EventReader for PostgresEventStore {
268    type Error = EventStoreError;
269
270    async fn read_events<E: Event>(
271        &self,
272        filter: EventFilter,
273        page: EventPage,
274    ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
275        // Query events ordered by event_id (UUID7, monotonically increasing).
276        // Use event_id directly as the global position - no need for ROW_NUMBER.
277        let after_event_id: Option<Uuid> = page.after_position().map(|p| p.into_inner());
278        let limit: i64 = page.limit().into_inner() as i64;
279
280        // Filter by event_type in SQL so non-matching types don't consume
281        // batch slots (fixes issue #372). Use explicit filter if set,
282        // otherwise derive from E::event_type_name().
283        let type_filter = filter.event_type().unwrap_or_else(|| E::event_type_name());
284
285        let rows = if let Some(prefix) = filter.stream_prefix() {
286            let prefix_str = prefix.as_ref();
287
288            if let Some(after_id) = after_event_id {
289                let query_str = r#"
290                    SELECT event_id, event_data, stream_id
291                    FROM eventcore_events
292                    WHERE event_type = $1
293                      AND event_id > $2
294                      AND stream_id LIKE $3 || '%'
295                    ORDER BY event_id
296                    LIMIT $4
297                "#;
298                query(query_str)
299                    .bind(type_filter)
300                    .bind(after_id)
301                    .bind(prefix_str)
302                    .bind(limit)
303                    .fetch_all(&self.pool)
304                    .await
305            } else {
306                let query_str = r#"
307                    SELECT event_id, event_data, stream_id
308                    FROM eventcore_events
309                    WHERE event_type = $1
310                      AND stream_id LIKE $2 || '%'
311                    ORDER BY event_id
312                    LIMIT $3
313                "#;
314                query(query_str)
315                    .bind(type_filter)
316                    .bind(prefix_str)
317                    .bind(limit)
318                    .fetch_all(&self.pool)
319                    .await
320            }
321        } else if let Some(after_id) = after_event_id {
322            let query_str = r#"
323                SELECT event_id, event_data, stream_id
324                FROM eventcore_events
325                WHERE event_type = $1
326                  AND event_id > $2
327                ORDER BY event_id
328                LIMIT $3
329            "#;
330            query(query_str)
331                .bind(type_filter)
332                .bind(after_id)
333                .bind(limit)
334                .fetch_all(&self.pool)
335                .await
336        } else {
337            let query_str = r#"
338                SELECT event_id, event_data, stream_id
339                FROM eventcore_events
340                WHERE event_type = $1
341                ORDER BY event_id
342                LIMIT $2
343            "#;
344            query(query_str)
345                .bind(type_filter)
346                .bind(limit)
347                .fetch_all(&self.pool)
348                .await
349        }
350        .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
351
352        let events: Vec<(E, StreamPosition)> = rows
353            .into_iter()
354            .filter_map(|row| {
355                let event_data: Json<Value> = row.get("event_data");
356                let event_id: Uuid = row.get("event_id");
357                serde_json::from_value::<E>(event_data.0)
358                    .ok()
359                    .map(|e| (e, StreamPosition::new(event_id)))
360            })
361            .collect();
362
363        Ok(events)
364    }
365}
366
367fn map_sqlx_error(error: sqlx::Error, operation: Operation) -> EventStoreError {
368    if let sqlx::Error::Database(db_error) = &error {
369        let code = db_error.code();
370        let code_str = code.as_deref();
371        // P0001: Custom error from trigger (version_conflict)
372        // 23505: Unique constraint violation (fallback for version conflict)
373        if code_str == Some("P0001") || code_str == Some("23505") {
374            warn!(
375                error = %db_error,
376                "[postgres.version_conflict] optimistic concurrency check failed"
377            );
378            return parse_version_conflict_from_db_error(db_error.message());
379        }
380    }
381
382    error!(
383        error = %error,
384        operation = %operation,
385        "[postgres.database_error] database operation failed"
386    );
387    EventStoreError::StoreFailure { operation }
388}
389
390/// Parse version conflict details from the PostgreSQL trigger error message.
391///
392/// The trigger produces messages like:
393///   `version_conflict: stream "my-stream" expected version 0, actual 1`
394///
395/// If parsing fails, falls back to a VersionConflict with a sentinel stream_id
396/// indicating the details could not be extracted.
397fn parse_version_conflict_from_db_error(message: &str) -> EventStoreError {
398    // Pattern: version_conflict: stream "STREAM_ID" expected version EXPECTED, actual ACTUAL
399    if let Some(parsed) = try_parse_conflict_message(message) {
400        return parsed;
401    }
402
403    // Fallback: unique constraint violation (23505) or unparseable trigger message.
404    // Use a sentinel stream_id since we don't have the details.
405    let fallback_stream_id =
406        StreamId::try_new("unknown-conflict-stream").expect("static stream id is valid");
407    EventStoreError::VersionConflict {
408        stream_id: fallback_stream_id,
409        expected: StreamVersion::new(0),
410        actual: StreamVersion::new(0),
411    }
412}
413
414fn try_parse_conflict_message(message: &str) -> Option<EventStoreError> {
415    let rest = message.strip_prefix("version_conflict: stream \"")?;
416    let stream_end = rest.find('"')?;
417    let stream_id_str = &rest[..stream_end];
418    let after_stream = &rest[stream_end..];
419
420    let expected_str = after_stream
421        .strip_prefix("\" expected version ")?
422        .split(',')
423        .next()?;
424    let actual_str = after_stream.rsplit("actual ").next()?;
425
426    let expected = expected_str.trim().parse::<usize>().ok()?;
427    let actual = actual_str.trim().parse::<usize>().ok()?;
428    let stream_id = StreamId::try_new(stream_id_str).ok()?;
429
430    Some(EventStoreError::VersionConflict {
431        stream_id,
432        expected: StreamVersion::new(expected),
433        actual: StreamVersion::new(actual),
434    })
435}
436
437/// Error type for PostgresCheckpointStore operations.
438#[derive(Debug, Error)]
439pub enum PostgresCheckpointError {
440    /// Failed to create connection pool.
441    #[error("failed to create postgres connection pool")]
442    ConnectionFailed(#[source] sqlx::Error),
443
444    /// Database operation failed.
445    #[error("database operation failed: {0}")]
446    DatabaseError(#[source] sqlx::Error),
447}
448
449/// Postgres-backed checkpoint store for tracking projection progress.
450///
451/// `PostgresCheckpointStore` stores checkpoint positions in a PostgreSQL table,
452/// providing durability across process restarts. It implements the `CheckpointStore`
453/// trait from eventcore-types.
454///
455/// # Schema
456///
457/// The store uses the `eventcore_subscription_versions` table with:
458/// - `subscription_name`: Unique identifier for each projector/subscription
459/// - `last_position`: UUID7 representing the global stream position
460/// - `updated_at`: Timestamp of the last checkpoint update
461#[derive(Debug, Clone)]
462pub struct PostgresCheckpointStore {
463    pool: Pool<Postgres>,
464}
465
466impl PostgresCheckpointStore {
467    /// Create a new PostgresCheckpointStore with default configuration.
468    pub async fn new<S: Into<String>>(
469        connection_string: S,
470    ) -> Result<Self, PostgresCheckpointError> {
471        Self::with_config(connection_string, PostgresConfig::default()).await
472    }
473
474    /// Create a new PostgresCheckpointStore with custom configuration.
475    pub async fn with_config<S: Into<String>>(
476        connection_string: S,
477        config: PostgresConfig,
478    ) -> Result<Self, PostgresCheckpointError> {
479        let connection_string = connection_string.into();
480        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
481        let pool = PgPoolOptions::new()
482            .max_connections(max_connections.get())
483            .acquire_timeout(config.acquire_timeout)
484            .idle_timeout(config.idle_timeout)
485            .connect(&connection_string)
486            .await
487            .map_err(PostgresCheckpointError::ConnectionFailed)?;
488
489        // Run migrations to ensure table exists
490        sqlx::migrate!("./migrations")
491            .run(&pool)
492            .await
493            .map_err(|e| {
494                PostgresCheckpointError::DatabaseError(sqlx::Error::Migrate(Box::new(e)))
495            })?;
496
497        Ok(Self { pool })
498    }
499
500    /// Create a PostgresCheckpointStore from an existing connection pool.
501    ///
502    /// Use this when you need full control over pool configuration or want to
503    /// share a pool across multiple components.
504    pub fn from_pool(pool: Pool<Postgres>) -> Self {
505        Self { pool }
506    }
507}
508
509impl CheckpointStore for PostgresCheckpointStore {
510    type Error = PostgresCheckpointError;
511
512    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
513        let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
514            .bind(name)
515            .fetch_optional(&self.pool)
516            .await
517            .map_err(PostgresCheckpointError::DatabaseError)?;
518
519        match row {
520            Some(row) => {
521                let position: Uuid = row.get("last_position");
522                Ok(Some(StreamPosition::new(position)))
523            }
524            None => Ok(None),
525        }
526    }
527
528    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
529        let position_uuid: Uuid = position.into_inner();
530        let _ = query(
531            "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
532             VALUES ($1, $2, NOW())
533             ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
534        )
535        .bind(name)
536        .bind(position_uuid)
537        .execute(&self.pool)
538        .await
539        .map_err(PostgresCheckpointError::DatabaseError)?;
540
541        Ok(())
542    }
543}
544
545// ============================================================================
546// PostgresProjectorCoordinator - Distributed projector coordination via Postgres
547// ============================================================================
548
549/// Error type for projector coordination operations.
550#[derive(Debug, Error)]
551pub enum CoordinationError {
552    /// Leadership could not be acquired (another instance holds the lock).
553    #[error(
554        "leadership not acquired for subscription '{subscription_name}': another instance holds the lock"
555    )]
556    LeadershipNotAcquired { subscription_name: String },
557
558    /// Database operation failed.
559    #[error("database operation failed: {0}")]
560    DatabaseError(#[source] sqlx::Error),
561}
562
563/// Guard type that releases leadership when dropped.
564///
565/// Holds the advisory lock key and the actual database connection that acquired
566/// the lock. This is critical because PostgreSQL advisory locks are session-scoped:
567/// the unlock must happen on the same connection that acquired the lock.
568///
569/// # Lock Release Behavior
570///
571/// The guard attempts to explicitly release the advisory lock when dropped:
572///
573/// - **Multi-threaded runtime**: Uses `block_in_place` to synchronously release
574///   the lock before the guard is fully dropped.
575///
576/// - **Single-threaded runtime**: Spawns a task to release the lock asynchronously.
577///   This task may not execute before process shutdown, in which case the lock is
578///   released when the PostgreSQL session ends (connection closes).
579///
580/// # PostgreSQL Session-Scoped Locks
581///
582/// PostgreSQL advisory locks acquired with `pg_try_advisory_lock` are session-scoped
583/// and automatically released when the database connection closes. This provides a
584/// safety net: even if explicit unlock fails or is skipped, the lock will be released
585/// when:
586/// - The connection is returned to the pool and recycled
587/// - The connection pool is shut down
588/// - The database connection times out
589///
590/// For production deployments, configure appropriate connection pool idle timeouts
591/// to ensure timely lock release on ungraceful shutdown.
592pub struct CoordinationGuard {
593    lock_key: i64,
594    /// The actual connection that holds the advisory lock.
595    /// Must be Option so we can take ownership in Drop.
596    connection: Option<sqlx::pool::PoolConnection<Postgres>>,
597}
598
599impl std::fmt::Debug for CoordinationGuard {
600    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
601        f.debug_struct("CoordinationGuard")
602            .field("lock_key", &self.lock_key)
603            .finish_non_exhaustive()
604    }
605}
606
607impl Drop for CoordinationGuard {
608    fn drop(&mut self) {
609        // Take ownership of the connection - we need the same connection that acquired the lock
610        if let Some(mut connection) = self.connection.take() {
611            let lock_key = self.lock_key;
612
613            // Check runtime flavor to determine the appropriate unlock strategy.
614            // block_in_place panics on single-threaded runtimes, so we must check first.
615            let handle = tokio::runtime::Handle::current();
616            let is_multi_thread =
617                handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread;
618
619            if is_multi_thread {
620                // Multi-threaded runtime: use block_in_place for synchronous unlock
621                tokio::task::block_in_place(|| {
622                    handle.block_on(async {
623                        // Unlock on the SAME connection that acquired the lock
624                        if let Err(e) = query("SELECT pg_advisory_unlock($1)")
625                            .bind(lock_key)
626                            .execute(&mut *connection)
627                            .await
628                        {
629                            warn!(
630                                lock_key = lock_key,
631                                error = %e,
632                                "failed to release advisory lock on drop"
633                            );
634                        }
635                        // Connection is returned to pool when dropped here
636                    });
637                });
638            } else {
639                // Single-threaded runtime: spawn a task for async unlock.
640                // Note: This task may not execute before process shutdown. In that case,
641                // the advisory lock is released when the PostgreSQL session ends (the
642                // connection closes). See struct-level documentation for details.
643                drop(tokio::spawn(async move {
644                    if let Err(e) = query("SELECT pg_advisory_unlock($1)")
645                        .bind(lock_key)
646                        .execute(&mut *connection)
647                        .await
648                    {
649                        warn!(
650                            lock_key = lock_key,
651                            error = %e,
652                            "failed to release advisory lock on drop (async)"
653                        );
654                    }
655                }));
656            }
657        }
658    }
659}
660
661/// Postgres-backed projector coordinator for distributed leadership.
662///
663/// `PostgresProjectorCoordinator` uses PostgreSQL advisory locks to ensure
664/// only one projector instance processes events for a given subscription
665/// at a time, preventing duplicate processing in distributed deployments.
666#[derive(Debug, Clone)]
667pub struct PostgresProjectorCoordinator {
668    pool: Pool<Postgres>,
669}
670
671impl PostgresProjectorCoordinator {
672    /// Create a new PostgresProjectorCoordinator with default configuration.
673    pub async fn new<S: Into<String>>(connection_string: S) -> Result<Self, CoordinationError> {
674        Self::with_config(connection_string, PostgresConfig::default()).await
675    }
676
677    /// Create a new PostgresProjectorCoordinator with custom configuration.
678    pub async fn with_config<S: Into<String>>(
679        connection_string: S,
680        config: PostgresConfig,
681    ) -> Result<Self, CoordinationError> {
682        let connection_string = connection_string.into();
683        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
684        let pool = PgPoolOptions::new()
685            .max_connections(max_connections.get())
686            .acquire_timeout(config.acquire_timeout)
687            .idle_timeout(config.idle_timeout)
688            .connect(&connection_string)
689            .await
690            .map_err(CoordinationError::DatabaseError)?;
691
692        Ok(Self { pool })
693    }
694
695    /// Create a PostgresProjectorCoordinator from an existing connection pool.
696    pub fn from_pool(pool: Pool<Postgres>) -> Self {
697        Self { pool }
698    }
699}
700
701/// Compute a stable FNV-1a hash of the subscription name to derive an advisory lock key.
702///
703/// This uses the FNV-1a algorithm (64-bit) which produces deterministic output
704/// across Rust versions, unlike `DefaultHasher` which is explicitly not stable.
705fn advisory_lock_key(subscription_name: &str) -> i64 {
706    const FNV_OFFSET_BASIS: u64 = 0xcbf29ce484222325;
707    const FNV_PRIME: u64 = 0x00000100000001B3;
708
709    let mut hash = FNV_OFFSET_BASIS;
710    for byte in subscription_name.as_bytes() {
711        hash ^= *byte as u64;
712        hash = hash.wrapping_mul(FNV_PRIME);
713    }
714    hash as i64
715}
716
717/// Try to acquire a PostgreSQL advisory lock for the given subscription name
718/// using the provided connection pool.
719async fn try_acquire_advisory_lock(
720    pool: &Pool<Postgres>,
721    subscription_name: &str,
722) -> Result<CoordinationGuard, CoordinationError> {
723    let lock_key = advisory_lock_key(subscription_name);
724
725    // Acquire a dedicated connection from the pool.
726    // This connection MUST be kept for the lifetime of the guard because
727    // PostgreSQL advisory locks are session-scoped.
728    let mut connection = pool
729        .acquire()
730        .await
731        .map_err(CoordinationError::DatabaseError)?;
732
733    // Attempt to acquire advisory lock (non-blocking) on this specific connection
734    let row = query("SELECT pg_try_advisory_lock($1)")
735        .bind(lock_key)
736        .fetch_one(&mut *connection)
737        .await
738        .map_err(CoordinationError::DatabaseError)?;
739
740    let acquired: bool = row.get(0);
741
742    if acquired {
743        Ok(CoordinationGuard {
744            lock_key,
745            connection: Some(connection),
746        })
747    } else {
748        // Lock not acquired - connection will be returned to pool here
749        Err(CoordinationError::LeadershipNotAcquired {
750            subscription_name: subscription_name.to_string(),
751        })
752    }
753}
754
755impl ProjectorCoordinator for PostgresProjectorCoordinator {
756    type Error = CoordinationError;
757    type Guard = CoordinationGuard;
758
759    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
760        try_acquire_advisory_lock(&self.pool, subscription_name).await
761    }
762}
763
764impl ProjectorCoordinator for PostgresEventStore {
765    type Error = CoordinationError;
766    type Guard = CoordinationGuard;
767
768    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
769        try_acquire_advisory_lock(&self.pool, subscription_name).await
770    }
771}