use std::{
fs::{create_dir_all, File, OpenOptions},
path::{Path, PathBuf},
sync::Arc,
};
use foyer_common::{asyncify::asyncify_with_runtime, bits};
use fs4::free_space;
use serde::{Deserialize, Serialize};
use super::{Dev, DevExt, DevOptions, RegionId};
use crate::{
device::ALIGN,
error::{Error, Result},
IoBytes, IoBytesMut, Runtime,
};
/// Options used to open a [`DirectFileDevice`].
///
/// `verify` accepts the options only when `region_size` is a non-zero multiple of `ALIGN` and
/// `capacity` is a non-zero multiple of `region_size`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DirectFileDeviceOptions {
/// Path of the file backing the device; `open` opens it and creates it if it is missing.
pub path: PathBuf,
/// Total size of the device in bytes.
pub capacity: usize,
/// Size of a single region in bytes.
pub region_size: usize,
}
/// A device that performs positioned reads and writes against a single file.
///
/// Cloning the device clones the `Arc`, so clones share the same underlying file handle.
#[derive(Debug, Clone)]
pub struct DirectFileDevice {
// Shared handle to the backing file; cloned into the blocking I/O closures.
file: Arc<File>,
// Total device size in bytes; upper bound for `pread` / `pwrite` ranges.
capacity: usize,
// Size of one region in bytes; used to translate (region, offset) into a file offset.
region_size: usize,
// Provides the read and write runtimes handed to `asyncify_with_runtime`.
runtime: Runtime,
}
impl DevOptions for DirectFileDeviceOptions {
    /// Checks the size constraints of the options.
    ///
    /// # Errors
    ///
    /// Returns an error when `region_size` is zero or not a multiple of `ALIGN`, or when
    /// `capacity` is zero or not a multiple of `region_size`.
    fn verify(&self) -> Result<()> {
        let (capacity, region_size) = (self.capacity, self.region_size);

        // The region size is checked first, so it is known to be non-zero by the time it is
        // used as a divisor for the capacity check below.
        let region_size_ok = region_size != 0 && region_size % ALIGN == 0;
        if !region_size_ok {
            let err = anyhow::anyhow!("region size ({region_size}) must be a multiplier of ALIGN ({ALIGN})");
            return Err(err.into());
        }

        let capacity_ok = capacity != 0 && capacity % region_size == 0;
        if !capacity_ok {
            let err = anyhow::anyhow!("capacity ({capacity}) must be a multiplier of region size ({region_size})");
            return Err(err.into());
        }

        Ok(())
    }
}
impl DirectFileDevice {
    /// Writes `buf` to the device at the absolute byte `offset`.
    ///
    /// The bytes written are those returned by `buf.as_aligned()`. The blocking positioned write
    /// is executed through `asyncify_with_runtime` on the write runtime.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying write fails, or if the number of bytes written differs
    /// from the aligned buffer length.
    ///
    /// # Panics
    ///
    /// Panics if `offset` is not aligned to `self.align()`, or if `offset` plus the aligned buffer
    /// length exceeds the device capacity.
    #[fastrace::trace(name = "foyer::storage::device::direct_file::pwrite")]
    pub async fn pwrite(&self, buf: IoBytes, offset: u64) -> Result<()> {
        // Same precondition as `pread`: positioned I/O on this device must start at an aligned
        // offset (the file is opened with `O_DIRECT` on Linux).
        bits::assert_aligned(self.align() as u64, offset);

        let aligned = buf.as_aligned().len();
        assert!(
            offset as usize + aligned <= self.capacity(),
            "offset ({offset}) + aligned ({aligned}) = total ({total}) <= capacity ({capacity})",
            total = offset as usize + aligned,
            capacity = self.capacity,
        );

        let file = self.file.clone();
        asyncify_with_runtime(self.runtime.write(), move || {
            #[cfg(target_family = "windows")]
            let written = {
                use std::os::windows::fs::FileExt;
                file.seek_write(buf.as_aligned(), offset)?
            };

            #[cfg(target_family = "unix")]
            let written = {
                use std::os::unix::fs::FileExt;
                file.write_at(buf.as_aligned(), offset)?
            };

            // A short write is reported as an error rather than retried.
            if written != aligned {
                return Err(anyhow::anyhow!("written {written}, expected: {aligned}").into());
            }

            Ok(())
        })
        .await
    }

