raft_engine/env/log_fd/
unix.rs1use crate::env::{Handle, Permission};
4
5use fail::fail_point;
6use log::error;
7
8use std::io::Result as IoResult;
9use std::os::unix::io::RawFd;
10
11use nix::errno::Errno;
12use nix::fcntl::{self, OFlag};
13use nix::sys::stat::Mode;
14use nix::sys::uio::{pread, pwrite};
15use nix::unistd::{close, ftruncate, lseek, Whence};
16use nix::NixPath;
17
18fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error {
19 let kind = std::io::Error::from(e).kind();
20 std::io::Error::new(kind, custom)
21}
22
23impl From<Permission> for OFlag {
24 fn from(value: Permission) -> OFlag {
25 match value {
26 Permission::ReadOnly => OFlag::O_RDONLY,
27 Permission::ReadWrite => OFlag::O_RDWR,
28 }
29 }
30}
31
/// A thin wrapper around a raw Unix file descriptor ([`RawFd`]).
///
/// The wrapped descriptor is closed when the value is dropped (see the `Drop`
/// implementation in this file); a failure to close at that point is logged
/// instead of being returned to the caller.
pub struct LogFd(RawFd);
40
41impl LogFd {
42 pub fn open<P: ?Sized + NixPath>(path: &P, perm: Permission) -> IoResult<Self> {
44 let mode = Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IRGRP | Mode::S_IROTH;
46 fail_point!("log_fd::open::fadvise_dontneed", |_| {
47 let fd =
48 LogFd(fcntl::open(path, perm.into(), mode).map_err(|e| from_nix_error(e, "open"))?);
49 #[cfg(target_os = "linux")]
50 unsafe {
51 extern crate libc;
52 libc::posix_fadvise64(fd.0, 0, fd.file_size()? as i64, libc::POSIX_FADV_DONTNEED);
53 }
54 Ok(fd)
55 });
56 Ok(LogFd(
57 fcntl::open(path, perm.into(), mode).map_err(|e| from_nix_error(e, "open"))?,
58 ))
59 }
60
61 pub fn create<P: ?Sized + NixPath>(path: &P) -> IoResult<Self> {
64 let flags = OFlag::O_RDWR | OFlag::O_CREAT;
65 let mode = Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IRGRP | Mode::S_IROTH;
67 let fd = fcntl::open(path, flags, mode).map_err(|e| from_nix_error(e, "open"))?;
68 Ok(LogFd(fd))
69 }
70
71 pub fn close(&self) -> IoResult<()> {
73 fail_point!("log_fd::close::err", |_| {
74 Err(from_nix_error(nix::Error::EINVAL, "fp"))
75 });
76 close(self.0).map_err(|e| from_nix_error(e, "close"))
77 }
78
79 pub fn read(&self, mut offset: usize, buf: &mut [u8]) -> IoResult<usize> {
82 let mut readed = 0;
83 while readed < buf.len() {
84 let bytes = match pread(self.0, &mut buf[readed..], offset as i64) {
85 Ok(bytes) => bytes,
86 Err(Errno::EINTR) => continue,
87 Err(e) => return Err(from_nix_error(e, "pread")),
88 };
89 if bytes == 0 {
91 break;
92 }
93 readed += bytes;
94 offset += bytes;
95 }
96 Ok(readed)
97 }
98
99 pub fn write(&self, mut offset: usize, content: &[u8]) -> IoResult<usize> {
102 fail_point!("log_fd::write::no_space_err", |_| {
103 Err(from_nix_error(nix::Error::ENOSPC, "nospace"))
104 });
105 let mut written = 0;
106 while written < content.len() {
107 let bytes = match pwrite(self.0, &content[written..], offset as i64) {
108 Ok(bytes) => bytes,
109 Err(Errno::EINTR) => continue,
110 Err(e) if e == Errno::ENOSPC => return Err(from_nix_error(e, "nospace")),
111 Err(e) => return Err(from_nix_error(e, "pwrite")),
112 };
113 if bytes == 0 {
114 break;
115 }
116 written += bytes;
117 offset += bytes;
118 }
119 Ok(written)
120 }
121
122 pub fn truncate(&self, offset: usize) -> IoResult<()> {
124 ftruncate(self.0, offset as i64).map_err(|e| from_nix_error(e, "ftruncate"))
125 }
126
127 #[allow(unused_variables)]
129 pub fn allocate(&self, offset: usize, size: usize) -> IoResult<()> {
130 #[cfg(target_os = "linux")]
131 {
132 if let Err(e) = fcntl::fallocate(
133 self.0,
134 fcntl::FallocateFlags::empty(),
135 offset as i64,
136 size as i64,
137 ) {
138 if e != nix::Error::EOPNOTSUPP {
139 return Err(from_nix_error(e, "fallocate"));
140 }
141 }
142 }
143 Ok(())
144 }
145}
146
147impl Handle for LogFd {
148 #[inline]
149 fn truncate(&self, offset: usize) -> IoResult<()> {
150 self.truncate(offset)
151 }
152
153 #[inline]
154 fn file_size(&self) -> IoResult<usize> {
155 fail_point!("log_fd::file_size::err", |_| {
156 Err(from_nix_error(nix::Error::EINVAL, "fp"))
157 });
158 lseek(self.0, 0, Whence::SeekEnd)
159 .map(|n| n as usize)
160 .map_err(|e| from_nix_error(e, "lseek"))
161 }
162
163 #[inline]
164 fn sync(&self) -> IoResult<()> {
165 fail_point!("log_fd::sync::err", |_| {
166 Err(from_nix_error(nix::Error::EINVAL, "fp"))
167 });
168 #[cfg(target_os = "linux")]
169 {
170 nix::unistd::fdatasync(self.0).map_err(|e| from_nix_error(e, "fdatasync"))
171 }
172 #[cfg(not(target_os = "linux"))]
173 {
174 nix::unistd::fsync(self.0).map_err(|e| from_nix_error(e, "fsync"))
175 }
176 }
177}
178
179impl Drop for LogFd {
180 fn drop(&mut self) {
181 if let Err(e) = self.close() {
182 error!("error while closing file: {e}");
183 }
184 }
185}