use std::cell::RefCell;
use std::fs;
use std::fs::File;
use std::hash::Hasher;
use std::io;
use std::io::Read;
use std::io::Write;
use std::path::Path;
use std::sync::atomic;
use memmap2::MmapOptions;
use minibytes::Bytes;
use twox_hash::XxHash;
use twox_hash::XxHash32;
use crate::config;
use crate::errors::IoResultExt;
use crate::errors::ResultExt;
/// Memory-map the first `len` bytes of `file`, or the whole file when `len` is `None`.
///
/// A zero-length request returns an empty `Bytes` without calling mmap. Successfully
/// mapped buffers are registered with `crate::page_out::track_mmap_buffer`.
///
/// # Errors
///
/// Returns `UnexpectedEof` if `len` exceeds the current file size. Also returns any
/// error from reading the file metadata or from creating the mapping.
pub fn mmap_bytes(file: &File, len: Option<u64>) -> io::Result<Bytes> {
    let file_size = file.metadata()?.len();
    let map_len = len.unwrap_or(file_size);
    if map_len > file_size {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            format!(
                "mmap length {} is greater than file size {}",
                map_len, file_size
            ),
        ));
    }
    if map_len == 0 {
        return Ok(Bytes::new());
    }
    // SAFETY: mapping a file is unsafe because another writer could modify or
    // truncate it while it is mapped. NOTE(review): this presumably relies on callers
    // only mapping regions that are never rewritten or truncated — confirm.
    let mmap = unsafe { MmapOptions::new().len(map_len as usize).map(file) }?;
    let bytes = Bytes::from(mmap);
    crate::page_out::track_mmap_buffer(&bytes);
    Ok(bytes)
}
/// Open `path` read-only and memory-map its first `len` bytes.
///
/// A `len` of zero returns an empty `Bytes` without touching the filesystem.
///
/// # Errors
///
/// If the file does not exist, the error is marked as corruption. Any other open or
/// mmap failure is returned with path context.
pub fn mmap_path(path: &Path, len: u64) -> crate::Result<Bytes> {
    if len == 0 {
        return Ok(Bytes::new());
    }
    let file = match std::fs::OpenOptions::new().read(true).open(path) {
        Ok(opened) => opened,
        // A missing file the caller expected to mmap is reported as corruption.
        Err(err) if err.kind() == io::ErrorKind::NotFound => {
            return Err(err).context(path, "cannot open for mmap").corruption();
        }
        Err(err) => return Err(err).context(path, "cannot open for mmap"),
    };
    Ok(mmap_bytes(&file, Some(len)).context(path, "cannot mmap")?)
}
/// Open a handle associated with the directory `lock_path`.
///
/// On Unix, this opens the directory itself read-only. On other platforms, it opens
/// a file named `lock` inside the directory for writing, creating it if needed and
/// never truncating it. NOTE(review): presumably the handle is used for file locking
/// by the caller — confirm.
pub fn open_dir(lock_path: impl AsRef<Path>) -> io::Result<File> {
    let dir = lock_path.as_ref();
    if cfg!(unix) {
        File::open(dir)
    } else {
        fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(false)
            .open(dir.join("lock"))
    }
}
#[inline]
pub fn xxhash<T: AsRef<[u8]>>(buf: T) -> u64 {
let mut xx = XxHash::default();
xx.write(buf.as_ref());
xx.finish()
}
#[inline]
pub fn xxhash32<T: AsRef<[u8]>>(buf: T) -> u32 {
let mut xx = XxHash32::default();
xx.write(buf.as_ref());
xx.finish() as u32
}
/// Atomically write `content` to `path`.
///
/// On Unix, when `config::SYMLINK_ATOMIC_WRITE` is enabled, the content is first
/// stored as a symlink target. If that attempt fails for any reason, the function
/// silently falls back to `atomic_write_plain`. The symlink path does not use `fsync`.
/// The plain path fsyncs when `fsync` is true or the global fsync setting is on.
pub fn atomic_write(
    path: impl AsRef<Path>,
    content: impl AsRef<[u8]>,
    fsync: bool,
) -> crate::Result<()> {
    let (path, content) = (path.as_ref(), content.as_ref());
    #[cfg(unix)]
    {
        let symlink_mode = config::SYMLINK_ATOMIC_WRITE.load(atomic::Ordering::SeqCst);
        if symlink_mode && atomic_write_symlink(path, content).is_ok() {
            return Ok(());
        }
    }
    atomic_write_plain(path, content, fsync)
}
/// Atomically replace `path` with `content` using `atomicfile::atomic_write`.
///
/// The file mode comes from `config::CHMOD_FILE`. Data is fsynced when `fsync` is
/// true or `config::get_global_fsync()` is enabled.
///
/// # Errors
///
/// Returns an error if the atomic write fails. The error carries the path plus a
/// short description of `content`: the raw bytes if shorter than 128 bytes,
/// otherwise just its length.
pub fn atomic_write_plain(path: &Path, content: &[u8], fsync: bool) -> crate::Result<()> {
    let mode = config::CHMOD_FILE.load(atomic::Ordering::SeqCst) as u32;
    let fsync = fsync || config::get_global_fsync();
    // Build the Result without `?`. Inside a plain block, `?` returns from the whole
    // function and would skip the extra context attached below.
    let result: crate::Result<()> = atomicfile::atomic_write(path, mode, fsync, |file| {
        file.write_all(content)?;
        Ok(())
    })
    .context(path, "atomic_write error")
    .map(|_| ());
    result.context(|| {
        let content_desc = if content.len() < 128 {
            format!("{:?}", content)
        } else {
            format!("<{}-byte slice>", content.len())
        };
        format!(
            " in atomic_write(path={:?}, content={}) ",
            path, content_desc
        )
    })
}
/// Atomically store `content` at `path` as the target of a symlink.
///
/// Content is stored verbatim when it is valid UTF-8, does not start with `hex:`,
/// and contains no NUL bytes. Otherwise it is stored as `hex:` followed by its hex
/// encoding. A uniquely named temporary symlink is created, permissions are adjusted
/// best-effort, and then it is renamed over `path`. The temporary symlink is removed
/// if the rename fails.
#[cfg(unix)]
fn atomic_write_symlink(path: &Path, content: &[u8]) -> io::Result<()> {
    let verbatim = std::str::from_utf8(content)
        .ok()
        .filter(|text| !text.starts_with("hex:") && !content.contains(&0));
    let encoded_content: String = match verbatim {
        Some(text) => text.to_string(),
        None => format!("hex:{}", hex::encode(content)),
    };

    // Retry with a fresh random suffix until the temporary name is unused.
    let temp_path = loop {
        let candidate = path.with_extension(format!(".temp{}", rand::random::<u16>()));
        if let Err(e) = std::os::unix::fs::symlink(&encoded_content, &candidate) {
            if e.kind() == io::ErrorKind::AlreadyExists {
                continue;
            }
            return Err(e);
        }
        break candidate;
    };

    let _ = fix_perm_symlink(&temp_path);
    if let Err(e) = fs::rename(&temp_path, path) {
        let _ = fs::remove_file(&temp_path);
        return Err(e);
    }
    Ok(())
}
pub fn atomic_read(path: &Path) -> io::Result<Vec<u8>> {
#[cfg(unix)]
{
if let Ok(data) = atomic_read_symlink(path) {
return Ok(data);
}
}
let mut file = fs::OpenOptions::new().read(true).open(path)?;
let mut buf = Vec::new();
file.read_to_end(&mut buf)?;
Ok(buf)
}
/// Read and decode content stored as a symlink target by `atomic_write_symlink`.
///
/// A target starting with `hex:` is hex-decoded. Any other target is returned as
/// raw bytes.
///
/// # Errors
///
/// Returns the `read_link` error if `path` is not a readable symlink. Returns
/// `InvalidData` if a `hex:` target cannot be decoded.
#[cfg(unix)]
fn atomic_read_symlink(path: &Path) -> io::Result<Vec<u8>> {
    use std::os::unix::ffi::OsStrExt;
    let target = path.read_link()?;
    let raw: &[u8] = target.as_os_str().as_bytes();
    match raw.strip_prefix(b"hex:") {
        Some(hex_digits) => hex::decode(hex_digits).map_err(|_e| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                format!("{:?}: cannot decode hex content {:?}", path, &raw,),
            )
        }),
        None => Ok(raw.to_vec()),
    }
}
/// Create `dir` and any missing ancestors, similar to `mkdir -p`.
///
/// Newly created directories get their permissions adjusted via `fix_perm_path`,
/// best-effort: failures there are ignored. An existing entry at `dir` counts as
/// success.
///
/// On `NotFound`, parents are created recursively and creation is retried. On
/// `PermissionDenied`, the parent's permissions are fixed and creation is retried.
/// In both retry paths, `AlreadyExists` also counts as success, because another
/// thread or process may have created `dir` concurrently.
///
/// # Errors
///
/// Returns an error with path context if the directory cannot be created.
pub(crate) fn mkdir_p(dir: impl AsRef<Path>) -> crate::Result<()> {
    let dir = dir.as_ref();
    // One creation attempt; on success, fix permissions best-effort.
    let try_mkdir_once = || -> io::Result<()> {
        fs::create_dir(dir).map(|_| {
            let _ = fix_perm_path(dir, true);
        })
    };
    // Retry attempt that tolerates a concurrent creator winning the race.
    let retry_mkdir = || -> io::Result<()> {
        match try_mkdir_once() {
            Err(err) if err.kind() == io::ErrorKind::AlreadyExists => Ok(()),
            other => other,
        }
    };
    try_mkdir_once().or_else(|err| {
        match err.kind() {
            io::ErrorKind::AlreadyExists => return Ok(()),
            io::ErrorKind::NotFound => {
                if let Some(parent) = dir.parent() {
                    mkdir_p(parent)
                        .context(|| format!("while trying to mkdir_p({:?})", dir))?;
                    return retry_mkdir().context(dir, "cannot mkdir after mkdir its parent");
                }
            }
            io::ErrorKind::PermissionDenied => {
                if let Some(parent) = dir.parent() {
                    if fix_perm_path(parent, true).is_ok() {
                        return retry_mkdir().context(dir, "cannot mkdir").context(|| {
                            format!(
                                "while trying to mkdir {:?} after fix_perm {:?}",
                                &dir, &parent
                            )
                        });
                    }
                }
            }
            _ => {}
        }
        Err(err).context(dir, "cannot mkdir")
    })
}
/// Apply the configured permission bits to the file or directory at `path`.
///
/// On Unix, this opens `path` read-only and delegates to `fix_perm_file`. On
/// Windows, it does nothing.
///
/// # Errors
///
/// On Unix, returns any error from opening `path` or from setting its permissions.
pub(crate) fn fix_perm_path(path: &Path, is_dir: bool) -> io::Result<()> {
    #[cfg(unix)]
    {
        let handle = File::open(path)?;
        fix_perm_file(&handle, is_dir)?;
    }
    #[cfg(windows)]
    {
        // Nothing to do on Windows; mark the arguments as used.
        let _ = (path, is_dir);
    }
    Ok(())
}
pub(crate) fn fix_perm_file(file: &File, is_dir: bool) -> io::Result<()> {
#[cfg(unix)]
{
let mode = if is_dir {
config::CHMOD_DIR.load(atomic::Ordering::SeqCst)
} else {
config::CHMOD_FILE.load(atomic::Ordering::SeqCst)
};
if mode >= 0 {
let perm = std::os::unix::fs::PermissionsExt::from_mode(mode as u32);
file.set_permissions(perm)?;
}
}
#[cfg(windows)]
{
let _ = (file, is_dir);
}
Ok(())
}
/// Best-effort application of `config::CHMOD_FILE` to the symlink itself at `path`.
///
/// On Unix, this calls `fchmodat` with `AT_SYMLINK_NOFOLLOW` when the configured
/// mode is non-negative. The return value of `fchmodat` is ignored, so a failed
/// chmod is not reported. On Windows, it does nothing.
///
/// # Errors
///
/// On Unix, returns an error only if `path` contains an interior NUL byte.
pub(crate) fn fix_perm_symlink(path: &Path) -> io::Result<()> {
    #[cfg(unix)]
    {
        use std::ffi::CString;
        use std::os::unix::ffi::OsStrExt;
        let c_path = CString::new(path.as_os_str().as_bytes())?;
        let mode = config::CHMOD_FILE.load(atomic::Ordering::SeqCst);
        if mode >= 0 {
            // SAFETY: `c_path` is a valid NUL-terminated string that lives until the
            // end of this scope, and `fchmodat` does not retain the pointer.
            // The result is deliberately discarded: this is best-effort.
            let _ = unsafe {
                libc::fchmodat(
                    libc::AT_FDCWD,
                    c_path.as_ptr(),
                    mode as _,
                    libc::AT_SYMLINK_NOFOLLOW,
                )
            };
        }
    }
    #[cfg(windows)]
    {
        // Nothing to do on Windows; mark the argument as used.
        let _ = path;
    }
    Ok(())
}
thread_local! {
    /// Per-thread counter backing `rand_u64` in `cfg(test)` builds.
    static THREAD_RAND_U64: RefCell<u64> = RefCell::new(0);
}

