#[cfg(not(target_arch = "wasm32"))]
use crate::platform_specific::FileMapping;
use crate::{from_bytes, from_bytes_mut, NoatunStorable, Target};
use anyhow::{bail, Context, Result};
use std::fmt::{Debug, Formatter};
use std::fs::{create_dir_all, File, OpenOptions};
use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
use std::marker::PhantomData;
use metrics::{describe_gauge, gauge, Gauge, Unit};
use std::slice;
/// Abstraction over the storage that backs a `FileAccessor`.
///
/// The descriptions below are derived from how `FileAccessor` uses each
/// method in this file; the precise contract is defined by the implementors.
pub trait FileBackend {
/// Granularity in bytes to which `FileAccessor` rounds committed sizes
/// (see `FileAccessor::grow` and `FileAccessor::truncate`).
fn page_size(&self) -> usize;
/// Flushes the whole mapping. NOTE(review): presumably to durable storage — confirm per backend.
fn sync_all(&self) -> Result<()>;
/// Flushes `len` bytes starting at byte `start`. `FileAccessor` passes offsets
/// counted from the very start of the mapping, i.e. including its header.
fn sync_range(&self, start: usize, len: usize) -> Result<()>;
/// Base address of the mapping. `FileAccessor` keeps its header at this address
/// and the payload directly after it.
fn ptr(&self) -> *mut u8;
/// Current size of the backend in bytes; `FileAccessor::from_mapping` uses it as
/// the committed size.
fn len(&self) -> usize;
/// Upper bound in bytes that `FileAccessor::grow` never grows the committed size beyond.
fn maximum_size(&self) -> usize;
/// Shrinks the committed region to `new_size` bytes (header included, as passed
/// by `FileAccessor::truncate`).
fn shrink_committed_mapping(&mut self, new_size: usize) -> Result<()>;
/// Grows the committed region to `new_size` bytes (header included, as passed
/// by `FileAccessor::grow`).
fn grow_committed_mapping(&mut self, new_size: usize) -> Result<()>;
/// Attempts to take an exclusive lock on the backend.
/// NOTE(review): lock scope and blocking behaviour are backend-defined — confirm.
fn try_lock_exclusive(&self) -> Result<()>;
}
/// Cursor-based reader/writer over a `FileBackend` mapping.
///
/// The mapping starts with a `HEADER_SIZE`-byte header whose first `usize`
/// stores the number of payload bytes in use; the payload follows the header.
pub(crate) struct FileAccessor {
// Backend that owns the mapping and performs grow/shrink/sync/lock operations.
mapping: Box<dyn FileBackend + Send + Sync>,
// Base address of the mapping (header included), captured from `mapping.ptr()`
// at construction time.
ptr: *mut u8,
// Number of committed bytes, header included; `set_used_space` asserts that
// used space plus header never exceeds it.
committed_size: usize,
// Current cursor used by `Read`/`Write`/`Seek`, relative to the start of the payload.
seek_pos: usize,
// Metrics gauge that is updated whenever `committed_size` changes.
committed_size_gauge: Gauge,
}
// `FileAccessor` stores a raw `*mut u8`, so it is not `Send`/`Sync` automatically;
// these impls assert that moving and sharing it across threads is sound.
// NOTE(review): `set_used_space` and `access_pod_mut` mutate mapped memory through
// `&self` and no synchronization is visible in this file — presumably callers
// serialize access externally; confirm before relying on `Sync`.
unsafe impl Send for FileAccessor {}
unsafe impl Sync for FileAccessor {}
/// Read-only cursor over the payload of a `FileAccessor`, created by
/// `FileAccessor::readonly`.
pub(crate) struct ReadonlyFileAccessor<'a> {
// Base address of the mapping, header included (`map` skips the header).
ptr: *mut u8,
// Number of payload bytes visible through this view; a snapshot of the
// parent's used space taken when the view was created.
size: usize,
// Current cursor, relative to the start of the payload.
seek_pos: usize,
// Ties the view's lifetime to the borrow of the `FileAccessor` it came from.
phantom: PhantomData<&'a ()>,
}
impl Read for ReadonlyFileAccessor<'_> {
    /// Copies up to `buf.len()` payload bytes starting at the cursor into `buf`
    /// and advances the cursor by the number of bytes copied.
    ///
    /// Returns `Ok(0)` when the cursor is at or beyond the end of the view.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // `Seek` may legally place the cursor past the end, so the remaining
        // byte count must not be computed with a plain subtraction.
        let remaining = self.size.saturating_sub(self.seek_pos);
        if remaining == 0 {
            return Ok(0);
        }
        let count = remaining.min(buf.len());
        let start = self.seek_pos;
        buf[..count].copy_from_slice(&self.map()[start..start + count]);
        self.seek_pos += count;
        Ok(count)
    }
}
impl ReadonlyFileAccessor<'_> {
    /// Returns the `size` payload bytes that follow the header.
    pub(crate) fn map(&self) -> &[u8] {
        let payload_start = self.ptr.wrapping_add(FileAccessor::HEADER_SIZE);
        // SAFETY (NOTE(review): invariant is maintained outside this block): `ptr`
        // must point to a mapping that stays alive for `'a` and holds at least
        // `HEADER_SIZE + size` readable bytes.
        unsafe { slice::from_raw_parts(payload_start, self.size) }
    }

    /// Passes the next `bytes` payload bytes at the cursor to `f`, then advances
    /// the cursor past them and returns whatever `f` returned.
    ///
    /// # Errors
    /// Fails, without calling `f` or moving the cursor, when fewer than `bytes`
    /// bytes remain (including when `seek_pos + bytes` would overflow).
    pub fn with_bytes<R>(&mut self, bytes: usize, mut f: impl FnMut(&[u8]) -> R) -> Result<R> {
        let start = self.seek_pos;
        // A checked add keeps a wrapped end offset from slipping past the bounds test.
        let end = match start.checked_add(bytes) {
            Some(end) if end <= self.size => end,
            _ => bail!("requested number of bytes not available in file"),
        };
        let ret = f(&self.map()[start..end]);
        self.seek_pos = end;
        Ok(ret)
    }
}
impl Seek for ReadonlyFileAccessor<'_> {
    /// Moves the cursor and returns the new position, relative to the start of
    /// the payload.
    ///
    /// Follows the `std::io::Seek` contract: `SeekFrom::End(n)` resolves to
    /// `size + n` and `SeekFrom::Current(n)` to `seek_pos + n`, so a negative
    /// `n` moves backwards. Positions past the end are accepted.
    ///
    /// # Errors
    /// Returns `ErrorKind::InvalidInput`, leaving the cursor untouched, when the
    /// resulting position would be negative or does not fit in `usize`.
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        // Applies a signed delta to `base`; `None` on a negative or oversized result.
        fn offset_from(base: usize, delta: i64) -> Option<usize> {
            let base = i64::try_from(base).ok()?;
            usize::try_from(base.checked_add(delta)?).ok()
        }

        let target = match pos {
            SeekFrom::Start(s) => usize::try_from(s).ok(),
            // Seeking exactly to the end needs no arithmetic at all.
            SeekFrom::End(0) => Some(self.size),
            SeekFrom::End(delta) => offset_from(self.size, delta),
            SeekFrom::Current(delta) => offset_from(self.seek_pos, delta),
        };
        self.seek_pos = target.ok_or_else(|| {
            std::io::Error::new(ErrorKind::InvalidInput, "invalid seek position")
        })?;
        Ok(self.seek_pos as u64)
    }
}
impl Debug for FileAccessor {
    /// Renders the accessor as `FileAccessor(<committed size in bytes>)`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let committed = self.committed_size;
        f.write_fmt(format_args!("FileAccessor({})", committed))
    }
}
impl Debug for ReadonlyFileAccessor<'_> {
    /// Renders the view as `ReadonlyFileAccessor(<visible payload size in bytes>)`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let visible = self.size;
        f.write_fmt(format_args!("ReadonlyFileAccessor({})", visible))
    }
}
impl Seek for FileAccessor {
    /// Moves the cursor and returns the new position, relative to the start of
    /// the payload.
    ///
    /// Follows the `std::io::Seek` contract: `SeekFrom::End(n)` resolves to
    /// `used_space() + n` and `SeekFrom::Current(n)` to `seek_pos + n`, so a
    /// negative `n` moves backwards. Positions past the end are accepted.
    ///
    /// # Errors
    /// Returns `ErrorKind::InvalidInput`, leaving the cursor untouched, when the
    /// resulting position would be negative or does not fit in `usize`.
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        // Applies a signed delta to `base`; `None` on a negative or oversized result.
        fn offset_from(base: usize, delta: i64) -> Option<usize> {
            let base = i64::try_from(base).ok()?;
            usize::try_from(base.checked_add(delta)?).ok()
        }

