use std::time::{Duration, Instant};
use tokio::task;
use super::*;
#[test]
fn simple_test()
{
let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();
let buf0_res = bufs.allocate();
assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());
let buf0 = buf0_res.unwrap();
let buf0_w = buf0.write();
assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
drop(buf0_w);
let buf0_r = buf0.read();
assert_eq!(buf0_r.is_ok(), true, "{:?}", buf0_r.err().unwrap());
assert_eq!(buf0.write(), Err(RwBufferError::WriteTryAgianLater));
let buf0_1 = buf0.clone();
assert_eq!(buf0_1.write(), Err(RwBufferError::WriteTryAgianLater));
let flags0 = buf0.get_flags();
let flags0_1 = buf0_1.get_flags();
assert_eq!(flags0, flags0_1);
assert_eq!(flags0.base, 3);
assert_eq!(flags0.read, 1);
assert_eq!(flags0.write, false);
}
#[test]
fn simple_test_dopped_in_place()
{
let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();
let buf0_res = bufs.allocate();
assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());
let buf0 = buf0_res.unwrap();
println!("{:?}", buf0.get_flags());
let buf0_w = buf0.write();
assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
drop(buf0);
let buf0_flags = bufs.get_flags_by_index(0);
assert_eq!(buf0_flags.is_some(), true, "no flags");
let buf0_flags = buf0_flags.unwrap();
println!("{:?}", buf0_flags);
assert_eq!(buf0_flags.base, 1);
assert_eq!(buf0_flags.read, 0);
assert_eq!(buf0_flags.write, true);
drop(buf0_w.unwrap());
let buf0_flags = bufs.get_flags_by_index(0);
assert_eq!(buf0_flags.is_some(), true, "no flags");
let buf0_flags = buf0_flags.unwrap();
println!("{:?}", buf0_flags);
assert_eq!(buf0_flags.base, 1);
assert_eq!(buf0_flags.read, 0);
assert_eq!(buf0_flags.write, false);
}
#[test]
fn simple_test_dropped_in_place_downgrade()
{
let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();
let buf0_res = bufs.allocate();
assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());
let buf0 = buf0_res.unwrap();
println!("{:?}", buf0.get_flags());
let buf0_w = buf0.write();
assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
drop(buf0);
let buf0_rd = buf0_w.unwrap().downgrade();
assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());
let buf0_flags = bufs.get_flags_by_index(0);
assert_eq!(buf0_flags.is_some(), true, "no flags");
let buf0_flags = buf0_flags.unwrap();
println!("{:?}", buf0_flags);
assert_eq!(buf0_flags.base, 1);
assert_eq!(buf0_flags.read, 1);
assert_eq!(buf0_flags.write, false);
}
#[test]
fn simple_test_drop_in_place_downgrade()
{
let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();
let buf0_w =
{
let buf0 = bufs.allocate_in_place();
println!("1: {:?}", buf0.get_flags());
let buf0_w = buf0.write();
assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
drop(buf0);
buf0_w
};
let buf0_rd = buf0_w.unwrap().downgrade();
assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());
let buf0_flags = bufs.get_flags_by_index(0);
assert_eq!(buf0_flags.is_some(), false, "flags");
let buf0_rd = buf0_rd.unwrap();
let buf0_flags = buf0_rd.get_flags();
println!("2: {:?}", buf0_flags);
assert_eq!(buf0_flags.base, 0);
assert_eq!(buf0_flags.read, 1);
assert_eq!(buf0_flags.write, false);
}
#[test]
fn timing_test()
{
let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();
for _ in 0..10
{
let inst = Instant::now();
let buf0_res = bufs.allocate_in_place();
let end = inst.elapsed();
println!("alloc: {:?}", end);
drop(buf0_res);
}
let buf0_res = bufs.allocate();
assert_eq!(buf0_res.is_ok(), true, "{:?}", buf0_res.err().unwrap());
let buf0 = buf0_res.unwrap();
for _ in 0..10
{
let inst = Instant::now();
let buf0_w = buf0.write();
let end = inst.elapsed();
println!("write: {:?}", end);
assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
drop(buf0_w);
}
for _ in 0..10
{
let inst = Instant::now();
let buf0_r = buf0.read();
let end = inst.elapsed();
println!("read: {:?}", end);
assert_eq!(buf0_r.is_ok(), true, "{:?}", buf0_r.err().unwrap());
assert_eq!(buf0.write(), Err(RwBufferError::WriteTryAgianLater));
drop(buf0_r);
}
}
#[test]
fn simple_test_mth()
{
let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();
let buf0 = bufs.allocate().unwrap();
let buf0_rd = buf0.write().unwrap().downgrade().unwrap();
let join1=
std::thread::spawn(move ||
{
println!("{:?}", buf0_rd);
std::thread::sleep(Duration::from_secs(2));
return;
}
);
let buf1_rd = buf0.read().unwrap();
let join2=
std::thread::spawn(move ||
{
println!("{:?}", buf1_rd);
std::thread::sleep(Duration::from_secs(2));
return;
}
);
let flags = buf0.get_flags();
assert_eq!(flags.base, 2);
assert_eq!(flags.read, 2);
assert_eq!(flags.write, false);
let _ = join1.join();
let _ = join2.join();
let flags = buf0.get_flags();
assert_eq!(flags.base, 2);
assert_eq!(flags.read, 0);
assert_eq!(flags.write, false);
}
#[test]
fn simple_test_concurent()
{
let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();
let buf0 = bufs.allocate().unwrap();
let buf0_w = buf0.write().unwrap();
let join1=
std::thread::spawn(move ||
{
std::thread::park();
println!("{:?}", buf0_w);
std::thread::sleep(Duration::from_secs(3));
return;
}
);
join1.thread().unpark();
let s = Instant::now();
let buf1_rd =
loop
{
match buf0.read()
{
Ok(r) => break r,
Err(e) =>
{
assert_eq!(e, RwBufferError::ReadTryAgianLater);
continue;
}
}
};
let e = s.elapsed();
println!("read await {:?} {}", e, e.as_millis());
assert_eq!(e.as_millis(), 3000);
let _ = join1.join();
let flags = buf0.get_flags();
assert_eq!(flags.base, 2);
assert_eq!(flags.read, 1);
assert_eq!(flags.write, false);
drop(buf1_rd);
let flags = buf0.get_flags();
assert_eq!(flags.base, 2);
assert_eq!(flags.read, 0);
assert_eq!(flags.write, false);
}
#[test]
fn test_try_into_read()
{
let mut bufs = RwBuffers::new(4096, 1, 2).unwrap();
let buf0 = bufs.allocate_in_place();
println!("{:?}", buf0.get_flags());
let buf0_w = buf0.write();
assert_eq!(buf0_w.is_ok(), true, "{:?}", buf0_w.err().unwrap());
assert_eq!(buf0.read(), Err(RwBufferError::ReadTryAgianLater));
drop(buf0);
let buf0_rd = buf0_w.unwrap().downgrade();
assert_eq!(buf0_rd.is_ok(), true, "{:?}", buf0_rd.err().unwrap());
let buf0_flags = bufs.get_flags_by_index(0);
assert_eq!(buf0_flags.is_some(), false, "flags");
let buf0_rd = buf0_rd.unwrap();
let buf0_flags = buf0_rd.get_flags();
println!("{:?}", buf0_flags);
assert_eq!(buf0_flags.base, 0);
assert_eq!(buf0_flags.read, 1);
assert_eq!(buf0_flags.write, false);
let inst = Instant::now();
let ve = buf0_rd.try_inner();
let end = inst.elapsed();
println!("try inner: {:?}", end);
assert_eq!(ve.is_ok(), true);
}
#[tokio::test]
async fn test_multithreading()
{
let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();
let buf0 = bufs.allocate().unwrap();
let mut buf0_write = buf0.write().unwrap();
buf0_write.as_mut_slice()[0] = 5;
buf0_write.as_mut_slice()[1] = 4;
println!("{}", buf0_write[0]);
let buf0_r = buf0_write.downgrade().unwrap();
let join1=
tokio::task::spawn(async move
{
println!("thread[1]:{}", buf0_r[0]);
tokio::time::sleep(Duration::from_millis(200)).await;
return;
}
);
let buf0_r = buf0.read().unwrap();
drop(buf0);
let join2=
tokio::task::spawn(async move
{
println!("thread[2]: {}", buf0_r[0]);
println!("thread[2]: {}", buf0_r[1]);
tokio::time::sleep(Duration::from_millis(200)).await;
return;
}
);
let _ = join1.await;
let _ = join2.await;
return;
}
#[tokio::test]
async fn test_multithreading_async()
{
let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();
let buf0 = bufs.allocate().unwrap();
let buf0_w = buf0.write_async().await.unwrap();
let task_hndl =
task::spawn(async move
{
tokio::task::yield_now().await;
println!("{:?}", buf0_w);
tokio::time::sleep(Duration::from_secs(3)).await;
async_drop(buf0_w).await;
return;
}
);
let s = tokio::time::Instant::now();
let buf1_rd = buf0.read_async().await.unwrap();
let e = s.elapsed();
println!("read await {:?} {}", e, e.as_millis());
assert_eq!(e.as_millis(), 3000);
let _ = task_hndl.await;
let flags = buf0.get_flags();
assert_eq!(flags.base, 2);
assert_eq!(flags.read, 1);
assert_eq!(flags.write, false);
async_drop(buf1_rd).await;
let flags = buf0.get_flags();
assert_eq!(flags.base, 2);
assert_eq!(flags.read, 0);
assert_eq!(flags.write, false);
}
#[tokio::test]
async fn test_async_clone()
{
let mut bufs = RwBuffers::new(4096, 1, 3).unwrap();
let buf0 = bufs.allocate().unwrap();
let buf0_r = buf0.read_async().await.unwrap();
let c_buf0_r = buf0_r.async_clone().await;
println!("{:?} {:?}", buf0_r, c_buf0_r);
assert_eq!(buf0_r, c_buf0_r);
}