indexedlog/
utils.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8use std::cell::RefCell;
9use std::fs;
10use std::fs::File;
11use std::hash::Hasher;
12use std::io;
13use std::io::Read;
14use std::io::Write;
15use std::path::Path;
16use std::sync::atomic;
17
18use memmap2::MmapOptions;
19use minibytes::Bytes;
20use twox_hash::XxHash;
21use twox_hash::XxHash32;
22
23use crate::config;
24use crate::errors::IoResultExt;
25use crate::errors::ResultExt;
26
27/// Return a read-only view of the entire file.
28///
29/// If `len` is `None`, detect the file length automatically.
30pub fn mmap_bytes(file: &File, len: Option<u64>) -> io::Result<Bytes> {
31    let actual_len = file.metadata()?.len();
32    let len = match len {
33        Some(len) => {
34            if len > actual_len {
35                return Err(io::Error::new(
36                    io::ErrorKind::UnexpectedEof,
37                    format!(
38                        "mmap length {} is greater than file size {}",
39                        len, actual_len
40                    ),
41                ));
42            } else {
43                len
44            }
45        }
46        None => actual_len,
47    };
48    if len == 0 {
49        Ok(Bytes::new())
50    } else {
51        let bytes = Bytes::from(unsafe { MmapOptions::new().len(len as usize).map(file) }?);
52        crate::page_out::track_mmap_buffer(&bytes);
53        Ok(bytes)
54    }
55}
56
57/// Similar to [`mmap_bytes`], but accepts a [`Path`] directly so the
58/// callsite does not need to open a [`File`].
59///
60/// Return [`crate::Result`], whcih makes it easier to use for error handling.
61pub fn mmap_path(path: &Path, len: u64) -> crate::Result<Bytes> {
62    if len == 0 {
63        Ok(Bytes::new())
64    } else {
65        let file = std::fs::OpenOptions::new()
66            .read(true)
67            .open(path)
68            .or_else(|err| {
69                if err.kind() == io::ErrorKind::NotFound {
70                    // This is marked as a corruption because proper NotFound
71                    // handling are on non-mmapped files. For example,
72                    // - Log uses "meta" not found to detect if a log is
73                    //   empty/newly created. "meta" is not mmapped. If
74                    //   "meta" is missing, it might be not a corruption,
75                    //   but just need to create Log in-place.
76                    // - RotateLog uses "latest" to detect if it is empty/
77                    //   newly created. "latest" is not mmapped. If "latest"
78                    //   is missing, it might be not a corruption, but just
79                    //   need to create RotateLog in-place.
80                    // - Index uses std::fs::OpenOptions to create new files
81                    //   on demand.
82                    // So mmapped files are not used to detect "whether we
83                    // should create a new empty structure, or not", the
84                    // NotFound issues are most likely "data corruption".
85                    Err(err).context(path, "cannot open for mmap").corruption()
86                } else {
87                    Err(err).context(path, "cannot open for mmap")
88                }
89            })?;
90        Ok(mmap_bytes(&file, Some(len)).context(path, "cannot mmap")?)
91    }
92}
93
/// Open `lock_path`, typically so the returned [`File`] can be used for locking.
///
/// `lock_path` is expected to be a directory, but this is not verified. On unix
/// the path is opened directly, so this may also succeed for a non-directory.
///
/// Windows cannot open a directory as a file. On non-unix platforms, a file
/// named "lock" inside the directory is therefore created if needed and opened
/// for writing instead.
pub fn open_dir(lock_path: impl AsRef<Path>) -> io::Result<File> {
    let dir = lock_path.as_ref();
    #[cfg(unix)]
    {
        File::open(dir)
    }
    #[cfg(not(unix))]
    {
        let lock_file = dir.join("lock");
        fs::OpenOptions::new()
            .write(true)
            .create(true)
            .open(&lock_file)
    }
}
115
116#[inline]
117pub fn xxhash<T: AsRef<[u8]>>(buf: T) -> u64 {
118    let mut xx = XxHash::default();
119    xx.write(buf.as_ref());
120    xx.finish()
121}
122
123#[inline]
124pub fn xxhash32<T: AsRef<[u8]>>(buf: T) -> u32 {
125    let mut xx = XxHash32::default();
126    xx.write(buf.as_ref());
127    xx.finish() as u32
128}
129
130/// Atomically create or replace a file with the given content.
131/// Attempt to use symlinks on unix if `SYMLINK_ATOMIC_WRITE` is set.
132pub fn atomic_write(
133    path: impl AsRef<Path>,
134    content: impl AsRef<[u8]>,
135    fsync: bool,
136) -> crate::Result<()> {
137    let path = path.as_ref();
138    let content = content.as_ref();
139    #[cfg(unix)]
140    {
141        // Try the symlink approach first. This makes sure the file is not
142        // empty.
143        //
144        // In theory the non-symlink approach (open, write, rename, close)
145        // should also result in a non-empty file. However, we have seen empty
146        // files sometimes without OS crashes (see https://fburl.com/bky2zu9e).
147        if config::SYMLINK_ATOMIC_WRITE.load(atomic::Ordering::SeqCst) {
148            if atomic_write_symlink(path, content).is_ok() {
149                return Ok(());
150            }
151        }
152    }
153    atomic_write_plain(path, content, fsync)
154}
155
156/// Atomically create or replace a file with the given content.
157/// Use a plain file. Do not use symlinks.
158pub fn atomic_write_plain(path: &Path, content: &[u8], fsync: bool) -> crate::Result<()> {
159    let result: crate::Result<_> = {
160        atomicfile::atomic_write(
161            path,
162            config::CHMOD_FILE.load(atomic::Ordering::SeqCst) as u32,
163            fsync || config::get_global_fsync(),
164            |file| {
165                file.write_all(content)?;
166                Ok(())
167            },
168        )
169        .context(path, "atomic_write error")?;
170
171        Ok(())
172    };
173    result.context(|| {
174        let content_desc = if content.len() < 128 {
175            format!("{:?}", content)
176        } else {
177            format!("<{}-byte slice>", content.len())
178        };
179        format!(
180            "  in atomic_write(path={:?}, content={}) ",
181            path, content_desc
182        )
183    })
184}
185
186/// Atomically create or replace a symlink with hex(content).
187#[cfg(unix)]
188fn atomic_write_symlink(path: &Path, content: &[u8]) -> io::Result<()> {
189    let encoded_content: String = {
190        // Use 'content' as-is if possible. Otherwise encode it using hex() and
191        // prefix with 'hex:'.
192        match std::str::from_utf8(content) {
193            Ok(s) if !s.starts_with("hex:") && !content.contains(&0) => s.to_string(),
194            _ => format!("hex:{}", hex::encode(content)),
195        }
196    };
197    let temp_path = loop {
198        let temp_path = path.with_extension(format!(".temp{}", rand::random::<u16>()));
199        match std::os::unix::fs::symlink(&encoded_content, &temp_path) {
200            Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
201                // Try another temp_path.
202                continue;
203            }
204            Err(e) => return Err(e),
205            Ok(_) => break temp_path,
206        }
207    };
208    let _ = fix_perm_symlink(&temp_path);
209    match fs::rename(&temp_path, path) {
210        Ok(_) => Ok(()),
211        Err(e) => {
212            // Clean up: Remove the temp file.
213            let _ = fs::remove_file(&temp_path);
214            Err(e)
215        }
216    }
217}
218
219/// Read the entire file written by `atomic_write`.
220///
221/// The read itself is only atomic if the file was written by `atomic_write`.
222/// This function handles format differences (symlink vs normal files)
223/// transparently.
224pub fn atomic_read(path: &Path) -> io::Result<Vec<u8>> {
225    #[cfg(unix)]
226    {
227        if let Ok(data) = atomic_read_symlink(path) {
228            return Ok(data);
229        }
230    }
231    let mut file = fs::OpenOptions::new().read(true).open(path)?;
232    let mut buf = Vec::new();
233    file.read_to_end(&mut buf)?;
234    Ok(buf)
235}
236
237/// Read and decode the symlink content.
238#[cfg(unix)]
239fn atomic_read_symlink(path: &Path) -> io::Result<Vec<u8>> {
240    use std::os::unix::ffi::OsStrExt;
241    let encoded_content = path.read_link()?;
242    let encoded_content = encoded_content.as_os_str().as_bytes();
243    if encoded_content.starts_with(b"hex:") {
244        // Decode hex.
245        Ok(hex::decode(&encoded_content[4..]).map_err(|_e| {
246            io::Error::new(
247                io::ErrorKind::InvalidData,
248                format!(
249                    "{:?}: cannot decode hex content {:?}",
250                    path, &encoded_content,
251                ),
252            )
253        })?)
254    } else {
255        Ok(encoded_content.to_vec())
256    }
257}
258/// Similar to `fs::create_dir_all`, but also attempts to chmod
259/// newly created directories on Unix.
260pub(crate) fn mkdir_p(dir: impl AsRef<Path>) -> crate::Result<()> {
261    let dir = dir.as_ref();
262    let try_mkdir_once = || -> io::Result<()> {
263        fs::create_dir(dir).map(|_| {
264            // fix_perm_path issues are not fatal
265            let _ = fix_perm_path(dir, true);
266        })
267    };
268    {
269        try_mkdir_once().or_else(|err| {
270            match err.kind() {
271                io::ErrorKind::AlreadyExists => return Ok(()),
272                io::ErrorKind::NotFound => {
273                    // Try to create the parent directory first.
274                    if let Some(parent) = dir.parent() {
275                        mkdir_p(parent)
276                            .context(|| format!("while trying to mkdir_p({:?})", dir))?;
277                        return try_mkdir_once()
278                            .context(dir, "cannot mkdir after mkdir its parent");
279                    }
280                }
281                io::ErrorKind::PermissionDenied => {
282                    // Try to fix permission aggressively.
283                    if let Some(parent) = dir.parent() {
284                        if fix_perm_path(parent, true).is_ok() {
285                            return try_mkdir_once().context(dir, "cannot mkdir").context(|| {
286                                format!(
287                                    "while trying to mkdir {:?} after fix_perm {:?}",
288                                    &dir, &parent
289                                )
290                            });
291                        }
292                    }
293                }
294                _ => {}
295            }
296            Err(err).context(dir, "cannot mkdir")
297        })
298    }
299}
300
301/// Attempt to chmod a path.
302pub(crate) fn fix_perm_path(path: &Path, is_dir: bool) -> io::Result<()> {
303    #[cfg(unix)]
304    {
305        let file = fs::OpenOptions::new().read(true).open(path)?;
306        fix_perm_file(&file, is_dir)?;
307    }
308    #[cfg(windows)]
309    {
310        let _ = (path, is_dir);
311    }
312    Ok(())
313}
314
315/// Attempt to chmod a file.
316pub(crate) fn fix_perm_file(file: &File, is_dir: bool) -> io::Result<()> {
317    #[cfg(unix)]
318    {
319        // chmod
320        let mode = if is_dir {
321            config::CHMOD_DIR.load(atomic::Ordering::SeqCst)
322        } else {
323            config::CHMOD_FILE.load(atomic::Ordering::SeqCst)
324        };
325        if mode >= 0 {
326            let perm = std::os::unix::fs::PermissionsExt::from_mode(mode as u32);
327            file.set_permissions(perm)?;
328        }
329    }
330    #[cfg(windows)]
331    {
332        let _ = (file, is_dir);
333    }
334    Ok(())
335}
336
337/// Attempt to chmod a symlink at the given path.
338pub(crate) fn fix_perm_symlink(path: &Path) -> io::Result<()> {
339    #[cfg(unix)]
340    {
341        use std::ffi::CString;
342        use std::os::unix::ffi::OsStrExt;
343
344        let path = CString::new(path.as_os_str().as_bytes())?;
345
346        // chmod
347        let mode = config::CHMOD_FILE.load(atomic::Ordering::SeqCst);
348        if mode >= 0 {
349            unsafe {
350                libc::fchmodat(
351                    libc::AT_FDCWD,
352                    path.as_ptr(),
353                    mode as _,
354                    libc::AT_SYMLINK_NOFOLLOW,
355                )
356            };
357        }
358    }
359    #[cfg(windows)]
360    {
361        let _ = path;
362    }
363    Ok(())
364}
365
366thread_local! {
367    static THREAD_RAND_U64: RefCell<u64> = RefCell::new(0);
368}
369
370/// Return a value that is likely changing over time.
371/// This is used to detect non-append-only cases.
372pub(crate) fn rand_u64() -> u64 {
373    if cfg!(test) {
374        // For tests, generate different numbers each time.
375        let count = THREAD_RAND_U64.with(|i| {
376            *i.borrow_mut() += 1;
377            *i.borrow()
378        });
379        // Ensure the vlq representation is likely stable by setting a high bit.
380        count | (1u64 << 63)
381    } else {
382        rand::random()
383    }
384}
385
#[cfg(test)]
mod tests {
    use super::*;

