distributed_lock_file/
lock.rs

1//! File-based distributed lock implementation.
2
3use std::path::PathBuf;
4use std::time::Duration;
5
6use distributed_lock_core::error::{LockError, LockResult};
7use distributed_lock_core::timeout::TimeoutValue;
8use distributed_lock_core::traits::DistributedLock;
9use tracing::{Span, instrument};
10
11use crate::handle::FileLockHandle;
12use crate::name::get_lock_file_name;
13
/// A file-based distributed lock.
///
/// The lock is backed by a file in the provider's directory. The file name
/// is derived from the lock name with proper escaping for filesystem safety.
///
/// Constructing a value does not touch the filesystem; the lock file (and
/// its parent directory) is only created when an acquisition is attempted.
#[derive(Debug)]
pub struct FileDistributedLock {
    /// Full path to the lock file.
    path: PathBuf,
    /// Original lock name, as reported by `DistributedLock::name`.
    name: String,
}
24
25impl FileDistributedLock {
26    /// Creates a lock for a specific file path.
27    ///
28    /// Use this when you want to lock a specific file rather than using
29    /// the provider's directory.
30    pub fn from_path(path: impl Into<PathBuf>) -> Result<Self, LockError> {
31        let path = path.into();
32        let name = path
33            .file_name()
34            .and_then(|n| n.to_str())
35            .ok_or_else(|| LockError::InvalidName("invalid file path".to_string()))?
36            .to_string();
37
38        Ok(Self { path, name })
39    }
40
41    /// Creates a lock from a directory and name.
42    pub(crate) fn new(directory: &std::path::Path, name: &str) -> LockResult<Self> {
43        let path = get_lock_file_name(directory, name)?;
44        Ok(Self {
45            path,
46            name: name.to_string(),
47        })
48    }
49
50    /// Returns the path to the lock file.
51    pub fn path(&self) -> &PathBuf {
52        &self.path
53    }
54
55    /// Attempts to acquire the lock without waiting.
56    ///
57    /// Returns `Ok(Some(handle))` if acquired, `Ok(None)` if unavailable.
58    async fn try_acquire_internal(&self) -> LockResult<Option<FileLockHandle>> {
59        use fd_lock::RwLock;
60        use std::fs::OpenOptions;
61        use std::io::ErrorKind;
62
63        const MAX_RETRIES: i32 = 1600;
64        let mut retry_count = 0;
65
66        loop {
67            // Ensure parent directory exists
68            if let Some(parent) = self.path.parent() {
69                let mut dir_retry_count = 0;
70                loop {
71                    match std::fs::create_dir_all(parent) {
72                        Ok(_) => break,
73                        Err(e)
74                            if dir_retry_count < MAX_RETRIES
75                                && (e.kind() == ErrorKind::PermissionDenied
76                                    || e.kind() == ErrorKind::AlreadyExists) =>
77                        {
78                            dir_retry_count += 1;
79                            continue;
80                        }
81                        Err(e) => {
82                            return Err(LockError::Connection(Box::new(std::io::Error::new(
83                                e.kind(),
84                                format!(
85                                    "failed to ensure lock directory '{}' exists: {}",
86                                    parent.display(),
87                                    e
88                                ),
89                            ))));
90                        }
91                    }
92                }
93            }
94
95            // Open or create the file
96            // We DON'T use truncate(true) here to avoid race conditions where a waiting
97            // process might truncate a held lock file.
98            let file_result = OpenOptions::new()
99                .read(true)
100                .write(true)
101                .create(true)
102                .truncate(false)
103                .open(&self.path);
104
105            let file = match file_result {
106                Ok(f) => f,
107                Err(e) => {
108                    match e.kind() {
109                        ErrorKind::PermissionDenied | ErrorKind::IsADirectory => {
110                            // The path might be a directory
111                            if self.path.is_dir() {
112                                return Err(LockError::InvalidName(format!(
113                                    "Failed to create lock file '{}' because it is already the name of a directory",
114                                    self.path.display()
115                                )));
116                            }
117
118                            // If it's a file but we got PermissionDenied, it might be a transient
119                            // error during concurrent creation/deletion, or it might be read-only.
120                            if retry_count < MAX_RETRIES && e.kind() == ErrorKind::PermissionDenied
121                            {
122                                retry_count += 1;
123                                continue;
124                            }
125
126                            return Err(LockError::Connection(Box::new(std::io::Error::new(
127                                e.kind(),
128                                format!(
129                                    "failed to open lock file '{}': {}",
130                                    self.path.display(),
131                                    e
132                                ),
133                            ))));
134                        }
135                        ErrorKind::NotFound => {
136                            // Transient error during concurrent creation/deletion
137                            if retry_count < MAX_RETRIES {
138                                retry_count += 1;
139                                continue;
140                            }
141                            return Err(LockError::Connection(Box::new(e)));
142                        }
143                        _ => return Err(LockError::Connection(Box::new(e))),
144                    }
145                }
146            };
147
148            let lock_file = RwLock::new(file);
149
150            // Try to acquire the lock - the handle will do the actual acquisition
151            match FileLockHandle::try_new(lock_file, self.path.clone()) {
152                Ok(handle) => return Ok(Some(handle)),
153                Err(LockError::Backend(e)) => {
154                    // Check if this is a "lock already held" error
155                    // Most OS lock errors will fall into this category when the lock is held
156                    let error_msg = e.to_string().to_lowercase();
157                    if error_msg.contains("already held")
158                        || error_msg.contains("would block")
159                        || error_msg.contains("resource temporarily unavailable")
160                        || error_msg.contains("access is denied")
161                    // Windows error for locked file
162                    {
163                        // Lock is held by another process - this is expected
164                        return Ok(None);
165                    } else {
166                        // Unexpected backend error
167                        return Err(LockError::Backend(e));
168                    }
169                }
170                Err(e) => return Err(e),
171            }
172        }
173    }
174}
175
176impl DistributedLock for FileDistributedLock {
177    type Handle = FileLockHandle;
178
179    fn name(&self) -> &str {
180        &self.name
181    }
182
183    #[instrument(skip(self), fields(lock.name = %self.name, lock.path = %self.path.display(), timeout = ?timeout, backend = "file"))]
184    async fn acquire(&self, timeout: Option<Duration>) -> LockResult<Self::Handle> {
185        let timeout_value = TimeoutValue::from(timeout);
186        let start = std::time::Instant::now();
187        Span::current().record("operation", "acquire");
188
189        // Busy-wait with exponential backoff and jitter
190        // Initial delay: 10ms (reduced from 50ms for faster initial retry)
191        let mut sleep_duration = Duration::from_millis(10);
192        const MAX_SLEEP: Duration = Duration::from_secs(1);
193        const MIN_SLEEP: Duration = Duration::from_millis(5);
194        const BACKOFF_MULTIPLIER: u32 = 2;
195
196        loop {
197            match self.try_acquire_internal().await {
198                Ok(Some(handle)) => {
199                    let elapsed = start.elapsed();
200                    Span::current().record("acquired", true);
201                    Span::current().record("elapsed_ms", elapsed.as_millis() as u64);
202                    return Ok(handle);
203                }
204                Ok(None) => {
205                    // Check timeout before sleeping
206                    if !timeout_value.is_infinite() {
207                        let elapsed = start.elapsed();
208                        let timeout_duration = timeout_value.as_duration().unwrap();
209                        if elapsed >= timeout_duration {
210                            Span::current().record("acquired", false);
211                            Span::current().record("error", "timeout");
212                            return Err(LockError::Timeout(timeout_duration));
213                        }
214
215                        // Don't sleep longer than remaining timeout
216                        let remaining = timeout_duration - elapsed;
217                        if sleep_duration > remaining {
218                            sleep_duration = remaining;
219                        }
220                    }
221
222                    // Add jitter (±25%) to avoid thundering herd problem
223                    // This helps when multiple processes are waiting for the same lock
224                    let jitter_range = sleep_duration.as_millis() as u64 / 4;
225                    let jitter = if jitter_range > 0 {
226                        // Use a simple hash of the current time for pseudo-random jitter
227                        // This avoids needing a random number generator dependency
228                        let nanos = start.elapsed().as_nanos() as u64;
229                        (nanos % (jitter_range * 2)).saturating_sub(jitter_range)
230                    } else {
231                        0
232                    };
233
234                    let sleep_with_jitter = sleep_duration
235                        .checked_add(Duration::from_millis(jitter))
236                        .unwrap_or(sleep_duration);
237
238                    // Sleep before retry
239                    tokio::time::sleep(sleep_with_jitter).await;
240
241                    // Exponential backoff: double the sleep duration, but cap at MAX_SLEEP
242                    sleep_duration = (sleep_duration * BACKOFF_MULTIPLIER)
243                        .min(MAX_SLEEP)
244                        .max(MIN_SLEEP);
245                }
246                Err(e) => return Err(e),
247            }
248        }
249    }
250
251    #[instrument(skip(self), fields(lock.name = %self.name, lock.path = %self.path.display(), backend = "file"))]
252    async fn try_acquire(&self) -> LockResult<Option<Self::Handle>> {
253        Span::current().record("operation", "try_acquire");
254        let result = self.try_acquire_internal().await;
255        match &result {
256            Ok(Some(_)) => {
257                Span::current().record("acquired", true);
258            }
259            Ok(None) => {
260                Span::current().record("acquired", false);
261                Span::current().record("reason", "lock_held");
262            }
263            Err(e) => {
264                Span::current().record("acquired", false);
265                Span::current().record("error", e.to_string());
266            }
267        }
268        result
269    }
270}