distributed_lock_file/
lock.rs1use std::path::PathBuf;
4use std::time::Duration;
5
6use distributed_lock_core::error::{LockError, LockResult};
7use distributed_lock_core::timeout::TimeoutValue;
8use distributed_lock_core::traits::DistributedLock;
9use tracing::{instrument, Span};
10
11use crate::handle::FileLockHandle;
12use crate::name::get_lock_file_name;
13
/// A named lock that is backed by a single file on the local file system.
pub struct FileDistributedLock {
    /// Location of the file that represents the lock.
    path: PathBuf,
    /// Name reported through `DistributedLock::name`.
    name: String,
}
24
25impl FileDistributedLock {
26 pub fn from_path(path: impl Into<PathBuf>) -> Result<Self, LockError> {
31 let path = path.into();
32 let name = path
33 .file_name()
34 .and_then(|n| n.to_str())
35 .ok_or_else(|| LockError::InvalidName("invalid file path".to_string()))?
36 .to_string();
37
38 Ok(Self { path, name })
39 }
40
41 pub(crate) fn new(directory: &std::path::Path, name: &str) -> LockResult<Self> {
43 let path = get_lock_file_name(directory, name)?;
44 Ok(Self {
45 path,
46 name: name.to_string(),
47 })
48 }
49
50 pub fn path(&self) -> &PathBuf {
52 &self.path
53 }
54
55 async fn try_acquire_internal(&self) -> LockResult<Option<FileLockHandle>> {
59 use fd_lock::RwLock;
60 use std::fs::OpenOptions;
61 use std::io::ErrorKind;
62
63 if let Some(parent) = self.path.parent() {
65 std::fs::create_dir_all(parent).map_err(|e| match e.kind() {
66 ErrorKind::PermissionDenied => {
67 LockError::Connection(Box::new(std::io::Error::new(
68 ErrorKind::PermissionDenied,
69 format!(
70 "permission denied creating lock directory '{}': {}",
71 parent.display(),
72 e
73 ),
74 )))
75 }
76 ErrorKind::OutOfMemory => LockError::Connection(Box::new(std::io::Error::new(
77 ErrorKind::OutOfMemory,
78 format!(
79 "insufficient storage creating lock directory '{}': {}",
80 parent.display(),
81 e
82 ),
83 ))),
84 _ => LockError::Connection(Box::new(std::io::Error::other(format!(
85 "failed to create lock directory '{}': {}",
86 parent.display(),
87 e
88 )))),
89 })?;
90 }
91
92 let file = OpenOptions::new()
94 .read(true)
95 .write(true)
96 .create(true)
97 .truncate(true)
98 .open(&self.path)
99 .map_err(|e| {
100 match e.kind() {
101 ErrorKind::PermissionDenied => {
102 LockError::Connection(Box::new(std::io::Error::new(
103 ErrorKind::PermissionDenied,
104 format!(
105 "permission denied opening lock file '{}': {}",
106 self.path.display(),
107 e
108 ),
109 )))
110 }
111 ErrorKind::OutOfMemory => {
112 LockError::Connection(Box::new(std::io::Error::new(
113 ErrorKind::OutOfMemory,
114 format!(
115 "insufficient storage opening lock file '{}': {}",
116 self.path.display(),
117 e
118 ),
119 )))
120 }
121 ErrorKind::NotFound => {
122 LockError::Connection(Box::new(std::io::Error::new(
124 ErrorKind::NotFound,
125 format!(
126 "lock file '{}' not found (parent directory may have been removed): {}",
127 self.path.display(),
128 e
129 ),
130 )))
131 }
132 ErrorKind::AlreadyExists => {
133 LockError::Connection(Box::new(std::io::Error::new(
135 ErrorKind::AlreadyExists,
136 format!(
137 "lock file '{}' already exists unexpectedly: {}",
138 self.path.display(),
139 e
140 ),
141 )))
142 }
143 _ => LockError::Connection(Box::new(std::io::Error::other(
144 format!(
145 "failed to open lock file '{}': {}",
146 self.path.display(),
147 e
148 ),
149 ))),
150 }
151 })?;
152
153 let lock_file = RwLock::new(file);
154
155 match FileLockHandle::try_new(lock_file, self.path.clone()) {
158 Ok(handle) => Ok(Some(handle)),
159 Err(LockError::Backend(e)) => {
160 let error_msg = e.to_string().to_lowercase();
162 if error_msg.contains("already held")
163 || error_msg.contains("would block")
164 || error_msg.contains("resource temporarily unavailable")
165 {
166 Ok(None)
168 } else {
169 Err(LockError::Backend(Box::new(std::io::Error::other(
171 format!(
172 "failed to acquire lock on file '{}': {}",
173 self.path.display(),
174 e
175 ),
176 ))))
177 }
178 }
179 Err(e) => Err(e),
180 }
181 }
182}
183
184impl DistributedLock for FileDistributedLock {
185 type Handle = FileLockHandle;
186
187 fn name(&self) -> &str {
188 &self.name
189 }
190
191 #[instrument(skip(self), fields(lock.name = %self.name, lock.path = %self.path.display(), timeout = ?timeout, backend = "file"))]
192 async fn acquire(&self, timeout: Option<Duration>) -> LockResult<Self::Handle> {
193 let timeout_value = TimeoutValue::from(timeout);
194 let start = std::time::Instant::now();
195 Span::current().record("operation", "acquire");
196
197 let mut sleep_duration = Duration::from_millis(10);
200 const MAX_SLEEP: Duration = Duration::from_secs(1);
201 const MIN_SLEEP: Duration = Duration::from_millis(5);
202 const BACKOFF_MULTIPLIER: u32 = 2;
203
204 loop {
205 match self.try_acquire_internal().await {
206 Ok(Some(handle)) => {
207 let elapsed = start.elapsed();
208 Span::current().record("acquired", true);
209 Span::current().record("elapsed_ms", elapsed.as_millis() as u64);
210 return Ok(handle);
211 }
212 Ok(None) => {
213 if !timeout_value.is_infinite() {
215 let elapsed = start.elapsed();
216 let timeout_duration = timeout_value.as_duration().unwrap();
217 if elapsed >= timeout_duration {
218 Span::current().record("acquired", false);
219 Span::current().record("error", "timeout");
220 return Err(LockError::Timeout(timeout_duration));
221 }
222
223 let remaining = timeout_duration - elapsed;
225 if sleep_duration > remaining {
226 sleep_duration = remaining;
227 }
228 }
229
230 let jitter_range = sleep_duration.as_millis() as u64 / 4;
233 let jitter = if jitter_range > 0 {
234 let nanos = start.elapsed().as_nanos() as u64;
237 (nanos % (jitter_range * 2)).saturating_sub(jitter_range)
238 } else {
239 0
240 };
241
242 let sleep_with_jitter = sleep_duration
243 .checked_add(Duration::from_millis(jitter))
244 .unwrap_or(sleep_duration);
245
246 tokio::time::sleep(sleep_with_jitter).await;
248
249 sleep_duration = (sleep_duration * BACKOFF_MULTIPLIER)
251 .min(MAX_SLEEP)
252 .max(MIN_SLEEP);
253 }
254 Err(e) => return Err(e),
255 }
256 }
257 }
258
259 #[instrument(skip(self), fields(lock.name = %self.name, lock.path = %self.path.display(), backend = "file"))]
260 async fn try_acquire(&self) -> LockResult<Option<Self::Handle>> {
261 Span::current().record("operation", "try_acquire");
262 let result = self.try_acquire_internal().await;
263 match &result {
264 Ok(Some(_)) => {
265 Span::current().record("acquired", true);
266 }
267 Ok(None) => {
268 Span::current().record("acquired", false);
269 Span::current().record("reason", "lock_held");
270 }
271 Err(e) => {
272 Span::current().record("acquired", false);
273 Span::current().record("error", e.to_string());
274 }
275 }
276 result
277 }
278}