//! glowdust 0.0.1
//!
//! A DBMS with a data model based on functions and pattern matching.
//! Documentation
use io_uring::squeue::Entry;
use io_uring::{opcode, types, IoUring};
use std::fs::{File, OpenOptions};
use std::io::{Error, ErrorKind, Read, Result};
use std::os::fd::AsRawFd;
use std::path::PathBuf;

/// Reads fixed-size blocks from a single file.
///
/// Instances are created with [`UringReader::new`], which opens the file
/// read-only and records its length at that moment.
#[derive(Debug)]
pub struct UringReader {
    /// Size in bytes of every block read; `new` asserts it is a power of two.
    blocksize: u32,
    /// The file being read, opened read-only by `new`.
    source: File,
    /// Length of `source` in bytes as reported by its metadata when it was
    /// opened. It is not refreshed afterwards.
    filesize: u64,
}

impl UringReader {
    /// Opens `source` read-only and prepares to read it in blocks of
    /// `blocksize` bytes.
    ///
    /// The file length is sampled once, here, and is later used by
    /// [`UringReader::read`] to recognise the final (short) block.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from opening the file or querying its metadata.
    ///
    /// # Panics
    ///
    /// Panics if `blocksize` is not a power of two.
    pub fn new(source: &PathBuf, blocksize: u32) -> Result<Self> {
        assert!(
            blocksize.is_power_of_two(),
            "blocksize needs to be a power of two"
        );
        let source = OpenOptions::new().read(true).open(source)?;
        let filesize = source.metadata()?.len();
        Ok(UringReader {
            blocksize,
            source,
            filesize,
        })
    }

    /// Returns the block size, in bytes, this reader was created with.
    pub fn get_blocksize(&self) -> u32 {
        self.blocksize
    }

    /// Reads one block from the file at byte `offset` into `buffer`.
    ///
    /// A single read of `blocksize` bytes is submitted through a freshly
    /// created io_uring instance and the call blocks until its completion
    /// arrives. On success the number of bytes read is returned: either a
    /// full block, or exactly the number of bytes between `offset` and the
    /// end of the file as measured when the reader was opened (0 when
    /// `offset` is at or past that end).
    ///
    /// # Errors
    ///
    /// Returns an error if the ring cannot be created, the request cannot be
    /// queued or submitted, no completion is available, the kernel reports a
    /// failure for the read, or the read comes up short for a reason other
    /// than reaching the end of the file.
    ///
    /// # Panics
    ///
    /// Panics if `buffer.len()` differs from the block size.
    pub fn read(&mut self, buffer: &mut [u8], offset: u64) -> Result<usize> {
        // Compare as usize: narrowing `buffer.len()` to u32 could let an
        // over-long buffer slip through the check.
        assert_eq!(
            buffer.len(),
            self.blocksize as usize,
            "Buffer provided had length {}, block size is {}",
            buffer.len(),
            self.blocksize
        );

        // NOTE(review): a new ring (with an SQPOLL kernel thread) is built for
        // every call; keeping one ring alive across reads would be cheaper but
        // requires a field on the struct.
        let mut ring: IoUring<Entry, io_uring::cqueue::Entry> =
            IoUring::builder().setup_sqpoll(500).build(2)?;

        let (submitter, mut submission_queue, mut completion_queue) = ring.split();

        let r_entry = opcode::Read::new(
            types::Fd(self.source.as_raw_fd()),
            buffer.as_mut_ptr(),
            self.blocksize,
        )
        .offset(offset)
        .build()
        .user_data(offset);

        // SAFETY: the entry points at `buffer`, which is exclusively borrowed
        // for this whole call and is exactly `blocksize` bytes long (asserted
        // above), and at the descriptor of `self.source`, which outlives the
        // call. On the success path we block below until the completion for
        // this entry has arrived before returning.
        let pushed = unsafe { submission_queue.push(&r_entry) };
        pushed.map_err(|e| Error::new(ErrorKind::Other, e))?;

        // Publish the queued entry, wait for one completion, then refresh our
        // view of the completion queue.
        submission_queue.sync();
        submitter.submit_and_wait(1)?;
        completion_queue.sync();

        let cqe = completion_queue
            .next()
            .ok_or_else(|| Error::new(ErrorKind::Other, "didn't read CQE, that's not ok"))?;

        // A negative completion result is a negated errno value.
        let result = cqe.result();
        let Ok(bytes_read) = u32::try_from(result) else {
            return Err(Error::from_raw_os_error(result.saturating_neg()));
        };

        // A short read is only acceptable when it is the tail of the file.
        // `saturating_sub` keeps an offset past the end from underflowing.
        let remaining = self.filesize.saturating_sub(offset);
        if bytes_read != self.blocksize && u64::from(bytes_read) != remaining {
            return Err(Error::new(
                ErrorKind::Other,
                format!(
                    "For filesize {} and offset {} i got a result of {}",
                    self.filesize, offset, result
                ),
            ));
        }

        Ok(bytes_read as usize)
    }
}