indexedlog/
lock.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8use std::fs;
9use std::fs::File;
10use std::io;
11use std::path::Path;
12use std::path::PathBuf;
13
14use fs2::FileExt;
15use memmap2::MmapMut;
16use memmap2::MmapOptions;
17
18use crate::change_detect::SharedChangeDetector;
19use crate::errors::IoResultExt;
20use crate::utils;
21
/// RAII guard that holds a lock on a borrowed [`File`].
///
/// The lock is acquired by [`ScopedFileLock::new`] and released when the guard
/// is dropped. While the guard is alive, the file is reachable through its
/// `AsRef<File>` / `AsMut<File>` implementations.
pub struct ScopedFileLock<'a> {
    /// The locked file, mutably borrowed for as long as the guard lives.
    file: &'a mut File,
}
26
27impl<'a> ScopedFileLock<'a> {
28    pub fn new(file: &'a mut File, exclusive: bool) -> io::Result<Self> {
29        if exclusive {
30            file.lock_exclusive()?;
31        } else {
32            file.lock_shared()?;
33        }
34        Ok(ScopedFileLock { file })
35    }
36}
37
38impl<'a> AsRef<File> for ScopedFileLock<'a> {
39    fn as_ref(&self) -> &File {
40        self.file
41    }
42}
43
44impl<'a> AsMut<File> for ScopedFileLock<'a> {
45    fn as_mut(&mut self) -> &mut File {
46        self.file
47    }
48}
49
50impl<'a> Drop for ScopedFileLock<'a> {
51    fn drop(&mut self) {
52        self.file.unlock().expect("unlock");
53    }
54}
55
/// Proof that a directory lock is currently held.
///
/// Created by [`ScopedDirLock::new`] or [`ScopedDirLock::new_with_options`];
/// the lock is released (best-effort) when this value is dropped.
pub struct ScopedDirLock {
    /// Handle that carries the OS lock: the directory itself, or the named
    /// lock file inside it.
    file: File,
    /// Path that `file` was opened from.
    path: PathBuf,
}
61
/// Options for directory locking, consumed by `ScopedDirLock::new_with_options`.
///
/// This is plain data (two flags and a static name), so it is `Copy` and can be
/// printed with `{:?}` for diagnostics.
#[derive(Clone, Copy, Debug)]
pub struct DirLockOptions {
    /// Take an exclusive lock; otherwise a shared lock that can coexist with
    /// other shared locks on the same (dir, file_name).
    pub exclusive: bool,
    /// Fail immediately instead of waiting when the lock cannot be obtained.
    pub non_blocking: bool,
    /// Name of the lock file inside the directory. An empty name locks the
    /// directory itself. Different names give independent locks.
    pub file_name: &'static str,
}
68
69/// Lock used to indicate that a reader is alive.
70///
71/// This crate generally depends on "append-only" for lock-free reads
72/// (appending data won't invalidate existing readers' mmaps).
73///
74/// However, certain operations (ex. repair) aren't "append-only".
75/// This reader lock is used to detect if any readers are alive so
76/// non-append-only operations can know whether it's safe to go on.
77pub(crate) static READER_LOCK_OPTS: DirLockOptions = DirLockOptions {
78    exclusive: false,
79    non_blocking: false,
80    // The reader lock uses a different file name from the write lock,
81    // because readers do not block normal writes (append-only + atomic
82    // replace), and normal writes do not block readers.
83    //
84    // If this is "" (using default lock file), then active readers will
85    // prevent normal writes, which is undesirable.
86    file_name: "rlock",
87};
88
89impl ScopedDirLock {
90    /// Lock the given directory with default options (exclusive, blocking).
91    pub fn new(path: &Path) -> crate::Result<Self> {
92        const DEFAULT_OPTIONS: DirLockOptions = DirLockOptions {
93            exclusive: true,
94            non_blocking: false,
95            file_name: "",
96        };
97        Self::new_with_options(path, &DEFAULT_OPTIONS)
98    }
99
100    /// Lock the given directory with advanced options.
101    ///
102    /// - `opts.file_name`: decides the lock file name. A directory can have
103    ///   multiple locks independent from one another using different `file_name`s.
104    /// - `opts.non_blocking`: if true, do not wait and return an error if lock
105    ///   cannot be obtained; if false, wait forever for the lock to be available.
106    /// - `opts.exclusive`: if true, ensure that no other locks are present for
107    ///   for the (dir, file_name); if false, allow other non-exclusive locks
108    ///   to co-exist.
109    pub fn new_with_options(dir: &Path, opts: &DirLockOptions) -> crate::Result<Self> {
110        let (path, file) = if opts.file_name.is_empty() {
111            let file = utils::open_dir(dir).context(dir, "cannot open for locking")?;
112            (dir.to_path_buf(), file)
113        } else {
114            let path = dir.join(opts.file_name);
115
116            // Write permission is used for mutable mmap.
117            // Read permission is to make some bogus NFS implementation happy:
118            //
119            //     # strace
120            //     openat(AT_FDCWD, "some/rlock", O_WRONLY|O_CREAT|O_CLOEXEC, 0666) = 11
121            //     flock(11, LOCK_SH)         = -1 EBADF (Bad file descriptor)
122            //
123            // Changing O_WRONLY to O_RDWR makes flock succeed. Not all NFS filesystems
124            // need this.
125            #[allow(unused_mut)]
126            let mut file = match fs::OpenOptions::new().read(true).write(true).open(&path) {
127                Ok(f) => f,
128                Err(e) if e.kind() == io::ErrorKind::NotFound => {
129                    // Create the file.
130                    utils::mkdir_p(dir)?;
131                    fs::OpenOptions::new()
132                        .read(true)
133                        .write(true)
134                        .create(true)
135                        .open(&path)
136                        .context(&path, "cannot create for locking")?
137                }
138                Err(e) => {
139                    return Err(e).context(&path, "cannot open for locking");
140                }
141            };
142
143            // Attempt to relax the permission for other users to use mmap.
144            #[cfg(unix)]
145            {
146                use std::fs::Permissions;
147                use std::os::unix::fs::PermissionsExt;
148                if let Ok(metadata) = file.metadata() {
149                    let mode = metadata.permissions().mode();
150                    let desired_mode = 0o666;
151                    if (mode & desired_mode) != desired_mode {
152                        let _ = file.set_permissions(Permissions::from_mode(mode | desired_mode));
153                    }
154                }
155            }
156
157            (path, file)
158        };
159
160        // Lock
161        lock_file(&file, opts.exclusive, opts.non_blocking).context(&path, || {
162            format!(
163                "cannot lock (exclusive: {}, non_blocking: {})",
164                opts.exclusive, opts.non_blocking,
165            )
166        })?;
167
168        let result = Self { file, path };
169        Ok(result)
170    }
171
172    /// Get the path to the directory being locked.
173    pub fn path(&self) -> &Path {
174        &self.path
175    }
176
177    /// Get a shared mutable mmap buffer of `len` bytes, backed by the lock file.
178    ///
179    /// The permission of the file is relaxed (rwrwrw). See [`ScopedDirLock::new`].
180    /// Avoid storing data that should be protected by filesystem ACL.
181    ///
182    /// The file is zero-filled on demand.
183    ///
184    /// The callsite should keep the return value to keep the mmap alive.
185    pub(crate) fn shared_mmap_mut(&self, len: usize) -> crate::Result<MmapMut> {
186        let metadata = self
187            .file
188            .metadata()
189            .context(&self.path, "cannot read metadata")?;
190        if len as u64 > metadata.len() {
191            self.file
192                .set_len(len as u64)
193                .context(&self.path, "cannot resize for mmap buffer")?;
194        }
195        unsafe { MmapOptions::new().len(len).map_mut(&self.file) }
196            .context(&self.path, "cannot mmap read-write")
197    }
198
199    /// Provide the `SharedChangeDetector` based on mmap.
200    pub(crate) fn shared_change_detector(&self) -> crate::Result<SharedChangeDetector> {
201        let mmap = self.shared_mmap_mut(std::mem::size_of::<u64>())?;
202        Ok(SharedChangeDetector::new(mmap))
203    }
204}
205
206impl Drop for ScopedDirLock {
207    fn drop(&mut self) {
208        let _ = unlock_file(&self.file);
209    }
210}
211
212fn lock_file(file: &File, exclusive: bool, non_blocking: bool) -> io::Result<()> {
213    #[cfg(windows)]
214    unsafe {
215        use std::os::windows::io::AsRawHandle;
216
217        use winapi::shared::minwindef::DWORD;
218        use winapi::um::fileapi::LockFileEx;
219        use winapi::um::minwinbase::LOCKFILE_EXCLUSIVE_LOCK;
220        use winapi::um::minwinbase::LOCKFILE_FAIL_IMMEDIATELY;
221        use winapi::um::minwinbase::OVERLAPPED;
222
223        let mut flags: DWORD = 0;
224        if exclusive {
225            flags |= LOCKFILE_EXCLUSIVE_LOCK;
226        }
227        if non_blocking {
228            flags |= LOCKFILE_FAIL_IMMEDIATELY;
229        }
230
231        // `overlapped` specifies the start position (u64) of locking.
232        let mut overlapped: OVERLAPPED = std::mem::zeroed();
233        overlapped.u.s_mut().Offset = u32::MAX - 1;
234        overlapped.u.s_mut().OffsetHigh = u32::MAX;
235
236        // Only lock 1 byte at the end of the u64 range, not the whole file.
237        let ret = LockFileEx(file.as_raw_handle(), flags, 0, 1, 0, &mut overlapped);
238        if ret == 0 {
239            return Err(io::Error::last_os_error());
240        }
241    }
242    #[cfg(not(windows))]
243    match (exclusive, non_blocking) {
244        (true, false) => file.lock_exclusive()?,
245        (true, true) => file.try_lock_exclusive()?,
246        (false, false) => file.lock_shared()?,
247        (false, true) => file.try_lock_shared()?,
248    }
249    Ok(())
250}
251
252fn unlock_file(file: &File) -> io::Result<()> {
253    #[cfg(windows)]
254    unsafe {
255        use std::os::windows::io::AsRawHandle;
256
257        use winapi::um::fileapi::UnlockFile;
258
259        // Only unlock the last 1 byte of the u64 range.
260        let ret = UnlockFile(file.as_raw_handle(), u32::MAX - 1, u32::MAX, 1, 0);
261        if ret == 0 {
262            return Err(io::Error::last_os_error());
263        }
264    }
265    #[cfg(not(windows))]
266    {
267        file.unlock()?;
268    }
269    Ok(())
270}
271
#[cfg(test)]
mod tests {
    use std::fs::OpenOptions;
    use std::io::Read;
    use std::io::Seek;
    use std::io::SeekFrom;
    use std::io::Write;
    use std::thread;

