#[cfg(not(target_pointer_width = "64"))]
compile_error!("file io works only on 64-bit platforms");
use std::{
cmp, fs,
hash::{Hash, Hasher},
io,
num::NonZeroU64,
path::{Path, PathBuf},
sync::atomic,
};
use crate::{Blocks, BlocksAllocator};
#[cfg(not(all(feature = "libc", target_os = "linux")))]
use std::io::{Read, Seek, Write};
#[cfg(all(feature = "libc", target_os = "linux"))]
use std::{os::fd::AsRawFd, ptr::NonNull};
/// Block-oriented storage backed by a regular file on disk.
#[derive(Debug)]
pub struct File {
    // Underlying OS file; becomes `None` after a failed sync, permanently
    // poisoning this handle (all later operations fail with `FileSyncError`).
    inner: Option<fs::File>,
    // Number of blocks the file holds.
    block_count: u64,
    // log2 of the block size in bytes (12..=28, enforced by `verify_input`).
    block_shift: u32,
    // Identifies the `FileSequence` that allocated this file, if any.
    stamp: Option<NonZeroU64>,
    // Position of this file within its allocating sequence, if any.
    index: Option<WrappingSeq>,
}
impl File {
    /// Validates a `block_count`/`block_shift` pair and returns the total
    /// size in bytes.
    ///
    /// # Errors
    /// `InvalidInput` when the shift is outside `12..=28`, the count is
    /// zero, or the total size would not fit in an `i64` (the offset type
    /// used by the OS file APIs).
    fn verify_input(block_count: u64, block_shift: u32) -> io::Result<u64> {
        if !(12..=28).contains(&block_shift) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "block shift must be between 12 and 28 inclusive",
            ));
        }
        if block_count == 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "block count must be non-zero",
            ));
        }
        // Check BEFORE shifting: `block_count << block_shift` can silently
        // overflow u64 (e.g. count = 1 << 60, shift = 12 wraps to 0), which
        // would let an oversized request slip past a post-shift comparison.
        if block_count > (i64::MAX as u64) >> block_shift {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "total blocks size too large",
            ));
        }
        Ok(block_count << block_shift)
    }
    /// Validates a vectored-I/O buffer list: buffers come in pairs, each
    /// pair normally summing to exactly `block_size`; at most one smaller
    /// ("uneven") pair is allowed, and only as the last pair.
    ///
    /// # Errors
    /// `InvalidInput` describing the first violated rule.
    fn verify_bufs<T: core::ops::Deref<Target = [u8]>>(
        bufs: &[T],
        block_size: usize,
    ) -> io::Result<()> {
        if bufs.len() & 1 != 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "odd number of bufs",
            ));
        }
        // The vectored syscalls take the buffer count as a C int.
        if bufs.len() > i32::MAX as usize {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "too many bufs"));
        }
        // Skip the leading full pairs; whatever remains must be at most one
        // final, smaller pair.
        let mut uneven_bufs = bufs
            .chunks(2)
            .skip_while(|pair| pair[0].len() + pair[1].len() == block_size);
        if let Some([left, right]) = uneven_bufs.next() {
            if left.len() + right.len() > block_size {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "uneven pair of bufs is too large",
                ));
            }
        }
        if uneven_bufs.next().is_some() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "uneven pair of bufs is not the last pair",
            ));
        }
        Ok(())
    }
    /// Creates a new file of exactly `block_count << block_shift` bytes.
    /// Fails if the file already exists (`create_new`). On Linux with the
    /// `libc` feature the space is preallocated and readahead doubled.
    ///
    /// # Errors
    /// Input validation errors from `verify_input` plus any OS error.
    pub fn create<P: AsRef<Path>>(path: P, block_count: u64, block_shift: u32) -> io::Result<File> {
        let size = File::verify_input(block_count, block_shift)?;
        let file = fs::File::options()
            .create_new(true)
            .read(true)
            .write(true)
            .open(path)?;
        file.set_len(size)?;
        #[cfg(all(feature = "libc", target_os = "linux"))]
        {
            reserve(&file, size)?;
            double_readahead_pages(&file, size)?;
        }
        // Make the (empty) file durable before handing it out.
        file.sync_all()?;
        Ok(File {
            block_count,
            block_shift,
            inner: Some(file),
            index: None,
            stamp: None,
        })
    }
    /// Opens an existing file, deriving `block_count` from its size.
    ///
    /// # Errors
    /// `InvalidData` if the size is zero, exceeds `i64::MAX`, or is not a
    /// multiple of the block size; validation errors for `block_shift`.
    pub fn open<P: AsRef<Path>>(path: P, block_shift: u32) -> io::Result<File> {
        // Reuse the shift-range validation with a dummy count of 1.
        File::verify_input(1, block_shift)?;
        let file = fs::File::options().read(true).write(true).open(path)?;
        let size = file.metadata()?.len();
        if size == 0 {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "zero file size"));
        }
        if size > i64::MAX as u64 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "file size too large",
            ));
        }
        if size & ((1 << block_shift) - 1) != 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "file size not aligned to block size",
            ));
        }
        #[cfg(all(feature = "libc", target_os = "linux"))]
        double_readahead_pages(&file, size)?;
        Ok(File {
            block_count: size >> block_shift,
            block_shift,
            inner: Some(file),
            index: None,
            stamp: None,
        })
    }
    /// Tags this file with the allocating sequence's stamp and its index
    /// within that sequence (used by `FileSequence::release`).
    #[inline(always)]
    #[must_use]
    fn with_alloc_info(mut self, stamp: NonZeroU64, index: WrappingSeq) -> File {
        self.stamp = Some(stamp);
        self.index = Some(index);
        self
    }
    /// Accesses the inner file, failing with `FileSyncError` once the
    /// handle has been poisoned by a prior sync failure.
    #[inline(always)]
    fn inner(&mut self) -> io::Result<&mut fs::File> {
        self.inner
            .as_mut()
            .ok_or(io::Error::new(io::ErrorKind::Other, FileSyncError))
    }
}
impl Blocks for File {
    /// Number of blocks in this file.
    #[inline(always)]
    fn block_count(&self) -> u64 {
        self.block_count
    }
    /// log2 of the block size in bytes.
    #[inline(always)]
    fn block_shift(&self) -> u32 {
        self.block_shift
    }
    /// Reads starting at `block` until every buffer in `bufs` is full.
    ///
    /// # Errors
    /// `InvalidInput` if `bufs` has more entries than a C int can count;
    /// short reads and OS errors from the platform backend.
    fn load_from(&mut self, block: u64, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<()> {
        if bufs.len() > i32::MAX as usize {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "too many bufs"));
        }
        self.read_exact_vectored_at(bufs, block << self.block_shift)
    }
    /// Writes all of `bufs` starting at `block`.
    ///
    /// # Errors
    /// `OutOfMemory` if the write would run past the fixed file capacity;
    /// `InvalidInput` if `bufs` violates the pairing rules of
    /// `File::verify_bufs`; otherwise any underlying I/O error.
    fn store_at(&mut self, block: u64, bufs: &mut [io::IoSlice<'_>]) -> io::Result<()> {
        let offset = block << self.block_shift;
        let total_len = bufs.iter().map(|buf| buf.len()).sum::<usize>() as u64;
        // The file never grows: reject writes past the preallocated size.
        if offset.saturating_add(total_len) > self.block_count << self.block_shift {
            return Err(io::Error::new(
                io::ErrorKind::OutOfMemory,
                "write exceeds file capacity",
            ));
        }
        Self::verify_bufs(bufs, 1 << self.block_shift)?;
        self.write_all_vectored_at(bufs, offset)
    }
}
#[cfg(all(feature = "libc", target_os = "linux"))]
impl File {
    /// Chooses the write-chunk granularity: at least one block, capped at
    /// 2^23 bytes (8 MiB), and small enough that a chunk's buffers fit the
    /// `UIO_MAXIOV` iovec limit.
    #[inline(always)]
    #[must_use]
    fn chunk_shift(block_shift: u32) -> u32 {
        // Each block may be described by two iovec entries, hence the `- 1`
        // when converting the iovec limit into a block-count shift.
        let iovec_block_limit = libc::UIO_MAXIOV.ilog2() - 1;
        cmp::max(cmp::min(block_shift + iovec_block_limit, 23), block_shift)
    }
    /// Advises the kernel to drop cached pages for `[offset, offset + size)`
    /// via `posix_fadvise(POSIX_FADV_DONTNEED)`.
    ///
    /// # Errors
    /// `posix_fadvise` returns the error code directly (it does not set
    /// `errno`), so a non-zero return is converted as-is.
    #[inline(always)]
    fn free_cached_pages(&mut self, offset: u64, size: u64) -> io::Result<()> {
        let ret = unsafe {
            #[allow(clippy::cast_possible_wrap)]
            libc::posix_fadvise(
                self.inner()?.as_raw_fd(),
                offset as libc::off64_t,
                size as libc::off64_t,
                libc::POSIX_FADV_DONTNEED,
            )
        };
        if ret == 0 {
            return Ok(());
        }
        Err(io::Error::from_raw_os_error(ret))
    }
    /// Frees cached pages for the chunk-aligned span covered by
    /// `[offset, offset + size)`; does nothing while the range stays inside
    /// a single chunk.
    #[inline(always)]
    fn maybe_free_cached_pages(&mut self, offset: u64, size: u64, shift: u32) -> io::Result<()> {
        let start = offset >> shift;
        let end = (offset + size) >> shift;
        if start == end {
            return Ok(());
        }
        // Fix: `free_cached_pages` takes (offset, LENGTH). The previous code
        // passed `end << shift` — an absolute end offset — as the length,
        // advising far more of the file than the chunks actually spanned.
        self.free_cached_pages(start << shift, (end - start) << shift)
    }
    /// Thin wrapper over `sync_file_range(2)` for `[offset, offset + size)`.
    #[inline(always)]
    fn sync_file_range(&mut self, offset: u64, size: u64, flags: libc::c_uint) -> io::Result<()> {
        let ret = unsafe {
            #[allow(clippy::cast_possible_wrap)]
            libc::sync_file_range(
                self.inner()?.as_raw_fd(),
                offset as libc::off64_t,
                size as libc::off64_t,
                flags,
            )
        };
        match ret {
            0 => Ok(()),
            -1 => Err(io::Error::last_os_error()),
            _ => unreachable!("sync_file_range: unexpected return {ret}"),
        }
    }
    /// Initiates (but does not wait for) writeback of the range.
    #[inline(always)]
    fn sync_file_range_start(&mut self, offset: u64, size: u64) -> io::Result<()> {
        self.sync_file_range(offset, size, libc::SYNC_FILE_RANGE_WRITE)
    }
    /// Waits for any in-flight writeback of the range, writes the remaining
    /// dirty pages, and waits for that writeback too.
    #[inline(always)]
    fn sync_file_range_wait(&mut self, offset: u64, size: u64) -> io::Result<()> {
        self.sync_file_range(
            offset,
            size,
            libc::SYNC_FILE_RANGE_WAIT_BEFORE
                | libc::SYNC_FILE_RANGE_WRITE
                | libc::SYNC_FILE_RANGE_WAIT_AFTER,
        )
    }
    /// `preadv(2)` at `offset`, clamping the buffer count to `UIO_MAXIOV`.
    /// Returns the number of bytes read.
    #[inline(always)]
    fn read_vectored_at(
        &mut self,
        bufs: &mut [io::IoSliceMut<'_>],
        offset: u64,
    ) -> io::Result<usize> {
        // SAFETY: IoSliceMut is guaranteed ABI-compatible with iovec.
        let ret = unsafe {
            #[allow(clippy::cast_possible_wrap)]
            libc::preadv(
                self.inner()?.as_raw_fd(),
                bufs.as_mut_ptr().cast::<libc::iovec>(),
                #[allow(clippy::cast_possible_truncation)]
                cmp::min(bufs.len() as libc::c_int, libc::UIO_MAXIOV),
                offset as libc::off64_t,
            )
        };
        if ret >= 0 {
            #[allow(clippy::cast_sign_loss)]
            Ok(ret as usize)
        } else if ret == -1 {
            Err(io::Error::last_os_error())
        } else {
            unreachable!("preadv: unexpected return {ret}")
        }
    }
    /// `pwritev(2)` at `offset`, clamping the buffer count to `UIO_MAXIOV`.
    /// Returns the number of bytes written.
    #[inline(always)]
    fn write_vectored_at(&mut self, bufs: &[io::IoSlice<'_>], offset: u64) -> io::Result<usize> {
        // SAFETY: IoSlice is guaranteed ABI-compatible with iovec.
        let ret = unsafe {
            #[allow(clippy::cast_possible_wrap)]
            libc::pwritev(
                self.inner()?.as_raw_fd(),
                bufs.as_ptr().cast::<libc::iovec>(),
                #[allow(clippy::cast_possible_truncation)]
                cmp::min(bufs.len() as libc::c_int, libc::UIO_MAXIOV),
                offset as libc::off64_t,
            )
        };
        if ret >= 0 {
            #[allow(clippy::cast_sign_loss)]
            Ok(ret as usize)
        } else if ret == -1 {
            Err(io::Error::last_os_error())
        } else {
            unreachable!("pwritev: unexpected return {ret}")
        }
    }
    /// `pwritev2(2)` with `RWF_DSYNC`: the write is durable (data + size)
    /// when the call returns.
    #[inline(always)]
    fn write_vectored_at_dsync(
        &mut self,
        bufs: &[io::IoSlice<'_>],
        offset: u64,
    ) -> io::Result<usize> {
        // SAFETY: IoSlice is guaranteed ABI-compatible with iovec.
        let ret = unsafe {
            #[allow(clippy::cast_possible_wrap)]
            libc::pwritev2(
                self.inner()?.as_raw_fd(),
                bufs.as_ptr().cast::<libc::iovec>(),
                #[allow(clippy::cast_possible_truncation)]
                cmp::min(bufs.len() as libc::c_int, libc::UIO_MAXIOV),
                offset as libc::off64_t,
                libc::RWF_DSYNC,
            )
        };
        if ret >= 0 {
            #[allow(clippy::cast_sign_loss)]
            Ok(ret as usize)
        } else if ret == -1 {
            Err(io::Error::last_os_error())
        } else {
            unreachable!("pwritev2: unexpected return {ret}")
        }
    }
    /// Reads until every buffer is full, retrying on EINTR and dropping the
    /// just-read pages from the cache after each successful read.
    ///
    /// # Errors
    /// `UnexpectedEof` when the file ends before all buffers are filled.
    #[inline(always)]
    fn read_exact_vectored_at(
        &mut self,
        mut bufs: &mut [io::IoSliceMut<'_>],
        offset: u64,
    ) -> io::Result<()> {
        let mut current = offset;
        while !bufs.is_empty() {
            match self.read_vectored_at(bufs, current) {
                Ok(0) => break,
                Ok(read) => {
                    // Rebuild IoSliceMut from raw parts to detach the
                    // closure's borrow lifetime; `advance_iovec` only
                    // shrinks existing buffers, so the pointers stay valid.
                    advance_iovec(&mut bufs, read, |buf| {
                        io::IoSliceMut::new(unsafe {
                            core::slice::from_raw_parts_mut(buf.as_ptr().cast_mut(), buf.len())
                        })
                    });
                    self.free_cached_pages(current, read as u64)?;
                    current = current.saturating_add(read as u64);
                }
                Err(ref err) if err.kind() == io::ErrorKind::Interrupted => {}
                Err(err) => return Err(err),
            }
        }
        if !bufs.is_empty() {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "failed to fill all buffers",
            ));
        }
        Ok(())
    }
    /// Writes all of `bufs` at `offset`.
    ///
    /// Writes up to one chunk go through `pwritev2(RWF_DSYNC)`. Larger
    /// writes are pipelined chunk by chunk: write a chunk, start writeback,
    /// then on the next iteration wait for the previous chunk's writeback
    /// and drop its pages from the cache. A failed sync drops `inner`,
    /// poisoning this handle (durability can no longer be guaranteed).
    #[inline(always)]
    fn write_all_vectored_at(
        &mut self,
        mut bufs: &mut [io::IoSlice<'_>],
        offset: u64,
    ) -> io::Result<()> {
        let chunk_shift = File::chunk_shift(self.block_shift);
        let total_len = bufs.iter().map(|buf| buf.len()).sum::<usize>();
        if total_len <= 1 << chunk_shift {
            // Single-chunk fast path: durable on return via RWF_DSYNC.
            let mut total_written = 0usize;
            while !bufs.is_empty() {
                match self
                    .write_vectored_at_dsync(bufs, offset.saturating_add(total_written as u64))
                {
                    Ok(0) => {
                        return Err(io::Error::new(
                            io::ErrorKind::WriteZero,
                            "failed to write all buffers",
                        ));
                    }
                    Ok(written) => {
                        total_written = total_written.saturating_add(written);
                        advance_iovec(&mut bufs, written, io::IoSlice::new);
                    }
                    Err(ref err) if err.kind() == io::ErrorKind::Interrupted => {}
                    Err(err) => return Err(err),
                }
            }
            return self.maybe_free_cached_pages(offset, total_written as u64, chunk_shift);
        }
        // Up to two iovec entries per block, so a chunk holds twice the
        // block count in buffers.
        let bufs_per_chunk = 2 << (chunk_shift - self.block_shift);
        let mut offset = offset;
        let mut chunks = bufs.chunks_mut(bufs_per_chunk).peekable();
        let mut is_first = true;
        while let Some(mut chunk) = chunks.next() {
            let current = offset;
            while !chunk.is_empty() {
                match self.write_vectored_at(chunk, offset) {
                    Ok(0) => {
                        return Err(io::Error::new(
                            io::ErrorKind::WriteZero,
                            "failed to write all buffers",
                        ));
                    }
                    Ok(written) => {
                        advance_iovec(&mut chunk, written, io::IoSlice::new);
                        offset = offset.saturating_add(written as u64);
                    }
                    Err(ref err) if err.kind() == io::ErrorKind::Interrupted => {}
                    Err(err) => return Err(err),
                }
            }
            let written = offset - current;
            if !is_first {
                // Wait for the previous chunk's writeback; on the final
                // iteration the range also covers this (possibly partial)
                // last chunk.
                let chunk_size = 1 << chunk_shift;
                let previous = current - chunk_size;
                let size = chunk_size + chunks.peek().map_or(written, |_| 0);
                if let Err(err) = self.sync_file_range_wait(previous, size) {
                    // Durability unknown from here on: poison the handle.
                    self.inner.take();
                    return Err(err);
                }
                self.maybe_free_cached_pages(previous, size, chunk_shift)?;
            }
            if chunks.peek().is_some() {
                // Kick off writeback of the chunk just written; it is
                // awaited on the next iteration.
                if let Err(err) = self.sync_file_range_start(current, written) {
                    self.inner.take();
                    return Err(err);
                }
            }
            is_first = false;
        }
        Ok(())
    }
}
#[cfg(not(all(feature = "libc", target_os = "linux")))]
impl File {
    /// Portable fallback: seek to `offset`, then loop `read_vectored` until
    /// every buffer is filled, retrying on EINTR.
    ///
    /// # Errors
    /// `UnexpectedEof` when the file ends before all buffers are filled.
    #[inline(always)]
    fn read_exact_vectored_at(
        &mut self,
        mut bufs: &mut [io::IoSliceMut<'_>],
        offset: u64,
    ) -> io::Result<()> {
        let file = self.inner()?;
        file.seek(io::SeekFrom::Start(offset))?;
        while !bufs.is_empty() {
            match file.read_vectored(bufs) {
                Ok(0) => break,
                Ok(read) => {
                    // Rebuild IoSliceMut from raw parts to detach the
                    // closure's borrow lifetime; `advance_iovec` only
                    // shrinks existing buffers, so the pointers stay valid.
                    advance_iovec(&mut bufs, read, |buf| {
                        io::IoSliceMut::new(unsafe {
                            core::slice::from_raw_parts_mut(buf.as_ptr().cast_mut(), buf.len())
                        })
                    });
                }
                Err(ref err) if err.kind() == io::ErrorKind::Interrupted => {}
                Err(err) => return Err(err),
            }
        }
        if !bufs.is_empty() {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "failed to fill all buffers",
            ));
        }
        Ok(())
    }
    /// Portable fallback: seek to `offset`, loop `write_vectored` until all
    /// buffers are drained, then flush. A failed flush drops `inner`,
    /// poisoning this handle (see `FileSyncError`).
    #[inline(always)]
    fn write_all_vectored_at(
        &mut self,
        mut bufs: &mut [io::IoSlice<'_>],
        offset: u64,
    ) -> io::Result<()> {
        let file = self.inner()?;
        file.seek(io::SeekFrom::Start(offset))?;
        while !bufs.is_empty() {
            match file.write_vectored(bufs) {
                Ok(0) => {
                    return Err(io::Error::new(
                        io::ErrorKind::WriteZero,
                        "failed to write all buffers",
                    ));
                }
                Ok(written) => advance_iovec(&mut bufs, written, io::IoSlice::new),
                Err(ref err) if err.kind() == io::ErrorKind::Interrupted => {}
                Err(err) => return Err(err),
            }
        }
        if let Err(err) = file.flush() {
            self.inner.take();
            return Err(err);
        }
        Ok(())
    }
}
/// Error reported once a `File` handle has been poisoned by a failed sync:
/// the inner file is dropped and every later operation refuses to run.
#[derive(Debug)]
pub struct FileSyncError;

impl std::error::Error for FileSyncError {}

impl core::fmt::Display for FileSyncError {
    #[inline(always)]
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("file is unavailable due to prior sync error")
    }
}
/// Errors produced by `FileSequence` operations.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum FileSequenceError {
    ParseOptsError(ParseOptsErrorKind, String),
    Broken,
    Duplicate(u16),
    Locked,
    OutOfOrder(u16),
    OutOfRange(u16),
    TooLong(u16, u16),
    Unrecognized,
}

impl From<FileSequenceError> for io::Error {
    /// Wraps the sequence error as a custom `io::Error` so it can flow
    /// through `io::Result` APIs and be downcast by callers.
    #[inline(always)]
    fn from(value: FileSequenceError) -> io::Error {
        io::Error::new(io::ErrorKind::Other, value)
    }
}

impl std::error::Error for FileSequenceError {}

impl core::fmt::Display for FileSequenceError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::ParseOptsError(kind, opts) => {
                // Map the kind to its label once, then format uniformly.
                let what = match kind {
                    ParseOptsErrorKind::BadIndex => "bad index",
                    ParseOptsErrorKind::BadShift => "bad shift",
                    ParseOptsErrorKind::MissingFields => "fields missing",
                    ParseOptsErrorKind::TooManyFields => "too many fields",
                };
                write!(f, "filesequence: {what}: '{opts}'")
            }
            Self::Broken => write!(f, "filesequence: indexes are not monotonically increasing"),
            Self::Duplicate(index) => write!(f, "filesequence: duplicate index {index:#06x}"),
            Self::Locked => write!(f, "filesequence: lock file is held by another instance"),
            Self::OutOfOrder(index) => write!(f, "filesequence: index {index:#06x} out of order"),
            Self::OutOfRange(index) => write!(f, "filesequence: index {index:#06x} out of range"),
            Self::TooLong(start, end) => {
                write!(f, "filesequence: {start:#06x}..{end:#06x} is too long")
            }
            Self::Unrecognized => write!(f, "filesequence: file does not belong to this sequence"),
        }
    }
}