        let target = match pos {
            SeekFrom::Start(s) => usize::try_from(s).ok(),
            // Seeking exactly to the end needs no arithmetic at all.
            SeekFrom::End(0) => Some(self.used_space()),
            SeekFrom::End(delta) => offset_from(self.used_space(), delta),
            SeekFrom::Current(delta) => offset_from(self.seek_pos, delta),
        };
        self.seek_pos = target.ok_or_else(|| {
            std::io::Error::new(ErrorKind::InvalidInput, "invalid seek position")
        })?;
        Ok(self.seek_pos as u64)
    }
}
impl Read for FileAccessor {
    /// Copies up to `buf.len()` used payload bytes starting at the cursor into
    /// `buf` and advances the cursor by the number of bytes copied.
    ///
    /// Returns `Ok(0)` when the cursor is at or beyond the end of the used space.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // The cursor can sit past the end (after a seek, or after the used space
        // was lowered), so avoid an underflowing subtraction.
        let remaining = self.used_space().saturating_sub(self.seek_pos);
        if remaining == 0 {
            return Ok(0);
        }
        let count = remaining.min(buf.len());
        let start = self.seek_pos;
        buf[..count].copy_from_slice(&self.map()[start..start + count]);
        self.seek_pos += count;
        Ok(count)
    }
}
impl FileAccessor {
    /// Returns the committed size of the mapping in bytes, header included.
    pub fn disk_space_used_bytes(&self) -> u64 {
        let committed = self.committed_size;
        committed as u64
    }

