mod common;
use std::{
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
time::{Duration, Instant},
};
use ana_gotatun::{noise::rate_limiter::RateLimiter, packet::PacketBufPool, x25519};
use common::{
mocks::{MockAuthorization, MockControlPlaneClient},
server_harness::ServerHarness,
test_helpers::{build_test_scion_packet, setup_test_environment},
};
use snap_tun::{
client::{PACKET_BUF_POOL_SIZE, SnapTunEndpoint},
server::SnapTunServer,
};
/// Round-trip test: connects a tunnel, sends one test SCION packet to the
/// server harness, and lets the harness send the same packet back.
///
/// Asserts that the bytes are unchanged in both directions and that the
/// source address seen by the harness equals the tunnel's local address.
#[tokio::test]
async fn test_client_connect_and_echo() {
    let env = setup_test_environment().await;

    // Build the standalone connection arguments up front.
    let control_plane_url = url::Url::parse("http://localhost:8080").unwrap();
    let packet_pool = PacketBufPool::<PACKET_BUF_POOL_SIZE>::new(1024);

    let tunnel = env
        .endpoint
        .connect_tunnel(
            env.server_static_identity,
            env.server_harness.socket_addr(),
            control_plane_url,
            env.mock_control_plane.clone(),
            env.client_socket.clone(),
            100,
            packet_pool,
        )
        .await
        .expect("Failed to connect tunnel");

    let client_addr = tunnel.local_addr();
    assert_ne!(
        client_addr.port(),
        0,
        "Local address should have non-zero port"
    );

    // Keep a copy of the packet buffer, because `send` takes the packet by value.
    let mut outbound = build_test_scion_packet(b"Hello, SNAP tunnel!");
    let expected_bytes = outbound.buf_mut().clone();
    tunnel.send(outbound).await.expect("Failed to send packet");

    // Client -> server direction.
    let (server_received, observed_source) = env
        .server_harness
        .recv_from_tunnel(Duration::from_secs(2))
        .await
        .expect("Failed to receive packet from tunnel");
    assert_eq!(
        server_received.as_ref(),
        expected_bytes.as_ref(),
        "Received packet should match sent packet"
    );
    assert_eq!(
        observed_source, client_addr,
        "Source address should match tunnel local address"
    );

    // Server -> client direction: send back exactly what was received.
    env.server_harness
        .send_to_tunnel(server_received.clone(), client_addr);
    let echo_outcome = tokio::time::timeout(Duration::from_secs(2), tunnel.recv())
        .await
        .expect("Timeout waiting for echo");
    let echoed = echo_outcome.expect("Failed to receive echo");
    assert_eq!(
        echoed, expected_bytes,
        "Echo packet should match original"
    );

    drop(tunnel);
    env.server_harness.stop().await;
}
#[test_log::test(tokio::test)]
async fn test_handshake_timeout_unregistered_identity() {
let client_static_secret = x25519::StaticSecret::from([3u8; 32]);
let server_static_secret = x25519::StaticSecret::from([4u8; 32]);
let server_static_identity = x25519::PublicKey::from(&server_static_secret);
let mut mock_cp = MockControlPlaneClient::new();
mock_cp
.expect_register_identity()
.returning(|_, _| Ok(None));
let mock_cp = Arc::new(mock_cp);
let mock_authz = Arc::new(MockAuthorization::new());
let rate_limiter = Arc::new(RateLimiter::new(&server_static_identity, 100));
let server = SnapTunServer::new(server_static_secret, rate_limiter, mock_authz);
let server_harness = Arc::new(
ServerHarness::new(server, "127.0.0.1:0".parse().unwrap())
.await
.expect("Failed to create server harness"),
);
let harness_clone = server_harness.clone();
let server_task = tokio::spawn(async move {
harness_clone.run().await;
});
let token_source = Arc::new(
scion_sdk_reqwest_connect_rpc::token_source::mock::MockTokenSource::new(
"test-token".to_string(),
),
);
let endpoint = SnapTunEndpoint::new(token_source, client_static_secret);
let client_socket = Arc::new(
tokio::net::UdpSocket::bind("127.0.0.1:0")
.await
.expect("Failed to bind client socket"),
);
let result = tokio::time::timeout(
Duration::from_secs(1),
endpoint.connect_tunnel(
server_static_identity,
server_harness.socket_addr(),
url::Url::parse("http://localhost:8080").unwrap(),
mock_cp,
client_socket,
100,
PacketBufPool::<PACKET_BUF_POOL_SIZE>::new(1024),
),
)
.await;
assert!(result.is_err(), "Expected timeout error");
server_harness.cancel_token().cancel();
let _ = server_task.await;
}
/// Authorizes the client identity for one second only, then keeps sending
/// packets until none has reached the server harness for more than 500 ms.
///
/// Asserts that at least one packet got through and that the last
/// delivery happened less than one second after sending started.
#[tokio::test]
async fn test_token_expiry_drops_tunnel() {
    let env = setup_test_environment().await;

    // Replace the identity's existing authorization with a 1-second one.
    let client_public = x25519::PublicKey::from(&env.client_static_secret);
    env.mock_authz.revoke_identity(client_public.as_bytes());
    env.mock_authz
        .authorize_for_duration(*client_public.as_bytes(), Duration::from_secs(1));

    let control_plane_url = url::Url::parse("http://localhost:8080").unwrap();
    let packet_pool = PacketBufPool::<PACKET_BUF_POOL_SIZE>::new(1024);
    let tunnel = env
        .endpoint
        .connect_tunnel(
            env.server_static_identity,
            env.server_harness.socket_addr(),
            control_plane_url,
            env.mock_control_plane.clone(),
            env.client_socket.clone(),
            100,
            packet_pool,
        )
        .await
        .expect("Failed to connect tunnel");
    let _client_addr = tunnel.local_addr();

    let payload = b"Test packet";
    let started = Instant::now();

    // Send/receive probe loop; ends once deliveries have stopped for > 500 ms.
    let probe = async {
        let mut delivered = 0;
        let mut last_delivery = Instant::now();
        loop {
            tunnel
                .send(build_test_scion_packet(payload))
                .await
                .expect("send queues");

            let arrived = env
                .server_harness
                .recv_from_tunnel(Duration::from_millis(100))
                .await
                .is_some();

            if arrived {
                delivered += 1;
                last_delivery = Instant::now();
            } else if last_delivery.elapsed() > Duration::from_millis(500) {
                break (delivered, last_delivery);
            }
        }
    };
    let (success_count, last_success) = tokio::time::timeout(Duration::from_secs(3), probe)
        .await
        .expect("Test timed out");

    assert!(
        success_count > 0,
        "Should have had some successful packets before expiry"
    );
    assert!(
        last_success.duration_since(started) < Duration::from_secs(1),
        "Authorization should have expired after 1 second"
    );

    drop(tunnel);
    env.server_harness.stop().await;
}
/// Checks that the client calls `register_identity` again after the token
/// source is updated, and that the tunnel still delivers packets afterwards.
///
/// A counting control-plane mock records each `register_identity` call. The
/// test waits (bounded by a 2 s timeout each) for the first call, updates
/// the token, waits for a second call, and finally sends one packet through
/// the tunnel.
#[tokio::test]
async fn test_client_reregisters_with_new_token() {
    let env = setup_test_environment().await;

    // Control plane mock that counts how often registration is attempted.
    let call_count = Arc::new(AtomicUsize::new(0));
    let count_clone = call_count.clone();
    let mut mock_cp = MockControlPlaneClient::new();
    mock_cp
        .expect_register_identity()
        .times(..)
        .returning(move |_, _| {
            count_clone.fetch_add(1, Ordering::SeqCst);
            Ok(None)
        });
    let mock_cp = Arc::new(mock_cp);

    let tunnel = env
        .endpoint
        .connect_tunnel(
            env.server_static_identity,
            env.server_harness.socket_addr(),
            url::Url::parse("http://localhost:8080").unwrap(),
            mock_cp.clone(),
            env.client_socket.clone(),
            100,
            PacketBufPool::<PACKET_BUF_POOL_SIZE>::new(1024),
        )
        .await
        .expect("Failed to connect tunnel");

    // Poll until the first registration is observed. The loop is essential:
    // a single check followed by one sleep would always finish within ~10 ms,
    // so the surrounding timeout could never report a missing registration.
    tokio::time::timeout(Duration::from_secs(2), async {
        while call_count.load(Ordering::SeqCst) < 1 {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    })
    .await
    .expect("Timeout waiting for initial registration");

    env.token_source.update_token("new-token".to_string());

    // Poll until a second registration shows up after the token change.
    tokio::time::timeout(Duration::from_secs(2), async {
        while call_count.load(Ordering::SeqCst) < 2 {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    })
    .await
    .expect("Timeout waiting for re-registration");

    // The tunnel must still carry traffic after re-registering.
    let test_payload = b"After re-registration";
    let mut test_packet = build_test_scion_packet(test_payload);
    // Copy the buffer first, because `send` takes the packet by value.
    let test_packet_clone = test_packet.buf_mut().clone();
    tunnel
        .send(test_packet)
        .await
        .expect("Failed to send packet");
    let (received_packet, _) = env
        .server_harness
        .recv_from_tunnel(Duration::from_secs(2))
        .await
        .expect("Failed to receive packet after re-registration");
    assert_eq!(
        received_packet, test_packet_clone,
        "Tunnel should still work after re-registration"
    );

    drop(tunnel);
    env.server_harness.stop().await;
}