/// Which field of the `<index>-<shift>` opts string failed to parse.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ParseOptsErrorKind {
    BadIndex,
    BadShift,
    MissingFields,
    TooManyFields,
}
/// Allocator handing out `File`s named `<name>.<index:04x>-<shift>` inside
/// `root`, forming a contiguous wrapping range of indexes `[start, end)`.
#[derive(Debug)]
pub struct FileSequence {
    root: Dir,
    name: String,
    block_count: u64,
    block_shift: u32,
    // Hash of (canonical root path, name); files carry it so that
    // `release`/`retrieve` can reject files from other sequences.
    stamp: NonZeroU64,
    // First live index (inclusive); a wrapping u16 sequence number.
    start: atomic::AtomicU16,
    // One past the last live index.
    end: atomic::AtomicU16,
    // Held for the lifetime of this instance to exclude other processes.
    lock: LockFile,
}
impl FileSequence {
    /// Parses a member file's extension of the form `<index-hex>-<shift>`
    /// into the sequence index and block shift.
    ///
    /// # Errors
    /// `ParseOptsError` carrying the offending kind and the raw string.
    #[inline(always)]
    fn parse_opts(opts: &str) -> Result<(WrappingSeq, u32), FileSequenceError> {
        use ParseOptsErrorKind::{BadIndex, BadShift, MissingFields, TooManyFields};
        let mut parts = opts.split('-');
        let index = WrappingSeq(
            parts
                .next()
                .map(|index| u16::from_str_radix(index, 16))
                .expect("first field is always present")
                .map_err(|_| FileSequenceError::ParseOptsError(BadIndex, opts.to_owned()))?,
        );
        let block_shift = parts
            .next()
            .map(str::parse)
            .ok_or_else(|| FileSequenceError::ParseOptsError(MissingFields, opts.to_string()))?
            .map_err(|_| FileSequenceError::ParseOptsError(BadShift, opts.to_string()))?;
        if parts.next().is_some() {
            return Err(FileSequenceError::ParseOptsError(
                TooManyFields,
                opts.to_string(),
            ));
        }
        Ok((index, block_shift))
    }
    /// Opens (or initializes) a sequence rooted at `root`: acquires the
    /// `<name>.lock` file, then scans the directory to recover the live
    /// `[start, end)` index range from existing member files.
    ///
    /// # Errors
    /// `FileSequenceError::Locked` when another instance holds the lock;
    /// `OutOfRange`/`TooLong` when the files on disk do not form a valid
    /// wrapping range; validation and I/O errors otherwise.
    pub fn open<P: AsRef<Path>>(
        root: P,
        name: &str,
        block_count: u64,
        block_shift: u32,
    ) -> io::Result<FileSequence> {
        File::verify_input(block_count, block_shift)?;
        let root = Dir::open(root.as_ref().canonicalize()?)?;
        let lock = LockFile::acquire(root.path.join(format!("{name}.lock"))).map_err(|err| {
            if err.kind() == io::ErrorKind::AlreadyExists {
                FileSequenceError::Locked.into()
            } else {
                err
            }
        })?;
        // Make the lock's directory entry durable before scanning.
        root.sync()?;
        // Grow `[start, end)` until it covers every member file found.
        let (mut start, mut end): (Option<WrappingSeq>, Option<WrappingSeq>) = (None, None);
        for entry in root.read_dir()? {
            let path = entry?.path();
            if !path.is_file() {
                continue;
            }
            if let Some((prefix, opts)) = path.file_stem().zip(path.extension()) {
                if prefix != name {
                    continue;
                }
                let (index, _) = FileSequence::parse_opts(opts.to_str().unwrap_or_default())?;
                let start_index = *start.get_or_insert(index);
                let end_index = *end.get_or_insert(index.inc());
                if index >= start_index && index < end_index {
                    continue;
                } else if index < start_index {
                    start.replace(index);
                } else if index >= end_index {
                    end.replace(index.inc());
                } else {
                    // Incomparable under the wrapping partial order: the
                    // index is too far from the known range to belong to it.
                    return Err(FileSequenceError::OutOfRange(index.into()).into());
                }
            }
        }
        start
            .zip(end)
            .map(|(start, end)| match start.partial_cmp(&end) {
                Some(cmp::Ordering::Greater | cmp::Ordering::Equal) => {
                    unreachable!("start never reached end")
                }
                Some(_) => Ok(()),
                // Incomparable start/end: the range spans more than half of
                // the u16 sequence space, so its length is ambiguous.
                None => Err(FileSequenceError::TooLong(start.into(), end.into())),
            })
            .transpose()?;
        // Stamp derived from (canonical root path, name) ties files to this
        // particular sequence instance location.
        let mut stamp = std::collections::hash_map::DefaultHasher::new();
        root.path.hash(&mut stamp);
        name.hash(&mut stamp);
        Ok(FileSequence {
            root,
            name: name.into(),
            block_count,
            block_shift,
            stamp: stamp.finish().try_into().expect("stamp should be non-zero"),
            // An empty sequence starts at index 1 by convention.
            start: start.map_or(1, WrappingSeq::into).into(),
            end: end.map_or(1, WrappingSeq::into).into(),
            lock,
        })
    }
    /// Releases the lock file, then fsyncs the root directory so its
    /// removal is durable.
    pub fn close(mut self) -> io::Result<()> {
        self.lock.release().and_then(|()| self.root.sync())
    }
    /// Builds the member path `<root>/<name>.<index:04x>-<shift>`; the
    /// extension format must round-trip through `parse_opts`.
    #[inline(always)]
    #[must_use]
    fn get_filename(&self, index: WrappingSeq, block_shift: u32) -> PathBuf {
        self.root.path.join(format!(
            "{name}.{index:04x}-{shift}",
            name = self.name,
            shift = block_shift
        ))
    }
}
// SAFETY: NOTE(review) — `BlocksAllocator` is an unsafe trait declared in
// this crate; this impl relies on the process-exclusive lock taken in
// `FileSequence::open` plus the compiler fences below to keep `start`/`end`
// consistent with the on-disk state. Confirm against the trait's contract.
unsafe impl BlocksAllocator for FileSequence {
    type Blocks = File;
    /// Creates the next file in the sequence (at index `end`) and then
    /// advances `end`.
    fn alloc(&self) -> io::Result<File> {
        let start = WrappingSeq(self.start.load(atomic::Ordering::Relaxed));
        let end = WrappingSeq(self.end.load(atomic::Ordering::Relaxed));
        // Refuse to grow the range past the point where the wrapping order
        // of start/end would become ambiguous.
        if start.distance(end.inc()).is_none() {
            return Err(FileSequenceError::TooLong(start.into(), end.into()).into());
        }
        let path = self.get_filename(end, self.block_shift);
        let file = File::create(&path, self.block_count, self.block_shift)?
            .with_alloc_info(self.stamp, end);
        // If `start` overtook `end` while the file was being created, the
        // slot is no longer valid: undo the creation.
        if end < WrappingSeq(self.start.load(atomic::Ordering::Relaxed)) {
            fs::remove_file(path)?;
            return Err(FileSequenceError::OutOfRange(end.into()).into());
        }
        // Publish the new index only after the directory entry is durable.
        self.root.sync()?;
        atomic::compiler_fence(atomic::Ordering::SeqCst);
        assert_eq!(self.end.fetch_add(1, atomic::Ordering::Relaxed), end.into());
        Ok(file)
    }
    /// Deletes a file previously handed out by this sequence. Only the
    /// oldest file (index == `start`) of a non-empty sequence may be
    /// released; the file is returned to the caller on failure.
    fn release(&self, blocks: File) -> Result<(), (File, io::Error)> {
        // Both the stamp and the index must identify this sequence.
        if blocks.stamp.filter(|stamp| *stamp == self.stamp).is_none() {
            return Err((blocks, FileSequenceError::Unrecognized.into()));
        }
        let Some(index) = blocks.index else {
            return Err((blocks, FileSequenceError::Unrecognized.into()));
        };
        let start = WrappingSeq(self.start.load(atomic::Ordering::Relaxed));
        let end = WrappingSeq(self.end.load(atomic::Ordering::Relaxed));
        if index > start && index < end {
            // Inside the live range but not the oldest entry.
            return Err((blocks, FileSequenceError::OutOfOrder(index.into()).into()));
        } else if index != start || start == end {
            return Err((blocks, FileSequenceError::OutOfRange(index.into()).into()));
        }
        let path = self.get_filename(index, blocks.block_shift);
        match fs::remove_file(path) {
            Ok(()) => {}
            Err(err) => return Err((blocks, err)),
        }
        // Make the removal durable before advancing `start`.
        match self.root.sync() {
            Ok(()) => {}
            Err(err) => return Err((blocks, err)),
        }
        atomic::compiler_fence(atomic::Ordering::SeqCst);
        assert_eq!(
            self.start.fetch_add(1, atomic::Ordering::Relaxed),
            index.into()
        );
        Ok(())
    }
    /// Re-opens every file in `[start, end)` in index order, passing each
    /// to `f`.
    ///
    /// # Errors
    /// `OutOfRange`/`Duplicate`/`Broken` when the files on disk do not form
    /// exactly the expected contiguous range.
    fn retrieve(&self, mut f: impl FnMut(File)) -> io::Result<()> {
        let start = WrappingSeq(self.start.load(atomic::Ordering::Relaxed));
        let end = WrappingSeq(self.end.load(atomic::Ordering::Relaxed));
        if start == end {
            return Ok(());
        }
        // Candidate members: regular files whose stem equals `name`.
        let paths = self
            .root
            .read_dir()?
            .map(|entry| entry.map(|entry| entry.path()).ok())
            .filter(|path| {
                path.as_ref().is_some_and(|path| {
                    path.is_file()
                        && path
                            .file_stem()
                            .and_then(std::ffi::OsStr::to_str)
                            .is_some_and(|name| name == self.name)
                })
            });
        let size = usize::from(
            start
                .distance(end)
                .expect("distance between start and end is always correct")
                .unsigned_abs(),
        );
        // One slot per expected index; filled while scanning the directory.
        let mut files: Vec<Option<(WrappingSeq, u32, PathBuf)>> = vec![None; size];
        for path in paths {
            let (index, shift) = FileSequence::parse_opts(
                path.as_ref()
                    .and_then(|path| path.extension())
                    .and_then(std::ffi::OsStr::to_str)
                    .unwrap_or_default(),
            )?;
            if index < start || index >= end {
                return Err(FileSequenceError::OutOfRange(index.into()).into());
            }
            let pos = usize::from(
                index
                    .distance(start)
                    .ok_or_else(|| FileSequenceError::OutOfRange(index.into()))?
                    .unsigned_abs(),
            );
            if files[pos].is_some() {
                return Err(FileSequenceError::Duplicate(index.into()).into());
            }
            files[pos] = Some((index, shift, path.unwrap()));
        }
        // A hole means some index in the range has no backing file.
        if files.iter().any(Option::is_none) {
            return Err(FileSequenceError::Broken.into());
        }
        for file in files.into_iter().map(Option::unwrap) {
            f(File::open(file.2, file.1)?.with_alloc_info(self.stamp, file.0));
        }
        Ok(())
    }
}
// SAFETY: NOTE(review) — these assert that sharing `FileSequence` (and the
// raw `libc::DIR` pointer inside `Dir`) across threads is sound. The only
// operations on the DIR stream after `open` are `dirfd`/`fsync` in
// `Dir::sync`; confirm no code path calls `readdir`-family functions on the
// shared stream, which POSIX does not guarantee to be thread-safe.
unsafe impl Send for FileSequence {}
unsafe impl Sync for FileSequence {}
/// Directory handle used to fsync directory metadata after member files
/// are created or removed.
#[derive(Debug)]
struct Dir {
    path: PathBuf,
    // Open `DIR*` stream kept alive so `sync` can fsync via `dirfd`.
    #[cfg(all(feature = "libc", target_os = "linux"))]
    ptr: NonNull<libc::DIR>,
}
#[cfg(all(feature = "libc", target_os = "linux"))]
impl Dir {
    /// Opens a directory stream for `path`, keeping the `DIR*` alive so
    /// `sync` can fsync the directory's file descriptor.
    ///
    /// # Errors
    /// `InvalidInput` if the path contains an interior NUL byte; otherwise
    /// the `opendir(3)` error.
    fn open<P: AsRef<Path>>(path: P) -> io::Result<Dir> {
        use std::os::unix::ffi::OsStrExt;
        let bytes = path.as_ref().as_os_str().as_bytes();
        // `CString::new` validates for interior NULs and appends the
        // terminator. This replaces a hand-rolled 256-byte stack buffer
        // that rejected longer — but perfectly valid — paths (PATH_MAX on
        // Linux is 4096) and needed unsafe pointer arithmetic.
        let cstr = std::ffi::CString::new(bytes).map_err(|_| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "path contains unexpected NUL byte",
            )
        })?;
        // SAFETY: `cstr` is a valid NUL-terminated string for the duration
        // of the call; `opendir` copies what it needs.
        let ptr = NonNull::new(unsafe { libc::opendir(cstr.as_ptr()) })
            .ok_or_else(io::Error::last_os_error)?;
        Ok(Dir {
            path: path.as_ref().to_path_buf(),
            ptr,
        })
    }
    /// Fsyncs the directory so entry creations/removals become durable.
    #[inline(always)]
    fn sync(&self) -> io::Result<()> {
        // SAFETY: `self.ptr` is a live DIR stream owned by this handle.
        let fd = match unsafe { libc::dirfd(self.ptr.as_ptr()) } {
            fd if fd >= 0 => fd,
            -1 => return Err(io::Error::last_os_error()),
            ret => unreachable!("dirfd: unexpected return {ret}"),
        };
        // SAFETY: `fd` is a valid descriptor borrowed from the DIR stream.
        match unsafe { libc::fsync(fd) } {
            0 => Ok(()),
            -1 => Err(io::Error::last_os_error()),
            ret => unreachable!("fsync: unexpected return {ret}"),
        }
    }
}
#[cfg(not(all(feature = "libc", target_os = "linux")))]
impl Dir {
    /// Fallback directory handle: only remembers the path.
    #[allow(clippy::unnecessary_wraps)]
    #[inline(always)]
    fn open<P: AsRef<Path>>(path: P) -> io::Result<Dir> {
        let path = path.as_ref().to_path_buf();
        Ok(Dir { path })
    }
    /// Directory fsync is unavailable without libc; report success so the
    /// callers' durability bookkeeping still proceeds.
    #[allow(clippy::unused_self, clippy::unnecessary_wraps)]
    #[inline(always)]
    fn sync(&self) -> io::Result<()> {
        Ok(())
    }
}
impl Dir {
    /// Iterates over this directory's entries.
    #[inline(always)]
    fn read_dir(&self) -> io::Result<fs::ReadDir> {
        fs::read_dir(&self.path)
    }
}
#[cfg(all(feature = "libc", target_os = "linux"))]
impl Drop for Dir {
    /// Closes the `DIR*` stream; panics on any failure other than EINTR,
    /// since silently leaking the descriptor would hide a bug.
    #[inline(always)]
    fn drop(&mut self) {
        // SAFETY: `self.ptr` came from `opendir` and is closed exactly once.
        let ret = unsafe { libc::closedir(self.ptr.as_ptr()) };
        assert!(
            ret == 0
                || (ret == -1 && io::Error::last_os_error().kind() == io::ErrorKind::Interrupted),
            "closedir: unexpected return {ret}: error: {:?}",
            io::Error::last_os_error()
        );
    }
}
/// On-disk mutual exclusion: a symlink whose target is the owner's PID.
/// Symlink creation is atomic, so at most one process can hold the lock;
/// `None` means the lock has already been released.
#[derive(Debug)]
struct LockFile(Option<PathBuf>);

