use std::net::SocketAddr;
use std::num::NonZeroU32;
use std::sync::Arc;
use std::time::Duration;
use assert_matches::assert_matches;
use testresult::TestResult;
use tracing::info;
use crate::tests::RoutingTable;
use crate::tests::util::{CLIENT_PORTS, ConnPair, SERVER_PORTS};
use crate::{
ClientConfig, ConnectionId, ConnectionIdGenerator, Endpoint, EndpointConfig, FourTuple,
LOCAL_CID_COUNT, NetworkChangeHint, PathId, PathStatus, RandomConnectionIdGenerator,
ServerConfig, Side::*, TransportConfig, cid_queue::CidQueue,
};
use crate::{ClosePathError, Dir, Event, PathAbandonReason, PathEvent, StreamEvent, TransportErrorCode};
use super::util::{min_opt, subscribe};
use super::{Pair, client_config, server_config};
const MAX_PATHS: u32 = 3;
fn multipath_pair() -> ConnPair {
multipath_pair_with_nat_traversal(false)
}
fn multipath_pair_with_nat_traversal(nat_traversal: bool) -> ConnPair {
let mut cfg = TransportConfig::default();
cfg.max_concurrent_multipath_paths(MAX_PATHS);
cfg.initial_rtt(Duration::from_millis(10));
if nat_traversal {
cfg.set_max_remote_nat_traversal_addresses(8);
}
#[cfg(feature = "qlog")]
cfg.qlog_from_env("multipath_test");
let mut pair = ConnPair::with_transport_cfg(cfg.clone(), cfg);
pair.drive();
info!("connected");
pair
}
#[test]
fn non_zero_length_cids() {
let _guard = subscribe();
let multipath_transport_cfg = Arc::new(TransportConfig {
max_concurrent_multipath_paths: NonZeroU32::new(3 as _),
initial_rtt: Duration::from_millis(10),
..TransportConfig::default()
});
let server_cfg = Arc::new(ServerConfig {
transport: multipath_transport_cfg.clone(),
..server_config()
});
let server = Endpoint::new(Default::default(), Some(server_cfg), true);
struct ZeroLenCidGenerator;
impl ConnectionIdGenerator for ZeroLenCidGenerator {
fn generate_cid(&mut self) -> ConnectionId {
ConnectionId::new(&[])
}
fn cid_len(&self) -> usize {
0
}
fn cid_lifetime(&self) -> Option<std::time::Duration> {
None
}
}
let mut ep_config = EndpointConfig::default();
ep_config.cid_generator(|| Box::new(ZeroLenCidGenerator));
let client = Endpoint::new(Arc::new(ep_config), None, true);
let mut pair = Pair::new_from_endpoint(client, server);
let client_cfg = ClientConfig {
transport: multipath_transport_cfg,
..client_config()
};
pair.begin_connect(client_cfg);
pair.drive();
let accept_err = pair
.server
.accepted
.take()
.expect("server didn't try connecting")
.expect_err("server did not raise error for connection");
match accept_err {
crate::ConnectionError::TransportError(error) => {
assert_eq!(error.code, crate::TransportErrorCode::PROTOCOL_VIOLATION);
}
_ => panic!("Not a TransportError"),
}
}
#[test]
fn path_acks() {
let _guard = subscribe();
let mut pair = multipath_pair();
let stats = pair.stats(Client);
assert!(stats.frame_rx.path_acks > 0);
assert!(stats.frame_tx.path_acks > 0);
}
#[test]
fn path_status() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
let prev_status = pair.set_path_status(Client, PathId::ZERO, PathStatus::Backup)?;
assert_eq!(prev_status, PathStatus::Available);
pair.drive();
assert_eq!(
pair.remote_path_status(Server, PathId::ZERO),
Some(PathStatus::Backup)
);
let client_stats = pair.stats(Client);
assert_eq!(client_stats.frame_tx.path_status_available, 0);
assert_eq!(client_stats.frame_tx.path_status_backup, 1);
assert_eq!(client_stats.frame_rx.path_status_available, 0);
assert_eq!(client_stats.frame_rx.path_status_backup, 0);
let server_stats = pair.stats(Server);
assert_eq!(server_stats.frame_tx.path_status_available, 0);
assert_eq!(server_stats.frame_tx.path_status_backup, 0);
assert_eq!(server_stats.frame_rx.path_status_available, 0);
assert_eq!(server_stats.frame_rx.path_status_backup, 1);
Ok(())
}
#[test]
fn path_close_last_path() {
let _guard = subscribe();
let mut pair = multipath_pair();
assert_matches!(
pair.close_path(Client, PathId::ZERO, 0u8.into()),
Err(ClosePathError::LastOpenPath)
);
assert!(!pair.is_closed(Client));
assert!(!pair.is_closed(Server));
}
#[test]
fn cid_issued_multipath() {
let _guard = subscribe();
const ACTIVE_CID_LIMIT: u64 = crate::cid_queue::CidQueue::LEN as _;
let mut pair = multipath_pair();
let client_stats = pair.stats(Client);
dbg!(&client_stats);
assert_eq!(client_stats.frame_tx.new_connection_id, 0);
assert_eq!(
client_stats.frame_tx.path_new_connection_id,
MAX_PATHS as u64 * ACTIVE_CID_LIMIT
);
assert_eq!(
client_stats.frame_rx.new_connection_id,
ACTIVE_CID_LIMIT - 1
);
assert_eq!(
client_stats.frame_rx.path_new_connection_id,
(MAX_PATHS - 1) as u64 * ACTIVE_CID_LIMIT
);
}
#[test]
fn multipath_cid_rotation() {
let _guard = subscribe();
const CID_TIMEOUT: Duration = Duration::from_secs(2);
let cid_generator_factory: fn() -> Box<dyn ConnectionIdGenerator> =
|| Box::new(*RandomConnectionIdGenerator::new(8).set_lifetime(CID_TIMEOUT));
let server_cfg = ServerConfig {
transport: Arc::new(TransportConfig {
max_concurrent_multipath_paths: NonZeroU32::new(MAX_PATHS),
initial_rtt: Duration::from_millis(10),
..TransportConfig::default()
}),
..server_config()
};
let server = Endpoint::new(
Arc::new(EndpointConfig {
connection_id_generator_factory: Arc::new(cid_generator_factory),
..EndpointConfig::default()
}),
Some(Arc::new(server_cfg)),
true,
);
let client = Endpoint::new(Arc::new(EndpointConfig::default()), None, true);
let client_cfg = ClientConfig {
transport: Arc::new(TransportConfig {
max_concurrent_multipath_paths: NonZeroU32::new(MAX_PATHS),
initial_rtt: Duration::from_millis(10),
..TransportConfig::default()
}),
..client_config()
};
let mut pair = ConnPair::connect_with(Pair::new_from_endpoint(client, server), client_cfg);
let mut round: u64 = 1;
let mut stop = pair.time;
let end = pair.time + 5 * CID_TIMEOUT;
let mut active_cid_num = CidQueue::LEN as u64 + 1;
active_cid_num = active_cid_num.min(LOCAL_CID_COUNT);
let mut left_bound = 0;
let mut right_bound = active_cid_num - 1;
while pair.time < end {
stop += CID_TIMEOUT;
while pair.time < stop {
if !pair.step()
&& let Some(time) = min_opt(pair.client.next_wakeup(), pair.server.next_wakeup())
{
pair.time = time;
}
}
info!(
"Checking active cid sequence range before {:?} seconds",
round * CID_TIMEOUT.as_secs()
);
let _bound = (left_bound, right_bound);
for path_id in 0..MAX_PATHS {
assert_matches!(pair.conn(Server).active_local_path_cid_seq(path_id), _bound);
}
round += 1;
left_bound += active_cid_num;
right_bound += active_cid_num;
pair.drive_server();
}
let stats = pair.stats(Server);
assert_eq!(stats.frame_tx.new_connection_id, (CidQueue::LEN - 1) as u64);
let initial_batch: u64 = (MAX_PATHS - 1) as u64 * CidQueue::LEN as u64;
let each_round: u64 = MAX_PATHS as u64 * CidQueue::LEN as u64;
let final_round: u64 = MAX_PATHS as u64;
let path_new_cids = initial_batch + (round - 2) * each_round + final_round;
debug_assert_eq!(path_new_cids, 73);
assert_eq!(stats.frame_tx.path_new_connection_id, path_new_cids);
assert_eq!(stats.frame_tx.retire_connection_id, 0);
assert_eq!(stats.frame_tx.path_retire_connection_id, 1);
assert_eq!(stats.frame_rx.new_connection_id, 0);
assert_eq!(
stats.frame_rx.path_new_connection_id,
MAX_PATHS as u64 * CidQueue::LEN as u64
);
assert_eq!(stats.frame_rx.retire_connection_id, 0);
let path_retire_cids = MAX_PATHS as u64 * CidQueue::LEN as u64 * (round - 2);
debug_assert_eq!(path_retire_cids, 60);
assert_eq!(stats.frame_rx.path_retire_connection_id, path_retire_cids);
}
#[test]
fn issue_max_path_id() -> TestResult {
let _guard = subscribe();
let server_cfg = TransportConfig {
max_concurrent_multipath_paths: NonZeroU32::new(1),
initial_rtt: Duration::from_millis(10),
..TransportConfig::default()
};
let client_cfg = TransportConfig {
max_concurrent_multipath_paths: NonZeroU32::new(MAX_PATHS),
initial_rtt: Duration::from_millis(10),
..TransportConfig::default()
};
let mut pair = ConnPair::with_transport_cfg(server_cfg, client_cfg);
pair.drive();
info!("connected");
let server_new_cids = CidQueue::LEN as u64 - 1;
let mut server_path_new_cids = 0;
let stats = pair.stats(Server);
assert_eq!(stats.frame_tx.max_path_id, 0);
assert_eq!(stats.frame_tx.new_connection_id, server_new_cids);
assert_eq!(stats.frame_tx.path_new_connection_id, server_path_new_cids);
let client_new_cids = 0;
let mut client_path_new_cids = CidQueue::LEN as u64;
assert_eq!(stats.frame_rx.new_connection_id, client_new_cids);
assert_eq!(stats.frame_rx.path_new_connection_id, client_path_new_cids);
pair.set_max_concurrent_paths(Server, MAX_PATHS)?;
pair.drive();
let stats = pair.stats(Server);
server_path_new_cids += (MAX_PATHS as u64 - 1) * CidQueue::LEN as u64;
assert_eq!(stats.frame_tx.max_path_id, 1);
assert_eq!(stats.frame_tx.new_connection_id, server_new_cids);
assert_eq!(stats.frame_tx.path_new_connection_id, server_path_new_cids);
client_path_new_cids += (MAX_PATHS as u64 - 1) * CidQueue::LEN as u64;
assert_eq!(stats.frame_rx.new_connection_id, client_new_cids);
assert_eq!(stats.frame_rx.path_new_connection_id, client_path_new_cids);
Ok(())
}
#[test]
fn issue_max_path_id_reordered() -> TestResult {
let _guard = subscribe();
let server_cfg = TransportConfig {
max_concurrent_multipath_paths: NonZeroU32::new(1),
initial_rtt: Duration::from_millis(10),
..TransportConfig::default()
};
let client_cfg = TransportConfig {
max_concurrent_multipath_paths: NonZeroU32::new(MAX_PATHS),
initial_rtt: Duration::from_millis(10),
..TransportConfig::default()
};
let mut pair = ConnPair::with_transport_cfg(server_cfg, client_cfg);
pair.drive();
info!("connected");
let server_new_cids = CidQueue::LEN as u64 - 1;
let mut server_path_new_cids = 0;
let stats = pair.stats(Server);
assert_eq!(stats.frame_tx.max_path_id, 0);
assert_eq!(stats.frame_tx.new_connection_id, server_new_cids);
assert_eq!(stats.frame_tx.path_new_connection_id, server_path_new_cids);
let client_new_cids = 0;
let mut client_path_new_cids = CidQueue::LEN as u64;
assert_eq!(stats.frame_rx.new_connection_id, client_new_cids);
assert_eq!(stats.frame_rx.path_new_connection_id, client_path_new_cids);
pair.set_max_concurrent_paths(Server, MAX_PATHS)?;
pair.drive_server();
pair.reorder_inbound(Client);
pair.drive();
let stats = pair.stats(Server);
server_path_new_cids += (MAX_PATHS as u64 - 1) * CidQueue::LEN as u64;
assert_eq!(stats.frame_tx.max_path_id, 1);
assert_eq!(stats.frame_tx.new_connection_id, server_new_cids);
assert_eq!(stats.frame_tx.path_new_connection_id, server_path_new_cids);
client_path_new_cids += (MAX_PATHS as u64 - 1) * CidQueue::LEN as u64;
assert_eq!(stats.frame_rx.new_connection_id, client_new_cids);
assert_eq!(stats.frame_rx.path_new_connection_id, client_path_new_cids);
Ok(())
}
#[test]
fn open_path() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
let server_addr = pair.addrs_to_server();
let path_id = pair.open_path(Client, server_addr, PathStatus::Available)?;
pair.drive();
assert_matches!(
pair.poll(Client),
Some(Event::Path(crate::PathEvent::Opened { id })) if id == path_id
);
assert_matches!(
pair.poll(Server),
Some(Event::Path(crate::PathEvent::Opened { id })) if id == path_id
);
Ok(())
}
#[test]
fn open_path_key_update() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
let server_addr = pair.addrs_to_server();
let path_id = pair.open_path(Client, server_addr, PathStatus::Available)?;
pair.force_key_update(Client);
pair.drive();
assert_matches!(
pair.poll(Client),
Some(Event::Path(crate::PathEvent::Opened { id })) if id == path_id
);
assert_matches!(
pair.poll(Server),
Some(Event::Path(crate::PathEvent::Opened { id })) if id == path_id
);
Ok(())
}
#[test]
fn open_path_validation_fails_server_side() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
let different_addr = FourTuple {
remote: SocketAddr::new(
[9, 8, 7, 6].into(),
SERVER_PORTS.lock()?.next().ok_or("no port")?,
),
local_ip: None,
};
let path_id = pair.open_path(Client, different_addr, PathStatus::Available)?;
while pair.blackhole_step(true, false) {}
assert_matches!(
pair.poll(Client),
Some(Event::Path(crate::PathEvent::Abandoned { id, reason: PathAbandonReason::ValidationFailed })) if id == path_id
);
assert!(pair.poll(Server).is_none());
Ok(())
}
#[test]
fn open_path_validation_fails_client_side() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
pair.client.addr = SocketAddr::new(
[9, 8, 7, 6].into(),
CLIENT_PORTS.lock()?.next().ok_or("no port")?,
);
let addr = pair.server.addr;
let network_path = FourTuple {
remote: addr,
local_ip: None,
};
let path_id = pair.open_path(Client, network_path, PathStatus::Available)?;
pair.drive_client();
pair.drive_server();
info!("dropping client inbound queue");
pair.client.inbound.clear();
while pair.blackhole_step(true, true) {}
assert_matches!(
pair.poll(Server),
Some(Event::Path(crate::PathEvent::Abandoned { id, reason: PathAbandonReason::ValidationFailed })) if id == path_id
);
Ok(())
}
#[test]
fn open_path_ensure_after_abandon() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
let mut second_client_addr = pair.client.addr;
let mut second_server_addr = pair.server.addr;
second_client_addr.set_port(second_client_addr.port() + 1);
second_server_addr.set_port(second_server_addr.port() + 1);
pair.routes = Some(RoutingTable::simple_symmetric(
[pair.client.addr, second_client_addr],
[pair.server.addr, second_server_addr],
));
let second_path = FourTuple {
local_ip: Some(second_client_addr.ip()),
remote: second_server_addr,
};
info!("opening path 1");
let path_id = pair.open_path(Client, second_path, PathStatus::Available)?;
pair.drive();
assert_matches!(
pair.poll(Client),
Some(Event::Path(crate::PathEvent::Opened { id })) if id == path_id
);
assert_matches!(
pair.poll(Server),
Some(Event::Path(crate::PathEvent::Opened { id })) if id == path_id
);
info!("closing path {path_id}");
pair.close_path(Client, path_id, 0u8.into())?;
pair.drive();
assert_matches!(
pair.poll(Client),
Some(Event::Path(crate::PathEvent::Abandoned { id, reason: PathAbandonReason::ApplicationClosed { error_code }}))
if id == path_id && error_code == 0u8.into()
);
assert_matches!(
pair.poll(Server),
Some(Event::Path(crate::PathEvent::Abandoned { id, reason: PathAbandonReason::RemoteAbandoned { error_code }}))
if id == path_id && error_code == 0u8.into()
);
pair.drive();
assert_matches!(
pair.poll(Client),
Some(Event::Path(crate::PathEvent::Discarded { id, .. })) if id == path_id
);
assert_matches!(
pair.poll(Server),
Some(Event::Path(crate::PathEvent::Discarded { id, .. })) if id == path_id
);
info!("opening path 2");
let (path_id, existed) = pair.open_path_ensure(Client, second_path, PathStatus::Available)?;
pair.drive();
assert!(!existed);
assert_matches!(
pair.poll(Client),
Some(Event::Path(crate::PathEvent::Opened { id })) if id == path_id
);
assert_matches!(
pair.poll(Server),
Some(Event::Path(crate::PathEvent::Opened { id })) if id == path_id
);
Ok(())
}
#[test]
fn close_path() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
let server_addr = pair.addrs_to_server();
let path_id = pair.open_path(Client, server_addr, PathStatus::Available)?;
pair.drive();
assert_ne!(path_id, PathId::ZERO);
let stats0 = pair.stats(Client);
assert_eq!(stats0.frame_tx.path_abandon, 0);
assert_eq!(stats0.frame_rx.path_abandon, 0);
assert_eq!(stats0.frame_tx.max_path_id, 0);
assert_eq!(stats0.frame_rx.max_path_id, 0);
info!("closing path 0");
pair.close_path(Client, PathId::ZERO, 0u8.into())?;
pair.drive();
let stats1 = pair.stats(Client);
assert_eq!(stats1.frame_tx.path_abandon, 1);
assert_eq!(stats1.frame_rx.path_abandon, 1);
assert_eq!(stats1.frame_tx.max_path_id, 1);
assert_eq!(stats1.frame_rx.max_path_id, 1);
assert!(stats1.frame_tx.path_new_connection_id > stats0.frame_tx.path_new_connection_id);
assert!(stats1.frame_rx.path_new_connection_id > stats0.frame_rx.path_new_connection_id);
Ok(())
}
#[test]
fn close_last_path() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
let server_addr = pair.addrs_to_server();
let path_id = pair.open_path(Client, server_addr, PathStatus::Available)?;
pair.drive();
assert_ne!(path_id, PathId::ZERO);
info!("client closes path 0");
pair.close_path(Client, PathId::ZERO, 0u8.into())?;
info!("server closes path 1");
pair.close_path(Server, PathId(1), 0u8.into())?;
pair.drive();
assert!(pair.is_closed(Server));
assert!(pair.is_closed(Client));
Ok(())
}
#[test]
fn per_path_observed_address() -> TestResult {
let _guard = subscribe();
let transport_cfg = TransportConfig {
max_concurrent_multipath_paths: NonZeroU32::new(MAX_PATHS),
address_discovery_role: crate::address_discovery::Role::Both,
..TransportConfig::default()
};
let mut pair = ConnPair::with_transport_cfg(transport_cfg.clone(), transport_cfg);
info!("connected");
pair.drive();
let expected_addr = pair.client.addr;
assert_matches!(pair.poll(Client), Some(Event::Path(PathEvent::ObservedAddr{id: PathId::ZERO, addr})) if addr == expected_addr);
assert_matches!(pair.poll(Client), None);
let expected_addr = pair.server.addr;
assert_matches!(pair.poll(Server), Some(Event::Path(PathEvent::ObservedAddr{id: PathId::ZERO, addr})) if addr == expected_addr);
assert_matches!(pair.poll(Server), None);
let our_addr = pair.passive_migration(Client);
pair.handle_network_change(Client, None);
pair.drive();
assert_matches!(
pair.poll(Client),
Some(Event::Path(PathEvent::Abandoned {
id: PathId(0),
reason: PathAbandonReason::UnusableAfterNetworkChange
}))
);
assert_matches!(
pair.poll(Client),
Some(Event::Path(PathEvent::Opened { id: PathId(1) }))
);
assert_matches!(
pair.poll(Client),
Some(Event::Path(PathEvent::ObservedAddr{ id: PathId(1), addr })) if addr == our_addr
);
Ok(())
}
#[test]
fn mtud_on_two_paths() -> TestResult {
let _guard = subscribe();
let multipath_transport_cfg = Arc::new(TransportConfig {
max_concurrent_multipath_paths: NonZeroU32::new(MAX_PATHS),
initial_rtt: Duration::from_millis(10),
max_idle_timeout: None,
..TransportConfig::default()
});
let server_cfg = Arc::new(ServerConfig {
transport: multipath_transport_cfg.clone(),
..server_config()
});
let server = Endpoint::new(Default::default(), Some(server_cfg), true);
let client = Endpoint::new(Default::default(), None, true);
let mut pair = Pair::new_from_endpoint(client, server);
pair.mtu = 1200; let client_cfg = ClientConfig {
transport: multipath_transport_cfg,
..client_config()
};
let mut pair = ConnPair::connect_with(pair, client_cfg);
pair.drive();
info!("connected");
assert_eq!(pair.conn(Client).path_mtu(PathId::ZERO), 1200);
let server_addr = pair.addrs_to_server();
let path_id = pair.open_path(Client, server_addr, PathStatus::Available)?;
pair.drive();
assert_matches!(
pair.poll(Client),
Some(Event::Path(crate::PathEvent::Opened { id })) if id == path_id
);
assert_matches!(
pair.poll(Server),
Some(Event::Path(crate::PathEvent::Opened { id })) if id == path_id
);
assert_eq!(pair.conn(Client).path_mtu(PathId::ZERO), 1200);
assert_eq!(pair.conn(Client).path_mtu(path_id), 1200);
pair.mtu = 1452;
pair.time += Duration::from_secs(600);
info!("Bumping MTU to: {}", pair.mtu);
pair.drive();
info!("MTU Path 0: {}", pair.conn(Client).path_mtu(PathId::ZERO));
info!(
"MTU Path {}: {}",
path_id,
pair.conn(Client).path_mtu(path_id)
);
assert_eq!(pair.conn(Client).path_mtu(PathId::ZERO), 1452);
assert_eq!(pair.conn(Client).path_mtu(path_id), 1452);
Ok(())
}
#[test]
fn remote_can_close_last_validated_path() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
pair.passive_migration(Client);
let route = FourTuple {
remote: pair.server.addr,
local_ip: None,
};
pair.open_path(Client, route, PathStatus::Available)?;
pair.drive_client();
pair.close_path(Client, PathId::ZERO, 0u8.into())?;
pair.drive();
let mut close = None;
for side in [Client, Server] {
while let Some(event) = pair.poll(side) {
if let Event::ConnectionLost { reason } = event {
close = Some(reason);
}
}
assert_eq!(close, None);
}
Ok(())
}
#[test]
fn network_change_multipath_no_hint_replaces_path() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
pair.passive_migration(Client);
pair.handle_network_change(Client, None);
pair.drive();
assert_matches!(
pair.poll(Client),
Some(Event::Path(PathEvent::Abandoned {
id: PathId(0),
reason: PathAbandonReason::UnusableAfterNetworkChange
}))
);
assert_matches!(
pair.poll(Client),
Some(Event::Path(PathEvent::Opened { id: PathId(1) }))
);
assert_matches!(
pair.poll(Server),
Some(Event::Path(PathEvent::Abandoned {
id: PathId::ZERO,
reason: PathAbandonReason::RemoteAbandoned { error_code }
}))
if error_code == TransportErrorCode::PATH_UNSTABLE_OR_POOR.into()
);
assert_matches!(
pair.poll(Server),
Some(Event::Path(PathEvent::Opened { id: PathId(1) }))
);
assert_matches!(
pair.poll(Server),
Some(Event::Path(PathEvent::Discarded {
id: PathId::ZERO,
..
}))
);
assert_matches!(
pair.poll(Client),
Some(Event::Path(PathEvent::Discarded {
id: PathId::ZERO,
..
}))
);
let s = pair.streams(Client).open(Dir::Uni).unwrap();
const MSG: &[u8] = b"after network change";
pair.send_stream(Client, s).write(MSG).unwrap();
pair.send_stream(Client, s).finish().unwrap();
pair.drive();
assert_matches!(
pair.poll(Server),
Some(Event::Stream(StreamEvent::Opened { dir: Dir::Uni }))
);
assert_matches!(pair.streams(Server).accept(Dir::Uni), Some(stream) if stream == s);
let mut recv = pair.recv_stream(Server, s);
let mut chunks = recv.read(false).unwrap();
assert_matches!(
chunks.next(usize::MAX),
Ok(Some(chunk)) if chunk.bytes == MSG
);
let _ = chunks.finalize();
Ok(())
}
#[test]
fn network_change_selective_hint() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
let server_addr = pair.addrs_to_server();
let second_path = pair.open_path(Client, server_addr, PathStatus::Available)?;
pair.drive();
assert_matches!(
pair.poll(Client),
Some(Event::Path(PathEvent::Opened { id })) if id == second_path
);
assert_matches!(
pair.poll(Server),
Some(Event::Path(PathEvent::Opened { id })) if id == second_path
);
#[derive(Debug)]
struct SelectiveHint(PathId);
impl NetworkChangeHint for SelectiveHint {
fn is_path_recoverable(&self, path_id: PathId, _network_path: FourTuple) -> bool {
path_id == self.0
}
}
let hint = SelectiveHint(PathId::ZERO);
pair.passive_migration(Client);
pair.handle_network_change(Client, Some(&hint));
pair.drive();
let mut client_events = Vec::new();
while let Some(event) = pair.poll(Client) {
client_events.push(event);
}
assert!(
client_events
.iter()
.any(|e| matches!(e, Event::Path(PathEvent::Opened { .. }))),
"expected an Opened event for the replacement path, got: {client_events:?}"
);
assert!(
!client_events.iter().any(|e| matches!(
e,
Event::Path(PathEvent::Discarded {
id: PathId::ZERO,
..
})
)),
"PathId::ZERO should not have been closed: {client_events:?}"
);
Ok(())
}
#[test]
fn network_change_server_two_paths_selective_hint() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
let server_addr = pair.addrs_to_server();
let second_path = pair.open_path(Client, server_addr, PathStatus::Available)?;
pair.drive();
assert_matches!(
pair.poll(Client),
Some(Event::Path(PathEvent::Opened { id })) if id == second_path
);
assert_matches!(
pair.poll(Server),
Some(Event::Path(PathEvent::Opened { id })) if id == second_path
);
#[derive(Debug)]
struct SelectiveHint(PathId);
impl NetworkChangeHint for SelectiveHint {
fn is_path_recoverable(&self, path_id: PathId, _network_path: FourTuple) -> bool {
path_id == self.0
}
}
pair.handle_network_change(Server, Some(&SelectiveHint(second_path)));
pair.drive();
assert_matches!(
pair.poll(Server),
Some(Event::Path(PathEvent::Abandoned {
id,
reason: PathAbandonReason::UnusableAfterNetworkChange,
})) if id == PathId::ZERO
);
assert_matches!(
pair.poll(Server),
Some(Event::Path(PathEvent::Discarded { id, .. })) if id == PathId::ZERO
);
assert_matches!(pair.poll(Server), None);
assert_matches!(
pair.poll(Client),
Some(Event::Path(PathEvent::Abandoned {
id: PathId::ZERO,
reason: PathAbandonReason::RemoteAbandoned { .. },
}))
);
assert_matches!(
pair.poll(Client),
Some(Event::Path(PathEvent::Discarded {
id: PathId::ZERO,
..
}))
);
assert_matches!(pair.poll(Client), None);
Ok(())
}
#[test]
fn network_change_server_single_path_non_recoverable_falls_back() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
#[derive(Debug)]
struct NonRecoverableHint;
impl NetworkChangeHint for NonRecoverableHint {
fn is_path_recoverable(&self, _path_id: PathId, _network_path: FourTuple) -> bool {
false
}
}
pair.handle_network_change(Server, Some(&NonRecoverableHint));
pair.drive();
assert_matches!(pair.poll(Server), None);
assert_matches!(pair.poll(Client), None);
Ok(())
}
#[test]
fn network_change_server_no_hint_recovers() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
let server_addr = pair.addrs_to_server();
let second_path = pair.open_path(Client, server_addr, PathStatus::Available)?;
pair.drive();
assert_matches!(
pair.poll(Client),
Some(Event::Path(PathEvent::Opened { id })) if id == second_path
);
assert_matches!(
pair.poll(Server),
Some(Event::Path(PathEvent::Opened { id })) if id == second_path
);
pair.handle_network_change(Server, None);
pair.drive();
assert_matches!(pair.poll(Server), None);
assert_matches!(pair.poll(Client), None);
Ok(())
}
#[test]
fn path_open_deadline_is_set_on_send() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
let server_addr = pair.addrs_to_server();
let path_id = pair.open_path(Client, server_addr, PathStatus::Available)?;
let far_future = pair.time + Duration::from_secs(5);
pair.handle_timeout(Client, far_future);
assert!(
pair.poll(Client).is_none(),
"path was abandoned before any challenge was sent (issue #456)"
);
pair.time = far_future;
pair.drive();
assert_matches!(
pair.poll(Client),
Some(Event::Path(PathEvent::Opened { id })) if id == path_id,
"path should open successfully after the challenge is sent"
);
Ok(())
}
#[test]
fn path_scheduling_path_status() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
info!("Setting Path 0 to PathStatus::Backup");
let prev_status = pair.set_path_status(Client, PathId::ZERO, PathStatus::Backup)?;
assert_eq!(prev_status, PathStatus::Available);
pair.drive();
assert_eq!(
pair.remote_path_status(Server, PathId::ZERO),
Some(PathStatus::Backup)
);
info!("Opening Path 1 with PathStatus::Available");
let server_addr = pair.addrs_to_server();
let path_1 = pair.open_path(Client, server_addr, PathStatus::Available)?;
pair.drive();
let stats_path0_t0 = pair.conn_mut(Client).path_stats(PathId::ZERO).unwrap();
let stats_path1_t0 = pair.conn_mut(Client).path_stats(path_1).unwrap();
info!("Sending STREAM frame");
let s = pair.streams(Client).open(Dir::Uni).unwrap();
pair.send_stream(Client, s).write(b"hello").unwrap();
pair.drive();
let stats_path0_t1 = pair.conn_mut(Client).path_stats(PathId::ZERO).unwrap();
let stats_path1_t1 = pair.conn_mut(Client).path_stats(path_1).unwrap();
info!("assert");
assert!((stats_path0_t1.udp_tx.datagrams - stats_path0_t0.udp_tx.datagrams) == 0);
assert!((stats_path1_t1.udp_tx.datagrams - stats_path1_t0.udp_tx.datagrams) > 0);
Ok(())
}
#[test]
fn server_abandon_last_verified_path() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
pair.passive_migration(Client);
let route = FourTuple {
remote: pair.server.addr,
local_ip: None,
};
pair.open_path(Client, route, PathStatus::Available)?;
pair.close_path(Client, PathId::ZERO, 0u8.into())?;
pair.drive();
let evt = pair.poll(Server);
assert!(matches!(
evt,
Some(Event::Path(PathEvent::Abandoned { .. }))
));
let evt = pair.poll(Server);
assert!(matches!(evt, Some(Event::Path(PathEvent::Opened { .. }))));
let evt = pair.poll(Server);
let Some(Event::Path(PathEvent::Discarded { path_stats, .. })) = evt else {
panic!("did not get path discarded event");
};
assert_eq!(path_stats.frame_tx.path_abandon, 1);
Ok(())
}
#[test]
fn remote_path_abandon_with_remaining_path() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
let server_addr = pair.addrs_to_server();
let _path_id = pair.open_path(Client, server_addr, PathStatus::Available)?;
pair.drive();
while pair.poll(Client).is_some() {}
while pair.poll(Server).is_some() {}
pair.close_path(Server, PathId::ZERO, 42u8.into())?;
pair.drive();
assert_matches!(
pair.poll(Client),
Some(Event::Path(PathEvent::Abandoned {
id: PathId::ZERO,
reason: PathAbandonReason::RemoteAbandoned { error_code }
})) if error_code == 42u8.into()
);
assert!(!pair.is_closed(Client));
assert!(!pair.is_closed(Server));
Ok(())
}
#[test]
fn remote_path_abandon_last_path_closes_connection() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
let server_addr = pair.addrs_to_server();
let _path1 = pair.open_path(Client, server_addr, PathStatus::Available)?;
pair.drive();
while pair.poll(Client).is_some() {}
while pair.poll(Server).is_some() {}
pair.close_path(Client, PathId::ZERO, 0u8.into())?;
pair.drive();
while pair.poll(Client).is_some() {}
while pair.poll(Server).is_some() {}
pair.force_remote_abandon(Client, PathId::from(1u8));
pair.drive();
assert!(
pair.is_closed(Client),
"client should be closed after grace period expired"
);
let mut saw_abandon = false;
let mut saw_close = false;
while let Some(event) = pair.poll(Client) {
match event {
Event::Path(PathEvent::Abandoned {
reason: PathAbandonReason::RemoteAbandoned { .. },
..
}) => saw_abandon = true,
Event::ConnectionLost { .. } => saw_close = true,
_ => {}
}
}
assert!(
saw_abandon,
"client should see path abandon event for last path"
);
assert!(saw_close, "client should see connection lost event");
Ok(())
}
#[test]
fn remote_path_abandon_last_path_client_opens_new() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
let server_addr = pair.addrs_to_server();
let _path1 = pair.open_path(Client, server_addr, PathStatus::Available)?;
pair.drive();
while pair.poll(Client).is_some() {}
while pair.poll(Server).is_some() {}
pair.close_path(Client, PathId::ZERO, 0u8.into())?;
pair.drive();
while pair.poll(Client).is_some() {}
while pair.poll(Server).is_some() {}
pair.force_remote_abandon(Client, PathId::from(1u8));
let new_path = pair.addrs_to_server();
let new_path_id = pair.open_path(Client, new_path, PathStatus::Available)?;
pair.drive();
assert!(!pair.is_closed(Client), "client should survive");
assert!(!pair.is_closed(Server), "server should survive");
let mut saw_abandon = false;
let mut saw_opened = false;
while let Some(event) = pair.poll(Client) {
match event {
Event::Path(PathEvent::Abandoned {
reason: PathAbandonReason::RemoteAbandoned { .. },
..
}) => saw_abandon = true,
Event::Path(PathEvent::Opened { id }) if id == new_path_id => saw_opened = true,
_ => {}
}
}
assert!(saw_abandon, "client should see abandon for last path");
assert!(saw_opened, "client should see new path opened");
Ok(())
}
#[test]
fn abandon_path_data_continues() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair();
let server_addr = pair.addrs_to_server();
let path1 = pair.open_path(Client, server_addr, PathStatus::Available)?;
pair.drive();
while pair.poll(Client).is_some() {}
while pair.poll(Server).is_some() {}
info!("client abandons path 0");
pair.close_path(Client, PathId::ZERO, 0u8.into())?;
pair.drive();
while pair.poll(Client).is_some() {}
while pair.poll(Server).is_some() {}
assert!(
pair.path_status(Client, path1).is_ok(),
"client should still have path 1"
);
assert!(
pair.path_status(Server, path1).is_ok(),
"server should still have path 1"
);
let s = pair.streams(Client).open(Dir::Uni).unwrap();
const MSG: &[u8] = b"data after path abandon";
pair.send_stream(Client, s).write(MSG).unwrap();
pair.send_stream(Client, s).finish().unwrap();
pair.drive();
assert_matches!(
pair.poll(Server),
Some(Event::Stream(StreamEvent::Opened { dir: Dir::Uni }))
);
assert_matches!(pair.streams(Server).accept(Dir::Uni), Some(stream) if stream == s);
let mut recv = pair.recv_stream(Server, s);
let mut chunks = recv.read(false).unwrap();
assert_matches!(
chunks.next(usize::MAX),
Ok(Some(chunk)) if chunk.bytes == MSG
);
let _ = chunks.finalize();
assert!(!pair.is_closed(Client));
assert!(!pair.is_closed(Server));
Ok(())
}
#[test]
fn abandon_cycle() -> TestResult {
let _guard = subscribe();
let mut cfg = TransportConfig::default();
cfg.max_concurrent_multipath_paths(6);
cfg.initial_rtt(Duration::from_millis(10));
let mut pair = ConnPair::with_transport_cfg(cfg.clone(), cfg);
pair.drive();
let mut addrs_client = vec![pair.client.addr];
let mut addrs_server = vec![pair.server.addr];
for i in 1..6u16 {
let mut ca = pair.client.addr;
ca.set_port(ca.port() + i);
addrs_client.push(ca);
let mut sa = pair.server.addr;
sa.set_port(sa.port() + i);
addrs_server.push(sa);
}
pair.routes = Some(RoutingTable::simple_symmetric(
addrs_client.clone(),
addrs_server.clone(),
));
let mut current_path = PathId::ZERO;
for cycle in 0..3u16 {
let addr_idx = (cycle as usize) + 1;
let new_path_net = FourTuple {
local_ip: Some(addrs_client[addr_idx].ip()),
remote: addrs_server[addr_idx],
};
info!("cycle {cycle}: opening new path on addr index {addr_idx}");
let new_path = pair.open_path(Client, new_path_net, PathStatus::Available)?;
pair.drive();
while pair.poll(Client).is_some() {}
while pair.poll(Server).is_some() {}
info!("cycle {cycle}: abandoning path {current_path}");
pair.close_path(Client, current_path, 0u8.into())?;
pair.drive();
while pair.poll(Client).is_some() {}
while pair.poll(Server).is_some() {}
assert!(
pair.path_status(Client, current_path).is_err(),
"cycle {cycle}: abandoned path should be gone"
);
assert!(
pair.path_status(Client, new_path).is_ok(),
"cycle {cycle}: new path should be alive"
);
assert!(
!pair.is_closed(Client),
"cycle {cycle}: client should be alive"
);
assert!(
!pair.is_closed(Server),
"cycle {cycle}: server should be alive"
);
let s = pair.streams(Client).open(Dir::Uni).unwrap();
let msg = format!("cycle {cycle}");
pair.send_stream(Client, s).write(msg.as_bytes()).unwrap();
pair.send_stream(Client, s).finish().unwrap();
pair.drive();
assert_matches!(
pair.poll(Server),
Some(Event::Stream(StreamEvent::Opened { dir: Dir::Uni }))
);
assert_matches!(pair.streams(Server).accept(Dir::Uni), Some(stream) if stream == s);
let mut recv = pair.recv_stream(Server, s);
let mut chunks = recv.read(false).unwrap();
assert_matches!(
chunks.next(usize::MAX),
Ok(Some(chunk)) if chunk.bytes == msg.as_bytes()
);
let _ = chunks.finalize();
current_path = new_path;
}
Ok(())
}
#[test]
fn nat_traversal_revalidates_existing_path() -> TestResult {
let _guard = subscribe();
let mut pair = multipath_pair_with_nat_traversal(true);
let server_addr = pair.server.addr;
let client_addr = pair.client.addr;
pair.add_nat_traversal_address(Server, server_addr)?;
pair.add_nat_traversal_address(Client, client_addr)?;
pair.drive();
let probed = pair.initiate_nat_traversal_round(Client)?;
assert_eq!(probed.len(), 1);
assert_eq!(probed[0], server_addr);
pair.drive();
assert_eq!(
pair.path_status(Client, PathId::ZERO)?,
PathStatus::Available
);
let challenges_before = pair.stats(Client).frame_tx.path_challenge;
let probed = pair.initiate_nat_traversal_round(Client)?;
assert_eq!(probed.len(), 1);
pair.drive_bounded(20);
let challenges_after = pair.stats(Client).frame_tx.path_challenge;
assert!(
challenges_after > challenges_before,
"expected new PATH_CHALLENGE for existing path \
(before={challenges_before}, after={challenges_after})"
);
Ok(())
}
#[test]
fn path_recovers_after_silent_gap_via_keepalive() -> TestResult {
let _guard = subscribe();
let mut cfg = TransportConfig::default();
cfg.max_concurrent_multipath_paths(MAX_PATHS);
cfg.initial_rtt(Duration::from_millis(10));
cfg.default_path_max_idle_timeout(Some(Duration::from_secs(60)));
let mut pair = ConnPair::with_transport_cfg(cfg.clone(), cfg);
pair.drive();
while pair.poll(Client).is_some() {}
while pair.poll(Server).is_some() {}
let s = pair.streams(Server).open(Dir::Uni).unwrap();
pair.send_stream(Server, s).write(&[42u8; 5000]).unwrap();
pair.drive();
assert_matches!(
pair.poll(Client),
Some(Event::Stream(StreamEvent::Opened { dir: Dir::Uni }))
);
assert_matches!(pair.streams(Client).accept(Dir::Uni), Some(stream) if stream == s);
let mut recv = pair.recv_stream(Client, s);
let mut chunks = recv.read(false).unwrap();
let mut total_read = 0;
while let Ok(Some(chunk)) = chunks.next(usize::MAX) {
total_read += chunk.bytes.len();
}
let _ = chunks.finalize();
info!("read {total_read} bytes before gap");
assert!(total_read > 0, "should have received initial data");
while pair.poll(Client).is_some() {}
while pair.poll(Server).is_some() {}
pair.send_stream(Server, s).write(&[43u8; 5000]).unwrap();
info!("starting silent gap");
let gap_start = pair.time;
for _ in 0..10 {
if !pair.blackhole_step(true, true) {
break;
}
}
let gap_duration = pair.time - gap_start;
info!("gap lasted {:?}", gap_duration);
pair.send_stream(Server, s).write(b"after gap").unwrap();
pair.send_stream(Server, s).finish().unwrap();
info!("gap ended, driving to recovery");
let mut received_post_gap = false;
for i in 0..50 {
if pair.is_closed(Client) || pair.is_closed(Server) {
info!("connection died at step {i}");
break;
}
pair.step();
while let Some(event) = pair.poll(Client) {
if matches!(&event, Event::Stream(StreamEvent::Readable { .. })) {
info!("client received data at step {i}");
received_post_gap = true;
}
}
if received_post_gap {
break;
}
}
assert!(!pair.is_closed(Client), "client should survive the gap");
assert!(!pair.is_closed(Server), "server should survive the gap");
assert!(
received_post_gap,
"client should receive data after the gap recovers"
);
Ok(())
}