#![allow(clippy::expect_used, clippy::unwrap_used)]
mod support;
use std::time::{Duration, Instant};
use support::{
latest_live_connection_id_for_peer, lifecycle_events, make_node, normalize_local_addr,
reset_lifecycle_events, spawn_accept_loop, test_guard, wait_until,
};
use tokio::time::sleep;
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn simultaneous_connect_settles_on_complementary_live_views() {
let _guard = test_guard().await;
reset_lifecycle_events();
let a = make_node(vec![]).await;
let b = make_node(vec![]).await;
let a_addr = normalize_local_addr(a.local_addr().expect("a addr"));
let b_addr = normalize_local_addr(b.local_addr().expect("b addr"));
let a_id = a.peer_id();
let b_id = b.peer_id();
let accept_a = spawn_accept_loop(a.clone());
let accept_b = spawn_accept_loop(b.clone());
for _ in 0..5 {
let a_task = {
let a = a.clone();
tokio::spawn(async move { a.connect_addr(b_addr).await })
};
let b_task = {
let b = b.clone();
tokio::spawn(async move { b.connect_addr(a_addr).await })
};
let _ = a_task.await.expect("a join");
let _ = b_task.await.expect("b join");
wait_until(Duration::from_secs(5), || {
a.get_quic_connection(&b_id).ok().flatten().is_some()
&& b.get_quic_connection(&a_id).ok().flatten().is_some()
})
.await;
if lifecycle_events()
.iter()
.any(|event| event.fields.get("to_state") == Some(&"Superseded".to_string()))
{
break;
}
}
let deadline = Instant::now() + Duration::from_secs(5);
let (a_live_id, b_live_id) = loop {
match (
latest_live_connection_id_for_peer(b_id),
latest_live_connection_id_for_peer(a_id),
) {
(Some(a_live_id), Some(b_live_id)) if a_live_id == b_live_id => {
break (a_live_id, b_live_id);
}
_ if Instant::now() >= deadline => {
panic!(
"lifecycle connection_ids did not converge: {:?}",
lifecycle_events()
);
}
_ => sleep(Duration::from_millis(20)).await,
}
};
let _a_conn = a
.get_quic_connection(&b_id)
.expect("a lookup")
.expect("a live conn");
let _b_conn = b
.get_quic_connection(&a_id)
.expect("b lookup")
.expect("b live conn");
assert_eq!(
a_live_id, b_live_id,
"both endpoints must converge on the same lifecycle connection_id winner"
);
assert!(
lifecycle_events()
.iter()
.any(|event| event.fields.get("to_state") == Some(&"Superseded".to_string())),
"simultaneous connect should trigger lifecycle replacement logging"
);
let _ = a.shutdown().await;
let _ = b.shutdown().await;
accept_a.abort();
accept_b.abort();
}