distributed_lock_file/
lock.rs1use std::path::PathBuf;
4use std::time::Duration;
5
6use distributed_lock_core::error::{LockError, LockResult};
7use distributed_lock_core::timeout::TimeoutValue;
8use distributed_lock_core::traits::DistributedLock;
9use tracing::{Span, instrument};
10
11use crate::handle::FileLockHandle;
12use crate::name::get_lock_file_name;
13
14pub struct FileDistributedLock {
19 path: PathBuf,
21 name: String,
23}
24
25impl FileDistributedLock {
26 pub fn from_path(path: impl Into<PathBuf>) -> Result<Self, LockError> {
31 let path = path.into();
32 let name = path
33 .file_name()
34 .and_then(|n| n.to_str())
35 .ok_or_else(|| LockError::InvalidName("invalid file path".to_string()))?
36 .to_string();
37
38 Ok(Self { path, name })
39 }
40
41 pub(crate) fn new(directory: &std::path::Path, name: &str) -> LockResult<Self> {
43 let path = get_lock_file_name(directory, name)?;
44 Ok(Self {
45 path,
46 name: name.to_string(),
47 })
48 }
49
50 pub fn path(&self) -> &PathBuf {
52 &self.path
53 }
54
55 async fn try_acquire_internal(&self) -> LockResult<Option<FileLockHandle>> {
59 use fd_lock::RwLock;
60 use std::fs::OpenOptions;
61 use std::io::ErrorKind;
62
63 const MAX_RETRIES: i32 = 1600;
64 let mut retry_count = 0;
65
66 loop {
67 if let Some(parent) = self.path.parent() {
69 let mut dir_retry_count = 0;
70 loop {
71 match std::fs::create_dir_all(parent) {
72 Ok(_) => break,
73 Err(e)
74 if dir_retry_count < MAX_RETRIES
75 && (e.kind() == ErrorKind::PermissionDenied
76 || e.kind() == ErrorKind::AlreadyExists) =>
77 {
78 dir_retry_count += 1;
79 continue;
80 }
81 Err(e) => {
82 return Err(LockError::Connection(Box::new(std::io::Error::new(
83 e.kind(),
84 format!(
85 "failed to ensure lock directory '{}' exists: {}",
86 parent.display(),
87 e
88 ),
89 ))));
90 }
91 }
92 }
93 }
94
95 let file_result = OpenOptions::new()
99 .read(true)
100 .write(true)
101 .create(true)
102 .truncate(false)
103 .open(&self.path);
104
105 let file = match file_result {
106 Ok(f) => f,
107 Err(e) => {
108 match e.kind() {
109 ErrorKind::PermissionDenied | ErrorKind::IsADirectory => {
110 if self.path.is_dir() {
112 return Err(LockError::InvalidName(format!(
113 "Failed to create lock file '{}' because it is already the name of a directory",
114 self.path.display()
115 )));
116 }
117
118 if retry_count < MAX_RETRIES && e.kind() == ErrorKind::PermissionDenied
121 {
122 retry_count += 1;
123 continue;
124 }
125
126 return Err(LockError::Connection(Box::new(std::io::Error::new(
127 e.kind(),
128 format!(
129 "failed to open lock file '{}': {}",
130 self.path.display(),
131 e
132 ),
133 ))));
134 }
135 ErrorKind::NotFound => {
136 if retry_count < MAX_RETRIES {
138 retry_count += 1;
139 continue;
140 }
141 return Err(LockError::Connection(Box::new(e)));
142 }
143 _ => return Err(LockError::Connection(Box::new(e))),
144 }
145 }
146 };
147
148 let lock_file = RwLock::new(file);
149
150 match FileLockHandle::try_new(lock_file, self.path.clone()) {
152 Ok(handle) => return Ok(Some(handle)),
153 Err(LockError::Backend(e)) => {
154 let error_msg = e.to_string().to_lowercase();
157 if error_msg.contains("already held")
158 || error_msg.contains("would block")
159 || error_msg.contains("resource temporarily unavailable")
160 || error_msg.contains("access is denied")
161 {
163 return Ok(None);
165 } else {
166 return Err(LockError::Backend(e));
168 }
169 }
170 Err(e) => return Err(e),
171 }
172 }
173 }
174}
175
176impl DistributedLock for FileDistributedLock {
177 type Handle = FileLockHandle;
178
179 fn name(&self) -> &str {
180 &self.name
181 }
182
183 #[instrument(skip(self), fields(lock.name = %self.name, lock.path = %self.path.display(), timeout = ?timeout, backend = "file"))]
184 async fn acquire(&self, timeout: Option<Duration>) -> LockResult<Self::Handle> {
185 let timeout_value = TimeoutValue::from(timeout);
186 let start = std::time::Instant::now();
187 Span::current().record("operation", "acquire");
188
189 let mut sleep_duration = Duration::from_millis(10);
192 const MAX_SLEEP: Duration = Duration::from_secs(1);
193 const MIN_SLEEP: Duration = Duration::from_millis(5);
194 const BACKOFF_MULTIPLIER: u32 = 2;
195
196 loop {
197 match self.try_acquire_internal().await {
198 Ok(Some(handle)) => {
199 let elapsed = start.elapsed();
200 Span::current().record("acquired", true);
201 Span::current().record("elapsed_ms", elapsed.as_millis() as u64);
202 return Ok(handle);
203 }
204 Ok(None) => {
205 if !timeout_value.is_infinite() {
207 let elapsed = start.elapsed();
208 let timeout_duration = timeout_value.as_duration().unwrap();
209 if elapsed >= timeout_duration {
210 Span::current().record("acquired", false);
211 Span::current().record("error", "timeout");
212 return Err(LockError::Timeout(timeout_duration));
213 }
214
215 let remaining = timeout_duration - elapsed;
217 if sleep_duration > remaining {
218 sleep_duration = remaining;
219 }
220 }
221
222 let jitter_range = sleep_duration.as_millis() as u64 / 4;
225 let jitter = if jitter_range > 0 {
226 let nanos = start.elapsed().as_nanos() as u64;
229 (nanos % (jitter_range * 2)).saturating_sub(jitter_range)
230 } else {
231 0
232 };
233
234 let sleep_with_jitter = sleep_duration
235 .checked_add(Duration::from_millis(jitter))
236 .unwrap_or(sleep_duration);
237
238 tokio::time::sleep(sleep_with_jitter).await;
240
241 sleep_duration = (sleep_duration * BACKOFF_MULTIPLIER)
243 .min(MAX_SLEEP)
244 .max(MIN_SLEEP);
245 }
246 Err(e) => return Err(e),
247 }
248 }
249 }
250
251 #[instrument(skip(self), fields(lock.name = %self.name, lock.path = %self.path.display(), backend = "file"))]
252 async fn try_acquire(&self) -> LockResult<Option<Self::Handle>> {
253 Span::current().record("operation", "try_acquire");
254 let result = self.try_acquire_internal().await;
255 match &result {
256 Ok(Some(_)) => {
257 Span::current().record("acquired", true);
258 }
259 Ok(None) => {
260 Span::current().record("acquired", false);
261 Span::current().record("reason", "lock_held");
262 }
263 Err(e) => {
264 Span::current().record("acquired", false);
265 Span::current().record("error", e.to_string());
266 }
267 }
268 result
269 }
270}