impl LockFile {
    /// Creates the lock symlink; fails with `AlreadyExists` when another
    /// process already holds it.
    #[inline(always)]
    fn acquire(path: PathBuf) -> io::Result<LockFile> {
        let pid = std::process::id().to_string();
        std::os::unix::fs::symlink(pid, &path)?;
        Ok(LockFile(Some(path)))
    }
    /// Removes the lock symlink; panics on a second release.
    #[inline(always)]
    fn release(&mut self) -> io::Result<()> {
        let path = self.0.take().expect("double release");
        fs::remove_file(path)
    }
}

impl Drop for LockFile {
    /// Best-effort cleanup when the caller never released explicitly.
    #[inline(always)]
    fn drop(&mut self) {
        if self.0.is_some() {
            let _ = self.release();
        }
    }
}
#[cfg(all(feature = "libc", target_os = "linux"))]
#[inline(always)]
/// Advises `POSIX_FADV_SEQUENTIAL` for the whole file, which on Linux
/// doubles the kernel readahead window — hence the function name.
/// Note `posix_fadvise` returns the error code directly instead of
/// setting `errno`.
fn double_readahead_pages<F: AsRawFd>(f: &F, size: u64) -> io::Result<()> {
    let advise = libc::POSIX_FADV_SEQUENTIAL;
    #[allow(clippy::cast_possible_wrap)]
    let ret = unsafe { libc::posix_fadvise(f.as_raw_fd(), 0, size as libc::off64_t, advise) };
    if ret == 0 {
        return Ok(());
    }
    Err(io::Error::from_raw_os_error(ret))
}
#[cfg(all(feature = "libc", target_os = "linux"))]
#[inline(always)]
/// Preallocates `size` bytes of disk space via `fallocate(2)` (mode 0),
/// so later writes within the file cannot fail with ENOSPC.
fn reserve<F: AsRawFd>(f: &F, size: u64) -> io::Result<()> {
    let ret = unsafe {
        #[allow(clippy::cast_possible_wrap)]
        libc::fallocate(
            f.as_raw_fd(),
            0,
            0,
            size as libc::off64_t,
        )
    };
    match ret {
        0 => Ok(()),
        -1 => Err(io::Error::last_os_error()),
        _ => unreachable!("fallocate: unexpected return {ret}"),
    }
}
/// A wrapping 16-bit sequence number with a partial order: two values are
/// comparable only while they are within a quarter of the sequence space
/// of each other; farther apart, their order is ambiguous.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
#[repr(transparent)]
struct WrappingSeq(u16);

