use crate::io::glommio_file::GlommioFile;
use crate::io::read_result::ReadResult;
use crate::parking::Reactor;
use crate::sys::sysfs;
use crate::sys::{DmaBuffer, PollableStatus};
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::path::Path;
use std::rc::Rc;
/// Rounds `v` up to the nearest multiple of `align`.
///
/// The computation is pure bit masking, so the result is only a true
/// multiple of `align` when `align` is a power of two.
pub(crate) fn align_up(v: u64, align: u64) -> u64 {
    // Push `v` past the next boundary (unless it already sits on one) ...
    let bumped = v + align - 1;
    // ... then clear the bits below the alignment.
    let low_bits = align - 1;
    bumped & !low_bits
}
/// Rounds `v` down to the nearest multiple of `align`.
///
/// The computation is pure bit masking, so the result is only a true
/// multiple of `align` when `align` is a power of two.
pub(crate) fn align_down(v: u64, align: u64) -> u64 {
    let low_bits = align - 1;
    // Dropping the bits below the alignment is the same as masking them out.
    v - (v & low_bits)
}
/// A file whose reads and writes are issued as DMA operations through the
/// reactor (`Reactor::get().read_dma` / `write_dma`).
///
/// Instances are produced by `DmaFile::create` and `DmaFile::open`, both of
/// which request `O_DIRECT` when opening the file.
#[derive(Debug)]
pub struct DmaFile {
/// The wrapped file. Operations such as sync, truncate, rename, remove and
/// close are forwarded to it.
file: GlommioFile,
/// Alignment, in bytes, used by `DmaFile::align_up` / `DmaFile::align_down`
/// and by `read_at` to widen unaligned requests. `create` sets it to 4096
/// and `open` sets it to 512.
o_direct_alignment: u64,
/// Handed to the reactor with every DMA read and write. `open_at` switches
/// it to `NonPollable` when the file had to be reopened without `O_DIRECT`
/// or when `sysfs::BlockDevice::is_md` reports the backing device as md.
pollable: PollableStatus,
}
impl DmaFile {
    /// Rounds `v` up to a multiple of this file's `o_direct_alignment`.
    pub fn align_up(&self, v: u64) -> u64 {
        let alignment = self.o_direct_alignment;
        align_up(v, alignment)
    }

    /// Rounds `v` down to a multiple of this file's `o_direct_alignment`.
    pub fn align_down(&self, v: u64) -> u64 {
        let alignment = self.o_direct_alignment;
        align_down(v, alignment)
    }
}
impl AsRawFd for DmaFile {
    /// Returns the raw file descriptor of the wrapped `GlommioFile`.
    fn as_raw_fd(&self) -> RawFd {
        let inner = &self.file;
        inner.as_raw_fd()
    }
}
impl DmaFile {
    /// Returns `true` if `self` and `other` refer to the same file.
    ///
    /// The comparison is delegated to the wrapped `GlommioFile`.
    pub fn is_same(&self, other: &DmaFile) -> bool {
        self.file.is_same(&other.file)
    }

    /// Opens `path` (relative to the directory descriptor `dir`) with the
    /// given `flags` and `mode` and wraps the result in a `DmaFile`.
    ///
    /// If the first attempt fails with `EINVAL`, the open is retried once
    /// with `O_DIRECT` cleared and the file is marked non-pollable. A file
    /// whose device is reported as md by `sysfs::BlockDevice::is_md` is
    /// marked non-pollable too. The returned file starts with an
    /// `o_direct_alignment` of 4096.
    ///
    /// # Errors
    ///
    /// Returns the error of the failed open; for the `EINVAL` case that is
    /// the error of the retry without `O_DIRECT`.
    async fn open_at(
        dir: RawFd,
        path: &Path,
        flags: libc::c_int,
        mode: libc::c_int,
    ) -> io::Result<DmaFile> {
        let mut pollable = PollableStatus::Pollable;

        let file = match GlommioFile::open_at(dir, path, flags, mode).await {
            // Compare against `Some(..)` rather than unwrapping: an error that
            // carries no OS errno must be propagated, not turned into a panic.
            Err(os_err) if os_err.raw_os_error() == Some(libc::EINVAL) => {
                // Retry once without `O_DIRECT`.
                pollable = PollableStatus::NonPollable;
                GlommioFile::open_at(dir, path, flags & !libc::O_DIRECT, mode).await
            }
            other => other,
        }?;

        if sysfs::BlockDevice::is_md(file.dev_major as _, file.dev_minor as _) {
            pollable = PollableStatus::NonPollable;
        }

        Ok(DmaFile {
            file,
            o_direct_alignment: 4096,
            pollable,
        })
    }