    /// Registers a gauge called `name` and attaches `description` to it, with
    /// bytes as its unit. Returns the gauge handle.
    fn make_gauge(name: &str, description: &str) -> Gauge {
        let handle = gauge!(String::from(name));
        describe_gauge!(String::from(name), Unit::Bytes, String::from(description),);
        handle
    }
}
impl Write for FileAccessor {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
if self.seek_pos + buf.len() > self.used_space() {
self.grow(self.seek_pos + buf.len())
.map_err(std::io::Error::other)?;
}
let dest = unsafe {
slice::from_raw_parts_mut(
self.ptr
.wrapping_add(Self::HEADER_SIZE)
.wrapping_add(self.seek_pos),
buf.len(),
)
};
dest.copy_from_slice(buf);
self.seek_pos += buf.len();
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
impl FileAccessor {
/// Size in bytes of the header at the start of the mapping; the payload begins
/// directly after it. The first `usize` of the header holds the used-space
/// counter read by `used_space` and written by `set_used_space`.
const HEADER_SIZE: usize = 16;
/// Moves the cursor to the absolute payload position `offset`.
pub fn seek_to(&mut self, offset: usize) -> std::io::Result<()> {
    let target = SeekFrom::Start(offset as u64);
    self.seek(target).map(|_| ())
}
pub(crate) fn readonly(&self) -> ReadonlyFileAccessor<'_> {
ReadonlyFileAccessor {
ptr: self.ptr,
size: self.used_space(),
seek_pos: self.seek_pos,
phantom: Default::default(),
}
}
/// Reinterprets the `size_of::<R>()` payload bytes at `offset` as an `R` by
/// handing them to `from_bytes`.
///
/// # Errors
/// Fails when `offset + size_of::<R>()` overflows or exceeds the used space.
///
/// # Safety
/// NOTE(review): the caller contract is not spelled out in this file.
/// Presumably the bytes at `offset` must form a valid, suitably aligned `R`,
/// and no `&mut` from `access_pod_mut` may alias the returned reference —
/// confirm against `from_bytes`.
pub unsafe fn access_pod<R: NoatunStorable>(&self, offset: usize) -> Result<&R> {
    // A checked add keeps a wrapped end offset from defeating the bounds test
    // that guards the raw slice below.
    let in_bounds = offset
        .checked_add(size_of::<R>())
        .is_some_and(|end| end <= self.used_space());
    if !in_bounds {
        bail!("requested number of bytes not available in file");
    }
    // SAFETY: the range was just checked to lie within the used space, which
    // `set_used_space` keeps inside the committed region.
    let raw = unsafe {
        slice::from_raw_parts(
            self.ptr
                .wrapping_add(FileAccessor::HEADER_SIZE)
                .wrapping_add(offset),
            size_of::<R>(),
        )
    };
    Ok(from_bytes(raw))
}
/// Reinterprets the `size_of::<R>()` payload bytes at `offset` as a mutable `R`
/// by handing them to `from_bytes_mut`.
///
/// # Errors
/// Fails when `offset + size_of::<R>()` overflows or exceeds the used space.
///
/// # Safety
/// This hands out `&mut R` from `&self`, so the borrow checker cannot prevent
/// aliasing. NOTE(review): presumably the caller must guarantee that no other
/// reference to these bytes is live and that the bytes form a valid, suitably
/// aligned `R` — confirm against `from_bytes_mut`.
// The lint is silenced on purpose: the mutable reference points into the
// mapping, not into `self`.
#[allow(clippy::mut_from_ref)]
pub unsafe fn access_pod_mut<R: NoatunStorable>(&self, offset: usize) -> Result<&mut R> {
    // A checked add keeps a wrapped end offset from defeating the bounds test
    // that guards the raw slice below.
    let in_bounds = offset
        .checked_add(size_of::<R>())
        .is_some_and(|end| end <= self.used_space());
    if !in_bounds {
        bail!("requested number of bytes not available in file");
    }
    // SAFETY: the range was just checked to lie within the used space, which
    // `set_used_space` keeps inside the committed region; exclusivity is the
    // caller's obligation.
    let raw = unsafe {
        slice::from_raw_parts_mut(
            self.ptr
                .wrapping_add(FileAccessor::HEADER_SIZE)
                .wrapping_add(offset),
            size_of::<R>(),
        )
    };
    Ok(from_bytes_mut(raw))
}
/// Returns the number of payload bytes in use, as stored in the first `usize`
/// of the header.
#[inline(always)]
pub(crate) fn used_space(&self) -> usize {
    let counter = self.ptr as *const usize;
    // SAFETY (NOTE(review): relies on construction invariants): `ptr` must point
    // to a live mapping whose first `usize` is readable and suitably aligned.
    unsafe { counter.read() }
}
/// Stores `new_value` as the used-space counter in the header.
///
/// # Panics
/// Panics if `new_value + HEADER_SIZE` overflows or exceeds the committed size.
pub(crate) fn set_used_space(&self, new_value: usize) {
    let required = new_value
        .checked_add(Self::HEADER_SIZE)
        .expect("arithmetic overflow");
    // The offending values are part of the panic message instead of being
    // printed through a stray `dbg!`.
    assert!(
        required <= self.committed_size,
        "used space {} plus {}-byte header exceeds committed size {}",
        new_value,
        Self::HEADER_SIZE,
        self.committed_size
    );
    // SAFETY (NOTE(review): relies on the backend): the assertion above implies
    // at least `HEADER_SIZE` bytes are committed, so the first `usize` of the
    // mapping is writable provided `mapping.ptr()` is valid and aligned.
    unsafe {
        *(self.mapping.ptr() as *mut usize) = new_value;
    }
}
/// Marks every committed payload byte (committed size minus the header) as used.
pub(crate) fn set_used_space_to_full_file(&mut self) {
    let payload_capacity = self.committed_size.saturating_sub(Self::HEADER_SIZE);
    self.set_used_space(payload_capacity);
}
/// Returns a const pointer to the first payload byte (just past the header).
pub(crate) fn map_const_ptr(&self) -> *const u8 {
    let payload_start: *mut u8 = self.ptr.wrapping_add(Self::HEADER_SIZE);
    payload_start as *const u8
}
/// Returns a mutable pointer to the first payload byte (just past the header).
#[inline]
pub(crate) fn map_mut_ptr(&self) -> *mut u8 {
    let base = self.ptr;
    base.wrapping_add(Self::HEADER_SIZE)
}
/// Returns the whole committed payload (committed size minus the header),
/// regardless of how much of it is marked as used.
pub(crate) fn map_all_raw(&self) -> &[u8] {
    let payload_len = self.committed_size.saturating_sub(Self::HEADER_SIZE);
    let payload_start = self.ptr.wrapping_add(Self::HEADER_SIZE);
    // SAFETY (NOTE(review): relies on construction invariants): `committed_size`
    // bytes starting at `ptr` are expected to be mapped and readable.
    unsafe { slice::from_raw_parts(payload_start, payload_len) }
}
/// Returns the whole committed payload (committed size minus the header) as a
/// mutable slice, regardless of how much of it is marked as used.
pub(crate) fn map_all_raw_mut(&mut self) -> &mut [u8] {
    let payload_len = self.committed_size.saturating_sub(Self::HEADER_SIZE);
    let payload_start = self.ptr.wrapping_add(Self::HEADER_SIZE);
    // SAFETY (NOTE(review): relies on construction invariants): `committed_size`
    // bytes starting at `ptr` are expected to be mapped and writable.
    unsafe { slice::from_raw_parts_mut(payload_start, payload_len) }
}
/// Returns the used part of the payload: `used_space()` bytes starting right
/// after the header.
pub(crate) fn map(&self) -> &[u8] {
    let payload_start = self.ptr.wrapping_add(Self::HEADER_SIZE);
    let payload_len = self.used_space();
    // SAFETY (NOTE(review): relies on the header being trustworthy): the used
    // space is expected to lie within the committed region.
    unsafe { slice::from_raw_parts(payload_start, payload_len) }
}
/// Returns the used part of the payload as a mutable slice: `used_space()`
/// bytes starting right after the header.
pub(crate) fn map_mut(&mut self) -> &mut [u8] {
    let payload_start = self.ptr.wrapping_add(Self::HEADER_SIZE);
    let payload_len = self.used_space();
    // SAFETY (NOTE(review): relies on the header being trustworthy): the used
    // space is expected to lie within the committed region.
    unsafe { slice::from_raw_parts_mut(payload_start, payload_len) }
}
/// Wraps an arbitrary `FileBackend` in a `FileAccessor`.
///
/// The backend is first asked to commit `HEADER_SIZE` bytes so the header is
/// addressable. `name` and `descr` are used to register the committed-size
/// gauge, which is initialised with the backend's current length.
///
/// # Panics
/// Panics if the backend cannot commit the header region.
pub(crate) fn from_mapping(
    mut mapping: impl FileBackend + Send + Sync + 'static,
    name: &str,
    descr: &str,
) -> Self {
    mapping
        .grow_committed_mapping(Self::HEADER_SIZE)
        .expect("failed to commit the header region of the mapping");
    let committed_size_gauge = FileAccessor::make_gauge(name, descr);
    let committed_size = mapping.len();
    committed_size_gauge.set(committed_size as f64);
    // Capture the base pointer before the backend is moved into its box.
    let ptr = mapping.ptr();
    Self {
        ptr,
        committed_size,
        mapping: Box::new(mapping),
        seek_pos: 0,
        committed_size_gauge,
    }
}
/// Opens `<target.path()>/<file>.bin`, sizes it, memory-maps it through
/// `FileMapping` and returns the accessor together with a flag telling whether
/// a file was already present at that path before the call (the flag stays
/// `true` even if the file is then truncated because `target.overwrite()` is set).
///
/// * `initial_size` - minimum payload size in bytes the file is grown to.
/// * `max_size` - maximum payload size in bytes; must be non-zero and at least `initial_size`.
/// * `name`, `description` - used to register the committed-size gauge.
///
/// # Errors
/// Fails on invalid size arguments, or when creating the directory, opening,
/// inspecting, resizing, syncing or mapping the file fails.
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn new(
target: &Target,
file: &str,
initial_size: usize,
max_size: usize,
name: &str,
description: &str,
) -> Result<(Self, bool)> {
if max_size == 0 {
bail!("max_size must not be 0");
}
if initial_size > max_size {
bail!(
"initial_size ({}) must not be greater than max_size ({})",
initial_size,
max_size
);
}
create_dir_all(target.path()).context("creating directory for data file")?;
let path = target.path().join(format!("{file}.bin"));
let mut overwrite = target.overwrite();
let create = target.create();
// Any metadata failure is treated as "the file does not exist"; truncation is
// then forced on.
let existed = if std::fs::metadata(&path).is_err() {
overwrite = true;
false
} else {
true
};
let file = OpenOptions::new()
.read(true)
.write(true)
.create(create)
.truncate(overwrite)
.open(&path)
.with_context(|| format!("opening file {path:?}"))?;
let page_size = FileMapping::page_size();
// NOTE(review): `as usize` silently truncates on targets where `usize` is
// narrower than 64 bits.
let mut len = File::metadata(&file)?.len() as usize;
// Make sure the file holds at least header + `initial_size` bytes and is a
// whole number of pages long.
// NOTE(review): `initial_size + Self::HEADER_SIZE` is unchecked arithmetic.
if len < initial_size + Self::HEADER_SIZE || !len.is_multiple_of(page_size) {
len = len
.max(initial_size + Self::HEADER_SIZE)
.next_multiple_of(page_size);
file.set_len((len) as u64)
.with_context(|| format!("Resizing file {:?} to {} bytes", &path, len))?;
file.sync_all().context("fsync")?;
}
let filename = path.to_string_lossy();
// The mapping may grow up to header + `max_size`, rounded up to a whole page.
let mapping = FileMapping::new(
file,
len,
(max_size + Self::HEADER_SIZE).next_multiple_of(page_size),
&filename,
)
.with_context(|| format!("failed to memory map file {filename}"))?;
let committed_size_gauge = FileAccessor::make_gauge(name, description);
committed_size_gauge.set(mapping.committed_size() as f64);
let temp = FileAccessor {
committed_size: mapping.committed_size(),
ptr: mapping.ptr(),
mapping: Box::new(mapping),
seek_pos: 0,
committed_size_gauge,
};
// The used-space counter comes from the file's own header; clamp it to what
// the file can actually hold before trusting it.
let claimed_used_size = temp.used_space();
let new_used_size = claimed_used_size.min(len.saturating_sub(Self::HEADER_SIZE));
temp.set_used_space(new_used_size);
Ok((temp, existed))
}
}
impl FileAccessor {
/// Forwards to the backend's `try_lock_exclusive`.
pub fn try_lock_exclusive(&self) -> Result<()> {
    let backend = &self.mapping;
    backend.try_lock_exclusive()
}
pub fn write_zeroes(&mut self, bytes: usize) -> Result<()> {
if self.seek_pos + bytes > self.used_space() {
self.grow(self.seek_pos + bytes)?;
}
unsafe {
slice::from_raw_parts_mut(
self.ptr
.wrapping_add(self.seek_pos)
.wrapping_add(Self::HEADER_SIZE),
bytes,
)
.fill(0)
}
self.seek_pos += bytes;
Ok(())
}
pub fn copy_to(&mut self, bytes: usize, target: &mut FileAccessor) -> Result<()> {
if self.seek_pos + bytes > self.used_space() {
bail!("requested number of bytes not available in file");
}
let src_buf = &self.map()[self.seek_pos..self.seek_pos + bytes];
if target.seek_pos + bytes > target.used_space() {
target.grow(target.seek_pos.checked_add(bytes).unwrap())?;
}
let target_seek_pos = target.seek_pos;
let dst_buf = &mut target.map_mut()[target_seek_pos..target_seek_pos + bytes];
dst_buf.copy_from_slice(src_buf);
self.seek_pos += bytes;
target.seek_pos += bytes;
Ok(())
}
/// Calls `f` with the whole committed payload (not just the used part) and
/// returns its result. The cursor is not touched.
pub fn with_all_bytes<R>(&mut self, mut f: impl FnMut(&[u8]) -> R) -> R {
    f(self.map_all_raw())
}
/// Passes the next `bytes` used payload bytes at the cursor to `f`, then
/// advances the cursor past them and returns whatever `f` returned.
///
/// # Errors
/// Fails, without calling `f` or moving the cursor, when fewer than `bytes`
/// bytes remain (including when `seek_pos + bytes` would overflow).
pub fn with_bytes<R>(&mut self, bytes: usize, mut f: impl FnMut(&[u8]) -> R) -> Result<R> {
    let start = self.seek_pos;
    let used = self.used_space();
    // A checked add keeps a wrapped end offset from slipping past the bounds test.
    let end = match start.checked_add(bytes) {
        Some(end) if end <= used => end,
        _ => bail!(
            "requested number of bytes not available in file. Requested: {}, had: {} (seek pos: {})",
            bytes,
            used.saturating_sub(start),
            start
        ),
    };
    let ret = f(&self.map()[start..end]);
    self.seek_pos = end;
    Ok(ret)
}
/// Passes the `bytes` used payload bytes starting at absolute payload `offset`
/// to `f` and returns its result. The cursor is neither read nor moved.
///
/// # Errors
/// Fails, without calling `f`, when `offset + bytes` overflows or exceeds the
/// used space.
pub fn with_bytes_at<R>(
    &mut self,
    offset: usize,
    bytes: usize,
    mut f: impl FnMut(&[u8]) -> R,
) -> Result<R> {
    let used = self.used_space();
    let end = match offset.checked_add(bytes) {
        Some(end) if end <= used => end,
        // Report the numbers relative to `offset`: the cursor plays no part in
        // this lookup.
        _ => bail!(
            "requested number of bytes not available in file. Requested: {}, had: {} (offset: {})",
            bytes,
            used.saturating_sub(offset),
            offset
        ),
    };
    Ok(f(&self.map()[offset..end]))
}
pub(crate) fn grow(&mut self, new_size: usize) -> Result<()> {
if new_size + Self::HEADER_SIZE > self.committed_size {
let max_size = self.mapping.maximum_size();
if new_size + Self::HEADER_SIZE >= max_size {
bail!(
"maximum file size exceeded. Requested new size: {}. Max size: {}",
new_size + Self::HEADER_SIZE,
max_size
);
}
let new_file_size = ((self.committed_size + new_size + Self::HEADER_SIZE) * 2)
.next_multiple_of(self.mapping.page_size())
.min(max_size);
self.mapping.grow_committed_mapping(new_file_size)?;
self.committed_size = new_file_size;
self.committed_size_gauge.set(new_file_size as f64);
}
self.set_used_space(new_size);
Ok(())
}
/// Asks the backend to flush `len` payload bytes starting at payload `offset`,
/// together with the header.
///
/// A range that starts within the first page is flushed with one call that
/// starts at the beginning of the mapping; otherwise the payload range is
/// flushed first and the header afterwards.
pub(crate) fn sync_range(&self, offset: usize, len: usize) -> Result<()> {
    let backend = &self.mapping;
    if offset >= backend.page_size() {
        backend.sync_range(offset + Self::HEADER_SIZE, len)?;
        backend.sync_range(0, Self::HEADER_SIZE)
    } else {
        backend.sync_range(0, offset + Self::HEADER_SIZE + len)
    }
}
/// Forwards to the backend's `sync_all`.
pub(crate) fn sync_all(&self) -> Result<()> {
    let backend = &self.mapping;
    backend.sync_all()
}
/// Lowers the used-space counter to `new_size` if it is currently larger.
/// Only the counter changes: the committed size and the bytes are left as is.
pub(crate) fn fast_truncate(&self, new_size: usize) {
    let currently_used = self.used_space();
    if new_size < currently_used {
        self.set_used_space(new_size);
    }
}
pub(crate) fn truncate(&mut self, new_size: usize) -> Result<()> {
let new_alloc_size =
(new_size + Self::HEADER_SIZE).next_multiple_of(self.mapping.page_size());
if new_alloc_size < self.committed_size {
self.mapping.shrink_committed_mapping(new_alloc_size)?;
self.committed_size_gauge.set(new_alloc_size as f64);
self.committed_size = new_alloc_size;
}
self.set_used_space(new_size);
self.map_all_raw_mut().fill(0);
Ok(())
}
}