// eventcore_postgres/lib.rs

1use std::time::Duration;
2
3use eventcore_types::{
4    CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
5    EventStreamReader, EventStreamSlice, Operation, ProjectorCoordinator, StreamId, StreamPosition,
6    StreamWriteEntry, StreamWrites,
7};
8use nutype::nutype;
9use serde_json::{Value, json};
10use sqlx::types::Json;
11use sqlx::{Pool, Postgres, Row, postgres::PgPoolOptions, query};
12use thiserror::Error;
13use tracing::{error, info, instrument, warn};
14use uuid::Uuid;
15
/// Error type for [`PostgresEventStore`] construction.
#[derive(Debug, Error)]
pub enum PostgresEventStoreError {
    /// The sqlx connection pool could not be created or could not connect.
    #[error("failed to create postgres connection pool")]
    ConnectionFailed(#[source] sqlx::Error),
}
21
/// Maximum number of database connections in the pool.
///
/// `MaxConnections` represents the connection pool size limit. It must be at
/// least 1, enforced by using `NonZeroU32` as the underlying type.
///
/// # Examples
///
/// ```ignore
/// use eventcore_postgres::MaxConnections;
/// use std::num::NonZeroU32;
///
/// let small_pool = MaxConnections::new(NonZeroU32::new(5).expect("5 is non-zero"));
/// let standard = MaxConnections::new(NonZeroU32::new(10).expect("10 is non-zero"));
/// let large_pool = MaxConnections::new(NonZeroU32::new(50).expect("50 is non-zero"));
///
/// // Zero connections not allowed by type system
/// // let zero = NonZeroU32::new(0); // Returns None
/// ```
#[nutype(derive(Debug, Clone, Copy, PartialEq, Eq, Display, AsRef, Into))]
pub struct MaxConnections(std::num::NonZeroU32);
42
/// Configuration for PostgresEventStore connection pool.
///
/// All fields are plain data; see [`Default`] for the default values.
#[derive(Debug, Clone)]
pub struct PostgresConfig {
    /// Maximum number of connections in the pool (default: 10)
    pub max_connections: MaxConnections,
    /// Timeout for acquiring a connection from the pool (default: 30 seconds)
    pub acquire_timeout: Duration,
    /// Idle timeout for connections in the pool (default: 10 minutes)
    pub idle_timeout: Duration,
}
53
54impl Default for PostgresConfig {
55    fn default() -> Self {
56        const DEFAULT_MAX_CONNECTIONS: std::num::NonZeroU32 = match std::num::NonZeroU32::new(10) {
57            Some(v) => v,
58            None => unreachable!(),
59        };
60
61        Self {
62            max_connections: MaxConnections::new(DEFAULT_MAX_CONNECTIONS),
63            acquire_timeout: Duration::from_secs(30),
64            idle_timeout: Duration::from_secs(600), // 10 minutes
65        }
66    }
67}
68
/// Postgres-backed implementation of the eventcore storage traits.
///
/// Wraps a sqlx connection pool; all operations borrow the pool, so the
/// struct itself is cheap to pass around.
#[derive(Debug, Clone)]
pub struct PostgresEventStore {
    // Connection pool used by every store operation.
    pool: Pool<Postgres>,
}
73
impl PostgresEventStore {
    /// Create a new PostgresEventStore with default configuration.
    ///
    /// # Errors
    ///
    /// Returns [`PostgresEventStoreError::ConnectionFailed`] if the pool
    /// cannot be created or cannot connect.
    pub async fn new<S: Into<String>>(
        connection_string: S,
    ) -> Result<Self, PostgresEventStoreError> {
        Self::with_config(connection_string, PostgresConfig::default()).await
    }