    /// Asks the local `Reactor` for a `DmaBuffer` of `size` bytes.
    pub fn alloc_dma_buffer(size: usize) -> DmaBuffer {
        Reactor::get().alloc_dma_buffer(size)
    }

    /// Creates the file at `path` for writing, truncating it if it already
    /// exists (`O_CREAT | O_TRUNC | O_WRONLY`, mode `0o644`).
    ///
    /// `O_DIRECT` is requested; see `open_at` for the fallback when that is
    /// rejected. The resulting file uses an alignment of 4096 bytes.
    ///
    /// # Errors
    ///
    /// Returns the open error, decorated by `enhanced_try!` with the
    /// operation name and path.
    pub async fn create<P: AsRef<Path>>(path: P) -> io::Result<DmaFile> {
        let flags =
            libc::O_DIRECT | libc::O_CLOEXEC | libc::O_CREAT | libc::O_TRUNC | libc::O_WRONLY;
        // `-1` is passed where a directory descriptor would go.
        let res = DmaFile::open_at(-1 as _, path.as_ref(), flags, 0o644).await;
        let mut f = enhanced_try!(res, "Creating", Some(path.as_ref()), None)?;
        f.o_direct_alignment = 4096;
        Ok(f)
    }

    /// Opens the existing file at `path` read-only (`O_RDONLY`).
    ///
    /// `O_DIRECT` is requested; see `open_at` for the fallback when that is
    /// rejected. The resulting file uses an alignment of 512 bytes.
    ///
    /// # Errors
    ///
    /// Returns the open error, decorated by `enhanced_try!` with the
    /// operation name and path.
    pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<DmaFile> {
        let flags = libc::O_DIRECT | libc::O_CLOEXEC | libc::O_RDONLY;
        // `-1` is passed where a directory descriptor would go.
        let res = DmaFile::open_at(-1 as _, path.as_ref(), flags, 0o644).await;
        let mut f = enhanced_try!(res, "Opening", Some(path.as_ref()), None)?;
        f.o_direct_alignment = 512;
        Ok(f)
    }

    /// Writes `buf` to the file at byte offset `pos` through the reactor and
    /// returns the value reported by the completed operation (the number of
    /// bytes written).
    ///
    /// Neither `pos` nor the buffer is adjusted for alignment here.
    pub async fn write_at(&self, buf: DmaBuffer, pos: u64) -> io::Result<usize> {
        let source = Reactor::get().write_dma(self.as_raw_fd(), buf, pos, self.pollable);
        enhanced_try!(source.collect_rw().await, "Writing", self.file)
    }

    /// Reads up to `size` bytes starting at `pos`, passing both values to the
    /// reactor unchanged.
    ///
    /// No alignment adjustment is performed; use `read_at` for arbitrary
    /// positions and sizes. The returned buffer is trimmed to the number of
    /// bytes actually read.
    pub async fn read_at_aligned(&self, pos: u64, size: usize) -> io::Result<ReadResult> {
        let mut source = Reactor::get().read_dma(self.as_raw_fd(), pos, size, self.pollable);
        let read_size = enhanced_try!(source.collect_rw().await, "Reading", self.file)?;
        let mut buffer = source.extract_dma_buffer();
        buffer.trim_to_size(read_size);
        Ok(ReadResult::from_whole_buffer(buffer))
    }

