use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::time::Duration;
use std::time::Instant;
use smallvec::SmallVec;
use slab::Slab;
use crate::Config;
use crate::Error;
use crate::Result;
use crate::StartupExit;
use crate::pmtud;
use crate::recovery;
use crate::recovery::Bandwidth;
use crate::recovery::HandshakeStatus;
use crate::recovery::OnLossDetectionTimeoutOutcome;
use crate::recovery::RecoveryOps;
/// Validation state of a network path.
///
/// The derived ordering follows declaration order (`Failed` is the smallest,
/// `Validated` the largest). `Path` relies on it to only ever promote a path
/// to a greater state and to treat anything above `Failed` as working.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum PathState {
/// Validation of the path failed.
Failed,
/// No validation has been attempted yet; the starting state of
/// non-initial paths.
Unknown,
/// A challenge has been sent on the path.
Validating,
/// A response was received, but no answered challenge was large enough
/// (`crate::MIN_CLIENT_INITIAL_LEN`) to complete validation yet.
ValidatingMTU,
/// The path is validated; initial paths start in this state.
Validated,
}
impl PathState {
    /// Converts the state into an integer code (only built with the `ffi`
    /// feature).
    ///
    /// `Failed` maps to `-1`; the remaining states map to `0..=3` in
    /// declaration order.
    #[cfg(feature = "ffi")]
    pub fn to_c(self) -> libc::ssize_t {
        match self {
            Self::Failed => -1,
            Self::Unknown => 0,
            Self::Validating => 1,
            Self::ValidatingMTU => 2,
            Self::Validated => 3,
        }
    }
}
/// Events about paths that `PathMap` queues for retrieval through
/// `PathMap::pop_event`.
///
/// Unless noted otherwise, the two addresses are the path's local address
/// followed by its peer address.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PathEvent {
/// A path was inserted into the map (queued by `PathMap::insert_path` only
/// when it is called with `is_server` set).
New(SocketAddr, SocketAddr),
/// A challenge response completed the validation of the path.
Validated(SocketAddr, SocketAddr),
/// Validation of the path failed.
FailedValidation(SocketAddr, SocketAddr),
/// The path was removed from the map to make room for a new one.
Closed(SocketAddr, SocketAddr),
/// NOTE(review): not emitted by the code visible here; presumably carries
/// a source connection ID sequence number followed by the address pairs of
/// the two paths involved -- confirm against the emitting code.
ReusedSourceConnectionId(
u64,
(SocketAddr, SocketAddr),
(SocketAddr, SocketAddr),
),
/// Queued on servers when an already validated path is made active, or,
/// on any endpoint, when validation completes for the active path while
/// it is flagged as migrating.
PeerMigrated(SocketAddr, SocketAddr),
}
/// State kept for a single network path, identified by a local and a peer
/// address.
#[derive(Debug)]
pub struct Path {
/// Local address of the path.
local_addr: SocketAddr,
/// Peer address of the path.
peer_addr: SocketAddr,
/// Sequence number of the source connection ID in use on this path, if
/// any (`Some(0)` on an initial path).
pub active_scid_seq: Option<u64>,
/// Sequence number of the destination connection ID in use on this path,
/// if any (`Some(0)` on an initial path). A path without one is neither
/// active nor usable.
pub active_dcid_seq: Option<u64>,
/// Current validation state.
state: PathState,
/// Whether this path has been selected as the active one.
active: bool,
/// Recovery state for this path (RTT, congestion window, loss detection).
pub recovery: recovery::Recovery,
/// Path MTU discovery state; `None` when discovery is disabled.
pub pmtud: Option<pmtud::Pmtud>,
/// Challenges awaiting a response, as (payload, packet size, send time).
in_flight_challenges: VecDeque<([u8; 8], usize, Instant)>,
/// Largest packet size recorded for an answered challenge.
max_challenge_size: usize,
/// Number of probe losses counted since the last response was received.
probing_lost: usize,
/// Send time of the lost probe that was last counted in `probing_lost`.
last_probe_lost_time: Option<Instant>,
/// Challenge payloads received from the peer and not yet popped.
received_challenges: VecDeque<[u8; 8]>,
/// Maximum length of `received_challenges`; challenges arriving while the
/// queue is at this length are dropped.
received_challenges_max_len: usize,
/// Counter reported as `PathStats::sent` (not updated in the code shown
/// here).
pub sent_count: usize,
/// Counter reported as `PathStats::recv` (not updated in the code shown
/// here).
pub recv_count: usize,
/// Counter reported as `PathStats::retrans` (not updated in the code
/// shown here).
pub retrans_count: usize,
/// Number of calls to `on_loss_detection_timeout` on this path.
pub total_pto_count: usize,
/// Counter reported as `PathStats::dgram_sent` (not updated in the code
/// shown here).
pub dgram_sent_count: usize,
/// Counter reported as `PathStats::dgram_recv` (not updated in the code
/// shown here).
pub dgram_recv_count: usize,
/// Byte counter reported as `PathStats::sent_bytes`.
pub sent_bytes: u64,
/// Byte counter reported as `PathStats::recv_bytes`.
pub recv_bytes: u64,
/// Byte counter reported as `PathStats::stream_retrans_bytes`.
pub stream_retrans_bytes: u64,
/// Send budget in bytes. On a server, a lost probe fails validation when
/// this is below `crate::MIN_PROBING_SIZE`.
/// NOTE(review): presumably the anti-amplification allowance -- confirm
/// against the code that updates it.
pub max_send_bytes: usize,
/// Set once any challenge response has been processed on this path.
pub verified_peer_address: bool,
/// Set once a challenge from the peer has been queued on this path.
pub peer_verified_local_address: bool,
/// Whether a new challenge should be sent on this path.
challenge_requested: bool,
/// Whether a `PathEvent::FailedValidation` was already queued for this
/// path.
failure_notified: bool,
/// Set when a server makes this not-yet-validated path active; cleared
/// when its validation completes.
migrating: bool,
/// NOTE(review): not read or written in the code shown here; presumably
/// requests that the next packet on this path be ack-eliciting -- confirm.
pub needs_ack_eliciting: bool,
}
impl Path {
/// Creates a path between `local_addr` and `peer_addr`.
///
/// An initial path (`is_initial`) starts in the `Validated` state with
/// sequence number 0 selected for both connection IDs; any other path starts
/// as `Unknown` with no connection IDs. The path is created inactive, with
/// all counters and flags cleared.
///
/// Path MTU discovery state is only created when `config` is provided and
/// has `pmtud` enabled. Its upper bound is the smaller of the local
/// `max_udp_payload_size` transport parameter and
/// `max_send_udp_payload_size`.
pub fn new(
    local_addr: SocketAddr, peer_addr: SocketAddr,
    recovery_config: &recovery::RecoveryConfig,
    path_challenge_recv_max_queue_len: usize, is_initial: bool,
    config: Option<&Config>,
) -> Self {
    let state = if is_initial {
        PathState::Validated
    } else {
        PathState::Unknown
    };
    let initial_cid_seq = if is_initial { Some(0) } else { None };

    let pmtud = match config {
        Some(c) if c.pmtud => {
            // Fall back to the send limit when the transport parameter
            // does not fit in a `usize`.
            let advertised_limit: usize = c
                .local_transport_params
                .max_udp_payload_size
                .try_into()
                .unwrap_or(c.max_send_udp_payload_size);
            let maximum_supported_mtu =
                std::cmp::min(advertised_limit, c.max_send_udp_payload_size);

            Some(pmtud::Pmtud::new(
                maximum_supported_mtu,
                c.pmtud_max_probes,
            ))
        },
        _ => None,
    };

    Self {
        local_addr,
        peer_addr,
        active_scid_seq: initial_cid_seq,
        active_dcid_seq: initial_cid_seq,
        state,
        active: false,
        recovery: recovery::Recovery::new_with_config(recovery_config),
        pmtud,
        in_flight_challenges: VecDeque::new(),
        max_challenge_size: 0,
        probing_lost: 0,
        last_probe_lost_time: None,
        received_challenges: VecDeque::with_capacity(
            path_challenge_recv_max_queue_len,
        ),
        received_challenges_max_len: path_challenge_recv_max_queue_len,
        sent_count: 0,
        recv_count: 0,
        retrans_count: 0,
        total_pto_count: 0,
        dgram_sent_count: 0,
        dgram_recv_count: 0,
        sent_bytes: 0,
        recv_bytes: 0,
        stream_retrans_bytes: 0,
        max_send_bytes: 0,
        verified_peer_address: false,
        peer_verified_local_address: false,
        challenge_requested: false,
        failure_notified: false,
        migrating: false,
        needs_ack_eliciting: false,
    }
}
/// Returns the local address of this path.
#[inline]
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}
/// Returns the peer address of this path.
#[inline]
pub fn peer_addr(&self) -> SocketAddr {
self.peer_addr
}
/// Returns whether the path has not failed validation.
#[inline]
fn working(&self) -> bool {
    // `Failed` is the smallest state, so "not failed" is the same as
    // "greater than `Failed`".
    !matches!(self.state, PathState::Failed)
}
/// Returns whether the path is the active one and can actually be used: it
/// must be flagged active, not have failed validation, and have a
/// destination connection ID assigned.
#[inline]
pub fn active(&self) -> bool {
    let has_dcid = self.active_dcid_seq.is_some();

    has_dcid && self.active && self.working()
}
/// Returns whether packets can be sent on this path: it is either active,
/// or validated with a destination connection ID assigned.
#[inline]
pub fn usable(&self) -> bool {
    if self.active() {
        return true;
    }

    self.active_dcid_seq.is_some() &&
        matches!(self.state, PathState::Validated)
}
/// Returns whether the path is unused, i.e. it is not active and has no
/// destination connection ID assigned.
#[inline]
fn unused(&self) -> bool {
    !(self.active() || self.active_dcid_seq.is_some())
}
/// Returns whether probing traffic is needed on this path: either a new
/// challenge has been requested, or received challenges are still queued.
#[inline]
pub fn probing_required(&self) -> bool {
    let challenges_queued = !self.received_challenges.is_empty();

    challenges_queued || self.validation_requested()
}
/// Raises the validation state to `state`; a path already in an equal or
/// greater state is left untouched.
fn promote_to(&mut self, state: PathState) {
    if state > self.state {
        self.state = state;
    }
}
/// Returns whether the path is in the `Validated` state.
#[inline]
pub fn validated(&self) -> bool {
    matches!(self.state, PathState::Validated)
}
/// Returns whether the path is in the `Failed` state.
#[inline]
fn validation_failed(&self) -> bool {
    matches!(self.state, PathState::Failed)
}
/// Returns whether validation has started but not completed, i.e. the
/// state is `Validating` or `ValidatingMTU`.
#[inline]
pub fn under_validation(&self) -> bool {
    self.state == PathState::Validating ||
        self.state == PathState::ValidatingMTU
}
/// Requests that a challenge be sent on this path. The request stays set
/// until `on_challenge_sent` clears it.
#[inline]
pub fn request_validation(&mut self) {
self.challenge_requested = true;
}
/// Returns whether a challenge has been requested and not yet sent.
#[inline]
pub fn validation_requested(&self) -> bool {
self.challenge_requested
}
/// Returns whether a path MTU probe should be sent now.
///
/// All of the following must hold: path MTU discovery is enabled on this
/// path, the handshake is both confirmed and done, the available congestion
/// window is strictly larger than the probe size, the output buffer
/// (`out_len`) can hold a probe, the discovery state wants to probe, the
/// connection is not closing, and no other frames are pending
/// (`frames_empty`).
pub fn should_send_pmtu_probe(
    &mut self, hs_confirmed: bool, hs_done: bool, out_len: usize,
    is_closing: bool, frames_empty: bool,
) -> bool {
    let Some(pmtud) = self.pmtud.as_mut() else {
        return false;
    };

    if !hs_confirmed || !hs_done {
        return false;
    }

    // The checks below run in a fixed order, each one only if the
    // previous ones passed.
    if self.recovery.cwnd_available() <= pmtud.get_probe_size() {
        return false;
    }

    if out_len < pmtud.get_probe_size() {
        return false;
    }

    if !pmtud.should_probe() {
        return false;
    }

    !is_closing && frames_empty
}
/// Records that a challenge was sent: clears the pending validation request
/// and promotes the path to at least `Validating`.
pub fn on_challenge_sent(&mut self) {
    self.challenge_requested = false;
    self.promote_to(PathState::Validating);
}
/// Records a challenge carrying `data` that was sent at `sent_time` in a
/// packet of `pkt_size` bytes, and tracks it until it is answered or
/// declared lost.
pub fn add_challenge_sent(
    &mut self, data: [u8; 8], pkt_size: usize, sent_time: Instant,
) {
    self.on_challenge_sent();

    let challenge = (data, pkt_size, sent_time);
    self.in_flight_challenges.push_back(challenge);
}
/// Queues a challenge received from the peer so that it can be answered
/// later, and marks the local address as verified by the peer.
///
/// When the queue already holds `received_challenges_max_len` entries the
/// challenge is dropped and nothing else changes.
pub fn on_challenge_received(&mut self, data: [u8; 8]) {
    let queue_full =
        self.received_challenges.len() == self.received_challenges_max_len;

    if !queue_full {
        self.received_challenges.push_back(data);
        self.peer_verified_local_address = true;
    }
}
/// Returns whether a challenge carrying `data` is still awaiting a response
/// on this path.
pub fn has_pending_challenge(&self, data: [u8; 8]) -> bool {
    self.in_flight_challenges
        .iter()
        .any(|&(pending, _, _)| pending == data)
}
/// Processes a challenge response carrying `data`.
///
/// Marks the peer address as verified and resets the probe loss counter,
/// whether or not `data` matches an in-flight challenge. Every in-flight
/// challenge with that payload is removed and its packet size recorded.
///
/// Returns `true` only when this response moves the path to `Validated`,
/// which requires an answered challenge of at least
/// `crate::MIN_CLIENT_INITIAL_LEN` bytes. If the largest answered challenge
/// is still smaller, another challenge is requested and `false` is
/// returned. `false` is also returned for a path that was already validated.
pub fn on_response_received(&mut self, data: [u8; 8]) -> bool {
    self.verified_peer_address = true;
    self.probing_lost = 0;

    let mut answered_size = 0;
    self.in_flight_challenges.retain(|&(pending, size, _)| {
        let answered = pending == data;
        if answered {
            answered_size = size;
        }

        !answered
    });

    // A response arrived, but the packet size requirement may not be met
    // yet.
    self.promote_to(PathState::ValidatingMTU);

    self.max_challenge_size =
        std::cmp::max(self.max_challenge_size, answered_size);

    // Nothing more to do for a path that was already validated.
    if self.state != PathState::ValidatingMTU {
        return false;
    }

    if self.max_challenge_size < crate::MIN_CLIENT_INITIAL_LEN {
        // No answered challenge was large enough; probe again.
        self.request_validation();
        return false;
    }

    self.promote_to(PathState::Validated);
    true
}
/// Marks the path as having failed validation and deactivates it.
fn on_failed_validation(&mut self) {
    self.active = false;
    self.state = PathState::Failed;
}
/// Removes and returns the oldest queued challenge received from the peer,
/// or `None` when the queue is empty.
#[inline]
pub fn pop_received_challenge(&mut self) -> Option<[u8; 8]> {
self.received_challenges.pop_front()
}
/// Handles a loss detection timeout on this path.
///
/// The timeout is first forwarded to the recovery state, whose outcome is
/// returned unchanged. Every in-flight challenge sent at or before `now` is
/// then considered lost and dropped. If any was dropped:
///
/// * a probe loss is counted when this is the first one, or when the lost
///   probe was sent at least one RTT after the previously counted one;
/// * validation fails once `crate::MAX_PROBING_TIMEOUTS` losses are counted,
///   or, on a server (`is_server`), when `max_send_bytes` is below
///   `crate::MIN_PROBING_SIZE`; otherwise a new challenge is requested.
///
/// `total_pto_count` is incremented on every call.
pub fn on_loss_detection_timeout(
    &mut self, handshake_status: HandshakeStatus, now: Instant,
    is_server: bool, trace_id: &str,
) -> OnLossDetectionTimeoutOutcome {
    let outcome = self.recovery.on_loss_detection_timeout(
        handshake_status,
        now,
        trace_id,
    );

    // Drop the lost challenges, remembering the send time of the first
    // one dropped.
    let mut lost_probe_time = None;
    self.in_flight_challenges.retain(|&(_, _, sent_time)| {
        let lost = sent_time <= now;
        if lost && lost_probe_time.is_none() {
            lost_probe_time = Some(sent_time);
        }

        !lost
    });

    if let Some(lost_probe_time) = lost_probe_time {
        // Saturate instead of subtracting: `Instant - Instant` is
        // documented as potentially panicking when the right-hand side is
        // the later instant.
        let counts_as_loss = match self.last_probe_lost_time {
            Some(last) =>
                lost_probe_time.saturating_duration_since(last) >=
                    self.recovery.rtt(),
            None => true,
        };

        if counts_as_loss {
            self.probing_lost += 1;
            self.last_probe_lost_time = Some(lost_probe_time);
        }

        if self.probing_lost >= crate::MAX_PROBING_TIMEOUTS ||
            (is_server && self.max_send_bytes < crate::MIN_PROBING_SIZE)
        {
            self.on_failed_validation();
        } else {
            self.request_validation();
        }
    }

    self.total_pto_count += 1;

    outcome
}
/// Returns whether the recovery state can be replaced: nothing is in
/// flight and the recorded in-flight duration is zero.
pub fn can_reinit_recovery(&self) -> bool {
    if self.recovery.bytes_in_flight() != 0 {
        return false;
    }

    self.recovery.bytes_in_flight_duration() == Duration::ZERO
}
/// Replaces the recovery state of this path with a fresh one built from
/// `recovery_config`; the previous state is dropped.
pub fn reinit_recovery(
&mut self, recovery_config: &recovery::RecoveryConfig,
) {
self.recovery = recovery::Recovery::new_with_config(recovery_config)
}
/// Returns a snapshot of the statistics of this path.
///
/// `pmtu` is the current MTU of the path MTU discovery state when
/// discovery is enabled, and the recovery state's maximum datagram size
/// otherwise.
pub fn stats(&self) -> PathStats {
    let rec = &self.recovery;

    let pmtu = if let Some(discovery) = self.pmtud.as_ref() {
        discovery.get_current_mtu()
    } else {
        rec.max_datagram_size()
    };

    PathStats {
        local_addr: self.local_addr,
        peer_addr: self.peer_addr,
        validation_state: self.state,
        active: self.active,
        recv: self.recv_count,
        sent: self.sent_count,
        lost: rec.lost_count(),
        retrans: self.retrans_count,
        total_pto_count: self.total_pto_count,
        dgram_recv: self.dgram_recv_count,
        dgram_sent: self.dgram_sent_count,
        rtt: rec.rtt(),
        min_rtt: rec.min_rtt(),
        max_rtt: rec.max_rtt(),
        rttvar: rec.rttvar(),
        cwnd: rec.cwnd(),
        sent_bytes: self.sent_bytes,
        recv_bytes: self.recv_bytes,
        lost_bytes: rec.bytes_lost(),
        stream_retrans_bytes: self.stream_retrans_bytes,
        pmtu,
        delivery_rate: rec.delivery_rate().to_bytes_per_second(),
        max_bandwidth: rec
            .max_bandwidth()
            .map(Bandwidth::to_bytes_per_second),
        startup_exit: rec.startup_exit(),
    }
}
/// Returns the in-flight duration reported by the recovery state of this
/// path.
pub fn bytes_in_flight_duration(&self) -> Duration {
self.recovery.bytes_in_flight_duration()
}
}
/// An owning iterator over a list of socket addresses.
#[derive(Default)]
pub struct SocketAddrIter {
/// The addresses to yield, in order.
pub(crate) sockaddrs: SmallVec<[SocketAddr; 8]>,
/// Position in `sockaddrs` of the next address to yield.
pub(crate) index: usize,
}
impl Iterator for SocketAddrIter {
    type Item = SocketAddr;

