#![cfg(all(target_os = "linux", feature = "async"))]
#![allow(dead_code)]
use crate::async_io::completion_driver::{AsyncIoUring, Op};
use crate::{Error, Result};
use std::os::fd::RawFd;
use tokio::sync::oneshot;
/// Submits an [`Op::Write`] describing `buf`, `fd` and `offset` to `ring`
/// and awaits its completion code.
///
/// The buffer is handed to the ring as a raw address/length pair, and a
/// oneshot channel carries the `i32` completion code back to this future.
///
/// Returns the non-negative completion code converted to `usize`.
///
/// # Errors
///
/// Propagates any error returned by `ring.submit`, and returns
/// [`Error::Io`] when the completion code is negative.
///
/// NOTE(review): only the address of `buf` reaches the ring, so the borrow
/// checker cannot see that the buffer is still in use. Confirm that
/// `AsyncIoUring::submit` keeps this sound if the returned future is dropped
/// before the operation completes.
pub(crate) async fn write_at_native(
    ring: &AsyncIoUring,
    fd: RawFd,
    buf: &[u8],
    offset: u64,
) -> Result<usize> {
    let (reply, completion) = oneshot::channel::<i32>();
    let request = Op::Write {
        fd,
        buf_ptr: buf.as_ptr() as usize,
        buf_len: buf.len(),
        offset,
        reply,
    };
    let raw = ring.submit(request, completion).await?;
    // A successful decode is never negative, so the cast cannot wrap.
    let written = decode_io_result(raw)?;
    Ok(written as usize)
}
/// Submits an [`Op::Read`] describing `buf`, `fd` and `offset` to `ring`
/// and awaits its completion code.
///
/// The destination buffer is handed to the ring as a raw mutable
/// address/length pair, and a oneshot channel carries the `i32` completion
/// code back to this future.
///
/// Returns the non-negative completion code converted to `usize`.
///
/// # Errors
///
/// Propagates any error returned by `ring.submit`, and returns
/// [`Error::Io`] when the completion code is negative.
///
/// NOTE(review): only the address of `buf` reaches the ring, so the borrow
/// checker cannot see that the buffer is still being written to. Confirm
/// that `AsyncIoUring::submit` keeps this sound if the returned future is
/// dropped before the operation completes.
pub(crate) async fn read_at_native(
    ring: &AsyncIoUring,
    fd: RawFd,
    buf: &mut [u8],
    offset: u64,
) -> Result<usize> {
    let (reply, completion) = oneshot::channel::<i32>();
    let request = Op::Read {
        fd,
        buf_ptr: buf.as_mut_ptr() as usize,
        buf_len: buf.len(),
        offset,
        reply,
    };
    let raw = ring.submit(request, completion).await?;
    // A successful decode is never negative, so the cast cannot wrap.
    let filled = decode_io_result(raw)?;
    Ok(filled as usize)
}
/// Submits an [`Op::Fdatasync`] for `fd` to `ring` and awaits its
/// completion code.
///
/// # Errors
///
/// Propagates any error returned by `ring.submit`, and returns
/// [`Error::Io`] when the completion code is negative.
pub(crate) async fn fdatasync_native(ring: &AsyncIoUring, fd: RawFd) -> Result<()> {
    let (reply, completion) = oneshot::channel::<i32>();
    let raw = ring
        .submit(Op::Fdatasync { fd, reply }, completion)
        .await?;
    // Only success or failure is reported; the non-negative code is discarded.
    decode_io_result(raw).map(|_| ())
}
fn decode_io_result(code: i32) -> Result<i32> {
if code < 0 {
Err(Error::Io(std::io::Error::from_raw_os_error(-code)))
} else {
Ok(code)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::{File, OpenOptions};
    use std::os::fd::AsRawFd;
    use std::path::{Path, PathBuf};
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    /// Per-process sequence number that keeps temp-file names distinct
    /// between tests running in the same process.
    static TMP_FILE_SEQ: AtomicU32 = AtomicU32::new(0);

    /// Builds a temp-dir path from the process id, a sequence number and `tag`.
    fn tmp_path(tag: &str) -> PathBuf {
        let n = TMP_FILE_SEQ.fetch_add(1, Ordering::Relaxed);
        std::env::temp_dir().join(format!(
            "fsys_substrate_test_{}_{}_{}",
            std::process::id(),
            n,
            tag
        ))
    }

    /// Creates an 8-entry ring, or returns `None` so the calling test can
    /// bail out early when construction fails.
    ///
    /// The skip is reported on stderr so that a run in which every test
    /// returned early is distinguishable from a genuine pass.
    fn ring_or_skip() -> Option<AsyncIoUring> {
        match AsyncIoUring::new(8) {
            Ok(ring) => Some(ring),
            Err(_) => {
                eprintln!("skipping test: AsyncIoUring::new(8) failed");
                None
            }
        }
    }

    /// Creates (or truncates) `path` and opens it for reading and writing.
    fn create_rw(path: &Path) -> File {
        OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(path)
            .expect("create temp file")
    }

    /// Removes the wrapped path on drop; removal errors are ignored.
    struct Cleanup(PathBuf);

    impl Drop for Cleanup {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
        }
    }

    /// Awaits `fut`, panicking if it has not finished within the timeout so
    /// a hung operation fails the test instead of stalling the run.
    async fn with_timeout<F, T>(fut: F) -> T
    where
        F: std::future::Future<Output = T>,
    {
        const TIMEOUT_SECS: u64 = 15;
        match tokio::time::timeout(Duration::from_secs(TIMEOUT_SECS), fut).await {
            Ok(v) => v,
            Err(_) => panic!(
                "test exceeded {TIMEOUT_SECS}s timeout — likely a hang in the async substrate"
            ),
        }
    }

    /// Writes one 4 KiB block through the ring and reads it back with `std::fs`.
    #[tokio::test]
    async fn write_at_native_round_trips() {
        with_timeout(async {
            let Some(ring) = ring_or_skip() else { return };
            let path = tmp_path("write");
            let _g = Cleanup(path.clone());
            let f = create_rw(&path);
            let data = vec![0xA5u8; 4096];
            let n = write_at_native(&ring, f.as_raw_fd(), &data, 0)
                .await
                .expect("write_at_native");
            assert_eq!(n, data.len());
            fdatasync_native(&ring, f.as_raw_fd())
                .await
                .expect("fdatasync_native");
            drop(f);
            let read_back = std::fs::read(&path).expect("read");
            assert_eq!(read_back, data);
            ring.shutdown().await;
        })
        .await;
    }

    /// Writes one 4 KiB block with `std::fs` and reads it back through the ring.
    #[tokio::test]
    async fn read_at_native_round_trips() {
        with_timeout(async {
            let Some(ring) = ring_or_skip() else { return };
            let path = tmp_path("read");
            let _g = Cleanup(path.clone());
            let data = vec![0x5Au8; 4096];
            std::fs::write(&path, &data).unwrap();
            let f = OpenOptions::new().read(true).open(&path).unwrap();
            let mut buf = vec![0u8; 4096];
            let n = read_at_native(&ring, f.as_raw_fd(), &mut buf, 0)
                .await
                .expect("read_at_native");
            assert_eq!(n, data.len());
            assert_eq!(buf, data);
            ring.shutdown().await;
        })
        .await;
    }

    /// A write against fd `-1` must surface as `Error::Io`.
    #[tokio::test]
    async fn write_at_invalid_fd_returns_io_error() {
        with_timeout(async {
            let Some(ring) = ring_or_skip() else { return };
            let data = vec![0u8; 64];
            let result = write_at_native(&ring, -1, &data, 0).await;
            assert!(matches!(result, Err(Error::Io(_))));
            ring.shutdown().await;
        })
        .await;
    }

    /// Spawns one task per sector, each writing a distinct byte pattern at
    /// its own offset, then checks that every sector holds its own pattern.
    #[tokio::test]
    async fn concurrent_writes_complete_independently() {
        with_timeout(async {
            const N_TASKS: usize = 16;
            const SECTOR: usize = 4096;
            let Some(ring) = ring_or_skip() else { return };
            let ring = Arc::new(ring);
            let path = tmp_path("concurrent");
            let _g = Cleanup(path.clone());
            std::fs::write(&path, vec![0u8; N_TASKS * SECTOR]).unwrap();
            let f = OpenOptions::new()
                .read(true)
                .write(true)
                .open(&path)
                .unwrap();
            let fd = f.as_raw_fd();
            let mut handles = Vec::with_capacity(N_TASKS);
            for i in 0..N_TASKS {
                let ring = Arc::clone(&ring);
                // Each task owns its payload, so the buffer lives as long as
                // the task's future does.
                let payload = vec![i as u8; SECTOR];
                handles.push(tokio::spawn(async move {
                    write_at_native(&ring, fd, &payload, (i * SECTOR) as u64)
                        .await
                        .expect("concurrent write")
                }));
            }
            for h in handles {
                assert_eq!(h.await.unwrap(), SECTOR);
            }
            fdatasync_native(&ring, fd).await.expect("fdatasync");
            drop(f);
            let bytes = std::fs::read(&path).unwrap();
            for i in 0..N_TASKS {
                let slice = &bytes[i * SECTOR..(i + 1) * SECTOR];
                assert!(
                    slice.iter().all(|&b| b == i as u8),
                    "sector {i} content drift — concurrent submission broke ordering"
                );
            }
            // `shutdown` is called on the ring by value, so it is only
            // reachable once this is the last `Arc` handle.
            if let Some(r) = Arc::into_inner(ring) {
                r.shutdown().await;
            }
        })
        .await;
    }

    /// Writes a distinct pattern to each of many files through one ring and
    /// verifies that every file received exactly its own payload.
    #[tokio::test]
    async fn writes_across_many_distinct_fds_complete_correctly() {
        with_timeout(async {
            let Some(ring) = ring_or_skip() else { return };
            const N_FDS: usize = 20;
            const PAYLOAD_LEN: usize = 256;
            let mut paths = Vec::with_capacity(N_FDS);
            let mut guards = Vec::with_capacity(N_FDS);
            let mut files = Vec::with_capacity(N_FDS);
            for i in 0..N_FDS {
                let path = tmp_path(&format!("manyfds_{i:02}"));
                guards.push(Cleanup(path.clone()));
                files.push(create_rw(&path));
                paths.push(path);
            }
            for (i, f) in files.iter().enumerate() {
                let payload = vec![i as u8; PAYLOAD_LEN];
                let n = write_at_native(&ring, f.as_raw_fd(), &payload, 0)
                    .await
                    .expect("write_at_native");
                assert_eq!(n, PAYLOAD_LEN, "fd {i}: short write");
                fdatasync_native(&ring, f.as_raw_fd())
                    .await
                    .expect("fdatasync_native");
            }
            drop(files);
            for (i, path) in paths.iter().enumerate() {
                let bytes = std::fs::read(path).expect("read");
                assert_eq!(
                    bytes.len(),
                    PAYLOAD_LEN,
                    "fd {i}: wrong file size on read-back"
                );
                assert!(
                    bytes.iter().all(|&b| b == i as u8),
                    "fd {i}: content drift — slot/fd mapping bug"
                );
            }
            ring.shutdown().await;
        })
        .await;
    }

    /// Issues many sequential writes on a single fd at increasing offsets
    /// and verifies each region afterwards.
    #[tokio::test]
    async fn repeated_writes_on_same_fd_round_trip() {
        with_timeout(async {
            let Some(ring) = ring_or_skip() else { return };
            const N_WRITES: usize = 32;
            const PAYLOAD_LEN: usize = 64;
            let path = tmp_path("slot_cache");
            let _g = Cleanup(path.clone());
            let f = create_rw(&path);
            let fd = f.as_raw_fd();
            // Pre-size the file with zeros via the path while `f` stays open.
            std::fs::write(&path, vec![0u8; N_WRITES * PAYLOAD_LEN]).unwrap();
            for i in 0..N_WRITES {
                let payload = vec![(i & 0xFF) as u8; PAYLOAD_LEN];
                let n = write_at_native(&ring, fd, &payload, (i * PAYLOAD_LEN) as u64)
                    .await
                    .expect("write_at_native");
                assert_eq!(n, PAYLOAD_LEN, "iter {i}: short write");
            }
            fdatasync_native(&ring, fd).await.expect("fdatasync_native");
            drop(f);
            let bytes = std::fs::read(&path).unwrap();
            assert_eq!(bytes.len(), N_WRITES * PAYLOAD_LEN);
            for i in 0..N_WRITES {
                let slice = &bytes[i * PAYLOAD_LEN..(i + 1) * PAYLOAD_LEN];
                let expected = (i & 0xFF) as u8;
                assert!(
                    slice.iter().all(|&b| b == expected),
                    "iter {i}: content drift (expected {expected}, got {:?}...)",
                    &slice[..4]
                );
            }
            ring.shutdown().await;
        })
        .await;
    }
}