1use std::cell::RefCell;
9use std::fs;
10use std::fs::File;
11use std::hash::Hasher;
12use std::io;
13use std::io::Read;
14use std::io::Write;
15use std::path::Path;
16use std::sync::atomic;
17
18use memmap2::MmapOptions;
19use minibytes::Bytes;
20use twox_hash::XxHash;
21use twox_hash::XxHash32;
22
23use crate::config;
24use crate::errors::IoResultExt;
25use crate::errors::ResultExt;
26
27pub fn mmap_bytes(file: &File, len: Option<u64>) -> io::Result<Bytes> {
31 let actual_len = file.metadata()?.len();
32 let len = match len {
33 Some(len) => {
34 if len > actual_len {
35 return Err(io::Error::new(
36 io::ErrorKind::UnexpectedEof,
37 format!(
38 "mmap length {} is greater than file size {}",
39 len, actual_len
40 ),
41 ));
42 } else {
43 len
44 }
45 }
46 None => actual_len,
47 };
48 if len == 0 {
49 Ok(Bytes::new())
50 } else {
51 let bytes = Bytes::from(unsafe { MmapOptions::new().len(len as usize).map(file) }?);
52 crate::page_out::track_mmap_buffer(&bytes);
53 Ok(bytes)
54 }
55}
56
57pub fn mmap_path(path: &Path, len: u64) -> crate::Result<Bytes> {
62 if len == 0 {
63 Ok(Bytes::new())
64 } else {
65 let file = std::fs::OpenOptions::new()
66 .read(true)
67 .open(path)
68 .or_else(|err| {
69 if err.kind() == io::ErrorKind::NotFound {
70 Err(err).context(path, "cannot open for mmap").corruption()
86 } else {
87 Err(err).context(path, "cannot open for mmap")
88 }
89 })?;
90 Ok(mmap_bytes(&file, Some(len)).context(path, "cannot mmap")?)
91 }
92}
93
/// Open a file handle associated with the directory `lock_path`.
///
/// - On unix, the directory itself is opened read-only.
/// - On other platforms, a file named `lock` inside the directory is opened
///   for writing and created if it does not exist.
///
/// Judging by the parameter name, the handle is meant to be used for
/// locking — the locking itself does not happen here.
pub fn open_dir(lock_path: impl AsRef<Path>) -> io::Result<File> {
    let dir = lock_path.as_ref();
    #[cfg(unix)]
    let opened = File::open(dir);
    #[cfg(not(unix))]
    let opened = fs::OpenOptions::new()
        .write(true)
        .create(true)
        .open(dir.join("lock"));
    opened
}
115
116#[inline]
117pub fn xxhash<T: AsRef<[u8]>>(buf: T) -> u64 {
118 let mut xx = XxHash::default();
119 xx.write(buf.as_ref());
120 xx.finish()
121}
122
123#[inline]
124pub fn xxhash32<T: AsRef<[u8]>>(buf: T) -> u32 {
125 let mut xx = XxHash32::default();
126 xx.write(buf.as_ref());
127 xx.finish() as u32
128}
129
130pub fn atomic_write(
133 path: impl AsRef<Path>,
134 content: impl AsRef<[u8]>,
135 fsync: bool,
136) -> crate::Result<()> {
137 let path = path.as_ref();
138 let content = content.as_ref();
139 #[cfg(unix)]
140 {
141 if config::SYMLINK_ATOMIC_WRITE.load(atomic::Ordering::SeqCst) {
148 if atomic_write_symlink(path, content).is_ok() {
149 return Ok(());
150 }
151 }
152 }
153 atomic_write_plain(path, content, fsync)
154}
155
156pub fn atomic_write_plain(path: &Path, content: &[u8], fsync: bool) -> crate::Result<()> {
159 let result: crate::Result<_> = {
160 atomicfile::atomic_write(
161 path,
162 config::CHMOD_FILE.load(atomic::Ordering::SeqCst) as u32,
163 fsync || config::get_global_fsync(),
164 |file| {
165 file.write_all(content)?;
166 Ok(())
167 },
168 )
169 .context(path, "atomic_write error")?;
170
171 Ok(())
172 };
173 result.context(|| {
174 let content_desc = if content.len() < 128 {
175 format!("{:?}", content)
176 } else {
177 format!("<{}-byte slice>", content.len())
178 };
179 format!(
180 " in atomic_write(path={:?}, content={}) ",
181 path, content_desc
182 )
183 })
184}
185
186#[cfg(unix)]
188fn atomic_write_symlink(path: &Path, content: &[u8]) -> io::Result<()> {
189 let encoded_content: String = {
190 match std::str::from_utf8(content) {
193 Ok(s) if !s.starts_with("hex:") && !content.contains(&0) => s.to_string(),
194 _ => format!("hex:{}", hex::encode(content)),
195 }
196 };
197 let temp_path = loop {
198 let temp_path = path.with_extension(format!(".temp{}", rand::random::<u16>()));
199 match std::os::unix::fs::symlink(&encoded_content, &temp_path) {
200 Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
201 continue;
203 }
204 Err(e) => return Err(e),
205 Ok(_) => break temp_path,
206 }
207 };
208 let _ = fix_perm_symlink(&temp_path);
209 match fs::rename(&temp_path, path) {
210 Ok(_) => Ok(()),
211 Err(e) => {
212 let _ = fs::remove_file(&temp_path);
214 Err(e)
215 }
216 }
217}
218
219pub fn atomic_read(path: &Path) -> io::Result<Vec<u8>> {
225 #[cfg(unix)]
226 {
227 if let Ok(data) = atomic_read_symlink(path) {
228 return Ok(data);
229 }
230 }
231 let mut file = fs::OpenOptions::new().read(true).open(path)?;
232 let mut buf = Vec::new();
233 file.read_to_end(&mut buf)?;
234 Ok(buf)
235}
236
237#[cfg(unix)]
239fn atomic_read_symlink(path: &Path) -> io::Result<Vec<u8>> {
240 use std::os::unix::ffi::OsStrExt;
241 let encoded_content = path.read_link()?;
242 let encoded_content = encoded_content.as_os_str().as_bytes();
243 if encoded_content.starts_with(b"hex:") {
244 Ok(hex::decode(&encoded_content[4..]).map_err(|_e| {
246 io::Error::new(
247 io::ErrorKind::InvalidData,
248 format!(
249 "{:?}: cannot decode hex content {:?}",
250 path, &encoded_content,
251 ),
252 )
253 })?)
254 } else {
255 Ok(encoded_content.to_vec())
256 }
257}
258pub(crate) fn mkdir_p(dir: impl AsRef<Path>) -> crate::Result<()> {
261 let dir = dir.as_ref();
262 let try_mkdir_once = || -> io::Result<()> {
263 fs::create_dir(dir).map(|_| {
264 let _ = fix_perm_path(dir, true);
266 })
267 };
268 {
269 try_mkdir_once().or_else(|err| {
270 match err.kind() {
271 io::ErrorKind::AlreadyExists => return Ok(()),
272 io::ErrorKind::NotFound => {
273 if let Some(parent) = dir.parent() {
275 mkdir_p(parent)
276 .context(|| format!("while trying to mkdir_p({:?})", dir))?;
277 return try_mkdir_once()
278 .context(dir, "cannot mkdir after mkdir its parent");
279 }
280 }
281 io::ErrorKind::PermissionDenied => {
282 if let Some(parent) = dir.parent() {
284 if fix_perm_path(parent, true).is_ok() {
285 return try_mkdir_once().context(dir, "cannot mkdir").context(|| {
286 format!(
287 "while trying to mkdir {:?} after fix_perm {:?}",
288 &dir, &parent
289 )
290 });
291 }
292 }
293 }
294 _ => {}
295 }
296 Err(err).context(dir, "cannot mkdir")
297 })
298 }
299}
300
301pub(crate) fn fix_perm_path(path: &Path, is_dir: bool) -> io::Result<()> {
303 #[cfg(unix)]
304 {
305 let file = fs::OpenOptions::new().read(true).open(path)?;
306 fix_perm_file(&file, is_dir)?;
307 }
308 #[cfg(windows)]
309 {
310 let _ = (path, is_dir);
311 }
312 Ok(())
313}
314
315pub(crate) fn fix_perm_file(file: &File, is_dir: bool) -> io::Result<()> {
317 #[cfg(unix)]
318 {
319 let mode = if is_dir {
321 config::CHMOD_DIR.load(atomic::Ordering::SeqCst)
322 } else {
323 config::CHMOD_FILE.load(atomic::Ordering::SeqCst)
324 };
325 if mode >= 0 {
326 let perm = std::os::unix::fs::PermissionsExt::from_mode(mode as u32);
327 file.set_permissions(perm)?;
328 }
329 }
330 #[cfg(windows)]
331 {
332 let _ = (file, is_dir);
333 }
334 Ok(())
335}
336
337pub(crate) fn fix_perm_symlink(path: &Path) -> io::Result<()> {
339 #[cfg(unix)]
340 {
341 use std::ffi::CString;
342 use std::os::unix::ffi::OsStrExt;
343
344 let path = CString::new(path.as_os_str().as_bytes())?;
345
346 let mode = config::CHMOD_FILE.load(atomic::Ordering::SeqCst);
348 if mode >= 0 {
349 unsafe {
350 libc::fchmodat(
351 libc::AT_FDCWD,
352 path.as_ptr(),
353 mode as _,
354 libc::AT_SYMLINK_NOFOLLOW,
355 )
356 };
357 }
358 }
359 #[cfg(windows)]
360 {
361 let _ = path;
362 }
363 Ok(())
364}
365
366thread_local! {
367 static THREAD_RAND_U64: RefCell<u64> = RefCell::new(0);
368}
369
370pub(crate) fn rand_u64() -> u64 {
373 if cfg!(test) {
374 let count = THREAD_RAND_U64.with(|i| {
376 *i.borrow_mut() += 1;
377 *i.borrow()
378 });
379 count | (1u64 << 63)
381 } else {
382 rand::random()
383 }
384}
385
#[cfg(test)]
mod tests {
    use super::*;

