use io_uring::{IoUring, cqueue::Entry, opcode, squeue, types};
use std::{
alloc::{Layout, alloc_zeroed, dealloc},
cell::UnsafeCell,
fs::File,
io::{self},
os::fd::AsRawFd,
sync::Arc,
};
#[allow(unused_imports)]
use crate::ONE_MEGABYTE_BLOCK;
// Page size used to align read offsets and bounce-buffer lengths.
const FOUR_KB_PAGE: usize = 4096;
// Slot where the last submitted SQE is stashed (presumably for re-submission
// via `QuikIO::retry_sqe` — confirm). Interior mutability via UnsafeCell;
// NOTE(review): the only visible write happens while the ring mutex is held.
pub type SubmitQueueEntry = UnsafeCell<Option<squeue::Entry>>;
/// Contract for buffers that can be flushed to disk through [`QuikIO`].
pub trait FlushableBuffer {
/// The bytes to be written.
fn buffer_data(&self) -> &[u8];
/// Absolute file offset the buffer should be written at.
fn offset(&self) -> u64;
/// Tag attached to the SQE as `user_data`; surfaced again in the CQE.
fn user_data(&self) -> u64;
/// Slot where the submitted SQE is stashed after it is pushed to the ring.
fn submit_entry(&self) -> &SubmitQueueEntry;
}
// NOTE(review): unused numeric tags that appear to mirror the WriteMode
// variants — confirm they are still needed before removing.
#[allow(unused)]
const SERIALIZED_ORDERING: u8 = 0;
#[allow(unused)]
const LOCALIZED_WRITES: u8 = 1;
/// io_uring instance shared across owners behind a parking_lot mutex.
pub type SharedAsyncFileWriter = Arc<parking_lot::Mutex<IoUring>>;
/// Controls how write SQEs are flagged when submitted (see `BackingStore::submit`).
#[derive(Clone, Copy, Debug)]
pub enum WriteMode {
/// Writes carry no linking flags; the kernel may process them in any order.
TailLocalizedWrites,
/// Each write SQE is flagged with IO_LINK, chaining it to the next SQE.
SerializedWrites,
}
/// Pairs a destination file handle with the shared io_uring used to flush to it.
pub(crate) struct BackingStore {
// Destination file for reads and writes.
store: Arc<File>,
// Shared, mutex-guarded io_uring instance.
flusher: SharedAsyncFileWriter,
// Whether submitted writes are chained with IO_LINK or left unordered.
mode: WriteMode,
}
impl BackingStore {
pub fn new(io_uring: SharedAsyncFileWriter, file_handle: Arc<File>, mode: WriteMode) -> Self {
Self {
flusher: io_uring,
store: file_handle,
mode,
}
}
/// Queues one asynchronous write of `buffer_data` at file offset `at`.
///
/// The built SQE is also stashed in `submit_entry` so it can be re-submitted
/// later (see `QuikIO::retry_sqe`). `buffer_ptr` is attached to the SQE as
/// `user_data` and is surfaced again in the matching CQE.
///
/// # Errors
/// Returns an error if the submission queue is full or the kernel submit fails.
pub fn submit(
&self,
buffer_data: &[u8],
at: u64,
buffer_ptr: u64,
submit_entry: &SubmitQueueEntry,
) -> io::Result<()> {
// SerializedWrites chains this SQE to the next with IO_LINK so linked
// requests are processed in order; TailLocalizedWrites leaves ordering free.
let flags = match self.mode {
WriteMode::TailLocalizedWrites => squeue::Flags::empty(),
WriteMode::SerializedWrites => squeue::Flags::IO_LINK,
};
// NOTE(review): `buffer_data.len() as u32` silently truncates for buffers
// larger than u32::MAX bytes — confirm callers never submit > 4 GiB.
let sqe = opcode::Write::new(
types::Fd(self.store.as_raw_fd()),
buffer_data.as_ptr(),
buffer_data.len() as u32,
)
.offset(at)
.build()
.flags(flags)
.user_data(buffer_ptr);
let mut ring = self.flusher.lock();
// SAFETY: the SQE only records pointers; the caller must keep `buffer_data`
// alive until the write completes. The UnsafeCell write below happens while
// the ring lock is held — NOTE(review): confirm no concurrent reader of
// `submit_entry` exists outside this lock.
unsafe {
ring.submission()
.push(&sqe)
.map_err(|_| io::Error::other("SQ full"))?;
*submit_entry.get() = Some(sqe);
}
ring.submit()?;
Ok(())
}
/// Locks and returns the shared io_uring instance.
pub(crate) fn io_instance__(
&self,
) -> parking_lot::lock_api::MutexGuard<'_, parking_lot::RawMutex, IoUring> {
    self.flusher.lock()
}
}
/// Public entry point for async file I/O; the variant records which write
/// mode the inner `BackingStore` was built with.
/// NOTE(review): "Searalized" looks like a typo of "Serialized" — kept as-is
/// because the variant name is part of the public API.
pub enum QuikIO {
#[allow(private_interfaces)]
Searalized(BackingStore),
#[allow(private_interfaces)]
TailLocalized(BackingStore),
}
impl QuikIO {
pub fn link(file: Arc<File>) -> Self {
let io_uring = Arc::new(parking_lot::Mutex::new(io_uring::IoUring::new(8).unwrap()));
QuikIO::Searalized(BackingStore::new(
io_uring,
file,
WriteMode::SerializedWrites,
))
}
pub fn new(file: Arc<File>) -> Self {
let io_uring = Arc::new(parking_lot::Mutex::new(io_uring::IoUring::new(8).unwrap()));
QuikIO::TailLocalized(BackingStore::new(
io_uring,
file,
WriteMode::TailLocalizedWrites,
))
}
/// Convenience wrapper: extracts the raw fields from `buffer` and forwards
/// them to `submit_buffer_raw`.
pub fn submit_buffer<B: FlushableBuffer>(&self, buffer: &B) {
    self.submit_buffer_raw(
        buffer.buffer_data(),
        buffer.offset(),
        buffer.user_data(),
        buffer.submit_entry(),
    );
}
pub fn submit_buffer_raw(
&self,
buffer_data: &[u8],
at: u64,
user_data: u64,
submit_entry: &SubmitQueueEntry,
) {
match self {
QuikIO::Searalized(a) | QuikIO::TailLocalized(a) => {
let _ = a.submit(buffer_data, at, user_data, submit_entry);
}
}
}
/// Submits an fdatasync (FsyncFlags::DATASYNC) flagged with IO_DRAIN, so the
/// kernel starts it only after all previously submitted SQEs, then waits for
/// one completion.
///
/// NOTE(review): `submit_and_wait(1)` returns once *any* completion is
/// available, which may belong to an earlier write rather than this fsync —
/// confirm callers follow up with `wait_for_all` when they need full
/// durability confirmation.
///
/// # Errors
/// Fails if the submission queue is full or the kernel submit fails.
pub fn sync_data(&self) -> io::Result<()> {
let backing_store = self.get_backing_store();
let mut ring = backing_store.flusher.lock();
// First `.flags(...)` is the fsync opcode flag (DATASYNC); the second, after
// `.build()`, is the SQE-level flag (IO_DRAIN) — they are different types.
let sqe = opcode::Fsync::new(types::Fd(backing_store.store.as_raw_fd()))
.flags(types::FsyncFlags::DATASYNC)
.build()
.user_data(0)
.flags(squeue::Flags::IO_DRAIN);
// SAFETY: the fsync SQE references no user buffers.
unsafe {
ring.submission()
.push(&sqe)
.map_err(|_| io::Error::other("SQ full"))?;
}
ring.submit_and_wait(1)?;
Ok(())
}
/// Spins until the ring has no pending submissions and no unreaped
/// completions, surfacing the first failed CQE as an `io::Error`.
pub fn wait_for_all(&self) -> io::Result<()> {
    loop {
        // Drain everything currently completed; fail fast on the first error.
        for cqe in self.cqe() {
            let code = cqe.result();
            if code < 0 {
                return Err(io::Error::from_raw_os_error(-code));
            }
        }
        let mut ring = self.ring();
        if ring.submission().len() == 0 && ring.completion().len() == 0 {
            return Ok(());
        }
        // Still in flight: give other threads a chance before re-polling.
        std::thread::yield_now();
    }
}
/// Drains and returns every completion entry currently available on the ring.
pub fn cqe(&self) -> Vec<io_uring::cqueue::Entry> {
    let mut guard = self.ring();
    let mut cq = guard.completion();
    // Pull kernel-side completions into view before draining.
    cq.sync();
    cq.collect()
}
/// Locks and returns the underlying io_uring, regardless of variant.
pub fn ring(&self) -> parking_lot::lock_api::MutexGuard<'_, parking_lot::RawMutex, IoUring> {
    self.get_backing_store().io_instance__()
}
/// Re-pushes a previously built SQE (e.g. one stashed in a `SubmitQueueEntry`)
/// and submits it to the kernel.
///
/// # Errors
/// Returns an error when the submission queue is full or the submit fails.
pub fn retry_sqe(&self, sqe: &squeue::Entry) -> io::Result<()> {
    let mut ring = self.get_backing_store().flusher.lock();
    // SAFETY: the caller must guarantee any buffers referenced by `sqe` stay
    // alive until the retried operation completes.
    unsafe {
        ring.submission()
            .push(sqe)
            .map_err(|_| io::Error::other("SQ full"))?;
    }
    ring.submit()?;
    Ok(())
}
pub fn read(&self, ptr: *mut u8, len: usize, offset: u64) -> io::Result<()> {
use io_uring::{opcode, types};
use std::os::fd::AsRawFd;
let aligned_offset = offset & !(FOUR_KB_PAGE as u64 - 1);
let delta = (offset - aligned_offset) as usize;
let aligned_len = (len + delta).next_multiple_of(FOUR_KB_PAGE);
let layout = Layout::from_size_align(aligned_len, FOUR_KB_PAGE).unwrap();
let aligned_ptr = unsafe { alloc_zeroed(layout) };
assert!(!aligned_ptr.is_null());
let backing_store = self.get_backing_store();
let sqe = opcode::Read::new(
types::Fd(backing_store.store.as_raw_fd()),
aligned_ptr,
aligned_len as u32,
)
.offset(aligned_offset)
.build();
let mut ring = self.ring();
unsafe {
ring.submission()
.push(&sqe)
.map_err(|_| io::Error::other("submission queue full"))?;
}
ring.submit_and_wait(1)?;
let cqe = ring
.completion()
.next()
.ok_or_else(|| io::Error::other("no completion"))?;
drop(ring);
if cqe.result() < 0 {
unsafe { dealloc(aligned_ptr, layout) };
return Err(io::Error::from_raw_os_error(-cqe.result()));
}
unsafe {
std::ptr::copy_nonoverlapping(aligned_ptr.add(delta), ptr, len);
dealloc(aligned_ptr, layout);
}
Ok(())
}
/// Returns the inner store; both variants carry the same payload type.
fn get_backing_store(&self) -> &BackingStore {
    match *self {
        QuikIO::Searalized(ref store) | QuikIO::TailLocalized(ref store) => store,
    }
}
}
#[cfg(test)]
pub mod test {
use tempfile::NamedTempFile;
use crate::{FlushBuffer, quik_io::QuikIO};
use std::{io::Write, sync::Arc};
/// Round-trips a short byte string through `QuikIO::read` from offset 0.
#[test]
fn quickio_read_basic() {
    let payload = b"Hello, QuickIO read test!";
    let mut temp_file = NamedTempFile::new().unwrap();
    temp_file.write_all(payload).unwrap();
    temp_file.flush().unwrap();
    let handle = Arc::new(temp_file.as_file().try_clone().unwrap());
    let io = QuikIO::new(handle);
    let mut out = vec![0u8; payload.len()];
    io.read(out.as_mut_ptr(), out.len(), 0).unwrap();
    assert_eq!(out.as_slice(), payload);
}
/// Reads a 10-byte window starting at byte 5 and checks the slice matches.
#[test]
fn quickio_read_with_offset() {
    let payload = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
    let mut temp_file = NamedTempFile::new().unwrap();
    temp_file.write_all(payload).unwrap();
    temp_file.flush().unwrap();
    let handle = Arc::new(temp_file.as_file().try_clone().unwrap());
    let io = QuikIO::new(handle);
    let mut out = vec![0u8; 10];
    io.read(out.as_mut_ptr(), 10, 5).unwrap();
    assert_eq!(out.as_slice(), &payload[5..15]);
}
/// Reads from an offset that is not a multiple of the page size, exercising
/// the bounce-buffer alignment path.
#[test]
fn quickio_read_unaligned_offset() {
    let payload = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
    let mut temp_file = NamedTempFile::new().unwrap();
    temp_file.write_all(payload).unwrap();
    temp_file.flush().unwrap();
    let handle = Arc::new(temp_file.as_file().try_clone().unwrap());
    let io = QuikIO::new(handle);
    let mut out = vec![0u8; 5];
    io.read(out.as_mut_ptr(), 5, 7).unwrap();
    assert_eq!(out.as_slice(), &payload[7..12]);
}
/// End-to-end: submits five 4 KiB buffers via `FlushBuffer`, issues an fsync,
/// waits for all in-flight work, then reads each slot back and compares.
#[test]
fn read_write_test() {
let temp_file = NamedTempFile::new().unwrap();
let file = Arc::new(temp_file.as_file().try_clone().unwrap());
let quickio = QuikIO::new(file);
// Five distinct page-sized patterns, one per 4 KiB slot.
let expected: Vec<[u8; 4096]> = vec![
[0u8; 4096],
[1u8; 4096],
[2u8; 4096],
[3u8; 4096],
[4u8; 4096],
];
// One FlushBuffer per slot, addressed at i * 4096.
// NOTE(review): FlushBuffer's set_address/increment_offset/write semantics
// are defined elsewhere in the crate — assumed here to map straight to file
// offsets; confirm against its definition.
let buffers: Vec<FlushBuffer> = (0..expected.len())
.map(|i| {
let buf = FlushBuffer::default();
buf.set_address(i * 4096).expect("set_address failed");
buf
})
.collect();
for (buf, data) in buffers.iter().zip(expected.iter()) {
let offset = buf
.increment_offset(data.len())
.expect("increment_offset failed");
buf.write(offset, data);
quickio.submit_buffer(buf);
}
// Durability barrier, then drain every completion before reading back.
quickio.sync_data().unwrap();
quickio.wait_for_all().unwrap();
for (i, check_against) in expected.iter().enumerate() {
let mut read_buffer = vec![0u8; 4096];
let byte_offset = (i * 4096) as u64;
quickio
.read(read_buffer.as_mut_ptr(), 4096, byte_offset)
.unwrap();
assert_eq!(&read_buffer[..], check_against, "slot {i} mismatch");
}
}
}