distributed_lock_file/
lock.rs

1//! File-based distributed lock implementation.
2
3use std::path::PathBuf;
4use std::time::Duration;
5
6use distributed_lock_core::error::{LockError, LockResult};
7use distributed_lock_core::timeout::TimeoutValue;
8use distributed_lock_core::traits::DistributedLock;
9use tracing::{instrument, Span};
10
11use crate::handle::FileLockHandle;
12use crate::name::get_lock_file_name;
13
/// A file-based distributed lock.
///
/// The lock is backed by a file in the provider's directory. The file name
/// is derived from the lock name with proper escaping for filesystem safety.
/// A lock built with `FileDistributedLock::from_path` uses the given path as-is.
///
/// This value only describes *which* file to lock; holding the lock is
/// represented by the handle returned on acquisition.
#[derive(Debug, Clone)]
pub struct FileDistributedLock {
    /// Full path to the lock file.
    path: PathBuf,
    /// Original lock name (the file name when built from an explicit path).
    name: String,
}
24
25impl FileDistributedLock {
26    /// Creates a lock for a specific file path.
27    ///
28    /// Use this when you want to lock a specific file rather than using
29    /// the provider's directory.
30    pub fn from_path(path: impl Into<PathBuf>) -> Result<Self, LockError> {
31        let path = path.into();
32        let name = path
33            .file_name()
34            .and_then(|n| n.to_str())
35            .ok_or_else(|| LockError::InvalidName("invalid file path".to_string()))?
36            .to_string();
37
38        Ok(Self { path, name })
39    }
40
41    /// Creates a lock from a directory and name.
42    pub(crate) fn new(directory: &std::path::Path, name: &str) -> LockResult<Self> {
43        let path = get_lock_file_name(directory, name)?;
44        Ok(Self {
45            path,
46            name: name.to_string(),
47        })
48    }
49
50    /// Returns the path to the lock file.
51    pub fn path(&self) -> &PathBuf {
52        &self.path
53    }
54
55    /// Attempts to acquire the lock without waiting.
56    ///
57    /// Returns `Ok(Some(handle))` if acquired, `Ok(None)` if unavailable.
58    async fn try_acquire_internal(&self) -> LockResult<Option<FileLockHandle>> {
59        use fd_lock::RwLock;
60        use std::fs::OpenOptions;
61        use std::io::ErrorKind;
62
63        // Ensure parent directory exists
64        if let Some(parent) = self.path.parent() {
65            std::fs::create_dir_all(parent).map_err(|e| match e.kind() {
66                ErrorKind::PermissionDenied => {
67                    LockError::Connection(Box::new(std::io::Error::new(
68                        ErrorKind::PermissionDenied,
69                        format!(
70                            "permission denied creating lock directory '{}': {}",
71                            parent.display(),
72                            e
73                        ),
74                    )))
75                }
76                ErrorKind::OutOfMemory => LockError::Connection(Box::new(std::io::Error::new(
77                    ErrorKind::OutOfMemory,
78                    format!(
79                        "insufficient storage creating lock directory '{}': {}",
80                        parent.display(),
81                        e
82                    ),
83                ))),
84                _ => LockError::Connection(Box::new(std::io::Error::other(format!(
85                    "failed to create lock directory '{}': {}",
86                    parent.display(),
87                    e
88                )))),
89            })?;
90        }
91
92        // Open or create the file
93        let file = OpenOptions::new()
94            .read(true)
95            .write(true)
96            .create(true)
97            .truncate(true)
98            .open(&self.path)
99            .map_err(|e| {
100                match e.kind() {
101                    ErrorKind::PermissionDenied => {
102                        LockError::Connection(Box::new(std::io::Error::new(
103                            ErrorKind::PermissionDenied,
104                            format!(
105                                "permission denied opening lock file '{}': {}",
106                                self.path.display(),
107                                e
108                            ),
109                        )))
110                    }
111                    ErrorKind::OutOfMemory => {
112                        LockError::Connection(Box::new(std::io::Error::new(
113                            ErrorKind::OutOfMemory,
114                            format!(
115                                "insufficient storage opening lock file '{}': {}",
116                                self.path.display(),
117                                e
118                            ),
119                        )))
120                    }
121                    ErrorKind::NotFound => {
122                        // This shouldn't happen since we use create(true), but handle it anyway
123                        LockError::Connection(Box::new(std::io::Error::new(
124                            ErrorKind::NotFound,
125                            format!(
126                                "lock file '{}' not found (parent directory may have been removed): {}",
127                                self.path.display(),
128                                e
129                            ),
130                        )))
131                    }
132                    ErrorKind::AlreadyExists => {
133                        // This shouldn't happen with create(true), but handle it
134                        LockError::Connection(Box::new(std::io::Error::new(
135                            ErrorKind::AlreadyExists,
136                            format!(
137                                "lock file '{}' already exists unexpectedly: {}",
138                                self.path.display(),
139                                e
140                            ),
141                        )))
142                    }
143                    _ => LockError::Connection(Box::new(std::io::Error::other(
144                        format!(
145                            "failed to open lock file '{}': {}",
146                            self.path.display(),
147                            e
148                        ),
149                    ))),
150                }
151            })?;
152
153        let lock_file = RwLock::new(file);
154
155        // Try to acquire the lock - the handle will do the actual acquisition
156        // This allows us to move lock_file without borrowing issues
157        match FileLockHandle::try_new(lock_file, self.path.clone()) {
158            Ok(handle) => Ok(Some(handle)),
159            Err(LockError::Backend(e)) => {
160                // Check if this is a "lock already held" error
161                let error_msg = e.to_string().to_lowercase();
162                if error_msg.contains("already held")
163                    || error_msg.contains("would block")
164                    || error_msg.contains("resource temporarily unavailable")
165                {
166                    // Lock is held by another process - this is expected
167                    Ok(None)
168                } else {
169                    // Unexpected backend error - wrap with context
170                    Err(LockError::Backend(Box::new(std::io::Error::other(
171                        format!(
172                            "failed to acquire lock on file '{}': {}",
173                            self.path.display(),
174                            e
175                        ),
176                    ))))
177                }
178            }
179            Err(e) => Err(e),
180        }
181    }
182}
183
184impl DistributedLock for FileDistributedLock {
185    type Handle = FileLockHandle;
186
187    fn name(&self) -> &str {
188        &self.name
189    }
190
191    #[instrument(skip(self), fields(lock.name = %self.name, lock.path = %self.path.display(), timeout = ?timeout, backend = "file"))]
192    async fn acquire(&self, timeout: Option<Duration>) -> LockResult<Self::Handle> {
193        let timeout_value = TimeoutValue::from(timeout);
194        let start = std::time::Instant::now();
195        Span::current().record("operation", "acquire");
196
197        // Busy-wait with exponential backoff and jitter
198        // Initial delay: 10ms (reduced from 50ms for faster initial retry)
199        let mut sleep_duration = Duration::from_millis(10);
200        const MAX_SLEEP: Duration = Duration::from_secs(1);
201        const MIN_SLEEP: Duration = Duration::from_millis(5);
202        const BACKOFF_MULTIPLIER: u32 = 2;
203
204        loop {
205            match self.try_acquire_internal().await {
206                Ok(Some(handle)) => {
207                    let elapsed = start.elapsed();
208                    Span::current().record("acquired", true);
209                    Span::current().record("elapsed_ms", elapsed.as_millis() as u64);
210                    return Ok(handle);
211                }
212                Ok(None) => {
213                    // Check timeout before sleeping
214                    if !timeout_value.is_infinite() {
215                        let elapsed = start.elapsed();
216                        let timeout_duration = timeout_value.as_duration().unwrap();
217                        if elapsed >= timeout_duration {
218                            Span::current().record("acquired", false);
219                            Span::current().record("error", "timeout");
220                            return Err(LockError::Timeout(timeout_duration));
221                        }
222
223                        // Don't sleep longer than remaining timeout
224                        let remaining = timeout_duration - elapsed;
225                        if sleep_duration > remaining {
226                            sleep_duration = remaining;
227                        }
228                    }
229
230                    // Add jitter (±25%) to avoid thundering herd problem
231                    // This helps when multiple processes are waiting for the same lock
232                    let jitter_range = sleep_duration.as_millis() as u64 / 4;
233                    let jitter = if jitter_range > 0 {
234                        // Use a simple hash of the current time for pseudo-random jitter
235                        // This avoids needing a random number generator dependency
236                        let nanos = start.elapsed().as_nanos() as u64;
237                        (nanos % (jitter_range * 2)).saturating_sub(jitter_range)
238                    } else {
239                        0
240                    };
241
242                    let sleep_with_jitter = sleep_duration
243                        .checked_add(Duration::from_millis(jitter))
244                        .unwrap_or(sleep_duration);
245
246                    // Sleep before retry
247                    tokio::time::sleep(sleep_with_jitter).await;
248
249                    // Exponential backoff: double the sleep duration, but cap at MAX_SLEEP
250                    sleep_duration = (sleep_duration * BACKOFF_MULTIPLIER)
251                        .min(MAX_SLEEP)
252                        .max(MIN_SLEEP);
253                }
254                Err(e) => return Err(e),
255            }
256        }
257    }
258
259    #[instrument(skip(self), fields(lock.name = %self.name, lock.path = %self.path.display(), backend = "file"))]
260    async fn try_acquire(&self) -> LockResult<Option<Self::Handle>> {
261        Span::current().record("operation", "try_acquire");
262        let result = self.try_acquire_internal().await;
263        match &result {
264            Ok(Some(_)) => {
265                Span::current().record("acquired", true);
266            }
267            Ok(None) => {
268                Span::current().record("acquired", false);
269                Span::current().record("reason", "lock_held");
270            }
271            Err(e) => {
272                Span::current().record("acquired", false);
273                Span::current().record("error", e.to_string());
274            }
275        }
276        result
277    }
278}