use phantom_protocol::api::{PhantomListener, PhantomSession, TcpSessionTransport};
use phantom_protocol::crypto::hybrid_sign::HybridVerifyingKey;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time::timeout;
/// Round-trips one message in each direction over TCP with the server's verifying key pinned.
///
/// A spawned server task accepts a single session, expects `hello-from-client`, and answers
/// with `hello-from-server`. The client connects over a raw `TcpStream` wrapped in
/// `TcpSessionTransport`, pins the key the listener reported, and must see the reply within
/// five seconds.
///
/// `#[ignore]`d: only runs when ignored tests are requested (e.g. `cargo test -- --ignored`).
#[tokio::test]
#[ignore]
async fn tcp_integration_pinned_and_encrypted() {
    let listener = PhantomListener::bind("127.0.0.1:0".to_string())
        .await
        .expect("bind listener");
    // Bound to port 0, so the real address has to be read back from the listener.
    let addr = listener.local_addr();
    let server_key_bytes = listener.verifying_key_bytes();
    let expected_key =
        HybridVerifyingKey::from_bytes(&server_key_bytes).expect("deserialize verifying key");

    // Server side: one session, one inbound message, one reply.
    let server_handle = tokio::spawn(async move {
        let session = listener.accept().await.expect("accept").session();
        let msg = session.recv().await.expect("server recv");
        assert_eq!(msg, b"hello-from-client");
        session
            .send(b"hello-from-server".to_vec())
            .await
            .expect("server send");
        // Keep the session alive a little longer after sending.
        // NOTE(review): presumably so the reply is flushed before the session drops — confirm.
        tokio::time::sleep(Duration::from_millis(200)).await;
    });

    // Client side: pin the key reported by the listener.
    let tcp = TcpStream::connect(&addr).await.expect("tcp connect");
    let transport = TcpSessionTransport::new(tcp);
    let client = PhantomSession::connect_with_transport(&addr, transport, expected_key);
    client
        .send(b"hello-from-client".to_vec())
        .await
        .expect("client send");
    // Outer `expect` handles the timeout, inner `expect` the session-level receive error.
    let reply = timeout(Duration::from_secs(5), client.recv())
        .await
        .expect("client recv timeout")
        .expect("client recv");
    assert_eq!(reply, b"hello-from-server");

    // A panic inside the spawned task (failed `expect` / `assert_eq!`) is only reported
    // through its `JoinHandle`; discarding the join result would hide it, so propagate it.
    server_handle.await.expect("server task");
}
/// Connects while pinning a freshly generated key that is unrelated to the listener's own
/// key, then checks that the client session has not reached an established state.
///
/// After an 800 ms grace period the client state must be `Failed`, or at worst still
/// `Connecting`; any other state fails the test.
///
/// `#[ignore]`d: only runs when ignored tests are requested (e.g. `cargo test -- --ignored`).
#[tokio::test]
#[ignore]
async fn tcp_integration_wrong_pinned_key_rejected() {
    use phantom_protocol::api::ConnectionState;
    use phantom_protocol::crypto::hybrid_sign::HybridSigningKey;

    let listener = PhantomListener::bind("127.0.0.1:0".to_string())
        .await
        .expect("bind listener");
    let addr = listener.local_addr();
    // The listener's genuine key is fetched but intentionally never pinned.
    let _real_key_bytes = listener.verifying_key_bytes();

    // A key pair generated here has nothing to do with the listener; only its public half
    // is handed to the client as the (wrong) pin.
    let (_attacker_sk, attacker_pk) = HybridSigningKey::generate();

    // The server merely tries to accept for up to three seconds; its result is irrelevant.
    let _server_handle = tokio::spawn(async move {
        let _ = timeout(Duration::from_secs(3), listener.accept()).await;
    });

    let stream = TcpStream::connect(&addr).await.expect("tcp connect");
    let client = PhantomSession::connect_with_transport(
        &addr,
        TcpSessionTransport::new(stream),
        attacker_pk,
    );

    // Give the handshake time to run before sampling the state.
    tokio::time::sleep(Duration::from_millis(800)).await;

    let state = client.connection_state();
    let not_established = matches!(
        state,
        ConnectionState::Connecting | ConnectionState::Failed
    );
    assert!(
        not_established,
        "expected Failed (or still Connecting) after wrong pin, got {:?}",
        state
    );
}
/// Establishes a pinned session, waits for a resumption hint, and then opens a second
/// session with `connect_pinned_with_resumption`, echoing one message over each.
///
/// The server task accepts exactly two sessions and echoes the first message it receives on
/// each. The hint obtained from the first session is checked to carry a 32-byte
/// `session_id` and a 32-byte `resumption_secret` before it is used for the second
/// connection (which also carries `zero-rtt-early-data` as early data).
///
/// `#[ignore]`d: only runs when ignored tests are requested (e.g. `cargo test -- --ignored`).
#[tokio::test]
#[ignore]
async fn tcp_integration_zero_rtt_resumption_round_trip() {
    use phantom_protocol::api::session::{connect_pinned, connect_pinned_with_resumption};

    let listener = PhantomListener::bind("127.0.0.1:0".to_string())
        .await
        .expect("bind listener");
    // `connect_pinned*` take host and port separately, so split the listener's `host:port`.
    let local = listener.local_addr();
    let (host, port_str) = local.rsplit_once(':').expect("local_addr is host:port");
    let host = host.to_string();
    let port: u16 = port_str.parse().expect("port parses");
    let pinned = listener.verifying_key_bytes();

    // Server side: echo one message on each of two consecutive sessions.
    let server_handle = tokio::spawn(async move {
        for _ in 0..2 {
            let session = listener.accept().await.expect("accept").session();
            let msg = session.recv().await.expect("server recv");
            session.send(msg).await.expect("server send");
            // Keep the session alive a little longer after sending.
            // NOTE(review): presumably so the echo is flushed before the drop — confirm.
            tokio::time::sleep(Duration::from_millis(200)).await;
        }
    });

    // First connection: full handshake, one echo.
    let s1 = connect_pinned(host.clone(), port, pinned.clone())
        .await
        .expect("connect_pinned");
    s1.send(b"ping-1".to_vec()).await.expect("c1 send");
    let r1 = timeout(Duration::from_secs(5), s1.recv())
        .await
        .expect("c1 recv timeout")
        .expect("c1 recv");
    assert_eq!(r1, b"ping-1");

    // The hint is not necessarily available right away: poll every 20 ms, for up to 5 s.
    let hint = timeout(Duration::from_secs(5), async {
        loop {
            if let Some(h) = s1.resumption_hint().await {
                return h;
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
    })
    .await
    .expect("resumption hint did not arrive within 5s");
    assert_eq!(hint.session_id.len(), 32, "session_id is 32 bytes");
    assert_eq!(
        hint.resumption_secret.len(),
        32,
        "resumption_secret is 32 bytes"
    );

    // Second connection: resume with the hint and attach early data.
    let s2 =
        connect_pinned_with_resumption(host, port, pinned, hint, b"zero-rtt-early-data".to_vec())
            .await
            .expect("connect_pinned_with_resumption");
    s2.send(b"ping-2".to_vec()).await.expect("c2 send");
    let r2 = timeout(Duration::from_secs(5), s2.recv())
        .await
        .expect("c2 recv timeout")
        .expect("c2 recv");
    assert_eq!(r2, b"ping-2");

    // A panic inside the spawned task (failed `expect`) is only reported through its
    // `JoinHandle`; discarding the join result would hide it, so propagate it.
    server_handle.await.expect("server task");
}
/// Soak test: echoes `MESSAGES` payloads over one session while both ends use a rekey
/// threshold of `REKEY_EVERY`, then checks that the key epoch advanced on both sides.
///
/// The server asserts its epoch is greater than 0 after the echo loop; the client asserts
/// its epoch is greater than 5 and that every echo matched the payload it sent.
///
/// `#[ignore]`d: only runs when ignored tests are requested (e.g. `cargo test -- --ignored`).
#[tokio::test]
#[ignore]
async fn tcp_soak_drives_automatic_rekey_end_to_end() {
    const MESSAGES: usize = 300;
    const REKEY_EVERY: u64 = 8;

    let listener = PhantomListener::bind("127.0.0.1:0".to_string())
        .await
        .expect("bind listener");
    let addr = listener.local_addr();
    let pinned_bytes = listener.verifying_key_bytes();
    let expected_key =
        HybridVerifyingKey::from_bytes(&pinned_bytes).expect("deserialize verifying key");

    // Server side: arm the threshold, echo every message, then check the epoch.
    let server_handle = tokio::spawn(async move {
        let server_session = listener.accept().await.expect("accept").session();
        let server_armed = server_session.set_rekey_threshold(REKEY_EVERY).await;
        assert!(
            server_armed,
            "server session should be established at accept()"
        );

        let mut echoed = 0;
        while echoed < MESSAGES {
            let inbound = server_session.recv().await.expect("server recv");
            server_session.send(inbound).await.expect("server echo");
            echoed += 1;
        }

        // A missing epoch is treated as 0, which then fails the assertion below.
        let server_epoch = server_session.current_epoch().await.unwrap_or(0);
        assert!(
            server_epoch > 0,
            "server epoch must advance via echo-driven rekey"
        );
        tokio::time::sleep(Duration::from_millis(200)).await;
    });

    let stream = TcpStream::connect(&addr).await.expect("tcp connect");
    let client =
        PhantomSession::connect_with_transport(&addr, TcpSessionTransport::new(stream), expected_key);

    // `set_rekey_threshold` reports `false` until it can be applied, so retry: up to 100
    // attempts, 20 ms apart.
    let mut attempts_left = 100u32;
    let armed = loop {
        if attempts_left == 0 {
            break false;
        }
        attempts_left -= 1;
        if client.set_rekey_threshold(REKEY_EVERY).await {
            break true;
        }
        tokio::time::sleep(Duration::from_millis(20)).await;
    };
    assert!(armed, "client session never became established");

    // Strict request/response lock-step: send one payload, wait for its echo.
    for i in 0..MESSAGES {
        let payload = format!("soak-message-{i:05}").into_bytes();
        client.send(payload.clone()).await.expect("client send");

        let echo = match timeout(Duration::from_secs(5), client.recv()).await {
            Ok(received) => received.expect("client recv"),
            Err(_) => panic!("client recv timed out on message {i}"),
        };
        assert_eq!(
            echo, payload,
            "echo {i} must round-trip intact across rekeys"
        );
    }

    let client_epoch = client.current_epoch().await.unwrap_or(0);
    assert!(
        client_epoch > 5,
        "client epoch must advance via automatic rekey across the soak (got {client_epoch})"
    );

    server_handle.await.expect("server task");
}
/// Reuses one resumption hint for two resumed connections and checks how each one's early
/// data is treated.
///
/// Sequence, as seen by the server task:
/// 1. a warm-up session that delivers `warmup` (the client takes its hint from this one);
/// 2. a resumed connection whose early data `first-0rtt` must be available through
///    `take_early_data()`;
/// 3. a second resumed connection with the *same* hint: `take_early_data()` must yield
///    nothing, and the payload `second-0rtt-rejected` must instead arrive through `recv()`.
///
/// On the client, `early_data_accepted()` for the third connection must settle on
/// `Some(false)`.
///
/// `#[ignore]`d: only runs when ignored tests are requested (e.g. `cargo test -- --ignored`).
#[tokio::test]
#[ignore]
async fn tcp_zero_rtt_rejection_retransmits_early_data_over_1rtt() {
    use phantom_protocol::api::session::{connect_pinned, connect_pinned_with_resumption};

    let listener = PhantomListener::bind("127.0.0.1:0".to_string())
        .await
        .expect("bind listener");
    let local = listener.local_addr();
    let (host, port_str) = local.rsplit_once(':').expect("local_addr is host:port");
    let host = host.to_string();
    let port: u16 = port_str.parse().expect("port parses");
    let pinned = listener.verifying_key_bytes();

    // Each accept lives in its own scope so its session/outcome is dropped before the
    // next accept starts.
    let server_handle = tokio::spawn(async move {
        // Connection 1: plain warm-up.
        {
            let warmup_session = listener.accept().await.expect("accept 1").session();
            assert_eq!(warmup_session.recv().await.expect("recv 1"), b"warmup");
        }
        // Connection 2: first use of the hint, early data expected server-side.
        {
            let accepted = listener.accept().await.expect("accept 2");
            assert_eq!(
                accepted.take_early_data().as_deref(),
                Some(&b"first-0rtt"[..]),
                "a valid one-shot ticket must accept the 0-RTT early-data server-side"
            );
        }
        // Connection 3: second use of the same hint, early data must come via `recv()`.
        {
            let replayed = listener.accept().await.expect("accept 3");
            assert!(
                replayed.take_early_data().is_none(),
                "a consumed ticket must reject 0-RTT (no server-side early-data take)"
            );
            let fallback_session = replayed.session();
            let resent = fallback_session
                .recv()
                .await
                .expect("recv 3 — rejected early-data must be re-sent over 1-RTT");
            assert_eq!(
                resent, b"second-0rtt-rejected",
                "the rejected 0-RTT payload must arrive losslessly over the 1-RTT session"
            );
        }
        tokio::time::sleep(Duration::from_millis(200)).await;
    });

    let c1 = connect_pinned(host.clone(), port, pinned.clone())
        .await
        .expect("connect_pinned c1");
    c1.send(b"warmup".to_vec()).await.expect("c1 send");

    // Poll every 20 ms (bounded by a 5 s timeout) until the first session has a hint.
    let hint = timeout(Duration::from_secs(5), async {
        loop {
            match c1.resumption_hint().await {
                Some(ticket) => break ticket,
                None => tokio::time::sleep(Duration::from_millis(20)).await,
            }
        }
    })
    .await
    .expect("resumption hint did not arrive");

    // First resumption; the binding keeps this session alive until the test ends.
    let _c2 = connect_pinned_with_resumption(
        host.clone(),
        port,
        pinned.clone(),
        hint.clone(),
        b"first-0rtt".to_vec(),
    )
    .await
    .expect("connect_pinned_with_resumption c2");

    // Second resumption with the very same hint.
    let c3 =
        connect_pinned_with_resumption(host, port, pinned, hint, b"second-0rtt-rejected".to_vec())
            .await
            .expect("connect_pinned_with_resumption c3");

    // Up to 200 polls, 20 ms apart, until the client knows the early-data verdict.
    let mut verdict = None;
    let mut polls_left = 200;
    while polls_left > 0 {
        polls_left -= 1;
        verdict = c3.early_data_accepted().await;
        if verdict.is_some() {
            break;
        }
        tokio::time::sleep(Duration::from_millis(20)).await;
    }
    assert_eq!(
        verdict,
        Some(false),
        "reusing a one-shot ticket must reject the 0-RTT early-data"
    );

    server_handle.await.expect("server task");
}
/// Streams `TOTAL` bytes in each direction at the same time, in `CHUNK`-sized sends, and
/// checks that both ends reassemble exactly the bytes the other side sent.
///
/// Each end runs its sender in a separate task while the receiving loop accumulates data
/// until `TOTAL` bytes have arrived; every individual `recv` is bounded by a 30 s timeout.
///
/// `#[ignore]`d: only runs when ignored tests are requested (e.g. `cargo test -- --ignored`).
#[tokio::test]
#[ignore]
async fn tcp_bidirectional_bulk_transfer_completes_byte_exact() {
    use std::sync::Arc;

    const TOTAL: usize = 512 * 1024;
    const CHUNK: usize = 8 * 1024;

    /// Deterministic pseudo-pattern of `n` bytes: byte `i` is `(i * 31 + seed) % 251`
    /// (wrapping arithmetic), so the two directions carry different, reproducible data.
    fn payload(seed: u64, n: usize) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(n);
        for i in 0..n {
            let mixed = (i as u64).wrapping_mul(31).wrapping_add(seed);
            bytes.push((mixed % 251) as u8);
        }
        bytes
    }

    let to_server = payload(7, TOTAL);
    let to_client = payload(200, TOTAL);

    let listener = PhantomListener::bind("127.0.0.1:0".to_string())
        .await
        .expect("bind listener");
    let addr = listener.local_addr();
    let key =
        HybridVerifyingKey::from_bytes(&listener.verifying_key_bytes()).expect("deserialize key");

    // Copies owned by the server task: what it sends, and what it expects to receive.
    let server_outbound = to_client.clone();
    let server_expected = to_server.clone();
    let server_handle = tokio::spawn(async move {
        let session = listener.accept().await.expect("accept").session();

        // Sending runs concurrently with the receive loop below.
        let send_half = session.clone();
        let server_sender = tokio::spawn(async move {
            for piece in server_outbound.chunks(CHUNK) {
                send_half.send(piece.to_vec()).await.expect("server send");
            }
        });

        let mut received = Vec::with_capacity(TOTAL);
        while received.len() < TOTAL {
            let part = timeout(Duration::from_secs(30), session.recv())
                .await
                .expect("server recv timed out — flow-control deadlock?")
                .expect("server recv");
            received.extend_from_slice(&part);
        }
        server_sender.await.expect("server sender task");

        assert_eq!(received.len(), TOTAL, "server received exactly TOTAL bytes");
        // Plain `assert!` rather than `assert_eq!`: a mismatch should not dump both buffers.
        assert!(
            received == server_expected,
            "server byte stream must match exactly (no loss/reorder)"
        );
        tokio::time::sleep(Duration::from_millis(200)).await;
    });

    let stream = TcpStream::connect(&addr).await.expect("tcp connect");
    let transport = TcpSessionTransport::new(stream);
    // Shared through `Arc` so the sender task and the receive loop use the same session.
    let client = Arc::new(PhantomSession::connect_with_transport(
        &addr, transport, key,
    ));

    let client_send_half = client.clone();
    let client_outbound = to_server.clone();
    let client_sender = tokio::spawn(async move {
        for piece in client_outbound.chunks(CHUNK) {
            client_send_half
                .send(piece.to_vec())
                .await
                .expect("client send");
        }
    });

    let mut received = Vec::with_capacity(TOTAL);
    while received.len() < TOTAL {
        let part = timeout(Duration::from_secs(30), client.recv())
            .await
            .expect("client recv timed out — flow-control deadlock?")
            .expect("client recv");
        received.extend_from_slice(&part);
    }
    client_sender.await.expect("client sender task");

    assert_eq!(received.len(), TOTAL, "client received exactly TOTAL bytes");
    assert!(
        received == to_client,
        "client byte stream must match exactly (no loss/reorder)"
    );

    server_handle.await.expect("server task");
}
/// Checks that the server→client stream still completes while the client floods the
/// opposite direction towards a server that never reads.
///
/// The server task only sends `SERVER_TO_CLIENT` bytes and then sleeps; it never calls
/// `recv`. Meanwhile a client task pushes up to `CLIENT_FLOOD` bytes, giving up as soon as a
/// single send takes longer than 5 s. The client must nevertheless receive the server's
/// data byte-exact, with each `recv` bounded by a 20 s timeout.
///
/// `#[ignore]`d: only runs when ignored tests are requested (e.g. `cargo test -- --ignored`).
#[tokio::test]
#[ignore]
async fn tcp_blocked_consumer_does_not_stall_the_other_direction() {
    use std::sync::Arc;

    const SERVER_TO_CLIENT: usize = 512 * 1024;
    const CLIENT_FLOOD: usize = 1024 * 1024;
    const CHUNK: usize = 8 * 1024;

    /// Deterministic pattern of `n` bytes: byte `i` is `(i * 31 + seed) % 251` (wrapping).
    fn payload(seed: u64, n: usize) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(n);
        for i in 0..n {
            let mixed = (i as u64).wrapping_mul(31).wrapping_add(seed);
            bytes.push((mixed % 251) as u8);
        }
        bytes
    }

    let s2c = payload(11, SERVER_TO_CLIENT);
    let s2c_expected = s2c.clone();

    let listener = PhantomListener::bind("127.0.0.1:0".to_string())
        .await
        .expect("bind listener");
    let addr = listener.local_addr();
    let key =
        HybridVerifyingKey::from_bytes(&listener.verifying_key_bytes()).expect("deserialize key");

    // Server side: send everything, never read, then hold the session for 8 more seconds.
    let server_handle = tokio::spawn(async move {
        let session = listener.accept().await.expect("accept").session();
        for piece in s2c.chunks(CHUNK) {
            session.send(piece.to_vec()).await.expect("server send");
        }
        tokio::time::sleep(Duration::from_secs(8)).await;
    });

    let stream = TcpStream::connect(&addr).await.expect("tcp connect");
    let transport = TcpSessionTransport::new(stream);
    let client = Arc::new(PhantomSession::connect_with_transport(
        &addr, transport, key,
    ));

    // Flood the direction nobody consumes. Only a send that exceeds 5 s stops the flood;
    // an error returned by `send` itself is ignored and the loop carries on.
    let flood_half = client.clone();
    let flood = tokio::spawn(async move {
        let burst = payload(99, CLIENT_FLOOD);
        for piece in burst.chunks(CHUNK) {
            let attempt = timeout(Duration::from_secs(5), flood_half.send(piece.to_vec())).await;
            if attempt.is_err() {
                break;
            }
        }
    });

    let mut received = Vec::with_capacity(SERVER_TO_CLIENT);
    while received.len() < SERVER_TO_CLIENT {
        let part = timeout(Duration::from_secs(20), client.recv())
            .await
            .expect("client recv timed out — head-of-line stall regression")
            .expect("client recv");
        received.extend_from_slice(&part);
    }
    assert_eq!(received.len(), SERVER_TO_CLIENT);
    assert!(
        received == s2c_expected,
        "server→client stream must arrive byte-exact despite a blocked reverse consumer"
    );

    // The flood may still be blocked in a send: cancel it, then reap both tasks without
    // caring how they ended.
    flood.abort();
    let _ = flood.await;
    let _ = server_handle.await;
}
/// Checks what the client observes once the server drops its session.
///
/// After a `hello` / `ack` exchange the server waits 300 ms and drops the session. The
/// client's next `recv()` must return within 5 s, and must return an error; a subsequent
/// `disconnect()` must still succeed.
///
/// `#[ignore]`d: only runs when ignored tests are requested (e.g. `cargo test -- --ignored`).
#[tokio::test]
#[ignore]
async fn tcp_peer_close_tears_down_session_cleanly() {
    let listener = PhantomListener::bind("127.0.0.1:0".to_string())
        .await
        .expect("bind listener");
    let addr = listener.local_addr();
    let key =
        HybridVerifyingKey::from_bytes(&listener.verifying_key_bytes()).expect("deserialize key");

    let server_handle = tokio::spawn(async move {
        let server_session = listener.accept().await.expect("accept").session();
        let greeting = server_session.recv().await.expect("server recv");
        assert_eq!(greeting, b"hello");
        server_session
            .send(b"ack".to_vec())
            .await
            .expect("server send");

        // Wait, close the server side explicitly, then linger briefly before the task ends.
        tokio::time::sleep(Duration::from_millis(300)).await;
        drop(server_session);
        tokio::time::sleep(Duration::from_millis(50)).await;
    });

    let stream = TcpStream::connect(&addr).await.expect("tcp connect");
    let client =
        PhantomSession::connect_with_transport(&addr, TcpSessionTransport::new(stream), key);

    client.send(b"hello".to_vec()).await.expect("client send");
    let ack = timeout(Duration::from_secs(5), client.recv())
        .await
        .expect("client recv timeout")
        .expect("client recv");
    assert_eq!(ack, b"ack");

    // Second receive: the timeout must not fire (no hang), and the inner result must be
    // an error rather than data.
    let after = timeout(Duration::from_secs(5), client.recv())
        .await
        .expect("recv() must not hang after peer close");
    assert!(
        after.is_err(),
        "recv() after peer close must surface a closed-session error, got: {after:?}"
    );

    client.disconnect().await.expect("clean disconnect");
    server_handle.await.expect("server task");
}
/// Checks that `accept()` still yields a well-behaved client while an earlier TCP
/// connection sits idle without ever sending anything.
///
/// A bare `TcpStream` ("staller") connects first and stays silent. A real client connects
/// 150 ms later; `accept()` must return its session within 5 s, and a message sent by that
/// client must reach the accepted session.
///
/// `#[ignore]`d: only runs when ignored tests are requested (e.g. `cargo test -- --ignored`).
#[tokio::test]
#[ignore]
async fn tcp_integration_stalled_peer_does_not_block_accept() {
    let listener = PhantomListener::bind("127.0.0.1:0".to_string())
        .await
        .expect("bind listener");
    let addr = listener.local_addr();
    let expected_key = HybridVerifyingKey::from_bytes(&listener.verifying_key_bytes()).expect("vk");

    // Connects at the TCP level only and never writes a byte.
    let staller = TcpStream::connect(&addr).await.expect("staller connect");
    // Give the silent connection a head start over the real client.
    tokio::time::sleep(Duration::from_millis(150)).await;

    let stream = TcpStream::connect(&addr).await.expect("tcp connect");
    let client = PhantomSession::connect_with_transport(
        &addr,
        TcpSessionTransport::new(stream),
        expected_key,
    );

    // Outer `expect`: accept returned in time. Inner `expect`: accept itself succeeded.
    let accepted = timeout(Duration::from_secs(5), listener.accept())
        .await
        .expect("accept must not block on the stalled peer")
        .expect("accept returns the well-behaved session");
    let server_session = accepted.session();

    client
        .send(b"ping-past-staller".to_vec())
        .await
        .expect("client send");
    let delivered = timeout(Duration::from_secs(5), server_session.recv())
        .await
        .expect("server recv timeout")
        .expect("server recv");
    assert_eq!(delivered, b"ping-past-staller");

    // The silent connection is held open until here, i.e. for the whole exchange.
    drop(staller);
}