    /// Reads up to `size` bytes starting at an arbitrary byte offset `pos`.
    ///
    /// The request is widened to this file's alignment: the start is rounded
    /// down, the length rounded up, and the extra bytes are trimmed from the
    /// result. Fewer than `size` bytes are returned when the underlying read
    /// comes back short.
    pub async fn read_at(&self, pos: u64, size: usize) -> io::Result<ReadResult> {
        let eff_pos = self.align_down(pos);
        // Bytes read in front of `pos` only to satisfy alignment.
        let front_pad = (pos - eff_pos) as usize;
        let eff_size = self.align_up((size + front_pad) as u64) as usize;

        let mut source =
            Reactor::get().read_dma(self.as_raw_fd(), eff_pos, eff_size, self.pollable);
        let read_size = enhanced_try!(source.collect_rw().await, "Reading", self.file)?;

        // `read_size` counts from `eff_pos`, so it includes the padding that is
        // about to be dropped. Only the remainder is data at or after `pos`;
        // on a short read, using `read_size` directly would expose up to
        // `front_pad` bytes that were never read.
        // NOTE(review): assumes `trim_to_size` is relative to the front left
        // by `trim_front` — confirm against `DmaBuffer`.
        let available = read_size.saturating_sub(front_pad);

        let mut buffer = source.extract_dma_buffer();
        buffer.trim_front(front_pad);
        buffer.trim_to_size(std::cmp::min(available, size));
        Ok(ReadResult::from_whole_buffer(buffer))
    }

    /// Forwards to the wrapped file's `fdatasync`.
    pub async fn fdatasync(&self) -> io::Result<()> {
        self.file.fdatasync().await
    }

    /// Forwards to the wrapped file's `pre_allocate` with `size`.
    pub async fn pre_allocate(&self, size: u64) -> io::Result<()> {
        self.file.pre_allocate(size).await
    }

    /// Forwards to the wrapped file's `hint_extent_size` with `size`.
    pub async fn hint_extent_size(&self, size: usize) -> nix::Result<i32> {
        self.file.hint_extent_size(size).await
    }

    /// Forwards to the wrapped file's `truncate` with `size`.
    pub async fn truncate(&self, size: u64) -> io::Result<()> {
        self.file.truncate(size).await
    }

    /// Renames the file to `new_path` by forwarding to the wrapped file.
    pub async fn rename<P: AsRef<Path>>(&mut self, new_path: P) -> io::Result<()> {
        self.file.rename(new_path).await
    }

    /// Removes the file by forwarding to the wrapped file's `remove`.
    pub async fn remove(&self) -> io::Result<()> {
        self.file.remove().await
    }

    /// Returns the file size as reported by the wrapped file.
    pub async fn file_size(&self) -> io::Result<u64> {
        self.file.file_size().await
    }

    /// Consumes the `DmaFile` and closes the wrapped file.
    pub async fn close(self) -> io::Result<()> {
        self.file.close().await
    }

    /// Closes a reference-counted `DmaFile`.
    ///
    /// # Errors
    ///
    /// Fails with `io::ErrorKind::Other` when other `Rc` handles are still
    /// alive, because the file cannot be taken out of the `Rc` in that case.
    /// Otherwise returns whatever `close` returns.
    pub(crate) async fn close_rc(self: Rc<DmaFile>) -> io::Result<()> {
        match Rc::try_unwrap(self) {
            Ok(file) => file.close().await,
            Err(file) => Err(io::Error::new(
                io::ErrorKind::Other,
                format!("{} references to file still held", Rc::strong_count(&file)),
            )),
        }
    }
}
#[cfg(test)]
pub(crate) mod test {
    use super::*;
    use crate::Local;
    use std::path::PathBuf;