    /// Yields the address at the current position and advances, or returns
    /// `None` once the position is past the last address.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let addr = self.sockaddrs.get(self.index)?;
        self.index += 1;

        Some(*addr)
    }

    /// Reports the exact number of remaining addresses.
    ///
    /// `ExactSizeIterator` is implemented for this type, which requires
    /// `size_hint` to return the exact size; the default implementation
    /// would report `(0, None)`.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.sockaddrs.len().saturating_sub(self.index);

        (remaining, Some(remaining))
    }
}
impl ExactSizeIterator for SocketAddrIter {
    /// Returns the number of addresses not yet yielded.
    ///
    /// Saturates at zero so that an `index` moved past the end (the field
    /// is `pub(crate)`) reports an exhausted iterator, matching `next()`,
    /// instead of underflowing.
    #[inline]
    fn len(&self) -> usize {
        self.sockaddrs.len().saturating_sub(self.index)
    }
}
/// The set of paths of a connection, together with the path events waiting
/// to be retrieved.
pub struct PathMap {
/// The paths, keyed by path identifier.
paths: Slab<Path>,
/// Number of stored paths at which inserting a new path first requires
/// evicting an unused one.
max_concurrent_paths: usize,
/// Maps a (local address, peer address) pair to its path identifier.
addrs_to_paths: BTreeMap<(SocketAddr, SocketAddr), usize>,
/// Path events queued for `pop_event`, oldest first.
events: VecDeque<PathEvent>,
/// Whether the map was created for a server endpoint.
is_server: bool,
}
impl PathMap {
/// Creates a path map holding `initial_path`, which is marked active.
///
/// `max_concurrent_paths` is the number of stored paths at which an insert
/// must first evict an unused path. `is_server` records the endpoint role.
pub fn new(
    mut initial_path: Path, max_concurrent_paths: usize, is_server: bool,
) -> Self {
    initial_path.active = true;
    let addrs = (initial_path.local_addr, initial_path.peer_addr);

    let mut paths = Slab::with_capacity(1);
    let active_path_id = paths.insert(initial_path);

    let mut addrs_to_paths = BTreeMap::new();
    addrs_to_paths.insert(addrs, active_path_id);

    Self {
        paths,
        max_concurrent_paths,
        addrs_to_paths,
        events: VecDeque::new(),
        is_server,
    }
}
/// Returns the path with identifier `path_id`, or `Error::InvalidState`
/// when there is no such path.
#[inline]
pub fn get(&self, path_id: usize) -> Result<&Path> {
self.paths.get(path_id).ok_or(Error::InvalidState)
}
/// Returns a mutable reference to the path with identifier `path_id`, or
/// `Error::InvalidState` when there is no such path.
#[inline]
pub fn get_mut(&mut self, path_id: usize) -> Result<&mut Path> {
self.paths.get_mut(path_id).ok_or(Error::InvalidState)
}
/// Returns the first path for which `Path::active` holds, together with
/// its identifier, or `None` when no path is active.
#[inline]
pub fn get_active_with_pid(&self) -> Option<(usize, &Path)> {
self.paths.iter().find(|(_, p)| p.active())
}
#[inline]
pub fn get_active(&self) -> Result<&Path> {
self.get_active_with_pid()
.map(|(_, p)| p)
.ok_or(Error::InvalidState)
}
#[inline]
pub fn get_active_path_id(&self) -> Result<usize> {
self.get_active_with_pid()
.map(|(pid, _)| pid)
.ok_or(Error::InvalidState)
}
#[inline]
pub fn get_active_mut(&mut self) -> Result<&mut Path> {
self.paths
.iter_mut()
.map(|(_, p)| p)
.find(|p| p.active())
.ok_or(Error::InvalidState)
}
/// Returns an iterator over all stored paths and their identifiers.
#[inline]
pub fn iter(&self) -> slab::Iter<'_, Path> {
self.paths.iter()
}
/// Returns a mutable iterator over all stored paths and their identifiers.
#[inline]
pub fn iter_mut(&mut self) -> slab::IterMut<'_, Path> {
self.paths.iter_mut()
}
/// Returns the number of stored paths.
#[inline]
pub fn len(&self) -> usize {
self.paths.len()
}
/// Returns the identifier of the path registered for the given
/// (local address, peer address) pair, if any.
#[inline]
pub fn path_id_from_addrs(
&self, addrs: &(SocketAddr, SocketAddr),
) -> Option<usize> {
self.addrs_to_paths.get(addrs).copied()
}
fn make_room_for_new_path(&mut self) -> Result<()> {
if self.paths.len() < self.max_concurrent_paths {
return Ok(());
}
let (pid_to_remove, _) = self
.paths
.iter()
.find(|(_, p)| p.unused())
.ok_or(Error::Done)?;
let path = self.paths.remove(pid_to_remove);
self.addrs_to_paths
.remove(&(path.local_addr, path.peer_addr));
self.notify_event(PathEvent::Closed(path.local_addr, path.peer_addr));
Ok(())
}
/// Stores `path` and returns its identifier.
///
/// An unused path is evicted first when the map is full; `Error::Done` is
/// returned when that is not possible. A `PathEvent::New` is queued only
/// when `is_server` is set.
pub fn insert_path(&mut self, path: Path, is_server: bool) -> Result<usize> {
    self.make_room_for_new_path()?;

    let addrs = (path.local_addr, path.peer_addr);
    let pid = self.paths.insert(path);
    self.addrs_to_paths.insert(addrs, pid);

    if is_server {
        let (local_addr, peer_addr) = addrs;
        self.notify_event(PathEvent::New(local_addr, peer_addr));
    }

    Ok(pid)
}
/// Appends `ev` to the queue of pending path events.
pub fn notify_event(&mut self, ev: PathEvent) {
self.events.push_back(ev);
}
/// Removes and returns the oldest pending path event, or `None` when there
/// is none.
pub fn pop_event(&mut self) -> Option<PathEvent> {
self.events.pop_front()
}
/// Queues a `PathEvent::FailedValidation` for every path whose validation
/// failed and that was not reported yet, marking each as reported so that
/// it is only notified once.
pub fn notify_failed_validations(&mut self) {
    for (_, path) in self.paths.iter_mut() {
        if !path.validation_failed() || path.failure_notified {
            continue;
        }

        self.events.push_back(PathEvent::FailedValidation(
            path.local_addr,
            path.peer_addr,
        ));
        path.failure_notified = true;
    }
}
/// Returns the identifier of the first usable path, or `None` when no path
/// is usable.
pub fn find_candidate_path(&self) -> Option<usize> {
    let (pid, _) = self.paths.iter().find(|(_, p)| p.usable())?;

    Some(pid)
}
/// Processes a challenge response carrying `data`.
///
/// The response is handed to the first path with a matching in-flight
/// challenge; a response matching no path is ignored. When it completes
/// the validation of that path, a `PathEvent::Validated` is queued,
/// followed by a `PathEvent::PeerMigrated` if the path is the active one
/// and was flagged as migrating. The migrating flag is cleared either way.
///
/// Returns `Error::InvalidState` when there is no active path, even if the
/// response would otherwise have been ignored.
pub fn on_response_received(&mut self, data: [u8; 8]) -> Result<()> {
    let active_pid = self.get_active_path_id()?;

    let Some((pid, path)) = self
        .iter_mut()
        .find(|(_, p)| p.has_pending_challenge(data))
    else {
        return Ok(());
    };

    if !path.on_response_received(data) {
        return Ok(());
    }

    let (local_addr, peer_addr) = (path.local_addr, path.peer_addr);
    let was_migrating = path.migrating;
    path.migrating = false;

    self.notify_event(PathEvent::Validated(local_addr, peer_addr));

    if was_migrating && pid == active_pid {
        self.notify_event(PathEvent::PeerMigrated(local_addr, peer_addr));
    }

    Ok(())
}
/// Makes the path with identifier `path_id` the active one, deactivating
/// the previously active path, if any.
///
/// On a server, a `PathEvent::PeerMigrated` is queued right away when the
/// new active path is already validated. Otherwise the path is flagged as
/// migrating and, unless its validation is already under way, a challenge
/// is requested.
///
/// Returns `Error::InvalidState` when `path_id` does not name a stored
/// path; in that case the currently active path is left untouched.
pub fn set_active_path(&mut self, path_id: usize) -> Result<()> {
    let is_server = self.is_server;

    // Check the target before touching anything, so that an invalid
    // identifier cannot leave the map without an active path.
    self.get(path_id)?;

    if let Ok(old_active_path) = self.get_active_mut() {
        old_active_path.active = false;
    }

    let new_active_path = self.get_mut(path_id)?;
    new_active_path.active = true;

    if is_server {
        if new_active_path.validated() {
            let local_addr = new_active_path.local_addr();
            let peer_addr = new_active_path.peer_addr();

            self.notify_event(PathEvent::PeerMigrated(local_addr, peer_addr));
        } else {
            new_active_path.migrating = true;

            // Only request a challenge if validation is not already in
            // progress.
            if !new_active_path.under_validation() {
                new_active_path.request_validation();
            }
        }
    }

    Ok(())
}
/// Enables or disables path MTU discovery on every stored path.
///
/// When `discover` is set, each path gets a fresh discovery state built
/// from `max_send_udp_payload_size` and `pmtud_max_probes`, replacing any
/// existing one. Otherwise the discovery state of each path is removed.
pub fn set_discover_pmtu_on_existing_paths(
    &mut self, discover: bool, max_send_udp_payload_size: usize,
    pmtud_max_probes: u8,
) {
    for (_, path) in self.paths.iter_mut() {
        if !discover {
            path.pmtud = None;
            continue;
        }

        path.pmtud = Some(pmtud::Pmtud::new(
            max_send_udp_payload_size,
            pmtud_max_probes,
        ));
    }
}
}
/// A snapshot of the statistics of a path, as produced by `Path::stats`.
#[derive(Clone)]
pub struct PathStats {
/// Local address of the path.
pub local_addr: SocketAddr,
/// Peer address of the path.
pub peer_addr: SocketAddr,
/// Validation state of the path.
pub validation_state: PathState,
/// Value of the path's `active` flag. This is the raw flag, not the
/// result of `Path::active`.
pub active: bool,
/// Copy of `Path::recv_count`.
pub recv: usize,
/// Copy of `Path::sent_count`.
pub sent: usize,
/// Lost count reported by the recovery state.
pub lost: usize,
/// Copy of `Path::retrans_count`.
pub retrans: usize,
/// Number of loss detection timeouts handled on the path.
pub total_pto_count: usize,
/// Copy of `Path::dgram_recv_count`.
pub dgram_recv: usize,
/// Copy of `Path::dgram_sent_count`.
pub dgram_sent: usize,
/// RTT reported by the recovery state.
pub rtt: Duration,
/// Minimum RTT reported by the recovery state, if any.
pub min_rtt: Option<Duration>,
/// Maximum RTT reported by the recovery state, if any.
pub max_rtt: Option<Duration>,
/// RTT variation reported by the recovery state.
pub rttvar: Duration,
/// Congestion window reported by the recovery state.
pub cwnd: usize,
/// Copy of `Path::sent_bytes`.
pub sent_bytes: u64,
/// Copy of `Path::recv_bytes`.
pub recv_bytes: u64,
/// Lost bytes reported by the recovery state.
pub lost_bytes: u64,
/// Copy of `Path::stream_retrans_bytes`.
pub stream_retrans_bytes: u64,
/// Current MTU from path MTU discovery when it is enabled, otherwise the
/// recovery state's maximum datagram size.
pub pmtu: usize,
/// Delivery rate reported by the recovery state, in bytes per second.
pub delivery_rate: u64,
/// Maximum bandwidth reported by the recovery state, in bytes per
/// second, if any.
pub max_bandwidth: Option<u64>,
/// Startup exit information reported by the recovery state, if any.
pub startup_exit: Option<StartupExit>,
}
impl std::fmt::Debug for PathStats {
    /// Writes the statistics as space-separated `key=value` pairs on a
    /// single line.
    ///
    /// `total_pto_count`, the datagram counters, `max_rtt`, `max_bandwidth`
    /// and `startup_exit` are not part of the output.
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            local_addr,
            peer_addr,
            validation_state,
            active,
            recv,
            sent,
            lost,
            retrans,
            rtt,
            min_rtt,
            rttvar,
            cwnd,
            sent_bytes,
            recv_bytes,
            lost_bytes,
            stream_retrans_bytes,
            pmtu,
            delivery_rate,
            ..
        } = self;

        write!(
            f,
            "local_addr={:?} peer_addr={:?} ",
            local_addr, peer_addr,
        )?;
        write!(
            f,
            "validation_state={:?} active={} ",
            validation_state, active,
        )?;
        write!(
            f,
            "recv={} sent={} lost={} retrans={} rtt={:?} min_rtt={:?} rttvar={:?} cwnd={}",
            recv, sent, lost, retrans, rtt, min_rtt, rttvar, cwnd,
        )?;
        write!(
            f,
            " sent_bytes={} recv_bytes={} lost_bytes={}",
            sent_bytes, recv_bytes, lost_bytes,
        )?;
        write!(
            f,
            " stream_retrans_bytes={} pmtu={} delivery_rate={}",
            stream_retrans_bytes, pmtu, delivery_rate,
        )
    }
}
#[cfg(test)]
mod tests {
use crate::rand;
use crate::MIN_CLIENT_INITIAL_LEN;
use crate::recovery::RecoveryConfig;
use crate::Config;
use super::*;
// Checks that a path answered only by an undersized challenge stays in
// `ValidatingMTU`, and becomes `Validated` once a challenge of at least
// `MIN_CLIENT_INITIAL_LEN` bytes is answered.
#[test]
fn path_validation_limited_mtu() {
let client_addr = "127.0.0.1:1234".parse().unwrap();
let client_addr_2 = "127.0.0.1:5678".parse().unwrap();
let server_addr = "127.0.0.1:4321".parse().unwrap();
let config = Config::new(crate::PROTOCOL_VERSION).unwrap();
let recovery_config = RecoveryConfig::from_config(&config);
// Initial (already validated) path, held by a client-side map with room
// for two paths.
let path = Path::new(
client_addr,
server_addr,
&recovery_config,
config.path_challenge_recv_max_queue_len,
true,
None,
);
let mut path_mgr = PathMap::new(path, 2, false);
// Second, non-initial path that is going to be probed.
let probed_path = Path::new(
client_addr_2,
server_addr,
&recovery_config,
config.path_challenge_recv_max_queue_len,
false,
None,
);
path_mgr.insert_path(probed_path, false).unwrap();
let pid = path_mgr
.path_id_from_addrs(&(client_addr_2, server_addr))
.unwrap();
path_mgr.get_mut(pid).unwrap().request_validation();
assert!(path_mgr.get_mut(pid).unwrap().validation_requested());
assert!(path_mgr.get_mut(pid).unwrap().probing_required());
// Send a challenge that is one byte too small to validate the path.
let data = rand::rand_u64().to_be_bytes();
path_mgr.get_mut(pid).unwrap().add_challenge_sent(
data,
MIN_CLIENT_INITIAL_LEN - 1,
Instant::now(),
);
// Sending the challenge clears the request and starts validation.
assert!(!path_mgr.get_mut(pid).unwrap().validation_requested());
assert!(!path_mgr.get_mut(pid).unwrap().probing_required());
assert!(path_mgr.get_mut(pid).unwrap().under_validation());
assert!(!path_mgr.get_mut(pid).unwrap().validated());
assert_eq!(path_mgr.get_mut(pid).unwrap().state, PathState::Validating);
assert_eq!(path_mgr.pop_event(), None);
// The response to the undersized challenge is not enough: the path moves
// to `ValidatingMTU`, a new challenge is requested and no event is
// queued.
path_mgr.on_response_received(data).unwrap();
assert!(path_mgr.get_mut(pid).unwrap().validation_requested());
assert!(path_mgr.get_mut(pid).unwrap().probing_required());
assert!(path_mgr.get_mut(pid).unwrap().under_validation());
assert!(!path_mgr.get_mut(pid).unwrap().validated());
assert_eq!(
path_mgr.get_mut(pid).unwrap().state,
PathState::ValidatingMTU
);
assert_eq!(path_mgr.pop_event(), None);
// A challenge of the minimum size, once answered, validates the path.
let data = rand::rand_u64().to_be_bytes();
path_mgr.get_mut(pid).unwrap().add_challenge_sent(
data,
MIN_CLIENT_INITIAL_LEN,
Instant::now(),
);
path_mgr.on_response_received(data).unwrap();
assert!(!path_mgr.get_mut(pid).unwrap().validation_requested());
assert!(!path_mgr.get_mut(pid).unwrap().probing_required());
assert!(!path_mgr.get_mut(pid).unwrap().under_validation());
assert!(path_mgr.get_mut(pid).unwrap().validated());
assert_eq!(path_mgr.get_mut(pid).unwrap().state, PathState::Validated);
assert_eq!(
path_mgr.pop_event(),
Some(PathEvent::Validated(client_addr_2, server_addr))
);
}
// Checks that several challenges can be in flight on one path at the same
// time, and that each response removes only the challenge it matches.
#[test]
fn multiple_probes() {
let client_addr = "127.0.0.1:1234".parse().unwrap();
let server_addr = "127.0.0.1:4321".parse().unwrap();
let config = Config::new(crate::PROTOCOL_VERSION).unwrap();
let recovery_config = RecoveryConfig::from_config(&config);
let path = Path::new(
client_addr,
server_addr,
&recovery_config,
config.path_challenge_recv_max_queue_len,
true,
None,
);
let mut client_path_mgr = PathMap::new(path, 2, false);
// Stand-alone path playing the server side; it only queues the
// challenges it receives.
let mut server_path = Path::new(
server_addr,
client_addr,
&recovery_config,
config.path_challenge_recv_max_queue_len,
false,
None,
);
let client_pid = client_path_mgr
.path_id_from_addrs(&(client_addr, server_addr))
.unwrap();
// Two challenges are sent back to back on the client path.
let data = rand::rand_u64().to_be_bytes();
client_path_mgr
.get_mut(client_pid)
.unwrap()
.add_challenge_sent(data, MIN_CLIENT_INITIAL_LEN, Instant::now());
let data_2 = rand::rand_u64().to_be_bytes();
client_path_mgr
.get_mut(client_pid)
.unwrap()
.add_challenge_sent(data_2, MIN_CLIENT_INITIAL_LEN, Instant::now());
assert_eq!(
client_path_mgr
.get(client_pid)
.unwrap()
.in_flight_challenges
.len(),
2
);
// The server side queues both challenges.
server_path.on_challenge_received(data);
assert_eq!(server_path.received_challenges.len(), 1);
server_path.on_challenge_received(data_2);
assert_eq!(server_path.received_challenges.len(), 2);
// Each response removes exactly one in-flight challenge.
client_path_mgr.on_response_received(data).unwrap();
assert_eq!(
client_path_mgr
.get(client_pid)
.unwrap()
.in_flight_challenges
.len(),
1
);
client_path_mgr.on_response_received(data_2).unwrap();
assert_eq!(
client_path_mgr
.get(client_pid)
.unwrap()
.in_flight_challenges
.len(),
0
);
}
// Checks that the queue of received challenges is bounded: with the default
// configuration the fourth challenge received is dropped, so the queue
// length stays at three. The sender still tracks every challenge it sent.
#[test]
fn too_many_probes() {
let client_addr = "127.0.0.1:1234".parse().unwrap();
let server_addr = "127.0.0.1:4321".parse().unwrap();
let config = Config::new(crate::PROTOCOL_VERSION).unwrap();
let recovery_config = RecoveryConfig::from_config(&config);
let path = Path::new(
client_addr,
server_addr,
&recovery_config,
config.path_challenge_recv_max_queue_len,
true,
None,
);
let mut client_path_mgr = PathMap::new(path, 2, false);
// Stand-alone path playing the server side; it only queues the
// challenges it receives.
let mut server_path = Path::new(
server_addr,
client_addr,
&recovery_config,
config.path_challenge_recv_max_queue_len,
false,
None,
);
let client_pid = client_path_mgr
.path_id_from_addrs(&(client_addr, server_addr))
.unwrap();
// Four challenges are sent on the client path; all of them are tracked
// as in flight.
let data = rand::rand_u64().to_be_bytes();
client_path_mgr
.get_mut(client_pid)
.unwrap()
.add_challenge_sent(data, MIN_CLIENT_INITIAL_LEN, Instant::now());
let data_2 = rand::rand_u64().to_be_bytes();
client_path_mgr
.get_mut(client_pid)
.unwrap()
.add_challenge_sent(data_2, MIN_CLIENT_INITIAL_LEN, Instant::now());
assert_eq!(
client_path_mgr
.get(client_pid)
.unwrap()
.in_flight_challenges
.len(),
2
);
let data_3 = rand::rand_u64().to_be_bytes();
client_path_mgr
.get_mut(client_pid)
.unwrap()
.add_challenge_sent(data_3, MIN_CLIENT_INITIAL_LEN, Instant::now());
assert_eq!(
client_path_mgr
.get(client_pid)
.unwrap()
.in_flight_challenges
.len(),
3
);
let data_4 = rand::rand_u64().to_be_bytes();
client_path_mgr
.get_mut(client_pid)
.unwrap()
.add_challenge_sent(data_4, MIN_CLIENT_INITIAL_LEN, Instant::now());
assert_eq!(
client_path_mgr
.get(client_pid)
.unwrap()
.in_flight_challenges
.len(),
4
);
// The server side queues the first three challenges and drops the
// fourth.
server_path.on_challenge_received(data);
assert_eq!(server_path.received_challenges.len(), 1);
server_path.on_challenge_received(data_2);
assert_eq!(server_path.received_challenges.len(), 2);
server_path.on_challenge_received(data_3);
assert_eq!(server_path.received_challenges.len(), 3);
server_path.on_challenge_received(data_4);
assert_eq!(server_path.received_challenges.len(), 3);
// Only the three queued challenges are answered, so the fourth remains
// in flight on the client.
client_path_mgr.on_response_received(data).unwrap();
assert_eq!(
client_path_mgr
.get(client_pid)
.unwrap()
.in_flight_challenges
.len(),
3
);
client_path_mgr.on_response_received(data_2).unwrap();
assert_eq!(
client_path_mgr
.get(client_pid)
.unwrap()
.in_flight_challenges
.len(),
2
);
client_path_mgr.on_response_received(data_3).unwrap();
assert_eq!(
client_path_mgr
.get(client_pid)
.unwrap()
.in_flight_challenges
.len(),
1
);
}
}