use std::{sync::Arc, time::Duration};
use anapaya_quinn::{EndpointConfig, crypto::rustls::QuicClientConfig};
use anyhow::Context as _;
use bytes::Bytes;
use pocketscion::topologies::{IA132, IA212, UnderlayType, minimal::two_path_topology};
use rustls::ClientConfig;
use scion_stack::{quic::QuinnConn as _, scionstack::ScionStackBuilder};
use snap_tokens::v0::dummy_snap_token;
use test_log::test;
use tokio::{
sync::Barrier,
time::{sleep, timeout},
};
use tracing::info;
/// Checks that UDP traffic between two SCION stacks moves to a different path
/// after a link on the initially used path is reported as down.
///
/// Flow:
/// 1. A sender (IA132) and a receiver (IA212) are created on the two-path
///    topology.
/// 2. The sender transmits one datagram; the receiver records the path it
///    arrived on.
/// 3. The link state of that path's first-hop egress interface is set to
///    `false` through the pocketscion API.
/// 4. The sender keeps transmitting; the next datagram the receiver gets must
///    carry a path that differs from the recorded one.
///
/// # Panics
///
/// Panics on any setup or I/O failure, if no datagram arrives within 500 ms
/// after the link change, if the path did not change, or if the sender socket
/// itself received a datagram.
#[test(tokio::test)]
#[ntest::timeout(5_000)]
async fn should_failover_on_link_error() {
    let ps_handle = two_path_topology(UnderlayType::Snap).await;

    let sender_stack = ScionStackBuilder::new()
        .with_endhost_api(ps_handle.endhost_api(IA132).await.unwrap())
        .with_auth_token(dummy_snap_token())
        .build()
        .await
        .unwrap();
    let receiver_stack = ScionStackBuilder::new()
        .with_endhost_api(ps_handle.endhost_api(IA212).await.unwrap())
        .with_auth_token(dummy_snap_token())
        .build()
        .await
        .unwrap();

    // The sender socket is shared between the sending task and the task that
    // watches for unexpected incoming datagrams.
    let sender_socket = Arc::new(sender_stack.bind(None).await.unwrap());
    let sender_addr = sender_socket.local_addr();
    let receiver_socket = receiver_stack.bind(None).await.unwrap();
    let receiver_addr = receiver_socket.local_addr();

    let test_data = Bytes::from("Hello, World!");

    // Two parties: the sending task and the test body. The barrier keeps the
    // sender from transmitting again until the link state has been changed.
    let failover_send_barrier = Arc::new(Barrier::new(2));

    // Watchdog: nothing is ever sent towards the sender, so any datagram (or
    // receive error) showing up here is a failure.
    let sender_socket_clone = sender_socket.clone();
    let sender_recv_task = tokio::spawn(async move {
        let mut recv_buffer = [0u8; 1024];
        let _ = sender_socket_clone
            .recv_from(&mut recv_buffer)
            .await
            .unwrap();
        panic!("Sender should not receive udp packets");
    });

    let sender_task = tokio::spawn({
        let failover_send_barrier = failover_send_barrier.clone();
        let test_data = test_data.clone();
        async move {
            // First datagram: lets the receiver learn the initially used path.
            sender_socket
                .send_to(test_data.as_ref(), receiver_addr)
                .await
                .unwrap_or_else(|_| {
                    panic!("error sending from {sender_addr:?} to {receiver_addr:?}")
                });

            // Wait until the test body has changed the link state.
            failover_send_barrier.wait().await;

            sender_socket
                .send_to(test_data.as_ref(), receiver_addr)
                .await
                .unwrap_or_else(|_| {
                    panic!("error sending from {sender_addr:?} to {receiver_addr:?}")
                });

            // Keep sending periodically until the task is aborted, so the
            // receiver still gets a datagram if an earlier one was lost.
            loop {
                sleep(Duration::from_millis(100)).await;
                sender_socket
                    .send_to(test_data.as_ref(), receiver_addr)
                    .await
                    .unwrap_or_else(|_| {
                        panic!("error sending from {sender_addr:?} to {receiver_addr:?}")
                    });
            }
        }
    });

    // Receive the first datagram and remember the path it was delivered on.
    let mut recv_buffer = [0u8; 1024];
    let mut path_buffer = vec![0u8; 1500];
    let (_, source, path) = receiver_socket
        .recv_from_with_path(&mut recv_buffer, &mut path_buffer)
        .await
        .unwrap();
    assert_eq!(
        source, sender_addr,
        "receiver should receive packets from the sender"
    );

    // Set the link behind the first hop's egress interface to `false`.
    let egress = path
        .first_hop_egress_interface()
        .expect("path should have first hop egress interface");
    ps_handle
        .api_client
        .set_link_state(egress.isd_asn, egress.id, false)
        .await
        .unwrap();

    // Release the sender now that the link state has been changed.
    failover_send_barrier.wait().await;

    // Fresh buffers: `path` from the first receive is still needed below for
    // the comparison.
    let mut path_buffer = vec![0u8; 1500];
    let mut recv_buffer = [0u8; 1024];
    let (_size, _addr, new_path) = timeout(
        Duration::from_millis(500),
        receiver_socket.recv_from_with_path(&mut recv_buffer, &mut path_buffer),
    )
    .await
    .expect("should not time out waiting for packet after failover")
    .expect("should receive packet after failover");

    info!(old_path = ?path, new_path = ?new_path, "path changed?");
    assert_ne!(path, new_path, "should use a different path after failover");

    sender_task.abort();
    sender_recv_task.abort();

    // A panic inside a spawned task only terminates that task. Await the
    // aborted watchdog and re-raise its panic so that "sender should not
    // receive" actually fails the test; a plain cancellation is expected.
    match sender_recv_task.await {
        Err(join_error) if join_error.is_panic() => {
            std::panic::resume_unwind(join_error.into_panic())
        }
        _ => {}
    }
}
async fn verify_quic_bidirectional_communication(
client_conn: &anapaya_quinn::Connection,
server_conn: &scion_stack::quic::ScionQuinnConn,
test_data: Bytes,
timeout_duration: Duration,
) -> anyhow::Result<()> {
let (client_send_result, server_send_result) = tokio::join!(
client_conn.send_datagram_wait(test_data.clone()),
server_conn.send_datagram_wait(test_data.clone())
);
client_send_result.context("failed to send data from client to server")?;
server_send_result.context("failed to send data from server to client")?;
let (server_recv_result, client_recv_result) = tokio::join!(
timeout(timeout_duration, server_conn.read_datagram()),
timeout(timeout_duration, client_conn.read_datagram())
);
let recv_data =
server_recv_result.context("should not time out waiting for packet from client")??;
let recv_response =
client_recv_result.context("should not time out waiting for response from server")??;
assert_eq!(
recv_data.as_ref(),
test_data.as_ref(),
"server should receive data from client"
);
assert_eq!(
recv_response.as_ref(),
test_data.as_ref(),
"client should receive data from server"
);
Ok(())
}
/// Checks that an established QUIC connection keeps carrying datagrams in both
/// directions after the link states in the topology are swapped.
///
/// Flow:
/// 1. The link state for `(IA132, 2)` is set to `false` before the stacks are
///    built.
/// 2. A QUIC connection from IA132 to IA212 is established and verified with
///    one bidirectional datagram exchange.
/// 3. The link state for `(IA132, 2)` is set back to `true` and the one for
///    `(IA132, 1)` is set to `false`.
/// 4. The bidirectional exchange is retried up to 200 times until it succeeds.
///
/// # Panics
///
/// Panics on any setup failure, if the initial exchange fails, or if none of
/// the retries after the link change succeeds (the last error is included in
/// the panic message).
#[test(tokio::test)]
async fn should_quic_failover_on_link_error() {
    /// Upper bound on verification attempts after the link states changed.
    const MAX_ATTEMPTS: usize = 200;

    scion_sdk_utils::rustls::select_ring_crypto_provider();

    let ps_handle = two_path_topology(UnderlayType::Snap).await;

    // Start with (IA132, 2) set to `false`; it is re-enabled further down.
    ps_handle
        .api_client
        .set_link_state(IA132, 2, false)
        .await
        .unwrap();

    let sender_stack = ScionStackBuilder::new()
        .with_endhost_api(ps_handle.endhost_api(IA132).await.unwrap())
        .with_auth_token(dummy_snap_token())
        .build()
        .await
        .unwrap();
    let receiver_stack = ScionStackBuilder::new()
        .with_endhost_api(ps_handle.endhost_api(IA212).await.unwrap())
        .with_auth_token(dummy_snap_token())
        .build()
        .await
        .unwrap();

    // Server side: test certificate for "localhost" plus matching config.
    let (cert_der, server_config) =
        scion_sdk_utils::test::generate_cert([42u8; 32], vec!["localhost".into()], vec![]);
    // NOTE(review): presumably `quic_endpoint` is the deprecated item being
    // silenced here — confirm and migrate to its replacement when possible.
    #[allow(deprecated)]
    let server_endpoint = receiver_stack
        .quic_endpoint(None, EndpointConfig::default(), Some(server_config), None)
        .await
        .unwrap();
    let server_addr = server_endpoint.local_scion_addr();

    // Client side: trust exactly the generated server certificate.
    let mut roots = rustls::RootCertStore::empty();
    roots.add(cert_der).unwrap();
    let client_crypto = ClientConfig::builder()
        .with_root_certificates(roots)
        .with_no_client_auth();
    let client_config = anapaya_quinn::ClientConfig::new(Arc::new(
        QuicClientConfig::try_from(client_crypto).unwrap(),
    ));
    // NOTE(review): same presumed deprecation as for the server endpoint above.
    #[allow(deprecated)]
    let mut client_endpoint = sender_stack
        .quic_endpoint(None, EndpointConfig::default(), None, None)
        .await
        .unwrap();
    client_endpoint.set_default_client_config(client_config);

    // Accept and connect have to be driven concurrently.
    let (server_conn, conn) = tokio::join!(
        async { server_endpoint.accept().await.unwrap().unwrap() },
        async {
            client_endpoint
                .connect(server_addr, "localhost")
                .unwrap()
                .await
                .unwrap()
        }
    );
    info!(
        "QUIC connection established from {} to {}",
        client_endpoint.local_scion_addr(),
        server_addr
    );

    let test_data = Bytes::from("Hello, QUIC!");
    verify_quic_bidirectional_communication(
        &conn,
        &server_conn,
        test_data,
        Duration::from_millis(100),
    )
    .await
    .expect("initial QUIC communication should work");

    // Swap the link states: re-enable (IA132, 2), then disable (IA132, 1).
    ps_handle
        .api_client
        .set_link_state(IA132, 2, true)
        .await
        .unwrap();
    ps_handle
        .api_client
        .set_link_state(IA132, 1, false)
        .await
        .unwrap();

    // Retry until one exchange succeeds. On the final failed attempt the last
    // error is reported instead of being silently dropped.
    let test_data2 = Bytes::from("Hello after path change!");
    let mut attempt = 1;
    while let Err(err) = verify_quic_bidirectional_communication(
        &conn,
        &server_conn,
        test_data2.clone(),
        Duration::from_millis(100),
    )
    .await
    {
        assert!(
            attempt < MAX_ATTEMPTS,
            "failed to verify QUIC communication after path change after {MAX_ATTEMPTS} attempts: {err:#}"
        );
        attempt += 1;
    }
}