distributed_lock_redis/
provider.rs1use std::time::Duration;
4
5use distributed_lock_core::error::{LockError, LockResult};
6use distributed_lock_core::traits::{LockProvider, ReaderWriterLockProvider, SemaphoreProvider};
7
8use crate::lock::RedisDistributedLock;
9use crate::rw_lock::RedisDistributedReaderWriterLock;
10use crate::semaphore::RedisDistributedSemaphore;
11use fred::prelude::*;
12
13pub struct RedisLockProviderBuilder {
15 urls: Vec<String>,
16 clients: Vec<RedisClient>,
17 expiry: Duration,
18 extension_cadence: Duration,
19 min_validity: Duration,
20}
21
22impl RedisLockProviderBuilder {
23 pub fn new() -> Self {
25 Self {
26 urls: vec![],
27 clients: vec![],
28 expiry: Duration::from_secs(30),
29 extension_cadence: Duration::from_secs(10),
30 min_validity: Duration::from_millis(27000), }
32 }
33
34 pub fn url(mut self, url: impl Into<String>) -> Self {
38 self.urls.push(url.into());
39 self
40 }
41
42 pub fn urls(mut self, urls: &[impl AsRef<str>]) -> Self {
44 for url in urls {
45 self.urls.push(url.as_ref().to_string());
46 }
47 self
48 }
49
50 pub fn client(mut self, client: RedisClient) -> Self {
52 self.clients.push(client);
53 self
54 }
55
56 pub fn expiry(mut self, expiry: Duration) -> Self {
58 self.expiry = expiry;
59 self
60 }
61
62 pub fn extension_cadence(mut self, cadence: Duration) -> Self {
64 self.extension_cadence = cadence;
65 self
66 }
67
68 pub fn min_validity(mut self, validity: Duration) -> Self {
73 self.min_validity = validity;
74 self
75 }
76
77 pub async fn build(self) -> LockResult<RedisLockProvider> {
79 let mut clients = self.clients;
80
81 for url in self.urls {
83 let config = RedisConfig::from_url(&url).map_err(|e| {
84 LockError::Connection(Box::new(std::io::Error::new(
85 std::io::ErrorKind::InvalidInput,
86 format!("invalid Redis URL: {}", e),
87 )))
88 })?;
89
90 let client = RedisClient::new(config, None, None, None);
91 client.connect();
92 client.wait_for_connect().await.map_err(|e| {
93 LockError::Connection(Box::new(std::io::Error::other(format!(
94 "failed to connect to Redis: {}",
95 e
96 ))))
97 })?;
98
99 clients.push(client);
100 }
101
102 if clients.is_empty() {
103 return Err(LockError::InvalidName(
104 "no Redis clients or URLs provided".to_string(),
105 ));
106 }
107
108 Ok(RedisLockProvider {
109 clients: clients.clone(),
110 primary_client: clients.into_iter().next().unwrap(), expiry: self.expiry,
112 extension_cadence: self.extension_cadence,
113 min_validity: self.min_validity,
114 })
115 }
116}
117
118impl Default for RedisLockProviderBuilder {
119 fn default() -> Self {
120 Self::new()
121 }
122}
123
124pub struct RedisLockProvider {
128 clients: Vec<RedisClient>,
130 primary_client: RedisClient,
132 expiry: Duration,
134 extension_cadence: Duration,
136 min_validity: Duration,
138}
139
140impl RedisLockProvider {
141 pub fn builder() -> RedisLockProviderBuilder {
143 RedisLockProviderBuilder::new()
144 }
145
146 pub async fn new(url: impl Into<String>) -> LockResult<Self> {
148 Self::builder().url(url).build().await
149 }
150}
151
152impl LockProvider for RedisLockProvider {
153 type Lock = RedisDistributedLock;
154
155 fn create_lock(&self, name: &str) -> Self::Lock {
156 RedisDistributedLock::new(
157 name.to_string(),
158 self.clients.clone(),
159 self.expiry,
160 self.min_validity,
161 self.extension_cadence,
162 )
163 }
164}
165
166impl ReaderWriterLockProvider for RedisLockProvider {
167 type Lock = RedisDistributedReaderWriterLock;
168
169 fn create_reader_writer_lock(&self, name: &str) -> Self::Lock {
170 RedisDistributedReaderWriterLock::new(
171 name.to_string(),
172 self.clients.clone(),
173 self.expiry,
174 self.min_validity,
175 self.extension_cadence,
176 )
177 }
178}
179
180impl SemaphoreProvider for RedisLockProvider {
181 type Semaphore = RedisDistributedSemaphore;
182
183 fn create_semaphore(&self, name: &str, max_count: u32) -> Self::Semaphore {
184 RedisDistributedSemaphore::new(
185 name.to_string(),
186 max_count,
187 self.primary_client.clone(),
188 self.expiry,
189 self.extension_cadence,
190 )
191 }
192}