    /// Where a test directory was created.
    #[derive(Copy, Clone)]
    pub(crate) enum TestDirectoryKind {
        /// Under `std::env::temp_dir()`.
        TempFs,
        /// Under the directory named by `SCIPIO_TEST_POLLIO_ROOTDIR`.
        StorageMedia,
    }

    /// A scratch directory for a single test; removed again on drop.
    pub(crate) struct TestDirectory {
        pub(crate) path: PathBuf,
        pub(crate) kind: TestDirectoryKind,
    }

    impl Drop for TestDirectory {
        fn drop(&mut self) {
            // Best-effort cleanup: a failure to remove must not panic in drop.
            let _ = std::fs::remove_dir_all(&self.path);
        }
    }

    /// Returns `root/test_name` as a freshly created, empty directory.
    fn fresh_test_dir(mut root: PathBuf, test_name: &str) -> PathBuf {
        root.push(test_name);
        // Leftovers from a previous run may or may not exist; ignore either way.
        let _ = std::fs::remove_dir_all(&root);
        std::fs::create_dir_all(&root).unwrap();
        root
    }

    /// Creates the directories a test named `test_name` should run in.
    ///
    /// A `StorageMedia` directory is included only when
    /// `SCIPIO_TEST_POLLIO_ROOTDIR` is set (and must then point to an
    /// existing directory); a `TempFs` directory is always included, last.
    pub(crate) fn make_test_directories(test_name: &str) -> Vec<TestDirectory> {
        let mut vec = Vec::new();

        match std::env::var("SCIPIO_TEST_POLLIO_ROOTDIR") {
            Err(_) => {
                eprintln!(
                    "Glommio currently only supports NVMe-backed volumes formatted with XFS \
                     or EXT4. To run poll io-related tests, please set SCIPIO_TEST_POLLIO_ROOTDIR to a \
                     NVMe-backed directory path in your environment.\nPoll io tests will not run."
                );
            }
            Ok(path) => {
                let root = PathBuf::from(path);
                std::assert!(root.exists());
                vec.push(TestDirectory {
                    path: fresh_test_dir(root, test_name),
                    kind: TestDirectoryKind::StorageMedia,
                });
            }
        };

        vec.push(TestDirectory {
            path: fresh_test_dir(std::env::temp_dir(), test_name),
            kind: TestDirectoryKind::TempFs,
        });

        vec
    }

    // Declares a `#[test]` named `$name` that runs `$code` once per directory
    // returned by `make_test_directories`, with `$dir` bound to the directory
    // path and `$kind` to its `TestDirectoryKind`.
    macro_rules! dma_file_test {
        ( $name:ident, $dir:ident, $kind:ident, $code:block) => {
            #[test]
            fn $name() {
                for dir in make_test_directories(&format!("dma-{}", stringify!($name))) {
                    let $dir = dir.path.clone();
                    let $kind = dir.kind;
                    test_executor!(async move { $code });
                }
            }
        };
    }

    // Dropping a `DmaFile` without calling `close` must still release the
    // descriptor: closing it again by hand has to fail with EBADF.
    dma_file_test!(fallback_drop_closes_the_file, path, _k, {
        let fd;
        {
            let file = DmaFile::create(path.join("testfile"))
                .await
                .expect("failed to create file");
            fd = file.as_raw_fd();
            std::fs::remove_file(path.join("testfile")).unwrap();
        }
        assert!(fd != -1);
        // SAFETY: `close` only takes an integer descriptor and touches no
        // Rust-managed memory; the call is expected to fail.
        let ret = unsafe { libc::close(fd) };
        assert_eq!(ret, -1);
        let err = std::io::Error::last_os_error().raw_os_error().unwrap();
        assert_eq!(err, libc::EBADF);
    });

    dma_file_test!(file_create_close, path, _k, {
        let new_file = DmaFile::create(path.join("testfile"))
            .await
            .expect("failed to create file");
        new_file.close().await.expect("failed to close file");
        std::assert!(path.join("testfile").exists());
    });

