use std::collections::{HashMap, VecDeque};
use std::net::{SocketAddr, UdpSocket};
use std::time::{Duration, Instant};
use crate::networking::protocol::{
Packet, PacketEncoder, PacketDecoder, PacketKind, ProtocolError, PacketHeader,
};
pub const MTU: usize = 1400;
pub const RETRANSMIT_BASE_MS: u64 = 100;
pub const RETRANSMIT_MAX_MS: u64 = 8000;
pub const MAX_RETRANSMIT: u32 = 10;
pub const KEEPALIVE_MS: u64 = 500;
pub const PEER_TIMEOUT_MS: u64 = 10_000;
pub const RTT_ALPHA: f64 = 0.125;
pub const JITTER_ALPHA: f64 = 0.25;
pub const FRAGMENT_TIMEOUT_MS: u64 = 5_000;
pub const ACK_WINDOW: u32 = 32;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Channel {
Reliable,
Unreliable,
ReliableOrdered,
UnreliableOrdered,
}
impl Channel {
pub fn is_reliable(self) -> bool {
matches!(self, Channel::Reliable | Channel::ReliableOrdered)
}
pub fn is_ordered(self) -> bool {
matches!(self, Channel::ReliableOrdered | Channel::UnreliableOrdered)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
Disconnected,
Connecting,
Connected,
TimedOut,
Kicked,
}
#[derive(Debug, Clone, Default)]
pub struct TransportStats {
pub rtt_ms: f64,
pub packet_loss_pct: f64,
pub jitter_ms: f64,
pub bandwidth_up: f64,
pub bandwidth_down: f64,
pub packets_sent: u64,
pub packets_recv: u64,
pub retransmits: u64,
}
#[derive(Debug, Clone)]
pub struct ReceivedPacket {
pub from: SocketAddr,
pub packet: Packet,
}
pub struct NonBlockingSocket {
socket: UdpSocket,
pub local_addr: SocketAddr,
}
impl NonBlockingSocket {
pub fn bind(addr: SocketAddr) -> Result<Self, std::io::Error> {
let socket = UdpSocket::bind(addr)?;
socket.set_nonblocking(true)?;
let local_addr = socket.local_addr()?;
Ok(Self { socket, local_addr })
}
pub fn send_to(&self, data: &[u8], dest: SocketAddr) -> Result<usize, std::io::Error> {
self.socket.send_to(data, dest)
}
pub fn poll(&self, buf: &mut Vec<u8>) -> Vec<(SocketAddr, Vec<u8>)> {
let mut results = Vec::new();
buf.resize(65535, 0);
loop {
match self.socket.recv_from(buf) {
Ok((len, addr)) => {
results.push((addr, buf[..len].to_vec()));
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
Err(_) => break,
}
}
results
}
}
#[derive(Debug, Clone)]
struct FragmentHeader {
packet_id: u16,
fragment_idx: u8,
total_fragments: u8,
}
impl FragmentHeader {
const SIZE: usize = 4;
fn encode(&self) -> [u8; Self::SIZE] {
let id_bytes = self.packet_id.to_be_bytes();
[id_bytes[0], id_bytes[1], self.fragment_idx, self.total_fragments]
}
fn decode(b: &[u8]) -> Option<Self> {
if b.len() < Self::SIZE { return None; }
Some(Self {
packet_id: u16::from_be_bytes([b[0], b[1]]),
fragment_idx: b[2],
total_fragments: b[3],
})
}
}
#[derive(Debug)]
struct PartialMessage {
total_fragments: u8,
received: Vec<Option<Vec<u8>>>,
created_at: Instant,
}
impl PartialMessage {
fn new(total: u8) -> Self {
Self {
total_fragments: total,
received: vec![None; total as usize],
created_at: Instant::now(),
}
}
fn is_complete(&self) -> bool {
self.received.iter().all(|s| s.is_some())
}
fn is_expired(&self) -> bool {
self.created_at.elapsed() > Duration::from_millis(FRAGMENT_TIMEOUT_MS)
}
fn reassemble(&self) -> Vec<u8> {
self.received.iter().flat_map(|s| s.as_ref().unwrap().iter().copied()).collect()
}
}
pub struct Fragmenter {
next_packet_id: u16,
reassembly: HashMap<(u64, u16), PartialMessage>,
}
impl Fragmenter {
pub fn new() -> Self {
Self {
next_packet_id: 0,
reassembly: HashMap::new(),
}
}
pub fn fragment(&mut self, data: &[u8]) -> Vec<Vec<u8>> {
let max_body = MTU - PacketHeader::SIZE - FragmentHeader::SIZE;
let chunks: Vec<&[u8]> = data.chunks(max_body).collect();
let total = chunks.len().min(255) as u8;
let id = self.next_packet_id;
self.next_packet_id = self.next_packet_id.wrapping_add(1);
chunks.iter().enumerate().take(255).map(|(i, chunk)| {
let fh = FragmentHeader {
packet_id: id,
fragment_idx: i as u8,
total_fragments: total,
};
let mut out = Vec::with_capacity(FragmentHeader::SIZE + chunk.len());
out.extend_from_slice(&fh.encode());
out.extend_from_slice(chunk);
out
}).collect()
}
pub fn receive_fragment(&mut self, peer_key: u64, raw: &[u8]) -> Option<Vec<u8>> {
let fh = FragmentHeader::decode(raw)?;
let body = raw[FragmentHeader::SIZE..].to_vec();
let entry = self.reassembly
.entry((peer_key, fh.packet_id))
.or_insert_with(|| PartialMessage::new(fh.total_fragments));
if fh.fragment_idx as usize >= entry.received.len() {
return None; }
entry.received[fh.fragment_idx as usize] = Some(body);
if entry.is_complete() {
let data = entry.reassemble();
self.reassembly.remove(&(peer_key, fh.packet_id));
Some(data)
} else {
None
}
}
pub fn gc(&mut self) {
self.reassembly.retain(|_, v| !v.is_expired());
}
}
impl Default for Fragmenter {
fn default() -> Self { Self::new() }
}
#[derive(Debug, Clone)]
struct SendEntry {
sequence: u32,
data: Vec<u8>,
sent_at: Instant,
next_retransmit: Instant,
retransmit_count: u32,
retransmit_delay: Duration,
}
impl SendEntry {
fn new(sequence: u32, data: Vec<u8>, now: Instant) -> Self {
let delay = Duration::from_millis(RETRANSMIT_BASE_MS);
Self {
sequence,
data,
sent_at: now,
next_retransmit: now + delay,
retransmit_count: 0,
retransmit_delay: delay,
}
}
fn backoff(&mut self, now: Instant) {
self.retransmit_count += 1;
self.retransmit_delay = Duration::from_millis(
(self.retransmit_delay.as_millis() as u64 * 2).min(RETRANSMIT_MAX_MS),
);
self.next_retransmit = now + self.retransmit_delay;
}
fn is_due(&self, now: Instant) -> bool {
now >= self.next_retransmit
}
}
struct ReorderBuffer {
next_expected: u32,
buffer: HashMap<u32, Packet>,
max_hold: usize,
}
impl ReorderBuffer {
fn new() -> Self {
Self { next_expected: 0, buffer: HashMap::new(), max_hold: 64 }
}
fn insert(&mut self, pkt: Packet) -> Vec<Packet> {
let seq = pkt.sequence;
if seq == self.next_expected {
let mut out = vec![pkt];
self.next_expected = self.next_expected.wrapping_add(1);
loop {
if let Some(next) = self.buffer.remove(&self.next_expected) {
out.push(next);
self.next_expected = self.next_expected.wrapping_add(1);
} else {
break;
}
}
out
} else {
self.buffer.insert(seq, pkt);
if self.buffer.len() > self.max_hold {
let mut all: Vec<Packet> = self.buffer.drain().map(|(_, p)| p).collect();
all.sort_by_key(|p| p.sequence);
if let Some(last) = all.last() {
self.next_expected = last.sequence.wrapping_add(1);
}
return all;
}
Vec::new()
}
}
fn reset(&mut self) {
self.next_expected = 0;
self.buffer.clear();
}
}
struct AckAccumulator {
last_received: u32,
ack_bits: u32,
}
impl AckAccumulator {
fn new() -> Self { Self { last_received: 0, ack_bits: 0 } }
fn record(&mut self, seq: u32) {
let diff = self.last_received.wrapping_sub(seq);
if seq == self.last_received {
} else if seq.wrapping_sub(self.last_received) < 0x8000_0000 {
let advance = seq.wrapping_sub(self.last_received);
if advance >= 32 {
self.ack_bits = 0;
} else {
self.ack_bits <<= advance;
self.ack_bits |= 1 << (advance - 1);
}
self.last_received = seq;
} else if diff < 32 {
self.ack_bits |= 1 << (diff - 1);
}
}
fn ack(&self) -> u32 { self.last_received }
fn ack_bits(&self) -> u32 { self.ack_bits }
}
struct CongestionControl {
pub cwnd: u32,
ssthresh: u32,
}
impl CongestionControl {
fn new() -> Self { Self { cwnd: 16, ssthresh: 64 } }
fn on_ack(&mut self) {
if self.cwnd < self.ssthresh {
self.cwnd = (self.cwnd + 2).min(256);
} else {
self.cwnd = (self.cwnd + 1).min(256);
}
}
fn on_loss(&mut self) {
self.ssthresh = (self.cwnd / 2).max(4);
self.cwnd = self.ssthresh;
}
fn can_send(&self, in_flight: u32) -> bool {
in_flight < self.cwnd
}
}
pub struct ReliableUdp {
pub peer_addr: SocketAddr,
state: ConnectionState,
next_sequence: u32,
ack_accum: AckAccumulator,
send_queue: VecDeque<SendEntry>,
reorder_buf: ReorderBuffer,
congestion: CongestionControl,
rtt_ms: f64,
jitter_ms: f64,
last_recv: Instant,
last_keepalive: Instant,
ping_map: HashMap<u32, Instant>,
encoder: PacketEncoder,
decoder: PacketDecoder,
stats: TransportStats,
in_flight: u32,
}
impl ReliableUdp {
pub fn new(peer_addr: SocketAddr) -> Self {
let now = Instant::now();
Self {
peer_addr,
state: ConnectionState::Connecting,
next_sequence: 0,
ack_accum: AckAccumulator::new(),
send_queue: VecDeque::new(),
reorder_buf: ReorderBuffer::new(),
congestion: CongestionControl::new(),
rtt_ms: 50.0,
jitter_ms: 0.0,
last_recv: now,
last_keepalive: now,
ping_map: HashMap::new(),
encoder: PacketEncoder { reliable: true, ..PacketEncoder::default() },
decoder: PacketDecoder::new(),
stats: TransportStats::default(),
in_flight: 0,
}
}
pub fn state(&self) -> ConnectionState { self.state }
pub fn stats(&self) -> &TransportStats { &self.stats }
pub fn rtt_ms(&self) -> f64 { self.rtt_ms }
fn next_seq(&mut self) -> u32 {
let s = self.next_sequence;
self.next_sequence = self.next_sequence.wrapping_add(1);
s
}
pub fn send_reliable(&mut self, socket: &NonBlockingSocket, mut packet: Packet) {
packet.sequence = self.next_seq();
packet.ack = self.ack_accum.ack();
packet.ack_bits = self.ack_accum.ack_bits();
packet.flags |= PacketHeader::FLAG_RELIABLE;
if let Ok(data) = self.encoder.encode(&packet) {
if self.congestion.can_send(self.in_flight) {
let _ = socket.send_to(&data, self.peer_addr);
self.in_flight += 1;
self.stats.packets_sent += 1;
self.stats.bandwidth_up += data.len() as f64;
if packet.kind == PacketKind::Ping {
self.ping_map.insert(packet.sequence, Instant::now());
}
let entry = SendEntry::new(packet.sequence, data, Instant::now());
self.send_queue.push_back(entry);
} else {
let entry = SendEntry::new(packet.sequence, data, Instant::now());
self.send_queue.push_back(entry);
}
}
}
pub fn send_unreliable(&mut self, socket: &NonBlockingSocket, mut packet: Packet) {
packet.sequence = self.next_seq();
packet.ack = self.ack_accum.ack();
packet.ack_bits = self.ack_accum.ack_bits();
if let Ok(data) = self.encoder.encode(&packet) {
let _ = socket.send_to(&data, self.peer_addr);
self.stats.packets_sent += 1;
self.stats.bandwidth_up += data.len() as f64;
}
}
pub fn receive(&mut self, raw: &[u8]) -> Vec<Packet> {
self.last_recv = Instant::now();
self.stats.bandwidth_down += raw.len() as f64;
let (pkt, _) = match self.decoder.decode(raw) {
Ok(p) => p,
Err(_) => return Vec::new(),
};
self.stats.packets_recv += 1;
self.process_acks(pkt.ack, pkt.ack_bits);
self.ack_accum.record(pkt.sequence);
match pkt.kind {
PacketKind::Pong => {
self.handle_pong(&pkt);
return Vec::new();
}
PacketKind::Heartbeat => {
if self.state == ConnectionState::Connecting {
self.state = ConnectionState::Connected;
}
return Vec::new();
}
PacketKind::Disconnect => {
self.state = ConnectionState::Disconnected;
return Vec::new();
}
PacketKind::Connect => {
self.state = ConnectionState::Connected;
return Vec::new();
}
_ => {}
}
if self.state == ConnectionState::Connecting {
self.state = ConnectionState::Connected;
}
if pkt.is_reliable() || pkt.flags & PacketHeader::FLAG_ORDERED != 0 {
self.reorder_buf.insert(pkt)
} else {
vec![pkt]
}
}
fn process_acks(&mut self, ack: u32, ack_bits: u32) {
let mut acked_seqs = Vec::new();
acked_seqs.push(ack);
for i in 0..32u32 {
if ack_bits & (1 << i) != 0 {
acked_seqs.push(ack.wrapping_sub(i + 1));
}
}
let now = Instant::now();
let mut any_acked = false;
self.send_queue.retain(|entry| {
if acked_seqs.contains(&entry.sequence) {
any_acked = true;
if let Some(&sent_at) = self.ping_map.get(&entry.sequence) {
let rtt = now.duration_since(sent_at).as_secs_f64() * 1000.0;
let err = (rtt - self.rtt_ms).abs();
self.jitter_ms = JITTER_ALPHA * err + (1.0 - JITTER_ALPHA) * self.jitter_ms;
self.rtt_ms = RTT_ALPHA * rtt + (1.0 - RTT_ALPHA) * self.rtt_ms;
self.ping_map.remove(&entry.sequence);
}
self.in_flight = self.in_flight.saturating_sub(1);
false } else {
true }
});
if any_acked {
self.congestion.on_ack();
}
let loss = self.estimate_packet_loss(ack, ack_bits);
self.stats.packet_loss_pct = loss;
self.stats.rtt_ms = self.rtt_ms;
self.stats.jitter_ms = self.jitter_ms;
}
fn estimate_packet_loss(&self, _ack: u32, ack_bits: u32) -> f64 {
let received = ack_bits.count_ones();
let window = 32u32;
let lost = window - received;
(lost as f64 / window as f64) * 100.0
}
fn handle_pong(&mut self, pkt: &Packet) {
if pkt.payload.len() < 8 { return; }
let ping_seq_bytes: [u8; 8] = pkt.payload[0..8].try_into().unwrap_or_default();
let _ping_ts = u64::from_be_bytes(ping_seq_bytes);
}
pub fn tick(&mut self, socket: &NonBlockingSocket) {
let now = Instant::now();
if now.duration_since(self.last_recv) > Duration::from_millis(PEER_TIMEOUT_MS) {
self.state = ConnectionState::TimedOut;
return;
}
let mut lost_count = 0u32;
for entry in self.send_queue.iter_mut() {
if entry.is_due(now) {
if entry.retransmit_count >= MAX_RETRANSMIT {
lost_count += 1;
continue;
}
let _ = socket.send_to(&entry.data, self.peer_addr);
self.stats.retransmits += 1;
entry.backoff(now);
}
}
self.send_queue.retain(|e| e.retransmit_count < MAX_RETRANSMIT);
if lost_count > 0 {
self.congestion.on_loss();
self.in_flight = self.in_flight.saturating_sub(lost_count);
}
if now.duration_since(self.last_keepalive) > Duration::from_millis(KEEPALIVE_MS) {
self.last_keepalive = now;
let seq = self.next_seq();
let hb = Packet::heartbeat(seq, self.ack_accum.ack(), self.ack_accum.ack_bits());
if let Ok(data) = self.encoder.encode(&hb) {
let _ = socket.send_to(&data, self.peer_addr);
self.stats.packets_sent += 1;
}
}
}
pub fn disconnect(&mut self, socket: &NonBlockingSocket) {
let seq = self.next_seq();
let pkt = Packet::new(
PacketKind::Disconnect, seq,
self.ack_accum.ack(), self.ack_accum.ack_bits(), Vec::new(),
);
if let Ok(data) = self.encoder.encode(&pkt) {
let _ = socket.send_to(&data, self.peer_addr);
}
self.state = ConnectionState::Disconnected;
}
pub fn reset_reorder(&mut self) {
self.reorder_buf.reset();
}
}
pub struct ConnectionManager {
socket: NonBlockingSocket,
peers: HashMap<SocketAddr, ReliableUdp>,
fragmenter: Fragmenter,
encoder: PacketEncoder,
recv_buf: Vec<u8>,
}
impl ConnectionManager {
pub fn bind(local_addr: SocketAddr) -> Result<Self, std::io::Error> {
Ok(Self {
socket: NonBlockingSocket::bind(local_addr)?,
peers: HashMap::new(),
fragmenter: Fragmenter::new(),
encoder: PacketEncoder::new(),
recv_buf: vec![0u8; 65535],
})
}
pub fn connect(&mut self, addr: SocketAddr) {
let mut conn = ReliableUdp::new(addr);
let pkt = Packet::new(PacketKind::Connect, 0, 0, 0, Vec::new());
conn.send_reliable(&self.socket, pkt);
self.peers.insert(addr, conn);
}
pub fn disconnect(&mut self, addr: SocketAddr) {
if let Some(conn) = self.peers.get_mut(&addr) {
conn.disconnect(&self.socket);
}
self.peers.remove(&addr);
}
pub fn send(&mut self, addr: SocketAddr, channel: Channel, data: Vec<u8>) {
let needs_fragment = data.len() > MTU - PacketHeader::SIZE;
let peer = self.peers.entry(addr).or_insert_with(|| ReliableUdp::new(addr));
if needs_fragment {
let frags = self.fragmenter.fragment(&data);
for frag in frags {
let mut pkt = Packet::new(
PacketKind::StateUpdate,
0, 0, 0, frag,
);
pkt.flags |= PacketHeader::FLAG_FRAGMENTED;
if channel.is_reliable() {
peer.send_reliable(&self.socket, pkt);
} else {
peer.send_unreliable(&self.socket, pkt);
}
}
} else {
let pkt = Packet::new(PacketKind::StateUpdate, 0, 0, 0, data);
if channel.is_reliable() {
peer.send_reliable(&self.socket, pkt);
} else {
peer.send_unreliable(&self.socket, pkt);
}
}
}
pub fn send_packet(&mut self, addr: SocketAddr, channel: Channel, packet: Packet) {
let peer = self.peers.entry(addr).or_insert_with(|| ReliableUdp::new(addr));
if channel.is_reliable() {
peer.send_reliable(&self.socket, packet);
} else {
peer.send_unreliable(&self.socket, packet);
}
}
pub fn poll(&mut self) -> Vec<ReceivedPacket> {
let mut out = Vec::new();
let datagrams = self.socket.poll(&mut self.recv_buf);
for (addr, raw) in datagrams {
let peer = self.peers.entry(addr).or_insert_with(|| ReliableUdp::new(addr));
let packets = peer.receive(&raw);
for pkt in packets {
out.push(ReceivedPacket { from: addr, packet: pkt });
}
}
for conn in self.peers.values_mut() {
conn.tick(&self.socket);
}
self.peers.retain(|_, conn| {
!matches!(conn.state(), ConnectionState::TimedOut | ConnectionState::Disconnected)
});
self.fragmenter.gc();
out
}
pub fn peer_state(&self, addr: SocketAddr) -> Option<ConnectionState> {
self.peers.get(&addr).map(|c| c.state())
}
pub fn peer_stats(&self, addr: SocketAddr) -> Option<&TransportStats> {
self.peers.get(&addr).map(|c| c.stats())
}
pub fn peer_count(&self) -> usize {
self.peers.len()
}
pub fn peer_addrs(&self) -> Vec<SocketAddr> {
self.peers.keys().copied().collect()
}
pub fn broadcast(&mut self, channel: Channel, packet: Packet) {
let addrs: Vec<SocketAddr> = self.peers.keys().copied().collect();
for addr in addrs {
self.send_packet(addr, channel, packet.clone());
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::net::SocketAddr;
fn loopback(port: u16) -> SocketAddr {
format!("127.0.0.1:{port}").parse().unwrap()
}
#[test]
fn test_channel_flags() {
assert!(Channel::Reliable.is_reliable());
assert!(Channel::ReliableOrdered.is_reliable());
assert!(!Channel::Unreliable.is_reliable());
assert!(!Channel::UnreliableOrdered.is_reliable());
assert!(Channel::ReliableOrdered.is_ordered());
assert!(Channel::UnreliableOrdered.is_ordered());
assert!(!Channel::Reliable.is_ordered());
assert!(!Channel::Unreliable.is_ordered());
}
#[test]
fn test_fragmenter_roundtrip_small() {
let mut f = Fragmenter::new();
let data = vec![0xABu8; 100];
let frags = f.fragment(&data);
assert_eq!(frags.len(), 1);
let result = f.receive_fragment(1, &frags[0]);
assert!(result.is_some());
assert_eq!(result.unwrap(), data);
}
#[test]
fn test_fragmenter_roundtrip_large() {
let mut f = Fragmenter::new();
let data: Vec<u8> = (0..4000).map(|i| (i % 251) as u8).collect();
let frags = f.fragment(&data);
assert!(frags.len() > 1);
let mut assembled = None;
for frag in frags {
assembled = f.receive_fragment(99, &frag);
}
assert!(assembled.is_some());
assert_eq!(assembled.unwrap(), data);
}
#[test]
fn test_fragmenter_out_of_order() {
let mut f = Fragmenter::new();
let data: Vec<u8> = (0..4000).map(|i| (i % 127) as u8).collect();
let mut frags = f.fragment(&data);
frags.reverse();
let mut assembled = None;
for frag in frags {
assembled = f.receive_fragment(7, &frag);
}
assert!(assembled.is_some());
assert_eq!(assembled.unwrap().len(), data.len());
}
#[test]
fn test_ack_accumulator_basic() {
let mut acc = AckAccumulator::new();
acc.record(0);
acc.record(1);
acc.record(2);
assert_eq!(acc.ack(), 2);
assert!(acc.ack_bits() & 1 != 0); assert!(acc.ack_bits() & 2 != 0); }
#[test]
fn test_congestion_control_aimd() {
let mut cc = CongestionControl::new();
let initial = cc.cwnd;
cc.on_ack();
cc.on_ack();
assert!(cc.cwnd >= initial); let before_loss = cc.cwnd;
cc.on_loss();
assert!(cc.cwnd < before_loss); }
#[test]
fn test_reorder_buffer_in_order() {
let mut rb = ReorderBuffer::new();
let p0 = Packet::new(PacketKind::StateUpdate, 0, 0, 0, vec![]);
let p1 = Packet::new(PacketKind::StateUpdate, 1, 0, 0, vec![]);
let out0 = rb.insert(p0);
let out1 = rb.insert(p1);
assert_eq!(out0.len(), 1);
assert_eq!(out1.len(), 1);
}
#[test]
fn test_reorder_buffer_out_of_order() {
let mut rb = ReorderBuffer::new();
let p0 = Packet::new(PacketKind::StateUpdate, 0, 0, 0, vec![1]);
let p2 = Packet::new(PacketKind::StateUpdate, 2, 0, 0, vec![3]);
let p1 = Packet::new(PacketKind::StateUpdate, 1, 0, 0, vec![2]);
let out_p0 = rb.insert(p0); let out_p2 = rb.insert(p2); assert_eq!(out_p0.len(), 1);
assert_eq!(out_p2.len(), 0);
let out_p1 = rb.insert(p1); assert_eq!(out_p1.len(), 2);
}
#[test]
fn test_connection_manager_bind() {
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let mgr = ConnectionManager::bind(addr);
assert!(mgr.is_ok());
}
#[test]
fn test_connection_manager_peer_count() {
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let mut mgr = ConnectionManager::bind(addr).unwrap();
assert_eq!(mgr.peer_count(), 0);
}
#[test]
fn test_non_blocking_socket_bind() {
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let sock = NonBlockingSocket::bind(addr);
assert!(sock.is_ok());
let sock = sock.unwrap();
assert_ne!(sock.local_addr.port(), 0);
}
#[test]
fn test_send_entry_backoff_growth() {
let mut entry = SendEntry::new(1, vec![0u8; 10], Instant::now());
let d0 = entry.retransmit_delay;
entry.backoff(Instant::now());
let d1 = entry.retransmit_delay;
entry.backoff(Instant::now());
let d2 = entry.retransmit_delay;
assert!(d1 >= d0);
assert!(d2 >= d1);
assert!(d2.as_millis() <= RETRANSMIT_MAX_MS as u128);
}
}