use std::future::Future;
use std::mem::take;
use std::os::fd::AsRawFd;
use std::pin::Pin;
use std::sync::{Arc, Barrier, Mutex};
use std::task::{self, Poll, Wake};
use std::time::{Duration, Instant};
use std::{io, ptr, thread};
use a10::fs::{Open, OpenOptions};
use a10::pipe::pipe;
use a10::{Ring, SubmissionQueue};
use crate::util::{
LOREM_IPSUM_50, Waker, block_on, init, is_send, is_sync, next, poll_nop, start_op, syscall,
};
/// Pins the in-memory size of `Ring`, and checks that wrapping it in an
/// `Option` does not make it any larger.
#[test]
fn ring_size() {
    // Expected size in bytes, selected per target OS family.
    #[cfg(any(target_os = "android", target_os = "linux"))]
    const SIZE: usize = 48;
    #[cfg(any(
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "ios",
        target_os = "macos",
        target_os = "netbsd",
        target_os = "openbsd",
        target_os = "tvos",
        target_os = "visionos",
        target_os = "watchos",
    ))]
    const SIZE: usize = 32;

    let plain = std::mem::size_of::<Ring>();
    assert_eq!(plain, SIZE);
    let optional = std::mem::size_of::<Option<Ring>>();
    assert_eq!(optional, SIZE);
}
/// `Ring` must pass both the `is_send` and `is_sync` checks from the test
/// utilities.
#[test]
fn ring_is_send_and_sync() {
    is_sync::<Ring>();
    is_send::<Ring>();
}
/// `SubmissionQueue` must pass both the `is_send` and `is_sync` checks from
/// the test utilities.
#[test]
fn sq_is_send_and_sync() {
    is_sync::<SubmissionQueue>();
    is_send::<SubmissionQueue>();
}
/// Pins the in-memory size of `SubmissionQueue`, and checks that wrapping it
/// in an `Option` does not make it any larger.
#[test]
fn sq_size() {
    // Expected size in bytes for both the bare and the `Option`-wrapped type.
    const EXPECTED: usize = 8;

    let plain = std::mem::size_of::<SubmissionQueue>();
    let optional = std::mem::size_of::<Option<SubmissionQueue>>();
    assert_eq!(plain, EXPECTED);
    assert_eq!(optional, EXPECTED);
}
/// Creates a ring and drops it straight away; the test passes when neither
/// construction nor the drop panics.
#[test]
fn dropping_ring_unmaps_queues() {
    init();
    // The ring is handed to `drop` directly, so it is destroyed right here.
    drop(Ring::new().unwrap());
}
/// Polls an otherwise idle ring with a timeout and checks that the call
/// returns after roughly `TIMEOUT`: not before it, and no more than `MARGIN`
/// after it.
#[test]
fn polling_with_timeout() -> io::Result<()> {
    const TIMEOUT: Duration = Duration::from_millis(100);
    const MARGIN: Duration = Duration::from_millis(50);

    init();
    let mut ring = Ring::new().unwrap();

    let start = Instant::now();
    ring.poll(Some(TIMEOUT)).unwrap();
    let elapsed = start.elapsed();

    // Both bounds matter: without the lower bound a `poll` that ignores the
    // timeout and returns immediately would go unnoticed.
    assert!(
        (TIMEOUT..=TIMEOUT + MARGIN).contains(&elapsed),
        "polling elapsed: {elapsed:?}, expected: {TIMEOUT:?}",
    );
    Ok(())
}
/// Starts `N` reads on a single file at once and drives all of them to
/// completion, checking that every read returns its slice of the expected
/// content.
///
/// `N` is `usize::BITS + 10`; presumably that is more operations than the
/// submission queue can hold at one time (inferred from the test name,
/// confirm against the ring implementation).
#[test]
fn submission_queue_full_is_handled_internally() {
    const SIZE: usize = 31396;
    const N: usize = (usize::BITS as usize) + 10;
    const BUF_SIZE: usize = SIZE / N;

    init();
    let mut ring = Ring::new().unwrap();
    let sq = ring.sq();
    let path = LOREM_IPSUM_50.path;
    let expected = LOREM_IPSUM_50.content;

    // Open the file, polling the ring until the open operation completes.
    let mut future: Open = OpenOptions::new().open(sq, path.into());
    let file = loop {
        match poll_nop(Pin::new(&mut future)) {
            Poll::Ready(result) => break result.unwrap(),
            Poll::Pending => ring.poll(None).unwrap(),
        }
    };

    // Indices of the futures that have been woken and need polling again.
    let indices = Arc::new(Mutex::new(Vec::new()));

    /// Waker that records the index of its future in the shared list.
    struct Waker {
        index: usize,
        indices: Arc<Mutex<Vec<usize>>>,
    }

    impl Wake for Waker {
        fn wake(self: Arc<Self>) {
            self.indices.lock().unwrap().push(self.index);
        }
    }

    // One read per `BUF_SIZE` chunk of the file, each with its own waker. A
    // slot is set to `None` once its read has completed and been verified.
    let mut futures = (0..N)
        .map(|i| {
            let fut = file
                .read(Vec::with_capacity(BUF_SIZE))
                .from((i * BUF_SIZE) as u64);
            let waker = Arc::new(Waker {
                index: i,
                indices: indices.clone(),
            });
            Some((fut, task::Waker::from(waker)))
        })
        .collect::<Vec<_>>();

    // Poll every future once to start the operations.
    for (i, fut) in futures.iter_mut().enumerate() {
        if let Some((future, waker)) = fut {
            let mut ctx = task::Context::from_waker(waker);
            match Pin::new(future).poll(&mut ctx) {
                Poll::Ready(result) => {
                    *fut = None;
                    let read_buf = result.unwrap();
                    assert_eq!(read_buf, &expected[i * BUF_SIZE..(i + 1) * BUF_SIZE]);
                }
                Poll::Pending => {}
            }
        }
    }

    loop {
        // Take the woken indices in a separate statement so the mutex guard
        // is released before any future is polled; a waker firing during a
        // poll would otherwise deadlock on `indices`.
        let woken = take(&mut *indices.lock().unwrap());
        for i in woken {
            if let Some((future, waker)) = &mut futures[i] {
                let mut ctx = task::Context::from_waker(waker);
                match Pin::new(future).poll(&mut ctx) {
                    Poll::Ready(result) => {
                        futures[i] = None;
                        let read_buf = result.unwrap();
                        assert_eq!(read_buf, &expected[i * BUF_SIZE..(i + 1) * BUF_SIZE]);
                    }
                    Poll::Pending => {}
                }
            }
        }

        // Done only once every slot has been cleared, i.e. every read has
        // completed and been checked.
        if futures.iter().all(Option::is_none) {
            break;
        }

        ring.poll(None).unwrap();
    }

    // One last non-blocking poll after all reads have completed.
    ring.poll(Some(Duration::ZERO)).unwrap();
}
/// Waits on one ring (`other_ring`) through the submission queue of another
/// (`main_ring`): a write to a pipe owned by `other_ring` must make the
/// pollable yield an item on `main_ring`.
#[test]
fn pollable() {
    const DATA: &str = "Hello, World!";

    init();
    let mut main_ring = Ring::new().unwrap();
    let mut other_ring = Ring::new().unwrap();

    // The pipe is created on, and belongs to, the other ring.
    let pipe_future = pipe(other_ring.sq());
    let [receiver, sender] = block_on(&mut other_ring, pipe_future).unwrap();

    let barrier = Arc::new(Barrier::new(2));
    let reader_thread = {
        let other_sq = other_ring.sq();
        let thread_barrier = Arc::clone(&barrier);
        thread::spawn(move || {
            let waker = Waker::new();
            let mut read = receiver.read(Vec::with_capacity(DATA.len() + 1));
            // Start the read before releasing the main thread.
            start_op(&mut read);
            thread_barrier.wait();
            let buf = waker.block_on_with(read, &other_sq).unwrap();
            assert_eq!(buf, DATA.as_bytes());
        })
    };

    // Continue only after the reader thread has started its read.
    barrier.wait();
    other_ring.poll(Some(Duration::ZERO)).unwrap();

    let mut ring_pollable = other_ring.pollable(main_ring.sq());

    // Write to the pipe with a plain syscall, outside of either ring.
    let raw_fd = sender.as_fd().unwrap().as_raw_fd();
    let n = syscall!(write(raw_fd, ptr::from_ref(DATA).cast(), DATA.len())).unwrap();
    assert_eq!(n.cast_unsigned(), DATA.len());

    // The main ring is the one driven here while waiting for the pollable.
    let () = block_on(&mut main_ring, next(&mut ring_pollable))
        .unwrap()
        .unwrap();

    // Poll the other ring itself, then wait for the reader to finish.
    other_ring.poll(None).unwrap();
    reader_thread.join().unwrap();
}
/// Asking a ring for a pollable that waits on its own submission queue is
/// expected to panic with the message below.
#[test]
#[should_panic = "can't wait on pollable with sq of the same ring"]
fn pollable_self() {
    let ring = Ring::new().unwrap();
    let own_sq = ring.sq();
    let _ = ring.pollable(own_sq);
}
/// A ring configured with a kernel thread (and a short idle timeout) must
/// return from a blocking `poll` once another thread calls `wake` on its
/// submission queue.
#[test]
#[cfg(any(target_os = "android", target_os = "linux"))]
fn wake_ring_with_kernel_thread() {
    const IDLE_TIMEOUT: Duration = Duration::from_millis(1);
    // Delay before the wake-up; ten times the idle timeout above.
    const WAKE_DELAY: Duration = Duration::from_millis(10);

    init();
    let mut ring = Ring::config()
        .with_kernel_thread()
        .with_idle_timeout(IDLE_TIMEOUT)
        .build()
        .unwrap();

    let queue = ring.sq();
    let waker_thread = thread::spawn(move || {
        thread::sleep(WAKE_DELAY);
        queue.wake();
    });

    // Blocks without a timeout; only the wake-up above is expected to end it.
    ring.poll(None).unwrap();
    waker_thread.join().unwrap();
}
/// A ring built without a kernel thread must return from a blocking `poll`
/// once another thread calls `wake` on its submission queue.
#[test]
#[cfg(any(target_os = "android", target_os = "linux"))]
fn wake_ring_no_kernel_thread() {
    const IDLE_TIMEOUT: Duration = Duration::from_millis(1);
    // Delay before the wake-up is sent from the helper thread.
    const WAKE_DELAY: Duration = Duration::from_millis(10);

    init();
    let mut ring = Ring::config()
        .with_idle_timeout(IDLE_TIMEOUT)
        .build()
        .unwrap();

    let queue = ring.sq();
    let waker_thread = thread::spawn(move || {
        thread::sleep(WAKE_DELAY);
        queue.wake();
    });

    // Blocks without a timeout; only the wake-up above is expected to end it.
    ring.poll(None).unwrap();
    waker_thread.join().unwrap();
}
/// A ring configured with `single_issuer` and `defer_task_run` must return
/// from a blocking `poll` when `wake` is called from a different thread.
#[test]
#[cfg(any(target_os = "android", target_os = "linux"))]
fn wake_ring_with_single_issuer() {
    init();
    let mut ring = Ring::config()
        .single_issuer()
        .defer_task_run()
        .build()
        .unwrap();

    // The wake-up is sent immediately, with no delay, from a second thread.
    let queue = ring.sq();
    let waker_thread = thread::spawn(move || queue.wake());

    ring.poll(None).unwrap();
    waker_thread.join().unwrap();
}
/// Calling `wake` on a submission queue whose ring has already been dropped
/// must not panic.
#[test]
fn wake_ring_after_ring_dropped() {
    init();
    // Only the submission queue leaves this scope; the ring is dropped at
    // its end.
    let queue = {
        let ring = Ring::new().unwrap();
        ring.sq()
    };
    queue.wake();
}