a10 0.4.2

This library is meant as a low-level library safely exposing different OS's abilities to perform non-blocking I/O.
Documentation
use std::future::Future;
use std::mem::take;
use std::os::fd::AsRawFd;
use std::pin::Pin;
use std::sync::{Arc, Barrier, Mutex};
use std::task::{self, Poll, Wake};
use std::time::{Duration, Instant};
use std::{io, ptr, thread};

use a10::fs::{Open, OpenOptions};
use a10::pipe::pipe;
use a10::{Ring, SubmissionQueue};

use crate::util::{
    LOREM_IPSUM_50, Waker, block_on, init, is_send, is_sync, next, poll_nop, start_op, syscall,
};

#[test]
fn ring_size() {
    #[cfg(any(target_os = "android", target_os = "linux"))]
    const SIZE: usize = 48;
    #[cfg(any(
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "ios",
        target_os = "macos",
        target_os = "netbsd",
        target_os = "openbsd",
        target_os = "tvos",
        target_os = "visionos",
        target_os = "watchos",
    ))]
    const SIZE: usize = 32;
    assert_eq!(std::mem::size_of::<Ring>(), SIZE);
    assert_eq!(std::mem::size_of::<Option<Ring>>(), SIZE);
}

#[test]
fn ring_is_send_and_sync() {
    is_send::<Ring>();
    is_sync::<Ring>();
}

#[test]
fn sq_is_send_and_sync() {
    is_send::<SubmissionQueue>();
    is_sync::<SubmissionQueue>();
}

#[test]
fn sq_size() {
    assert_eq!(std::mem::size_of::<SubmissionQueue>(), 8);
    assert_eq!(std::mem::size_of::<Option<SubmissionQueue>>(), 8);
}

#[test]
fn dropping_ring_unmaps_queues() {
    init();
    let ring = Ring::new().unwrap();
    drop(ring);
}

#[test]
fn polling_with_timeout() -> io::Result<()> {
    const TIMEOUT: Duration = Duration::from_millis(100);
    const MARGIN: Duration = Duration::from_millis(50);

    init();
    let mut ring = Ring::new().unwrap();

    let start = Instant::now();
    ring.poll(Some(TIMEOUT)).unwrap();
    let elapsed = start.elapsed();
    assert!(
        elapsed <= (TIMEOUT + MARGIN),
        "polling elapsed: {elapsed:?}, expected: {TIMEOUT:?}",
    );
    Ok(())
}

#[test]
fn submission_queue_full_is_handled_internally() {
    const SIZE: usize = 31396;
    const N: usize = (usize::BITS as usize) + 10;
    const BUF_SIZE: usize = SIZE / N;

    init();
    let mut ring = Ring::new().unwrap();
    let sq = ring.sq();
    let path = LOREM_IPSUM_50.path;
    let expected = LOREM_IPSUM_50.content;

    let mut future: Open = OpenOptions::new().open(sq, path.into());
    let file = loop {
        match poll_nop(Pin::new(&mut future)) {
            Poll::Ready(result) => break result.unwrap(),
            Poll::Pending => ring.poll(None).unwrap(),
        }
    };

    let indices = Arc::new(Mutex::new(Vec::new()));

    struct Waker {
        index: usize,
        /// Indices of task that are awoken.
        indices: Arc<Mutex<Vec<usize>>>,
    }

    impl Wake for Waker {
        fn wake(self: Arc<Self>) {
            self.indices.lock().unwrap().push(self.index);
        }
    }

    let mut futures = (0..N)
        .map(|i| {
            let fut = file
                .read(Vec::with_capacity(BUF_SIZE))
                .from((i * BUF_SIZE) as u64);
            let waker = Arc::new(Waker {
                index: i,
                indices: indices.clone(),
            });
            Some((fut, task::Waker::from(waker)))
        })
        .collect::<Vec<_>>();

    // Run all futures once to register the operation or them waiting for a
    // submission slot.
    for (i, fut) in futures.iter_mut().enumerate() {
        if let Some((future, waker)) = fut {
            let mut ctx = task::Context::from_waker(waker);
            match Pin::new(future).poll(&mut ctx) {
                Poll::Ready(result) => {
                    *fut = None;
                    let read_buf = result.unwrap();
                    assert_eq!(read_buf, &expected[i * BUF_SIZE..(i + 1) * BUF_SIZE]);
                }
                Poll::Pending => {}
            }
        }
    }

    loop {
        // Poll all futures that got a wake up.
        for i in take(&mut *indices.lock().unwrap()).into_iter() {
            if let Some((future, waker)) = &mut futures[i] {
                let mut ctx = task::Context::from_waker(waker);
                match Pin::new(future).poll(&mut ctx) {
                    Poll::Ready(result) => {
                        futures[i] = None;
                        let read_buf = result.unwrap();
                        assert_eq!(read_buf, &expected[i * BUF_SIZE..(i + 1) * BUF_SIZE]);
                    }
                    Poll::Pending => {}
                }
            }
        }

        if futures.iter().all(Option::is_some) {
            break;
        }

        ring.poll(None).unwrap();
    }

    // NOTE: this is here to deallocate the resources in the Future that was
    // stalled.
    ring.poll(Some(Duration::ZERO)).unwrap();
}

