use super::*;
use std::{
collections::HashMap,
path::PathBuf,
sync::{mpsc, Arc},
thread::JoinHandle,
};
/// A single read request that holds its target file behind an [`Arc`].
///
/// Instances travel inside a [`BatchReadJobs`] over an `mpsc` channel to the
/// worker thread, which is why the file handle is reference-counted rather
/// than borrowed.
#[derive(Debug)]
pub struct SendableReadJob {
    /// Shared handle to the file this job reads from.
    pub file: Arc<File>,
    /// Position in the file at which the read starts.
    pub offset: u64,
    /// Amount of data requested by this job.
    pub length: usize,
}
/// Exposes the fields of a [`SendableReadJob`] through the [`ReadJob`] trait.
impl ReadJob for SendableReadJob {
    /// Borrows the file that sits behind the shared handle.
    fn file(&self) -> &File {
        // Explicit deref through the `Arc` instead of relying on coercion.
        &*self.file
    }

    /// Returns the starting offset stored in the job.
    fn offset(&self) -> u64 {
        let Self { offset, .. } = self;
        *offset
    }

    /// Returns the requested length stored in the job.
    fn length(&self) -> usize {
        let Self { length, .. } = self;
        *length
    }
}
/// One-shot completion handler for a batch.
///
/// It is invoked with the batch's jobs and a slice of results; the code in
/// this file always builds one result per job, in job order, when it reports
/// an error. The handler must be `Send + Sync` because it is shipped to the
/// worker thread together with the jobs.
pub type Callback = Box<dyn FnOnce(&[SendableReadJob], &[ReadResult]) + Send + Sync>;
/// A group of read jobs that are submitted and completed together.
///
/// The batch is consumed when it is completed: `callback` is an `FnOnce` and
/// runs exactly once, with either the real results or per-job error results.
pub struct BatchReadJobs {
    /// The individual reads that make up the batch.
    pub jobs: Vec<SendableReadJob>,
    /// Handler that receives the jobs and their results on completion.
    pub callback: Callback,
}
impl BatchReadJobs {
    /// Completes the batch with the outcome of a batched read.
    ///
    /// On `Ok`, the callback receives the jobs together with the produced
    /// results. On `Err`, the error's errno is forwarded to
    /// [`set_error`](Self::set_error), so every job is reported as failed.
    pub fn set_results(self, results: Result<Vec<ReadResult>>) {
        match results {
            Err(failure) => self.set_error(failure.errno()),
            Ok(outcome) => (self.callback)(&self.jobs, &outcome),
        }
    }

    /// Completes the batch by reporting the same failure for every job.
    ///
    /// One `ReadResult` is built per job, with `ret` set to the negated
    /// `errno` and an empty buffer, and the callback is then invoked with
    /// the jobs and those results.
    pub fn set_error(self, errno: i32) {
        let failures = self
            .jobs
            .iter()
            .map(|_| ReadResult {
                ret: i64::from(-errno),
                buf: &[],
            })
            .collect::<Vec<_>>();
        (self.callback)(&self.jobs, &failures);
    }
}
/// Handle to a background thread that executes [`BatchReadJobs`].
///
/// Batches are handed to the thread through an `mpsc` channel; the thread is
/// joined when the worker is stopped or dropped.
pub struct ReadWorker {
    /// Copy of the ring configuration the worker was started with.
    config: RingConfig,
    /// Sending half of the channel that feeds the worker thread.
    sender: mpsc::Sender<BatchReadJobs>,
    /// Join handle of the worker thread; `None` once it has been joined.
    handle: Option<JoinHandle<()>>,
}
impl ReadWorker {
    /// Starts a worker thread named `"ReadWorker"` and returns a handle to it.
    ///
    /// The thread receives its own clone of `config`; another clone is kept
    /// in the returned value and is available through [`config`](Self::config).
    ///
    /// # Panics
    ///
    /// Panics if the operating system fails to spawn the thread. This
    /// function currently never returns `Err`.
    pub fn start(config: &RingConfig) -> Result<Self> {
        let (sender, receiver) = mpsc::channel::<BatchReadJobs>();
        let worker_config = config.clone();
        let handle = std::thread::Builder::new()
            .name("ReadWorker".to_string())
            .spawn(move || Self::process(worker_config, receiver))
            .expect("failed to spawn the ReadWorker thread");
        Ok(Self {
            config: config.clone(),
            sender,
            handle: Some(handle),
        })
    }