/// Return a random `u64` from `rand::random()`.
///
/// In `cfg(test)` builds, it instead returns a deterministic per-thread sequence
/// (1, 2, 3, …) with bit 63 set.
pub(crate) fn rand_u64() -> u64 {
    if !cfg!(test) {
        return rand::random();
    }
    const TOP_BIT: u64 = 1u64 << 63;
    let next = THREAD_RAND_U64.with(|cell| {
        let mut counter = cell.borrow_mut();
        *counter += 1;
        *counter
    });
    next | TOP_BIT
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Enable symlink-based atomic writes, write `data` to a fresh temp path, read it
    /// back with `atomic_read`, and assert that the bytes round-trip unchanged.
    fn check_atomic_read_write(data: &[u8]) {
        config::SYMLINK_ATOMIC_WRITE.store(true, atomic::Ordering::SeqCst);
        let tmp = tempfile::tempdir().unwrap();
        let target = tmp.path().join("a");
        atomic_write(&target, data, false).unwrap();
        let round_tripped = atomic_read(&target).unwrap();
        assert_eq!(data, round_tripped.as_slice());
    }

    #[test]
    fn test_atomic_read_write_roundtrip() {
        // Edge cases: empty input, a literal "hex:" prefix, non-ASCII UTF-8, and NULs.
        let samples: &[&[u8]] = &[
            b"",
            b"hex",
            b"hex:",
            b"hex:abc",
            b"hex:hex:abc",
            b"abc",
            b"\xe4\xbd\xa0\xe5\xa5\xbd",
            b"hex:\xe4\xbd\xa0\xe5\xa5\xbd",
            b"a\0b\0c\0",
            b"hex:a\0b\0c\0",
            b"\0\0\0\0\0\0",
        ];
        for &sample in samples {
            check_atomic_read_write(sample);
        }
    }

    quickcheck::quickcheck! {
        fn quickcheck_atomic_read_write_roundtrip(data: Vec<u8>) -> bool {
            check_atomic_read_write(&data);
            true
        }
    }
}