#![allow(clippy::manual_async_fn)]
use std::future::Future;
use std::path::PathBuf;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicBool, Ordering};
use ringline::{AsyncEventHandler, Config, ConnCtx, DirectIoConfig, RinglineBuilder};
fn direct_io_test_config() -> Config {
let mut config = Config::default();
config.worker.threads = 1;
config.worker.pin_to_core = false;
config.sq_entries = 64;
config.recv_buffer.ring_size = 64;
config.recv_buffer.buffer_size = 4096;
config.max_connections = 8;
config.send_copy_count = 8;
config.tick_timeout_us = 1000; config.direct_io = Some(DirectIoConfig {
max_files: 4,
max_commands_in_flight: 32,
});
config
}
fn temp_file_path(name: &str) -> PathBuf {
std::env::current_dir().unwrap().join(name)
}
#[cfg(target_os = "linux")]
fn io_uring_supported() -> bool {
let ret = unsafe { libc::syscall(libc::SYS_io_uring_setup, 1u32, std::ptr::null_mut::<u8>()) };
ret != -1 || std::io::Error::last_os_error().raw_os_error() != Some(libc::ENOSYS)
}
#[cfg(not(target_os = "linux"))]
fn io_uring_supported() -> bool {
false
}
#[cfg(target_os = "linux")]
fn o_direct_supported() -> bool {
let path = temp_file_path(".krio_direct_io_probe");
let c_path = std::ffi::CString::new(path.to_str().unwrap()).unwrap();
let fd = unsafe {
libc::open(
c_path.as_ptr(),
libc::O_CREAT | libc::O_RDWR | libc::O_DIRECT,
0o644,
)
};
if fd >= 0 {
unsafe {
libc::close(fd);
libc::unlink(c_path.as_ptr());
}
true
} else {
let _ = std::fs::remove_file(&path);
false
}
}
#[cfg(not(target_os = "linux"))]
fn o_direct_supported() -> bool {
true
}
static ROUNDTRIP_DONE: AtomicBool = AtomicBool::new(false);
static ROUNDTRIP_OK: AtomicBool = AtomicBool::new(false);
static ROUNDTRIP_ERR: OnceLock<String> = OnceLock::new();
struct RoundtripTickHandler;
impl AsyncEventHandler for RoundtripTickHandler {
fn on_accept(&self, _conn: ConnCtx) -> impl Future<Output = ()> + 'static {
async {}
}
fn on_tick(&mut self, ctx: &mut ringline::DriverCtx) {
static STARTED: AtomicBool = AtomicBool::new(false);
if STARTED.swap(true, Ordering::AcqRel) {
return;
}
let path = temp_file_path(".krio_direct_io_roundtrip_test");
if let Err(e) = std::fs::write(&path, [0u8; 4096]) {
let _ = ROUNDTRIP_ERR.set(format!("file create failed: {e}"));
ROUNDTRIP_DONE.store(true, Ordering::Release);
ctx.request_shutdown();
return;
}
let path_str = path.to_str().unwrap();
match ctx.open_direct_io_file(path_str) {
Ok(_file) => {
ROUNDTRIP_OK.store(true, Ordering::Release);
ROUNDTRIP_DONE.store(true, Ordering::Release);
ctx.request_shutdown();
}
Err(e) => {
let _ = ROUNDTRIP_ERR.set(format!("open failed: {e}"));
ROUNDTRIP_DONE.store(true, Ordering::Release);
ctx.request_shutdown();
}
}
}
fn create_for_worker(_id: usize) -> Self {
RoundtripTickHandler
}
}
#[test]
fn direct_io_write_fsync_read_roundtrip() {
if ringline::backend() == ringline::Backend::IoUring && !io_uring_supported() {
eprintln!("SKIP: io_uring not supported on this kernel");
return;
}
if !o_direct_supported() {
eprintln!("SKIP: O_DIRECT not supported on this filesystem");
return;
}
ROUNDTRIP_DONE.store(false, Ordering::Release);
ROUNDTRIP_OK.store(false, Ordering::Release);
let (_shutdown, handles) = RinglineBuilder::new(direct_io_test_config())
.launch::<RoundtripTickHandler>()
.expect("launch failed");
for h in handles {
h.join().unwrap().unwrap();
}
let path = temp_file_path(".krio_direct_io_roundtrip_test");
let _ = std::fs::remove_file(&path);
if let Some(err) = ROUNDTRIP_ERR.get() {
panic!("direct I/O roundtrip failed: {err}");
}
assert!(
ROUNDTRIP_DONE.load(Ordering::Acquire),
"test did not complete"
);
assert!(
ROUNDTRIP_OK.load(Ordering::Acquire),
"data verification failed"
);
}
static MULTI_FILE_DONE: AtomicBool = AtomicBool::new(false);
static MULTI_FILE_OK: AtomicBool = AtomicBool::new(false);
static MULTI_FILE_ERR: OnceLock<String> = OnceLock::new();
struct MultiFileTickHandler;
impl AsyncEventHandler for MultiFileTickHandler {
fn on_accept(&self, _conn: ConnCtx) -> impl Future<Output = ()> + 'static {
async {}
}
fn on_tick(&mut self, ctx: &mut ringline::DriverCtx) {
static STARTED: AtomicBool = AtomicBool::new(false);
if STARTED.swap(true, Ordering::AcqRel) {
return;
}
let mut paths = Vec::new();
for i in 0..3 {
let path = temp_file_path(&format!(".krio_direct_io_multi_{i}"));
if let Err(e) = std::fs::write(&path, [0u8; 4096]) {
let _ = MULTI_FILE_ERR.set(format!("file create failed: {e}"));
MULTI_FILE_DONE.store(true, Ordering::Release);
ctx.request_shutdown();
return;
}
paths.push(path);
}
let mut files = Vec::new();
for path in &paths {
let path_str = path.to_str().unwrap();
match ctx.open_direct_io_file(path_str) {
Ok(file) => files.push(file),
Err(e) => {
let _ = MULTI_FILE_ERR.set(format!("open failed: {e}"));
MULTI_FILE_DONE.store(true, Ordering::Release);
ctx.request_shutdown();
return;
}
}
}
for file in &files {
let _ = ctx.close_direct_io_file(*file);
}
MULTI_FILE_OK.store(true, Ordering::Release);
MULTI_FILE_DONE.store(true, Ordering::Release);
ctx.request_shutdown();
}
fn create_for_worker(_id: usize) -> Self {
MultiFileTickHandler
}
}
#[test]
fn direct_io_multiple_files() {
if ringline::backend() == ringline::Backend::IoUring && !io_uring_supported() {
eprintln!("SKIP: io_uring not supported on this kernel");
return;
}
if !o_direct_supported() {
eprintln!("SKIP: O_DIRECT not supported on this filesystem");
return;
}
MULTI_FILE_DONE.store(false, Ordering::Release);
MULTI_FILE_OK.store(false, Ordering::Release);
let (_shutdown, handles) = RinglineBuilder::new(direct_io_test_config())
.launch::<MultiFileTickHandler>()
.expect("launch failed");
for h in handles {
h.join().unwrap().unwrap();
}
for i in 0..3 {
let path = temp_file_path(&format!(".krio_direct_io_multi_{i}"));
let _ = std::fs::remove_file(&path);
}
if let Some(err) = MULTI_FILE_ERR.get() {
panic!("multi-file test failed: {err}");
}
assert!(MULTI_FILE_DONE.load(Ordering::Acquire));
assert!(MULTI_FILE_OK.load(Ordering::Acquire));
}