use futures::future::{join_all, poll_fn, select_all};
use futures::{pin_mut, FutureExt, Stream, StreamExt};
use itertools::Itertools;
use crate::root::current_context;
use crate::{span, Config, InstrumentAwait, Registry};
async fn sleep(time: u64) {
tokio::time::sleep(std::time::Duration::from_millis(time)).await;
println!("slept {time}ms");
}
async fn sleep_nested() {
join_all([
sleep(1500).instrument_await("sleep nested 1500"),
sleep(2500).instrument_await("sleep nested 2500"),
])
.await;
}
async fn multi_sleep() {
sleep(400).await;
sleep(800)
.instrument_await("sleep another in multi sleep")
.await;
}
fn stream1() -> impl Stream<Item = ()> {
use futures::stream::{iter, once};
iter(std::iter::repeat_with(|| {
once(async {
sleep(150).await;
})
}))
.flatten()
}
fn stream2() -> impl Stream<Item = ()> {
use futures::stream::{iter, once};
iter([
once(async {
sleep(444).await;
})
.boxed(),
once(async {
join_all([
sleep(400).instrument_await("sleep nested 400"),
sleep(600).instrument_await("sleep nested 600"),
])
.await;
})
.boxed(),
])
.flatten()
}
async fn hello() {
async move {
join_all([
sleep(1000)
.boxed()
.instrument_await(span!("sleep {}", 1000)),
sleep(2000).boxed().instrument_await("sleep 2000"),
sleep_nested().boxed().instrument_await("sleep nested"),
multi_sleep().boxed().instrument_await("multi sleep"),
])
.await;
join_all([
sleep(1200).instrument_await("sleep 1200"),
sleep(2200).instrument_await("sleep 2200"),
])
.await;
select_all([
sleep(666).boxed().instrument_await("sleep 666"),
sleep_nested()
.boxed()
.instrument_await("sleep nested (should be cancelled)"),
])
.await;
sleep(233).instrument_await("sleep 233").await;
{
let mut stream1 = stream1().fuse().boxed();
let mut stream2 = stream2().fuse().boxed();
let mut count = 0;
'outer: loop {
tokio::select! {
_ = stream1.next().instrument_await(span!("stream1 next {count}")) => {},
r = stream2.next().instrument_await(span!("stream2 next {count}")) => {
if r.is_none() { break 'outer }
},
}
sleep(50)
.instrument_await(span!("sleep before next stream poll: {count}"))
.await;
count += 1;
}
}
sleep(233).instrument_await("sleep 233").await;
}
.instrument_await("hello")
.await;
assert_eq!(current_context().unwrap().tree().active_node_count(), 1);
}
#[tokio::test]
async fn test_await_tree() {
let registry = Registry::new(Config::default());
let root = registry.register((), "actor 233");
let fut = root.instrument(hello());
pin_mut!(fut);
let expected_counts = vec![
(1, 0),
(8, 0),
(9, 0),
(8, 0),
(6, 0),
(5, 0),
(4, 0),
(4, 0),
(3, 0),
(6, 0),
(3, 0),
(4, 0),
(3, 0),
(4, 0),
(3, 0),
(4, 0),
(3, 0),
(6, 0),
(5, 2),
(6, 0),
(5, 2),
(6, 0),
(5, 0),
(4, 1),
(5, 0),
(3, 0),
(3, 0),
];
let mut actual_counts = vec![];
poll_fn(|cx| {
let tree = registry
.collect::<()>()
.into_iter()
.exactly_one()
.ok()
.unwrap()
.1;
println!("{tree}");
actual_counts.push((tree.active_node_count(), tree.detached_node_count()));
fut.poll_unpin(cx)
})
.await;
assert_eq!(actual_counts, expected_counts);
}