use super::BackingStore;
use crate::pages::Pages;
use async_trait::async_trait;
use bufpool::buf::Buf;
use dashmap::DashMap;
use io_uring::cqueue::Entry as CEntry;
use io_uring::opcode;
use io_uring::squeue::Entry as SEntry;
use io_uring::types;
use io_uring::IoUring;
use off64::u32;
use off64::usz;
use signal_future::SignalFuture;
use signal_future::SignalFutureController;
use std::collections::VecDeque;
use std::fmt;
use std::fs::File;
use std::io;
use std::os::fd::AsRawFd;
use std::sync::Arc;
use std::thread;
use strum::Display;
use tokio::time::Instant;
use tracing::trace;
/// Checks the raw result code of a completed ring operation.
///
/// A non-negative `res` is returned converted to `u32`. A negative `res` is
/// treated as a negated OS error number.
///
/// # Panics
///
/// Panics when `res` is negative, reporting `req` and the decoded OS error.
fn assert_result_is_ok(req: &Request, res: i32) -> u32 {
    if res >= 0 {
        return u32!(res);
    }
    // Negative results carry the error number with its sign flipped.
    let os_error = io::Error::from_raw_os_error(-res);
    panic!("{:?} failed with {:?}", req, os_error);
}
/// Parameters of one read submitted to the ring.
struct ReadRequest {
/// Destination buffer. Its pointer is handed to the read operation, and the
/// buffer itself is given back to the caller once the read completes.
out_buf: Buf,
/// Offset passed to the read operation.
offset: u64,
/// Number of bytes requested from the read operation.
len: u32,
}
/// Parameters of one write submitted to the ring.
struct WriteRequest {
/// Offset passed to the write operation.
offset: u64,
/// Bytes to write. The whole buffer is written, and the buffer is handed
/// back to the caller once the write completes.
data: Buf,
}
/// A unit of work sent over the channel to the submission thread.
///
/// Every variant carries a `res` controller that the completion thread
/// signals when the operation finishes. The derived `Display` is what the
/// trace logs use for their `typ` field (via `to_string()`).
#[derive(Display)]
enum Request {
/// Read into `req.out_buf`; the controller is signalled with that buffer.
Read {
req: ReadRequest,
res: SignalFutureController<Buf>,
},
/// Write `req.data`; the controller is signalled with that same buffer.
Write {
req: WriteRequest,
res: SignalFutureController<Buf>,
},
/// Fsync the file; the controller is signalled with `()`.
Sync {
res: SignalFutureController<()>,
},
}
impl fmt::Debug for Request {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Read { req, .. } => write!(f, "read {} len {}", req.offset, req.len),
Self::Write { req, .. } => write!(f, "write {} len {}", req.offset, req.data.len()),
Self::Sync { .. } => write!(f, "sync"),
}
}
}
/// `BackingStore` implementation that forwards every operation to background
/// threads driving an io_uring instance.
///
/// Cloning yields another handle that sends to the same request channel.
#[derive(Clone)]
pub(crate) struct UringBackingStore {
/// Allocator used to obtain the output buffer for reads.
pages: Pages,
/// Sending half of the request channel drained by the submission thread.
sender: crossbeam_channel::Sender<Request>,
}
/// Optional io_uring setup flags applied when the ring is built.
///
/// The default leaves every flag off.
#[derive(Clone, Default, Debug)]
pub(crate) struct UringCfg {
/// When true, `setup_coop_taskrun` is called on the ring builder.
pub coop_taskrun: bool,
/// When true, `setup_defer_taskrun` is called on the ring builder.
pub defer_taskrun: bool,
/// When true, `setup_iopoll` is called on the ring builder.
pub iopoll: bool,
/// When set, `setup_sqpoll` is called on the ring builder with this value.
/// NOTE(review): presumably the kernel poll thread's idle timeout in
/// milliseconds — confirm against the io_uring crate documentation.
pub sqpoll: Option<u32>,
}
impl UringBackingStore {
    /// Creates a backing store that performs all I/O on `file` through an
    /// io_uring instance configured by `cfg`.
    ///
    /// `file` is registered with the ring as fixed file index 0, and two
    /// detached OS threads are spawned:
    ///
    /// - a submission thread that drains the request channel, records each
    ///   request in a shared map under a fresh ID and pushes a matching
    ///   submission queue entry; it exits once every sender has been dropped;
    /// - a completion thread that reaps completion queue entries, looks the
    ///   request up by ID and signals its future; it loops forever.
    ///
    /// NOTE(review): `file` is dropped when this function returns. This
    /// presumably relies on the ring's file registration keeping the
    /// underlying file open — confirm.
    ///
    /// # Panics
    ///
    /// Panics if the ring cannot be built or the file cannot be registered.
    /// The worker threads panic if submitting to the ring fails, if an
    /// operation completes with an error, or if a read or write transfers a
    /// different number of bytes than the length of its buffer.
    pub fn new(file: File, pages: Pages, cfg: UringCfg) -> Self {
        let (sender, receiver) = crossbeam_channel::unbounded::<Request>();
        // In-flight requests keyed by the ID stored in each entry's `user_data`,
        // together with the time they were handed to the ring.
        let pending: Arc<DashMap<u64, (Request, Instant)>> = Default::default();
        let ring = {
            let mut builder = IoUring::<SEntry, CEntry>::builder();
            // The entry count requested below is far larger than a ring can
            // hold; clamping lets the kernel reduce it instead of failing.
            builder.setup_clamp();
            if cfg.coop_taskrun {
                builder.setup_coop_taskrun();
            };
            if cfg.defer_taskrun {
                builder.setup_defer_taskrun();
            };
            if cfg.iopoll {
                builder.setup_iopoll();
            }
            if let Some(sqpoll) = cfg.sqpoll {
                builder.setup_sqpoll(sqpoll);
            };
            builder.build(134217728).unwrap()
        };
        // Every operation below addresses the file as `types::Fixed(0)`.
        ring.submitter()
            .register_files(&[file.as_raw_fd()])
            .unwrap();
        let ring = Arc::new(ring);

        // Submission thread.
        thread::spawn({
            let pending = pending.clone();
            let ring = ring.clone();
            let mut msgbuf = VecDeque::new();
            move || {
                // SAFETY: this is the only place a submission queue handle is
                // created, and it never leaves this thread.
                let mut submission = unsafe { ring.submission_shared() };
                let mut next_id = 0;
                // Block for the first message, then drain whatever else is
                // already queued so the whole batch is submitted with one call.
                while let Ok(init_msg) = receiver.recv() {
                    msgbuf.push_back(init_msg);
                    while let Ok(msg) = receiver.try_recv() {
                        msgbuf.push_back(msg);
                    }
                    while let Some(msg) = msgbuf.pop_front() {
                        let id = next_id;
                        next_id += 1;
                        trace!(id, typ = msg.to_string(), "submitting request");
                        let submission_entry = match &msg {
                            Request::Read { req, .. } => {
                                let ptr = req.out_buf.as_ptr() as *mut u8;
                                opcode::Read::new(types::Fixed(0), ptr, req.len)
                                    .offset(req.offset)
                                    .build()
                                    .user_data(id)
                            }
                            Request::Write { req, .. } => {
                                let ptr = req.data.as_ptr() as *mut u8;
                                let len = u32!(req.data.len());
                                opcode::Write::new(types::Fixed(0), ptr, len)
                                    .offset(req.offset)
                                    .build()
                                    .user_data(id)
                            }
                            Request::Sync { .. } => {
                                opcode::Fsync::new(types::Fixed(0)).build().user_data(id)
                            }
                        };
                        // Record the request before the entry becomes visible to
                        // the kernel, so the completion thread always finds it.
                        pending.insert(id, (msg, Instant::now()));
                        // `is_full` and `push` only consult this handle's cached
                        // view of the queue, which is refreshed by `sync`. After
                        // submitting, sync again so the slots the kernel consumed
                        // are seen; otherwise `push` below would still report a
                        // full queue. Repeat in case nothing was freed yet.
                        while submission.is_full() {
                            submission.sync();
                            ring.submit_and_wait(1).unwrap();
                            submission.sync();
                        }
                        // SAFETY: the buffer referenced by the entry is owned by
                        // the request stored in `pending`, which is only removed
                        // when its completion is reaped. This assumes the buffer's
                        // data pointer stays valid when the `Buf` value is moved.
                        unsafe {
                            submission.push(&submission_entry).unwrap();
                        };
                    }
                    // Publish the batch and tell the kernel about it.
                    submission.sync();
                    ring.submit().unwrap();
                }
            }
        });

        // Completion thread.
        thread::spawn({
            let pending = pending.clone();
            let ring = ring.clone();
            move || {
                // SAFETY: this is the only place a completion queue handle is
                // created, and it never leaves this thread.
                let mut completion = unsafe { ring.completion_shared() };
                loop {
                    let Some(e) = completion.next() else {
                        // Nothing cached: wait for at least one completion, then
                        // refresh this handle's view of the queue.
                        ring.submit_and_wait(1).unwrap();
                        completion.sync();
                        continue;
                    };
                    let id = e.user_data();
                    let (req, started) = pending.remove(&id).unwrap().1;
                    trace!(
                        id,
                        typ = req.to_string(),
                        exec_us = started.elapsed().as_micros(),
                        "completing request"
                    );
                    let rv = assert_result_is_ok(&req, e.result());
                    match req {
                        Request::Read { req, res } => {
                            // Short reads are treated as fatal.
                            assert_eq!(usz!(rv), req.out_buf.len());
                            res.signal(req.out_buf);
                        }
                        Request::Write { req, res } => {
                            // Short writes are treated as fatal.
                            assert_eq!(rv, u32!(req.data.len()));
                            res.signal(req.data);
                        }
                        Request::Sync { res } => {
                            res.signal(());
                        }
                    }
                }
            }
        });
        Self { pages, sender }
    }
}
/// Each operation builds a `Request`, hands it to the submission thread over
/// the channel and awaits the future that is signalled on completion.
///
/// All three methods panic if the request cannot be sent.
#[async_trait]
impl BackingStore for UringBackingStore {
    /// Reads `len` bytes at `offset` into a freshly allocated buffer and
    /// resolves to that buffer.
    ///
    /// NOTE(review): `u32!` presumably panics when `len` exceeds `u32::MAX` —
    /// confirm against the `off64` crate.
    async fn read_at(&self, offset: u64, len: u64) -> Buf {
        let out_buf = self.pages.allocate_uninitialised(len);
        let (completion, notifier) = SignalFuture::new();
        let request = Request::Read {
            req: ReadRequest {
                out_buf,
                offset,
                len: u32!(len),
            },
            res: notifier,
        };
        self.sender.send(request).unwrap();
        completion.await
    }

    /// Writes all of `data` at `offset` and resolves to the same buffer once
    /// the write has completed.
    async fn write_at(&self, offset: u64, data: Buf) -> Buf {
        let (completion, notifier) = SignalFuture::new();
        let request = Request::Write {
            req: WriteRequest { offset, data },
            res: notifier,
        };
        self.sender.send(request).unwrap();
        completion.await
    }

    /// Issues an fsync on the file and resolves once it has completed.
    async fn sync(&self) {
        let (completion, notifier) = SignalFuture::new();
        let request = Request::Sync { res: notifier };
        self.sender.send(request).unwrap();
        completion.await
    }
}