use std::{
convert::TryInto,
mem,
net::{Ipv4Addr, Ipv6Addr, SocketAddr},
sync::{Arc, Mutex},
};
use assert_matches::assert_matches;
#[cfg(all(feature = "aws-lc-rs", not(feature = "ring")))]
use aws_lc_rs::hmac;
use bytes::{Bytes, BytesMut};
use hex_literal::hex;
use rand::Rng;
#[cfg(feature = "ring")]
use ring::hmac;
use rustls::{
AlertDescription, RootCertStore,
pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer},
server::WebPkiClientVerifier,
};
use tracing::info;
use super::*;
use crate::{
Duration, FourTuple, Instant,
Side::*,
cid_generator::{ConnectionIdGenerator, RandomConnectionIdGenerator},
crypto::rustls::{QuicServerConfig, configured_provider},
frame::FrameStruct,
transport_parameters::TransportParameters,
};
mod util;
pub(crate) use util::*;
#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
mod encode_decode;
mod multipath;
#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
mod proptests;
#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
mod random_interaction;
mod token;
#[cfg(all(target_family = "wasm", target_os = "unknown"))]
use wasm_bindgen_test::wasm_bindgen_test as test;
/// Feeds a server endpoint a long-header datagram (first byte `0x80`, then `0a1a2a3a`)
/// and checks that the endpoint produces a response whose bytes 1..15 match the expected
/// zero-version header and whose remainder lists at least one of
/// `DEFAULT_SUPPORTED_VERSIONS`.
#[test]
fn version_negotiate_server() {
    let _guard = subscribe();
    let peer = "[::2]:7890".parse().unwrap();
    let mut endpoint = Endpoint::new(Default::default(), Some(Arc::new(server_config())), true);
    let now = Instant::now();
    let capacity = endpoint.config().get_max_udp_payload_size() as usize;
    let mut response = Vec::with_capacity(capacity);
    let network_path = FourTuple {
        remote: peer,
        local_ip: None,
    };
    let outcome = endpoint.handle(
        now,
        network_path,
        None,
        hex!("80 0a1a2a3a 04 00000000 04 00000000 00")[..].into(),
        &mut response,
    );
    match outcome {
        Some(DatagramEvent::Response(Transmit { .. })) => {}
        _ => panic!("expected a response"),
    }

    // The high bit of the first byte must be set in the reply.
    assert_ne!(response[0] & 0x80, 0);
    assert_eq!(&response[1..15], hex!("00000000 04 00000000 04 00000000"));
    // Everything past the 15-byte prefix is decoded as big-endian 32-bit values.
    let mut advertised = response[15..]
        .chunks(4)
        .map(|raw| u32::from_be_bytes(raw.try_into().unwrap()));
    assert!(advertised.any(|version| DEFAULT_SUPPORTED_VERSIONS.contains(&version)));
}
/// Starts a client connection, hands the endpoint a crafted datagram from the server
/// address, forwards any resulting connection event, and expects the connection to be
/// lost with `ConnectionError::VersionMismatch`.
#[test]
fn version_negotiate_client() {
    let _guard = subscribe();
    let server_addr = "[::2]:7890".parse().unwrap();
    let cid_generator_factory: fn() -> Box<dyn ConnectionIdGenerator> =
        || Box::new(RandomConnectionIdGenerator::new(0));
    let endpoint_config = EndpointConfig {
        connection_id_generator_factory: Arc::new(cid_generator_factory),
        ..Default::default()
    };
    let mut client = Endpoint::new(Arc::new(endpoint_config), None, true);
    let (_, mut conn) = client
        .connect(Instant::now(), client_config(), server_addr, "localhost")
        .unwrap();

    let now = Instant::now();
    let capacity = client.config().get_max_udp_payload_size() as usize;
    let mut scratch = Vec::with_capacity(capacity);
    let network_path = FourTuple {
        remote: server_addr,
        local_ip: None,
    };
    let opt_event = client.handle(
        now,
        network_path,
        None,
        hex!(
            "80 00000000 00 04 00000000
             0a1a2a3a"
        )[..]
            .into(),
        &mut scratch,
    );
    // Only a connection-level event is relevant; anything else is ignored.
    if let Some(DatagramEvent::ConnectionEvent(_, conn_event)) = opt_event {
        conn.handle_event(conn_event);
    }
    assert_matches!(
        conn.poll(),
        Some(Event::ConnectionLost {
            reason: ConnectionError::VersionMismatch,
        })
    );
}
/// Connects a pair, closes from the client with code 42 and a reason, and checks that the
/// server reports that application close while both endpoints end up with zero known
/// connections and zero known CIDs.
#[test]
fn lifecycle() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();
    assert_matches!(pair.client_conn_mut(client_ch).poll(), None);
    assert!(pair.client_conn_mut(client_ch).using_ecn());
    assert!(pair.server_conn_mut(server_ch).using_ecn());

    const REASON: &[u8] = b"whee";
    info!("closing");
    let close_time = pair.time;
    let client_conn = pair.client.connections.get_mut(&client_ch).unwrap();
    client_conn.close(close_time, VarInt(42), REASON.into());
    pair.drive();

    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::ConnectionLost {
            reason: ConnectionError::ApplicationClosed(ApplicationClose {
                error_code: VarInt(42),
                ref reason,
            }),
        }) if reason == REASON
    );
    // The initiating side gets no event for its own close.
    assert_matches!(pair.client_conn_mut(client_ch).poll(), None);

    assert_eq!(pair.client.known_connections(), 0);
    assert_eq!(pair.client.known_cids(), 0);
    assert_eq!(pair.server.known_connections(), 0);
    assert_eq!(pair.server.known_cids(), 0);
}
/// Same flow as a plain connect/close cycle, but the client is configured with version
/// `0xff00_0020` before connecting.
#[test]
fn draft_version_compat() {
    let _guard = subscribe();

    let mut config = client_config();
    config.version(0xff00_0020);

    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect_with(config);
    assert_matches!(pair.client_conn_mut(client_ch).poll(), None);
    assert!(pair.client_conn_mut(client_ch).using_ecn());
    assert!(pair.server_conn_mut(server_ch).using_ecn());

    const REASON: &[u8] = b"whee";
    info!("closing");
    let close_time = pair.time;
    let client_conn = pair.client.connections.get_mut(&client_ch).unwrap();
    client_conn.close(close_time, VarInt(42), REASON.into());
    pair.drive();

    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::ConnectionLost {
            reason: ConnectionError::ApplicationClosed(ApplicationClose {
                error_code: VarInt(42),
                ref reason,
            }),
        }) if reason == REASON
    );
    assert_matches!(pair.client_conn_mut(client_ch).poll(), None);

    // Neither endpoint should retain any connection state after the close.
    assert_eq!(pair.client.known_connections(), 0);
    assert_eq!(pair.client.known_cids(), 0);
    assert_eq!(pair.server.known_connections(), 0);
    assert_eq!(pair.server.known_cids(), 0);
}
/// Replaces the server's endpoint with a fresh one built from the same endpoint config
/// after the handshake, pings from the client, and expects the client connection to be
/// lost with `ConnectionError::Reset`.
#[test]
fn server_stateless_reset() {
    let _guard = subscribe();

    let mut rng = rand::rng();
    let mut key_material = vec![0; 64];
    rng.fill_bytes(&mut key_material);
    let reset_key = hmac::Key::new(hmac::HMAC_SHA256, &key_material);
    rng.fill_bytes(&mut key_material);

    let endpoint_config = {
        let mut config = EndpointConfig::new(Arc::new(reset_key));
        config.cid_generator(move || Box::new(HashedConnectionIdGenerator::from_key(0)));
        Arc::new(config)
    };

    let mut pair = Pair::new(endpoint_config.clone(), server_config());
    let (client_ch, _) = pair.connect();
    pair.drive();

    // The new endpoint shares the config (and thus the reset key) but none of the
    // connection state of the one it replaces.
    let replacement = Endpoint::new(endpoint_config, Some(Arc::new(server_config())), true);
    pair.server.endpoint = replacement;

    let client_conn = pair.client.connections.get_mut(&client_ch).unwrap();
    client_conn.ping();
    info!("resetting");
    pair.drive();

    assert_matches!(
        pair.client_conn_mut(client_ch).poll(),
        Some(Event::ConnectionLost {
            reason: ConnectionError::Reset
        })
    );
}
/// Replaces the client's endpoint with a fresh one built from the same endpoint config
/// after the handshake, has the server close with a 128-byte reason, and expects the
/// server connection to be lost with `ConnectionError::Reset`.
#[test]
fn client_stateless_reset() {
    let _guard = subscribe();

    let mut rng = rand::rng();
    let mut key_material = vec![0; 64];
    rng.fill_bytes(&mut key_material);
    let reset_key = hmac::Key::new(hmac::HMAC_SHA256, &key_material);
    rng.fill_bytes(&mut key_material);

    let endpoint_config = {
        let mut config = EndpointConfig::new(Arc::new(reset_key));
        config.cid_generator(move || Box::new(HashedConnectionIdGenerator::from_key(0)));
        Arc::new(config)
    };

    let mut pair = Pair::new(endpoint_config.clone(), server_config());
    let (_, server_ch) = pair.connect();

    // Swap in an endpoint that has no record of the established connection.
    let replacement = Endpoint::new(endpoint_config, Some(Arc::new(server_config())), true);
    pair.client.endpoint = replacement;

    let close_time = pair.time;
    let server_conn = pair.server.connections.get_mut(&server_ch).unwrap();
    server_conn.close(close_time, VarInt(42), (&[0xab; 128][..]).into());
    info!("resetting");
    pair.drive();

    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::ConnectionLost {
            reason: ConnectionError::Reset
        })
    );
}
/// Sends the same 1024-byte zero datagram to a server endpoint four times and checks
/// which attempts yield a response: the first does, repeats before
/// `min_reset_interval` has elapsed do not, and one exactly at the interval does again.
#[test]
fn stateless_reset_limit() {
    let _guard = subscribe();
    let remote = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 42);
    let endpoint_config = {
        let mut config = EndpointConfig::default();
        config.cid_generator(move || Box::new(RandomConnectionIdGenerator::new(8)));
        Arc::new(config)
    };
    let mut endpoint = Endpoint::new(
        endpoint_config.clone(),
        Some(Arc::new(server_config())),
        true,
    );
    let time = Instant::now();
    let mut buf = Vec::new();
    let network_path = FourTuple {
        remote,
        local_ip: None,
    };

    let interval = endpoint_config.min_reset_interval;
    // (time of arrival, whether a response is expected)
    let probes = [
        (time, true),
        (time, false),
        (time + interval - Duration::from_nanos(1), false),
        (time + interval, true),
    ];
    for (arrival, expect_response) in probes {
        let event = endpoint.handle(
            arrival,
            network_path,
            None,
            [0u8; 1024][..].into(),
            &mut buf,
        );
        if expect_response {
            assert!(matches!(event, Some(DatagramEvent::Response(_))));
        } else {
            assert!(event.is_none());
        }
    }
}
/// Exports 64 bytes of keying material from both sides of a connection using the same
/// label and context, and checks that the two outputs are equal.
#[test]
fn export_keying_material() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();

    const LABEL: &[u8] = b"test_label";
    const CONTEXT: &[u8] = b"test_context";

    let mut client_secret = [0u8; 64];
    let client_session = pair.client_conn_mut(client_ch).crypto_session();
    client_session
        .export_keying_material(&mut client_secret, LABEL, CONTEXT)
        .unwrap();

    let mut server_secret = [0u8; 64];
    let server_session = pair.server_conn_mut(server_ch).crypto_session();
    server_session
        .export_keying_material(&mut server_secret, LABEL, CONTEXT)
        .unwrap();

    assert_eq!(client_secret, server_secret);
}
/// Opens a unidirectional stream on the client, writes a message, finishes the stream,
/// and checks the events and stream counts observed on both sides as well as the data
/// read by the server.
#[test]
fn finish_stream_simple() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();

    let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap();

    const MSG: &[u8] = b"hello";
    pair.client_send(client_ch, s).write(MSG).unwrap();
    assert_eq!(pair.client_streams(client_ch).send_streams(), 1);
    pair.client_send(client_ch, s).finish().unwrap();
    pair.drive();

    assert_matches!(
        pair.client_conn_mut(client_ch).poll(),
        Some(Event::Stream(StreamEvent::Finished { id })) if id == s
    );
    assert_matches!(pair.client_conn_mut(client_ch).poll(), None);
    assert_eq!(pair.client_streams(client_ch).send_streams(), 0);
    // Server-side lookups use the server's own handle, not the client's.
    assert_eq!(pair.server_conn_mut(server_ch).streams().send_streams(), 0);
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::Stream(StreamEvent::Opened { dir: Dir::Uni }))
    );
    assert_eq!(pair.server_conn_mut(server_ch).streams().send_streams(), 0);
    assert_matches!(pair.server_streams(server_ch).accept(Dir::Uni), Some(stream) if stream == s);
    assert_matches!(pair.server_conn_mut(server_ch).poll(), None);

    let mut recv = pair.server_recv(server_ch, s);
    let mut chunks = recv.read(false).unwrap();
    assert_matches!(
        chunks.next(usize::MAX),
        Ok(Some(chunk)) if chunk.offset == 0 && chunk.bytes == MSG
    );
    // After the single chunk, the stream reports its end.
    assert_matches!(chunks.next(usize::MAX), Ok(None));
    let _ = chunks.finalize();
}
/// Writes to a unidirectional stream, resets it from the client with error code 42, and
/// checks that the server's read fails with `ReadError::Reset` carrying that code.
#[test]
fn reset_stream() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();

    let uni = pair.client_streams(client_ch).open(Dir::Uni).unwrap();

    const MSG: &[u8] = b"hello";
    pair.client_send(client_ch, uni).write(MSG).unwrap();
    pair.drive();

    info!("resetting stream");
    const ERROR: VarInt = VarInt(42);
    pair.client_send(client_ch, uni).reset(ERROR).unwrap();
    pair.drive();

    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::Stream(StreamEvent::Opened { dir: Dir::Uni }))
    );
    assert_matches!(
        pair.server_streams(server_ch).accept(Dir::Uni),
        Some(stream) if stream == uni
    );

    let mut recv = pair.server_recv(server_ch, uni);
    let mut chunks = recv.read(false).unwrap();
    assert_matches!(chunks.next(usize::MAX), Err(ReadError::Reset(ERROR)));
    let _ = chunks.finalize();

    // The resetting side sees no event of its own.
    assert_matches!(pair.client_conn_mut(client_ch).poll(), None);
}
/// Has the server stop a client-opened unidirectional stream with error code 42 and
/// checks that subsequent client `write` and `finish` calls fail with `Stopped(42)`.
#[test]
fn stop_stream() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();

    let uni = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
    const MSG: &[u8] = b"hello";
    pair.client_send(client_ch, uni).write(MSG).unwrap();
    pair.drive();

    info!("stopping stream");
    const ERROR: VarInt = VarInt(42);
    pair.server_recv(server_ch, uni).stop(ERROR).unwrap();
    pair.drive();

    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::Stream(StreamEvent::Opened { dir: Dir::Uni }))
    );
    assert_matches!(
        pair.server_streams(server_ch).accept(Dir::Uni),
        Some(stream) if stream == uni
    );

    // Both ways of continuing to use the send side must surface the stop code.
    let write_result = pair.client_send(client_ch, uni).write(b"foo");
    assert_matches!(write_result, Err(WriteError::Stopped(ERROR)));
    let finish_result = pair.client_send(client_ch, uni).finish();
    assert_matches!(finish_result, Err(FinishError::Stopped(ERROR)));
}
/// Connects with a client config built from a freshly generated self-signed certificate
/// and expects the client connection to be lost with a crypto transport error carrying
/// the `UnknownCA` alert.
#[test]
fn reject_self_signed_server_cert() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    info!("connecting");

    let mut params = rcgen::CertificateParams::new(["localhost".into()]).unwrap();
    let mut issuer = rcgen::DistinguishedName::new();
    issuer.push(
        rcgen::DnType::OrganizationName,
        "Crazy Quinn's House of Certificates",
    );
    params.distinguished_name = issuer;
    let key_pair = rcgen::KeyPair::generate().unwrap();
    let cert = params.self_signed(&key_pair).unwrap();

    let config = client_config_with_certs(vec![cert.into()]);
    let client_ch = pair.begin_connect(config);
    pair.drive();

    assert_matches!(
        pair.client_conn_mut(client_ch).poll(),
        Some(Event::ConnectionLost {
            reason: ConnectionError::TransportError(ref error)
        }) if error.code == TransportErrorCode::crypto(AlertDescription::UnknownCA.into())
    );
}
/// Configures the server with a client-certificate verifier and connects with a default
/// client config. The client is expected to report `Connected` and then lose the
/// connection with a `CertificateRequired` close; the server loses it with the matching
/// transport error.
#[test]
fn reject_missing_client_cert() {
    let _guard = subscribe();

    let cert = CERTIFIED_KEY.cert.der().clone();
    let mut roots = RootCertStore::empty();
    roots.add(cert.clone()).unwrap();
    let key = PrivatePkcs8KeyDer::from(CERTIFIED_KEY.signing_key.serialize_der());

    let provider = configured_provider();
    let verifier = WebPkiClientVerifier::builder_with_provider(Arc::new(roots), provider.clone())
        .build()
        .unwrap();
    let tls_config = rustls::ServerConfig::builder_with_provider(provider)
        .with_protocol_versions(&[&rustls::version::TLS13])
        .unwrap()
        .with_client_cert_verifier(verifier)
        .with_single_cert(vec![cert], PrivateKeyDer::from(key))
        .unwrap();
    let quic_config = QuicServerConfig::try_from(tls_config).unwrap();

    let mut pair = Pair::new(
        Default::default(),
        ServerConfig::with_crypto(Arc::new(quic_config)),
    );

    info!("connecting");
    let client_ch = pair.begin_connect(client_config());
    pair.drive();

    // Client-side event sequence.
    assert_matches!(
        pair.client_conn_mut(client_ch).poll(),
        Some(Event::HandshakeDataReady)
    );
    assert_matches!(
        pair.client_conn_mut(client_ch).poll(),
        Some(Event::Connected)
    );
    assert_matches!(
        pair.client_conn_mut(client_ch).poll(),
        Some(Event::ConnectionLost {
            reason: ConnectionError::ConnectionClosed(ref close)
        }) if close.error_code
            == TransportErrorCode::crypto(AlertDescription::CertificateRequired.into())
    );

    // Server-side event sequence.
    let server_ch = pair.server.assert_accept();
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::HandshakeDataReady)
    );
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::ConnectionLost {
            reason: ConnectionError::TransportError(ref error)
        }) if error.code
            == TransportErrorCode::crypto(AlertDescription::CertificateRequired.into())
    );
}
/// Writes 1024-byte blocks while driving only the client until the client's congestion
/// window is no longer above `TARGET`, then drives both sides and checks that the window
/// is at least `TARGET` and that another write succeeds.
#[test]
fn congestion() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, _) = pair.connect();

    const TARGET: u64 = 2048;
    const BLOCK: [u8; 1024] = [42; 1024];

    assert!(pair.client_conn_mut(client_ch).congestion_window() > TARGET);
    let stream = pair.client_streams(client_ch).open(Dir::Uni).unwrap();

    // Only the client is driven inside the loop.
    while pair.client_conn_mut(client_ch).congestion_window() > TARGET {
        let written = pair.client_send(client_ch, stream).write(&BLOCK).unwrap();
        assert_eq!(written, 1024);
        pair.drive_client();
    }

    pair.drive();
    assert!(pair.client_conn_mut(client_ch).congestion_window() >= TARGET);
    pair.client_send(client_ch, stream).write(&BLOCK).unwrap();
}
/// Connects a pair with 200 ms of configured latency and checks that afterwards neither
/// side has bytes in flight and both report ECN in use.
#[test]
fn high_latency_handshake() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    pair.latency = Duration::from_millis(200);
    let (client_ch, server_ch) = pair.connect();

    let client_in_flight = pair.client_conn_mut(client_ch).bytes_in_flight();
    assert_eq!(client_in_flight, 0);
    let server_in_flight = pair.server_conn_mut(server_ch).bytes_in_flight();
    assert_eq!(server_in_flight, 0);

    assert!(pair.client_conn_mut(client_ch).using_ecn());
    assert!(pair.server_conn_mut(server_ch).using_ecn());
}
/// Establishes and closes one connection, then reconnects from a new client port with
/// the same config and checks that 0-RTT is available, accepted, delivers the stream data
/// to the server, and that the client records no lost packets.
#[test]
fn zero_rtt_happypath() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    pair.server.handle_incoming = Box::new(validate_incoming);
    let config = client_config();

    // First connection: set up, accept, close.
    let first_ch = pair.begin_connect(config.clone());
    pair.drive();
    pair.server.assert_accept();
    let close_time = pair.time;
    let first_conn = pair.client.connections.get_mut(&first_ch).unwrap();
    first_conn.close(close_time, VarInt(0), [][..].into());
    pair.drive();

    // Second connection originates from a different client port.
    let fresh_port = CLIENT_PORTS.lock().unwrap().next().unwrap();
    pair.client.addr = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), fresh_port);
    info!("resuming session");
    let client_ch = pair.begin_connect(config);
    assert!(pair.client_conn_mut(client_ch).has_0rtt());
    let stream = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
    const MSG: &[u8] = b"Hello, 0-RTT!";
    pair.client_send(client_ch, stream).write(MSG).unwrap();
    pair.drive();

    assert_matches!(
        pair.client_conn_mut(client_ch).poll(),
        Some(Event::HandshakeDataReady)
    );
    assert_matches!(
        pair.client_conn_mut(client_ch).poll(),
        Some(Event::Connected)
    );
    assert!(pair.client_conn_mut(client_ch).accepted_0rtt());

    let server_ch = pair.server.assert_accept();
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::HandshakeDataReady)
    );
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::HandshakeConfirmed)
    );
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::Connected)
    );
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::Stream(StreamEvent::Opened { dir: Dir::Uni }))
    );

    let mut recv = pair.server_recv(server_ch, stream);
    let mut chunks = recv.read(false).unwrap();
    assert_matches!(
        chunks.next(usize::MAX),
        Ok(Some(chunk)) if chunk.offset == 0 && chunk.bytes == MSG
    );
    let _ = chunks.finalize();

    let client_path = pair
        .client_conn_mut(client_ch)
        .path_stats(PathId::ZERO)
        .unwrap();
    assert_eq!(client_path.lost_packets, 0);
}
/// Connects once with ALPN "foo", closes, then changes the client's ALPN list to "bar"
/// and reconnects. The second attempt still offers 0-RTT, but the client reports it was
/// not accepted, the server has no data on the stream, and no packets are counted lost.
#[test]
fn zero_rtt_rejection() {
    let _guard = subscribe();
    let server_crypto = server_crypto_with_alpn(vec!["foo".into(), "bar".into()]);
    let server_config = ServerConfig::with_crypto(Arc::new(server_crypto));
    let mut pair = Pair::new(Arc::new(EndpointConfig::default()), server_config);
    let mut client_crypto = Arc::new(client_crypto_with_alpn(vec!["foo".into()]));

    // First connection.
    let first_ch = pair.begin_connect(ClientConfig::new(client_crypto.clone()));
    pair.drive();
    let first_server_ch = pair.server.assert_accept();
    assert_matches!(
        pair.server_conn_mut(first_server_ch).poll(),
        Some(Event::HandshakeDataReady)
    );
    assert_matches!(
        pair.server_conn_mut(first_server_ch).poll(),
        Some(Event::HandshakeConfirmed)
    );
    assert_matches!(
        pair.server_conn_mut(first_server_ch).poll(),
        Some(Event::Connected)
    );
    assert_matches!(pair.server_conn_mut(first_server_ch).poll(), None);

    let close_time = pair.time;
    let first_conn = pair.client.connections.get_mut(&first_ch).unwrap();
    first_conn.close(close_time, VarInt(0), [][..].into());
    pair.drive();
    assert_matches!(
        pair.server_conn_mut(first_server_ch).poll(),
        Some(Event::ConnectionLost { .. })
    );
    assert_matches!(pair.server_conn_mut(first_server_ch).poll(), None);

    // Connections are dropped before `Arc::get_mut` is attempted on the crypto config
    // below, which only succeeds when no other reference remains.
    pair.client.connections.clear();
    pair.server.connections.clear();

    let this = Arc::get_mut(&mut client_crypto).expect("QuicClientConfig is shared");
    let inner = Arc::get_mut(&mut this.inner).expect("QuicClientConfig.inner is shared");
    inner.alpn_protocols = vec!["bar".into()];

    // Second connection with the altered ALPN list.
    let resumed_config = ClientConfig::new(client_crypto);
    info!("resuming session");
    let client_ch = pair.begin_connect(resumed_config);
    assert!(pair.client_conn_mut(client_ch).has_0rtt());
    let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
    const MSG: &[u8] = b"Hello, 0-RTT!";
    pair.client_send(client_ch, s).write(MSG).unwrap();
    pair.drive();
    assert!(!pair.client_conn_mut(client_ch).accepted_0rtt());

    let server_ch = pair.server.assert_accept();
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::HandshakeDataReady)
    );
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::HandshakeConfirmed)
    );
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::Connected)
    );
    assert_matches!(pair.server_conn_mut(server_ch).poll(), None);

    // Opening a stream again yields the same ID as the one used during 0-RTT.
    let s2 = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
    assert_eq!(s, s2);

    let mut recv = pair.server_recv(server_ch, s2);
    let mut chunks = recv.read(false).unwrap();
    assert_eq!(chunks.next(usize::MAX), Err(ReadError::Blocked));
    let _ = chunks.finalize();

    let client_path = pair
        .client_conn_mut(client_ch)
        .path_stats(PathId::ZERO)
        .unwrap();
    assert_eq!(client_path.lost_packets, 0);
}
/// Shared body for the incoming-buffer limit tests.
///
/// `configure_server` is applied to the server config before the pair is created. The
/// client then resumes a session with 0-RTT and writes `CLIENT_WRITES` bytes while the
/// server holds the incoming connection in the waiting state. After the server accepts,
/// all bytes must arrive in order and the client must count exactly `EXPECTED_DROPPED`
/// lost packets.
fn test_zero_rtt_incoming_limit<F: FnOnce(&mut ServerConfig)>(configure_server: F) {
    const CLIENT_WRITES: usize = 8000;
    const EXPECTED_DROPPED: u64 = 4;

    let _guard = subscribe();
    let transport = {
        let mut transport = TransportConfig::default();
        transport.initial_rtt(Duration::from_millis(10));
        Arc::new(transport)
    };
    let mut server_config = server_config();
    configure_server(&mut server_config);
    let mut pair = Pair::new(Arc::new(EndpointConfig::default()), server_config);
    let mut config = client_config();
    config.transport_config(transport);

    // First connection: set up, accept, close.
    let first_ch = pair.begin_connect(config.clone());
    pair.drive();
    pair.server.assert_accept();
    let close_time = pair.time;
    let first_conn = pair.client.connections.get_mut(&first_ch).unwrap();
    first_conn.close(close_time, VarInt(0), [][..].into());
    pair.drive();

    let fresh_port = CLIENT_PORTS.lock().unwrap().next().unwrap();
    pair.client.addr = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), fresh_port);
    info!("resuming session");
    // From here on, incoming connections are parked instead of being accepted.
    pair.server.handle_incoming = Box::new(|_| IncomingConnectionBehavior::Wait);
    let client_ch = pair.begin_connect(config);
    assert!(pair.client_conn_mut(client_ch).has_0rtt());
    let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
    pair.client_send(client_ch, s)
        .write(&vec![0; CLIENT_WRITES])
        .unwrap();
    pair.drive();

    info!("accepting connection");
    let incoming = pair.server.waiting_incoming.pop().unwrap();
    assert!(pair.server.waiting_incoming.is_empty());
    let accept_time = pair.time;
    let _ = pair.server.try_accept(incoming, accept_time);
    pair.drive();

    assert_matches!(
        pair.client_conn_mut(client_ch).poll(),
        Some(Event::HandshakeDataReady)
    );
    assert_matches!(
        pair.client_conn_mut(client_ch).poll(),
        Some(Event::Connected)
    );
    assert!(pair.client_conn_mut(client_ch).accepted_0rtt());

    let server_ch = pair.server.assert_accept();
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::HandshakeDataReady)
    );
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::HandshakeConfirmed)
    );
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::Connected)
    );
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::Stream(StreamEvent::Opened { dir: Dir::Uni }))
    );

    // Drain the stream until it blocks, checking that chunks are contiguous.
    let mut recv = pair.server_recv(server_ch, s);
    let mut chunks = recv.read(false).unwrap();
    let mut received = 0;
    loop {
        let chunk = match chunks.next(usize::MAX) {
            Ok(Some(chunk)) => chunk,
            Err(ReadError::Blocked) => break,
            Ok(None) => panic!("unexpected stream end"),
            Err(e) => panic!("{}", e),
        };
        assert_eq!(chunk.offset as usize, received);
        received += chunk.bytes.len();
    }
    assert_eq!(received, CLIENT_WRITES);
    let _ = chunks.finalize();

    let client_path = pair
        .client_conn_mut(client_ch)
        .path_stats(PathId::ZERO)
        .unwrap();
    assert_eq!(client_path.lost_packets, EXPECTED_DROPPED);
}
/// Runs the shared incoming-limit scenario with `incoming_buffer_size` set to 4000.
#[test]
fn zero_rtt_incoming_buffer_size() {
    let limit_per_incoming = |server: &mut ServerConfig| {
        server.incoming_buffer_size(4000);
    };
    test_zero_rtt_incoming_limit(limit_per_incoming);
}
/// Runs the shared incoming-limit scenario with `incoming_buffer_size_total` set to 4000.
#[test]
fn zero_rtt_incoming_buffer_size_total() {
    let limit_total = |server: &mut ServerConfig| {
        server.incoming_buffer_size_total(4000);
    };
    test_zero_rtt_incoming_limit(limit_total);
}
/// Server offers "foo", "bar", "baz"; client offers "bar", "quux", "corge". The
/// handshake must complete and the client's handshake data must report "bar".
#[test]
fn alpn_success() {
    let _guard = subscribe();
    let server_crypto = server_crypto_with_alpn(vec!["foo".into(), "bar".into(), "baz".into()]);
    let server_config = ServerConfig::with_crypto(Arc::new(server_crypto));
    let mut pair = Pair::new(Arc::new(EndpointConfig::default()), server_config);

    let client_crypto =
        client_crypto_with_alpn(vec!["bar".into(), "quux".into(), "corge".into()]);
    let client_config = ClientConfig::new(Arc::new(client_crypto));

    let client_ch = pair.begin_connect(client_config);
    pair.drive();
    let server_ch = pair.server.assert_accept();
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::HandshakeDataReady)
    );
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::HandshakeConfirmed)
    );
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::Connected)
    );

    // Inspect the negotiated protocol through the rustls-specific handshake data.
    let raw_handshake_data = pair
        .client_conn_mut(client_ch)
        .crypto_session()
        .handshake_data()
        .unwrap();
    let hd = raw_handshake_data
        .downcast::<crate::crypto::rustls::HandshakeData>()
        .unwrap();
    assert_eq!(hd.protocol.unwrap(), &b"bar"[..]);
}
/// Installs an incoming-connection handler that decrypts the incoming packet, collects
/// the ALPN values it exposes, and asserts they equal the list the client was configured
/// with before accepting.
#[test]
fn incoming_alpns() {
    let _guard = subscribe();
    let server_crypto = server_crypto_with_alpn(vec!["foo".into(), "bar".into()]);
    let server_config = ServerConfig::with_crypto(Arc::new(server_crypto));
    let mut pair = Pair::new(Arc::new(EndpointConfig::default()), server_config);

    let client_alpns: Vec<Vec<u8>> = vec!["bar".into(), "quux".into()];
    let expected = client_alpns.clone();

    pair.server.handle_incoming = Box::new(move |incoming| {
        let decrypted = incoming.decrypt().expect("decrypt should succeed");
        let alpns: Vec<Vec<u8>> = decrypted
            .alpns()
            .expect("alpns should be parseable")
            .map(|alpn| alpn.unwrap().to_vec())
            .collect();
        assert_eq!(alpns, expected);
        IncomingConnectionBehavior::Accept
    });

    let client_crypto = client_crypto_with_alpn(client_alpns);
    pair.begin_connect(ClientConfig::new(Arc::new(client_crypto)));
    pair.drive();
    pair.server.assert_accept();
}
/// A client offering ALPN "foo" connects to a server built from the default
/// `server_config()`; the client must see the connection closed with crypto error `0x78`.
#[test]
fn server_alpn_unset() {
    let _guard = subscribe();
    let mut pair = Pair::new(Arc::new(EndpointConfig::default()), server_config());

    let client_crypto = client_crypto_with_alpn(vec!["foo".into()]);
    let client_ch = pair.begin_connect(ClientConfig::new(Arc::new(client_crypto)));
    pair.drive();

    assert_matches!(
        pair.client_conn_mut(client_ch).poll(),
        Some(Event::ConnectionLost {
            reason: ConnectionError::ConnectionClosed(err)
        }) if err.error_code == TransportErrorCode::crypto(0x78)
    );
}
/// A client built from the default `client_config()` connects to a server offering
/// "foo", "bar", "baz"; the client must see the connection closed with crypto error
/// `0x78`.
#[test]
fn client_alpn_unset() {
    let _guard = subscribe();
    let server_crypto = server_crypto_with_alpn(vec!["foo".into(), "bar".into(), "baz".into()]);
    let server_config = ServerConfig::with_crypto(Arc::new(server_crypto));
    let mut pair = Pair::new(Arc::new(EndpointConfig::default()), server_config);

    let client_ch = pair.begin_connect(client_config());
    pair.drive();

    assert_matches!(
        pair.client_conn_mut(client_ch).poll(),
        Some(Event::ConnectionLost {
            reason: ConnectionError::ConnectionClosed(err)
        }) if err.error_code == TransportErrorCode::crypto(0x78)
    );
}
/// Server offers "foo", "bar", "baz" while the client offers "quux", "corge"; with no
/// overlap the client must see the connection closed with crypto error `0x78`.
#[test]
fn alpn_mismatch() {
    let _guard = subscribe();
    let server_crypto = server_crypto_with_alpn(vec!["foo".into(), "bar".into(), "baz".into()]);
    let server_config = ServerConfig::with_crypto(Arc::new(server_crypto));
    let mut pair = Pair::new(Arc::new(EndpointConfig::default()), server_config);

    let client_crypto = client_crypto_with_alpn(vec!["quux".into(), "corge".into()]);
    let client_ch = pair.begin_connect(ClientConfig::new(Arc::new(client_crypto)));
    pair.drive();

    assert_matches!(
        pair.client_conn_mut(client_ch).poll(),
        Some(Event::ConnectionLost {
            reason: ConnectionError::ConnectionClosed(err)
        }) if err.error_code == TransportErrorCode::crypto(0x78)
    );
}
/// With the server's `max_concurrent_uni_streams` set to 1, the client can open one
/// unidirectional stream at a time. A second stream only becomes available after the
/// server has read the first to its end and the pair has been driven again.
#[test]
fn stream_id_limit() {
    let _guard = subscribe();
    let transport = TransportConfig {
        max_concurrent_uni_streams: 1u32.into(),
        ..TransportConfig::default()
    };
    let server = ServerConfig {
        transport: Arc::new(transport),
        ..server_config()
    };
    let mut pair = Pair::new(Default::default(), server);
    let (client_ch, server_ch) = pair.connect();

    let client_conn = pair.client.connections.get_mut(&client_ch).unwrap();
    let s = client_conn
        .streams()
        .open(Dir::Uni)
        .expect("couldn't open first stream");
    assert_eq!(
        pair.client_streams(client_ch).open(Dir::Uni),
        None,
        "only one stream is permitted at a time"
    );

    // Use and finish the first stream.
    const MSG: &[u8] = b"hello";
    pair.client_send(client_ch, s).write(MSG).unwrap();
    pair.client_send(client_ch, s).finish().unwrap();
    pair.drive();
    assert_matches!(
        pair.client_conn_mut(client_ch).poll(),
        Some(Event::Stream(StreamEvent::Finished { id })) if id == s
    );
    assert_eq!(
        pair.client_streams(client_ch).open(Dir::Uni),
        None,
        "server does not immediately grant additional credit"
    );

    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::Stream(StreamEvent::Opened { dir: Dir::Uni }))
    );
    assert_matches!(pair.server_streams(server_ch).accept(Dir::Uni), Some(stream) if stream == s);

    let mut recv = pair.server_recv(server_ch, s);
    let mut chunks = recv.read(false).unwrap();
    assert_matches!(
        chunks.next(usize::MAX),
        Ok(Some(chunk)) if chunk.offset == 0 && chunk.bytes == MSG
    );
    assert_eq!(chunks.next(usize::MAX), Ok(None));
    let _ = chunks.finalize();

    // After the server has consumed the stream, the client is told one is available.
    pair.drive();
    assert_matches!(
        pair.client_conn_mut(client_ch).poll(),
        Some(Event::Stream(StreamEvent::Available { dir: Dir::Uni }))
    );
    assert_matches!(pair.client_conn_mut(client_ch).poll(), None);

    let client_conn = pair.client.connections.get_mut(&client_ch).unwrap();
    let s = client_conn
        .streams()
        .open(Dir::Uni)
        .expect("didn't get stream id budget");
    pair.client_send(client_ch, s).finish().unwrap();
    pair.drive();

    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::Stream(StreamEvent::Opened { dir: Dir::Uni }))
    );
    assert_matches!(pair.server_streams(server_ch).accept(Dir::Uni), Some(stream) if stream == s);
    assert_matches!(pair.server_conn_mut(server_ch).poll(), None);

    // The second stream was finished without any data.
    let mut recv = pair.server_recv(server_ch, s);
    let mut chunks = recv.read(false).unwrap();
    assert_matches!(chunks.next(usize::MAX), Ok(None));
    let _ = chunks.finalize();
}
/// Sends one message, forces a key update on the client, sends a second message on the
/// same stream, and checks that the server reads both and that neither side counts any
/// lost packets.
#[test]
fn key_update_simple() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();

    let client_conn = pair.client.connections.get_mut(&client_ch).unwrap();
    let s = client_conn
        .streams()
        .open(Dir::Bi)
        .expect("couldn't open first stream");

    const MSG1: &[u8] = b"hello1";
    pair.client_send(client_ch, s).write(MSG1).unwrap();
    pair.drive();

    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::Stream(StreamEvent::Opened { dir: Dir::Bi }))
    );
    assert_matches!(pair.server_streams(server_ch).accept(Dir::Bi), Some(stream) if stream == s);
    assert_matches!(pair.server_conn_mut(server_ch).poll(), None);
    {
        let mut recv = pair.server_recv(server_ch, s);
        let mut chunks = recv.read(false).unwrap();
        assert_matches!(
            chunks.next(usize::MAX),
            Ok(Some(chunk)) if chunk.offset == 0 && chunk.bytes == MSG1
        );
        let _ = chunks.finalize();
    }

    info!("initiating key update");
    pair.client_conn_mut(client_ch).force_key_update();

    const MSG2: &[u8] = b"hello2";
    pair.client_send(client_ch, s).write(MSG2).unwrap();
    pair.drive();

    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::Stream(StreamEvent::Readable { id })) if id == s
    );
    assert_matches!(pair.server_conn_mut(server_ch).poll(), None);
    {
        let mut recv = pair.server_recv(server_ch, s);
        let mut chunks = recv.read(false).unwrap();
        // The second message starts right after the 6 bytes of the first one.
        assert_matches!(
            chunks.next(usize::MAX),
            Ok(Some(chunk)) if chunk.offset == 6 && chunk.bytes == MSG2
        );
        let _ = chunks.finalize();
    }

    let client_path = pair
        .client_conn_mut(client_ch)
        .path_stats(PathId::ZERO)
        .unwrap();
    assert_eq!(client_path.lost_packets, 0);
    let server_path = pair
        .server_conn_mut(server_ch)
        .path_stats(PathId::ZERO)
        .unwrap();
    assert_eq!(server_path.lost_packets, 0);
}
/// Delays the client's outbound packets carrying the first message, forces a key update,
/// sends a second message, then releases the delayed packets. The server must still read
/// both messages in order and neither side may count lost packets.
#[test]
fn key_update_reordered() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();

    let client_conn = pair.client.connections.get_mut(&client_ch).unwrap();
    let s = client_conn
        .streams()
        .open(Dir::Bi)
        .expect("couldn't open first stream");

    // Queue the first message and hold it back.
    const MSG1: &[u8] = b"1";
    pair.client_send(client_ch, s).write(MSG1).unwrap();
    let now = pair.time;
    pair.client.drive(now);
    assert!(!pair.client.outbound.is_empty());
    pair.client.delay_outbound();

    pair.client_conn_mut(client_ch).force_key_update();
    info!("updated keys");

    // Queue the second message, then release the held-back packets.
    const MSG2: &[u8] = b"two";
    pair.client_send(client_ch, s).write(MSG2).unwrap();
    pair.client.drive(now);
    pair.client.finish_delay();
    pair.drive();

    let client_path = pair
        .client_conn_mut(client_ch)
        .path_stats(PathId::ZERO)
        .unwrap();
    assert_eq!(client_path.lost_packets, 0);

    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::Stream(StreamEvent::Opened { dir: Dir::Bi }))
    );
    assert_matches!(pair.server_streams(server_ch).accept(Dir::Bi), Some(stream) if stream == s);

    let mut recv = pair.server_recv(server_ch, s);
    let mut chunks = recv.read(true).unwrap();
    let first = chunks.next(usize::MAX).unwrap().unwrap();
    assert_matches!(&*first.bytes, MSG1);
    let second = chunks.next(usize::MAX).unwrap().unwrap();
    assert_eq!(second.bytes, MSG2);
    let _ = chunks.finalize();

    let client_path = pair
        .client_conn_mut(client_ch)
        .path_stats(PathId::ZERO)
        .unwrap();
    assert_eq!(client_path.lost_packets, 0);
    let server_path = pair
        .server_conn_mut(server_ch)
        .path_stats(PathId::ZERO)
        .unwrap();
    assert_eq!(server_path.lost_packets, 0);
}
/// Discards whatever the client queued for sending after its first drive and checks that
/// the connection is nevertheless established once the pair is driven.
#[test]
fn initial_retransmit() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let client_ch = pair.begin_connect(client_config());

    let now = pair.time;
    pair.client.drive(now);
    info!("clearing client outbound");
    pair.client.outbound.clear();
    pair.drive();

    assert_matches!(
        pair.client_conn_mut(client_ch).poll(),
        Some(Event::HandshakeDataReady)
    );
    assert_matches!(
        pair.client_conn_mut(client_ch).poll(),
        Some(Event::Connected)
    );
}
/// Closes the client connection immediately after `begin_connect`, before anything is
/// driven, and checks that the server's first event is a `ConnectionClosed` loss with
/// `APPLICATION_ERROR` while the client sees no event.
#[test]
fn instant_close_1() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    info!("connecting");
    let client_ch = pair.begin_connect(client_config());

    let close_time = pair.time;
    let client_conn = pair.client.connections.get_mut(&client_ch).unwrap();
    client_conn.close(close_time, VarInt(0), Bytes::new());
    pair.drive();

    let server_ch = pair.server.assert_accept();
    assert_matches!(pair.client_conn_mut(client_ch).poll(), None);
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::ConnectionLost {
            reason: ConnectionError::ConnectionClosed(ConnectionClose {
                error_code: TransportErrorCode::APPLICATION_ERROR,
                ..
            }),
        })
    );
}
/// Like `instant_close_1`, but the client is driven once before it closes the
/// connection (code 42). The server is then expected to see `HandshakeDataReady`
/// before the `ConnectionLost` / `APPLICATION_ERROR` event.
#[test]
fn instant_close_2() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    info!("connecting");
    let client_ch = pair.begin_connect(client_config());
    // Let the client act before the close is requested.
    pair.drive_client();

    let now = pair.time;
    let client_conn = pair.client.connections.get_mut(&client_ch).unwrap();
    client_conn.close(now, VarInt(42), Bytes::new());
    pair.drive();

    let client_event = pair.client_conn_mut(client_ch).poll();
    assert_matches!(client_event, None);

    let server_ch = pair.server.assert_accept();
    let handshake_event = pair.server_conn_mut(server_ch).poll();
    assert_matches!(handshake_event, Some(Event::HandshakeDataReady));
    let lost_event = pair.server_conn_mut(server_ch).poll();
    assert_matches!(
        lost_event,
        Some(Event::ConnectionLost {
            reason: ConnectionError::ConnectionClosed(ConnectionClose {
                error_code: TransportErrorCode::APPLICATION_ERROR,
                ..
            }),
        })
    );
}
/// The server closes the connection (code 42) right after accepting it. The client
/// must then report `ConnectionLost` with a `ConnectionClosed` reason whose error
/// code is `APPLICATION_ERROR`.
#[test]
fn instant_server_close() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    info!("connecting");
    // Keep the handle issued by the client endpoint: the client connection must be
    // looked up with it rather than with the server's handle, which belongs to a
    // different endpoint and only matches by coincidence.
    let client_ch = pair.begin_connect(client_config());
    pair.drive_client();
    pair.server.drive_incoming(pair.time);
    let server_ch = pair.server.assert_accept();

    info!("closing");
    pair.server
        .connections
        .get_mut(&server_ch)
        .unwrap()
        .close(pair.time, VarInt(42), Bytes::new());
    pair.drive();

    assert_matches!(
        pair.client_conn_mut(client_ch).poll(),
        Some(Event::ConnectionLost {
            reason: ConnectionError::ConnectionClosed(ConnectionClose {
                error_code: TransportErrorCode::APPLICATION_ERROR,
                ..
            }),
        })
    );
}
/// Gives the server a `max_idle_timeout` of `IDLE_TIMEOUT`, pings once and then keeps
/// stepping while throwing away everything that arrives at the client. Both sides
/// must end up closed with `ConnectionError::TimedOut`, and the elapsed simulated
/// time must stay below `2 * IDLE_TIMEOUT` milliseconds.
#[test]
fn idle_timeout() {
    let _guard = subscribe();
    const IDLE_TIMEOUT: u64 = 100;
    let transport = TransportConfig {
        max_idle_timeout: Some(VarInt(IDLE_TIMEOUT)),
        ..TransportConfig::default()
    };
    let server = ServerConfig {
        transport: Arc::new(transport),
        ..server_config()
    };
    let mut pair = Pair::new(Default::default(), server);
    let (client_ch, server_ch) = pair.connect();
    pair.client_conn_mut(client_ch).ping();
    let start = pair.time;

    // Keep going until both endpoints consider the connection closed.
    while !(pair.client_conn_mut(client_ch).is_closed()
        && pair.server_conn_mut(server_ch).is_closed())
    {
        // When a step makes no progress, jump to the earliest pending wakeup.
        if !pair.step()
            && let Some(wakeup) = min_opt(pair.client.next_wakeup(), pair.server.next_wakeup())
        {
            pair.time = wakeup;
        }
        // The client never gets to process anything it received.
        pair.client.inbound.clear();
    }

    let elapsed = pair.time - start;
    assert!(elapsed < Duration::from_millis(2 * IDLE_TIMEOUT));

    let client_event = pair.client_conn_mut(client_ch).poll();
    assert_matches!(
        client_event,
        Some(Event::ConnectionLost {
            reason: ConnectionError::TimedOut,
        })
    );
    let server_event = pair.server_conn_mut(server_ch).poll();
    assert_matches!(
        server_event,
        Some(Event::ConnectionLost {
            reason: ConnectionError::TimedOut,
        })
    );
}
/// After the client pings and is driven, the server closes the connection. The
/// client's count of received ACK frames must have grown by the time the pair has
/// been driven to completion.
#[test]
fn connection_close_sends_acks() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();

    let client_acks = pair.client_conn_mut(client_ch).stats().frame_rx.acks;

    pair.client_conn_mut(client_ch).ping();
    pair.drive_client();

    // Close via the server's own handle; the client handle belongs to the other
    // endpoint and must not be used to index server connections.
    let time = pair.time;
    pair.server_conn_mut(server_ch)
        .close(time, VarInt(42), Bytes::new());

    pair.drive();

    let client_acks_2 = pair.client_conn_mut(client_ch).stats().frame_rx.acks;
    assert!(
        client_acks_2 > client_acks,
        "Connection close should send pending ACKs"
    );
}
/// Changes the client's address to a fresh port on 127.0.0.1 and then closes from the
/// client side. The server must have sent exactly one CONNECTION_CLOSE frame, report
/// `ApplicationClosed`, and record the client's new address as the remote of path 0.
#[test]
fn close_from_migrated_address() {
    let _guard = subscribe();
    let mut pair = ConnPair::default();
    pair.drive();

    // Move the client to a new port before it sends the close.
    let migrated_port = CLIENT_PORTS.lock().unwrap().next().unwrap();
    pair.client.addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), migrated_port);
    pair.close(Client, 0, b"bye");
    pair.drive();

    assert_eq!(pair.stats(Server).frame_tx.connection_close, 1);
    let server_event = pair.poll(Server);
    assert_matches!(
        server_event,
        Some(Event::ConnectionLost {
            reason: ConnectionError::ApplicationClosed(_)
        })
    );

    let server_path = pair.conn(Server).network_path(PathId::ZERO).unwrap();
    assert_eq!(server_path.remote(), pair.client.addr);
}
/// After a single step the client's inbound queue must be non-empty; it is then
/// cleared so the client never sees those datagrams. The client is still expected to
/// reach `HandshakeDataReady` and `Connected` once the pair is driven.
#[test]
fn server_hs_retransmit() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let client_ch = pair.begin_connect(client_config());
    pair.step();

    // Something must have arrived for the client before it is thrown away.
    assert!(!pair.client.inbound.is_empty());
    pair.client.inbound.clear();
    info!("client inbound queue cleared");
    pair.drive();

    let first_event = pair.client_conn_mut(client_ch).poll();
    assert_matches!(first_event, Some(Event::HandshakeDataReady));
    let second_event = pair.client_conn_mut(client_ch).poll();
    assert_matches!(second_event, Some(Event::Connected));
}
/// Moves the client to a new port on 127.0.0.1 after the connection is established
/// and pings. The server must record the new address as the remote of path 0, and
/// the client must have sent exactly one additional PING and one additional
/// IMMEDIATE_ACK frame compared to the post-connect baseline.
#[test]
fn migration() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();
    pair.drive();

    let baseline = pair.client_conn_mut(client_ch).stats();

    // Switch the client's address, then make it send something from there.
    let new_port = CLIENT_PORTS.lock().unwrap().next().unwrap();
    pair.client.addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), new_port);
    pair.client_conn_mut(client_ch).ping();

    pair.drive_client();
    pair.drive_server();
    assert_ne!(pair.server_conn_mut(server_ch).total_recvd(), 0);

    pair.drive();
    let client_event = pair.client_conn_mut(client_ch).poll();
    assert_matches!(client_event, None);
    assert_eq!(
        pair.server_conn_mut(server_ch)
            .network_path(PathId::ZERO)
            .map(|path| path.remote),
        Ok(pair.client.addr)
    );

    let after_migrate = pair.client_conn_mut(client_ch).stats();
    let extra_pings = after_migrate.frame_tx.ping - baseline.frame_tx.ping;
    assert_eq!(extra_pings, 1);
    let extra_immediate_acks =
        after_migrate.frame_tx.immediate_ack - baseline.frame_tx.immediate_ack;
    assert_eq!(extra_immediate_acks, 1);
}
/// The server triggers path validation, and what it sends is then discarded from the
/// client's inbound queue. By the end the server must have sent two PATH_CHALLENGE
/// frames and the client one PATH_RESPONSE.
#[test]
fn path_challenge_retransmit() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();
    pair.drive();
    pair.client_conn_mut(client_ch).ping();
    pair.drive();

    println!("-------- server wants path validation --------");
    pair.server_conn_mut(server_ch).trigger_path_validation();
    pair.drive_server();

    println!("-------- client loses messages --------");
    pair.client.inbound.clear();
    pair.drive();

    let client_frames = pair.client_conn_mut(client_ch).stats().frame_tx;
    let server_frames = pair.server_conn_mut(server_ch).stats().frame_tx;
    assert_eq!(
        server_frames.path_challenge, 2,
        "expected server to send two path challenges"
    );
    assert_eq!(
        client_frames.path_response, 1,
        "expected client to send one path response"
    );
}
/// The server triggers path validation and the client gets to react, but whatever
/// the client sent back is discarded from the server's inbound queue. By the end
/// both the PATH_CHALLENGE and the PATH_RESPONSE counters must read two.
#[test]
fn path_response_retransmit() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();
    pair.drive();
    pair.client_conn_mut(client_ch).ping();
    pair.drive();

    println!("-------- server wants path validation --------");
    pair.server_conn_mut(server_ch).trigger_path_validation();
    pair.drive_server();
    pair.drive_client();

    println!("-------- server loses messages --------");
    pair.server.inbound.clear();
    pair.drive();

    let client_frames = pair.client_conn_mut(client_ch).stats().frame_tx;
    let server_frames = pair.server_conn_mut(server_ch).stats().frame_tx;
    assert_eq!(
        server_frames.path_challenge, 2,
        "expected server to send two path challenges"
    );
    assert_eq!(
        client_frames.path_response, 2,
        "expected client to send two path responses"
    );
}
/// Regression test: the client starts a path validation, loses the server's reply,
/// undergoes a passive migration and pings. Afterwards the connection must become
/// idle within the 1000 bound passed to `drive_bounded`.
#[test]
fn regression_path_validation_stale_local_after_passive_migration() {
    let _guard = subscribe();
    let mut pair = ConnPair::default();
    pair.drive();

    // Client-initiated validation; one exchange in each direction.
    pair.conn_mut(Client).trigger_path_validation();
    pair.drive_client();
    pair.drive_server();

    // Whatever the server sent back never reaches the client.
    pair.client.inbound.clear();

    pair.passive_migration(Client);
    pair.conn_mut(Client).ping();
    pair.drive_client();
    pair.drive_server();

    let still_busy = pair.drive_bounded(1000);
    assert!(
        !still_busy,
        "connection never became idle; path validation was stuck in a loop"
    );
}
/// Shared driver for the flow-control tests.
///
/// `config` is installed as the server's transport config. `window_size` is the
/// number of bytes the client is expected to be able to write to a fresh stream
/// before further writes fail with `WriteError::Blocked`.
///
/// The scenario has three phases: fill the window and reset the stream; fill the
/// window on a second stream and have the server read it all; then fill the window
/// again on that same stream and read it all once more.
fn test_flow_control(config: TransportConfig, window_size: usize) {
    let _guard = subscribe();
    let server = ServerConfig {
        transport: Arc::new(config),
        ..server_config()
    };
    let mut pair = Pair::new(Default::default(), server);
    let (client_ch, server_ch) = pair.connect();
    // Ten bytes more than the window, so a write can never be accepted in full.
    let payload = vec![0xAB; window_size + 10];

    // Phase 1: fill the window, then reset the stream.
    let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
    info!("writing");
    assert_eq!(
        pair.client_send(client_ch, s).write(&payload),
        Ok(window_size)
    );
    assert_eq!(
        pair.client_send(client_ch, s).write(&payload[window_size..]),
        Err(WriteError::Blocked)
    );
    pair.drive();
    info!("resetting");
    pair.client_send(client_ch, s).reset(VarInt(42)).unwrap();
    pair.drive();

    let mut recv = pair.server_recv(server_ch, s);
    let mut chunks = recv.read(true).unwrap();
    assert_eq!(
        chunks.next(usize::MAX).err(),
        Some(ReadError::Reset(VarInt(42)))
    );
    let _ = chunks.finalize();

    // Phase 2: a new stream gets a full window again.
    info!("writing");
    let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
    assert_eq!(
        pair.client_send(client_ch, s).write(&payload),
        Ok(window_size)
    );
    assert_eq!(
        pair.client_send(client_ch, s).write(&payload[window_size..]),
        Err(WriteError::Blocked)
    );
    pair.drive();

    let mut received = 0;
    let mut recv = pair.server_recv(server_ch, s);
    let mut chunks = recv.read(true).unwrap();
    loop {
        match chunks.next(usize::MAX) {
            Ok(Some(chunk)) => received += chunk.bytes.len(),
            Ok(None) => panic!("end of stream"),
            Err(ReadError::Blocked) => break,
            Err(e) => panic!("{}", e),
        }
    }
    let _ = chunks.finalize();
    info!("finished reading");
    assert_eq!(received, window_size);
    pair.drive();

    // Phase 3: after the server has read, the same stream accepts another window.
    info!("writing");
    assert_eq!(
        pair.client_send(client_ch, s).write(&payload),
        Ok(window_size)
    );
    assert_eq!(
        pair.client_send(client_ch, s).write(&payload[window_size..]),
        Err(WriteError::Blocked)
    );
    pair.drive();

    let mut received = 0;
    let mut recv = pair.server_recv(server_ch, s);
    let mut chunks = recv.read(true).unwrap();
    loop {
        match chunks.next(usize::MAX) {
            Ok(Some(chunk)) => received += chunk.bytes.len(),
            Ok(None) => panic!("end of stream"),
            Err(ReadError::Blocked) => break,
            Err(e) => panic!("{}", e),
        }
    }
    assert_eq!(received, window_size);
    let _ = chunks.finalize();
    info!("finished reading");
}
/// Runs the shared flow-control scenario with a `stream_receive_window` of 2000.
#[test]
fn stream_flow_control() {
    const WINDOW: u32 = 2000;
    let config = TransportConfig {
        stream_receive_window: WINDOW.into(),
        ..TransportConfig::default()
    };
    test_flow_control(config, WINDOW as usize);
}
/// Runs the shared flow-control scenario with a `receive_window` of 2000.
#[test]
fn conn_flow_control() {
    const WINDOW: u32 = 2000;
    let config = TransportConfig {
        receive_window: WINDOW.into(),
        ..TransportConfig::default()
    };
    test_flow_control(config, WINDOW as usize);
}
/// The client opens a bidirectional stream and immediately stops its receive half,
/// without writing anything. The server must still see the stream as opened, accept
/// it, find reads blocked, get `WriteError::Stopped` when writing, and observe a
/// `Stopped` event with the same error code.
#[test]
fn stop_opens_bidi() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();
    assert_eq!(pair.client_streams(client_ch).send_streams(), 0);
    let s = pair.client_streams(client_ch).open(Dir::Bi).unwrap();
    assert_eq!(pair.client_streams(client_ch).send_streams(), 1);
    const ERROR: VarInt = VarInt(42);
    // Each endpoint's connection map is indexed with the handle that endpoint
    // issued: `client_ch` for the client, `server_ch` for the server.
    pair.client
        .connections
        .get_mut(&client_ch)
        .unwrap()
        .recv_stream(s)
        .stop(ERROR)
        .unwrap();
    pair.drive();

    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::Stream(StreamEvent::Opened { dir: Dir::Bi }))
    );
    // The server's send-stream count only increases once the stream is accepted.
    assert_eq!(pair.server_conn_mut(server_ch).streams().send_streams(), 0);
    assert_matches!(pair.server_streams(server_ch).accept(Dir::Bi), Some(stream) if stream == s);
    assert_eq!(pair.server_conn_mut(server_ch).streams().send_streams(), 1);

    let mut recv = pair.server_recv(server_ch, s);
    let mut chunks = recv.read(false).unwrap();
    assert_matches!(chunks.next(usize::MAX), Err(ReadError::Blocked));
    let _ = chunks.finalize();

    assert_matches!(
        pair.server_send(server_ch, s).write(b"foo"),
        Err(WriteError::Stopped(ERROR))
    );
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::Stream(StreamEvent::Stopped {
            id: _,
            error_code: ERROR
        }))
    );
    assert_matches!(pair.server_conn_mut(server_ch).poll(), None);
}
/// The client opens two unidirectional streams but writes only to the second one.
/// The server must be able to accept both streams, in opening order, and no more.
#[test]
fn implicit_open() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();

    let first = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
    let second = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
    // Only the later stream carries data.
    pair.client_send(client_ch, second).write(b"hello").unwrap();
    pair.drive();

    let opened = pair.server_conn_mut(server_ch).poll();
    assert_matches!(
        opened,
        Some(Event::Stream(StreamEvent::Opened { dir: Dir::Uni }))
    );
    for expected in [Some(first), Some(second), None] {
        assert_eq!(pair.server_streams(server_ch).accept(Dir::Uni), expected);
    }
}
/// Uses a `RandomConnectionIdGenerator` of length 0 for the endpoint config passed to
/// `Pair::new`, connects, closes from both sides and then connects a second time.
#[test]
fn zero_length_cid() {
    let _guard = subscribe();
    let cid_generator_factory: fn() -> Box<dyn ConnectionIdGenerator> =
        || Box::new(RandomConnectionIdGenerator::new(0));
    let endpoint_config = EndpointConfig {
        connection_id_generator_factory: Arc::new(cid_generator_factory),
        ..EndpointConfig::default()
    };
    let mut pair = Pair::new(Arc::new(endpoint_config), server_config());
    let (client_ch, server_ch) = pair.connect();

    info!("closing");
    let now = pair.time;
    let client_conn = pair.client.connections.get_mut(&client_ch).unwrap();
    client_conn.close(now, VarInt(42), Bytes::new());
    pair.drive();

    let now = pair.time;
    let server_conn = pair.server.connections.get_mut(&server_ch).unwrap();
    server_conn.close(now, VarInt(42), Bytes::new());

    // A second connection on the same pair must still be possible.
    pair.connect();
}
/// Configures the server with a keep-alive interval of half its idle timeout and
/// checks that neither side closes while simulated time advances by twenty times
/// the idle timeout.
#[test]
fn keep_alive() {
    let _guard = subscribe();
    const IDLE_TIMEOUT: u64 = 10;
    let transport = TransportConfig {
        keep_alive_interval: Some(Duration::from_millis(IDLE_TIMEOUT / 2)),
        max_idle_timeout: Some(VarInt(IDLE_TIMEOUT)),
        ..TransportConfig::default()
    };
    let server = ServerConfig {
        transport: Arc::new(transport),
        ..server_config()
    };
    let mut pair = Pair::new(Default::default(), server);
    let (client_ch, server_ch) = pair.connect();

    let deadline = pair.time + Duration::from_millis(20 * IDLE_TIMEOUT);
    while pair.time < deadline {
        // When a step makes no progress, jump to the earliest pending wakeup.
        if !pair.step()
            && let Some(wakeup) = min_opt(pair.client.next_wakeup(), pair.server.next_wakeup())
        {
            pair.time = wakeup;
        }
        assert!(!pair.client_conn_mut(client_ch).is_closed());
        assert!(!pair.server_conn_mut(server_ch).is_closed());
    }
}
/// Gives the server a connection-ID generator whose IDs have a lifetime of
/// `CID_TIMEOUT`, then advances simulated time in `CID_TIMEOUT`-sized rounds for five
/// lifetimes, querying the server's active local CID sequence range each round.
#[test]
fn cid_rotation() {
    use crate::{LOCAL_CID_COUNT, cid_queue::CidQueue};

    let _guard = subscribe();
    const CID_TIMEOUT: Duration = Duration::from_secs(2);

    let cid_generator_factory: fn() -> Box<dyn ConnectionIdGenerator> =
        || Box::new(*RandomConnectionIdGenerator::new(8).set_lifetime(CID_TIMEOUT));
    let server_endpoint_config = EndpointConfig {
        connection_id_generator_factory: Arc::new(cid_generator_factory),
        ..EndpointConfig::default()
    };
    let server = Endpoint::new(
        Arc::new(server_endpoint_config),
        Some(Arc::new(server_config())),
        true,
    );
    let client = Endpoint::new(Arc::new(EndpointConfig::default()), None, true);

    let mut pair = Pair::new_from_endpoint(client, server);
    let (_, server_ch) = pair.connect();

    let mut round: u64 = 1;
    let mut stop = pair.time;
    let end = pair.time + 5 * CID_TIMEOUT;

    // Size of one batch of CIDs: one more than the queue length, capped by
    // `LOCAL_CID_COUNT`.
    let active_cid_num = (CidQueue::LEN as u64 + 1).min(LOCAL_CID_COUNT);
    let mut left_bound = 0;
    let mut right_bound = active_cid_num - 1;

    while pair.time < end {
        stop += CID_TIMEOUT;
        // Run the pair until this round's stop time has been reached.
        while pair.time < stop {
            if !pair.step()
                && let Some(wakeup) = min_opt(pair.client.next_wakeup(), pair.server.next_wakeup())
            {
                pair.time = wakeup;
            }
        }
        info!(
            "Checking active cid sequence range before {:?} seconds",
            round * CID_TIMEOUT.as_secs()
        );
        // NOTE(review): `_bound` in pattern position is an identifier pattern, i.e. a
        // fresh binding that matches any value, so this assertion can never fail and
        // the expected `(left_bound, right_bound)` range is not actually compared.
        // Switching to `assert_eq!` would make the check real, but whether the
        // expected values hold needs to be confirmed first.
        let _bound = (left_bound, right_bound);
        assert_matches!(
            pair.server_conn_mut(server_ch).active_local_cid_seq(),
            _bound
        );
        round += 1;
        left_bound += active_cid_num;
        right_bound += active_cid_num;
        pair.drive_server();
    }
}
/// Has the server rotate its local CIDs twice (first with argument 1, then with
/// `min(CidQueue::LEN, LOCAL_CID_COUNT) + 1`) and checks after each rotation that the
/// connection stays open and that the client's active remote CID sequence number
/// equals the argument that was passed.
#[test]
fn cid_retirement() {
    use crate::{LOCAL_CID_COUNT, cid_queue::CidQueue};

    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();

    info!("retire one");
    let now = pair.time;
    pair.server_conn_mut(server_ch).rotate_local_cid(1, now);
    pair.drive();
    assert!(!pair.client_conn_mut(client_ch).is_closed());
    assert!(!pair.server_conn_mut(server_ch).is_closed());
    assert_matches!(pair.client_conn_mut(client_ch).active_remote_cid_seq(), 1);

    let active_cid_num = (CidQueue::LEN as u64).min(LOCAL_CID_COUNT);

    info!("retire CidQueue::LEN");
    let now = pair.time;
    let next_retire_prior_to = active_cid_num + 1;
    pair.client_conn_mut(client_ch).ping();
    pair.server_conn_mut(server_ch)
        .rotate_local_cid(next_retire_prior_to, now);
    pair.drive();
    assert!(!pair.client_conn_mut(client_ch).is_closed());
    assert!(!pair.server_conn_mut(server_ch).is_closed());
    let active_remote_seq = pair.client_conn_mut(client_ch).active_remote_cid_seq();
    assert_eq!(active_remote_seq, next_retire_prior_to);
}
/// The server reads the stream's data, and its outbound traffic is then held back
/// with `delay_outbound` while the client finishes the stream. Once the delayed
/// traffic is released, the client must see `Finished` and the server must be able
/// to accept the stream and read its end.
#[test]
fn finish_stream_flow_control_reordered() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();

    let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap();

    const MSG: &[u8] = b"hello";
    pair.client_send(client_ch, s).write(MSG).unwrap();
    pair.drive_client();
    pair.server.drive(pair.time);

    // The server consumes the payload.
    let mut recv = pair.server_recv(server_ch, s);
    let mut chunks = recv.read(false).unwrap();
    let first = chunks.next(usize::MAX);
    assert_matches!(
        first,
        Ok(Some(chunk)) if chunk.offset == 0 && chunk.bytes == MSG
    );
    let _ = chunks.finalize();

    // Whatever the server produces from here on is held back until `finish_delay`.
    pair.server.drive(pair.time);
    pair.server.delay_outbound();

    pair.client_send(client_ch, s).finish().unwrap();
    pair.drive_client();
    pair.server.drive(pair.time);
    pair.server.finish_delay();
    pair.drive();

    let finished = pair.client_conn_mut(client_ch).poll();
    assert_matches!(
        finished,
        Some(Event::Stream(StreamEvent::Finished { id })) if id == s
    );
    let nothing_more = pair.client_conn_mut(client_ch).poll();
    assert_matches!(nothing_more, None);

    let opened = pair.server_conn_mut(server_ch).poll();
    assert_matches!(
        opened,
        Some(Event::Stream(StreamEvent::Opened { dir: Dir::Uni }))
    );
    assert_matches!(pair.server_streams(server_ch).accept(Dir::Uni), Some(stream) if stream == s);

    let mut recv = pair.server_recv(server_ch, s);
    let mut chunks = recv.read(false).unwrap();
    assert_matches!(chunks.next(usize::MAX), Ok(None));
    let _ = chunks.finalize();
}
/// While the client's outbound traffic is being delayed, it opens a stream, writes
/// and finishes it. After the delay is lifted and the pair is driven, the client must
/// have recorded at least one lost packet on path 0 and the server must still be able
/// to read the full message.
#[test]
fn handshake_1rtt_handling() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let client_ch = pair.begin_connect(client_config());
    pair.drive_client();
    pair.drive_server();
    let server_ch = pair.server.assert_accept();

    // From here on, client output is held back until `finish_delay`.
    pair.client.drive(pair.time);
    pair.client.delay_outbound();

    let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
    const MSG: &[u8] = b"hello";
    pair.client_send(client_ch, s).write(MSG).unwrap();
    pair.client_send(client_ch, s).finish().unwrap();
    pair.client.drive(pair.time);

    pair.client.finish_delay();
    pair.drive();

    let lost_packets = pair
        .client_conn_mut(client_ch)
        .path_stats(PathId::ZERO)
        .unwrap()
        .lost_packets;
    assert!(lost_packets != 0);

    let mut recv = pair.server_recv(server_ch, s);
    let mut chunks = recv.read(false).unwrap();
    let first = chunks.next(usize::MAX);
    assert_matches!(
        first,
        Ok(Some(chunk)) if chunk.offset == 0 && chunk.bytes == MSG
    );
    let _ = chunks.finalize();
}
/// The server stops the stream after the client has written to it. A subsequent
/// `finish` on the client must fail with `FinishError::Stopped` carrying the same
/// error code.
#[test]
fn stop_before_finish() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();

    let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
    const MSG: &[u8] = b"hello";
    pair.client_send(client_ch, s).write(MSG).unwrap();
    pair.drive();

    info!("stopping stream");
    const ERROR: VarInt = VarInt(42);
    pair.server_recv(server_ch, s).stop(ERROR).unwrap();
    pair.drive();

    let finish_result = pair.client_send(client_ch, s).finish();
    assert_matches!(finish_result, Err(FinishError::Stopped(ERROR)));
}
/// The server stops the stream and is driven, and the client then finishes the
/// stream before it has been driven again. That `finish` must succeed, and the client
/// must afterwards observe a `Stopped` event for the stream.
#[test]
fn stop_during_finish() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();

    let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
    const MSG: &[u8] = b"hello";
    pair.client_send(client_ch, s).write(MSG).unwrap();
    pair.drive();

    assert_matches!(pair.server_streams(server_ch).accept(Dir::Uni), Some(stream) if stream == s);
    info!("stopping and finishing stream");
    const ERROR: VarInt = VarInt(42);
    pair.server_recv(server_ch, s).stop(ERROR).unwrap();
    pair.drive_server();
    // The client has not been driven since the stop was issued.
    pair.client_send(client_ch, s).finish().unwrap();
    pair.drive_client();

    let client_event = pair.client_conn_mut(client_ch).poll();
    assert_matches!(
        client_event,
        Some(Event::Stream(StreamEvent::Stopped { id, error_code: ERROR })) if id == s
    );
}
/// Writes 1024-byte chunks until the client's congestion window is no longer above
/// `TARGET`, discards everything queued for the server, and then checks that after
/// driving the pair the window is above `TARGET` again and another write succeeds.
#[test]
fn congested_tail_loss() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, _) = pair.connect();

    const TARGET: u64 = 2048;
    const CHUNK: [u8; 1024] = [42; 1024];
    assert!(pair.client_conn_mut(client_ch).congestion_window() > TARGET);
    let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap();

    while pair.client_conn_mut(client_ch).congestion_window() > TARGET {
        let written = pair.client_send(client_ch, s).write(&CHUNK).unwrap();
        assert_eq!(written, 1024);
        pair.drive_client();
    }

    // Drop everything that is waiting to be delivered to the server.
    assert!(!pair.server.inbound.is_empty());
    pair.server.inbound.clear();

    info!("recovering");
    pair.drive();
    assert!(pair.client_conn_mut(client_ch).congestion_window() > TARGET);
    pair.client_send(client_ch, s).write(&CHUNK).unwrap();
}
/// Drops a ping on its way to the server, takes one step, and then sends
/// `DGRAM_NUM` datagrams of `DGRAM_LEN` bytes each. After driving the pair, the
/// server's received-datagram frame counter must equal `DGRAM_NUM`.
#[test]
fn tail_loss_small_segment_size() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();

    assert_eq!(
        pair.server_conn_mut(server_ch).stats().frame_rx.datagram,
        0
    );

    const DGRAM_LEN: usize = 1000;
    const DGRAM_NUM: u64 = 5;

    info!("Sending an ack-eliciting datagram");
    pair.client_conn_mut(client_ch).ping();
    pair.drive_client();

    // The ping never reaches the server.
    assert!(!pair.server.inbound.is_empty());
    pair.server.inbound.clear();

    info!("stepping forward to PTO");
    pair.step();

    assert_eq!(
        pair.server_conn_mut(server_ch).stats().frame_rx.datagram,
        0
    );

    info!("Sending datagram batch");
    for _ in 0..DGRAM_NUM {
        let datagram = vec![0; DGRAM_LEN];
        pair.client_datagrams(client_ch)
            .send(datagram.into(), false)
            .unwrap();
    }
    pair.drive();

    let received = pair.server_conn_mut(server_ch).stats().frame_rx.datagram;
    assert_eq!(received, DGRAM_NUM);
}
/// Connects with segmentation offload disabled in the client's transport config,
/// drops a ping, takes one step and sends a batch of datagrams. At the end the
/// client's UDP transmit stats must show as many I/O operations as datagrams.
#[test]
fn tail_loss_respect_max_datagrams() {
    let _guard = subscribe();

    let mut transport = TransportConfig::default();
    transport.enable_segmentation_offload(false);
    let mut client_cfg = client_config();
    client_cfg.transport_config(transport.into());

    let mut pair = Pair::default();
    let (client_ch, _) = pair.connect_with(client_cfg);

    const DGRAM_LEN: usize = 1000;
    const DGRAM_NUM: u64 = 5;

    info!("Sending an ack-eliciting datagram");
    pair.client_conn_mut(client_ch).ping();
    pair.drive_client();

    // The ping never reaches the server.
    assert!(!pair.server.inbound.is_empty());
    pair.server.inbound.clear();

    info!("stepping forward to PTO");
    pair.step();

    info!("Sending datagram batch");
    for _ in 0..DGRAM_NUM {
        let datagram = vec![0; DGRAM_LEN];
        pair.client_datagrams(client_ch)
            .send(datagram.into(), false)
            .unwrap();
    }
    pair.drive();

    let udp_tx = pair.client_conn_mut(client_ch).stats().udp_tx;
    assert_eq!(udp_tx.ios, udp_tx.datagrams);
}
/// Sends a single datagram from client to server and checks that the server reports
/// `DatagramReceived`, yields exactly that payload, and then has nothing more.
#[test]
fn datagram_send_recv() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();

    let pending = pair.server_conn_mut(server_ch).poll();
    assert_matches!(pending, None);
    assert_matches!(pair.client_datagrams(client_ch).max_size(), Some(x) if x > 0);

    const DATA: &[u8] = b"whee";
    pair.client_datagrams(client_ch)
        .send(DATA.into(), true)
        .unwrap();
    pair.drive();

    let received_event = pair.server_conn_mut(server_ch).poll();
    assert_matches!(received_event, Some(Event::DatagramReceived));
    assert_eq!(pair.server_datagrams(server_ch).recv().unwrap(), DATA);
    assert_matches!(pair.server_datagrams(server_ch).recv(), None);
}
/// Limits the server's datagram receive buffer to `WINDOW` bytes and sends three
/// datagrams that are each one byte longer than a third of the window. The server is
/// expected to yield only the second and third; a later single datagram must then
/// be received normally.
#[test]
fn datagram_recv_buffer_overflow() {
    let _guard = subscribe();
    const WINDOW: usize = 100;
    let transport = TransportConfig {
        datagram_receive_buffer_size: Some(WINDOW),
        ..TransportConfig::default()
    };
    let server = ServerConfig {
        transport: Arc::new(transport),
        ..server_config()
    };
    let mut pair = Pair::new(Default::default(), server);
    let (client_ch, server_ch) = pair.connect();

    let pending = pair.server_conn_mut(server_ch).poll();
    assert_matches!(pending, None);
    assert_eq!(
        pair.client_conn_mut(client_ch).datagrams().max_size(),
        Some(WINDOW - Datagram::SIZE_BOUND)
    );

    const DATA1: &[u8] = &[0xAB; (WINDOW / 3) + 1];
    const DATA2: &[u8] = &[0xBC; (WINDOW / 3) + 1];
    const DATA3: &[u8] = &[0xCD; (WINDOW / 3) + 1];
    // Together the three payloads are larger than the receive buffer.
    for payload in [DATA1, DATA2, DATA3] {
        pair.client_datagrams(client_ch)
            .send(payload.into(), true)
            .unwrap();
    }
    pair.drive();

    let received_event = pair.server_conn_mut(server_ch).poll();
    assert_matches!(received_event, Some(Event::DatagramReceived));
    assert_eq!(pair.server_datagrams(server_ch).recv().unwrap(), DATA2);
    assert_eq!(pair.server_datagrams(server_ch).recv().unwrap(), DATA3);
    assert_matches!(pair.server_datagrams(server_ch).recv(), None);

    pair.client_datagrams(client_ch)
        .send(DATA1.into(), true)
        .unwrap();
    pair.drive();
    assert_eq!(pair.server_datagrams(server_ch).recv().unwrap(), DATA1);
    assert_matches!(pair.server_datagrams(server_ch).recv(), None);
}
/// With `datagram_receive_buffer_size` set to `None` on the server, the client must
/// see no maximum datagram size and sending must fail with
/// `SendDatagramError::UnsupportedByPeer`.
#[test]
fn datagram_unsupported() {
    let _guard = subscribe();
    let transport = TransportConfig {
        datagram_receive_buffer_size: None,
        ..TransportConfig::default()
    };
    let server = ServerConfig {
        transport: Arc::new(transport),
        ..server_config()
    };
    let mut pair = Pair::new(Default::default(), server);
    let (client_ch, server_ch) = pair.connect();

    let pending = pair.server_conn_mut(server_ch).poll();
    assert_matches!(pending, None);
    assert_matches!(pair.client_datagrams(client_ch).max_size(), None);

    let send_result = pair.client_datagrams(client_ch).send(Bytes::new(), true);
    match send_result {
        Err(SendDatagramError::UnsupportedByPeer) => {}
        Err(e) => panic!("unexpected error: {e}"),
        Ok(_) => panic!("unexpected success"),
    }
}
/// The client offers 1000 four-byte ALPN values (big-endian encodings of `0..1000`)
/// and the server offers the single value `[0, 0, 0, 42]`. The connection must still
/// complete, with the expected event sequence on both sides.
#[test]
fn large_initial() {
    let _guard = subscribe();

    let server_alpn = vec![vec![0, 0, 0, 42]];
    let server_config = ServerConfig::with_crypto(Arc::new(server_crypto_with_alpn(server_alpn)));
    let mut pair = Pair::new(Arc::new(EndpointConfig::default()), server_config);

    let client_alpn = (0..1000u32).map(|x| x.to_be_bytes().to_vec()).collect();
    let client_crypto = client_crypto_with_alpn(client_alpn);
    let cfg = ClientConfig::new(Arc::new(client_crypto));
    let client_ch = pair.begin_connect(cfg);
    pair.drive();
    let server_ch = pair.server.assert_accept();

    // Client side: handshake data, then connected.
    let client_first = pair.client_conn_mut(client_ch).poll();
    assert_matches!(client_first, Some(Event::HandshakeDataReady));
    let client_second = pair.client_conn_mut(client_ch).poll();
    assert_matches!(client_second, Some(Event::Connected));

    // Server side: handshake data, handshake confirmed, then connected.
    let server_first = pair.server_conn_mut(server_ch).poll();
    assert_matches!(server_first, Some(Event::HandshakeDataReady));
    let server_second = pair.server_conn_mut(server_ch).poll();
    assert_matches!(server_second, Some(Event::HandshakeConfirmed));
    let server_third = pair.server_conn_mut(server_ch).poll();
    assert_matches!(server_third, Some(Event::Connected));
}
/// The client writes, the server reads the data, and only then does the client
/// finish the stream. The client must not report `Finished` after merely driving
/// itself, but must report it once the whole pair has been driven; the server must
/// then read end-of-stream.
#[test]
fn finish_acked() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();

    let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap();

    const MSG: &[u8] = b"hello";
    pair.client_send(client_ch, s).write(MSG).unwrap();
    info!("client sends data to server");
    pair.drive_client();
    info!("server acknowledges data");
    pair.drive_server();

    let opened = pair.server_conn_mut(server_ch).poll();
    assert_matches!(
        opened,
        Some(Event::Stream(StreamEvent::Opened { dir: Dir::Uni }))
    );
    let nothing_more = pair.server_conn_mut(server_ch).poll();
    assert_matches!(nothing_more, None);
    assert_matches!(pair.server_streams(server_ch).accept(Dir::Uni), Some(stream) if stream == s);

    // The payload is available, but the stream has not ended yet.
    let mut recv = pair.server_recv(server_ch, s);
    let mut chunks = recv.read(false).unwrap();
    let first = chunks.next(usize::MAX);
    assert_matches!(
        first,
        Ok(Some(chunk)) if chunk.offset == 0 && chunk.bytes == MSG
    );
    assert_matches!(chunks.next(usize::MAX), Err(ReadError::Blocked));
    let _ = chunks.finalize();

    pair.client_send(client_ch, s).finish().unwrap();
    info!("client receives ACK, sends FIN");
    pair.drive_client();
    let premature = pair.client_conn_mut(client_ch).poll();
    assert_matches!(premature, None);

    info!("server ACKs FIN");
    pair.drive();
    let finished = pair.client_conn_mut(client_ch).poll();
    assert_matches!(
        finished,
        Some(Event::Stream(StreamEvent::Finished { id })) if id == s
    );

    let mut recv = pair.server_recv(server_ch, s);
    let mut chunks = recv.read(false).unwrap();
    assert_matches!(chunks.next(usize::MAX), Ok(None));
    let _ = chunks.finalize();
}
/// The data the client wrote is discarded from the server's inbound queue before the
/// client finishes the stream. After the pair is driven, the client must see
/// `Finished` and the server must read the whole message followed by end-of-stream.
#[test]
fn finish_retransmit() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();

    let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap();

    const MSG: &[u8] = b"hello";
    pair.client_send(client_ch, s).write(MSG).unwrap();
    pair.drive_client();
    // The server never sees what the client just sent.
    pair.server.inbound.clear();

    pair.client_send(client_ch, s).finish().unwrap();
    pair.drive_client();
    pair.drive_server();
    pair.drive_client();
    let premature = pair.client_conn_mut(client_ch).poll();
    assert_matches!(premature, None);

    pair.drive();
    let finished = pair.client_conn_mut(client_ch).poll();
    assert_matches!(
        finished,
        Some(Event::Stream(StreamEvent::Finished { id })) if id == s
    );

    let opened = pair.server_conn_mut(server_ch).poll();
    assert_matches!(
        opened,
        Some(Event::Stream(StreamEvent::Opened { dir: Dir::Uni }))
    );
    assert_matches!(pair.server_streams(server_ch).accept(Dir::Uni), Some(stream) if stream == s);

    let mut recv = pair.server_recv(server_ch, s);
    let mut chunks = recv.read(false).unwrap();
    let first = chunks.next(usize::MAX);
    assert_matches!(
        first,
        Ok(Some(chunk)) if chunk.offset == 0 && chunk.bytes == MSG
    );
    assert_matches!(chunks.next(usize::MAX), Ok(None));
    let _ = chunks.finalize();
}
/// With `max_concurrent_bidi_streams` set to 1 on the server, performs three
/// request/response round trips in a row, each on a freshly opened bidirectional
/// stream.
#[test]
fn repeated_request_response() {
    let _guard = subscribe();
    let transport = TransportConfig {
        max_concurrent_bidi_streams: 1u32.into(),
        ..TransportConfig::default()
    };
    let server = ServerConfig {
        transport: Arc::new(transport),
        ..server_config()
    };
    let mut pair = Pair::new(Default::default(), server);
    let (client_ch, server_ch) = pair.connect();

    const REQUEST: &[u8] = b"hello";
    const RESPONSE: &[u8] = b"world";
    for _ in 0..3 {
        // Client sends the request and finishes its side.
        let s = pair.client_streams(client_ch).open(Dir::Bi).unwrap();
        pair.client_send(client_ch, s).write(REQUEST).unwrap();
        pair.client_send(client_ch, s).finish().unwrap();
        pair.drive();

        // Server accepts the stream and reads the complete request.
        assert_eq!(pair.server_streams(server_ch).accept(Dir::Bi), Some(s));
        let mut recv = pair.server_recv(server_ch, s);
        let mut chunks = recv.read(false).unwrap();
        let request = chunks.next(usize::MAX);
        assert_matches!(
            request,
            Ok(Some(chunk)) if chunk.offset == 0 && chunk.bytes == REQUEST
        );
        assert_matches!(chunks.next(usize::MAX), Ok(None));
        let _ = chunks.finalize();

        // Server answers and finishes; client reads the complete response.
        pair.server_send(server_ch, s).write(RESPONSE).unwrap();
        pair.server_send(server_ch, s).finish().unwrap();
        pair.drive();

        let mut recv = pair.client_recv(client_ch, s);
        let mut chunks = recv.read(false).unwrap();
        let response = chunks.next(usize::MAX);
        assert_matches!(
            response,
            Ok(Some(chunk)) if chunk.offset == 0 && chunk.bytes == RESPONSE
        );
        assert_matches!(chunks.next(usize::MAX), Ok(None));
        let _ = chunks.finalize();
    }
}
/// Uses the certificate from `big_cert_and_key` and, after client / server / client
/// have each been driven once, discards everything queued for the server. The client
/// must still reach `HandshakeDataReady` and `Connected`.
#[test]
fn handshake_anti_deadlock_probe() {
    let _guard = subscribe();

    let (cert, key) = big_cert_and_key();
    let server = server_config_with_cert(cert.clone(), key);
    let client = client_config_with_certs(vec![cert]);
    let mut pair = Pair::new(Default::default(), server);

    let client_ch = pair.begin_connect(client);
    pair.drive_client();
    pair.drive_server();
    pair.drive_client();
    // Whatever the client just sent never reaches the server.
    pair.server.inbound.clear();
    pair.drive();

    let first_event = pair.client_conn_mut(client_ch).poll();
    assert_matches!(first_event, Some(Event::HandshakeDataReady));
    let second_event = pair.client_conn_mut(client_ch).poll();
    assert_matches!(second_event, Some(Event::Connected));
}
/// With the certificate from `big_cert_and_key` and a 10 ms initial RTT on the
/// server, exactly three datagrams must be queued for the client after the client
/// and then the server have been driven once. The handshake must then complete.
#[test]
fn server_can_send_3_inital_packets() {
    let _guard = subscribe();

    let mut transport = TransportConfig::default();
    transport.initial_rtt(Duration::from_millis(10));

    let (cert, key) = big_cert_and_key();
    let mut server = server_config_with_cert(cert.clone(), key);
    server.transport_config(Arc::new(transport));
    let client = client_config_with_certs(vec![cert]);
    let mut pair = Pair::new(Default::default(), server);

    let client_ch = pair.begin_connect(client);
    pair.drive_client();
    pair.drive_server();
    let queued_for_client = pair.client.inbound.len();
    assert_eq!(queued_for_client, 3);

    pair.drive();
    let first_event = pair.client_conn_mut(client_ch).poll();
    assert_matches!(first_event, Some(Event::HandshakeDataReady));
    let second_event = pair.client_conn_mut(client_ch).poll();
    assert_matches!(second_event, Some(Event::Connected));
}
/// Generates a self-signed certificate and its PKCS#8 private key.
///
/// The certificate is requested for `"localhost"` plus the 1000 additional names
/// `foo_0` through `foo_999`.
fn big_cert_and_key() -> (CertificateDer<'static>, PrivateKeyDer<'static>) {
    let mut names = vec![String::from("localhost")];
    names.extend((0..1000).map(|x| format!("foo_{x}")));

    let generated = rcgen::generate_simple_self_signed(names).unwrap();
    let key = PrivateKeyDer::Pkcs8(generated.signing_key.serialize_der().into());
    (generated.cert.into(), key)
}
/// Feeds one fixed byte sequence to a server endpoint's `handle`. The result is
/// ignored, so the test only requires that the call returns without panicking.
#[test]
fn malformed_token_len() {
    let _guard = subscribe();
    let client_addr = "[::2]:7890".parse().unwrap();
    let mut server = Endpoint::new(Default::default(), Some(Arc::new(server_config())), true);
    let capacity = server.config().get_max_udp_payload_size() as usize;
    let mut buf = Vec::with_capacity(capacity);
    let network_path = FourTuple {
        remote: client_addr,
        local_ip: None,
    };
    server.handle(
        Instant::now(),
        network_path,
        None,
        hex!("8900 0000 0101 0000 1b1b 841b 0000 0000 3f00")[..].into(),
        &mut buf,
    );
}
/// Sets the pair's MTU to 0 while the client sends a ping, then restores it and
/// drives the pair. Compared to the post-connect baseline, the client must have sent
/// exactly two more IMMEDIATE_ACK frames.
#[test]
fn loss_probe_requests_immediate_ack() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, _) = pair.connect();
    pair.drive();

    let baseline = pair.client_conn_mut(client_ch).stats();

    // The ping is sent while the MTU is 0; the previous value is restored afterwards.
    let default_mtu = mem::replace(&mut pair.mtu, 0);
    pair.client_conn_mut(client_ch).ping();
    pair.drive_client();
    pair.mtu = default_mtu;

    pair.drive();

    let recovered = pair.client_conn_mut(client_ch).stats();
    let extra_immediate_acks = recovered.frame_tx.immediate_ack - baseline.frame_tx.immediate_ack;
    assert_eq!(extra_immediate_acks, 2);
}
/// With the pair's MTU set to 1000, a connection attempt must not result in the
/// server accepting a connection.
#[test]
fn connect_too_low_mtu() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    pair.mtu = 1000;

    // The handle is not needed; only the server's accept state is inspected.
    pair.begin_connect(client_config());
    pair.drive();

    pair.server.assert_no_accept();
}
/// With the pair's MTU at 1200, both endpoints are expected to send nine PLPMTUD
/// probes and lose all nine, without recording any congestion event.
#[test]
fn connect_lost_mtu_probes_do_not_trigger_congestion_control() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    pair.mtu = 1200;

    let (client_ch, server_ch) = pair.connect();
    pair.drive();

    let client_path = pair
        .client_conn_mut(client_ch)
        .path_stats(PathId::ZERO)
        .unwrap();
    assert_eq!(client_path.sent_plpmtud_probes, 9);
    assert_eq!(client_path.lost_plpmtud_probes, 9);

    let server_path = pair
        .server_conn_mut(server_ch)
        .path_stats(PathId::ZERO)
        .unwrap();
    assert_eq!(server_path.sent_plpmtud_probes, 9);
    assert_eq!(server_path.lost_plpmtud_probes, 9);

    // Losing the probes must not be counted as congestion on either side.
    assert_eq!(client_path.congestion_events, 0);
    assert_eq!(server_path.congestion_events, 0);
}
/// For several pair MTUs, connects and checks the path MTU that both
/// endpoints end up with.
#[test]
fn connect_detects_mtu() {
    let _guard = subscribe();
    // (MTU configured on the pair, path MTU expected on both connections)
    for (link_mtu, discovered_mtu) in [(1200, 1200), (1400, 1389), (1500, 1452)] {
        let mut pair = Pair::default();
        pair.mtu = link_mtu;
        let (client_ch, server_ch) = pair.connect();
        pair.drive();

        let client_mtu = pair.client_conn_mut(client_ch).path_mtu(PathId::ZERO);
        assert_eq!(client_mtu, discovered_mtu);
        let server_mtu = pair.server_conn_mut(server_ch).path_mtu(PathId::ZERO);
        assert_eq!(server_mtu, discovered_mtu);
    }
}
/// Connects with a client endpoint whose `max_udp_payload_size` is 1400 over
/// a pair with MTU 1300, then raises the pair MTU to 1500 and moves the
/// client to a new address.
///
/// After the move the server must see the new remote address and its path
/// MTU must equal the client's configured maximum (1400), while the client's
/// own path MTU stays at 1293.
#[test]
fn migrate_detects_new_mtu_and_respects_original_peer_max_udp_payload_size() {
    const CLIENT_MAX_UDP_PAYLOAD_SIZE: u16 = 1400;
    let _guard = subscribe();

    let server = Endpoint::new(
        Arc::new(EndpointConfig::default()),
        Some(Arc::new(server_config())),
        true,
    );
    let client = Endpoint::new(
        Arc::new(EndpointConfig {
            max_udp_payload_size: VarInt::from(CLIENT_MAX_UDP_PAYLOAD_SIZE),
            ..EndpointConfig::default()
        }),
        None,
        true,
    );
    let mut pair = Pair::new_from_endpoint(client, server);
    pair.mtu = 1300;
    let (client_ch, server_ch) = pair.connect();
    pair.drive();

    // Path MTUs reached before the address change.
    let client_mtu = pair.client_conn_mut(client_ch).path_mtu(PathId::ZERO);
    assert_eq!(client_mtu, 1293);
    let server_mtu = pair.server_conn_mut(server_ch).path_mtu(PathId::ZERO);
    assert_eq!(server_mtu, 1300);

    // Raise the pair MTU and give the client a fresh port on 127.0.0.1.
    pair.mtu = 1500;
    let new_port = CLIENT_PORTS.lock().unwrap().next().unwrap();
    pair.client.addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), new_port);
    pair.client_conn_mut(client_ch).ping();
    pair.drive();

    let remote_seen_by_server = pair
        .server_conn_mut(server_ch)
        .network_path(PathId::ZERO)
        .map(|addrs| addrs.remote);
    assert_eq!(remote_seen_by_server, Ok(pair.client.addr));

    let server_mtu = pair.server_conn_mut(server_ch).path_mtu(PathId::ZERO);
    assert_eq!(server_mtu, CLIENT_MAX_UDP_PAYLOAD_SIZE);
    let client_mtu = pair.client_conn_mut(client_ch).path_mtu(PathId::ZERO);
    assert_eq!(client_mtu, 1293);
}
/// Checks that MTU discovery finds a larger MTU once 600 seconds have passed.
///
/// The pair starts with MTU 1400, where both sides settle on 1389 after five
/// probes (three lost). Raising the pair MTU to 1500 has no immediate effect;
/// only after the clock is advanced by 600 seconds do both sides reach 1452.
#[test]
fn connect_runs_mtud_again_after_600_seconds() {
    let _guard = subscribe();
    let mut server_config = server_config();
    let mut client_config = client_config();

    // Idle timeouts are disabled on both sides; the connections are asserted
    // to still be open after the 600 second clock jump below.
    Arc::get_mut(&mut server_config.transport)
        .unwrap()
        .max_idle_timeout(None);
    Arc::get_mut(&mut client_config.transport)
        .unwrap()
        .max_idle_timeout(None);

    let mut pair = Pair::new(Default::default(), server_config);
    pair.mtu = 1400;
    let (client_ch, server_ch) = pair.connect_with(client_config);
    pair.drive();

    // Initial discovery results on the client...
    let client_conn = pair.client_conn_mut(client_ch);
    let client_path_stats = client_conn.path_stats(PathId::ZERO).unwrap();
    assert_eq!(client_conn.path_mtu(PathId::ZERO), 1389);
    assert_eq!(client_path_stats.sent_plpmtud_probes, 5);
    assert_eq!(client_path_stats.lost_plpmtud_probes, 3);

    // ...and on the server.
    let server_conn = pair.server_conn_mut(server_ch);
    let server_path_stats = server_conn.path_stats(PathId::ZERO).unwrap();
    assert_eq!(server_conn.path_mtu(PathId::ZERO), 1389);
    assert_eq!(server_path_stats.sent_plpmtud_probes, 5);
    assert_eq!(server_path_stats.lost_plpmtud_probes, 3);

    // A larger pair MTU alone does not change the discovered value.
    pair.mtu = 1500;
    pair.drive();
    assert_eq!(pair.client_conn_mut(client_ch).path_mtu(PathId::ZERO), 1389);
    assert_eq!(pair.server_conn_mut(server_ch).path_mtu(PathId::ZERO), 1389);

    // After 600 seconds both sides pick up the larger MTU.
    pair.time += Duration::from_secs(600);
    pair.drive();
    assert!(!pair.client_conn_mut(client_ch).is_closed());
    // Look the server connection up by the server's handle, not the client's.
    assert!(!pair.server_conn_mut(server_ch).is_closed());
    assert_eq!(pair.client_conn_mut(client_ch).path_mtu(PathId::ZERO), 1452);
    assert_eq!(pair.server_conn_mut(server_ch).path_mtu(PathId::ZERO), 1452);
}
/// After both sides have discovered a path MTU of 1452, drops the pair MTU
/// to 1200 and sends 1300 bytes on a unidirectional stream.
///
/// The data must still arrive in full within 100 bounded drive steps, and
/// the client must report at least three lost packets, at least three
/// congestion events, and exactly one detected black hole.
#[test]
fn blackhole_after_mtu_change_repairs_itself() {
    const PAYLOAD_LEN: usize = 1300;
    let _guard = subscribe();
    let mut pair = Pair::default();
    pair.mtu = 1500;
    let (client_ch, server_ch) = pair.connect();
    pair.drive();
    assert_eq!(pair.client_conn_mut(client_ch).path_mtu(PathId::ZERO), 1452);
    assert_eq!(pair.server_conn_mut(server_ch).path_mtu(PathId::ZERO), 1452);

    // Shrink the pair MTU below what was discovered, then send data.
    pair.mtu = 1200;
    let payload = [42u8; PAYLOAD_LEN];
    let stream = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
    pair.client_send(client_ch, stream).write(&payload).unwrap();

    assert!(
        !pair.drive_bounded(100),
        "Connections never reached an idle state"
    );

    let received = stream_chunks(pair.server_recv(server_ch, stream));
    assert_eq!(received.len(), PAYLOAD_LEN);

    let stats = pair
        .client_conn_mut(client_ch)
        .path_stats(PathId::ZERO)
        .unwrap();
    assert!(stats.lost_packets >= 3);
    assert!(stats.congestion_events >= 3);
    assert_eq!(stats.black_holes_detected, 1);
}
/// After a default connection goes idle, the client must have sent four
/// PLPMTUD probes, four PING frames and four IMMEDIATE_ACK frames.
#[test]
fn mtud_probes_include_immediate_ack() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, _) = pair.connect();
    pair.drive();

    let conn_stats = pair.client_conn_mut(client_ch).stats();
    let path = pair
        .client_conn_mut(client_ch)
        .path_stats(PathId::ZERO)
        .unwrap();

    assert_eq!(
        (
            path.sent_plpmtud_probes,
            conn_stats.frame_tx.ping,
            conn_stats.frame_tx.immediate_ack,
        ),
        (4, 4, 4)
    );
}
/// With the pair MTU at 1200, a 1300-byte stream write must leave the client
/// as two datagrams and reach the server as two.
#[test]
fn packet_splitting_with_default_mtu() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    pair.mtu = 1200;
    let (client_ch, _) = pair.connect();
    pair.drive();

    let data = [42u8; 1300];
    let stream = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
    pair.client_send(client_ch, stream).write(&data).unwrap();

    // Drive only the client endpoint so its outbound queue can be inspected.
    let now = pair.time;
    pair.client.drive(now);
    assert_eq!(pair.client.outbound.len(), 2);

    pair.drive_client();
    assert_eq!(pair.server.inbound.len(), 2);
}
/// With the pair MTU at 1500, a 1300-byte stream write must leave the client
/// as a single datagram and reach the server as one.
#[test]
fn packet_splitting_not_necessary_after_higher_mtu_discovered() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    pair.mtu = 1500;
    let (client_ch, _) = pair.connect();
    pair.drive();

    let data = [42u8; 1300];
    let stream = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
    pair.client_send(client_ch, stream).write(&data).unwrap();

    // Drive only the client endpoint so its outbound queue can be inspected.
    let now = pair.time;
    pair.client.drive(now);
    assert_eq!(pair.client.outbound.len(), 1);

    pair.drive_client();
    assert_eq!(pair.server.inbound.len(), 1);
}
/// A single PING must not be acknowledged right away; the ACK must arrive
/// exactly when the default `max_ack_delay` has elapsed and must report that
/// delay.
#[test]
fn single_ack_eliciting_packet_triggers_ack_after_delay() {
    let _guard = subscribe();
    let mut pair = Pair::default_with_deterministic_pns();
    let (client_ch, _) = pair.connect_with(client_config_with_deterministic_pns());
    pair.drive();
    let baseline = pair.client_conn_mut(client_ch).stats();
    let ping_time = pair.time;

    // One client -> server -> client round without the clock moving.
    pair.client_conn_mut(client_ch).ping();
    pair.drive_client();
    pair.drive_server();
    pair.drive_client();
    assert_eq!(pair.time, ping_time);

    // The ping went out, but no ACK has come back yet.
    let after_ping = pair.client_conn_mut(client_ch).stats();
    assert_eq!(after_ping.frame_tx.ping - baseline.frame_tx.ping, 1);
    assert_eq!(after_ping.frame_rx.acks - baseline.frame_rx.acks, 0);

    // Capture what the client receives while the pair runs to idle.
    pair.client.capture_inbound_packets = true;
    pair.drive();
    let after_drive = pair.client_conn_mut(client_ch).stats();
    assert_eq!(after_drive.frame_rx.acks - after_ping.frame_rx.acks, 1);

    // The clock must have advanced by exactly the default max_ack_delay.
    let max_ack_delay_ms = TransportParameters::default().max_ack_delay.into_inner();
    assert_eq!(
        pair.time,
        ping_time + Duration::from_millis(max_ack_delay_ms)
    );

    // The single captured packet must hold a single ACK frame.
    assert_eq!(pair.client.captured_packets.len(), 1);
    let captured = pair.client.captured_packets.remove(0);
    let mut frames = frame::Iter::new(captured.into())
        .unwrap()
        .collect::<Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(frames.len(), 1);
    let Frame::Ack(ack) = frames.remove(0) else {
        panic!("Expected ACK frame");
    };

    // Scale the encoded delay by the default exponent and compare it with the
    // max_ack_delay expressed in microseconds.
    let exponent = TransportParameters::default()
        .ack_delay_exponent
        .into_inner();
    assert_eq!(ack.delay << exponent, max_ack_delay_ms * 1_000);

    // No further pings were sent while waiting for the ACK.
    assert_eq!(after_drive.frame_tx.ping - baseline.frame_tx.ping, 1);
}
/// Requesting an immediate ACK on path zero must produce exactly one ACK at
/// the client after a single client -> server -> client round.
#[test]
fn immediate_ack_triggers_ack() {
    let _guard = subscribe();
    let mut pair = Pair::default_with_deterministic_pns();
    let (client_ch, _) = pair.connect_with(client_config_with_deterministic_pns());
    pair.drive();
    let acks_before = pair.client_conn_mut(client_ch).stats().frame_rx.acks;

    pair.client_conn_mut(client_ch).immediate_ack(PathId::ZERO);
    pair.drive_client();
    pair.drive_server();
    pair.drive_client();

    let acks_after = pair.client_conn_mut(client_ch).stats().frame_rx.acks;
    assert_eq!(acks_after - acks_before, 1);
}
/// Sends one PING while the pair MTU is 0 and a second one after the MTU is
/// restored. The server must receive exactly one PING and answer it with one
/// ACK without any further driving.
#[test]
fn out_of_order_ack_eliciting_packet_triggers_ack() {
    let _guard = subscribe();
    let mut pair = Pair::default_with_deterministic_pns();
    let (client_ch, server_ch) = pair.connect_with(client_config_with_deterministic_pns());
    pair.drive();

    let working_mtu = pair.mtu;
    let client_base = pair.client_conn_mut(client_ch).stats();
    let server_base = pair.server_conn_mut(server_ch).stats();

    // NOTE(review): with MTU 0 the first ping presumably never reaches the
    // server, leaving a packet-number gap — confirm in `util::Pair`.
    pair.mtu = 0;
    pair.client_conn_mut(client_ch).ping();
    pair.drive_client();

    let client_now = pair.client_conn_mut(client_ch).stats();
    assert_eq!(client_now.frame_tx.ping - client_base.frame_tx.ping, 1);
    assert_eq!(client_now.frame_rx.acks - client_base.frame_rx.acks, 0);

    // Second ping goes out with the original MTU restored.
    pair.mtu = working_mtu;
    pair.client_conn_mut(client_ch).ping();
    pair.drive_client();
    pair.drive_server();
    pair.drive_client();

    let client_now = pair.client_conn_mut(client_ch).stats();
    assert_eq!(client_now.frame_tx.ping - client_base.frame_tx.ping, 2);
    assert_eq!(client_now.frame_rx.acks - client_base.frame_rx.acks, 1);

    let server_now = pair.server_conn_mut(server_ch).stats();
    assert_eq!(server_now.frame_rx.ping - server_base.frame_rx.ping, 1);
    assert_eq!(server_now.frame_tx.acks - server_base.frame_tx.acks, 1);
}
/// A PING sent while `congestion_experienced` is set on the pair must be
/// acknowledged without the clock advancing, and the client must record
/// exactly one additional congestion event.
#[test]
fn single_ack_eliciting_packet_with_ce_bit_triggers_immediate_ack() {
    let _guard = subscribe();
    let mut pair = Pair::default_with_deterministic_pns();
    let (client_ch, _) = pair.connect_with(client_config_with_deterministic_pns());
    pair.drive();

    let conn_base = pair.client_conn_mut(client_ch).stats();
    let path_base = pair
        .client_conn_mut(client_ch)
        .path_stats(PathId::ZERO)
        .unwrap();
    let ping_time = pair.time;

    pair.client_conn_mut(client_ch).ping();
    // The flag is set only while the client's packets are being sent.
    pair.congestion_experienced = true;
    pair.drive_client();
    pair.congestion_experienced = false;
    pair.drive_server();
    pair.drive_client();
    assert_eq!(pair.time, ping_time);

    let conn_now = pair.client_conn_mut(client_ch).stats();
    assert_eq!(conn_now.frame_tx.ping - conn_base.frame_tx.ping, 1);
    assert_eq!(conn_now.frame_rx.acks - conn_base.frame_rx.acks, 1);

    let path_now = pair
        .client_conn_mut(client_ch)
        .path_stats(PathId::ZERO)
        .unwrap();
    assert_eq!(path_now.congestion_events - path_base.congestion_events, 1);
}
fn setup_ack_frequency_test(max_ack_delay: Duration) -> (Pair, ConnectionHandle, ConnectionHandle) {
let mut client_config = client_config_with_deterministic_pns();
let mut ack_freq_config = AckFrequencyConfig::default();
ack_freq_config
.ack_eliciting_threshold(10u32.into())
.max_ack_delay(Some(max_ack_delay));
Arc::get_mut(&mut client_config.transport)
.unwrap()
.ack_frequency_config(Some(ack_freq_config))
.mtu_discovery_config(None) .initial_rtt(Duration::from_millis(10));
let mut pair = Pair::default_with_deterministic_pns();
pair.latency = Duration::from_millis(10); let (client_ch, server_ch) = pair.connect_with(client_config);
pair.drive();
assert_eq!(
pair.client_conn_mut(client_ch)
.stats()
.frame_tx
.ack_frequency,
1
);
assert_eq!(pair.client_conn_mut(client_ch).stats().frame_tx.ping, 0);
(pair, client_ch, server_ch)
}
/// With a 30 ms max ACK delay, sends one PING and then two more 5 ms later,
/// and checks that the server's single ACK is only sent by the last drive.
///
/// Clock offsets below are relative to the first PING; the pair latency is
/// the 10 ms configured by `setup_ack_frequency_test`.
#[test]
fn ack_frequency_ack_delayed_from_first_of_flight() {
let _guard = subscribe();
let (mut pair, client_ch, server_ch) = setup_ack_frequency_test(Duration::from_millis(30));
// +0 ms: first ping leaves the client.
pair.client_conn_mut(client_ch).ping();
pair.drive_client();
// +5 ms: two more pings leave the client.
pair.time += Duration::from_millis(5);
for _ in 0..2 {
pair.client_conn_mut(client_ch).ping();
pair.drive_client();
}
// +10 ms: the server sees one ping and sends no ACK.
pair.time += Duration::from_millis(5);
let server_stats_before = pair.server_conn_mut(server_ch).stats();
pair.drive_server();
let server_stats_after = pair.server_conn_mut(server_ch).stats();
assert_eq!(
server_stats_after.frame_rx.ping - server_stats_before.frame_rx.ping,
1
);
assert_eq!(
server_stats_after.frame_tx.acks - server_stats_before.frame_tx.acks,
0
);
// +20 ms: the server sees the other two pings and still sends no ACK.
pair.time += Duration::from_millis(10);
let server_stats_before = pair.server_conn_mut(server_ch).stats();
pair.drive_server();
let server_stats_after = pair.server_conn_mut(server_ch).stats();
assert_eq!(
server_stats_after.frame_rx.ping - server_stats_before.frame_rx.ping,
2
);
assert_eq!(
server_stats_after.frame_tx.acks - server_stats_before.frame_tx.acks,
0
);
// +40 ms: 30 ms after the first ping was received, exactly one ACK is sent.
pair.time += Duration::from_millis(20);
let server_stats_before = pair.server_conn_mut(server_ch).stats();
pair.drive_server();
let server_stats_after = pair.server_conn_mut(server_ch).stats();
assert_eq!(
server_stats_after.frame_tx.acks - server_stats_before.frame_tx.acks,
1
);
}
/// A single PING must not be ACKed when the server first sees it, and must
/// be ACKed once `max_ack_delay` (30 ms) has passed after that.
#[test]
fn ack_frequency_ack_sent_after_max_ack_delay() {
    let _guard = subscribe();
    let ack_delay = Duration::from_millis(30);
    let (mut pair, client_ch, server_ch) = setup_ack_frequency_test(ack_delay);

    pair.client_conn_mut(client_ch).ping();
    pair.drive_client();

    // One latency later the server receives the ping and stays silent.
    let latency = pair.latency;
    pair.time += latency;
    let before = pair.server_conn_mut(server_ch).stats();
    pair.drive_server();
    let after = pair.server_conn_mut(server_ch).stats();
    let pings_received = after.frame_rx.ping - before.frame_rx.ping;
    let acks_sent = after.frame_tx.acks - before.frame_tx.acks;
    assert_eq!(pings_received, 1);
    assert_eq!(acks_sent, 0);

    // After the max ACK delay the ACK goes out with nothing new received.
    pair.time += ack_delay;
    let before = pair.server_conn_mut(server_ch).stats();
    pair.drive_server();
    let after = pair.server_conn_mut(server_ch).stats();
    let pings_received = after.frame_rx.ping - before.frame_rx.ping;
    let acks_sent = after.frame_tx.acks - before.frame_tx.acks;
    assert_eq!(pings_received, 0);
    assert_eq!(acks_sent, 1);
}
/// Sends one PING followed 5 ms later by eleven more, and checks that the
/// server sends an ACK when the eleven arrive, well before the 30 ms max ACK
/// delay, since they exceed the ACK-eliciting threshold of 10 set by
/// `setup_ack_frequency_test`.
#[test]
fn ack_frequency_ack_sent_after_packets_above_threshold() {
    let _guard = subscribe();
    let tick = Duration::from_millis(5);
    let (mut pair, client_ch, server_ch) =
        setup_ack_frequency_test(Duration::from_millis(30));

    // First ping, then a burst of eleven one tick later.
    pair.client_conn_mut(client_ch).ping();
    pair.drive_client();
    pair.time += tick;
    for _ in 0..11 {
        pair.client_conn_mut(client_ch).ping();
        pair.drive_client();
    }

    // The first ping arrives on its own and is not ACKed.
    pair.time += tick;
    let before = pair.server_conn_mut(server_ch).stats();
    pair.drive_server();
    let after = pair.server_conn_mut(server_ch).stats();
    let pings_received = after.frame_rx.ping - before.frame_rx.ping;
    let acks_sent = after.frame_tx.acks - before.frame_tx.acks;
    assert_eq!(pings_received, 1);
    assert_eq!(acks_sent, 0);

    // The burst arrives one tick later and triggers an ACK.
    pair.time += tick;
    let before = pair.server_conn_mut(server_ch).stats();
    pair.drive_server();
    let after = pair.server_conn_mut(server_ch).stats();
    let pings_received = after.frame_rx.ping - before.frame_rx.ping;
    let acks_sent = after.frame_tx.acks - before.frame_tx.acks;
    assert_eq!(pings_received, 11);
    assert_eq!(acks_sent, 1);
}
/// Sends three PINGs, the middle one while the pair MTU is 0, and checks
/// that the server receives two of them and sends no ACK in either drive.
///
/// Clock offsets below are relative to the first PING; the pair latency is
/// the 10 ms configured by `setup_ack_frequency_test`.
#[test]
fn ack_frequency_ack_sent_after_reordered_packets_below_threshold() {
let _guard = subscribe();
let max_ack_delay = Duration::from_millis(30);
let (mut pair, client_ch, server_ch) = setup_ack_frequency_test(max_ack_delay);
// +0 ms: first ping.
pair.client_conn_mut(client_ch).ping();
pair.drive_client();
pair.time += Duration::from_millis(5);
// +5 ms: second ping is sent while the MTU is 0.
// NOTE(review): presumably the test network drops it, leaving a one-packet
// gap at the server — confirm in `util::Pair`.
pair.mtu = 0;
pair.client_conn_mut(client_ch).ping();
pair.drive_client();
// +5 ms: third ping is sent with the default MTU restored.
pair.mtu = DEFAULT_MTU;
pair.client_conn_mut(client_ch).ping();
pair.drive_client();
// +10 ms: the server sees the first ping only; no ACK.
pair.time += Duration::from_millis(5);
let server_stats_before = pair.server_conn_mut(server_ch).stats();
pair.drive_server();
let server_stats_after = pair.server_conn_mut(server_ch).stats();
assert_eq!(
server_stats_after.frame_rx.ping - server_stats_before.frame_rx.ping,
1
);
assert_eq!(
server_stats_after.frame_tx.acks - server_stats_before.frame_tx.acks,
0
);
// +15 ms: the server sees one more ping and still sends no ACK.
pair.time += Duration::from_millis(5);
let server_stats_before = pair.server_conn_mut(server_ch).stats();
pair.drive_server();
let server_stats_after = pair.server_conn_mut(server_ch).stats();
assert_eq!(
server_stats_after.frame_rx.ping - server_stats_before.frame_rx.ping,
1
);
assert_eq!(
server_stats_after.frame_tx.acks - server_stats_before.frame_tx.acks,
0
);
}
/// Sends four PINGs, the middle two while the pair MTU is 0, and checks
/// that the server sends an ACK as soon as the last one arrives.
///
/// Clock offsets below are relative to the first PING; the pair latency is
/// the 10 ms configured by `setup_ack_frequency_test`.
#[test]
fn ack_frequency_ack_sent_after_reordered_packets_above_threshold() {
let _guard = subscribe();
let max_ack_delay = Duration::from_millis(30);
let (mut pair, client_ch, server_ch) = setup_ack_frequency_test(max_ack_delay);
// +0 ms: first ping.
pair.client_conn_mut(client_ch).ping();
pair.drive_client();
pair.time += Duration::from_millis(5);
// +5 ms: two pings are sent while the MTU is 0.
// NOTE(review): presumably the test network drops both, leaving a
// two-packet gap at the server — confirm in `util::Pair`.
pair.mtu = 0;
for _ in 0..2 {
pair.client_conn_mut(client_ch).ping();
pair.drive_client();
}
// +5 ms: fourth ping is sent with the default MTU restored.
pair.mtu = DEFAULT_MTU;
pair.client_conn_mut(client_ch).ping();
pair.drive_client();
// +10 ms: the server sees the first ping only; no ACK.
pair.time += Duration::from_millis(5);
let server_stats_before = pair.server_conn_mut(server_ch).stats();
pair.drive_server();
let server_stats_after = pair.server_conn_mut(server_ch).stats();
assert_eq!(
server_stats_after.frame_rx.ping - server_stats_before.frame_rx.ping,
1
);
assert_eq!(
server_stats_after.frame_tx.acks - server_stats_before.frame_tx.acks,
0
);
// +15 ms: the server sees one more ping and this time sends an ACK.
pair.time += Duration::from_millis(5);
let server_stats_before = pair.server_conn_mut(server_ch).stats();
pair.drive_server();
let server_stats_after = pair.server_conn_mut(server_ch).stats();
assert_eq!(
server_stats_after.frame_rx.ping - server_stats_before.frame_rx.ping,
1
);
assert_eq!(
server_stats_after.frame_tx.acks - server_stats_before.frame_tx.acks,
1
);
}
/// Checks that the server receives a further ACK_FREQUENCY frame after the
/// pair's latency is multiplied by ten, but not after a ping at the original
/// latency.
#[test]
fn ack_frequency_update_max_delay() {
    let _guard = subscribe();
    let (mut pair, client_ch, server_ch) = setup_ack_frequency_test(Duration::from_millis(200));

    // Number of ACK_FREQUENCY frames the server has received so far.
    let ack_frequency_frames_received =
        |p: &mut Pair| p.server_conn_mut(server_ch).stats().frame_rx.ack_frequency;

    assert_eq!(ack_frequency_frames_received(&mut pair), 1);

    info!("first ping");
    pair.client_conn_mut(client_ch).ping();
    pair.drive();
    assert_eq!(ack_frequency_frames_received(&mut pair), 1);

    info!("delayed ping");
    pair.latency *= 10;
    pair.client_conn_mut(client_ch).ping();
    pair.drive();
    assert!(ack_frequency_frames_received(&mut pair) >= 2);
}
/// Reads chunks from `recv` (via `read(true)`) and concatenates their bytes.
///
/// Reading stops at the first call to `next` that yields anything other
/// than `Ok(Some(_))`, so an error ends the read just like end-of-data does.
/// The result of `finalize` is ignored.
fn stream_chunks(mut recv: RecvStream<'_>) -> Vec<u8> {
    let mut collected = Vec::new();
    let mut chunks = recv.read(true).unwrap();
    for chunk in std::iter::from_fn(|| chunks.next(usize::MAX).ok().flatten()) {
        collected.extend(chunk.bytes);
    }
    let _ = chunks.finalize();
    collected
}
/// Sends 100 datagrams from client to server, one per drive, and checks that
/// the server has received more ACK frames at the end than it had at the
/// start.
#[test]
fn pure_sender_voluntarily_acks() {
    const MSG: &[u8] = b"hello";
    const ROUNDS: usize = 100;

    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();
    let acks_before = pair.server_conn_mut(server_ch).stats().frame_rx.acks;

    for _ in 0..ROUNDS {
        let datagram = Bytes::from_static(MSG);
        pair.client_datagrams(client_ch)
            .send(datagram, true)
            .unwrap();
        pair.drive();
        let delivered = pair.server_datagrams(server_ch).recv().unwrap();
        assert_eq!(delivered, MSG);
    }

    let acks_after = pair.server_conn_mut(server_ch).stats().frame_rx.acks;
    assert!(acks_after > acks_before);
}
/// When the server's incoming-connection hook always rejects, the server
/// must accept nothing and the client connection must close with
/// `CONNECTION_REFUSED`.
#[test]
fn reject_manually() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    pair.server.handle_incoming = Box::new(|_| IncomingConnectionBehavior::Reject);

    let client_ch = pair.begin_connect(client_config());
    pair.drive();
    pair.server.assert_no_accept();

    let client_conn = pair.client.connections.get_mut(&client_ch).unwrap();
    assert!(client_conn.is_closed());
    assert_matches!(
        client_conn.poll(),
        Some(Event::ConnectionLost {
            reason: ConnectionError::ConnectionClosed(close)
        }) if close.error_code == TransportErrorCode::CONNECTION_REFUSED
    );
}
/// The server's hook answers the first, unvalidated attempt with `Retry` and
/// the following validated one with `Reject`.
///
/// The client must end up closed with `CONNECTION_REFUSED`, and after one
/// more drive neither endpoint may still know any connections or CIDs.
#[test]
fn validate_then_reject_manually() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    pair.server.handle_incoming = Box::new({
        // Counts hook invocations so the expected order can be asserted.
        let mut calls = 0;
        move |incoming| {
            let (expected_calls, behavior) = if incoming.remote_address_validated() {
                (1, IncomingConnectionBehavior::Reject)
            } else {
                (0, IncomingConnectionBehavior::Retry)
            };
            assert_eq!(calls, expected_calls);
            calls += 1;
            behavior
        }
    });

    let client_ch = pair.begin_connect(client_config());
    pair.drive();
    pair.server.assert_no_accept();

    let client_conn = pair.client.connections.get_mut(&client_ch).unwrap();
    assert!(client_conn.is_closed());
    assert_matches!(
        client_conn.poll(),
        Some(Event::ConnectionLost {
            reason: ConnectionError::ConnectionClosed(close)
        }) if close.error_code == TransportErrorCode::CONNECTION_REFUSED
    );

    pair.drive();
    assert_matches!(pair.client_conn_mut(client_ch).poll(), None);

    // Nothing may be left behind on either endpoint.
    assert_eq!(pair.client.known_connections(), 0);
    assert_eq!(pair.client.known_cids(), 0);
    assert_eq!(pair.server.known_connections(), 0);
    assert_eq!(pair.server.known_cids(), 0);
}
/// Compile-time check that `Endpoint` and `Connection` are `Send + Sync`.
///
/// The helper does nothing at run time; the test fails to compile if either
/// bound is not satisfied.
#[test]
fn endpoint_and_connection_impl_send_sync() {
    fn require_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    require_send_sync::<Endpoint>();
    require_send_sync::<Connection>();
}
/// Writes twenty 1024-byte buffers to a unidirectional stream, finishes it,
/// and checks that the client's `udp_tx.ios` counter grew by exactly two
/// once the pair is idle.
#[test]
fn stream_gso() {
    const WRITES: usize = 20;
    const WRITE_LEN: usize = 1024;

    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, _) = pair.connect();
    let stream = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
    let ios_before = pair.client_conn_mut(client_ch).stats().udp_tx.ios;

    info!("sending");
    for _ in 0..WRITES {
        pair.client_send(client_ch, stream)
            .write(&[0; WRITE_LEN])
            .unwrap();
    }
    pair.client_send(client_ch, stream).finish().unwrap();
    pair.drive();

    let ios_after = pair.client_conn_mut(client_ch).stats().udp_tx.ios;
    assert_eq!(ios_after - ios_before, 2);
}
/// Queues ten 1024-byte datagrams and checks that the client transmits them
/// in a single I/O operation with the expected total byte count.
#[test]
fn datagram_gso() {
    const DATAGRAM_LEN: usize = 1024;
    const DATAGRAMS: usize = 10;
    // NOTE(review): the 29 extra bytes per datagram are presumably packet
    // and frame overhead — confirm what makes up this figure.
    const EXPECTED_BYTES: u64 = ((29 + DATAGRAM_LEN) * DATAGRAMS) as u64;

    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, _) = pair.connect();
    let ios_before = pair.client_conn_mut(client_ch).stats().udp_tx.ios;
    let bytes_before = pair.client_conn_mut(client_ch).stats().udp_tx.bytes;

    info!("sending");
    for _ in 0..DATAGRAMS {
        let datagram = Bytes::from_static(&[0; DATAGRAM_LEN]);
        pair.client_datagrams(client_ch)
            .send(datagram, false)
            .unwrap();
    }
    pair.drive();

    let ios_after = pair.client_conn_mut(client_ch).stats().udp_tx.ios;
    let bytes_after = pair.client_conn_mut(client_ch).stats().udp_tx.bytes;
    assert_eq!(ios_after - ios_before, 1);
    assert_eq!(bytes_after - bytes_before, EXPECTED_BYTES);
}
/// Queues datagrams of 1024, 768 and 768 bytes and checks that the client
/// needs exactly two I/O operations to send them, and that the server
/// receives all three in order with their original lengths.
#[test]
fn gso_truncation() {
    const SIZES: [usize; 3] = [1024, 768, 768];

    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();
    let ios_before = pair.client_conn_mut(client_ch).stats().udp_tx.ios;

    info!("sending");
    SIZES
        .iter()
        .map(|&len| Bytes::from(vec![0u8; len]))
        .for_each(|datagram| {
            pair.client_datagrams(client_ch)
                .send(datagram, false)
                .unwrap()
        });
    pair.drive();

    let ios_after = pair.client_conn_mut(client_ch).stats().udp_tx.ios;
    assert_eq!(ios_after - ios_before, 2);

    SIZES.iter().for_each(|&expected_len| {
        let datagram = pair
            .server_datagrams(server_ch)
            .recv()
            .expect("datagram lost");
        assert_eq!(datagram.len(), expected_len);
    });
}
/// With `pad_to_mtu` enabled, an initial MTU of 1333 and MTU discovery
/// disabled on the client, datagrams of 800 and 600 bytes must each go out
/// in a packet of exactly 1333 bytes, in a single I/O operation, and arrive
/// at the server with their original lengths.
#[test]
fn pad_to_mtu() {
    const MTU: u16 = 1333;
    const LEN_1: usize = 800;
    const LEN_2: usize = 600;

    let _guard = subscribe();
    let padded_len = usize::from(MTU);

    let mut client_config = client_config();
    let transport = TransportConfig {
        initial_mtu: MTU,
        mtu_discovery_config: None,
        pad_to_mtu: true,
        ..TransportConfig::default()
    };
    client_config.transport_config(transport.into());

    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect_with(client_config);
    let ios_before = pair.client_conn_mut(client_ch).stats().udp_tx.ios;
    pair.server.capture_inbound_packets = true;

    info!("sending");
    for len in [LEN_1, LEN_2] {
        pair.client_datagrams(client_ch)
            .send(vec![0; len].into(), false)
            .unwrap();
    }

    // Drive only the client endpoint so its outbound queue can be inspected.
    let now = pair.time;
    pair.client.drive(now);
    assert_eq!(pair.client.outbound.len(), 2);
    for idx in 0..2 {
        assert_eq!(pair.client.outbound[idx].0.size, padded_len);
        assert_eq!(pair.client.outbound[idx].1.len(), padded_len);
    }

    // The padding must survive delivery to the server.
    pair.drive_client();
    assert_eq!(pair.server.inbound.len(), 2);
    for idx in 0..2 {
        assert_eq!(pair.server.inbound[idx].packet.len(), padded_len);
    }

    pair.drive();
    let ios_after = pair.client_conn_mut(client_ch).stats().udp_tx.ios;
    assert_eq!(ios_after - ios_before, 1);

    for expected_len in [LEN_1, LEN_2] {
        let datagram = pair
            .server_datagrams(server_ch)
            .recv()
            .expect("datagram lost");
        assert_eq!(datagram.len(), expected_len);
    }
}
/// After ten rounds of server pings in which one queued inbound client
/// packet is discarded each time, a maximum-size datagram from the client
/// must still reach the server intact, with the client sending exactly two
/// UDP datagrams in the process.
#[test]
fn large_datagram_with_acks() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, server_ch) = pair.connect();

    // NOTE(review): discarding every other ping presumably leaves the client
    // with many gaps to acknowledge — confirm the intent.
    for _ in 0..10 {
        pair.server_conn_mut(server_ch).ping();
        pair.drive_server();
        pair.client.inbound.pop_back();
        pair.server_conn_mut(server_ch).ping();
        pair.drive_server();
    }

    let largest = pair.client_datagrams(client_ch).max_size().unwrap();
    let payload = Bytes::from(vec![0; largest]);
    pair.client_datagrams(client_ch)
        .send(payload.clone(), true)
        .unwrap();

    let sent_before = pair.client_conn_mut(client_ch).stats().udp_tx.datagrams;
    pair.drive();
    let sent_after = pair.client_conn_mut(client_ch).stats().udp_tx.datagrams;

    let delivered = pair.server_datagrams(server_ch).recv().unwrap();
    assert_eq!(delivered, payload);
    assert_eq!(sent_after - sent_before, 2);
}
/// Sends 256 maximum-size datagrams, driving the pair after each one, and
/// checks that the number of UDP datagrams the client transmitted differs
/// from 256.
#[test]
fn voluntary_ack_with_large_datagrams() {
    const COUNT: usize = 256;

    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, _) = pair.connect();
    let sent_before = pair.client_conn_mut(client_ch).stats().udp_tx.datagrams;

    for _ in 0..COUNT {
        // The maximum size is queried again on every iteration.
        let largest = pair.client_datagrams(client_ch).max_size().unwrap();
        let payload = vec![0; largest];
        pair.client_datagrams(client_ch)
            .send(payload.into(), true)
            .unwrap();
        pair.drive();
    }

    let sent_after = pair.client_conn_mut(client_ch).stats().udp_tx.datagrams;
    assert_ne!(
        sent_after - sent_before,
        COUNT as u64,
        "client should have sent some ACK-only packets"
    );
}
/// With address discovery set to `Role::Both` on both sides, each connection
/// must report its own address through `PathEvent::ObservedAddr` on path
/// zero after the handshake events, and nothing else.
#[test]
fn address_discovery() {
    use crate::address_discovery::Role;

    let _guard = subscribe();
    // Transport settings shared by both endpoints.
    let discovery_transport = || {
        Arc::new(TransportConfig {
            address_discovery_role: Role::Both,
            ..TransportConfig::default()
        })
    };

    let server_cfg = ServerConfig {
        transport: discovery_transport(),
        ..server_config()
    };
    let mut pair = Pair::new(Default::default(), server_cfg);
    let client_cfg = ClientConfig {
        transport: discovery_transport(),
        ..client_config()
    };
    let client_ch = pair.begin_connect(client_cfg);
    pair.drive();

    // Client side: handshake events, then its own address as observed.
    let client_addr = pair.client.addr;
    let client_conn = pair.client_conn_mut(client_ch);
    assert_matches!(client_conn.poll(), Some(Event::HandshakeDataReady));
    assert_matches!(client_conn.poll(), Some(Event::Connected));
    assert_matches!(client_conn.poll(), Some(Event::HandshakeConfirmed));
    assert_matches!(
        client_conn.poll(),
        Some(Event::Path(PathEvent::ObservedAddr { id: PathId::ZERO, addr })) if addr == client_addr
    );
    assert_matches!(client_conn.poll(), None);

    // Server side: same check against the server's address.
    let server_ch = pair.server.assert_accept();
    let server_addr = pair.server.addr;
    let server_conn = pair.server_conn_mut(server_ch);
    assert_matches!(server_conn.poll(), Some(Event::HandshakeDataReady));
    assert_matches!(server_conn.poll(), Some(Event::HandshakeConfirmed));
    assert_matches!(server_conn.poll(), Some(Event::Connected));
    assert_matches!(
        server_conn.poll(),
        Some(Event::Path(PathEvent::ObservedAddr { id: PathId::ZERO, addr })) if addr == server_addr
    );
    assert_matches!(server_conn.poll(), None);
}
/// Establishes a connection with address discovery set to `Role::Both` on
/// both sides, closes it, then reconnects from a new client address with
/// address discovery `Disabled` on the client.
///
/// The second connection must have 0-RTT available and accepted, the server
/// must receive the stream data written before the handshake was driven, and
/// the client must report no lost packets on path zero.
#[test]
fn address_discovery_zero_rtt_accepted() {
let _guard = subscribe();
let server = ServerConfig {
transport: Arc::new(TransportConfig {
address_discovery_role: crate::address_discovery::Role::Both,
..TransportConfig::default()
}),
..server_config()
};
let mut pair = Pair::new(Default::default(), server);
// Accept every incoming connection unconditionally.
pair.server.handle_incoming = Box::new(|_| IncomingConnectionBehavior::Accept);
let client_cfg = ClientConfig {
transport: Arc::new(TransportConfig {
address_discovery_role: crate::address_discovery::Role::Both,
..TransportConfig::default()
}),
..client_config()
};
// Same client configuration except that address discovery is disabled.
let alt_client_cfg = ClientConfig {
transport: Arc::new(TransportConfig {
address_discovery_role: crate::address_discovery::Role::Disabled,
..TransportConfig::default()
}),
..client_cfg.clone()
};
// First connection: established and then closed by the client.
let client_ch = pair.begin_connect(client_cfg);
pair.drive();
pair.server.assert_accept();
pair.client
.connections
.get_mut(&client_ch)
.unwrap()
.close(pair.time, VarInt(0), [][..].into());
pair.drive();
// Move the client to a fresh port on the IPv6 loopback address.
pair.client.addr = SocketAddr::new(
Ipv6Addr::LOCALHOST.into(),
CLIENT_PORTS.lock().unwrap().next().unwrap(),
);
info!("resuming session");
// Second connection: uses the alternative config and must offer 0-RTT.
let client_ch = pair.begin_connect(alt_client_cfg);
assert!(pair.client_conn_mut(client_ch).has_0rtt());
// Write stream data before any packet exchange has been driven.
let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
const MSG: &[u8] = b"Hello, 0-RTT!";
pair.client_send(client_ch, s).write(MSG).unwrap();
pair.drive();
let conn = pair.client_conn_mut(client_ch);
assert_matches!(conn.poll(), Some(Event::HandshakeDataReady));
assert_matches!(conn.poll(), Some(Event::Connected));
assert!(pair.client_conn_mut(client_ch).accepted_0rtt());
let server_ch = pair.server.assert_accept();
let conn = pair.server_conn_mut(server_ch);
assert_matches!(conn.poll(), Some(Event::HandshakeDataReady));
assert_matches!(conn.poll(), Some(Event::HandshakeConfirmed));
assert_matches!(conn.poll(), Some(Event::Connected));
assert_matches!(
conn.poll(),
Some(Event::Stream(StreamEvent::Opened { dir: Dir::Uni }))
);
// The first chunk on the server must be the full message at offset 0.
let mut recv = pair.server_recv(server_ch, s);
let mut chunks = recv.read(false).unwrap();
assert_matches!(
chunks.next(usize::MAX),
Ok(Some(chunk)) if chunk.offset == 0 && chunk.bytes == MSG
);
let _ = chunks.finalize();
// Nothing the client sent on path zero may have been declared lost.
assert_eq!(
pair.client_conn_mut(client_ch)
.path_stats(PathId::ZERO)
.unwrap()
.lost_packets,
0
);
}
/// Establishes a connection against a server with address discovery
/// `Disabled`, closes it, switches the server to `SendOnly`, and resumes
/// with 0-RTT from the same client configuration.
///
/// The resumed client connection must report `HandshakeDataReady` followed
/// by `ConnectionLost` with a transport error.
/// NOTE(review): presumably the error comes from the server's address
/// discovery setting no longer matching what the client remembered for
/// 0-RTT — confirm against the transport parameter validation.
#[test]
fn address_discovery_zero_rtt_rejection() {
    let _guard = subscribe();
    let server_cfg = ServerConfig {
        transport: Arc::new(TransportConfig {
            address_discovery_role: crate::address_discovery::Role::Disabled,
            ..TransportConfig::default()
        }),
        ..server_config()
    };
    // Same server configuration except for the address discovery role.
    let alt_server_cfg = ServerConfig {
        transport: Arc::new(TransportConfig {
            address_discovery_role: crate::address_discovery::Role::SendOnly,
            ..TransportConfig::default()
        }),
        ..server_cfg.clone()
    };
    let mut pair = Pair::new(Default::default(), server_cfg);
    let client_cfg = ClientConfig {
        transport: Arc::new(TransportConfig {
            address_discovery_role: crate::address_discovery::Role::Both,
            ..TransportConfig::default()
        }),
        ..client_config()
    };

    // First connection: completes normally on the server.
    let client_ch = pair.begin_connect(client_cfg.clone());
    pair.drive();
    let server_ch = pair.server.assert_accept();
    let conn = pair.server_conn_mut(server_ch);
    assert_matches!(conn.poll(), Some(Event::HandshakeDataReady));
    assert_matches!(conn.poll(), Some(Event::HandshakeConfirmed));
    assert_matches!(conn.poll(), Some(Event::Connected));
    assert_matches!(conn.poll(), None);

    // The client closes it and the server observes the loss.
    pair.client
        .connections
        .get_mut(&client_ch)
        .unwrap()
        .close(pair.time, VarInt(0), [][..].into());
    pair.drive();
    assert_matches!(
        pair.server_conn_mut(server_ch).poll(),
        Some(Event::ConnectionLost { .. })
    );
    assert_matches!(pair.server_conn_mut(server_ch).poll(), None);
    pair.client.connections.clear();
    pair.server.connections.clear();

    // Change the server's configuration before the client resumes.
    pair.server
        .set_server_config(Some(Arc::new(alt_server_cfg)));

    info!("resuming session");
    let client_ch = pair.begin_connect(client_cfg);
    assert!(pair.client_conn_mut(client_ch).has_0rtt());
    let s = pair.client_streams(client_ch).open(Dir::Uni).unwrap();
    const MSG: &[u8] = b"Hello, 0-RTT!";
    pair.client_send(client_ch, s).write(MSG).unwrap();
    pair.drive();

    // Inspect the resumed client connection through its own handle rather
    // than the stale server handle from the first connection.
    let conn = pair.client_conn_mut(client_ch);
    assert_matches!(conn.poll(), Some(Event::HandshakeDataReady));
    assert_matches!(
        conn.poll(),
        Some(Event::ConnectionLost { reason }) if matches!(reason, ConnectionError::TransportError(_) )
    );
}
/// With address discovery enabled on both sides and a 10 ms initial RTT,
/// removes the last packet queued for the client after the first step and
/// checks that the client still ends up with `HandshakeConfirmed` followed
/// by an `ObservedAddr` event carrying its own address.
#[test]
fn address_discovery_retransmission() {
    use crate::address_discovery::Role;

    let _guard = subscribe();
    // Transport settings shared by both endpoints.
    let discovery_transport = || {
        Arc::new(TransportConfig {
            address_discovery_role: Role::Both,
            initial_rtt: Duration::from_millis(10),
            ..TransportConfig::default()
        })
    };

    let server_cfg = ServerConfig {
        transport: discovery_transport(),
        ..server_config()
    };
    let mut pair = Pair::new(Default::default(), server_cfg);
    let client_cfg = ClientConfig {
        transport: discovery_transport(),
        ..client_config()
    };
    let client_ch = pair.begin_connect(client_cfg);

    // Discard the newest packet waiting for the client after the first step.
    pair.step();
    pair.client.inbound.pop_back().unwrap();
    pair.step();

    let client_conn = pair.client_conn_mut(client_ch);
    assert_matches!(client_conn.poll(), Some(Event::HandshakeDataReady));
    assert_matches!(client_conn.poll(), Some(Event::Connected));
    assert_matches!(client_conn.poll(), None);

    // Once the pair runs to idle the remaining events must show up.
    pair.drive();
    let own_addr = pair.client.addr;
    let client_conn = pair.client_conn_mut(client_ch);
    assert_matches!(client_conn.poll(), Some(Event::HandshakeConfirmed));
    assert_matches!(
        client_conn.poll(),
        Some(Event::Path(PathEvent::ObservedAddr { id: PathId::ZERO, addr })) if addr == own_addr
    );
}
/// Checks that, when the client's port changes after an inbound datagram was
/// discarded, the `ObservedAddr` path event reports the client's new address.
#[test]
fn address_discovery_rebind_retransmission() {
    let _guard = subscribe();

    // Both endpoints use the same transport settings: address discovery in
    // both directions and a 10 ms initial RTT.
    let transport = || {
        Arc::new(TransportConfig {
            address_discovery_role: crate::address_discovery::Role::Both,
            initial_rtt: Duration::from_millis(10),
            ..TransportConfig::default()
        })
    };

    let server_cfg = ServerConfig {
        transport: transport(),
        ..server_config()
    };
    let mut pair = Pair::new(Default::default(), server_cfg);
    let client_cfg = ClientConfig {
        transport: transport(),
        ..client_config()
    };

    let client_ch = pair.begin_connect(client_cfg);
    pair.step();

    // Discard the most recently queued inbound datagram of the client
    // (presumably simulating packet loss on the way to the client).
    pair.client.inbound.pop_back().unwrap();
    pair.step();

    let client = pair.client_conn_mut(client_ch);
    assert_matches!(client.poll(), Some(Event::HandshakeDataReady));
    assert_matches!(client.poll(), Some(Event::Connected));
    assert_matches!(client.poll(), None);

    // Tell the connection about a network change, then move the client to the
    // next port number (wrapping around at the top of the range).
    let now = pair.time;
    pair.client_conn_mut(client_ch)
        .handle_network_change(None, now);
    let rebound_port = pair.client.addr.port().wrapping_add(1);
    pair.client.addr.set_port(rebound_port);

    pair.drive();

    // The reported address must match the address *after* the port change.
    let expected_addr = pair.client.addr;
    let client = pair.client_conn_mut(client_ch);
    assert_matches!(client.poll(), Some(Event::HandshakeConfirmed));
    assert_matches!(
        client.poll(),
        Some(Event::Path(PathEvent::ObservedAddr { id: PathId::ZERO, addr })) if addr == expected_addr
    );
}
/// After a passive migration plus a network-change notification on the client,
/// the active remote CID sequence number must have increased and a
/// unidirectional stream must still be deliverable to the server.
#[test]
fn network_change_single_path_recovery() {
    const PAYLOAD: &[u8] = b"after network change";

    let _guard = subscribe();
    let client_transport = TransportConfig::default();
    let server_transport = TransportConfig::default();
    let mut pair = ConnPair::with_transport_cfg(client_transport, server_transport);
    pair.drive();

    // Remember which remote CID the client was using before the change.
    let cid_seq_before = pair.conn(Client).active_remote_cid_seq();

    pair.passive_migration(Client);
    pair.handle_network_change(Client, None);
    pair.drive();

    // The client application is not expected to see any event from this.
    assert_matches!(pair.poll(Client), None);

    let cid_seq_after = pair.conn(Client).active_remote_cid_seq();
    assert!(
        cid_seq_after > cid_seq_before,
        "CID should have been rotated: before={cid_seq_before}, after={cid_seq_after}"
    );

    // Send a complete uni stream from the client and make sure it arrives.
    let stream_id = pair.streams(Client).open(Dir::Uni).unwrap();
    pair.send_stream(Client, stream_id).write(PAYLOAD).unwrap();
    pair.send_stream(Client, stream_id).finish().unwrap();
    pair.drive();

    assert_matches!(
        pair.poll(Server),
        Some(Event::Stream(StreamEvent::Opened { dir: Dir::Uni }))
    );
    assert_matches!(
        pair.streams(Server).accept(Dir::Uni),
        Some(stream) if stream == stream_id
    );

    let mut incoming = pair.recv_stream(Server, stream_id);
    let mut reader = incoming.read(false).unwrap();
    assert_matches!(
        reader.next(usize::MAX),
        Ok(Some(chunk)) if chunk.bytes == PAYLOAD
    );
    let _ = reader.finalize();
}
/// Fills the client's datagram send buffer with maximum-size datagrams, then
/// lowers the simulated MTU. Expects exactly one detected black hole, an empty
/// send buffer once the queued datagrams no longer fit, and a pending
/// `DatagramsUnblocked` event.
#[test]
fn oversized_datagrams_trigger_unblock() {
    const INITIAL_MTU: usize = 1300;

    let _guard = subscribe();
    let mut pair = Pair::default();
    pair.mtu = INITIAL_MTU;

    let mut transport = TransportConfig::default();
    // Captured so the final free-space check has something to compare against.
    let send_buffer_size = transport.datagram_send_buffer_size;
    transport.initial_mtu(INITIAL_MTU as u16);
    let mut client_cfg = client_config();
    client_cfg.transport_config(transport.into());

    let (client_ch, _) = pair.connect_with(client_cfg);

    let max_size = pair.client_datagrams(client_ch).max_size().unwrap();
    let data = vec![0; max_size];

    // Queue datagrams until the send buffer reports that it is blocked.
    loop {
        let Err(e) = pair
            .client_datagrams(client_ch)
            .send(data.clone().into(), false)
        else {
            continue;
        };
        match e {
            SendDatagramError::Blocked(_) => break,
            e => panic!("unexpected error: {e}"),
        }
    }

    // Shrink the simulated path MTU below the size of the queued datagrams.
    pair.mtu = 1200;

    while pair.step() {
        // Keep the buffer topped up; the first failure tells us the state.
        let err = loop {
            match pair
                .client_datagrams(client_ch)
                .send(data.clone().into(), false)
            {
                Ok(_) => continue,
                Err(e) => break e,
            }
        };
        match err {
            SendDatagramError::TooLarge => {
                break;
            }
            SendDatagramError::Blocked(_) => {
                // Still blocked: discard whatever events piled up meanwhile.
                while let Some(event) = pair.client_conn_mut(client_ch).poll() {
                    tracing::info!("ignoring connection event: {event:?}");
                }
            }
            _ => panic!("unexpected error: {err}"),
        }
    }

    let black_holes = pair
        .client_conn_mut(client_ch)
        .path_stats(PathId::ZERO)
        .unwrap()
        .black_holes_detected;
    assert_eq!(
        black_holes, 1,
        "expected a black hole to have been detected",
    );

    let free_space = pair.client_datagrams(client_ch).send_buffer_space();
    assert_eq!(
        free_space, send_buffer_size,
        "expected the send buffer to be empty after too large datagrams were dropped",
    );

    let Some(Event::DatagramsUnblocked) = pair.client_conn_mut(client_ch).poll() else {
        panic!("expected DatagramsUnblocked event");
    };
}
/// Feeds the server a hand-built long-header datagram, zero-padded to
/// `MIN_INITIAL_SIZE`, and expects the endpoint to answer with a response
/// datagram rather than accept or silently ignore it.
#[test]
fn reject_short_idcid() {
    let _guard = subscribe();

    let remote = "[::2]:7890".parse().unwrap();
    let network_path = FourTuple {
        remote,
        local_ip: None,
    };

    let mut server = Endpoint::new(Default::default(), Some(Arc::new(server_config())), true);
    let now = Instant::now();
    let response_capacity = server.config().get_max_udp_payload_size() as usize;
    let mut buf = Vec::with_capacity(response_capacity);

    // Header bytes only; the remainder of the datagram is zero padding.
    let header = hex!("c4 00000001 00 00 00 3f");
    let mut initial = BytesMut::from(header.as_ref());
    initial.resize(MIN_INITIAL_SIZE.into(), 0);

    match server.handle(now, network_path, None, initial, &mut buf) {
        Some(DatagramEvent::Response(Transmit { .. })) => {}
        _ => panic!("expected an initial close"),
    }
}
/// Smoke test: a server configured with an IPv6 preferred address must still
/// complete a connection with a default client.
#[test]
fn preferred_address() {
    let _guard = subscribe();

    let preferred_v6 = "[::1]:65535".parse().unwrap();
    let mut server_cfg = server_config();
    server_cfg.preferred_address_v6(Some(preferred_v6));

    let endpoint_cfg = Arc::new(EndpointConfig::default());
    let mut pair = Pair::new(endpoint_cfg, server_cfg);
    pair.connect();
}
/// Walks a fresh connection through the handshake one `step` at a time and
/// pins the exact order in which each side surfaces its events.
#[test]
fn handshake_sequence() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let client_ch = pair.begin_connect(client_config());

    // First step: the client has nothing to report, the server has accepted.
    pair.step();
    assert_matches!(pair.client_conn_mut(client_ch).poll(), None);
    let server_ch = pair.server.assert_accept();
    let server = pair.server_conn_mut(server_ch);
    assert_matches!(server.poll(), Some(Event::HandshakeDataReady));
    assert_matches!(server.poll(), None);

    // Second step: the client connects; the server confirms and connects.
    pair.step();
    let client = pair.client_conn_mut(client_ch);
    assert_matches!(client.poll(), Some(Event::HandshakeDataReady));
    assert_matches!(client.poll(), Some(Event::Connected));
    assert_matches!(client.poll(), None);
    let server = pair.server_conn_mut(server_ch);
    assert_matches!(server.poll(), Some(Event::HandshakeConfirmed));
    assert_matches!(server.poll(), Some(Event::Connected));
    assert_matches!(server.poll(), None);

    // Driving the client side lets it observe the confirmation as well.
    pair.drive_client();
    let client = pair.client_conn_mut(client_ch);
    assert_matches!(client.poll(), Some(Event::HandshakeConfirmed));
    assert_matches!(client.poll(), None);
}
/// Repeats the step-by-step handshake event check on a resumed session (the
/// client reports 0-RTT availability) and expects the same event order as for
/// a fresh connection: the client only sees `HandshakeConfirmed` after being
/// driven further.
#[test]
fn handshake_confirmation_no_resumption_shortcut() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let config = client_config();

    // Establish and immediately close a first connection with this config.
    let (first_ch, _) = pair.connect_with(config.clone());
    let close_time = pair.time;
    pair.client
        .connections
        .get_mut(&first_ch)
        .unwrap()
        .close(close_time, VarInt(0), [][..].into());
    pair.drive();

    info!("resuming session");
    let resumed_ch = pair.begin_connect(config);
    assert!(pair.client_conn_mut(resumed_ch).has_0rtt());

    // First step: the client has nothing to report, the server has accepted.
    pair.step();
    assert_matches!(pair.client_conn_mut(resumed_ch).poll(), None);
    let server_ch = pair.server.assert_accept();
    let server = pair.server_conn_mut(server_ch);
    assert_matches!(server.poll(), Some(Event::HandshakeDataReady));
    assert_matches!(server.poll(), None);

    // Second step: the client connects; the server confirms and connects.
    pair.step();
    let client = pair.client_conn_mut(resumed_ch);
    assert_matches!(client.poll(), Some(Event::HandshakeDataReady));
    assert_matches!(client.poll(), Some(Event::Connected));
    assert_matches!(client.poll(), None);
    let server = pair.server_conn_mut(server_ch);
    assert_matches!(server.poll(), Some(Event::HandshakeConfirmed));
    assert_matches!(server.poll(), Some(Event::Connected));
    assert_matches!(server.poll(), None);

    // Confirmation reaches the client only once it is driven further.
    pair.drive_client();
    let client = pair.client_conn_mut(resumed_ch);
    assert_matches!(client.poll(), Some(Event::HandshakeConfirmed));
    assert_matches!(client.poll(), None);
}
/// Round-trips a `ConnectionClose` frame through `encode` and the frame
/// iterator and requires the decoded frame to equal the original.
#[test]
fn regression_close_frame_encoding() {
    let close = ConnectionClose {
        error_code: TransportErrorCode::NO_ERROR,
        frame_type: frame::MaybeFrame::None,
        reason: Bytes::from_static(b"last path abandoned by peer"),
    };

    let mut encoded = BytesMut::new();
    close.encode(&mut encoded, 1100);

    // Only the first frame of the encoded buffer is inspected.
    let mut frames = frame::Iter::new(encoded.freeze()).unwrap();
    let first = frames.next().unwrap().unwrap();

    match first {
        Frame::Close(frame::Close::Connection(roundtripped)) => {
            assert_eq!(roundtripped, close);
        }
        decoded => panic!("Expected frame::Close to be decoded, but got {decoded:?}"),
    }
}
/// Encoding an unknown frame type and decoding it again must yield the same
/// `MaybeFrame` value.
#[test]
fn regression_maybe_frame_roundtrip() {
    let original = frame::MaybeFrame::Unknown(1337);

    let mut scratch = BytesMut::new();
    original.encode(&mut scratch);

    let mut encoded = scratch.freeze();
    let roundtripped = frame::MaybeFrame::decode(&mut encoded).unwrap();
    assert_eq!(roundtripped, original);
}
/// With a protocol violation simulated on the client and the server closing
/// the connection at the same instant, the client must still surface a
/// `ConnectionLost` event once the pair has been driven.
#[test]
fn regression_close_without_connection_event() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_handle, server_handle) = pair.connect();

    // Both actions are stamped with the same point in simulated time.
    let at = pair.time;
    pair.client_conn_mut(client_handle)
        .simulate_protocol_violation(at);
    pair.server_conn_mut(server_handle)
        .close(at, VarInt(0), Bytes::new());

    pair.drive();

    let client = pair.client_conn_mut(client_handle);
    assert_matches!(client.poll(), Some(Event::ConnectionLost { .. }));
}