use bytes::{Bytes, BytesMut};
use libc::{c_int, c_void};
use nix::{Error, Result};
use nix::errno::*;
use nix::sys::aio::*;
use nix::sys::signal::{SaFlags, SigAction, sigaction, SigevNotify, SigHandler, Signal, SigSet};
use nix::sys::time::{TimeSpec, TimeValLike};
use std::io::{Write, Read, Seek, SeekFrom};
use std::ops::Deref;
use std::os::unix::io::AsRawFd;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{thread, time};
use tempfile::tempfile;
fn poll_aio(aiocb: &mut AioCb) -> Result<()> {
loop {
let err = aiocb.error();
if err != Err(Error::from(Errno::EINPROGRESS)) { return err; };
thread::sleep(time::Duration::from_millis(10));
}
}
#[test]
fn test_accessors() {
let mut rbuf = vec![0; 4];
let aiocb = AioCb::from_mut_slice( 1001,
2, &mut rbuf,
42, SigevNotify::SigevSignal {
signal: Signal::SIGUSR2,
si_value: 99
},
LioOpcode::LIO_NOP);
assert_eq!(1001, aiocb.fd());
assert_eq!(Some(LioOpcode::LIO_NOP), aiocb.lio_opcode());
assert_eq!(4, aiocb.nbytes());
assert_eq!(2, aiocb.offset());
assert_eq!(42, aiocb.priority());
let sev = aiocb.sigevent().sigevent();
assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo);
assert_eq!(99, sev.sigev_value.sival_ptr as i64);
}
#[test]
#[cfg_attr(target_env = "musl", ignore)]
fn test_cancel() {
let wbuf: &[u8] = b"CDEF";
let f = tempfile().unwrap();
let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
0, wbuf,
0, SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
aiocb.write().unwrap();
let err = aiocb.error();
assert!(err == Ok(()) || err == Err(Error::from(Errno::EINPROGRESS)));
let cancelstat = aiocb.cancel();
assert!(cancelstat.is_ok());
let _ = poll_aio(&mut aiocb);
let _ = aiocb.aio_return();
}
#[test]
#[cfg_attr(target_env = "musl", ignore)]
fn test_aio_cancel_all() {
let wbuf: &[u8] = b"CDEF";
let f = tempfile().unwrap();
let mut aiocb = AioCb::from_slice(f.as_raw_fd(),
0, wbuf,
0, SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
aiocb.write().unwrap();
let err = aiocb.error();
assert!(err == Ok(()) || err == Err(Error::from(Errno::EINPROGRESS)));
let cancelstat = aio_cancel_all(f.as_raw_fd());
assert!(cancelstat.is_ok());
let _ = poll_aio(&mut aiocb);
let _ = aiocb.aio_return();
}
#[test]
#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
fn test_fsync() {
const INITIAL: &[u8] = b"abcdef123456";
let mut f = tempfile().unwrap();
f.write_all(INITIAL).unwrap();
let mut aiocb = AioCb::from_fd( f.as_raw_fd(),
0, SigevNotify::SigevNone);
let err = aiocb.fsync(AioFsyncMode::O_SYNC);
assert!(err.is_ok());
poll_aio(&mut aiocb).unwrap();
aiocb.aio_return().unwrap();
}
#[test]
#[cfg(any(target_os = "freebsd", target_os = "macos"))]
fn test_fsync_error() {
use std::mem;
const INITIAL: &[u8] = b"abcdef123456";
let mode = unsafe { mem::transmute(666) };
let mut f = tempfile().unwrap();
f.write_all(INITIAL).unwrap();
let mut aiocb = AioCb::from_fd( f.as_raw_fd(),
0, SigevNotify::SigevNone);
let err = aiocb.fsync(mode);
assert!(err.is_err());
}
#[test]
#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
#[cfg_attr(target_os = "linux", ignore)]
fn test_aio_suspend() {
const INITIAL: &[u8] = b"abcdef123456";
const WBUF: &[u8] = b"CDEFG";
let timeout = TimeSpec::seconds(10);
let mut rbuf = vec![0; 4];
let rlen = rbuf.len();
let mut f = tempfile().unwrap();
f.write_all(INITIAL).unwrap();
let mut wcb = AioCb::from_slice( f.as_raw_fd(),
2, WBUF,
0, SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);
let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(),
8, &mut rbuf,
0, SigevNotify::SigevNone,
LioOpcode::LIO_READ);
wcb.write().unwrap();
rcb.read().unwrap();
loop {
{
let cbbuf = [&wcb, &rcb];
assert!(aio_suspend(&cbbuf[..], Some(timeout)).is_ok());
}
if rcb.error() != Err(Error::from(Errno::EINPROGRESS)) &&
wcb.error() != Err(Error::from(Errno::EINPROGRESS)) {
break
}
}
assert_eq!(wcb.aio_return().unwrap() as usize, WBUF.len());
assert_eq!(rcb.aio_return().unwrap() as usize, rlen);
}
#[test]
#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
fn test_read() {
const INITIAL: &[u8] = b"abcdef123456";
let mut rbuf = vec![0; 4];
const EXPECT: &[u8] = b"cdef";
let mut f = tempfile().unwrap();
f.write_all(INITIAL).unwrap();
{
let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(),
2, &mut rbuf,
0, SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
aiocb.read().unwrap();
let err = poll_aio(&mut aiocb);
assert_eq!(err, Ok(()));
assert_eq!(aiocb.aio_return().unwrap() as usize, EXPECT.len());
}
assert_eq!(EXPECT, rbuf.deref().deref());
}
#[test]
#[cfg(any(target_os = "freebsd", target_os = "macos"))]
fn test_read_error() {
const INITIAL: &[u8] = b"abcdef123456";
let mut rbuf = vec![0; 4];
let mut f = tempfile().unwrap();
f.write_all(INITIAL).unwrap();
let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(),
-1, &mut rbuf,
0, SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
assert!(aiocb.read().is_err());
}
#[test]
#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
fn test_read_into_mut_slice() {
const INITIAL: &[u8] = b"abcdef123456";
let mut rbuf = vec![0; 4];
const EXPECT: &[u8] = b"cdef";
let mut f = tempfile().unwrap();
f.write_all(INITIAL).unwrap();
{
let mut aiocb = AioCb::from_mut_slice( f.as_raw_fd(),
2, &mut rbuf,
0, SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
aiocb.read().unwrap();
let err = poll_aio(&mut aiocb);
assert_eq!(err, Ok(()));
assert_eq!(aiocb.aio_return().unwrap() as usize, EXPECT.len());
}
assert_eq!(rbuf, EXPECT);
}
#[test]
#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
fn test_read_into_pointer() {
const INITIAL: &[u8] = b"abcdef123456";
let mut rbuf = vec![0; 4];
const EXPECT: &[u8] = b"cdef";
let mut f = tempfile().unwrap();
f.write_all(INITIAL).unwrap();
{
let mut aiocb = unsafe {
AioCb::from_mut_ptr( f.as_raw_fd(),
2, rbuf.as_mut_ptr() as *mut c_void,
rbuf.len(),
0, SigevNotify::SigevNone,
LioOpcode::LIO_NOP)
};
aiocb.read().unwrap();
let err = poll_aio(&mut aiocb);
assert_eq!(err, Ok(()));
assert_eq!(aiocb.aio_return().unwrap() as usize, EXPECT.len());
}
assert_eq!(rbuf, EXPECT);
}
#[test]
#[should_panic(expected = "Can't read into an immutable buffer")]
#[cfg_attr(target_env = "musl", ignore)]
fn test_read_immutable_buffer() {
let rbuf: &[u8] = b"CDEF";
let f = tempfile().unwrap();
let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
2, rbuf,
0, SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
aiocb.read().unwrap();
}
#[test]
#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
fn test_write() {
const INITIAL: &[u8] = b"abcdef123456";
let wbuf = "CDEF".to_string().into_bytes();
let mut rbuf = Vec::new();
const EXPECT: &[u8] = b"abCDEF123456";
let mut f = tempfile().unwrap();
f.write_all(INITIAL).unwrap();
let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
2, &wbuf,
0, SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
aiocb.write().unwrap();
let err = poll_aio(&mut aiocb);
assert_eq!(err, Ok(()));
assert_eq!(aiocb.aio_return().unwrap() as usize, wbuf.len());
f.seek(SeekFrom::Start(0)).unwrap();
let len = f.read_to_end(&mut rbuf).unwrap();
assert_eq!(len, EXPECT.len());
assert_eq!(rbuf, EXPECT);
}
#[test]
#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
fn test_write_bytes() {
const INITIAL: &[u8] = b"abcdef123456";
let wbuf = Box::new(Bytes::from(&b"CDEF"[..]));
let mut rbuf = Vec::new();
const EXPECT: &[u8] = b"abCDEF123456";
let expected_len = wbuf.len();
let mut f = tempfile().unwrap();
f.write_all(INITIAL).unwrap();
let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(),
2, wbuf,
0, SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
aiocb.write().unwrap();
let err = poll_aio(&mut aiocb);
assert_eq!(err, Ok(()));
assert_eq!(aiocb.aio_return().unwrap() as usize, expected_len);
f.seek(SeekFrom::Start(0)).unwrap();
let len = f.read_to_end(&mut rbuf).unwrap();
assert_eq!(len, EXPECT.len());
assert_eq!(rbuf, EXPECT);
}
#[test]
#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
fn test_read_bytes_mut_small() {
const INITIAL: &[u8] = b"abcdef";
let rbuf = Box::new(BytesMut::from(vec![0; 4]));
const EXPECT: &[u8] = b"cdef";
let mut f = tempfile().unwrap();
f.write_all(INITIAL).unwrap();
let mut aiocb = AioCb::from_boxed_mut_slice( f.as_raw_fd(),
2, rbuf,
0, SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
aiocb.read().unwrap();
let err = poll_aio(&mut aiocb);
assert_eq!(err, Ok(()));
assert_eq!(aiocb.aio_return().unwrap() as usize, EXPECT.len());
let buffer = aiocb.boxed_mut_slice().unwrap();
assert_eq!(buffer.borrow(), EXPECT);
}
#[test]
#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
fn test_write_from_pointer() {
const INITIAL: &[u8] = b"abcdef123456";
let wbuf = "CDEF".to_string().into_bytes();
let mut rbuf = Vec::new();
const EXPECT: &[u8] = b"abCDEF123456";
let mut f = tempfile().unwrap();
f.write_all(INITIAL).unwrap();
let mut aiocb = unsafe {
AioCb::from_ptr( f.as_raw_fd(),
2, wbuf.as_ptr() as *const c_void,
wbuf.len(),
0, SigevNotify::SigevNone,
LioOpcode::LIO_NOP)
};
aiocb.write().unwrap();
let err = poll_aio(&mut aiocb);
assert_eq!(err, Ok(()));
assert_eq!(aiocb.aio_return().unwrap() as usize, wbuf.len());
f.seek(SeekFrom::Start(0)).unwrap();
let len = f.read_to_end(&mut rbuf).unwrap();
assert_eq!(len, EXPECT.len());
assert_eq!(rbuf, EXPECT);
}
#[test]
#[cfg(any(target_os = "freebsd", target_os = "macos"))]
fn test_write_error() {
let wbuf = "CDEF".to_string().into_bytes();
let mut aiocb = AioCb::from_slice( 666, 0, &wbuf,
0, SigevNotify::SigevNone,
LioOpcode::LIO_NOP);
assert!(aiocb.write().is_err());
}
lazy_static! {
pub static ref SIGNALED: AtomicBool = AtomicBool::new(false);
}
extern fn sigfunc(_: c_int) {
SIGNALED.store(true, Ordering::Relaxed);
}
#[test]
#[cfg_attr(any(all(target_env = "musl", target_arch = "x86_64"), target_arch = "mips", target_arch = "mips64"), ignore)]
fn test_write_sigev_signal() {
let _m = ::SIGNAL_MTX.lock().expect("Mutex got poisoned by another test");
let sa = SigAction::new(SigHandler::Handler(sigfunc),
SaFlags::SA_RESETHAND,
SigSet::empty());
SIGNALED.store(false, Ordering::Relaxed);
unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap();
const INITIAL: &[u8] = b"abcdef123456";
const WBUF: &[u8] = b"CDEF";
let mut rbuf = Vec::new();
const EXPECT: &[u8] = b"abCDEF123456";
let mut f = tempfile().unwrap();
f.write_all(INITIAL).unwrap();
let mut aiocb = AioCb::from_slice( f.as_raw_fd(),
2, WBUF,
0, SigevNotify::SigevSignal {
signal: Signal::SIGUSR2,
si_value: 0 },
LioOpcode::LIO_NOP);
aiocb.write().unwrap();
while !SIGNALED.load(Ordering::Relaxed) {
thread::sleep(time::Duration::from_millis(10));
}
assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len());
f.seek(SeekFrom::Start(0)).unwrap();
let len = f.read_to_end(&mut rbuf).unwrap();
assert_eq!(len, EXPECT.len());
assert_eq!(rbuf, EXPECT);
}
#[test]
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
fn test_liocb_listio_wait() {
const INITIAL: &[u8] = b"abcdef123456";
const WBUF: &[u8] = b"CDEF";
let mut rbuf = vec![0; 4];
let rlen = rbuf.len();
let mut rbuf2 = Vec::new();
const EXPECT: &[u8] = b"abCDEF123456";
let mut f = tempfile().unwrap();
f.write_all(INITIAL).unwrap();
{
let wcb = AioCb::from_slice( f.as_raw_fd(),
2, WBUF,
0, SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);
let rcb = AioCb::from_mut_slice( f.as_raw_fd(),
8, &mut rbuf,
0, SigevNotify::SigevNone,
LioOpcode::LIO_READ);
let mut liocb = LioCb::with_capacity(2);
liocb.aiocbs.push(wcb);
liocb.aiocbs.push(rcb);
let err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone);
err.expect("lio_listio");
assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len());
assert_eq!(liocb.aio_return(1).unwrap() as usize, rlen);
}
assert_eq!(rbuf.deref().deref(), b"3456");
f.seek(SeekFrom::Start(0)).unwrap();
let len = f.read_to_end(&mut rbuf2).unwrap();
assert_eq!(len, EXPECT.len());
assert_eq!(rbuf2, EXPECT);
}
#[test]
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
fn test_liocb_listio_nowait() {
const INITIAL: &[u8] = b"abcdef123456";
const WBUF: &[u8] = b"CDEF";
let mut rbuf = vec![0; 4];
let rlen = rbuf.len();
let mut rbuf2 = Vec::new();
const EXPECT: &[u8] = b"abCDEF123456";
let mut f = tempfile().unwrap();
f.write_all(INITIAL).unwrap();
{
let wcb = AioCb::from_slice( f.as_raw_fd(),
2, WBUF,
0, SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);
let rcb = AioCb::from_mut_slice( f.as_raw_fd(),
8, &mut rbuf,
0, SigevNotify::SigevNone,
LioOpcode::LIO_READ);
let mut liocb = LioCb::with_capacity(2);
liocb.aiocbs.push(wcb);
liocb.aiocbs.push(rcb);
let err = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone);
err.expect("lio_listio");
poll_aio(&mut liocb.aiocbs[0]).unwrap();
poll_aio(&mut liocb.aiocbs[1]).unwrap();
assert_eq!(liocb.aiocbs[0].aio_return().unwrap() as usize, WBUF.len());
assert_eq!(liocb.aiocbs[1].aio_return().unwrap() as usize, rlen);
}
assert_eq!(rbuf.deref().deref(), b"3456");
f.seek(SeekFrom::Start(0)).unwrap();
let len = f.read_to_end(&mut rbuf2).unwrap();
assert_eq!(len, EXPECT.len());
assert_eq!(rbuf2, EXPECT);
}
#[test]
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[cfg_attr(any(target_arch = "mips", target_arch = "mips64", target_env = "musl"), ignore)]
fn test_liocb_listio_signal() {
let _m = ::SIGNAL_MTX.lock().expect("Mutex got poisoned by another test");
const INITIAL: &[u8] = b"abcdef123456";
const WBUF: &[u8] = b"CDEF";
let mut rbuf = vec![0; 4];
let rlen = rbuf.len();
let mut rbuf2 = Vec::new();
const EXPECT: &[u8] = b"abCDEF123456";
let mut f = tempfile().unwrap();
let sa = SigAction::new(SigHandler::Handler(sigfunc),
SaFlags::SA_RESETHAND,
SigSet::empty());
let sigev_notify = SigevNotify::SigevSignal { signal: Signal::SIGUSR2,
si_value: 0 };
f.write_all(INITIAL).unwrap();
{
let wcb = AioCb::from_slice( f.as_raw_fd(),
2, WBUF,
0, SigevNotify::SigevNone,
LioOpcode::LIO_WRITE);
let rcb = AioCb::from_mut_slice( f.as_raw_fd(),
8, &mut rbuf,
0, SigevNotify::SigevNone,
LioOpcode::LIO_READ);
let mut liocb = LioCb::with_capacity(2);
liocb.aiocbs.push(wcb);
liocb.aiocbs.push(rcb);
SIGNALED.store(false, Ordering::Relaxed);
unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap();
let err = liocb.listio(LioMode::LIO_NOWAIT, sigev_notify);
err.expect("lio_listio");
while !SIGNALED.load(Ordering::Relaxed) {
thread::sleep(time::Duration::from_millis(10));
}
assert_eq!(liocb.aiocbs[0].aio_return().unwrap() as usize, WBUF.len());
assert_eq!(liocb.aiocbs[1].aio_return().unwrap() as usize, rlen);
}
assert_eq!(rbuf.deref().deref(), b"3456");
f.seek(SeekFrom::Start(0)).unwrap();
let len = f.read_to_end(&mut rbuf2).unwrap();
assert_eq!(len, EXPECT.len());
assert_eq!(rbuf2, EXPECT);
}
#[test]
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[should_panic(expected = "Can't read into an immutable buffer")]
#[cfg_attr(target_env = "musl", ignore)]
fn test_liocb_listio_read_immutable() {
let rbuf: &[u8] = b"abcd";
let f = tempfile().unwrap();
let mut liocb = LioCb::from(vec![
AioCb::from_slice( f.as_raw_fd(),
2, rbuf,
0, SigevNotify::SigevNone,
LioOpcode::LIO_READ)
]);
let _ = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone);
}