Skip to main content

eventcore_postgres/
lib.rs

1use std::time::Duration;
2
3use eventcore_types::{
4    CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
5    EventStreamReader, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
6    StreamVersion, StreamWriteEntry, StreamWrites,
7};
8use nutype::nutype;
9use serde_json::{Value, json};
10use sqlx::types::Json;
11use sqlx::{Pool, Postgres, Row, postgres::PgPoolOptions, query};
12use thiserror::Error;
13use tracing::{error, info, instrument, warn};
14use uuid::Uuid;
15
16#[derive(Debug, Error)]
17pub enum PostgresEventStoreError {
18    #[error("failed to create postgres connection pool")]
19    ConnectionFailed(#[source] sqlx::Error),
20}
21
22/// Maximum number of database connections in the pool.
23///
24/// MaxConnections represents the connection pool size limit. It must be at least 1,
25/// enforced by using NonZeroU32 as the underlying type.
26///
27/// # Examples
28///
29/// ```ignore
30/// use eventcore_postgres::MaxConnections;
31/// use std::num::NonZeroU32;
32///
33/// let small_pool = MaxConnections::new(NonZeroU32::new(5).expect("5 is non-zero"));
34/// let standard = MaxConnections::new(NonZeroU32::new(10).expect("10 is non-zero"));
35/// let large_pool = MaxConnections::new(NonZeroU32::new(50).expect("50 is non-zero"));
36///
37/// // Zero connections not allowed by type system
38/// // let zero = NonZeroU32::new(0); // Returns None
39/// ```
40#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, Display, AsRef, Into))]
41pub struct MaxConnections(std::num::NonZeroU32);
42
43/// Configuration for PostgresEventStore connection pool.
44#[derive(Debug, Clone)]
45pub struct PostgresConfig {
46    /// Maximum number of connections in the pool (default: 10)
47    pub max_connections: MaxConnections,
48    /// Timeout for acquiring a connection from the pool (default: 30 seconds)
49    pub acquire_timeout: Duration,
50    /// Idle timeout for connections in the pool (default: 10 minutes)
51    pub idle_timeout: Duration,
52}
53
54impl Default for PostgresConfig {
55    fn default() -> Self {
56        const DEFAULT_MAX_CONNECTIONS: std::num::NonZeroU32 = match std::num::NonZeroU32::new(10) {
57            Some(v) => v,
58            None => unreachable!(),
59        };
60
61        Self {
62            max_connections: MaxConnections::new(DEFAULT_MAX_CONNECTIONS),
63            acquire_timeout: Duration::from_secs(30),
64            idle_timeout: Duration::from_secs(600), // 10 minutes
65        }
66    }
67}
68
69#[derive(Debug, Clone)]
70pub struct PostgresEventStore {
71    pool: Pool<Postgres>,
72}
73
74impl PostgresEventStore {
75    /// Create a new PostgresEventStore with default configuration.
76    pub async fn new<S: Into<String>>(
77        connection_string: S,
78    ) -> Result<Self, PostgresEventStoreError> {
79        Self::with_config(connection_string, PostgresConfig::default()).await
80    }
81
82    /// Create a new PostgresEventStore with custom configuration.
83    pub async fn with_config<S: Into<String>>(
84        connection_string: S,
85        config: PostgresConfig,
86    ) -> Result<Self, PostgresEventStoreError> {
87        let connection_string = connection_string.into();
88        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
89        let pool = PgPoolOptions::new()
90            .max_connections(max_connections.get())
91            .acquire_timeout(config.acquire_timeout)
92            .idle_timeout(config.idle_timeout)
93            .connect(&connection_string)
94            .await
95            .map_err(PostgresEventStoreError::ConnectionFailed)?;
96        Ok(Self { pool })
97    }
98
99    /// Create a PostgresEventStore from an existing connection pool.
100    ///
101    /// Use this when you need full control over pool configuration or want to
102    /// share a pool across multiple components.
103    pub fn from_pool(pool: Pool<Postgres>) -> Self {
104        Self { pool }
105    }
106
107    #[cfg_attr(test, mutants::skip)] // infallible: panics on failure
108    pub async fn ping(&self) {
109        let _ = query("SELECT 1")
110            .execute(&self.pool)
111            .await
112            .expect("postgres ping failed");
113    }
114
115    #[cfg_attr(test, mutants::skip)] // infallible: panics on failure
116    pub async fn migrate(&self) {
117        sqlx::migrate!("./migrations")
118            .run(&self.pool)
119            .await
120            .expect("postgres migration failed");
121    }
122}
123
124impl EventStore for PostgresEventStore {
125    #[instrument(name = "postgres.read_stream", skip(self))]
126    async fn read_stream<E: Event>(
127        &self,
128        stream_id: StreamId,
129    ) -> Result<EventStreamReader<E>, EventStoreError> {
130        info!(
131            stream = %stream_id,
132            "[postgres.read_stream] reading events from postgres"
133        );
134
135        let rows = query(
136            "SELECT event_data FROM eventcore_events WHERE stream_id = $1 ORDER BY stream_version ASC",
137        )
138        .bind(stream_id.as_ref())
139        .fetch_all(&self.pool)
140        .await
141        .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
142
143        let mut events = Vec::with_capacity(rows.len());
144        for row in rows {
145            let payload: Value = row
146                .try_get("event_data")
147                .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
148            let event = serde_json::from_value(payload).map_err(|error| {
149                EventStoreError::DeserializationFailed {
150                    stream_id: stream_id.clone(),
151                    detail: error.to_string(),
152                }
153            })?;
154            events.push(event);
155        }
156
157        Ok(EventStreamReader::new(events))
158    }
159
160    #[instrument(name = "postgres.append_events", skip(self, writes))]
161    async fn append_events(
162        &self,
163        writes: StreamWrites,
164    ) -> Result<EventStreamSlice, EventStoreError> {
165        let expected_versions = writes.expected_versions().clone();
166        let entries = writes.into_entries();
167
168        if entries.is_empty() {
169            return Ok(EventStreamSlice);
170        }
171
172        info!(
173            stream_count = expected_versions.len(),
174            event_count = entries.len(),
175            "[postgres.append_events] appending events to postgres"
176        );
177
178        // Build expected versions JSON for the trigger
179        let expected_versions_json: Value = expected_versions
180            .iter()
181            .map(|(stream_id, version)| {
182                (stream_id.as_ref().to_string(), json!(version.into_inner()))
183            })
184            .collect();
185
186        let mut tx = self
187            .pool
188            .begin()
189            .await
190            .map_err(|error| map_sqlx_error(error, Operation::BeginTransaction))?;
191
192        // Set expected versions in session config for trigger validation
193        let _ = query("SELECT set_config('eventcore.expected_versions', $1, true)")
194            .bind(expected_versions_json.to_string())
195            .execute(&mut *tx)
196            .await
197            .map_err(|error| map_sqlx_error(error, Operation::SetExpectedVersions))?;
198
199        // Insert all events - trigger handles version assignment and validation
200        for entry in entries {
201            let StreamWriteEntry {
202                stream_id,
203                event_type,
204                event_data,
205                ..
206            } = entry;
207
208            let event_id = Uuid::now_v7();
209            let _ = query(
210                "INSERT INTO eventcore_events (event_id, stream_id, event_type, event_data, metadata)
211                 VALUES ($1, $2, $3, $4, $5)",
212            )
213            .bind(event_id)
214            .bind(stream_id.as_ref())
215            .bind(event_type)
216            .bind(Json(event_data))
217            .bind(Json(json!({})))
218            .execute(&mut *tx)
219            .await
220            .map_err(|error| map_sqlx_error(error, Operation::AppendEvents))?;
221        }
222
223        tx.commit()
224            .await
225            .map_err(|error| map_sqlx_error(error, Operation::CommitTransaction))?;
226
227        Ok(EventStreamSlice)
228    }
229}
230
231impl CheckpointStore for PostgresEventStore {
232    type Error = PostgresCheckpointError;
233
234    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
235        let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
236            .bind(name)
237            .fetch_optional(&self.pool)
238            .await
239            .map_err(PostgresCheckpointError::DatabaseError)?;
240
241        match row {
242            Some(row) => {
243                let position: Uuid = row.get("last_position");
244                Ok(Some(StreamPosition::new(position)))
245            }
246            None => Ok(None),
247        }
248    }
249
250    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
251        let position_uuid: Uuid = position.into_inner();
252        let _ = query(
253            "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
254             VALUES ($1, $2, NOW())
255             ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
256        )
257        .bind(name)
258        .bind(position_uuid)
259        .execute(&self.pool)
260        .await
261        .map_err(PostgresCheckpointError::DatabaseError)?;
262
263        Ok(())
264    }
265}
266
267impl EventReader for PostgresEventStore {
268    type Error = EventStoreError;
269
270    async fn read_events<E: Event>(
271        &self,
272        filter: EventFilter,
273        page: EventPage,
274    ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
275        // Query events ordered by event_id (UUID7, monotonically increasing).
276        // Use event_id directly as the global position - no need for ROW_NUMBER.
277        let after_event_id: Option<Uuid> = page.after_position().map(|p| p.into_inner());
278        let limit: i64 = page.limit().into_inner() as i64;
279
280        let rows = if let Some(prefix) = filter.stream_prefix() {
281            let prefix_str = prefix.as_ref();
282
283            if let Some(after_id) = after_event_id {
284                let query_str = r#"
285                    SELECT event_id, event_data, stream_id
286                    FROM eventcore_events
287                    WHERE event_id > $1
288                      AND stream_id LIKE $2 || '%'
289                    ORDER BY event_id
290                    LIMIT $3
291                "#;
292                query(query_str)
293                    .bind(after_id)
294                    .bind(prefix_str)
295                    .bind(limit)
296                    .fetch_all(&self.pool)
297                    .await
298            } else {
299                let query_str = r#"
300                    SELECT event_id, event_data, stream_id
301                    FROM eventcore_events
302                    WHERE stream_id LIKE $1 || '%'
303                    ORDER BY event_id
304                    LIMIT $2
305                "#;
306                query(query_str)
307                    .bind(prefix_str)
308                    .bind(limit)
309                    .fetch_all(&self.pool)
310                    .await
311            }
312        } else if let Some(after_id) = after_event_id {
313            let query_str = r#"
314                SELECT event_id, event_data, stream_id
315                FROM eventcore_events
316                WHERE event_id > $1
317                ORDER BY event_id
318                LIMIT $2
319            "#;
320            query(query_str)
321                .bind(after_id)
322                .bind(limit)
323                .fetch_all(&self.pool)
324                .await
325        } else {
326            let query_str = r#"
327                SELECT event_id, event_data, stream_id
328                FROM eventcore_events
329                ORDER BY event_id
330                LIMIT $1
331            "#;
332            query(query_str).bind(limit).fetch_all(&self.pool).await
333        }
334        .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
335
336        let events: Vec<(E, StreamPosition)> = rows
337            .into_iter()
338            .filter_map(|row| {
339                let event_data: Json<Value> = row.get("event_data");
340                let event_id: Uuid = row.get("event_id");
341                serde_json::from_value::<E>(event_data.0)
342                    .ok()
343                    .map(|e| (e, StreamPosition::new(event_id)))
344            })
345            .collect();
346
347        Ok(events)
348    }
349}
350
351fn map_sqlx_error(error: sqlx::Error, operation: Operation) -> EventStoreError {
352    if let sqlx::Error::Database(db_error) = &error {
353        let code = db_error.code();
354        let code_str = code.as_deref();
355        // P0001: Custom error from trigger (version_conflict)
356        // 23505: Unique constraint violation (fallback for version conflict)
357        if code_str == Some("P0001") || code_str == Some("23505") {
358            warn!(
359                error = %db_error,
360                "[postgres.version_conflict] optimistic concurrency check failed"
361            );
362            return parse_version_conflict_from_db_error(db_error.message());
363        }
364    }
365
366    error!(
367        error = %error,
368        operation = %operation,
369        "[postgres.database_error] database operation failed"
370    );
371    EventStoreError::StoreFailure { operation }
372}
373
374/// Parse version conflict details from the PostgreSQL trigger error message.
375///
376/// The trigger produces messages like:
377///   `version_conflict: stream "my-stream" expected version 0, actual 1`
378///
379/// If parsing fails, falls back to a VersionConflict with a sentinel stream_id
380/// indicating the details could not be extracted.
381fn parse_version_conflict_from_db_error(message: &str) -> EventStoreError {
382    // Pattern: version_conflict: stream "STREAM_ID" expected version EXPECTED, actual ACTUAL
383    if let Some(parsed) = try_parse_conflict_message(message) {
384        return parsed;
385    }
386
387    // Fallback: unique constraint violation (23505) or unparseable trigger message.
388    // Use a sentinel stream_id since we don't have the details.
389    let fallback_stream_id =
390        StreamId::try_new("unknown-conflict-stream").expect("static stream id is valid");
391    EventStoreError::VersionConflict {
392        stream_id: fallback_stream_id,
393        expected: StreamVersion::new(0),
394        actual: StreamVersion::new(0),
395    }
396}
397
398fn try_parse_conflict_message(message: &str) -> Option<EventStoreError> {
399    let rest = message.strip_prefix("version_conflict: stream \"")?;
400    let stream_end = rest.find('"')?;
401    let stream_id_str = &rest[..stream_end];
402    let after_stream = &rest[stream_end..];
403
404    let expected_str = after_stream
405        .strip_prefix("\" expected version ")?
406        .split(',')
407        .next()?;
408    let actual_str = after_stream.rsplit("actual ").next()?;
409
410    let expected = expected_str.trim().parse::<usize>().ok()?;
411    let actual = actual_str.trim().parse::<usize>().ok()?;
412    let stream_id = StreamId::try_new(stream_id_str).ok()?;
413
414    Some(EventStoreError::VersionConflict {
415        stream_id,
416        expected: StreamVersion::new(expected),
417        actual: StreamVersion::new(actual),
418    })
419}
420
421/// Error type for PostgresCheckpointStore operations.
422#[derive(Debug, Error)]
423pub enum PostgresCheckpointError {
424    /// Failed to create connection pool.
425    #[error("failed to create postgres connection pool")]
426    ConnectionFailed(#[source] sqlx::Error),
427
428    /// Database operation failed.
429    #[error("database operation failed: {0}")]
430    DatabaseError(#[source] sqlx::Error),
431}
432
433/// Postgres-backed checkpoint store for tracking projection progress.
434///
435/// `PostgresCheckpointStore` stores checkpoint positions in a PostgreSQL table,
436/// providing durability across process restarts. It implements the `CheckpointStore`
437/// trait from eventcore-types.
438///
439/// # Schema
440///
441/// The store uses the `eventcore_subscription_versions` table with:
442/// - `subscription_name`: Unique identifier for each projector/subscription
443/// - `last_position`: UUID7 representing the global stream position
444/// - `updated_at`: Timestamp of the last checkpoint update
445#[derive(Debug, Clone)]
446pub struct PostgresCheckpointStore {
447    pool: Pool<Postgres>,
448}
449
450impl PostgresCheckpointStore {
451    /// Create a new PostgresCheckpointStore with default configuration.
452    pub async fn new<S: Into<String>>(
453        connection_string: S,
454    ) -> Result<Self, PostgresCheckpointError> {
455        Self::with_config(connection_string, PostgresConfig::default()).await
456    }
457
458    /// Create a new PostgresCheckpointStore with custom configuration.
459    pub async fn with_config<S: Into<String>>(
460        connection_string: S,
461        config: PostgresConfig,
462    ) -> Result<Self, PostgresCheckpointError> {
463        let connection_string = connection_string.into();
464        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
465        let pool = PgPoolOptions::new()
466            .max_connections(max_connections.get())
467            .acquire_timeout(config.acquire_timeout)
468            .idle_timeout(config.idle_timeout)
469            .connect(&connection_string)
470            .await
471            .map_err(PostgresCheckpointError::ConnectionFailed)?;
472
473        // Run migrations to ensure table exists
474        sqlx::migrate!("./migrations")
475            .run(&pool)
476            .await
477            .map_err(|e| {
478                PostgresCheckpointError::DatabaseError(sqlx::Error::Migrate(Box::new(e)))
479            })?;
480
481        Ok(Self { pool })
482    }
483
484    /// Create a PostgresCheckpointStore from an existing connection pool.
485    ///
486    /// Use this when you need full control over pool configuration or want to
487    /// share a pool across multiple components.
488    pub fn from_pool(pool: Pool<Postgres>) -> Self {
489        Self { pool }
490    }
491}
492
493impl CheckpointStore for PostgresCheckpointStore {
494    type Error = PostgresCheckpointError;
495
496    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
497        let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
498            .bind(name)
499            .fetch_optional(&self.pool)
500            .await
501            .map_err(PostgresCheckpointError::DatabaseError)?;
502
503        match row {
504            Some(row) => {
505                let position: Uuid = row.get("last_position");
506                Ok(Some(StreamPosition::new(position)))
507            }
508            None => Ok(None),
509        }
510    }
511
512    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
513        let position_uuid: Uuid = position.into_inner();
514        let _ = query(
515            "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
516             VALUES ($1, $2, NOW())
517             ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
518        )
519        .bind(name)
520        .bind(position_uuid)
521        .execute(&self.pool)
522        .await
523        .map_err(PostgresCheckpointError::DatabaseError)?;
524
525        Ok(())
526    }
527}
528
529// ============================================================================
530// PostgresProjectorCoordinator - Distributed projector coordination via Postgres
531// ============================================================================
532
533/// Error type for projector coordination operations.
534#[derive(Debug, Error)]
535pub enum CoordinationError {
536    /// Leadership could not be acquired (another instance holds the lock).
537    #[error(
538        "leadership not acquired for subscription '{subscription_name}': another instance holds the lock"
539    )]
540    LeadershipNotAcquired { subscription_name: String },
541
542    /// Database operation failed.
543    #[error("database operation failed: {0}")]
544    DatabaseError(#[source] sqlx::Error),
545}
546
547/// Guard type that releases leadership when dropped.
548///
549/// Holds the advisory lock key and the actual database connection that acquired
550/// the lock. This is critical because PostgreSQL advisory locks are session-scoped:
551/// the unlock must happen on the same connection that acquired the lock.
552///
553/// # Lock Release Behavior
554///
555/// The guard attempts to explicitly release the advisory lock when dropped:
556///
557/// - **Multi-threaded runtime**: Uses `block_in_place` to synchronously release
558///   the lock before the guard is fully dropped.
559///
560/// - **Single-threaded runtime**: Spawns a task to release the lock asynchronously.
561///   This task may not execute before process shutdown, in which case the lock is
562///   released when the PostgreSQL session ends (connection closes).
563///
564/// # PostgreSQL Session-Scoped Locks
565///
566/// PostgreSQL advisory locks acquired with `pg_try_advisory_lock` are session-scoped
567/// and automatically released when the database connection closes. This provides a
568/// safety net: even if explicit unlock fails or is skipped, the lock will be released
569/// when:
570/// - The connection is returned to the pool and recycled
571/// - The connection pool is shut down
572/// - The database connection times out
573///
574/// For production deployments, configure appropriate connection pool idle timeouts
575/// to ensure timely lock release on ungraceful shutdown.
576pub struct CoordinationGuard {
577    lock_key: i64,
578    /// The actual connection that holds the advisory lock.
579    /// Must be Option so we can take ownership in Drop.
580    connection: Option<sqlx::pool::PoolConnection<Postgres>>,
581}
582
583impl std::fmt::Debug for CoordinationGuard {
584    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
585        f.debug_struct("CoordinationGuard")
586            .field("lock_key", &self.lock_key)
587            .finish_non_exhaustive()
588    }
589}
590
591impl Drop for CoordinationGuard {
592    fn drop(&mut self) {
593        // Take ownership of the connection - we need the same connection that acquired the lock
594        if let Some(mut connection) = self.connection.take() {
595            let lock_key = self.lock_key;
596
597            // Check runtime flavor to determine the appropriate unlock strategy.
598            // block_in_place panics on single-threaded runtimes, so we must check first.
599            let handle = tokio::runtime::Handle::current();
600            let is_multi_thread =
601                handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread;
602
603            if is_multi_thread {
604                // Multi-threaded runtime: use block_in_place for synchronous unlock
605                tokio::task::block_in_place(|| {
606                    handle.block_on(async {
607                        // Unlock on the SAME connection that acquired the lock
608                        if let Err(e) = query("SELECT pg_advisory_unlock($1)")
609                            .bind(lock_key)
610                            .execute(&mut *connection)
611                            .await
612                        {
613                            warn!(
614                                lock_key = lock_key,
615                                error = %e,
616                                "failed to release advisory lock on drop"
617                            );
618                        }
619                        // Connection is returned to pool when dropped here
620                    });
621                });
622            } else {
623                // Single-threaded runtime: spawn a task for async unlock.
624                // Note: This task may not execute before process shutdown. In that case,
625                // the advisory lock is released when the PostgreSQL session ends (the
626                // connection closes). See struct-level documentation for details.
627                drop(tokio::spawn(async move {
628                    if let Err(e) = query("SELECT pg_advisory_unlock($1)")
629                        .bind(lock_key)
630                        .execute(&mut *connection)
631                        .await
632                    {
633                        warn!(
634                            lock_key = lock_key,
635                            error = %e,
636                            "failed to release advisory lock on drop (async)"
637                        );
638                    }
639                }));
640            }
641        }
642    }
643}
644
645/// Postgres-backed projector coordinator for distributed leadership.
646///
647/// `PostgresProjectorCoordinator` uses PostgreSQL advisory locks to ensure
648/// only one projector instance processes events for a given subscription
649/// at a time, preventing duplicate processing in distributed deployments.
650#[derive(Debug, Clone)]
651pub struct PostgresProjectorCoordinator {
652    pool: Pool<Postgres>,
653}
654
655impl PostgresProjectorCoordinator {
656    /// Create a new PostgresProjectorCoordinator with default configuration.
657    pub async fn new<S: Into<String>>(connection_string: S) -> Result<Self, CoordinationError> {
658        Self::with_config(connection_string, PostgresConfig::default()).await
659    }
660
661    /// Create a new PostgresProjectorCoordinator with custom configuration.
662    pub async fn with_config<S: Into<String>>(
663        connection_string: S,
664        config: PostgresConfig,
665    ) -> Result<Self, CoordinationError> {
666        let connection_string = connection_string.into();
667        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
668        let pool = PgPoolOptions::new()
669            .max_connections(max_connections.get())
670            .acquire_timeout(config.acquire_timeout)
671            .idle_timeout(config.idle_timeout)
672            .connect(&connection_string)
673            .await
674            .map_err(CoordinationError::DatabaseError)?;
675
676        Ok(Self { pool })
677    }
678
679    /// Create a PostgresProjectorCoordinator from an existing connection pool.
680    pub fn from_pool(pool: Pool<Postgres>) -> Self {
681        Self { pool }
682    }
683}
684
/// Derives the advisory lock key for a subscription from its name.
///
/// The key is the 64-bit FNV-1a hash of the name's UTF-8 bytes, reinterpreted
/// as a signed integer. FNV-1a is fully specified, so the key is the same
/// across Rust versions, unlike `DefaultHasher`, whose output is explicitly
/// not stable.
fn advisory_lock_key(subscription_name: &str) -> i64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    // FNV-1a: xor the byte in first, then multiply by the prime (wrapping).
    let digest = subscription_name
        .bytes()
        .fold(OFFSET_BASIS, |acc, byte| {
            (acc ^ u64::from(byte)).wrapping_mul(PRIME)
        });

    // Bit-for-bit reinterpretation; wrapping into negative values is intended.
    digest as i64
}
700
701/// Try to acquire a PostgreSQL advisory lock for the given subscription name
702/// using the provided connection pool.
703async fn try_acquire_advisory_lock(
704    pool: &Pool<Postgres>,
705    subscription_name: &str,
706) -> Result<CoordinationGuard, CoordinationError> {
707    let lock_key = advisory_lock_key(subscription_name);
708
709    // Acquire a dedicated connection from the pool.
710    // This connection MUST be kept for the lifetime of the guard because
711    // PostgreSQL advisory locks are session-scoped.
712    let mut connection = pool
713        .acquire()
714        .await
715        .map_err(CoordinationError::DatabaseError)?;
716
717    // Attempt to acquire advisory lock (non-blocking) on this specific connection
718    let row = query("SELECT pg_try_advisory_lock($1)")
719        .bind(lock_key)
720        .fetch_one(&mut *connection)
721        .await
722        .map_err(CoordinationError::DatabaseError)?;
723
724    let acquired: bool = row.get(0);
725
726    if acquired {
727        Ok(CoordinationGuard {
728            lock_key,
729            connection: Some(connection),
730        })
731    } else {
732        // Lock not acquired - connection will be returned to pool here
733        Err(CoordinationError::LeadershipNotAcquired {
734            subscription_name: subscription_name.to_string(),
735        })
736    }
737}
738
739impl ProjectorCoordinator for PostgresProjectorCoordinator {
740    type Error = CoordinationError;
741    type Guard = CoordinationGuard;
742
743    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
744        try_acquire_advisory_lock(&self.pool, subscription_name).await
745    }
746}
747
748impl ProjectorCoordinator for PostgresEventStore {
749    type Error = CoordinationError;
750    type Guard = CoordinationGuard;
751
752    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
753        try_acquire_advisory_lock(&self.pool, subscription_name).await
754    }
755}