    /// Create a new PostgresEventStore with custom configuration.
    ///
    /// Note: unlike [`PostgresCheckpointStore::with_config`], this does NOT
    /// run migrations; call [`Self::migrate`] explicitly if needed.
    ///
    /// # Errors
    ///
    /// Returns [`PostgresEventStoreError::ConnectionFailed`] if the pool
    /// cannot be created or cannot connect.
    pub async fn with_config<S: Into<String>>(
        connection_string: S,
        config: PostgresConfig,
    ) -> Result<Self, PostgresEventStoreError> {
        let connection_string = connection_string.into();
        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
        let pool = PgPoolOptions::new()
            .max_connections(max_connections.get())
            .acquire_timeout(config.acquire_timeout)
            .idle_timeout(config.idle_timeout)
            .connect(&connection_string)
            .await
            .map_err(PostgresEventStoreError::ConnectionFailed)?;
        Ok(Self { pool })
    }

    /// Create a PostgresEventStore from an existing connection pool.
    ///
    /// Use this when you need full control over pool configuration or want to
    /// share a pool across multiple components.
    pub fn from_pool(pool: Pool<Postgres>) -> Self {
        Self { pool }
    }

    /// Round-trip a trivial `SELECT 1` to verify database connectivity.
    ///
    /// # Panics
    ///
    /// Panics if the query fails (e.g. the database is unreachable).
    #[cfg_attr(test, mutants::skip)] // infallible: panics on failure
    pub async fn ping(&self) {
        query("SELECT 1")
            .execute(&self.pool)
            .await
            .expect("postgres ping failed");
    }

    /// Apply all pending migrations from the crate's `./migrations` directory.
    ///
    /// # Panics
    ///
    /// Panics if any migration fails to apply.
    #[cfg_attr(test, mutants::skip)] // infallible: panics on failure
    pub async fn migrate(&self) {
        sqlx::migrate!("./migrations")
            .run(&self.pool)
            .await
            .expect("postgres migration failed");
    }
}
123
impl EventStore for PostgresEventStore {
    /// Read all events of one stream in version order and deserialize them
    /// into `E`.
    ///
    /// # Errors
    ///
    /// Returns a store failure if the query or row decode fails, or
    /// [`EventStoreError::DeserializationFailed`] if a payload is not a
    /// valid `E`.
    #[instrument(name = "postgres.read_stream", skip(self))]
    async fn read_stream<E: Event>(
        &self,
        stream_id: StreamId,
    ) -> Result<EventStreamReader<E>, EventStoreError> {
        info!(
            stream = %stream_id,
            "[postgres.read_stream] reading events from postgres"
        );

        let rows = query(
            "SELECT event_data FROM eventcore_events WHERE stream_id = $1 ORDER BY stream_version ASC",
        )
        .bind(stream_id.as_ref())
        .fetch_all(&self.pool)
        .await
        .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;

        let mut events = Vec::with_capacity(rows.len());
        for row in rows {
            let payload: Value = row
                .try_get("event_data")
                .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
            // A payload that fails to deserialize here is a hard error:
            // unlike the global reader, a single stream is expected to be
            // homogeneous in `E`.
            let event = serde_json::from_value(payload).map_err(|error| {
                EventStoreError::DeserializationFailed {
                    stream_id: stream_id.clone(),
                    detail: error.to_string(),
                }
            })?;
            events.push(event);
        }

        Ok(EventStreamReader::new(events))
    }

