use arbiter::{fixtures::*, network::memory::InMemory, runtime::Runtime};
use tokio_stream::StreamExt;
#[tokio::test]
async fn ping_pong_self_stop() {
let mut runtime = Runtime::<InMemory>::new();
let ping =
runtime.spawn(PingPlayer { count: 0, max_count: 10 }).with_handler::<Pong>().with_name("ping");
let pong = runtime.spawn(PongPlayer).with_handler::<Ping>().with_name("pong");
let mut ping = runtime.process(ping);
let mut pong = runtime.process(pong);
let mut snapshots = ping.stream().unwrap();
ping.start().await.unwrap();
pong.start().await.unwrap();
for expected in 0..=10 {
assert_eq!(snapshots.next().await.unwrap(), expected);
}
assert_eq!(snapshots.next().await, None);
}
#[tokio::test]
async fn observer_counts_routed_messages() {
let mut runtime = Runtime::<InMemory>::new();
let ping =
runtime.spawn(PingPlayer { count: 0, max_count: 5 }).with_handler::<Pong>().with_name("ping");
let pong = runtime.spawn(PongPlayer).with_handler::<Ping>().with_name("pong");
let observer = runtime
.spawn(Counter { count: 0 })
.with_handler::<Ping>()
.with_handler::<Pong>()
.with_name("observer");
let mut ping = runtime.process(ping);
let mut pong = runtime.process(pong);
let mut observer = runtime.process(observer);
let mut observer_snapshots = observer.stream().unwrap();
ping.start().await.unwrap();
pong.start().await.unwrap();
observer.start().await.unwrap();
assert_eq!(observer_snapshots.next().await.unwrap(), 0);
let mut last = 0;
while last < 12 {
last = observer_snapshots.next().await.unwrap();
}
assert_eq!(last, 12);
observer.stop().await.unwrap();
}
#[tokio::test]
async fn dual_snapshot_streams() {
let mut runtime = Runtime::<InMemory>::new();
let ping =
runtime.spawn(PingPlayer { count: 0, max_count: 5 }).with_handler::<Pong>().with_name("ping");
let pong = runtime.spawn(PongPlayer).with_handler::<Ping>().with_name("pong");
let observer = runtime
.spawn(Counter { count: 0 })
.with_handler::<Ping>()
.with_handler::<Pong>()
.with_name("observer");
let mut ping = runtime.process(ping);
let mut pong = runtime.process(pong);
let mut observer = runtime.process(observer);
let mut ping_snapshots = ping.stream().unwrap();
let mut observer_snapshots = observer.stream().unwrap();
ping.start().await.unwrap();
pong.start().await.unwrap();
observer.start().await.unwrap();
for expected in 0..=5 {
assert_eq!(ping_snapshots.next().await.unwrap(), expected);
}
assert_eq!(ping_snapshots.next().await, None);
assert_eq!(observer_snapshots.next().await.unwrap(), 0);
let mut last = 0;
while last < 12 {
last = observer_snapshots.next().await.unwrap();
}
assert_eq!(last, 12);
observer.stop().await.unwrap();
}