use io_uring::{opcode, squeue, types, IoUring};
#[allow(unused_imports)]
use crate::log_structured_store::LogStructuredStore;
#[allow(unused_imports)]
use crate::flush_buffer::RING_SIZE;
use crate::{flush_buffer::FlushBuffer, log_structured_store::FOUR_KB_PAGE};
use std::{
fs::File,
io,
os::fd::AsRawFd,
sync::{atomic::Ordering, Arc},
};
#[allow(unused)]
const SERIALIZED_ORDERING: u8 = 0;
#[allow(unused)]
const LOCALIZED_WRITES: u8 = 1;
pub type SharedAsyncFileWriter = Arc<parking_lot::Mutex<IoUring>>;
#[derive(Clone, Copy, Debug)]
pub enum WriteMode {
TailLocalizedWrites,
SerializedWrites,
}
pub struct Appender {
store: Arc<File>,
flusher: SharedAsyncFileWriter,
mode: WriteMode,
}
impl Appender {
pub fn new(io_uring: SharedAsyncFileWriter, file_handle: Arc<File>, mode: WriteMode) -> Self {
Self {
flusher: io_uring,
store: file_handle,
mode,
}
}
pub fn submit(
&self,
buffer: &FlushBuffer,
buffer_data: &[u8],
at: u64,
buffer_ptr: u64,
) -> io::Result<()> {
let flags = match self.mode {
WriteMode::TailLocalizedWrites => squeue::Flags::empty(),
WriteMode::SerializedWrites => squeue::Flags::IO_LINK,
};
let sqe = opcode::Write::new(
types::Fd(self.store.as_raw_fd()),
buffer_data.as_ptr(),
buffer_data.len() as u32,
)
.offset(at)
.build()
.flags(flags)
.user_data(buffer_ptr);
let mut ring = self.flusher.lock();
unsafe {
ring.submission()
.push(&sqe)
.map_err(|_| io::Error::other("SQ full"))?;
*buffer.submit_queue_entry.get() = Some(sqe);
}
ring.submit()?;
Ok(())
}
pub fn submit_blocking(
&self,
buffer: &FlushBuffer,
buffer_data: &[u8],
at: u64,
buffer_ptr: u64,
) -> io::Result<()> {
let flags = match self.mode {
WriteMode::TailLocalizedWrites => squeue::Flags::empty(),
WriteMode::SerializedWrites => squeue::Flags::IO_LINK,
};
let sqe = opcode::Write::new(
types::Fd(self.store.as_raw_fd()),
buffer_data.as_ptr(),
buffer_data.len() as u32,
)
.offset(at)
.build()
.flags(flags)
.user_data(buffer_ptr);
let mut ring = self.flusher.lock();
unsafe {
ring.submission()
.push(&sqe)
.map_err(|_| io::Error::other("SQ full"))?;
*buffer.submit_queue_entry.get() = Some(sqe);
}
ring.submit_and_wait(1)?;
if let Some(cqe) = ring.completion().next() {
if cqe.result() < 0 {
return Err(io::Error::from_raw_os_error(-cqe.result()));
}
return Ok(());
}
Err(io::Error::other("No completion received"))
}
pub fn cqueue(&self) -> parking_lot::lock_api::MutexGuard<'_, parking_lot::RawMutex, IoUring> {
let flusher_ring = self.flusher.lock();
flusher_ring
}
}
pub enum FlushBehavior {
WaitAppender(Appender),
NoWaitAppender(Appender),
}
impl FlushBehavior {
pub fn with_wait_appender(io_uring: SharedAsyncFileWriter, file: Arc<File>) -> Self {
FlushBehavior::WaitAppender(Appender::new(io_uring, file, WriteMode::SerializedWrites))
}
pub fn with_no_wait_appender(io_uring: SharedAsyncFileWriter, file: Arc<File>) -> Self {
FlushBehavior::NoWaitAppender(Appender::new(
io_uring,
file,
WriteMode::TailLocalizedWrites,
))
}
pub fn submit_buffer(&self, buffer: &FlushBuffer) {
match self {
FlushBehavior::WaitAppender(a) | FlushBehavior::NoWaitAppender(a) => {
let ptr = unsafe { *buffer.buf.buffer.get() };
let slice: &[u8] = unsafe { &*std::ptr::slice_from_raw_parts(ptr, FOUR_KB_PAGE) };
let ptr_to_buffer_position = buffer as *const FlushBuffer as u64;
let _ = a.submit(
buffer,
slice,
buffer.local_lss_address_slot.load(Ordering::Acquire) as u64
* FOUR_KB_PAGE as u64,
ptr_to_buffer_position,
);
}
}
}
pub fn submit_buffer_and_wait(&self, buffer: &FlushBuffer) -> io::Result<()> {
match self {
FlushBehavior::WaitAppender(a) | FlushBehavior::NoWaitAppender(a) => {
let ptr = unsafe { *buffer.buf.buffer.get() };
let slice: &[u8] = unsafe { &*std::ptr::slice_from_raw_parts(ptr, FOUR_KB_PAGE) };
let ptr_to_buffer_position = buffer as *const FlushBuffer as u64;
a.submit_blocking(
buffer,
slice,
buffer.local_lss_address_slot.load(Ordering::Acquire) as u64
* FOUR_KB_PAGE as u64,
ptr_to_buffer_position,
)
}
}
}
pub fn get_cqueue(
&self,
) -> parking_lot::lock_api::MutexGuard<'_, parking_lot::RawMutex, IoUring> {
match self {
FlushBehavior::WaitAppender(appender) | FlushBehavior::NoWaitAppender(appender) => {
appender.cqueue()
}
}
}
}