    /// Append a batch of events across one or more streams atomically.
    ///
    /// Optimistic concurrency is enforced database-side: the expected
    /// versions are stashed in a transaction-local session setting, and a
    /// trigger on `eventcore_events` assigns versions and validates them on
    /// each insert.
    ///
    /// # Errors
    ///
    /// Returns [`EventStoreError::VersionConflict`] when the trigger rejects
    /// a stale expected version (via [`map_sqlx_error`]), or a store failure
    /// for any other database error.
    #[instrument(name = "postgres.append_events", skip(self, writes))]
    async fn append_events(
        &self,
        writes: StreamWrites,
    ) -> Result<EventStreamSlice, EventStoreError> {
        let expected_versions = writes.expected_versions().clone();
        let entries = writes.into_entries();

        // Nothing to write: skip the transaction entirely.
        if entries.is_empty() {
            return Ok(EventStreamSlice);
        }

        info!(
            stream_count = expected_versions.len(),
            event_count = entries.len(),
            "[postgres.append_events] appending events to postgres"
        );

        // Build expected versions JSON for the trigger
        let expected_versions_json: Value = expected_versions
            .iter()
            .map(|(stream_id, version)| {
                (stream_id.as_ref().to_string(), json!(version.into_inner()))
            })
            .collect();

        let mut tx = self
            .pool
            .begin()
            .await
            .map_err(|error| map_sqlx_error(error, Operation::BeginTransaction))?;

        // Set expected versions in session config for trigger validation.
        // The `true` argument makes the setting transaction-local.
        query("SELECT set_config('eventcore.expected_versions', $1, true)")
            .bind(expected_versions_json.to_string())
            .execute(&mut *tx)
            .await
            .map_err(|error| map_sqlx_error(error, Operation::SetExpectedVersions))?;

        // Insert all events - trigger handles version assignment and validation
        for entry in entries {
            let StreamWriteEntry {
                stream_id,
                event_type,
                event_data,
                ..
            } = entry;

            // UUID v7 is time-ordered, so event_id doubles as a global position.
            let event_id = Uuid::now_v7();
            // NOTE(review): the metadata column is always written as `{}`;
            // any fields of `StreamWriteEntry` skipped by `..` above are not
            // persisted - confirm this is intended.
            query(
                "INSERT INTO eventcore_events (event_id, stream_id, event_type, event_data, metadata)
                 VALUES ($1, $2, $3, $4, $5)",
            )
            .bind(event_id)
            .bind(stream_id.as_ref())
            .bind(event_type)
            .bind(Json(event_data))
            .bind(Json(json!({})))
            .execute(&mut *tx)
            .await
            .map_err(|error| map_sqlx_error(error, Operation::AppendEvents))?;
        }

        tx.commit()
            .await
            .map_err(|error| map_sqlx_error(error, Operation::CommitTransaction))?;

        Ok(EventStreamSlice)
    }
}
230
231impl CheckpointStore for PostgresEventStore {
232    type Error = PostgresCheckpointError;
233
234    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
235        let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
236            .bind(name)
237            .fetch_optional(&self.pool)
238            .await
239            .map_err(PostgresCheckpointError::DatabaseError)?;
240
241        match row {
242            Some(row) => {
243                let position: Uuid = row.get("last_position");
244                Ok(Some(StreamPosition::new(position)))
245            }
246            None => Ok(None),
247        }
248    }
249
250    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
251        let position_uuid: Uuid = position.into_inner();
252        query(
253            "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
254             VALUES ($1, $2, NOW())
255             ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
256        )
257        .bind(name)
258        .bind(position_uuid)
259        .execute(&self.pool)
260        .await
261        .map_err(PostgresCheckpointError::DatabaseError)?;
262
263        Ok(())
264    }
265}
266
267impl EventReader for PostgresEventStore {
268    type Error = EventStoreError;
269
270    async fn read_events<E: Event>(
271        &self,
272        filter: EventFilter,
273        page: EventPage,
274    ) -> Result<Vec<(E, StreamPosition)>, Self::Error> {
275        // Query events ordered by event_id (UUID7, monotonically increasing).
276        // Use event_id directly as the global position - no need for ROW_NUMBER.
277        let after_event_id: Option<Uuid> = page.after_position().map(|p| p.into_inner());
278        let limit: i64 = page.limit().into_inner() as i64;
279
280        let rows = if let Some(prefix) = filter.stream_prefix() {
281            let prefix_str = prefix.as_ref();
282
283            if let Some(after_id) = after_event_id {
284                let query_str = r#"
285                    SELECT event_id, event_data, stream_id
286                    FROM eventcore_events
287                    WHERE event_id > $1
288                      AND stream_id LIKE $2 || '%'
289                    ORDER BY event_id
290                    LIMIT $3
291                "#;
292                query(query_str)
293                    .bind(after_id)
294                    .bind(prefix_str)
295                    .bind(limit)
296                    .fetch_all(&self.pool)
297                    .await
298            } else {
299                let query_str = r#"
300                    SELECT event_id, event_data, stream_id
301                    FROM eventcore_events
302                    WHERE stream_id LIKE $1 || '%'
303                    ORDER BY event_id
304                    LIMIT $2
305                "#;
306                query(query_str)
307                    .bind(prefix_str)
308                    .bind(limit)
309                    .fetch_all(&self.pool)
310                    .await
311            }
312        } else if let Some(after_id) = after_event_id {
313            let query_str = r#"
314                SELECT event_id, event_data, stream_id
315                FROM eventcore_events
316                WHERE event_id > $1
317                ORDER BY event_id
318                LIMIT $2
319            "#;
320            query(query_str)
321                .bind(after_id)
322                .bind(limit)
323                .fetch_all(&self.pool)
324                .await
325        } else {
326            let query_str = r#"
327                SELECT event_id, event_data, stream_id
328                FROM eventcore_events
329                ORDER BY event_id
330                LIMIT $1
331            "#;
332            query(query_str).bind(limit).fetch_all(&self.pool).await
333        }
334        .map_err(|error| map_sqlx_error(error, Operation::ReadStream))?;
335
336        let events: Vec<(E, StreamPosition)> = rows
337            .into_iter()
338            .filter_map(|row| {
339                let event_data: Json<Value> = row.get("event_data");
340                let event_id: Uuid = row.get("event_id");
341                serde_json::from_value::<E>(event_data.0)
342                    .ok()
343                    .map(|e| (e, StreamPosition::new(event_id)))
344            })
345            .collect();
346
347        Ok(events)
348    }
349}
350
351fn map_sqlx_error(error: sqlx::Error, operation: Operation) -> EventStoreError {
352    if let sqlx::Error::Database(db_error) = &error {
353        let code = db_error.code();
354        let code_str = code.as_deref();
355        // P0001: Custom error from trigger (version_conflict)
356        // 23505: Unique constraint violation (fallback for version conflict)
357        if code_str == Some("P0001") || code_str == Some("23505") {
358            warn!(
359                error = %db_error,
360                "[postgres.version_conflict] optimistic concurrency check failed"
361            );
362            return EventStoreError::VersionConflict;
363        }
364    }
365
366    error!(
367        error = %error,
368        operation = %operation,
369        "[postgres.database_error] database operation failed"
370    );
371    EventStoreError::StoreFailure { operation }
372}
373
/// Error type for PostgresCheckpointStore operations.
#[derive(Debug, Error)]
pub enum PostgresCheckpointError {
    /// Failed to create connection pool.
    #[error("failed to create postgres connection pool")]
    ConnectionFailed(#[source] sqlx::Error),

    /// Database operation failed (query, decode, or migration).
    #[error("database operation failed: {0}")]
    DatabaseError(#[source] sqlx::Error),
}
385
/// Postgres-backed checkpoint store for tracking projection progress.
///
/// `PostgresCheckpointStore` stores checkpoint positions in a PostgreSQL table,
/// providing durability across process restarts. It implements the `CheckpointStore`
/// trait from eventcore-types.
///
/// # Schema
///
/// The store uses the `eventcore_subscription_versions` table with:
/// - `subscription_name`: Unique identifier for each projector/subscription
/// - `last_position`: UUID7 representing the global stream position
/// - `updated_at`: Timestamp of the last checkpoint update
#[derive(Debug, Clone)]
pub struct PostgresCheckpointStore {
    // Connection pool used for all checkpoint reads and writes.
    pool: Pool<Postgres>,
}
402
impl PostgresCheckpointStore {
    /// Create a new PostgresCheckpointStore with default configuration.
    ///
    /// # Errors
    ///
    /// Returns [`PostgresCheckpointError::ConnectionFailed`] if the pool
    /// cannot connect, or [`PostgresCheckpointError::DatabaseError`] if
    /// migrations fail.
    pub async fn new<S: Into<String>>(
        connection_string: S,
    ) -> Result<Self, PostgresCheckpointError> {
        Self::with_config(connection_string, PostgresConfig::default()).await
    }

    /// Create a new PostgresCheckpointStore with custom configuration.
    ///
    /// Unlike `PostgresEventStore::with_config`, this also runs the crate's
    /// migrations so the checkpoint table is guaranteed to exist.
    ///
    /// # Errors
    ///
    /// Returns [`PostgresCheckpointError::ConnectionFailed`] if the pool
    /// cannot connect, or [`PostgresCheckpointError::DatabaseError`] if
    /// migrations fail.
    pub async fn with_config<S: Into<String>>(
        connection_string: S,
        config: PostgresConfig,
    ) -> Result<Self, PostgresCheckpointError> {
        let connection_string = connection_string.into();
        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
        let pool = PgPoolOptions::new()
            .max_connections(max_connections.get())
            .acquire_timeout(config.acquire_timeout)
            .idle_timeout(config.idle_timeout)
            .connect(&connection_string)
            .await
            .map_err(PostgresCheckpointError::ConnectionFailed)?;

        // Run migrations to ensure table exists
        sqlx::migrate!("./migrations")
            .run(&pool)
            .await
            .map_err(|e| {
                PostgresCheckpointError::DatabaseError(sqlx::Error::Migrate(Box::new(e)))
            })?;

        Ok(Self { pool })
    }

    /// Create a PostgresCheckpointStore from an existing connection pool.
    ///
    /// Use this when you need full control over pool configuration or want to
    /// share a pool across multiple components.
    ///
    /// Note: this constructor does NOT run migrations; the caller is
    /// responsible for ensuring the schema exists.
    pub fn from_pool(pool: Pool<Postgres>) -> Self {
        Self { pool }
    }
}
445
446impl CheckpointStore for PostgresCheckpointStore {
447    type Error = PostgresCheckpointError;
448
449    async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
450        let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
451            .bind(name)
452            .fetch_optional(&self.pool)
453            .await
454            .map_err(PostgresCheckpointError::DatabaseError)?;
455
456        match row {
457            Some(row) => {
458                let position: Uuid = row.get("last_position");
459                Ok(Some(StreamPosition::new(position)))
460            }
461            None => Ok(None),
462        }
463    }
464
465    async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
466        let position_uuid: Uuid = position.into_inner();
467        query(
468            "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
469             VALUES ($1, $2, NOW())
470             ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
471        )
472        .bind(name)
473        .bind(position_uuid)
474        .execute(&self.pool)
475        .await
476        .map_err(PostgresCheckpointError::DatabaseError)?;
477
478        Ok(())
479    }
480}
481
482// ============================================================================
483// PostgresProjectorCoordinator - Distributed projector coordination via Postgres
484// ============================================================================
485
/// Error type for projector coordination operations.
#[derive(Debug, Error)]
pub enum CoordinationError {
    /// Leadership could not be acquired (another instance holds the lock).
    /// This is an expected outcome in multi-instance deployments, not a fault.
    #[error("leadership not acquired: another instance holds the lock")]
    LeadershipNotAcquired,

