use anyhow::Result;
use aqueue::{AQueue, Actor};
use futures_util::try_join;
use std::cell::Cell;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tokio::time::{sleep, Duration};
static VALUE: AtomicU64 = AtomicU64::new(0);
#[tokio::test]
async fn test_base() -> Result<()> {
let queue = Arc::new(AQueue::new());
let a_queue = queue.clone();
tokio::spawn(async move {
let x = a_queue
.run(
|_| async move {
println!("a");
1
},
&(),
)
.await;
println!("{:?}", x);
})
.await?;
let a_queue = queue.clone();
tokio::spawn(async move {
for i in 0..100 {
a_queue
.run(
|_| async move {
println!("b:{}", i);
},
&(),
)
.await;
}
})
.await?;
sleep(Duration::from_secs(2)).await;
let start = Instant::now();
let mut v = 0u64;
for i in 0..10000000 {
v = queue
.run(
|x| async move {
VALUE.fetch_add(*x, Ordering::Relaxed);
VALUE.load(Ordering::Relaxed)
},
&i,
)
.await;
}
println!("{} {}", start.elapsed().as_secs_f32(), v);
assert_eq!(v, 49999995000000);
Ok(())
}
#[tokio::test]
async fn test_string() -> Result<()> {
let queue = Arc::new(AQueue::new());
let str = 12345.to_string();
let len = queue.run(|x| async move { x.len() }, &str).await;
assert_eq!(len, 5);
struct Foo {
i: i32,
}
let foo = Foo { i: 5 };
let len = queue.run(|x| async move { x.i }, &foo).await;
assert_eq!(len, 5);
Ok(())
}
#[tokio::test]
async fn test_struct() -> Result<()> {
#[async_trait::async_trait]
trait IFoo {
async fn run(&self, x: i32, y: i32) -> i32;
fn get_count(&self) -> i32;
}
pub struct Foo {
count: Cell<i32>,
}
unsafe impl Sync for Foo {}
#[async_trait::async_trait]
impl IFoo for Foo {
async fn run(&self, x: i32, y: i32) -> i32 {
self.count.set(self.count.get() + 1);
x + y
}
fn get_count(&self) -> i32 {
self.count.get()
}
}
pub struct MakeActorIFoo {
inner: Arc<dyn IFoo + Sync + Send>,
queue: AQueue,
}
impl MakeActorIFoo {
pub fn from(x: Foo) -> MakeActorIFoo {
MakeActorIFoo {
inner: Arc::new(x),
queue: AQueue::new(),
}
}
}
#[async_trait::async_trait]
impl IFoo for MakeActorIFoo {
async fn run(&self, x: i32, y: i32) -> i32 {
self.queue.run(|inner| async move { inner.run(x, y).await }, &self.inner).await
}
fn get_count(&self) -> i32 {
self.inner.get_count()
}
}
let foo = Foo { count: Cell::new(0) };
let make = Arc::new(MakeActorIFoo::from(foo));
let x = make.run(1, 2).await;
assert_eq!(x, 3);
let begin = Instant::now();
let a_make = make.clone();
let a = tokio::spawn(async move {
let start = Instant::now();
for i in 0..2000000 {
a_make.run(i, i).await;
}
println!("a {} {}", start.elapsed().as_secs_f32(), a_make.inner.get_count());
});
let b_make = make.clone();
let b = tokio::spawn(async move {
let start = Instant::now();
for i in 0..2000000 {
b_make.run(i, i).await;
}
println!("b {} {}", start.elapsed().as_secs_f32(), b_make.inner.get_count());
});
let c = tokio::spawn(async move {
let start = Instant::now();
for i in 0..2000000 {
make.run(i, i).await;
}
println!("c {} {}", start.elapsed().as_secs_f32(), make.inner.get_count());
});
try_join!(a, b, c)?;
println!("all secs:{}", begin.elapsed().as_secs_f32());
Ok(())
}
#[tokio::test]
async fn test_count() -> Result<()> {
struct Foo {
count: u64,
data: String,
}
impl Foo {
pub fn add_one(&mut self) {
self.count += 1;
self.data.push_str(&self.count.to_string())
}
pub fn get_str(&self) -> String {
self.data.clone()
}
}
trait IFoo {
async fn add_one(&self) -> Result<()>;
async fn get_str(&self) -> Result<String>;
}
impl IFoo for Actor<Foo> {
async fn add_one(&self) -> Result<()> {
self.inner_call(|inner| async move {
inner.get_mut().add_one();
Ok(())
})
.await
}
async fn get_str(&self) -> Result<String> {
self.inner_call(|inner| async move { Ok(inner.get_mut().get_str()) }).await
}
}
let obj = Arc::new(Actor::new(Foo {
count: 0,
data: "".to_string(),
}));
let mut vec = vec![];
for _ in 0..1000 {
let p_obj = obj.clone();
vec.push(tokio::spawn(async move {
for _ in 0..1000 {
p_obj.add_one().await.unwrap();
}
}));
}
for j in vec {
j.await?;
}
let mut check = Foo {
count: 0,
data: "".to_string(),
};
for _ in 0..1000000 {
check.add_one();
}
let str = obj.get_str().await?;
assert_eq!(str, check.get_str());
Ok(())
}
#[tokio::test]
async fn test_actor() -> Result<()> {
#[derive(Default)]
struct Foo {
i: i32,
x: i32,
y: i32,
}
impl Foo {
pub fn get(&self) -> (i32, i32, i32) {
(self.i, self.x, self.y)
}
pub async fn set(&mut self, x: i32, y: i32) -> i32 {
self.x += x;
self.y += y;
sleep(Duration::from_millis(1)).await;
println!("{} {}", self.x, self.y);
self.i += 1;
self.i
}
}
trait FooRunner {
async fn set(&self, x: i32, y: i32) -> i32;
async fn get(&self) -> (i32, i32, i32);
async fn get_len<'a>(&'a self, b: &'a [u8]) -> usize;
}
impl FooRunner for Actor<Foo> {
async fn set(&self, x: i32, y: i32) -> i32 {
self.inner_call(|inner| async move { inner.get_mut().set(x, y).await }).await
}
async fn get(&self) -> (i32, i32, i32) {
self.inner_call(|inner| async move { inner.get().get() }).await
}
async fn get_len<'a>(&'a self, b: &'a [u8]) -> usize {
self.inner_call(|_| async move { b.len() }).await
}
}
let a_foo = Arc::new(Actor::new(Foo::default()));
let b_foo = a_foo.clone();
let b = tokio::spawn(async move {
for i in 0..100 {
let x = b_foo.set(i - 1, i + 1).await;
println!("b:{}", x);
}
});
let c_foo = a_foo.clone();
let c = tokio::spawn(async move {
for i in 0..100 {
let x = c_foo.set(i - 1, i + 1).await;
println!("c:{}", x);
}
});
for i in 200..300 {
let x = a_foo.set(i - 1, i + 1).await;
println!("a:{}", x);
}
try_join!(b, c)?;
assert_eq!((300, 34550, 35150), a_foo.get().await);
let buff = [1u8, 2, 3, 4, 5];
let x = { a_foo.get_len(&buff[..]).await };
assert_eq!(buff.len(), x);
Ok(())
}