    /// Write `data` with symlink-based atomic writes enabled, read it back, and
    /// check that the round trip is lossless.
    fn check_atomic_read_write(data: &[u8]) {
        config::SYMLINK_ATOMIC_WRITE.store(true, atomic::Ordering::SeqCst);
        let tmp = tempfile::tempdir().unwrap();
        let target = tmp.path().join("a");
        atomic_write(&target, data, /* fsync */ false).unwrap();
        let roundtripped = atomic_read(&target).unwrap();
        assert_eq!(data, roundtripped.as_slice());
    }

    #[test]
    fn test_atomic_read_write_roundtrip() {
        // Covers empty input, "hex:"-prefixed look-alikes, non-ASCII UTF-8 and
        // NUL bytes, which all take different encoding paths.
        let cases: [&[u8]; 11] = [
            b"",
            b"hex",
            b"hex:",
            b"hex:abc",
            b"hex:hex:abc",
            b"abc",
            b"\xe4\xbd\xa0\xe5\xa5\xbd",
            b"hex:\xe4\xbd\xa0\xe5\xa5\xbd",
            b"a\0b\0c\0",
            b"hex:a\0b\0c\0",
            b"\0\0\0\0\0\0",
        ];
        for &data in &cases {
            check_atomic_read_write(data);
        }
    }

    quickcheck::quickcheck! {
        fn quickcheck_atomic_read_write_roundtrip(data: Vec<u8>) -> bool {
            check_atomic_read_write(&data);
            true
        }
    }
}