usrbio 0.1.20

Rust bindings for the [3FS USRBIO API](https://github.com/deepseek-ai/3FS/blob/main/src/lib/api/UsrbIo.md).
Documentation
use super::*;
use std::{
    collections::HashMap,
    path::PathBuf,
    sync::{mpsc, Arc},
    thread::JoinHandle,
};

#[derive(Debug)]
pub struct SendableReadJob {
    pub file: Arc<File>,
    pub offset: u64,
    pub length: usize,
}

impl ReadJob for SendableReadJob {
    fn file(&self) -> &File {
        &self.file
    }

    fn offset(&self) -> u64 {
        self.offset
    }

    fn length(&self) -> usize {
        self.length
    }
}

pub type Callback = Box<dyn FnOnce(&[SendableReadJob], &[ReadResult]) + Sync + Send>;

pub struct BatchReadJobs {
    pub jobs: Vec<SendableReadJob>,
    pub callback: Callback,
}

impl BatchReadJobs {
    pub fn set_results(self, results: Result<Vec<ReadResult>>) {
        match results {
            Ok(r) => (self.callback)(&self.jobs, &r),
            Err(e) => self.set_error(e.errno()),
        }
    }

    pub fn set_error(self, errno: i32) {
        let mut results = Vec::with_capacity(self.jobs.len());
        for _ in &self.jobs {
            results.push(ReadResult {
                ret: i64::from(-errno),
                buf: &[],
            });
        }
        (self.callback)(&self.jobs, &results);
    }
}

pub struct ReadWorker {
    config: RingConfig,
    sender: mpsc::Sender<BatchReadJobs>,
    handle: Option<JoinHandle<()>>,
}

impl ReadWorker {
    pub fn start(config: &RingConfig) -> Result<Self> {
        let (sender, receiver) = mpsc::channel::<BatchReadJobs>();
        let config_clone = config.clone();
        let handle = std::thread::Builder::new()
            .name("ReadWorker".to_string())
            .spawn(move || Self::process(config_clone, receiver))
            .unwrap();

        Ok(Self {
            config: config.clone(),
            handle: Some(handle),
            sender,
        })
    }

    pub fn enqueue(&self, job: BatchReadJobs) {
        self.sender.send(job).unwrap();
    }

    pub fn config(&self) -> &RingConfig {
        &self.config
    }

    pub fn stop_and_join(&mut self) {
        // drop sender to stop.
        let _ = std::mem::replace(&mut self.sender, mpsc::channel::<BatchReadJobs>().0);
        if let Some(handle) = self.handle.take() {
            handle.join().unwrap();
        }
    }

    fn process(config: RingConfig, receiver: mpsc::Receiver<BatchReadJobs>) {
        let mut rings: HashMap<PathBuf, Ring> = HashMap::new();
        while let Ok(job) = receiver.recv() {
            let len = job.jobs.len();
            if len == 0 {
                job.set_error(0);
                continue;
            }

            let mount_point = job.jobs[0].file.mount_point();
            if !job
                .jobs
                .iter()
                .all(|job| job.file.mount_point() == mount_point)
            {
                job.set_error(22);
                continue;
            }

            if let Some(ring) = rings.get_mut(mount_point) {
                let results = ring.batch_read(&job.jobs);
                job.set_results(results);
            } else {
                let mut ring = match Ring::create(&config, mount_point) {
                    Ok(r) => r,
                    Err(e) => {
                        job.set_error(e.errno());
                        continue;
                    }
                };
                let mount_point = mount_point.to_owned();
                let results = ring.batch_read(&job.jobs);
                job.set_results(results);
                rings.insert(mount_point, ring);
            };
        }
    }
}

impl Drop for ReadWorker {
    fn drop(&mut self) {
        self.stop_and_join();
    }
}