use std::thread;
use crossbeam_channel::{Receiver, Sender};
pub(crate) struct DiskIoRequest {
pub(crate) work: Box<dyn FnOnce() -> DiskIoResult + Send>,
pub(crate) seq: u32,
pub(crate) response_tx: Sender<DiskIoResponse>,
pub(crate) wake_handle: crate::wakeup::WakeHandle,
}
pub(crate) struct DiskIoResult {
pub(crate) result: i32,
pub(crate) metadata: Option<crate::fs::Metadata>,
}
pub(crate) struct DiskIoResponse {
pub(crate) seq: u32,
pub(crate) result: i32,
pub(crate) metadata: Option<crate::fs::Metadata>,
}
pub(crate) struct DiskIoPool {
pub(crate) request_tx: Sender<DiskIoRequest>,
_threads: Vec<thread::JoinHandle<()>>,
}
impl DiskIoPool {
pub(crate) fn start(num_threads: usize) -> Self {
let (request_tx, request_rx) = crossbeam_channel::unbounded::<DiskIoRequest>();
let mut threads = Vec::with_capacity(num_threads);
for i in 0..num_threads {
let rx = request_rx.clone();
let handle = thread::Builder::new()
.name(format!("ringline-disk-io-{i}"))
.spawn(move || {
disk_io_thread(rx);
})
.expect("failed to spawn disk I/O thread");
threads.push(handle);
}
DiskIoPool {
request_tx,
_threads: threads,
}
}
}
fn disk_io_thread(rx: Receiver<DiskIoRequest>) {
while let Ok(req) = rx.recv() {
let result = (req.work)();
let _ = req.response_tx.send(DiskIoResponse {
seq: req.seq,
result: result.result,
metadata: result.metadata,
});
req.wake_handle.wake();
}
}