distributed_lock_postgres/
provider.rs1use std::time::Duration;
4
5use distributed_lock_core::error::{LockError, LockResult};
6use distributed_lock_core::traits::{LockProvider, ReaderWriterLockProvider};
7
8use crate::connection::PostgresConnection;
9use crate::key::PostgresAdvisoryLockKey;
10use crate::lock::PostgresDistributedLock;
11use crate::rw_lock::PostgresDistributedReaderWriterLock;
12use sqlx::PgPool;
13
14pub struct PostgresLockProviderBuilder {
16 connection: Option<PostgresConnection>,
17 use_transaction: bool,
18 keepalive_cadence: Option<Duration>,
19}
20
21impl PostgresLockProviderBuilder {
22 pub fn new() -> Self {
24 Self {
25 connection: None,
26 use_transaction: false,
27 keepalive_cadence: None,
28 }
29 }
30
31 pub fn connection_string(mut self, conn_str: impl Into<String>) -> Self {
33 self.connection = Some(PostgresConnection::ConnectionString(conn_str.into()));
34 self
35 }
36
37 pub fn pool(mut self, pool: PgPool) -> Self {
39 self.connection = Some(PostgresConnection::Pool(pool));
40 self
41 }
42
43 pub fn use_transaction(mut self, use_transaction: bool) -> Self {
45 self.use_transaction = use_transaction;
46 self
47 }
48
49 pub fn keepalive_cadence(mut self, cadence: Duration) -> Self {
51 self.keepalive_cadence = Some(cadence);
52 self
53 }
54
55 pub async fn build(self) -> LockResult<PostgresLockProvider> {
57 let connection = self
58 .connection
59 .ok_or_else(|| LockError::InvalidName("connection not specified".to_string()))?;
60
61 let pool = connection.get_pool().await?;
62
63 Ok(PostgresLockProvider {
64 pool,
65 use_transaction: self.use_transaction,
66 keepalive_cadence: self.keepalive_cadence,
67 })
68 }
69}
70
71impl Default for PostgresLockProviderBuilder {
72 fn default() -> Self {
73 Self::new()
74 }
75}
76
77pub struct PostgresLockProvider {
79 pool: PgPool,
80 use_transaction: bool,
81 keepalive_cadence: Option<Duration>,
82}
83
84impl PostgresLockProvider {
85 pub fn builder() -> PostgresLockProviderBuilder {
87 PostgresLockProviderBuilder::new()
88 }
89
90 pub async fn new(connection_string: impl Into<String>) -> LockResult<Self> {
92 Self::builder()
93 .connection_string(connection_string)
94 .build()
95 .await
96 }
97}
98
99impl LockProvider for PostgresLockProvider {
100 type Lock = PostgresDistributedLock;
101
102 fn create_lock(&self, name: &str) -> Self::Lock {
103 let key = PostgresAdvisoryLockKey::from_name(name, true)
104 .expect("failed to encode lock name as key");
105 PostgresDistributedLock::new(
106 name.to_string(),
107 key,
108 self.pool.clone(),
109 self.use_transaction,
110 self.keepalive_cadence,
111 )
112 }
113}
114
115impl ReaderWriterLockProvider for PostgresLockProvider {
116 type Lock = PostgresDistributedReaderWriterLock;
117
118 fn create_reader_writer_lock(&self, name: &str) -> Self::Lock {
119 let key = PostgresAdvisoryLockKey::from_name(name, true)
120 .expect("failed to encode lock name as key");
121 PostgresDistributedReaderWriterLock::new(
122 name.to_string(),
123 key,
124 self.pool.clone(),
125 self.use_transaction,
126 self.keepalive_cadence,
127 )
128 }
129}