    /// Write `data` with `atomic_write` (symlink mode enabled) into a fresh
    /// temporary directory and assert `atomic_read` returns the same bytes.
    fn check_atomic_read_write(data: &[u8]) {
        config::SYMLINK_ATOMIC_WRITE.store(true, atomic::Ordering::SeqCst);
        let tmp = tempfile::tempdir().unwrap();
        let target = tmp.path().join("a");
        atomic_write(&target, data, false).unwrap();
        let round_tripped = atomic_read(&target).unwrap();
        assert_eq!(data, round_tripped.as_slice());
    }

    #[test]
    fn test_atomic_read_write_roundtrip() {
        // Covers empty input, the `hex:` prefix in several positions,
        // multi-byte UTF-8 and embedded NUL bytes.
        let cases: &[&[u8]] = &[
            b"",
            b"hex",
            b"hex:",
            b"hex:abc",
            b"hex:hex:abc",
            b"abc",
            b"\xe4\xbd\xa0\xe5\xa5\xbd",
            b"hex:\xe4\xbd\xa0\xe5\xa5\xbd",
            b"a\0b\0c\0",
            b"hex:a\0b\0c\0",
            b"\0\0\0\0\0\0",
        ];
        for &case in cases {
            check_atomic_read_write(case);
        }
    }

    quickcheck::quickcheck! {
        fn quickcheck_atomic_read_write_roundtrip(data: Vec<u8>) -> bool {
            check_atomic_read_write(&data);
            true
        }
    }
}