    dma_file_test!(file_open, path, _k, {
        let new_file = DmaFile::create(path.join("testfile"))
            .await
            .expect("failed to create file");
        new_file.close().await.expect("failed to close file");

        let file = DmaFile::open(path.join("testfile"))
            .await
            .expect("failed to open file");
        file.close().await.expect("failed to close file");
        std::assert!(path.join("testfile").exists());
    });

    dma_file_test!(file_open_nonexistent, path, _k, {
        DmaFile::open(path.join("testfile"))
            .await
            .expect_err("opened nonexistent file");
        // A failed open must not create the file as a side effect.
        std::assert!(!path.join("testfile").exists());
    });

    dma_file_test!(file_rename, path, _k, {
        let mut new_file = DmaFile::create(path.join("testfile"))
            .await
            .expect("failed to create file");
        new_file
            .rename(path.join("testfile2"))
            .await
            .expect("failed to rename file");
        std::assert!(!path.join("testfile").exists());
        std::assert!(path.join("testfile2").exists());
        new_file.close().await.expect("failed to close file");
    });

    // Renaming a file onto its own name must succeed and keep the file.
    dma_file_test!(file_rename_noop, path, _k, {
        let mut new_file = DmaFile::create(path.join("testfile"))
            .await
            .expect("failed to create file");
        new_file
            .rename(path.join("testfile"))
            .await
            .expect("failed to rename file");
        std::assert!(path.join("testfile").exists());
        new_file.close().await.expect("failed to close file");
    });

    dma_file_test!(file_fallocate_alocatee, path, kind, {
        let new_file = DmaFile::create(path.join("testfile"))
            .await
            .expect("failed to create file");
        let res = new_file.pre_allocate(4096).await;
        if let TestDirectoryKind::TempFs = kind {
            res.expect_err("fallocate should error on tmpfs");
            return;
        }
        res.expect("fallocate failed");

        std::assert_eq!(
            new_file.file_size().await.unwrap(),
            4096,
            "file doesn't have expected size"
        );
        let metadata = std::fs::metadata(path.join("testfile")).unwrap();
        std::assert_eq!(metadata.len(), 4096);

        // Pre-allocating less than the current size must not shrink the file.
        new_file.pre_allocate(2048).await.expect("fallocate failed");
        std::assert_eq!(
            new_file.file_size().await.unwrap(),
            4096,
            "file doesn't have expected size"
        );
        let metadata = std::fs::metadata(path.join("testfile")).unwrap();
        std::assert_eq!(metadata.len(), 4096);

        new_file.close().await.expect("failed to close file");
    });

    dma_file_test!(file_fallocate_zero, path, _k, {
        let new_file = DmaFile::create(path.join("testfile"))
            .await
            .expect("failed to create file");
        new_file
            .pre_allocate(0)
            .await
            .expect_err("fallocate should fail with len == 0");
        new_file.close().await.expect("failed to close file");
    });

    dma_file_test!(file_simple_readwrite, path, _k, {
        let new_file = DmaFile::create(path.join("testfile"))
            .await
            .expect("failed to create file");
        let mut buf = DmaBuffer::new(4096).expect("failed to allocate dma buffer");
        buf.memset(42);
        let res = new_file.write_at(buf, 0).await.expect("failed to write");
        assert_eq!(res, 4096);
        new_file.close().await.expect("failed to close file");

        let new_file = DmaFile::open(path.join("testfile"))
            .await
            .expect("failed to create file");

        // Unaligned size: the result must be cut back to what was asked for.
        let read_buf = new_file.read_at(0, 500).await.expect("failed to read");
        std::assert_eq!(read_buf.len(), 500);
        for &byte in read_buf.as_bytes().iter() {
            std::assert_eq!(byte, 42);
        }

        let read_buf = new_file
            .read_at_aligned(0, 4096)
            .await
            .expect("failed to read");
        std::assert_eq!(read_buf.len(), 4096);
        for &byte in read_buf.as_bytes().iter() {
            std::assert_eq!(byte, 42);
        }

        new_file.close().await.expect("failed to close file");
    });