    /// Reads `len` bytes from the device starting at the absolute byte `offset`.
    ///
    /// The read length is rounded up to `self.align()` for the actual I/O and the returned buffer
    /// is truncated back to `len`. The blocking positioned read is executed through
    /// `asyncify_with_runtime` on the read runtime.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying read fails, or if the number of bytes read differs from
    /// the aligned length.
    ///
    /// # Panics
    ///
    /// Panics if `offset` is not aligned to `self.align()`, or if `offset` plus the aligned length
    /// exceeds the device capacity.
    #[fastrace::trace(name = "foyer::storage::device::direct_file::pread")]
    pub async fn pread(&self, offset: u64, len: usize) -> Result<IoBytesMut> {
        bits::assert_aligned(self.align() as u64, offset);

        let aligned = bits::align_up(self.align(), len);
        assert!(
            offset as usize + aligned <= self.capacity(),
            "offset ({offset}) + aligned ({aligned}) = total ({total}) <= capacity ({capacity})",
            total = offset as usize + aligned,
            capacity = self.capacity,
        );

        let mut buf = IoBytesMut::with_capacity(aligned);
        // SAFETY: the buffer was created just above with a capacity of `aligned`, so the new
        // length does not exceed it. The bytes are not initialized here; they are only handed
        // back to the caller after the read below reports exactly `aligned` bytes.
        // NOTE(review): relies on `IoBytesMut::with_capacity` reserving at least `aligned`
        // bytes -- confirm against its implementation.
        unsafe {
            buf.set_len(aligned);
        }

        let file = self.file.clone();
        let mut buffer = asyncify_with_runtime(self.runtime.read(), move || {
            #[cfg(target_family = "windows")]
            let read = {
                use std::os::windows::fs::FileExt;
                file.seek_read(buf.as_mut(), offset)?
            };

            #[cfg(target_family = "unix")]
            let read = {
                use std::os::unix::fs::FileExt;
                file.read_at(buf.as_mut(), offset)?
            };

            // A short read is reported as an error rather than retried.
            if read != aligned {
                return Err(anyhow::anyhow!("read {read}, expected: {aligned}").into());
            }

            Ok::<_, Error>(buf)
        })
        .await?;

        // Drop the alignment padding so the caller sees exactly `len` bytes.
        buffer.truncate(len);

        Ok(buffer)
    }
}
impl Dev for DirectFileDevice {
    type Options = DirectFileDeviceOptions;

    /// Total size of the device in bytes.
    fn capacity(&self) -> usize {
        self.capacity
    }

    /// Size of a single region in bytes.
    fn region_size(&self) -> usize {
        self.region_size
    }

    /// Opens (creating if necessary) the file at `options.path` and wraps it as a device.
    ///
    /// The parent directory is created when it does not exist. On Linux the file is opened with
    /// `O_DIRECT | O_NOATIME`. When the path refers to a regular file, a warning is logged and the
    /// file length is set to `options.capacity`.
    ///
    /// # Errors
    ///
    /// Returns an error if the options fail `verify`, or if creating the directory, opening the
    /// file, reading its metadata, or resizing it fails.
    ///
    /// # Panics
    ///
    /// Panics if `options.path` has no parent component.
    #[fastrace::trace(name = "foyer::storage::device::direct_file::open")]
    async fn open(options: Self::Options, runtime: Runtime) -> Result<Self> {
        options.verify()?;

        let dir = options
            .path
            .parent()
            .expect("path must not be the root directory")
            .to_path_buf();
        if !dir.exists() {
            create_dir_all(dir)?;
        }

        let mut opts = OpenOptions::new();
        opts.create(true).write(true).read(true);
        #[cfg(target_os = "linux")]
        {
            use std::os::unix::fs::OpenOptionsExt;
            opts.custom_flags(libc::O_DIRECT | libc::O_NOATIME);
        }

        let file = opts.open(&options.path)?;

        // A metadata failure is an I/O error like any other in this function: propagate it
        // instead of panicking.
        if file.metadata()?.is_file() {
            tracing::warn!(
                "{}\n{}\n{}",
                "It seems a `DirectFileDevice` is used within a normal file system, which is inefficient.",
                "Please use `DirectFileDevice` directly on a raw block device.",
                "Or use `DirectFsDevice` within a normal file system.",
            );
            file.set_len(options.capacity as _)?;
        }

        let file = Arc::new(file);

        Ok(Self {
            file,
            capacity: options.capacity,
            region_size: options.region_size,
            runtime,
        })
    }