    use tempfile::tempdir;

    use super::*;

    /// Assert that the file at `path` starts with `count` repetitions of the
    /// 227-byte sequence `0, 1, ..., 226`, which each writer thread appends.
    fn assert_chunks(path: &Path, count: usize) {
        let mut file = OpenOptions::new().read(true).open(path).unwrap();
        let mut buf = [0u8; 227];
        let expected: Vec<u8> = (0..227).collect();
        for _ in 0..count {
            file.read_exact(&mut buf).expect("read");
            assert_eq!(&buf[..], &expected[..]);
        }
    }

    #[test]
    fn test_file_lock() {
        let dir = tempdir().unwrap();
        let _file = OpenOptions::new()
            .write(true)
            .create(true)
            .open(dir.path().join("f"))
            .unwrap();

        const N: usize = 40;

        // Spawn N threads. Half read-only (shared lock), half read-write
        // (exclusive lock). Each writer appends one whole 227-byte chunk.
        let threads: Vec<_> = (0..N)
            .map(|i| {
                let path = dir.path().join("f");
                thread::spawn(move || {
                    let write = i % 2 == 0;
                    let mut file = OpenOptions::new()
                        .write(write)
                        .read(true)
                        .open(path)
                        .unwrap();
                    let mut lock = ScopedFileLock::new(&mut file, write).unwrap();
                    let len = lock.as_mut().seek(SeekFrom::End(0)).unwrap();
                    // AsRef and AsMut expose the same underlying File.
                    let ptr1 = lock.as_mut() as *const File;
                    let ptr2 = lock.as_ref() as *const File;
                    assert_eq!(ptr1, ptr2);
                    // Writers append under the exclusive lock, so no chunk can
                    // be half-written while we hold the lock.
                    assert_eq!(len % 227, 0);
                    if write {
                        for j in 0..227 {
                            lock.as_mut().write_all(&[j]).expect("write");
                            lock.as_mut().flush().expect("flush");
                        }
                    }
                })
            })
            .collect();

        // Wait for them
        for thread in threads {
            thread.join().expect("joined");
        }

        // Verify the file still has a correct content
        assert_chunks(&dir.path().join("f"), N / 2);
    }