    dma_file_test!(file_invalid_readonly_write, path, _k, {
        let file = std::fs::File::create(path.join("testfile")).expect("failed to create file");
        let mut perms = file
            .metadata()
            .expect("failed to fetch metadata")
            .permissions();
        perms.set_readonly(true);
        file.set_permissions(perms)
            .expect("failed to update file permissions");

        let new_file = DmaFile::open(path.join("testfile"))
            .await
            .expect("open failed");
        let buf = DmaBuffer::new(4096).expect("failed to allocate dma buffer");
        new_file
            .write_at(buf, 0)
            .await
            .expect_err("writes to read-only files should fail");
        new_file
            .pre_allocate(4096)
            .await
            .expect_err("pre allocating read-only files should fail");
        new_file.close().await.expect("failed to close file");
    });

    dma_file_test!(file_empty_read, path, _k, {
        std::fs::File::create(path.join("testfile")).expect("failed to create file");
        let new_file = DmaFile::open(path.join("testfile"))
            .await
            .expect("failed to open file");
        let buf = new_file.read_at(0, 512).await.expect("failed to read");
        std::assert_eq!(buf.len(), 0);
        new_file.close().await.expect("failed to close file");
    });

    // Builds many write futures, polls them a single time and drops them
    // before they complete; dropping must not crash.
    dma_file_test!(cancellation_doest_crash_futures_not_polled, path, _k, {
        let file = DmaFile::create(path.join("testfile"))
            .await
            .expect("failed to create file");
        let size: usize = 4096;
        file.truncate(size as u64).await.unwrap();

        let mut futs = vec![];
        for _ in 0..200 {
            let mut buf = DmaFile::alloc_dma_buffer(size);
            let bytes = buf.as_bytes_mut();
            bytes[0] = b'x';
            futs.push(file.write_at(buf, 0));
        }

        // Fully qualified: no import in this module brings `join_all` into scope.
        let mut all = futures::future::join_all(futs);
        let _ = futures::poll!(&mut all);
        drop(all);
        file.close().await.unwrap();
    });

    // Spawns many detached tasks that create and write files, then cancels
    // all of them and awaits the handles; cancellation must not crash.
    dma_file_test!(cancellation_doest_crash_futures_polled, p, _k, {
        let mut handles = vec![];
        for i in 0..200 {
            let path = p.clone();
            handles.push(
                Local::local(async move {
                    let mut path = path.join("testfile");
                    path.set_extension(i.to_string());
                    let file = DmaFile::create(&path).await.expect("failed to create file");
                    let size: usize = 4096;
                    file.truncate(size as u64).await.unwrap();

                    let mut buf = DmaFile::alloc_dma_buffer(size);
                    let bytes = buf.as_bytes_mut();
                    bytes[0] = b'x';
                    file.write_at(buf, 0).await.unwrap();
                    file.close().await.unwrap();
                })
                .detach(),
            );
        }
        for h in &handles {
            h.cancel();
        }
        for h in handles {
            h.await;
        }
    });

    dma_file_test!(is_same_file, path, _k, {
        let wfile = DmaFile::create(path.join("testfile")).await.unwrap();
        let rfile = DmaFile::open(path.join("testfile")).await.unwrap();
        let wfile_other = DmaFile::create(path.join("testfile_other")).await.unwrap();

        // Different descriptors, same file.
        assert_ne!(wfile.as_raw_fd(), rfile.as_raw_fd());
        assert!(wfile.is_same(&rfile));
        assert!(!wfile.is_same(&wfile_other));

        wfile.close().await.unwrap();
        wfile_other.close().await.unwrap();
        rfile.close().await.unwrap();
    });
}