    /// Writes `buf` into `region` at the region-relative byte `offset`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `pwrite` reports.
    ///
    /// # Panics
    ///
    /// Panics if `offset` is not aligned to `self.align()`, or if `offset` plus the aligned buffer
    /// length exceeds the region size.
    #[fastrace::trace(name = "foyer::storage::device::direct_file::write")]
    async fn write(&self, buf: IoBytes, region: RegionId, offset: u64) -> Result<()> {
        // Same precondition as `read`: region-relative offsets must be aligned.
        bits::assert_aligned(self.align() as u64, offset);

        let aligned = buf.as_aligned().len();
        assert!(
            offset as usize + aligned <= self.region_size(),
            "offset ({offset}) + aligned ({aligned}) = total ({total}) <= region size ({region_size})",
            total = offset as usize + aligned,
            region_size = self.region_size(),
        );

        // Translate the region-relative offset into an absolute file offset.
        let poffset = offset + region as u64 * self.region_size as u64;
        self.pwrite(buf, poffset).await
    }

    /// Reads `len` bytes from `region` starting at the region-relative byte `offset`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `pread` reports.
    ///
    /// # Panics
    ///
    /// Panics if `offset` is not aligned to `self.align()`, or if `offset` plus the aligned length
    /// exceeds the region size.
    #[fastrace::trace(name = "foyer::storage::device::direct_file::read")]
    async fn read(&self, region: RegionId, offset: u64, len: usize) -> Result<IoBytesMut> {
        bits::assert_aligned(self.align() as u64, offset);

        let aligned = bits::align_up(self.align(), len);
        assert!(
            offset as usize + aligned <= self.region_size(),
            "offset ({offset}) + aligned ({aligned}) = total ({total}) <= region size ({region_size})",
            total = offset as usize + aligned,
            region_size = self.region_size(),
        );

        // Translate the region-relative offset into an absolute file offset.
        let poffset = offset + region as u64 * self.region_size as u64;
        self.pread(poffset, len).await
    }

    /// Syncs the whole backing file with `File::sync_all`; the region argument is ignored.
    #[fastrace::trace(name = "foyer::storage::device::direct_file::flush")]
    async fn flush(&self, _: Option<RegionId>) -> Result<()> {
        let file = self.file.clone();
        asyncify_with_runtime(self.runtime.write(), move || file.sync_all().map_err(Error::from)).await
    }
}
/// Builder for [`DirectFileDeviceOptions`].
///
/// `capacity` and `region_size` are optional; `build` fills in defaults for whichever is unset.
#[derive(Debug)]
pub struct DirectFileDeviceOptionsBuilder {
// Path of the file that will back the device.
path: PathBuf,
// Requested capacity in bytes, if the caller set one.
capacity: Option<usize>,
// Requested region size in bytes, if the caller set one.
region_size: Option<usize>,
}
impl DirectFileDeviceOptionsBuilder {
    /// Region size used by `build` when none was set explicitly (64 MiB).
    const DEFAULT_FILE_SIZE: usize = 64 * 1024 * 1024;

    /// Creates a builder for a device backed by the file at `path`.
    pub fn new(path: impl AsRef<Path>) -> Self {
        Self {
            path: path.as_ref().into(),
            capacity: None,
            region_size: None,
        }
    }

    /// Sets the requested device capacity in bytes.
    ///
    /// `build` rounds the value down to a multiple of `ALIGN` and of the region size.
    pub fn with_capacity(mut self, capacity: usize) -> Self {
        self.capacity = Some(capacity);
        self
    }