    /// Database operation failed.
    #[error("database operation failed: {0}")]
    DatabaseError(#[source] sqlx::Error),
}
497
/// Guard type that releases leadership when dropped.
///
/// Holds the advisory lock key and the actual database connection that acquired
/// the lock. This is critical because PostgreSQL advisory locks are session-scoped:
/// the unlock must happen on the same connection that acquired the lock.
///
/// # Lock Release Behavior
///
/// The guard attempts to explicitly release the advisory lock when dropped:
///
/// - **Multi-threaded runtime**: Uses `block_in_place` to synchronously release
///   the lock before the guard is fully dropped.
///
/// - **Single-threaded runtime**: Spawns a task to release the lock asynchronously.
///   This task may not execute before process shutdown, in which case the lock is
///   released when the PostgreSQL session ends (connection closes).
///
/// # PostgreSQL Session-Scoped Locks
///
/// PostgreSQL advisory locks acquired with `pg_try_advisory_lock` are session-scoped
/// and automatically released when the database connection closes. This provides a
/// safety net: even if explicit unlock fails or is skipped, the lock will be released
/// when:
/// - The connection is returned to the pool and recycled
/// - The connection pool is shut down
/// - The database connection times out
///
/// For production deployments, configure appropriate connection pool idle timeouts
/// to ensure timely lock release on ungraceful shutdown.
pub struct CoordinationGuard {
    /// Key passed to `pg_try_advisory_lock` / `pg_advisory_unlock`.
    lock_key: i64,
    /// The actual connection that holds the advisory lock.
    /// Must be Option so we can take ownership in Drop.
    connection: Option<sqlx::pool::PoolConnection<Postgres>>,
}
533
534impl std::fmt::Debug for CoordinationGuard {
535    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
536        f.debug_struct("CoordinationGuard")
537            .field("lock_key", &self.lock_key)
538            .finish_non_exhaustive()
539    }
540}
541
542impl Drop for CoordinationGuard {
543    fn drop(&mut self) {
544        // Take ownership of the connection - we need the same connection that acquired the lock
545        if let Some(mut connection) = self.connection.take() {
546            let lock_key = self.lock_key;
547
548            // Check runtime flavor to determine the appropriate unlock strategy.
549            // block_in_place panics on single-threaded runtimes, so we must check first.
550            let handle = tokio::runtime::Handle::current();
551            let is_multi_thread =
552                handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread;
553
554            if is_multi_thread {
555                // Multi-threaded runtime: use block_in_place for synchronous unlock
556                tokio::task::block_in_place(|| {
557                    handle.block_on(async {
558                        // Unlock on the SAME connection that acquired the lock
559                        let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
560                            .bind(lock_key)
561                            .execute(&mut *connection)
562                            .await;
563                        // Connection is returned to pool when dropped here
564                    });
565                });
566            } else {
567                // Single-threaded runtime: spawn a task for async unlock.
568                // Note: This task may not execute before process shutdown. In that case,
569                // the advisory lock is released when the PostgreSQL session ends (the
570                // connection closes). See struct-level documentation for details.
571                tokio::spawn(async move {
572                    let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
573                        .bind(lock_key)
574                        .execute(&mut *connection)
575                        .await;
576                });
577            }
578        }
579    }
580}
581
/// Postgres-backed projector coordinator for distributed leadership.
///
/// `PostgresProjectorCoordinator` uses PostgreSQL advisory locks to ensure
/// only one projector instance processes events for a given subscription
/// at a time, preventing duplicate processing in distributed deployments.
#[derive(Debug, Clone)]
pub struct PostgresProjectorCoordinator {
    // Pool from which dedicated lock-holding connections are checked out.
    pool: Pool<Postgres>,
}
591
impl PostgresProjectorCoordinator {
    /// Create a new PostgresProjectorCoordinator with default configuration.
    ///
    /// # Errors
    ///
    /// Returns [`CoordinationError::DatabaseError`] if the pool cannot be
    /// created or cannot connect.
    pub async fn new<S: Into<String>>(connection_string: S) -> Result<Self, CoordinationError> {
        Self::with_config(connection_string, PostgresConfig::default()).await
    }