impl WrappingSeq {
    /// The next sequence number, wrapping `u16::MAX` back to zero.
    #[inline(always)]
    #[must_use]
    fn inc(self) -> WrappingSeq {
        let WrappingSeq(raw) = self;
        WrappingSeq(raw.wrapping_add(1))
    }
    /// Signed distance `self - other`, or `None` when the two values are
    /// too far apart (outside `-16383..=16383`) for the wrapping order to
    /// be meaningful.
    #[inline(always)]
    #[must_use]
    fn distance(self, other: WrappingSeq) -> Option<i16> {
        #[allow(clippy::cast_possible_wrap)]
        let delta = self.0.wrapping_sub(other.0) as i16;
        // Equivalent (De Morgan) to rejecting delta <= MIN>>1 or > MAX>>1.
        if (i16::MIN >> 1) < delta && delta <= i16::MAX >> 1 {
            Some(delta)
        } else {
            None
        }
    }
}

impl From<WrappingSeq> for u16 {
    #[inline(always)]
    fn from(value: WrappingSeq) -> u16 {
        let WrappingSeq(raw) = value;
        raw
    }
}

impl cmp::PartialOrd for WrappingSeq {
    /// Orders by the sign of the wrapping distance; incomparable values
    /// (distance `None`) yield `None`.
    #[inline(always)]
    fn partial_cmp(&self, other: &WrappingSeq) -> Option<cmp::Ordering> {
        self.distance(*other).map(|delta| delta.cmp(&0))
    }
}

