raft_engine/env/log_fd/
unix.rs

1// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.
2
3use crate::env::{Handle, Permission};
4
5use fail::fail_point;
6use log::error;
7
8use std::io::Result as IoResult;
9use std::os::unix::io::RawFd;
10
11use nix::errno::Errno;
12use nix::fcntl::{self, OFlag};
13use nix::sys::stat::Mode;
14use nix::sys::uio::{pread, pwrite};
15use nix::unistd::{close, ftruncate, lseek, Whence};
16use nix::NixPath;
17
18fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error {
19    let kind = std::io::Error::from(e).kind();
20    std::io::Error::new(kind, custom)
21}
22
23impl From<Permission> for OFlag {
24    fn from(value: Permission) -> OFlag {
25        match value {
26            Permission::ReadOnly => OFlag::O_RDONLY,
27            Permission::ReadWrite => OFlag::O_RDWR,
28        }
29    }
30}
31
/// A RAII-style low-level file. Errors that occur during automatic resource
/// release are logged and ignored.
///
/// A [`LogFd`] is essentially a thin wrapper around [`RawFd`]. It's only
/// supported on *Unix*, and primarily optimized for *Linux*.
///
/// The access mode depends on how the instance was obtained: [`LogFd::open`]
/// honors the requested [`Permission`] (read-only or read-write), whereas
/// [`LogFd::create`] always opens the file for both reading and writing.
pub struct LogFd(RawFd);
40
41impl LogFd {
42    /// Opens a file with the given `path`.
43    pub fn open<P: ?Sized + NixPath>(path: &P, perm: Permission) -> IoResult<Self> {
44        // Permission 644
45        let mode = Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IRGRP | Mode::S_IROTH;
46        fail_point!("log_fd::open::fadvise_dontneed", |_| {
47            let fd =
48                LogFd(fcntl::open(path, perm.into(), mode).map_err(|e| from_nix_error(e, "open"))?);
49            #[cfg(target_os = "linux")]
50            unsafe {
51                extern crate libc;
52                libc::posix_fadvise64(fd.0, 0, fd.file_size()? as i64, libc::POSIX_FADV_DONTNEED);
53            }
54            Ok(fd)
55        });
56        Ok(LogFd(
57            fcntl::open(path, perm.into(), mode).map_err(|e| from_nix_error(e, "open"))?,
58        ))
59    }
60
61    /// Opens a file with the given `path`. The specified file will be created
62    /// first if not exists.
63    pub fn create<P: ?Sized + NixPath>(path: &P) -> IoResult<Self> {
64        let flags = OFlag::O_RDWR | OFlag::O_CREAT;
65        // Permission 644
66        let mode = Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IRGRP | Mode::S_IROTH;
67        let fd = fcntl::open(path, flags, mode).map_err(|e| from_nix_error(e, "open"))?;
68        Ok(LogFd(fd))
69    }
70
71    /// Closes the file.
72    pub fn close(&self) -> IoResult<()> {
73        fail_point!("log_fd::close::err", |_| {
74            Err(from_nix_error(nix::Error::EINVAL, "fp"))
75        });
76        close(self.0).map_err(|e| from_nix_error(e, "close"))
77    }
78
79    /// Reads some bytes starting at `offset` from this file into the specified
80    /// buffer. Returns how many bytes were read.
81    pub fn read(&self, mut offset: usize, buf: &mut [u8]) -> IoResult<usize> {
82        let mut readed = 0;
83        while readed < buf.len() {
84            let bytes = match pread(self.0, &mut buf[readed..], offset as i64) {
85                Ok(bytes) => bytes,
86                Err(Errno::EINTR) => continue,
87                Err(e) => return Err(from_nix_error(e, "pread")),
88            };
89            // EOF
90            if bytes == 0 {
91                break;
92            }
93            readed += bytes;
94            offset += bytes;
95        }
96        Ok(readed)
97    }
98
99    /// Writes some bytes to this file starting at `offset`. Returns how many
100    /// bytes were written.
101    pub fn write(&self, mut offset: usize, content: &[u8]) -> IoResult<usize> {
102        fail_point!("log_fd::write::no_space_err", |_| {
103            Err(from_nix_error(nix::Error::ENOSPC, "nospace"))
104        });
105        let mut written = 0;
106        while written < content.len() {
107            let bytes = match pwrite(self.0, &content[written..], offset as i64) {
108                Ok(bytes) => bytes,
109                Err(Errno::EINTR) => continue,
110                Err(e) if e == Errno::ENOSPC => return Err(from_nix_error(e, "nospace")),
111                Err(e) => return Err(from_nix_error(e, "pwrite")),
112            };
113            if bytes == 0 {
114                break;
115            }
116            written += bytes;
117            offset += bytes;
118        }
119        Ok(written)
120    }
121
122    /// Truncates all data after `offset`.
123    pub fn truncate(&self, offset: usize) -> IoResult<()> {
124        ftruncate(self.0, offset as i64).map_err(|e| from_nix_error(e, "ftruncate"))
125    }
126
127    /// Attempts to allocate space for `size` bytes starting at `offset`.
128    #[allow(unused_variables)]
129    pub fn allocate(&self, offset: usize, size: usize) -> IoResult<()> {
130        #[cfg(target_os = "linux")]
131        {
132            if let Err(e) = fcntl::fallocate(
133                self.0,
134                fcntl::FallocateFlags::empty(),
135                offset as i64,
136                size as i64,
137            ) {
138                if e != nix::Error::EOPNOTSUPP {
139                    return Err(from_nix_error(e, "fallocate"));
140                }
141            }
142        }
143        Ok(())
144    }
145}
146
147impl Handle for LogFd {
148    #[inline]
149    fn truncate(&self, offset: usize) -> IoResult<()> {
150        self.truncate(offset)
151    }
152
153    #[inline]
154    fn file_size(&self) -> IoResult<usize> {
155        fail_point!("log_fd::file_size::err", |_| {
156            Err(from_nix_error(nix::Error::EINVAL, "fp"))
157        });
158        lseek(self.0, 0, Whence::SeekEnd)
159            .map(|n| n as usize)
160            .map_err(|e| from_nix_error(e, "lseek"))
161    }
162
163    #[inline]
164    fn sync(&self) -> IoResult<()> {
165        fail_point!("log_fd::sync::err", |_| {
166            Err(from_nix_error(nix::Error::EINVAL, "fp"))
167        });
168        #[cfg(target_os = "linux")]
169        {
170            nix::unistd::fdatasync(self.0).map_err(|e| from_nix_error(e, "fdatasync"))
171        }
172        #[cfg(not(target_os = "linux"))]
173        {
174            nix::unistd::fsync(self.0).map_err(|e| from_nix_error(e, "fsync"))
175        }
176    }
177}
178
179impl Drop for LogFd {
180    fn drop(&mut self) {
181        if let Err(e) = self.close() {
182            error!("error while closing file: {e}");
183        }
184    }
185}