tokio-uring-xitca 0.2.0

a fork of tokio-uring with miniaml maintenance
Documentation
use tokio_test::assert_err;
use tokio_uring_xitca::buf::fixed::{FixedBufPool, FixedBufRegistry};
use tokio_uring_xitca::buf::{BoundedBuf, BoundedBufMut};
use tokio_uring_xitca::fs::File;

use std::fs::File as StdFile;
use std::io::prelude::*;
use std::iter;
use std::mem;
use tempfile::NamedTempFile;

const HELLO: &[u8] = b"hello world...";

#[test]
fn fixed_buf_turnaround() {
    tokio_uring_xitca::start(async {
        let mut tempfile = tempfile();
        tempfile.write_all(HELLO).unwrap();

        let file = File::open(tempfile.path()).await.unwrap();

        let buffers = FixedBufRegistry::new([30, 20, 10].iter().map(|&n| Vec::with_capacity(n)));
        buffers.register().unwrap();

        let fixed_buf = buffers.check_out(0).unwrap();
        assert_eq!(fixed_buf.bytes_total(), 30);

        // Can't check out the same buffer twice.
        assert!(buffers.check_out(0).is_none());

        // Checking out another buffer from the same registry is possible,
        // but does not affect the status of the first buffer.
        let fixed_buf1 = buffers.check_out(1).unwrap();
        assert_eq!(fixed_buf1.bytes_total(), 20);
        assert!(buffers.check_out(0).is_none());
        mem::drop(fixed_buf1);
        assert!(buffers.check_out(0).is_none());

        let op = file.read_fixed_at(fixed_buf, 0);

        // The buffer is used by the pending operation, can't check it out
        // for another instance.
        assert!(buffers.check_out(0).is_none());

        let (res, buf) = op.await;
        let n = res.unwrap();
        assert_eq!(n, HELLO.len());

        // The buffer is owned by `buf`, can't check it out
        // for another instance.
        assert!(buffers.check_out(0).is_none());

        mem::drop(buf);

        // The buffer has been released, check it out again.
        let fixed_buf = buffers.check_out(0).unwrap();
        assert_eq!(fixed_buf.bytes_total(), 30);
        assert_eq!(fixed_buf.bytes_init(), 0);
    });
}

#[test]
fn unregister_invalidates_checked_out_buffers() {
    tokio_uring_xitca::start(async {
        let mut tempfile = tempfile();
        tempfile.write_all(HELLO).unwrap();

        let file = File::open(tempfile.path()).await.unwrap();

        let buffers = FixedBufRegistry::new([Vec::with_capacity(1024)]);
        buffers.register().unwrap();

        let fixed_buf = buffers.check_out(0).unwrap();

        // The checked out handle keeps the buffer allocation alive.
        // Meanwhile, we replace buffer registration in the kernel:
        buffers.unregister().unwrap();
        let buffers = FixedBufRegistry::new([Vec::with_capacity(1024)]);
        buffers.register().unwrap();

        // The old buffer's index no longer matches the memory area of the
        // currently registered buffer, so the read operation using the old
        // buffer's memory should fail.
        let (res, _) = file.read_fixed_at(fixed_buf, 0).await;
        assert_err!(res);

        let fixed_buf = buffers.check_out(0).unwrap();
        let (res, buf) = file.read_fixed_at(fixed_buf, 0).await;
        let n = res.unwrap();
        assert_eq!(n, HELLO.len());
        assert_eq!(&buf[..], HELLO);
    });
}

#[test]
fn slicing() {
    tokio_uring_xitca::start(async {
        let mut tempfile = tempfile();
        tempfile.write_all(HELLO).unwrap();

        let file = File::from_std(StdFile::options().read(true).write(true).open(tempfile.path()).unwrap());

        let buffers = FixedBufRegistry::new([Vec::with_capacity(1024)]);
        buffers.register().unwrap();

        let fixed_buf = buffers.check_out(0).unwrap();

        // Read no more than 8 bytes into the fixed buffer.
        let (res, slice) = file.read_fixed_at(fixed_buf.slice(..8), 3).await;
        let n = res.unwrap();
        assert_eq!(n, 8);
        assert_eq!(slice[..], HELLO[3..11]);
        let fixed_buf = slice.into_inner();

        // Write from the fixed buffer, starting at offset 1,
        // up to the end of the initialized bytes in the buffer.
        let (res, slice) = file.write_fixed_at(fixed_buf.slice(1..), HELLO.len() as u64).await;
        let n = res.unwrap();
        assert_eq!(n, 7);
        assert_eq!(slice[..], HELLO[4..11]);
        let fixed_buf = slice.into_inner();

        // Read into the fixed buffer, overwriting bytes starting from offset 3
        // and then extending the initialized part with as many bytes as
        // the operation can read.
        let (res, slice) = file.read_fixed_at(fixed_buf.slice(3..), 0).await;
        let n = res.unwrap();
        assert_eq!(n, HELLO.len() + 7);
        assert_eq!(slice[..HELLO.len()], HELLO[..]);
        assert_eq!(slice[HELLO.len()..], HELLO[4..11]);
    })
}

#[test]
fn pool_next_as_concurrency_limit() {
    tokio_uring_xitca::start(async move {
        const BUF_SIZE: usize = 80;

        let mut tempfile = tempfile();
        let file = StdFile::options().write(true).open(tempfile.path()).unwrap();

        let buffers = FixedBufPool::new(iter::repeat_with(|| Vec::with_capacity(BUF_SIZE)).take(2));
        buffers.register().unwrap();

        let mut join_handles = vec![];
        for i in 0..10 {
            let mut buf = buffers.next(BUF_SIZE).await;
            println!("[main] iteration {}: obtained buffer {}", i, buf.buf_index());
            let cloned_file = file.try_clone().unwrap();

            let handle = tokio_uring_xitca::spawn(async move {
                let file = File::from_std(cloned_file);
                let data = [b'0' + i as u8; BUF_SIZE];
                buf.put_slice(&data);
                let (res, buf) = file.write_fixed_all_at(buf, BUF_SIZE as u64 * i).await;
                res.unwrap();
                println!("[worker {}]: dropping buffer {}", i, buf.buf_index());
            });

            join_handles.push(handle);
        }
        for (i, handle) in join_handles.into_iter().enumerate() {
            handle
                .await
                .unwrap_or_else(|e| panic!("worker {} terminated abnormally: {}", i, e));
        }

        mem::drop(file);
        let mut content = String::new();
        tempfile.read_to_string(&mut content).unwrap();
        println!("{}", content);
    })
}

fn tempfile() -> NamedTempFile {
    NamedTempFile::new().unwrap()
}