impl core::fmt::LowerHex for WrappingSeq {
    /// Formats like the raw `u16`, so `{:04x}` produces the file-name form.
    #[inline(always)]
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        core::fmt::LowerHex::fmt(&self.0, f)
    }
}
/// Advances `iovec` by `n` consumed bytes: fully-consumed leading buffers
/// are dropped from the slice, and the first remaining buffer is shrunk by
/// the leftover count, rebuilt through `new`. Panics when `n` exceeds the
/// total remaining length.
#[allow(clippy::mut_mut)]
#[inline(always)]
fn advance_iovec<'a, T: core::ops::Deref<Target = [u8]>, F>(iovec: &mut &mut [T], n: usize, new: F)
where
    F: Fn(&'a [u8]) -> T,
{
    let mut remaining = n;
    // Count the leading buffers that `n` covers completely, consuming their
    // lengths from `remaining` as we go.
    let consumed = iovec
        .iter()
        .take_while(|bytes| {
            let whole = bytes.len() <= remaining;
            if whole {
                remaining -= bytes.len();
            }
            whole
        })
        .count();
    *iovec = &mut core::mem::take(iovec)[consumed..];
    if iovec.is_empty() {
        assert_eq!(remaining, 0, "advancing iovec beyond length");
        return;
    }
    // SAFETY: the sub-slice points into memory owned outside `iovec` (the
    // buffer `T` derefs to), which outlives the element we overwrite; the
    // raw-parts round-trip only detaches the borrow's lifetime.
    let head = new(unsafe {
        let tail = &iovec[0][remaining..];
        core::slice::from_raw_parts(tail.as_ptr(), tail.len())
    });
    iovec[0] = head;
}
#[cfg(test)]
mod tests {
    use super::*;
    // `verify_input`: returns the byte size on success; specific
    // InvalidInput messages for bad shift, zero count, oversized total.
    #[test]
    fn file_verify_input() {
        let case = "returns total size";
        assert_eq!(File::verify_input(4, 12).expect(case), 16384, "{case}");
        let case = "block shift outside range";
        let err = File::verify_input(1, 11).expect_err(case);
        let msg = "block shift must be between 12 and 28 inclusive";
        assert_eq!(err.to_string(), msg, "{case}");
        let err = File::verify_input(1, 29).expect_err(case);
        assert_eq!(err.to_string(), msg, "{case}");
        let case = "zero block count";
        let err = File::verify_input(0, 12).expect_err(case);
        let msg = "block count must be non-zero";
        assert_eq!(err.to_string(), msg, "{case}");
        let case = "total size too large";
        let err = File::verify_input(1 << (63 - 28), 28).expect_err(case);
        let msg = "total blocks size too large";
        assert_eq!(err.to_string(), msg, "{case}");
    }
    // `verify_bufs`: buffers come in pairs summing to the block size (6
    // here); at most one smaller pair, and only as the last pair.
    #[test]
    fn file_verify_bufs() {
        let bytes = &[1, 2, 3, 4, 5];
        let case = "odd bufs";
        #[rustfmt::skip]
        let bufs = [
            &bytes[..2], &bytes[..4],
            &bytes[..2],
        ];
        let err = File::verify_bufs(&bufs, 6).expect_err(case);
        let msg = "odd number of bufs";
        assert_eq!(err.to_string(), msg, "{case}");
        let case = "large uneven pair";
        #[rustfmt::skip]
        let bufs = [
            &bytes[..2], &bytes[..4],
            &bytes[..2], &bytes[..5],
            &bytes[..2], &bytes[..4],
        ];
        let err = File::verify_bufs(&bufs, 6).expect_err(case);
        let msg = "uneven pair of bufs is too large";
        assert_eq!(err.to_string(), msg, "{case}");
        let case = "uneven pair before last";
        #[rustfmt::skip]
        let bufs = [
            &bytes[..2], &bytes[..4],
            &bytes[..2], &bytes[..2],
            &bytes[..2], &bytes[..2],
        ];
        let err = File::verify_bufs(&bufs, 6).expect_err(case);
        let msg = "uneven pair of bufs is not the last pair";
        assert_eq!(err.to_string(), msg, "{case}");
        #[rustfmt::skip]
        let bufs = [
            &bytes[..2], &bytes[..4],
            &bytes[..2], &bytes[..2],
            &bytes[..2], &bytes[..4],
        ];
        let err = File::verify_bufs(&bufs, 6).expect_err(case);
        assert_eq!(err.to_string(), msg, "{case}");
        // An empty pair mid-list is also a misplaced uneven pair.
        #[rustfmt::skip]
        let bufs = [
            &bytes[..2], &bytes[..4],
            &[], &[],
            &bytes[..2], &bytes[..4]
        ];
        let err = File::verify_bufs(&bufs, 6).expect_err(case);
        assert_eq!(err.to_string(), msg, "{case}");
    }
    // `chunk_shift`: block shift + iovec headroom, clamped to [shift, 23].
    #[cfg(all(feature = "libc", target_os = "linux"))]
    #[test]
    fn file_chunk_shift() {
        assert_eq!(File::chunk_shift(12), 21);
        assert_eq!(File::chunk_shift(13), 22);
        for shift in 14..=23 {
            assert_eq!(File::chunk_shift(shift), 23);
        }
        for shift in 24..=28 {
            assert_eq!(File::chunk_shift(shift), shift);
        }
    }
    // `parse_opts`: hex index and decimal shift split on '-'; each failure
    // mode reports its kind with the raw opts string.
    #[test]
    fn filesequence_parse_opts() {
        let case = "good opts";
        let parsed = FileSequence::parse_opts("0ccc-7").expect(case);
        assert_eq!(parsed, (WrappingSeq(0x0ccc), 7), "{case}");
        let parsed = FileSequence::parse_opts("cc-09").expect(case);
        assert_eq!(parsed, (WrappingSeq(0x00cc), 9), "{case}");
        let parsed = FileSequence::parse_opts("0000000f-0").expect(case);
        assert_eq!(parsed, (WrappingSeq(0x000f), 0), "{case}");
        let case = "bad opts";
        let table = [
            ("", "filesequence: bad index: ''"),
            ("-9", "filesequence: bad index: '-9'"),
            ("x0-9", "filesequence: bad index: 'x0-9'"),
            ("f0", "filesequence: fields missing: 'f0'"),
            ("f0-a", "filesequence: bad shift: 'f0-a'"),
            ("f0-9-", "filesequence: too many fields: 'f0-9-'"),
        ];
        for row in table {
            let err = FileSequence::parse_opts(row.0).expect_err(case);
            assert_eq!(err.to_string(), row.1, "{case}");
        }
    }
    // Wrapping partial order: nearby values compare across the wrap point;
    // values a quarter-space or more apart are incomparable (None).
    #[test]
    fn wrappingseq_parital_cmp() {
        let zero = WrappingSeq(0);
        let one = WrappingSeq(1);
        let quarter = WrappingSeq(u16::MAX >> 2);
        let half = WrappingSeq(u16::MAX >> 1);
        let max = WrappingSeq(u16::MAX);
        assert!(max < one);
        assert!(!(max > one));
        assert!(max != one);
        assert!(!(one < max));
        assert!(one > max);
        assert!(one != max);
        assert!(zero < quarter);
        assert!(!(zero > quarter));
        assert!(zero != quarter);
        assert!(!(quarter < zero));
        assert!(quarter > zero);
        assert!(quarter != zero);
        assert!(!(max < quarter));
        assert!(!(max > quarter));
        assert!(max != quarter);
        assert_eq!(max.partial_cmp(&quarter), None);
        assert!(!(quarter < max));
        assert!(!(quarter > max));
        assert!(quarter != max);
        assert_eq!(quarter.partial_cmp(&max), None);
        assert!(!(max < half));
        assert!(!(max > half));
        assert!(max != half);
        assert_eq!(max.partial_cmp(&half), None);
        assert!(!(half < max));
        assert!(!(half > max));
        assert!(half != max);
        assert_eq!(half.partial_cmp(&max), None);
    }
    // `advance_iovec`: drops consumed buffers, shrinks the first partial
    // one, and handles exact-boundary and trailing-empty-buffer cases.
    #[test]
    fn advance_iovec_fn() {
        let bytes = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08];
        let mut iovec = [
            io::IoSlice::new(&bytes[0..2]),
            io::IoSlice::new(&bytes[2..4]),
            io::IoSlice::new(&bytes[4..8]),
        ];
        let mut iovec = iovec.as_mut_slice();
        let concat = |iovec: &[io::IoSlice]| {
            iovec
                .iter()
                .flat_map(|bytes| bytes.iter().copied())
                .collect::<Vec<_>>()
        };
        advance_iovec(&mut iovec, 0, io::IoSlice::new);
        assert_eq!(iovec.len(), 3);
        assert_eq!(concat(iovec), &bytes[0..8]);
        advance_iovec(&mut iovec, 1, io::IoSlice::new);
        assert_eq!(iovec.len(), 3);
        assert_eq!(concat(iovec), &bytes[1..8]);
        advance_iovec(&mut iovec, 1, io::IoSlice::new);
        assert_eq!(iovec.len(), 2);
        assert_eq!(concat(iovec), &bytes[2..8]);
        advance_iovec(&mut iovec, 3, io::IoSlice::new);
        assert_eq!(iovec.len(), 1);
        assert_eq!(concat(iovec), &bytes[5..8]);
        advance_iovec(&mut iovec, 3, io::IoSlice::new);
        assert_eq!(iovec.len(), 0);
        assert_eq!(concat(iovec), &[]);
        let mut iovec = [io::IoSlice::new(&[0x01, 0x02]), io::IoSlice::new(&[0x03])];
        let mut iovec = iovec.as_mut_slice();
        advance_iovec(&mut iovec, 3, io::IoSlice::new);
        assert_eq!(iovec.len(), 0);
        assert_eq!(concat(&iovec), &[]);
        let mut iovec = [
            io::IoSlice::new(&[0x01, 0x02]),
            io::IoSlice::new(&[]),
            io::IoSlice::new(&[0x03]),
            io::IoSlice::new(&[]),
        ];
        let mut iovec = iovec.as_mut_slice();
        advance_iovec(&mut iovec, 3, io::IoSlice::new);
        assert_eq!(iovec.len(), 0);
        assert_eq!(concat(&iovec), &[]);
    }
    // Advancing past the total buffer length must panic loudly.
    #[test]
    #[should_panic(expected = "advancing iovec beyond length")]
    fn advance_iovec_fn_beyond_length() {
        let mut iovec = [io::IoSlice::new(&[0x01, 0x02]), io::IoSlice::new(&[0x03])];
        advance_iovec(&mut &mut iovec.as_mut_slice(), 4, io::IoSlice::new);
    }
}