    #[test]
    fn test_dir_lock() {
        let dir = tempdir().unwrap();
        let _file = OpenOptions::new()
            .write(true)
            .create(true)
            .open(dir.path().join("f"))
            .unwrap();

        const N: usize = 40;

        // Spawn N threads, each holding the (exclusive) directory lock while
        // it works. Half open "f" read-only, half read-write.
        let threads: Vec<_> = (0..N)
            .map(|i| {
                let path = dir.path().join("f");
                let dir_path = dir.path().to_path_buf();
                thread::spawn(move || {
                    let write = i % 2 == 0;
                    // Bound to a name so the lock is held until the closure ends.
                    let _lock = ScopedDirLock::new(&dir_path).unwrap();
                    let mut file = OpenOptions::new()
                        .write(write)
                        .read(true)
                        .open(path)
                        .unwrap();
                    let len = file.seek(SeekFrom::End(0)).unwrap();
                    assert_eq!(len % 227, 0);
                    if write {
                        for j in 0..227 {
                            file.write_all(&[j]).expect("write");
                            file.flush().expect("flush");
                        }
                    }
                })
            })
            .collect();

        // Wait for them
        for thread in threads {
            thread.join().expect("joined");
        }

        // Verify the file still has a correct content
        assert_chunks(&dir.path().join("f"), N / 2);
    }