    /// Create a new PostgresProjectorCoordinator with custom configuration.
    ///
    /// # Errors
    ///
    /// Returns [`CoordinationError::DatabaseError`] if the pool cannot be
    /// created or cannot connect.
    pub async fn with_config<S: Into<String>>(
        connection_string: S,
        config: PostgresConfig,
    ) -> Result<Self, CoordinationError> {
        let connection_string = connection_string.into();
        let max_connections: std::num::NonZeroU32 = config.max_connections.into();
        let pool = PgPoolOptions::new()
            .max_connections(max_connections.get())
            .acquire_timeout(config.acquire_timeout)
            .idle_timeout(config.idle_timeout)
            .connect(&connection_string)
            .await
            .map_err(CoordinationError::DatabaseError)?;

        Ok(Self { pool })
    }

    /// Create a PostgresProjectorCoordinator from an existing connection pool.
    pub fn from_pool(pool: Pool<Postgres>) -> Self {
        Self { pool }
    }
}
621
impl ProjectorCoordinator for PostgresProjectorCoordinator {
    type Error = CoordinationError;
    type Guard = CoordinationGuard;

    /// Attempt to become leader for `subscription_name` without blocking.
    ///
    /// Hashes the subscription name into a 64-bit advisory-lock key, checks
    /// out a dedicated pool connection, and tries `pg_try_advisory_lock` on
    /// it. On success the connection is moved into the returned guard, since
    /// advisory locks are session-scoped and must be released on the same
    /// connection.
    ///
    /// # Errors
    ///
    /// Returns [`CoordinationError::LeadershipNotAcquired`] when another
    /// instance holds the lock, or [`CoordinationError::DatabaseError`] for
    /// connection/query failures.
    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        // Derive advisory lock key from subscription name.
        // NOTE(review): DefaultHasher's algorithm is explicitly not stable
        // across Rust releases, so instances built with different toolchains
        // could derive different keys for the same subscription and both win
        // leadership. Consider a fixed algorithm (e.g. FNV-1a) - confirm
        // deployment assumptions before relying on cross-version exclusion.
        let mut hasher = DefaultHasher::new();
        subscription_name.hash(&mut hasher);
        // Reinterpret u64 as i64 (pg advisory locks take bigint); wrapping is
        // fine because only key equality matters.
        let lock_key = hasher.finish() as i64;