    /// Queues a batch for execution on the worker thread.
    ///
    /// If the worker is no longer receiving (it was stopped with
    /// [`stop_and_join`](Self::stop_and_join) or its thread terminated), the
    /// batch is not dropped silently and the caller is not panicked: the
    /// batch's callback is invoked on the calling thread with every job
    /// failed with `EPIPE`.
    pub fn enqueue(&self, job: BatchReadJobs) {
        // "Broken pipe": the receiving end of the channel is gone.
        const EPIPE: i32 = 32;

        // `SendError` hands the unsent value back, so the batch can still be
        // completed and its callback still runs exactly once.
        if let Err(mpsc::SendError(rejected)) = self.sender.send(job) {
            rejected.set_error(EPIPE);
        }
    }

    /// Returns the configuration this worker was started with.
    pub fn config(&self) -> &RingConfig {
        &self.config
    }

    /// Closes the job channel and waits for the worker thread to finish.
    ///
    /// Batches already queued are still processed before the thread exits.
    /// Calling this more than once is harmless: the join handle is taken on
    /// the first call.
    ///
    /// # Panics
    ///
    /// If the worker thread panicked, its panic is re-raised on the calling
    /// thread, unless the calling thread is already unwinding (for example
    /// when this runs from `Drop` during a panic), where a second panic
    /// would abort the process.
    pub fn stop_and_join(&mut self) {
        // Overwriting the sender drops the only live sending half; the
        // replacement's receiver is dropped immediately. The worker's
        // receive loop therefore ends once the queue has drained.
        self.sender = mpsc::channel::<BatchReadJobs>().0;

        if let Some(handle) = self.handle.take() {
            if let Err(payload) = handle.join() {
                if !std::thread::panicking() {
                    std::panic::resume_unwind(payload);
                }
            }
        }
    }

    /// Body of the worker thread: completes batches until the channel closes.
    ///
    /// One `Ring` is created lazily per mount point and cached for later
    /// batches. Every received batch is completed exactly once:
    /// * an empty batch is completed immediately with no results;
    /// * a batch whose jobs span more than one mount point fails with `EINVAL`;
    /// * a failure to create a ring fails the batch with that error's errno;
    /// * otherwise the outcome of `Ring::batch_read` is delivered.
    fn process(config: RingConfig, receiver: mpsc::Receiver<BatchReadJobs>) {
        // "Invalid argument": all jobs of a batch must share one mount point.
        const EINVAL: i32 = 22;

        let mut rings: HashMap<PathBuf, Ring> = HashMap::new();
        // Iterating the receiver blocks for the next batch and stops once
        // every sender has been dropped.
        for job in receiver {
            if job.jobs.is_empty() {
                // No jobs means no per-job results are built, so the errno
                // value is never observed; this only fires the callback.
                job.set_error(0);
                continue;
            }

            let mount_point = job.jobs[0].file.mount_point();
            if !job
                .jobs
                .iter()
                .all(|j| j.file.mount_point() == mount_point)
            {
                job.set_error(EINVAL);
                continue;
            }

            if let Some(ring) = rings.get_mut(mount_point) {
                let results = ring.batch_read(&job.jobs);
                job.set_results(results);
            } else {
                let mut ring = match Ring::create(&config, mount_point) {
                    Ok(ring) => ring,
                    Err(e) => {
                        job.set_error(e.errno());
                        continue;
                    }
                };
                // Take an owned key before `job` is consumed below, since
                // `mount_point` borrows from it.
                let key = mount_point.to_owned();
                let results = ring.batch_read(&job.jobs);
                job.set_results(results);
                rings.insert(key, ring);
            }
        }
    }
}
impl Drop for ReadWorker {
fn drop(&mut self) {
self.stop_and_join();
}
}