distributed_lock_postgres/
provider.rs

1//! PostgreSQL lock provider implementation.
2
3use std::time::Duration;
4
5use distributed_lock_core::error::{LockError, LockResult};
6use distributed_lock_core::traits::{LockProvider, ReaderWriterLockProvider};
7
8use crate::connection::PostgresConnection;
9use crate::key::PostgresAdvisoryLockKey;
10use crate::lock::PostgresDistributedLock;
11use crate::rw_lock::PostgresDistributedReaderWriterLock;
12use sqlx::PgPool;
13
14/// Builder for PostgreSQL lock provider configuration.
15pub struct PostgresLockProviderBuilder {
16    connection: Option<PostgresConnection>,
17    use_transaction: bool,
18    keepalive_cadence: Option<Duration>,
19}
20
21impl PostgresLockProviderBuilder {
22    /// Creates a new builder.
23    pub fn new() -> Self {
24        Self {
25            connection: None,
26            use_transaction: false,
27            keepalive_cadence: None,
28        }
29    }
30
31    /// Sets the PostgreSQL connection string.
32    pub fn connection_string(mut self, conn_str: impl Into<String>) -> Self {
33        self.connection = Some(PostgresConnection::ConnectionString(conn_str.into()));
34        self
35    }
36
37    /// Sets an existing connection pool.
38    pub fn pool(mut self, pool: PgPool) -> Self {
39        self.connection = Some(PostgresConnection::Pool(pool));
40        self
41    }
42
43    /// Sets whether to use transaction-scoped locks.
44    pub fn use_transaction(mut self, use_transaction: bool) -> Self {
45        self.use_transaction = use_transaction;
46        self
47    }
48
49    /// Sets the keepalive cadence for long-held locks.
50    pub fn keepalive_cadence(mut self, cadence: Duration) -> Self {
51        self.keepalive_cadence = Some(cadence);
52        self
53    }
54
55    /// Builds the provider.
56    pub async fn build(self) -> LockResult<PostgresLockProvider> {
57        let connection = self
58            .connection
59            .ok_or_else(|| LockError::InvalidName("connection not specified".to_string()))?;
60
61        let pool = connection.get_pool().await?;
62
63        Ok(PostgresLockProvider {
64            pool,
65            use_transaction: self.use_transaction,
66            keepalive_cadence: self.keepalive_cadence,
67        })
68    }
69}
70
71impl Default for PostgresLockProviderBuilder {
72    fn default() -> Self {
73        Self::new()
74    }
75}
76
77/// Provider for PostgreSQL-based distributed locks.
78pub struct PostgresLockProvider {
79    pool: PgPool,
80    use_transaction: bool,
81    keepalive_cadence: Option<Duration>,
82}
83
84impl PostgresLockProvider {
85    /// Returns a new builder for configuring the provider.
86    pub fn builder() -> PostgresLockProviderBuilder {
87        PostgresLockProviderBuilder::new()
88    }
89
90    /// Creates a provider using the specified connection string.
91    pub async fn new(connection_string: impl Into<String>) -> LockResult<Self> {
92        Self::builder()
93            .connection_string(connection_string)
94            .build()
95            .await
96    }
97}
98
99impl LockProvider for PostgresLockProvider {
100    type Lock = PostgresDistributedLock;
101
102    fn create_lock(&self, name: &str) -> Self::Lock {
103        let key = PostgresAdvisoryLockKey::from_name(name, true)
104            .expect("failed to encode lock name as key");
105        PostgresDistributedLock::new(
106            name.to_string(),
107            key,
108            self.pool.clone(),
109            self.use_transaction,
110            self.keepalive_cadence,
111        )
112    }
113}
114
115impl ReaderWriterLockProvider for PostgresLockProvider {
116    type Lock = PostgresDistributedReaderWriterLock;
117
118    fn create_reader_writer_lock(&self, name: &str) -> Self::Lock {
119        let key = PostgresAdvisoryLockKey::from_name(name, true)
120            .expect("failed to encode lock name as key");
121        PostgresDistributedReaderWriterLock::new(
122            name.to_string(),
123            key,
124            self.pool.clone(),
125            self.use_transaction,
126            self.keepalive_cadence,
127        )
128    }
129}