ant-quic 0.27.3

QUIC transport protocol with advanced NAT traversal for P2P networks
Documentation
#![allow(clippy::expect_used, clippy::unwrap_used)]

mod support;

use std::time::{Duration, Instant};
use support::{
    latest_live_connection_id_for_peer, lifecycle_events, make_node, normalize_local_addr,
    reset_lifecycle_events, spawn_accept_loop, test_guard, wait_until,
};
use tokio::time::sleep;

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn simultaneous_connect_settles_on_complementary_live_views() {
    let _guard = test_guard().await;
    reset_lifecycle_events();

    let a = make_node(vec![]).await;
    let b = make_node(vec![]).await;
    let a_addr = normalize_local_addr(a.local_addr().expect("a addr"));
    let b_addr = normalize_local_addr(b.local_addr().expect("b addr"));
    let a_id = a.peer_id();
    let b_id = b.peer_id();

    let accept_a = spawn_accept_loop(a.clone());
    let accept_b = spawn_accept_loop(b.clone());

    for _ in 0..5 {
        let a_task = {
            let a = a.clone();
            tokio::spawn(async move { a.connect_addr(b_addr).await })
        };
        let b_task = {
            let b = b.clone();
            tokio::spawn(async move { b.connect_addr(a_addr).await })
        };

        let _ = a_task.await.expect("a join");
        let _ = b_task.await.expect("b join");

        wait_until(Duration::from_secs(5), || {
            a.get_quic_connection(&b_id).ok().flatten().is_some()
                && b.get_quic_connection(&a_id).ok().flatten().is_some()
        })
        .await;

        if lifecycle_events()
            .iter()
            .any(|event| event.fields.get("to_state") == Some(&"Superseded".to_string()))
        {
            break;
        }
    }

    let deadline = Instant::now() + Duration::from_secs(5);
    let (a_live_id, b_live_id) = loop {
        match (
            latest_live_connection_id_for_peer(b_id),
            latest_live_connection_id_for_peer(a_id),
        ) {
            (Some(a_live_id), Some(b_live_id)) if a_live_id == b_live_id => {
                break (a_live_id, b_live_id);
            }
            _ if Instant::now() >= deadline => {
                panic!(
                    "lifecycle connection_ids did not converge: {:?}",
                    lifecycle_events()
                );
            }
            _ => sleep(Duration::from_millis(20)).await,
        }
    };

    let _a_conn = a
        .get_quic_connection(&b_id)
        .expect("a lookup")
        .expect("a live conn");
    let _b_conn = b
        .get_quic_connection(&a_id)
        .expect("b lookup")
        .expect("b live conn");

    assert_eq!(
        a_live_id, b_live_id,
        "both endpoints must converge on the same lifecycle connection_id winner"
    );
    assert!(
        lifecycle_events()
            .iter()
            .any(|event| event.fields.get("to_state") == Some(&"Superseded".to_string())),
        "simultaneous connect should trigger lifecycle replacement logging"
    );

    let _ = a.shutdown().await;
    let _ = b.shutdown().await;
    accept_a.abort();
    accept_b.abort();
}