#[test]
fn pollable() {
    const DATA: &str = "Hello, World!";

    init();
    let mut main_ring = Ring::new().unwrap();
    let mut other_ring = Ring::new().unwrap();

    let pipe = pipe(other_ring.sq());
    let [receiver, sender] = block_on(&mut other_ring, pipe).unwrap();

    let barrier = Arc::new(Barrier::new(2));
    let other_sq = other_ring.sq();
    let b = barrier.clone();
    let handle = thread::spawn(move || {
        let waker = Waker::new();
        let mut read = receiver.read(Vec::with_capacity(DATA.len() + 1));
        start_op(&mut read);
        b.wait();

        let buf = waker.block_on_with(read, &other_sq).unwrap();
        assert_eq!(buf, DATA.as_bytes());
    });

    // Ensure that the read operation is submitted to the kernel for io_uring.
    barrier.wait();
    other_ring.poll(Some(Duration::ZERO)).unwrap();

    let mut ring_pollable = other_ring.pollable(main_ring.sq());

    // Trigger an event to complete the read on the other thread.
    let n = syscall!(write(
        sender.as_fd().unwrap().as_raw_fd(),
        ptr::from_ref(DATA).cast(),
        DATA.len()
    ))
    .unwrap();
    assert_eq!(n.cast_unsigned(), DATA.len());

    // Using the main ring wait until the other ring is pollable.
    let () = block_on(&mut main_ring, next(&mut ring_pollable))
        .unwrap()
        .unwrap();
    // Now poll the other ring.
    other_ring.poll(None).unwrap();

    handle.join().unwrap();
}

#[test]
#[should_panic = "can't wait on pollable with sq of the same ring"]
fn pollable_self() {
    let ring = Ring::new().unwrap();
    let _ = ring.pollable(ring.sq());
}

#[test]
#[cfg(any(target_os = "android", target_os = "linux"))]
fn wake_ring_with_kernel_thread() {
    init();
    let mut ring = Ring::config()
        .with_kernel_thread()
        .with_idle_timeout(Duration::from_millis(1))
        .build()
        .unwrap();
    let sq = ring.sq();

    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(10));
        sq.wake();
    });

    // Should be awoken by the wake call above.
    ring.poll(None).unwrap();
    handle.join().unwrap();
}

#[test]
#[cfg(any(target_os = "android", target_os = "linux"))]
fn wake_ring_no_kernel_thread() {
    init();
    // Defaults to no kernel thread.
    let mut ring = Ring::config()
        .with_idle_timeout(Duration::from_millis(1))
        .build()
        .unwrap();
    let sq = ring.sq();

    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(10));
        sq.wake();
    });

    // Should be awoken by the wake call above.
    ring.poll(None).unwrap();
    handle.join().unwrap();
}

#[test]
#[cfg(any(target_os = "android", target_os = "linux"))]
fn wake_ring_with_single_issuer() {
    init();

    let mut ring = Ring::config()
        .single_issuer()
        .defer_task_run()
        .build()
        .unwrap();
    let sq = ring.sq();

    let handle = thread::spawn(move || {
        sq.wake();
    });

    // Should be awoken by the wake call above.
    ring.poll(None).unwrap();
    handle.join().unwrap();
}

#[test]
fn wake_ring_after_ring_dropped() {
    init();
    let ring = Ring::new().unwrap();
    let sq = ring.sq();

    drop(ring);
    sq.wake();
}