    /// Sets the requested region size in bytes.
    ///
    /// `build` caps the value at the capacity and rounds it down to a multiple of `ALIGN`.
    pub fn with_region_size(mut self, region_size: usize) -> Self {
        self.region_size = Some(region_size);
        self
    }

    /// Builds the options, filling in defaults and rounding sizes down to valid multiples.
    ///
    /// When no capacity was set, the parent directory of the path is created if needed and the
    /// capacity defaults to 80% of the free space reported for it. When no region size was set,
    /// it defaults to `DEFAULT_FILE_SIZE`.
    ///
    /// Sizes that round down to zero are returned as-is; `verify` rejects them with an error.
    ///
    /// # Panics
    ///
    /// Only when the default capacity has to be computed: panics if the path has no parent, if
    /// the parent directory cannot be created, or if the free space cannot be queried.
    pub fn build(self) -> DirectFileDeviceOptions {
        let path = self.path;

        // Round `value` down to a multiple of `align`. A zero `align` leaves `value` untouched
        // instead of panicking on a zero divisor; `verify` reports such options.
        let align_v = |value: usize, align: usize| value - value.checked_rem(align).unwrap_or(0);

        // Computed lazily so that an explicit capacity neither touches the file system nor hits
        // the panics below.
        let capacity = self.capacity.unwrap_or_else(|| {
            let dir = path.parent().expect("path must point to a file").to_path_buf();
            create_dir_all(&dir).unwrap();
            free_space(&dir).unwrap() as usize / 10 * 8
        });
        let capacity = align_v(capacity, ALIGN);

        let region_size = self.region_size.unwrap_or(Self::DEFAULT_FILE_SIZE).min(capacity);
        let region_size = align_v(region_size, ALIGN);

        let capacity = align_v(capacity, region_size);

        DirectFileDeviceOptions {
            path,
            capacity,
            region_size,
        }
    }
}
#[cfg(test)]
mod tests {
    use itertools::repeat_n;

    use super::*;

    /// Options built with defaults for a file in an existing directory pass `verify`.
    #[test_log::test]
    fn test_options_builder() {
        let tmp = tempfile::tempdir().unwrap();
        let file_path = tmp.path().join("test-direct-file");

        let options = DirectFileDeviceOptionsBuilder::new(file_path).build();

        tracing::debug!("{options:?}");
        options.verify().unwrap();
    }

    /// Options built with defaults for a file whose parent directory does not exist yet pass
    /// `verify`.
    #[test_log::test]
    fn test_options_builder_noent() {
        let tmp = tempfile::tempdir().unwrap();
        let file_path = tmp.path().join("noent").join("test-direct-file");

        let options = DirectFileDeviceOptionsBuilder::new(file_path).build();

        tracing::debug!("{options:?}");
        options.verify().unwrap();
    }

    /// Data written to a region can be read back, both immediately and after reopening the
    /// device.
    #[test_log::test(tokio::test)]
    async fn test_direct_file_device_io() {
        let tmp = tempfile::tempdir().unwrap();
        let runtime = Runtime::current();

        let builder = DirectFileDeviceOptionsBuilder::new(tmp.path().join("test-direct-file"));
        let options = builder
            .with_capacity(4 * 1024 * 1024)
            .with_region_size(1024 * 1024)
            .build();
        tracing::debug!("{options:?}");

        // Payload deliberately not a multiple of the alignment.
        let payload_len = 64 * 1024 - 100;
        let region = 0;
        let offset = 4096;

        let expected = {
            let mut bytes = IoBytesMut::with_capacity(64 * 1024);
            bytes.extend(repeat_n(b'x', payload_len));
            bytes.freeze()
        };

        // Write, read back, and flush on a first handle.
        let device = DirectFileDevice::open(options.clone(), runtime.clone()).await.unwrap();
        device.write(expected.clone(), region, offset).await.unwrap();
        let read_back = device.read(region, offset, payload_len).await.unwrap().freeze();
        assert_eq!(expected, read_back);
        device.flush(None).await.unwrap();
        drop(device);

        // Reopen and check that the data is still there.
        let device = DirectFileDevice::open(options, runtime).await.unwrap();
        let read_back = device.read(region, offset, payload_len).await.unwrap().freeze();
        assert_eq!(expected, read_back);
    }
}