    #[test]
    fn test_dir_lock_with_options() {
        let dir = tempdir().unwrap();
        let path = dir.path();
        let opts = DirLockOptions {
            file_name: "foo",
            exclusive: false,
            non_blocking: false,
        };

        // Multiple shared locks obtained with blocking on and off.
        let l1 = ScopedDirLock::new_with_options(path, &opts).unwrap();
        let l2 = ScopedDirLock::new_with_options(path, &opts).unwrap();

        let opts = DirLockOptions {
            non_blocking: true,
            ..opts
        };
        let l3 = ScopedDirLock::new_with_options(path, &opts).unwrap();

        // Exclusive lock cannot be obtained while shared locks are present.
        let opts = DirLockOptions {
            exclusive: true,
            ..opts
        };
        assert!(ScopedDirLock::new_with_options(path, &opts).is_err());

        // Exclusive lock can be obtained after releasing shared locks.
        drop((l1, l2, l3));
        let l4 = ScopedDirLock::new_with_options(path, &opts).unwrap();

        // Exclusive lock cannot be obtained while other locks are present.
        assert!(ScopedDirLock::new_with_options(path, &opts).is_err());

        // Exclusive lock CAN be obtained with a different file name: locks on
        // different `file_name`s are independent.
        let opts = DirLockOptions {
            file_name: "bar",
            ..opts
        };
        assert!(ScopedDirLock::new_with_options(path, &opts).is_ok());

        drop(l4);
    }

    #[test]
    fn test_dir_lock_shared_buffer() {
        let dir = tempdir().unwrap();
        let path = dir.path();
        let opts = DirLockOptions {
            file_name: "foo",
            exclusive: false,
            non_blocking: false,
        };

        let mut v1 = &[1u8, 2, 3, 4, 5, 6, 7, 8][..];
        let mut v2 = vec![0; v1.len()];

        let l1 = ScopedDirLock::new_with_options(path, &opts).unwrap();
        let mut buf1 = l1.shared_mmap_mut(v1.len()).unwrap();
        buf1.as_mut().write_all(v1).unwrap();

        let l2 = ScopedDirLock::new_with_options(path, &opts).unwrap();
        let buf2 = l2.shared_mmap_mut(v1.len()).unwrap();
        buf2.as_ref().read_exact(&mut v2).unwrap();
        assert_eq!(v1, v2);

        // The buffer can be used even if the ScopedDirLock is dropped (which closes the files).
        drop((l1, l2));
        v1 = &[99u8, 98, 97, 96, 95, 94, 93, 92][..];
        buf1.as_mut().write_all(v1).unwrap();
        buf2.as_ref().read_exact(&mut v2).unwrap();
        assert_eq!(v1, v2);

        // Buffer content is persisted on filesystem after dropping both lock and mmap states.
        drop((buf1, buf2));
        let l3 = ScopedDirLock::new_with_options(path, &opts).unwrap();
        let buf3 = l3.shared_mmap_mut(v1.len()).unwrap();
        buf3.as_ref().read_exact(&mut v2).unwrap();
        assert_eq!(v1, v2);

        // The mmap buffer can be used for SharedChangeDetector.
        let d1 = l3.shared_change_detector().unwrap();
        let d2 = l3.shared_change_detector().unwrap();
        let d3 = d2.clone();

        assert!(!d1.is_changed());
        assert!(!d2.is_changed());
        assert!(!d3.is_changed());

        d1.set(1);

        assert!(!d1.is_changed());
        assert!(d2.is_changed());
        assert!(d3.is_changed());

        d2.set(1);
        assert!(!d2.is_changed());
        assert!(d3.is_changed());

        d3.set(2);
        assert!(d1.is_changed());
        assert!(d2.is_changed());
        assert!(!d3.is_changed());

        d2.set(3);
        assert!(d1.is_changed());
        assert!(!d2.is_changed());
        assert!(d3.is_changed());
    }
}