        // Acquire a dedicated connection from the pool.
        // This connection MUST be kept for the lifetime of the guard because
        // PostgreSQL advisory locks are session-scoped.
        let mut connection = self
            .pool
            .acquire()
            .await
            .map_err(CoordinationError::DatabaseError)?;

        // Attempt to acquire advisory lock (non-blocking) on this specific connection
        let row = sqlx::query("SELECT pg_try_advisory_lock($1)")
            .bind(lock_key)
            .fetch_one(&mut *connection)
            .await
            .map_err(CoordinationError::DatabaseError)?;

        let acquired: bool = row.get(0);

        if acquired {
            Ok(CoordinationGuard {
                lock_key,
                connection: Some(connection),
            })
        } else {
            // Lock not acquired - connection will be returned to pool here
            Err(CoordinationError::LeadershipNotAcquired)
        }
    }
}
664
impl ProjectorCoordinator for PostgresEventStore {
    type Error = CoordinationError;
    type Guard = CoordinationGuard;

    /// Attempt to become leader for `subscription_name` without blocking.
    ///
    /// Hashes the subscription name into a 64-bit advisory-lock key, checks
    /// out a dedicated pool connection, and tries `pg_try_advisory_lock` on
    /// it. On success the connection is moved into the returned guard, since
    /// advisory locks are session-scoped and must be released on the same
    /// connection.
    ///
    /// # Errors
    ///
    /// Returns [`CoordinationError::LeadershipNotAcquired`] when another
    /// instance holds the lock, or [`CoordinationError::DatabaseError`] for
    /// connection/query failures.
    async fn try_acquire(&self, subscription_name: &str) -> Result<Self::Guard, Self::Error> {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        // Derive advisory lock key from subscription name.
        // NOTE(review): DefaultHasher's algorithm is explicitly not stable
        // across Rust releases, so instances built with different toolchains
        // could derive different keys for the same subscription and both win
        // leadership. Consider a fixed algorithm (e.g. FNV-1a) - confirm
        // deployment assumptions before relying on cross-version exclusion.
        let mut hasher = DefaultHasher::new();
        subscription_name.hash(&mut hasher);
        // Reinterpret u64 as i64 (pg advisory locks take bigint); wrapping is
        // fine because only key equality matters.
        let lock_key = hasher.finish() as i64;

        // Acquire a dedicated connection from the pool.
        // This connection MUST be kept for the lifetime of the guard because
        // PostgreSQL advisory locks are session-scoped.
        let mut connection = self
            .pool
            .acquire()
            .await
            .map_err(CoordinationError::DatabaseError)?;

        // Attempt to acquire advisory lock (non-blocking) on this specific connection
        let row = sqlx::query("SELECT pg_try_advisory_lock($1)")
            .bind(lock_key)
            .fetch_one(&mut *connection)
            .await
            .map_err(CoordinationError::DatabaseError)?;

        let acquired: bool = row.get(0);

        if acquired {
            Ok(CoordinationGuard {
                lock_key,
                connection: Some(connection),
            })
        } else {
            // Lock not acquired - connection will be returned to pool here
            Err(CoordinationError::LeadershipNotAcquired)
        }
    }
}