use std::{collections::HashSet, time::Duration};
use futures::{future::pending, StreamExt};
use tiny_actor::*;
#[tokio::test]
async fn spawn_and_abort() {
let (mut child, _address) = spawn(Config::default(), |_: Inbox<()>| async move {
let () = pending().await;
});
child.abort();
assert!(child.await.unwrap_err().is_abort());
}
#[tokio::test]
async fn spawn_await_address() {
let (mut child, address) = spawn(Config::default(), |_: Inbox<()>| async move {
let () = pending().await;
});
child.abort();
address.await;
}
#[tokio::test]
async fn spawn_and_panic() {
let (child, _address) = spawn(Config::default(), |_: Inbox<()>| async move { panic!() });
assert!(child.await.unwrap_err().is_panic());
}
#[tokio::test]
async fn spawn_and_normal_exit() {
let (child, _address) = spawn(Config::default(), |_: Inbox<()>| async move {});
assert!(child.await.is_ok());
}
#[tokio::test]
async fn spawn_and_halt() {
let (child, _address) = spawn(Config::default(), |mut inbox: Inbox<()>| async move {
assert_eq!(inbox.recv().await.unwrap_err(), RecvError::Halted);
});
child.halt();
assert!(child.await.is_ok());
}
#[tokio::test]
async fn spawn_and_drop() {
let (child, address) = spawn(
Config {
link: Link::Attached(Duration::from_millis(10)),
capacity: Capacity::Bounded(10),
},
|mut inbox: Inbox<()>| async move {
assert_eq!(inbox.recv().await.unwrap_err(), RecvError::Halted);
let () = pending().await;
},
);
drop(child);
address.await;
}
#[tokio::test]
async fn spawn_and_drop_detached() {
let (child, address) = spawn(
Config::new(Link::Detached, Capacity::Unbounded(BackPressure::default())),
|mut inbox: Inbox<()>| async move {
assert_eq!(inbox.recv().await.unwrap(), ());
},
);
drop(child);
tokio::time::sleep(Duration::from_millis(10)).await;
address.send(()).await.unwrap();
address.await;
}
#[tokio::test]
async fn base_counts() {
let (mut child, address) = spawn(Config::default(), |inbox: Inbox<()>| async move {
pending::<()>().await;
drop(inbox);
});
assert_eq!(child.address_count(), 1);
assert_eq!(child.process_count(), 1);
assert_eq!(address.msg_count(), 0);
child.abort();
}
#[tokio::test]
async fn address_counts() {
let (mut child, address) = spawn(Config::default(), |inbox: Inbox<()>| async move {
pending::<()>().await;
drop(inbox);
});
assert_eq!(child.address_count(), 1);
let address2 = address.clone();
assert_eq!(child.address_count(), 2);
drop(address2);
assert_eq!(child.address_count(), 1);
child.abort();
}
#[tokio::test]
async fn inbox_counts() {
let (pool, _address) = spawn_many(
0..4,
Config::default(),
|_, mut inbox: Inbox<()>| async move {
inbox.recv().await.unwrap_err();
},
);
let mut pool = pool.into_dyn();
assert_eq!(pool.process_count(), 4);
pool.halt_some(1);
tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(pool.process_count(), 3);
pool.try_spawn(|mut inbox: Inbox<()>| async move {
inbox.recv().await.unwrap_err();
})
.unwrap();
assert_eq!(pool.process_count(), 4);
pool.halt_some(2);
tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(pool.process_count(), 2);
pool.halt();
tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(pool.process_count(), 0);
}
#[tokio::test]
async fn pooled_messaging_split() {
let (pool, address) = spawn_many(0..3, Config::bounded(5), |_, mut inbox| async move {
let mut numbers = Vec::new();
loop {
match inbox.recv().await {
Ok(msg) => {
tokio::time::sleep(Duration::from_millis(1)).await;
numbers.push(msg);
}
Err(signal) => match signal {
RecvError::Halted => break Ok(numbers),
RecvError::ClosedAndEmpty => break Err(()),
},
}
}
});
for i in 0..30 {
address.send(i).await.unwrap();
}
tokio::time::sleep(Duration::from_millis(20)).await;
address.halt();
let res = pool
.map(|e| e.unwrap().unwrap())
.fold(HashSet::new(), |mut acc, vals| async move {
assert!(vals.len() >= 9);
assert!(vals.len() <= 11);
for val in vals {
assert!(acc.insert(val));
}
acc
})
.await;
assert_eq!(res.len(), 30)
}