#![deny(clippy::cast_possible_truncation)]
use std::{
collections::BTreeSet,
fs::File,
io::Read,
path::{Path, PathBuf},
sync::Mutex,
};
use bytes::Bytes;
use crate::{
error::HeddleError,
fs_atomic::{enrich_fs_error, enrich_rename_error, sync_directory},
store::{Result, atomic::temp_path},
};
/// Size (256 KiB) at or above which whole-file reads memory-map the file instead of
/// copying it into a heap buffer.
const MMAP_THRESHOLD_BYTES: u64 = 256 << 10;
/// Contents of a file, held either in an owned heap buffer or in a read-only memory map.
pub(super) enum FileBytes {
    /// Bytes that were read into a heap-allocated buffer.
    Vec(Vec<u8>),
    /// Bytes accessed through a memory mapping of the file.
    Mmap(memmap2::Mmap),
}
impl FileBytes {
    /// Borrows the contents as a byte slice; no copy is made for either variant.
    pub(super) fn as_slice(&self) -> &[u8] {
        match self {
            Self::Vec(buffer) => buffer.as_slice(),
            Self::Mmap(mapping) => &mapping[..],
        }
    }
    /// Converts into an owned `Vec<u8>`.
    ///
    /// The heap variant is moved out as-is; the mapped variant is copied into a new buffer.
    pub(super) fn into_vec(self) -> Vec<u8> {
        match self {
            Self::Vec(buffer) => buffer,
            Self::Mmap(mapping) => mapping[..].to_vec(),
        }
    }
}
/// How much durability [`write_atomic`] should guarantee for a single write.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum AtomicWriteMode {
    /// `sync_all` the temp file before the rename, then `sync_directory` the parent after it.
    Durable,
    /// `sync_data` the temp file; the parent directory is only recorded in the caller-supplied
    /// pending set (if any) rather than synced immediately.
    BatchDirectorySync,
    /// Perform no fsync calls at all.
    NoSync,
}
/// Atomically replaces `path` with `data` by writing a temp file and renaming it into place.
///
/// The parent directory is created first if needed. `mode` selects the durability work:
/// - `Durable`: the temp file is `sync_all`ed before the rename, and the parent directory is
///   synced with `sync_directory` after it.
/// - `BatchDirectorySync`: the temp file is `sync_data`ed. The parent directory is not synced
///   here; it is inserted into `pending_directory_syncs` when that set is provided, so a later
///   step can sync it. With `None`, nothing is recorded.
/// - `NoSync`: no fsync calls are made.
///
/// # Errors
/// Returns `HeddleError::Io` with context for the step that failed:
/// - creating the parent directory;
/// - writing the temp file;
/// - renaming it over `path`;
/// - syncing or recording the parent directory. This includes a poisoned pending-sync mutex.
///
/// A failure in the last step happens after the rename, so `path` already holds `data` in
/// that case.
pub(super) fn write_atomic(
    path: &Path,
    data: &[u8],
    mode: AtomicWriteMode,
    pending_directory_syncs: Option<&Mutex<BTreeSet<PathBuf>>>,
) -> Result<()> {
    let parent = path
        .parent()
        .ok_or_else(|| std::io::Error::other("invalid atomic write path"))?;
    std::fs::create_dir_all(parent)
        .map_err(|e| HeddleError::Io(enrich_fs_error(parent, "creating", e)))?;
    let temp_path = temp_path(path);

    // The helper owns the file handle, so the temp file is closed before it is renamed.
    if let Err(err) = write_atomic_temp_file(&temp_path, data, mode) {
        // Best effort: do not leave a partially written temp file behind.
        let _ = std::fs::remove_file(&temp_path);
        return Err(HeddleError::Io(enrich_fs_error(path, "writing", err)));
    }
    if let Err(err) = std::fs::rename(&temp_path, path) {
        // The rename did not happen, so the temp file is still ours to clean up.
        let _ = std::fs::remove_file(&temp_path);
        return Err(HeddleError::Io(enrich_rename_error(&temp_path, path, err)));
    }
    // After a successful rename the temp name no longer refers to our file. A concurrent
    // writer may already have re-created it (e.g. if `temp_path` is deterministic), so a
    // failure from here on must NOT remove it.
    sync_atomic_write_parent(parent, mode, pending_directory_syncs)
        .map_err(|err| HeddleError::Io(enrich_fs_error(parent, "syncing", err)))
}

/// Exclusively creates `temp_path`, writes `data` to it, and flushes it according to `mode`.
///
/// The file handle is dropped (closed) when this function returns.
fn write_atomic_temp_file(
    temp_path: &Path,
    data: &[u8],
    mode: AtomicWriteMode,
) -> std::io::Result<()> {
    use std::io::Write as _;
    let mut opts = std::fs::OpenOptions::new();
    // `create_new` refuses to open an existing file instead of truncating it.
    opts.write(true).create_new(true);
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt;
        opts.mode(0o644);
    }
    let mut file = opts.open(temp_path)?;
    file.write_all(data)?;
    match mode {
        AtomicWriteMode::Durable => file.sync_all()?,
        AtomicWriteMode::BatchDirectorySync => file.sync_data()?,
        AtomicWriteMode::NoSync => {}
    }
    Ok(())
}

/// Makes the renamed entry in `parent` durable according to `mode`.
///
/// - `Durable`: syncs `parent` now.
/// - `BatchDirectorySync`: records `parent` in `pending` for a later sync.
/// - `NoSync`: does nothing.
fn sync_atomic_write_parent(
    parent: &Path,
    mode: AtomicWriteMode,
    pending: Option<&Mutex<BTreeSet<PathBuf>>>,
) -> std::io::Result<()> {
    match mode {
        AtomicWriteMode::Durable => sync_directory(parent)?,
        AtomicWriteMode::BatchDirectorySync => {
            if let Some(pending) = pending {
                let mut dirs = pending.lock().map_err(|_| {
                    std::io::Error::other("failed to acquire pending directory sync lock")
                })?;
                dirs.insert(parent.to_path_buf());
            }
        }
        AtomicWriteMode::NoSync => {}
    }
    Ok(())
}
pub(super) fn read_file_header(path: &Path, header_len: usize) -> Result<Option<(Vec<u8>, u64)>> {
let mut file = match File::open(path) {
Ok(file) => file,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(e) => return Err(e.into()),
};
let metadata = file.metadata()?;
let len = metadata.len();
let to_read = if len > header_len as u64 {
header_len
} else {
checked_file_len_to_usize(len)?
};
let mut header = vec![0u8; to_read];
if to_read > 0 {
use std::io::Read as _;
file.read_exact(&mut header)?;
}
Ok(Some((header, len)))
}
/// Loads an entire pack file into a [`Bytes`] buffer.
///
/// An empty file yields an empty `Bytes`. Files smaller than `MMAP_THRESHOLD_BYTES` are read
/// into a heap buffer. Larger files are memory-mapped, and the mapping becomes the owner of the
/// returned `Bytes`.
///
/// # Errors
/// - Any I/O error is propagated, including `NotFound` for a missing file.
/// - `InvalidObject` is returned if the mapped length differs from the size read from
///   metadata, or if that size does not fit in `usize`.
pub fn read_file_bytes_for_pack(path: &Path) -> Result<Bytes> {
    let mut file = File::open(path)?;
    let size = file.metadata()?.len();
    if size == 0 {
        return Ok(Bytes::new());
    }
    if size < MMAP_THRESHOLD_BYTES {
        let mut buffer = Vec::with_capacity(checked_file_len_to_usize(size)?);
        file.read_to_end(&mut buffer)?;
        return Ok(Bytes::from(buffer));
    }
    // SAFETY: a memory map is only sound while no one truncates or rewrites the underlying
    // file; this function cannot enforce that.
    // NOTE(review): relies on pack files never being modified in place once written. Confirm
    // with the writers of pack files.
    let mapping = unsafe { memmap2::MmapOptions::new().map(&file)? };
    // A mismatch means the file changed between the metadata call and the mapping.
    if mapping.len() == checked_file_len_to_usize(size)? {
        Ok(Bytes::from_owner(mapping))
    } else {
        Err(HeddleError::InvalidObject(
            "pack file size changed during memory mapping".to_string(),
        ))
    }
}
pub(super) fn read_file_bytes(path: &Path) -> Result<Option<FileBytes>> {
let file = match File::open(path) {
Ok(file) => file,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(e) => return Err(e.into()),
};
let metadata = file.metadata()?;
let len = metadata.len();
if len == 0 {
return Ok(Some(FileBytes::Vec(vec![])));
}
if len >= MMAP_THRESHOLD_BYTES {
let mmap = unsafe { memmap2::MmapOptions::new().map(&file)? };
if mmap.len() != checked_file_len_to_usize(len)? {
return Err(crate::store::HeddleError::InvalidObject(
"file size changed during memory mapping".to_string(),
));
}
return Ok(Some(FileBytes::Mmap(mmap)));
}
let mut data = Vec::with_capacity(checked_file_len_to_usize(len)?);
let mut reader = file;
reader.read_to_end(&mut data)?;
Ok(Some(FileBytes::Vec(data)))
}
/// Converts a file length to `usize`, rejecting lengths the platform cannot address.
///
/// # Errors
/// Returns `HeddleError::InvalidObject` when `len` is larger than `usize::MAX`.
fn checked_file_len_to_usize(len: u64) -> Result<usize> {
    match usize::try_from(len) {
        Ok(n) => Ok(n),
        Err(_) => Err(HeddleError::InvalidObject(format!(
            "file length {len} exceeds platform limits"
        ))),
    }
}
/// Lists the hashes stored in a two-level fan-out layout under `dir`.
///
/// Each immediate subdirectory of `dir` whose name is valid UTF-8 and exactly two bytes long
/// is treated as a hash prefix. Every UTF-8 entry name inside such a prefix directory is
/// appended to that prefix and parsed with `ContentHash::from_hex`. Names that fail to parse
/// are skipped silently.
///
/// Returns an empty list when `dir` does not exist.
///
/// # Errors
/// Propagates I/O errors from reading `dir`, a prefix directory, or their entries. The
/// exception is `NotFound`: a missing `dir` yields an empty list, and a prefix directory that
/// disappears mid-listing is skipped.
pub(super) fn list_hashes_from_dir(
    dir: &std::path::Path,
) -> Result<Vec<crate::object::ContentHash>> {
    use std::fs;
    use std::io::ErrorKind;
    use tracing::debug;

    // Open the directory directly instead of checking `dir.exists()` first. `exists()` returns
    // false on permission or other metadata errors, which would silently report an unreadable
    // store as empty. It is also racy with the subsequent `read_dir`.
    let top_level = match fs::read_dir(dir) {
        Ok(entries) => entries,
        Err(e) if e.kind() == ErrorKind::NotFound => return Ok(Vec::new()),
        Err(e) => return Err(e.into()),
    };
    let mut hashes = Vec::new();
    // Reused for every candidate to avoid allocating a fresh String per entry.
    let mut full_hash = String::new();
    for entry in top_level {
        let path = entry?.path();
        if !path.is_dir() {
            continue;
        }
        let Some(prefix) = path.file_name().and_then(|n| n.to_str()) else {
            continue;
        };
        if prefix.len() != 2 {
            continue;
        }
        let sub_entries = match fs::read_dir(&path) {
            Ok(entries) => entries,
            // The prefix directory vanished after it was listed (e.g. removed concurrently).
            Err(e) if e.kind() == ErrorKind::NotFound => continue,
            Err(e) => return Err(e.into()),
        };
        for sub_entry in sub_entries {
            let file_name = sub_entry?.file_name();
            let Some(name) = file_name.to_str() else {
                continue;
            };
            full_hash.clear();
            full_hash.push_str(prefix);
            full_hash.push_str(name);
            if let Ok(hash) = crate::object::ContentHash::from_hex(&full_hash) {
                hashes.push(hash);
            }
        }
    }
    debug!(count = hashes.len(), "Listed hashes");
    Ok(hashes)
}