1pub mod ack_queue;
2pub mod ordering_channels;
3pub mod priority;
4pub mod reliable_tracker;
5pub mod split_assembler;
6pub mod state;
7pub mod tunables;
8
9pub use priority::RakPriority;
10pub use state::SessionState;
11
12use std::cmp::Ordering;
13use std::collections::{BTreeMap, BinaryHeap, HashMap, VecDeque};
14use std::time::{Duration, Instant};
15
16use bytes::Bytes;
17use zeroize::Zeroize;
18
19use ack_queue::AckQueue;
20use ordering_channels::{OrderedResult, OrderingChannels, SequencedResult};
21use split_assembler::SplitAssembler;
22
23use crate::error::DecodeError;
24use crate::protocol::ack::{AckNackPayload, SequenceRange};
25use crate::protocol::constants::{DatagramFlags, MAX_ACK_SEQUENCES, RAKNET_DATAGRAM_HEADER_SIZE};
26use crate::protocol::datagram::{Datagram, DatagramHeader, DatagramPayload};
27use crate::protocol::frame::Frame;
28use crate::protocol::frame_header::FrameHeader;
29use crate::protocol::reliability::Reliability;
30use crate::protocol::sequence24::Sequence24;
31
32use self::reliable_tracker::ReliableTracker;
33use self::tunables::{AckNackPriority, BackpressureMode, SessionTunables};
34
/// A datagram that has been sent and is still awaiting acknowledgement.
#[derive(Debug, Clone)]
pub struct TrackedDatagram {
    /// Full datagram kept verbatim so it can be retransmitted as-is.
    pub datagram: Datagram,
    /// Time of the most recent transmission; used for RTT sampling on ACK.
    pub send_time: Instant,
    /// Earliest instant a retransmission may be scheduled.
    pub next_send: Instant,
    /// Number of retransmissions performed so far (0 = never resent).
    pub retries: u32,
    /// Set when a NACK pulled `next_send` forward; cleared on the resend.
    pub nack_resend_pending: bool,
    /// True when the datagram contains reliable frames and may be resent.
    pub resendable: bool,
    /// Delivery-receipt ids attached to frames inside this datagram.
    pub receipt_ids: Vec<u64>,
}
45
/// Summary of one `process_incoming_receipts` pass.
#[derive(Debug, Default, Clone)]
pub struct ReceiptProgress {
    /// Datagrams removed from the resend buffer by incoming ACKs.
    pub acked: usize,
    /// Datagrams expedited for resend by incoming NACKs.
    pub nacked: usize,
    /// Receipt ids whose last outstanding datagram has now been ACKed.
    pub acked_receipt_ids: Vec<u64>,
}
52
/// Outcome of attempting to queue an outgoing payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueuePayloadResult {
    /// Accepted. `reliable_bytes` is the encoded size charged to reliable
    /// traffic (0 for unreliable payloads).
    Enqueued { reliable_bytes: usize },
    /// Rejected by backpressure; the payload was discarded.
    Dropped,
    /// Rejected for now; the caller may retry later.
    Deferred,
    /// Backpressure policy requests the session be torn down.
    DisconnectRequested,
}
60
/// Internal saturating counters; published read-only via
/// `SessionMetricsSnapshot`.
#[derive(Debug, Default, Clone, Copy)]
struct SessionMetrics {
    // Receive path.
    ingress_datagrams: u64,
    ingress_frames: u64,
    duplicate_reliable_drops: u64,
    ordered_stale_drops: u64,
    ordered_buffer_full_drops: u64,
    sequenced_stale_drops: u64,
    sequenced_missing_index_drops: u64,
    // Send path.
    reliable_sent_datagrams: u64,
    resent_datagrams: u64,
    ack_out_datagrams: u64,
    nack_out_datagrams: u64,
    // Peer feedback and housekeeping.
    acked_datagrams: u64,
    nacked_datagrams: u64,
    split_ttl_drops: u64,
    // Queue-limit / backpressure outcomes.
    outgoing_queue_drops: u64,
    outgoing_queue_defers: u64,
    outgoing_queue_disconnects: u64,
    backpressure_delays: u64,
    backpressure_drops: u64,
    backpressure_disconnects: u64,
}
84
/// Point-in-time public view of `SessionMetrics`, extended with derived
/// gauges: queue depth, RTT/RTO estimates, congestion-window and pacing
/// state. Produced by `Session::metrics_snapshot`.
#[derive(Debug, Default, Clone, Copy)]
pub struct SessionMetricsSnapshot {
    pub ingress_datagrams: u64,
    pub ingress_frames: u64,
    pub duplicate_reliable_drops: u64,
    pub ordered_stale_drops: u64,
    pub ordered_buffer_full_drops: u64,
    pub sequenced_stale_drops: u64,
    pub sequenced_missing_index_drops: u64,
    pub reliable_sent_datagrams: u64,
    pub resent_datagrams: u64,
    pub ack_out_datagrams: u64,
    pub nack_out_datagrams: u64,
    pub acked_datagrams: u64,
    pub nacked_datagrams: u64,
    pub split_ttl_drops: u64,
    // Instantaneous queue depth (not cumulative counters).
    pub pending_outgoing_frames: usize,
    pub pending_outgoing_bytes: usize,
    pub outgoing_queue_drops: u64,
    pub outgoing_queue_defers: u64,
    pub outgoing_queue_disconnects: u64,
    pub backpressure_delays: u64,
    pub backpressure_drops: u64,
    pub backpressure_disconnects: u64,
    // Derived estimator/controller state; 0.0 when no RTT sample exists yet.
    pub srtt_ms: f64,
    pub rttvar_ms: f64,
    pub resend_rto_ms: f64,
    pub congestion_window_packets: f64,
    pub resend_ratio: f64,
    pub pacing_budget_bytes: f64,
    pub pacing_rate_bytes_per_sec: f64,
}
117
/// A frame waiting in the outgoing priority heap.
#[derive(Debug, Clone)]
struct QueuedFrame {
    /// Scheduling key; the heap pops the SMALLEST weight first (see `Ord`).
    weight: u64,
    /// Cached encoded size, used for MTU packing and byte accounting.
    encoded_size: usize,
    is_reliable: bool,
    priority: RakPriority,
    /// Optional delivery-receipt id propagated into the datagram tracker.
    receipt_id: Option<u64>,
    frame: Frame,
}
127
/// Decision produced by `evaluate_backpressure` for one queue attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BackpressureAction {
    Allow,
    Drop,
    Defer,
    Disconnect,
}
135
// Equality is defined purely by scheduling weight — payload identity is
// irrelevant for heap ordering. This is deliberately consistent with the
// `Ord` impl below, which also compares only `weight`.
impl PartialEq for QueuedFrame {
    fn eq(&self, other: &Self) -> bool {
        self.weight == other.weight
    }
}

impl Eq for QueuedFrame {}
143
impl PartialOrd for QueuedFrame {
    // Canonical delegation to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
149
impl Ord for QueuedFrame {
    fn cmp(&self, other: &Self) -> Ordering {
        // Operands are intentionally reversed: `BinaryHeap` is a max-heap,
        // so flipping the comparison makes the SMALLEST weight pop first
        // (min-heap behavior for the outgoing scheduler).
        other.weight.cmp(&self.weight)
    }
}
156
/// Reliability/ordering state machine for a single peer.
///
/// Owns both directions of the connection: receive-side bookkeeping
/// (sequence tracking, reliable dedup, split reassembly, ordering channels)
/// and send-side scheduling (priority heap, ACK/NACK batching,
/// retransmission, congestion control, pacing, and backpressure).
pub struct Session {
    state: SessionState,
    mtu: usize,
    // Liveness / keepalive bookkeeping.
    last_activity: Instant,
    last_keepalive_sent: Instant,
    // Receive- and send-side sequence counters.
    datagram_read_index: Sequence24,
    datagram_write_index: Sequence24,
    reliable_write_index: Sequence24,
    split_write_index: u16,
    ordering_write_index: Vec<Sequence24>,
    sequencing_write_index: Vec<Sequence24>,
    // Receive-side processing stages.
    reliable_tracker: ReliableTracker,
    split_assembler: SplitAssembler,
    ordering: OrderingChannels,
    // Outgoing frame scheduling: min-weight heap plus per-priority weight
    // counters (lowest weight is sent first; see `QueuedFrame`'s `Ord`).
    outgoing_heap: BinaryHeap<QueuedFrame>,
    outgoing_next_weights: [u64; 4],
    last_min_weight: u64,
    // Outgoing ACK/NACK batching and their flush deadlines.
    outgoing_acks: AckQueue,
    outgoing_nacks: AckQueue,
    ack_flush_interval: Duration,
    nack_flush_interval: Duration,
    ack_max_ranges_per_datagram: usize,
    nack_max_ranges_per_datagram: usize,
    ack_nack_priority: AckNackPriority,
    next_ack_flush_at: Instant,
    next_nack_flush_at: Instant,
    // Receipt ranges from the peer, buffered until the next processing pass.
    incoming_acks: VecDeque<SequenceRange>,
    incoming_nacks: VecDeque<SequenceRange>,
    // Retransmission buffer keyed by datagram sequence, plus per-receipt-id
    // outstanding-datagram counts.
    sent_datagrams: BTreeMap<Sequence24, TrackedDatagram>,
    receipt_tracking: HashMap<u64, usize>,
    // RTO clamping bounds and smoothed RTT/variance state (milliseconds);
    // updated in `observe_rtt_sample` (defined elsewhere in this impl).
    resend_rto: Duration,
    min_resend_rto: Duration,
    max_resend_rto: Duration,
    srtt_ms: Option<f64>,
    rttvar_ms: f64,
    // Congestion window (in packets) and its AIMD-style tuning knobs.
    congestion_window_packets: f64,
    min_congestion_window_packets: f64,
    max_congestion_window_packets: f64,
    slow_start_threshold_packets: f64,
    congestion_additive_gain: f64,
    congestion_multiplicative_decrease_nack: f64,
    congestion_multiplicative_decrease_timeout: f64,
    high_rtt_threshold_ms: f64,
    high_rtt_additive_scale: f64,
    nack_loss_backoff_cooldown: Duration,
    next_nack_loss_backoff: Instant,
    // Byte-budget (token-bucket style) pacing of outgoing traffic.
    pacing_enabled: bool,
    pacing_gain: f64,
    pacing_min_rate_bytes_per_sec: f64,
    pacing_max_rate_bytes_per_sec: f64,
    pacing_budget_max_bytes: f64,
    pacing_budget_bytes: f64,
    pacing_rate_bytes_per_sec: f64,
    last_pacing_update: Instant,
    // Outgoing-queue limits and backpressure policy.
    outgoing_queued_bytes: usize,
    outgoing_queue_max_frames: usize,
    outgoing_queue_max_bytes: usize,
    outgoing_queue_soft_ratio: f64,
    backpressure_mode: BackpressureMode,
    best_effort_zeroize_dropped_payloads: bool,
    disconnect_requested_by_backpressure: bool,
    metrics: SessionMetrics,
}
220
221impl Session {
    /// Create a session with default tunables for the given MTU.
    pub fn new(mtu: usize) -> Self {
        Self::with_tunables(mtu, SessionTunables::default())
    }
225
    /// Build a session from explicit tunables.
    ///
    /// All congestion and pacing inputs are defensively clamped here so the
    /// rest of the session can assume internally consistent values
    /// (e.g. `min <= initial <= max` for the congestion window, pacing
    /// `max_rate >= min_rate >= 1`).
    pub fn with_tunables(mtu: usize, tunables: SessionTunables) -> Self {
        let now = Instant::now();
        let congestion = tunables.resolved_congestion_settings();
        // Force a consistent ordering of the congestion-window bounds.
        let min_cwnd = congestion.min_congestion_window.max(1.0);
        let max_cwnd = congestion.max_congestion_window.max(min_cwnd);
        let initial_cwnd = congestion
            .initial_congestion_window
            .clamp(min_cwnd, max_cwnd)
            .max(1.0);
        let slow_start_threshold = congestion
            .congestion_slow_start_threshold
            .clamp(min_cwnd, max_cwnd);
        // Pacing rates: at least 1 byte/s, and max >= min.
        let pacing_min_rate = tunables.pacing_min_rate_bytes_per_sec.max(1.0);
        let pacing_max_rate = tunables.pacing_max_rate_bytes_per_sec.max(pacing_min_rate);
        let pacing_budget_max = tunables.pacing_max_burst_bytes.max(1) as f64;
        // Optionally start with a full byte budget so the first tick can
        // burst immediately.
        let initial_pacing_budget = if tunables.pacing_start_full {
            pacing_budget_max
        } else {
            0.0
        };
        let ack_nack_flush = tunables.resolved_ack_nack_flush_settings();
        let mut s = Self {
            state: SessionState::Offline,
            mtu,
            last_activity: now,
            last_keepalive_sent: now,
            datagram_read_index: Sequence24::new(0),
            datagram_write_index: Sequence24::new(0),
            reliable_write_index: Sequence24::new(0),
            split_write_index: 0,
            // NOTE(review): 16 ordering/sequencing channels are hard-coded
            // here while the receive side uses tunables.max_ordering_channels
            // — confirm the two are meant to differ.
            ordering_write_index: vec![Sequence24::new(0); 16],
            sequencing_write_index: vec![Sequence24::new(0); 16],
            reliable_tracker: ReliableTracker::new(tunables.reliable_window),
            split_assembler: SplitAssembler::new(
                tunables.split_ttl,
                tunables.max_split_parts,
                tunables.max_concurrent_splits,
            ),
            ordering: OrderingChannels::new(
                tunables.max_ordering_channels,
                tunables.max_ordered_pending_per_channel,
                tunables.max_order_gap,
            ),
            outgoing_heap: BinaryHeap::new(),
            outgoing_next_weights: [0; 4],
            last_min_weight: 0,
            outgoing_acks: AckQueue::new(tunables.ack_queue_capacity),
            outgoing_nacks: AckQueue::new(tunables.ack_queue_capacity),
            ack_flush_interval: ack_nack_flush.ack_flush_interval,
            nack_flush_interval: ack_nack_flush.nack_flush_interval,
            ack_max_ranges_per_datagram: ack_nack_flush.ack_max_ranges_per_datagram,
            nack_max_ranges_per_datagram: ack_nack_flush.nack_max_ranges_per_datagram,
            ack_nack_priority: ack_nack_flush.ack_nack_priority,
            next_ack_flush_at: now,
            next_nack_flush_at: now,
            incoming_acks: VecDeque::new(),
            incoming_nacks: VecDeque::new(),
            sent_datagrams: BTreeMap::new(),
            receipt_tracking: HashMap::new(),
            resend_rto: congestion
                .resend_rto
                .clamp(congestion.min_resend_rto, congestion.max_resend_rto),
            min_resend_rto: congestion.min_resend_rto,
            max_resend_rto: congestion.max_resend_rto,
            srtt_ms: None,
            rttvar_ms: 0.0,
            congestion_window_packets: initial_cwnd,
            min_congestion_window_packets: min_cwnd,
            max_congestion_window_packets: max_cwnd,
            slow_start_threshold_packets: slow_start_threshold,
            congestion_additive_gain: congestion.congestion_additive_gain.max(0.01),
            congestion_multiplicative_decrease_nack: congestion
                .congestion_multiplicative_decrease_nack
                .clamp(0.1, 0.99),
            congestion_multiplicative_decrease_timeout: congestion
                .congestion_multiplicative_decrease_timeout
                .clamp(0.1, 0.99),
            high_rtt_threshold_ms: congestion.congestion_high_rtt_threshold_ms.max(1.0),
            high_rtt_additive_scale: congestion
                .congestion_high_rtt_additive_scale
                .clamp(0.05, 1.0),
            nack_loss_backoff_cooldown: congestion
                .congestion_nack_backoff_cooldown
                .max(Duration::from_millis(1)),
            next_nack_loss_backoff: now,
            pacing_enabled: tunables.pacing_enabled,
            pacing_gain: tunables.pacing_gain.max(0.05),
            pacing_min_rate_bytes_per_sec: pacing_min_rate,
            pacing_max_rate_bytes_per_sec: pacing_max_rate,
            pacing_budget_max_bytes: pacing_budget_max,
            pacing_budget_bytes: initial_pacing_budget,
            pacing_rate_bytes_per_sec: pacing_min_rate,
            last_pacing_update: now,
            outgoing_queued_bytes: 0,
            outgoing_queue_max_frames: tunables.outgoing_queue_max_frames.max(1),
            outgoing_queue_max_bytes: tunables.outgoing_queue_max_bytes.max(1),
            outgoing_queue_soft_ratio: tunables.outgoing_queue_soft_ratio.clamp(0.05, 0.99),
            backpressure_mode: tunables.backpressure_mode,
            best_effort_zeroize_dropped_payloads: tunables.best_effort_zeroize_dropped_payloads,
            disconnect_requested_by_backpressure: false,
            metrics: SessionMetrics::default(),
        };

        // Seed per-priority scheduling weights ((1 << p) * p + p). Lower
        // weights drain first under the reversed heap ordering, so priority
        // level 0 starts ahead of level 3.
        for level in 0..4 {
            s.outgoing_next_weights[level] = ((1u64 << level) * level as u64) + level as u64;
        }

        s
    }
335
    /// Current lifecycle state.
    pub fn state(&self) -> SessionState {
        self.state
    }

    /// Attempt a state transition. Returns `false` (leaving the state
    /// unchanged) when `SessionState::can_transition_to` rejects the move.
    pub fn transition_to(&mut self, next: SessionState) -> bool {
        if self.state.can_transition_to(next) {
            self.state = next;
            true
        } else {
            false
        }
    }

    /// Negotiated maximum transmission unit, in bytes.
    pub fn mtu(&self) -> usize {
        self.mtu
    }

    /// Override the MTU (e.g. after renegotiation).
    pub fn set_mtu(&mut self, mtu: usize) {
        self.mtu = mtu;
    }
356
    /// Record peer activity at `now`, resetting the idle clock.
    pub fn touch_activity(&mut self, now: Instant) {
        self.last_activity = now;
    }

    /// Time elapsed since the last recorded activity (saturating at zero).
    pub fn idle_for(&self, now: Instant) -> Duration {
        now.saturating_duration_since(self.last_activity)
    }

    /// True when a keepalive is due: the session is connected, the link has
    /// been idle for at least `interval`, and no keepalive has been sent
    /// within that same interval. A zero interval disables keepalives.
    pub fn should_send_keepalive(&self, now: Instant, interval: Duration) -> bool {
        if self.state != SessionState::Connected || interval.is_zero() {
            return false;
        }

        self.idle_for(now) >= interval
            && now.saturating_duration_since(self.last_keepalive_sent) >= interval
    }

    /// Record that a keepalive was sent at `now`.
    pub fn mark_keepalive_sent(&mut self, now: Instant) {
        self.last_keepalive_sent = now;
    }
377
378 pub fn next_datagram_sequence(&mut self) -> Sequence24 {
379 let seq = self.datagram_write_index;
380 self.datagram_write_index = self.datagram_write_index.next();
381 seq
382 }
383
    /// Number of frames currently waiting in the outgoing priority heap.
    pub fn pending_outgoing_frames(&self) -> usize {
        self.outgoing_heap.len()
    }

    /// Total encoded bytes of the frames waiting in the outgoing heap.
    pub fn pending_outgoing_bytes(&self) -> usize {
        self.outgoing_queued_bytes
    }
391
    /// Make any queued ACKs/NACKs eligible to flush on the next tick by
    /// pulling their deadlines up to `now`. No-op for empty queues.
    pub fn force_control_flush_deadlines(&mut self, now: Instant) {
        if !self.outgoing_acks.is_empty() {
            self.next_ack_flush_at = now;
        }
        if !self.outgoing_nacks.is_empty() {
            self.next_nack_flush_at = now;
        }
    }
400
401 pub fn take_backpressure_disconnect(&mut self) -> bool {
402 let should_disconnect = self.disconnect_requested_by_backpressure;
403 self.disconnect_requested_by_backpressure = false;
404 should_disconnect
405 }
406
    /// Copy the current counters plus derived gauges into a public snapshot.
    pub fn metrics_snapshot(&self) -> SessionMetricsSnapshot {
        // Fraction of reliable datagrams that needed at least one resend;
        // guarded against division by zero before any reliable send.
        let resend_ratio = if self.metrics.reliable_sent_datagrams == 0 {
            0.0
        } else {
            self.metrics.resent_datagrams as f64 / self.metrics.reliable_sent_datagrams as f64
        };

        SessionMetricsSnapshot {
            ingress_datagrams: self.metrics.ingress_datagrams,
            ingress_frames: self.metrics.ingress_frames,
            duplicate_reliable_drops: self.metrics.duplicate_reliable_drops,
            ordered_stale_drops: self.metrics.ordered_stale_drops,
            ordered_buffer_full_drops: self.metrics.ordered_buffer_full_drops,
            sequenced_stale_drops: self.metrics.sequenced_stale_drops,
            sequenced_missing_index_drops: self.metrics.sequenced_missing_index_drops,
            reliable_sent_datagrams: self.metrics.reliable_sent_datagrams,
            resent_datagrams: self.metrics.resent_datagrams,
            ack_out_datagrams: self.metrics.ack_out_datagrams,
            nack_out_datagrams: self.metrics.nack_out_datagrams,
            acked_datagrams: self.metrics.acked_datagrams,
            nacked_datagrams: self.metrics.nacked_datagrams,
            split_ttl_drops: self.metrics.split_ttl_drops,
            pending_outgoing_frames: self.pending_outgoing_frames(),
            pending_outgoing_bytes: self.pending_outgoing_bytes(),
            outgoing_queue_drops: self.metrics.outgoing_queue_drops,
            outgoing_queue_defers: self.metrics.outgoing_queue_defers,
            outgoing_queue_disconnects: self.metrics.outgoing_queue_disconnects,
            backpressure_delays: self.metrics.backpressure_delays,
            backpressure_drops: self.metrics.backpressure_drops,
            backpressure_disconnects: self.metrics.backpressure_disconnects,
            // 0.0 until the first RTT sample is observed.
            srtt_ms: self.srtt_ms.unwrap_or(0.0),
            rttvar_ms: self.rttvar_ms,
            resend_rto_ms: self.resend_rto.as_secs_f64() * 1000.0,
            congestion_window_packets: self.congestion_window_packets,
            resend_ratio,
            pacing_budget_bytes: self.pacing_budget_bytes,
            pacing_rate_bytes_per_sec: self.pacing_rate_bytes_per_sec,
        }
    }
446
447 pub fn process_datagram_sequence(&mut self, seq: Sequence24, now: Instant) {
448 let expected = self.datagram_read_index;
449
450 if expected > seq {
451 let ack_was_empty = self.outgoing_acks.is_empty();
452 self.outgoing_acks.push(SequenceRange {
453 start: seq,
454 end: seq,
455 });
456 if ack_was_empty {
457 self.next_ack_flush_at = now + self.ack_flush_interval;
458 }
459 return;
460 }
461
462 self.datagram_read_index = seq.next();
463
464 if seq == expected {
465 let ack_was_empty = self.outgoing_acks.is_empty();
466 self.outgoing_acks.push(SequenceRange {
467 start: seq,
468 end: seq,
469 });
470 if ack_was_empty {
471 self.next_ack_flush_at = now + self.ack_flush_interval;
472 }
473 return;
474 }
475
476 let mut nack_start = expected;
477 let nack_end = seq.prev();
478
479 loop {
480 let mut chunk_end = nack_start;
481 let mut count = 0;
482
483 while count < (MAX_ACK_SEQUENCES - 1) && chunk_end < nack_end {
484 chunk_end = chunk_end.next();
485 count += 1;
486 }
487
488 let nack_was_empty = self.outgoing_nacks.is_empty();
489 self.outgoing_nacks.push(SequenceRange {
490 start: nack_start,
491 end: chunk_end,
492 });
493 if nack_was_empty {
494 self.next_nack_flush_at = now + self.nack_flush_interval;
495 }
496
497 if chunk_end == nack_end {
498 break;
499 }
500
501 nack_start = chunk_end.next();
502 }
503
504 let ack_was_empty = self.outgoing_acks.is_empty();
505 self.outgoing_acks.push(SequenceRange {
506 start: seq,
507 end: seq,
508 });
509 if ack_was_empty {
510 self.next_ack_flush_at = now + self.ack_flush_interval;
511 }
512 }
513
    /// Entry point for a decoded datagram from the peer.
    ///
    /// ACK/NACK payloads are only buffered (applied later by
    /// `process_incoming_receipts`) and yield no frames. Frame payloads are
    /// sequence-tracked — generating outgoing ACKs/NACKs — and then run
    /// through dedup, split reassembly, and ordering; the frames that are
    /// ready for delivery are returned.
    pub fn ingest_datagram(
        &mut self,
        datagram: Datagram,
        now: Instant,
    ) -> Result<Vec<Frame>, DecodeError> {
        self.metrics.ingress_datagrams = self.metrics.ingress_datagrams.saturating_add(1);

        match datagram.payload {
            DatagramPayload::Ack(payload) => {
                self.handle_ack_payload(payload);
                Ok(Vec::new())
            }
            DatagramPayload::Nack(payload) => {
                self.handle_nack_payload(payload);
                Ok(Vec::new())
            }
            DatagramPayload::Frames(frames) => {
                self.metrics.ingress_frames = self
                    .metrics
                    .ingress_frames
                    .saturating_add(frames.len() as u64);
                self.process_datagram_sequence(datagram.header.sequence, now);
                self.handle_frames(frames, now)
            }
        }
    }
540
    /// Buffer ACK ranges from the peer; applied in `process_incoming_receipts`.
    pub fn handle_ack_payload(&mut self, payload: AckNackPayload) {
        self.incoming_acks.extend(payload.ranges);
    }

    /// Buffer NACK ranges from the peer; applied in `process_incoming_receipts`.
    pub fn handle_nack_payload(&mut self, payload: AckNackPayload) {
        self.incoming_nacks.extend(payload.ranges);
    }
548
    /// Run received frames through duplicate suppression, split reassembly,
    /// and the sequencing/ordering channels; returns the frames that are
    /// ready to hand to the application.
    fn handle_frames(
        &mut self,
        frames: Vec<Frame>,
        now: Instant,
    ) -> Result<Vec<Frame>, DecodeError> {
        let mut out = Vec::new();

        for frame in frames {
            let is_split = frame.header.is_split;
            // Duplicate suppression applies only to whole reliable frames.
            // NOTE(review): split parts never enter the reliable dedup
            // window here (both the check and the `see` below are gated on
            // `!is_split`) — confirm SplitAssembler deduplicates parts.
            let should_drop_duplicate = frame.header.reliability.is_reliable() && !is_split;

            if should_drop_duplicate
                && let Some(ridx) = frame.reliable_index
                && self.reliable_tracker.has_seen(ridx)
            {
                self.metrics.duplicate_reliable_drops =
                    self.metrics.duplicate_reliable_drops.saturating_add(1);
                continue;
            }

            // Non-split frames pass straight through; split parts are held
            // until the full payload has been reassembled.
            let assembled = self.split_assembler.add(frame, now)?;
            let Some(frame) = assembled else {
                continue;
            };

            // Mark the reliable index as seen only after the frame survived
            // reassembly, so a dropped part can still be retransmitted.
            if !is_split
                && frame.header.reliability.is_reliable()
                && let Some(ridx) = frame.reliable_index
            {
                let _ = self.reliable_tracker.see(ridx);
            }

            // Sequenced delivery: newest wins, stale/incomplete arrivals
            // are counted and dropped.
            if frame.header.reliability.is_sequenced() {
                match self.ordering.handle_sequenced(&frame) {
                    SequencedResult::Accept => out.push(frame),
                    SequencedResult::DropMissingSequence => {
                        self.metrics.sequenced_missing_index_drops =
                            self.metrics.sequenced_missing_index_drops.saturating_add(1);
                    }
                    SequencedResult::DropStale => {
                        self.metrics.sequenced_stale_drops =
                            self.metrics.sequenced_stale_drops.saturating_add(1);
                    }
                }
                continue;
            }

            // Ordered delivery: a single arrival may release a whole run of
            // previously buffered frames.
            if frame.header.reliability.is_ordered() {
                match self.ordering.handle_ordered(frame) {
                    OrderedResult::Ready(mut ready) => out.append(&mut ready),
                    OrderedResult::Buffered => {}
                    OrderedResult::DroppedStale => {
                        self.metrics.ordered_stale_drops =
                            self.metrics.ordered_stale_drops.saturating_add(1);
                    }
                    OrderedResult::DroppedBufferFull => {
                        self.metrics.ordered_buffer_full_drops =
                            self.metrics.ordered_buffer_full_drops.saturating_add(1);
                    }
                }
                continue;
            }

            // Neither sequenced nor ordered: deliver immediately.
            out.push(frame);
        }

        Ok(out)
    }
617
    /// Pop as many queued ACK ranges as fit in one MTU-sized datagram
    /// (additionally capped by `ack_max_ranges_per_datagram`), wrap them in
    /// an ACK datagram, and re-arm the ACK flush deadline. Returns `None`
    /// when nothing is queued.
    pub fn drain_ack_datagram(&mut self, now: Instant) -> Option<Datagram> {
        // The `3` reserves overhead bytes beyond the ranges themselves —
        // presumably the datagram flag byte plus the range count; confirm
        // against `AckQueue::pop_for_mtu`.
        let ranges = self
            .outgoing_acks
            .pop_for_mtu(self.mtu, 3, self.ack_max_ranges_per_datagram);
        if ranges.is_empty() {
            return None;
        }
        self.next_ack_flush_at = now + self.ack_flush_interval;
        self.metrics.ack_out_datagrams = self.metrics.ack_out_datagrams.saturating_add(1);

        Some(Datagram {
            header: DatagramHeader {
                flags: DatagramFlags::VALID | DatagramFlags::ACK,
                // Control datagrams do not consume a send sequence number.
                sequence: Sequence24::new(0),
            },
            payload: DatagramPayload::Ack(AckNackPayload { ranges }),
        })
    }

    /// NACK counterpart of `drain_ack_datagram`.
    pub fn drain_nack_datagram(&mut self, now: Instant) -> Option<Datagram> {
        let ranges =
            self.outgoing_nacks
                .pop_for_mtu(self.mtu, 3, self.nack_max_ranges_per_datagram);
        if ranges.is_empty() {
            return None;
        }
        self.next_nack_flush_at = now + self.nack_flush_interval;
        self.metrics.nack_out_datagrams = self.metrics.nack_out_datagrams.saturating_add(1);

        Some(Datagram {
            header: DatagramHeader {
                flags: DatagramFlags::VALID | DatagramFlags::NACK,
                sequence: Sequence24::new(0),
            },
            payload: DatagramPayload::Nack(AckNackPayload { ranges }),
        })
    }
655
656 pub fn track_sent_reliable_datagram(
657 &mut self,
658 datagram: Datagram,
659 now: Instant,
660 receipt_ids: Vec<u64>,
661 ) {
662 let seq = datagram.header.sequence;
663 let has_reliable = match &datagram.payload {
664 DatagramPayload::Frames(frames) => {
665 frames.iter().any(|f| f.header.reliability.is_reliable())
666 }
667 DatagramPayload::Ack(_) | DatagramPayload::Nack(_) => false,
668 };
669 let has_ack_receipt = match &datagram.payload {
670 DatagramPayload::Frames(frames) => frames
671 .iter()
672 .any(|f| f.header.reliability.is_with_ack_receipt()),
673 DatagramPayload::Ack(_) | DatagramPayload::Nack(_) => false,
674 };
675
676 if !has_reliable && !has_ack_receipt && receipt_ids.is_empty() {
677 return;
678 }
679
680 if has_reliable {
681 self.metrics.reliable_sent_datagrams =
682 self.metrics.reliable_sent_datagrams.saturating_add(1);
683 }
684
685 for receipt_id in &receipt_ids {
686 let counter = self.receipt_tracking.entry(*receipt_id).or_insert(0);
687 *counter = counter.saturating_add(1);
688 }
689
690 self.sent_datagrams.insert(
691 seq,
692 TrackedDatagram {
693 datagram,
694 send_time: now,
695 next_send: now + self.resend_rto,
696 retries: 0,
697 nack_resend_pending: false,
698 resendable: has_reliable,
699 receipt_ids,
700 },
701 );
702 }
703
    /// Queue an outgoing payload with no delivery receipt attached.
    /// Equivalent to `queue_payload_with_receipt` with `receipt_id = None`.
    pub fn queue_payload(
        &mut self,
        payload: Bytes,
        reliability: Reliability,
        channel: u8,
        priority: RakPriority,
    ) -> QueuePayloadResult {
        self.queue_payload_with_receipt(payload, reliability, channel, priority, None)
    }
713
714 pub fn queue_payload_with_receipt(
715 &mut self,
716 payload: Bytes,
717 reliability: Reliability,
718 channel: u8,
719 priority: RakPriority,
720 receipt_id: Option<u64>,
721 ) -> QueuePayloadResult {
722 let (estimated_frames, estimated_bytes, effective_reliability) =
723 self.estimate_queue_impact(payload.len(), reliability);
724
725 match self.evaluate_backpressure(
726 estimated_frames,
727 estimated_bytes,
728 effective_reliability,
729 priority,
730 ) {
731 BackpressureAction::Allow => {}
732 BackpressureAction::Drop => {
733 if self.best_effort_zeroize_dropped_payloads {
734 let _ = best_effort_zeroize_bytes(payload);
735 }
736 self.metrics.outgoing_queue_drops =
737 self.metrics.outgoing_queue_drops.saturating_add(1);
738 self.metrics.backpressure_drops = self.metrics.backpressure_drops.saturating_add(1);
739 return QueuePayloadResult::Dropped;
740 }
741 BackpressureAction::Defer => {
742 if self.best_effort_zeroize_dropped_payloads {
743 let _ = best_effort_zeroize_bytes(payload);
744 }
745 self.metrics.outgoing_queue_defers =
746 self.metrics.outgoing_queue_defers.saturating_add(1);
747 self.metrics.backpressure_delays =
748 self.metrics.backpressure_delays.saturating_add(1);
749 return QueuePayloadResult::Deferred;
750 }
751 BackpressureAction::Disconnect => {
752 if self.best_effort_zeroize_dropped_payloads {
753 let _ = best_effort_zeroize_bytes(payload);
754 }
755 self.metrics.outgoing_queue_disconnects =
756 self.metrics.outgoing_queue_disconnects.saturating_add(1);
757 self.metrics.backpressure_disconnects =
758 self.metrics.backpressure_disconnects.saturating_add(1);
759 self.disconnect_requested_by_backpressure = true;
760 return QueuePayloadResult::DisconnectRequested;
761 }
762 }
763
764 let max_single = self.max_payload_for(reliability, false);
765 if payload.len() <= max_single {
766 let reliable_bytes =
767 self.enqueue_single_frame(payload, reliability, channel, priority, receipt_id);
768 return QueuePayloadResult::Enqueued { reliable_bytes };
769 }
770
771 let reliable_bytes =
772 self.enqueue_split_frames(payload, reliability, channel, priority, receipt_id);
773 QueuePayloadResult::Enqueued { reliable_bytes }
774 }
775
    /// Apply the buffered ACK/NACK ranges from the peer to the resend buffer.
    ///
    /// ACKed datagrams are removed from `sent_datagrams`; RTT is sampled
    /// only from datagrams that were never retransmitted (Karn's rule, so a
    /// retransmission can't produce an ambiguous sample). NACKed resendable
    /// datagrams get `next_send` pulled up to `now` so the next tick
    /// retransmits them.
    pub fn process_incoming_receipts(&mut self, now: Instant) -> ReceiptProgress {
        let mut progress = ReceiptProgress::default();

        while let Some(range) = self.incoming_acks.pop_front() {
            Self::for_each_sequence(range, |seq| {
                if let Some(acked) = self.sent_datagrams.remove(&seq) {
                    progress.acked += 1;
                    self.on_reliable_ack();
                    // Only never-resent datagrams yield unambiguous RTT.
                    if acked.retries == 0 {
                        let rtt_sample = now.saturating_duration_since(acked.send_time);
                        self.observe_rtt_sample(rtt_sample);
                    }

                    // Decrement each receipt's outstanding count; report the
                    // receipt only once its final datagram is ACKed.
                    for receipt_id in acked.receipt_ids {
                        if let Some(pending) = self.receipt_tracking.get_mut(&receipt_id) {
                            if *pending > 1 {
                                *pending -= 1;
                            } else {
                                self.receipt_tracking.remove(&receipt_id);
                                progress.acked_receipt_ids.push(receipt_id);
                            }
                        }
                    }
                }
            });
        }
        self.metrics.acked_datagrams = self
            .metrics
            .acked_datagrams
            .saturating_add(progress.acked as u64);

        while let Some(range) = self.incoming_nacks.pop_front() {
            Self::for_each_sequence(range, |seq| {
                // Only expedite entries that are resendable and not already
                // due — a datagram about to be resent anyway is not counted.
                if let Some(entry) = self.sent_datagrams.get_mut(&seq)
                    && entry.resendable
                    && entry.next_send > now
                {
                    entry.next_send = now;
                    entry.nack_resend_pending = true;
                    progress.nacked += 1;
                }
            });
        }
        // React to NACK-signaled loss at most once per pass, regardless of
        // how many sequences were NACKed.
        if progress.nacked > 0 {
            self.on_nack_loss(now);
        }
        self.metrics.nacked_datagrams = self
            .metrics
            .nacked_datagrams
            .saturating_add(progress.nacked as u64);

        progress
    }
829
    /// Select datagrams whose resend timer has expired — up to `max_count`
    /// datagrams and `max_bytes` encoded bytes — and return clones of them
    /// for retransmission. Selected entries get a fresh RTO deadline, a
    /// bumped retry counter, and their NACK-pending flag cleared.
    pub fn collect_resendable(
        &mut self,
        now: Instant,
        max_count: usize,
        max_bytes: usize,
    ) -> Vec<Datagram> {
        let mut total_bytes = 0usize;
        let mut selected = Vec::new();
        // Distinguishes genuine RTO expiry (a congestion signal) from
        // resends that were merely requested via NACK.
        let mut timeout_loss_observed = false;

        // Phase 1: immutable scan picking sequences within both budgets.
        for (&seq, tracked) in &self.sent_datagrams {
            if selected.len() >= max_count {
                break;
            }
            if !tracked.resendable {
                continue;
            }
            if tracked.next_send > now {
                continue;
            }

            let size = tracked.datagram.encoded_size();
            if total_bytes + size > max_bytes {
                break;
            }

            total_bytes += size;
            timeout_loss_observed |= !tracked.nack_resend_pending;
            selected.push(seq);
        }

        if selected.is_empty() {
            return Vec::new();
        }

        // Only timer-driven loss triggers the timeout congestion response.
        if timeout_loss_observed {
            self.on_timeout(now);
        }

        // Phase 2: mutable pass updating timers/counters and cloning the
        // datagrams to retransmit.
        let next_send_at = now + self.resend_rto;
        let mut out = Vec::with_capacity(selected.len());
        for seq in selected {
            let Some(tracked) = self.sent_datagrams.get_mut(&seq) else {
                continue;
            };
            tracked.send_time = now;
            tracked.next_send = next_send_at;
            tracked.retries = tracked.retries.saturating_add(1);
            tracked.nack_resend_pending = false;
            out.push(tracked.datagram.clone());
            self.metrics.resent_datagrams = self.metrics.resent_datagrams.saturating_add(1);
        }

        out
    }
885
    /// Pack queued frames into one outgoing datagram, respecting the MTU
    /// and the caller's byte budget (`remaining_bytes_budget` is decremented
    /// by every frame taken). Returns `None` when nothing can be sent.
    pub fn build_data_datagram(
        &mut self,
        now: Instant,
        remaining_bytes_budget: &mut usize,
    ) -> Option<Datagram> {
        if self.outgoing_heap.is_empty() || *remaining_bytes_budget == 0 {
            return None;
        }

        let mut frames = Vec::new();
        let mut datagram_receipt_ids = Vec::new();
        let mut datagram_size = RAKNET_DATAGRAM_HEADER_SIZE;
        let mut has_reliable = false;
        let mut has_split = false;

        loop {
            // Once a reliable frame is aboard, more may join; starting a NEW
            // reliable datagram is gated by `can_emit_new_reliable_datagram`
            // (presumably the congestion window — defined elsewhere).
            let allow_reliable = has_reliable || self.can_emit_new_reliable_datagram();
            let Some(queued) = self.pop_next_frame_for_datagram(
                allow_reliable,
                datagram_size,
                *remaining_bytes_budget,
            ) else {
                break;
            };

            datagram_size += queued.encoded_size;
            *remaining_bytes_budget = remaining_bytes_budget.saturating_sub(queued.encoded_size);
            self.outgoing_queued_bytes = self
                .outgoing_queued_bytes
                .saturating_sub(queued.encoded_size);

            has_reliable |= queued.is_reliable;
            has_split |= queued.frame.header.is_split;
            // Deduplicate receipt ids inside one datagram: several frames
            // can carry the same id.
            if let Some(receipt_id) = queued.receipt_id
                && !datagram_receipt_ids.contains(&receipt_id)
            {
                datagram_receipt_ids.push(receipt_id);
            }
            frames.push(queued.frame);
        }

        if frames.is_empty() {
            return None;
        }

        // NOTE(review): CONTINUOUS_SEND is set while more data remains
        // queued (or a split is in flight), otherwise HAS_B_AND_AS —
        // confirm this matches the peer's expectations for these flags.
        let flags = if !self.outgoing_heap.is_empty() || has_split {
            DatagramFlags::VALID | DatagramFlags::CONTINUOUS_SEND
        } else {
            DatagramFlags::VALID | DatagramFlags::HAS_B_AND_AS
        };

        let datagram = Datagram {
            header: DatagramHeader {
                flags,
                sequence: self.next_datagram_sequence(),
            },
            payload: DatagramPayload::Frames(frames),
        };

        // Reliable content (or attached receipts) must be tracked for
        // ACK/NACK processing and possible retransmission.
        if has_reliable || !datagram_receipt_ids.is_empty() {
            self.track_sent_reliable_datagram(datagram.clone(), now, datagram_receipt_ids);
        }

        Some(datagram)
    }
951
    /// One transmission tick. Order of operations:
    /// 1. refresh the pacing byte budget,
    /// 2. flush due ACKs/NACKs (order chosen by `ack_nack_priority`),
    /// 3. retransmit expired datagrams within the resend + pacing budgets,
    /// 4. build new data datagrams within the remaining budgets,
    /// 5. prune expired split reassemblies.
    ///
    /// Returns every datagram that should be put on the wire now.
    pub fn on_tick(
        &mut self,
        now: Instant,
        max_new_datagrams: usize,
        max_new_bytes: usize,
        max_resend_datagrams: usize,
        max_resend_bytes: usize,
    ) -> Vec<Datagram> {
        let mut out = Vec::new();
        self.refresh_pacing_budget(now);

        match self.ack_nack_priority {
            AckNackPriority::NackFirst => {
                self.flush_nack_if_due(now, &mut out);
                self.flush_ack_if_due(now, &mut out);
            }
            AckNackPriority::AckFirst => {
                self.flush_ack_if_due(now, &mut out);
                self.flush_nack_if_due(now, &mut out);
            }
        }

        // Resends share the pacing budget with new data and are served first.
        let pacing_budget = if self.pacing_enabled {
            self.pacing_budget_bytes.floor() as usize
        } else {
            usize::MAX
        };
        let resend_bytes_budget = max_resend_bytes.min(pacing_budget);
        let resend_datagrams =
            self.collect_resendable(now, max_resend_datagrams, resend_bytes_budget);
        let resend_bytes_used = resend_datagrams
            .iter()
            .map(Datagram::encoded_size)
            .sum::<usize>();
        self.consume_pacing_budget(resend_bytes_used);
        out.extend(resend_datagrams);

        // New data gets whatever pacing budget remains after the resends.
        let available_pacing_for_new = if self.pacing_enabled {
            self.pacing_budget_bytes.floor() as usize
        } else {
            usize::MAX
        };
        let mut remaining_new_bytes = max_new_bytes.min(available_pacing_for_new);
        let budget_too_small_for_frame = self
            .min_queued_frame_size()
            .is_some_and(|min_frame| remaining_new_bytes < min_frame);
        // Anti-starvation: when pacing leaves no usable budget but an
        // immediate frame is queued (see `has_immediate_outgoing_frame`),
        // allow up to one MTU anyway.
        let allow_immediate_bypass = self.pacing_enabled
            && (remaining_new_bytes == 0 || budget_too_small_for_frame)
            && max_new_bytes > 0
            && self.has_immediate_outgoing_frame();
        if allow_immediate_bypass {
            remaining_new_bytes = self.mtu.min(max_new_bytes).max(1);
        }

        let mut new_bytes_used = 0usize;
        let mut new_datagram_count = 0usize;
        while new_datagram_count < max_new_datagrams {
            let Some(datagram) = self.build_data_datagram(now, &mut remaining_new_bytes) else {
                break;
            };
            new_bytes_used = new_bytes_used.saturating_add(datagram.encoded_size());
            out.push(datagram);
            new_datagram_count += 1;
            if remaining_new_bytes == 0 {
                break;
            }
        }
        self.consume_pacing_budget(new_bytes_used);

        self.prune_split_state(now);
        out
    }
1024
1025 fn flush_ack_if_due(&mut self, now: Instant, out: &mut Vec<Datagram>) {
1026 if self.outgoing_acks.is_empty() || now < self.next_ack_flush_at {
1027 return;
1028 }
1029 if let Some(ack) = self.drain_ack_datagram(now) {
1030 out.push(ack);
1031 }
1032 }
1033
1034 fn flush_nack_if_due(&mut self, now: Instant, out: &mut Vec<Datagram>) {
1035 if self.outgoing_nacks.is_empty() || now < self.next_nack_flush_at {
1036 return;
1037 }
1038 if let Some(nack) = self.drain_nack_datagram(now) {
1039 out.push(nack);
1040 }
1041 }
1042
1043 pub fn prune_split_state(&mut self, now: Instant) -> usize {
1044 let dropped = self.split_assembler.prune(now);
1045 self.metrics.split_ttl_drops = self.metrics.split_ttl_drops.saturating_add(dropped as u64);
1046 dropped
1047 }
1048
    /// Queue one unsplit frame carrying `payload`.
    ///
    /// Allocates ordering / sequencing / reliable indices as required by the
    /// chosen `reliability`, then pushes the frame onto the weighted outgoing
    /// heap. Returns the frame's encoded size when it is reliable (so the
    /// caller can account reliable backlog bytes), otherwise 0.
    fn enqueue_single_frame(
        &mut self,
        payload: Bytes,
        reliability: Reliability,
        channel: u8,
        priority: RakPriority,
        receipt_id: Option<u64>,
    ) -> usize {
        // Each helper mutates a per-channel counter but returns None when the
        // reliability mode does not use that index.
        let ordering_index = self.next_ordering_index_if_needed(reliability, channel);
        let sequence_index = self.next_sequence_index_if_needed(reliability, channel);
        let reliable_index = if reliability.is_reliable() {
            Some(self.next_reliable_index())
        } else {
            None
        };

        let frame = Frame {
            header: FrameHeader::new(reliability, false, false),
            // Length is stored in bits. Assumes payload.len() <= 8191 so the
            // u16 shift cannot overflow — TODO confirm the MTU bound
            // guarantees this for all configurations.
            bit_length: (payload.len() as u16) << 3,
            reliable_index,
            sequence_index,
            ordering_index,
            // The channel byte only travels with frames carrying an ordering index.
            ordering_channel: ordering_index.map(|_| channel),
            split: None,
            payload,
        };

        let size = frame.encoded_size();
        self.push_outgoing_frame(frame, priority, receipt_id);
        // Only reliable frames count toward the reliable-bytes figure.
        if reliability.is_reliable() { size } else { 0 }
    }
1080
    /// Split an oversized `payload` into multiple frames sharing one split id
    /// and queue every part.
    ///
    /// The reliability is first promoted to a reliable variant (a lost part
    /// would otherwise stall reassembly forever). Ordering and sequencing
    /// indices are allocated once and shared by all parts, while each reliable
    /// part receives its own reliable index. Returns the summed encoded size
    /// of the reliable frames queued.
    fn enqueue_split_frames(
        &mut self,
        mut payload: Bytes,
        reliability: Reliability,
        channel: u8,
        priority: RakPriority,
        receipt_id: Option<u64>,
    ) -> usize {
        let reliability = Self::normalize_reliability_for_split(reliability);
        let max_split_payload = self.max_payload_for(reliability, true).max(1);
        let part_count = payload.len().div_ceil(max_split_payload);
        // Split ids come from a simple wrapping counter.
        let split_id = self.split_write_index;
        self.split_write_index = self.split_write_index.wrapping_add(1);
        let ordering_index = self.next_ordering_index_if_needed(reliability, channel);
        let sequence_index = self.next_sequence_index_if_needed(reliability, channel);

        let mut reliable_bytes = 0usize;

        for idx in 0..part_count {
            // `split_to` carves the front off `payload` without copying; the
            // final part may be shorter than max_split_payload.
            let take = payload.len().min(max_split_payload);
            let part = payload.split_to(take);
            let reliable_index = if reliability.is_reliable() {
                Some(self.next_reliable_index())
            } else {
                None
            };

            let frame = Frame {
                header: FrameHeader::new(reliability, true, false),
                // Bits, not bytes. Assumes each part is <= 8191 bytes so the
                // u16 shift cannot overflow — TODO confirm via the MTU bound.
                bit_length: (part.len() as u16) << 3,
                reliable_index,
                sequence_index,
                ordering_index,
                ordering_channel: ordering_index.map(|_| channel),
                split: Some(crate::protocol::frame::SplitInfo {
                    part_count: part_count as u32,
                    part_id: split_id,
                    part_index: idx as u32,
                }),
                payload: part,
            };

            let size = frame.encoded_size();
            self.push_outgoing_frame(frame, priority, receipt_id);
            if reliability.is_reliable() {
                reliable_bytes += size;
            }
        }

        reliable_bytes
    }
1132
1133 fn max_payload_for(&self, reliability: Reliability, is_split: bool) -> usize {
1134 let frame_overhead = self.frame_overhead(reliability, is_split);
1135 self.mtu
1136 .saturating_sub(RAKNET_DATAGRAM_HEADER_SIZE + frame_overhead)
1137 .max(1)
1138 }
1139
    /// Encoded size in bytes of a frame header with the given attributes.
    ///
    /// Layout assumed (NOTE(review): expected to stay in sync with
    /// `Frame::encoded_size` — confirm against the frame encoder):
    /// 3-byte base (flags byte + 16-bit bit-length), +3 for a reliable index,
    /// +3 for a sequenced index, +4 for an ordering index plus channel byte,
    /// +10 for split info (presumably u32 part_count + u16 part_id +
    /// u32 part_index, matching `SplitInfo`'s construction — verify part_id
    /// width).
    fn frame_overhead(&self, reliability: Reliability, is_split: bool) -> usize {
        let mut size = 3usize;
        if reliability.is_reliable() {
            size += 3;
        }
        if reliability.is_sequenced() {
            size += 3;
        }
        if reliability.is_ordered() || reliability.is_sequenced() {
            size += 4;
        }
        if is_split {
            size += 10;
        }
        size
    }
1156
1157 fn next_reliable_index(&mut self) -> Sequence24 {
1158 let idx = self.reliable_write_index;
1159 self.reliable_write_index = self.reliable_write_index.next();
1160 idx
1161 }
1162
1163 fn next_ordering_index_if_needed(
1164 &mut self,
1165 reliability: Reliability,
1166 channel: u8,
1167 ) -> Option<Sequence24> {
1168 if !(reliability.is_ordered() || reliability.is_sequenced()) {
1169 return None;
1170 }
1171
1172 let ch = channel as usize;
1173 if ch >= self.ordering_write_index.len() {
1174 self.ordering_write_index.resize(ch + 1, Sequence24::new(0));
1175 }
1176
1177 let idx = self.ordering_write_index[ch];
1178 self.ordering_write_index[ch] = idx.next();
1179 Some(idx)
1180 }
1181
1182 fn next_sequence_index_if_needed(
1183 &mut self,
1184 reliability: Reliability,
1185 channel: u8,
1186 ) -> Option<Sequence24> {
1187 if !reliability.is_sequenced() {
1188 return None;
1189 }
1190
1191 let ch = channel as usize;
1192 if ch >= self.sequencing_write_index.len() {
1193 self.sequencing_write_index
1194 .resize(ch + 1, Sequence24::new(0));
1195 }
1196
1197 let idx = self.sequencing_write_index[ch];
1198 self.sequencing_write_index[ch] = idx.next();
1199 Some(idx)
1200 }
1201
1202 fn push_outgoing_frame(
1203 &mut self,
1204 frame: Frame,
1205 priority: RakPriority,
1206 receipt_id: Option<u64>,
1207 ) {
1208 let weight = self.next_weight(priority);
1209 let encoded_size = frame.encoded_size();
1210 let is_reliable = frame.header.reliability.is_reliable();
1211 self.outgoing_queued_bytes = self.outgoing_queued_bytes.saturating_add(encoded_size);
1212 self.outgoing_heap.push(QueuedFrame {
1213 weight,
1214 encoded_size,
1215 is_reliable,
1216 priority,
1217 receipt_id,
1218 frame,
1219 });
1220 }
1221
    /// Compute the heap weight for the next frame at `priority`, following a
    /// RakNet-style weighted priority scheme (lower weight = dequeued sooner;
    /// each level advances its future weights at a level-dependent stride).
    fn next_weight(&mut self, priority: RakPriority) -> u64 {
        let level = priority.as_index();
        let mut next = self.outgoing_next_weights[level];

        if !self.outgoing_heap.is_empty() {
            // NOTE(review): upstream RakNet bumps the weight when it has fallen
            // *behind* the heap minimum (`next < min`); this compares with `>=`
            // against the cached `last_min_weight` — confirm the inverted
            // condition is intentional.
            if next >= self.last_min_weight {
                next = self.last_min_weight + ((1u64 << level) * level as u64) + level as u64;
                self.outgoing_next_weights[level] =
                    next + ((1u64 << level) * (level as u64 + 1)) + level as u64;
            }
        } else {
            // Heap drained: reset every priority level to its base weight.
            for p in 0..4 {
                self.outgoing_next_weights[p] = ((1u64 << p) * p as u64) + p as u64;
            }
        }

        // Cache the minimum-equivalent weight derived from the value we hand
        // out. NOTE(review): `next - (1 << level) * level` can underflow for
        // level > 0 when `next` is small (e.g. right after a reset) — verify
        // reachable weight values keep this subtraction non-negative.
        self.last_min_weight = next - ((1u64 << level) * level as u64) + level as u64;
        next
    }
1241
1242 fn normalize_reliability_for_split(reliability: Reliability) -> Reliability {
1243 match reliability {
1244 Reliability::Unreliable => Reliability::Reliable,
1245 Reliability::UnreliableSequenced => Reliability::ReliableSequenced,
1246 Reliability::UnreliableWithAckReceipt => Reliability::ReliableWithAckReceipt,
1247 v => v,
1248 }
1249 }
1250
1251 fn estimate_queue_impact(
1252 &self,
1253 payload_len: usize,
1254 reliability: Reliability,
1255 ) -> (usize, usize, Reliability) {
1256 let max_single = self.max_payload_for(reliability, false);
1257 if payload_len <= max_single {
1258 let bytes = self
1259 .frame_overhead(reliability, false)
1260 .saturating_add(payload_len);
1261 return (1, bytes, reliability);
1262 }
1263
1264 let effective = Self::normalize_reliability_for_split(reliability);
1265 let max_split_payload = self.max_payload_for(effective, true).max(1);
1266 let part_count = payload_len.div_ceil(max_split_payload);
1267 let bytes = payload_len
1268 .saturating_add(part_count.saturating_mul(self.frame_overhead(effective, true)));
1269
1270 (part_count, bytes, effective)
1271 }
1272
1273 fn evaluate_backpressure(
1274 &self,
1275 added_frames: usize,
1276 added_bytes: usize,
1277 reliability: Reliability,
1278 priority: RakPriority,
1279 ) -> BackpressureAction {
1280 let projected_frames = self.pending_outgoing_frames().saturating_add(added_frames);
1281 let projected_bytes = self.pending_outgoing_bytes().saturating_add(added_bytes);
1282
1283 let hard_frames = self.outgoing_queue_max_frames.max(1);
1284 let hard_bytes = self.outgoing_queue_max_bytes.max(1);
1285
1286 let soft_frames = ((hard_frames as f64) * self.outgoing_queue_soft_ratio)
1287 .floor()
1288 .max(1.0) as usize;
1289 let soft_bytes = ((hard_bytes as f64) * self.outgoing_queue_soft_ratio)
1290 .floor()
1291 .max(1.0) as usize;
1292
1293 let exceeds_hard = projected_frames > hard_frames || projected_bytes > hard_bytes;
1294 let exceeds_soft = projected_frames > soft_frames || projected_bytes > soft_bytes;
1295 let reliable = reliability.is_reliable();
1296
1297 if exceeds_hard {
1298 return match self.backpressure_mode {
1299 BackpressureMode::Delay => BackpressureAction::Defer,
1300 BackpressureMode::Shed => {
1301 if !reliable || matches!(priority, RakPriority::Normal | RakPriority::Low) {
1302 BackpressureAction::Drop
1303 } else {
1304 BackpressureAction::Defer
1305 }
1306 }
1307 BackpressureMode::Disconnect => BackpressureAction::Disconnect,
1308 };
1309 }
1310
1311 if exceeds_soft {
1312 return BackpressureAction::Defer;
1313 }
1314
1315 BackpressureAction::Allow
1316 }
1317
    /// Best-effort scrub of every payload still buffered by this session:
    /// the outgoing priority heap, frames inside unacked sent datagrams,
    /// frames parked in the ordering channels, and buffered split parts.
    ///
    /// "Best effort" because a `Bytes` handle can only be zeroed when it is
    /// the sole owner of its buffer (see `best_effort_zeroize_bytes`).
    fn best_effort_zeroize_buffered_payloads(&mut self) {
        for queued in self.outgoing_heap.drain() {
            let _ = best_effort_zeroize_bytes(queued.frame.payload);
        }

        for tracked in self.sent_datagrams.values_mut() {
            if let DatagramPayload::Frames(frames) = &mut tracked.datagram.payload {
                for frame in frames {
                    // Swap the payload out so the datagram keeps a valid
                    // (empty) Bytes while the original buffer is zeroed.
                    let payload = std::mem::take(&mut frame.payload);
                    let _ = best_effort_zeroize_bytes(payload);
                }
            }
        }

        for frame in self.ordering.drain_pending_ordered_frames() {
            let _ = best_effort_zeroize_bytes(frame.payload);
        }

        for part in self.split_assembler.drain_buffered_parts() {
            let _ = best_effort_zeroize_bytes(part);
        }
    }
1340
1341 fn can_emit_new_reliable_datagram(&self) -> bool {
1342 let in_flight = self.sent_datagrams.len() as f64;
1343 in_flight < self.congestion_window_packets.max(1.0).floor()
1344 }
1345
1346 fn refresh_pacing_budget(&mut self, now: Instant) {
1347 if !self.pacing_enabled {
1348 return;
1349 }
1350
1351 let elapsed = now
1352 .saturating_duration_since(self.last_pacing_update)
1353 .as_secs_f64();
1354 self.last_pacing_update = now;
1355
1356 let rate = self.compute_pacing_rate_bytes_per_sec();
1357 self.pacing_rate_bytes_per_sec = rate;
1358
1359 if elapsed > 0.0 {
1360 self.pacing_budget_bytes = (self.pacing_budget_bytes + elapsed * rate)
1361 .clamp(0.0, self.pacing_budget_max_bytes);
1362 }
1363 }
1364
1365 fn compute_pacing_rate_bytes_per_sec(&self) -> f64 {
1366 if !self.pacing_enabled {
1367 return f64::INFINITY;
1368 }
1369
1370 let reference_rtt_ms = self
1371 .srtt_ms
1372 .unwrap_or_else(|| (self.resend_rto.as_secs_f64() * 1000.0).max(1.0));
1373 let rtt_secs = (reference_rtt_ms.max(1.0)) / 1000.0;
1374 let cwnd_bytes = self.congestion_window_packets.max(1.0) * self.mtu as f64;
1375 let raw_rate = (cwnd_bytes / rtt_secs) * self.pacing_gain;
1376 raw_rate.clamp(
1377 self.pacing_min_rate_bytes_per_sec,
1378 self.pacing_max_rate_bytes_per_sec,
1379 )
1380 }
1381
1382 fn consume_pacing_budget(&mut self, bytes: usize) {
1383 if !self.pacing_enabled {
1384 return;
1385 }
1386 self.pacing_budget_bytes = (self.pacing_budget_bytes - bytes as f64).max(0.0);
1387 }
1388
1389 fn has_immediate_outgoing_frame(&self) -> bool {
1390 self.outgoing_heap
1391 .iter()
1392 .any(|entry| entry.priority == RakPriority::Immediate)
1393 }
1394
1395 fn min_queued_frame_size(&self) -> Option<usize> {
1396 self.outgoing_heap
1397 .iter()
1398 .map(|entry| entry.encoded_size)
1399 .min()
1400 }
1401
1402 fn pop_next_frame_for_datagram(
1403 &mut self,
1404 allow_reliable: bool,
1405 datagram_size: usize,
1406 remaining_bytes_budget: usize,
1407 ) -> Option<QueuedFrame> {
1408 let mut deferred = Vec::new();
1409 let mut selected = None;
1410
1411 while let Some(candidate) = self.outgoing_heap.pop() {
1412 let fits_mtu = datagram_size.saturating_add(candidate.encoded_size) <= self.mtu;
1413 let fits_budget = candidate.encoded_size <= remaining_bytes_budget;
1414 let reliability_ok = allow_reliable || !candidate.is_reliable;
1415
1416 if fits_mtu && fits_budget && reliability_ok {
1417 selected = Some(candidate);
1418 break;
1419 }
1420
1421 deferred.push(candidate);
1422 }
1423
1424 for item in deferred {
1425 self.outgoing_heap.push(item);
1426 }
1427
1428 selected
1429 }
1430
1431 fn observe_rtt_sample(&mut self, sample: Duration) {
1432 let sample_ms = (sample.as_secs_f64() * 1000.0).max(1.0);
1433 match self.srtt_ms {
1434 None => {
1435 self.srtt_ms = Some(sample_ms);
1436 self.rttvar_ms = sample_ms / 2.0;
1437 }
1438 Some(srtt) => {
1439 let alpha = 0.125;
1440 let beta = 0.25;
1441 let variation = (srtt - sample_ms).abs();
1442 self.rttvar_ms = (1.0 - beta) * self.rttvar_ms + beta * variation;
1443 let next_srtt = (1.0 - alpha) * srtt + alpha * sample_ms;
1444 self.srtt_ms = Some(next_srtt);
1445 }
1446 }
1447
1448 self.recompute_rto_from_rtt();
1449 }
1450
1451 fn recompute_rto_from_rtt(&mut self) {
1452 let Some(srtt_ms) = self.srtt_ms else {
1453 return;
1454 };
1455
1456 let rto_ms = srtt_ms + (4.0 * self.rttvar_ms).max(10.0);
1457 let clamped = rto_ms.clamp(
1458 self.min_resend_rto.as_secs_f64() * 1000.0,
1459 self.max_resend_rto.as_secs_f64() * 1000.0,
1460 );
1461 self.resend_rto = Duration::from_secs_f64(clamped / 1000.0);
1462 }
1463
1464 fn on_reliable_ack(&mut self) {
1465 let mut additive = if self.congestion_window_packets < self.slow_start_threshold_packets {
1466 self.congestion_additive_gain.max(1.0)
1467 } else {
1468 self.congestion_additive_gain / self.congestion_window_packets.max(1.0)
1469 };
1470
1471 if let Some(srtt_ms) = self.srtt_ms
1472 && srtt_ms >= self.high_rtt_threshold_ms
1473 {
1474 additive *= self.high_rtt_additive_scale;
1475 }
1476
1477 additive = additive.max(0.001);
1478 self.congestion_window_packets = (self.congestion_window_packets + additive).clamp(
1479 self.min_congestion_window_packets,
1480 self.max_congestion_window_packets,
1481 );
1482 }
1483
1484 fn on_timeout(&mut self, now: Instant) {
1485 self.apply_loss_backoff(self.congestion_multiplicative_decrease_timeout);
1486 let backed_off = self.resend_rto.saturating_mul(2);
1487 self.resend_rto = backed_off.min(self.max_resend_rto);
1488 self.next_nack_loss_backoff = now + self.nack_loss_backoff_cooldown;
1489 }
1490
1491 fn on_nack_loss(&mut self, now: Instant) {
1492 if now < self.next_nack_loss_backoff {
1493 return;
1494 }
1495
1496 self.apply_loss_backoff(self.congestion_multiplicative_decrease_nack);
1497 self.next_nack_loss_backoff = now + self.nack_loss_backoff_cooldown;
1498 }
1499
1500 fn apply_loss_backoff(&mut self, factor: f64) {
1501 let factor = factor.clamp(0.1, 0.99);
1502 let reduced = (self.congestion_window_packets * factor).clamp(
1503 self.min_congestion_window_packets,
1504 self.max_congestion_window_packets,
1505 );
1506 self.slow_start_threshold_packets = reduced;
1507 self.congestion_window_packets = reduced;
1508 }
1509
1510 fn for_each_sequence<F>(range: SequenceRange, mut f: F)
1511 where
1512 F: FnMut(Sequence24),
1513 {
1514 let mut seq = range.start;
1515 loop {
1516 f(seq);
1517 if seq == range.end {
1518 break;
1519 }
1520 seq = seq.next();
1521 }
1522 }
1523}
1524
// On teardown, scrub any payload bytes still buffered by the session when the
// corresponding tunable opted in. Best-effort only: shared Bytes buffers
// cannot be zeroed (see best_effort_zeroize_bytes).
impl Drop for Session {
    fn drop(&mut self) {
        if self.best_effort_zeroize_dropped_payloads {
            self.best_effort_zeroize_buffered_payloads();
        }
    }
}
1532
1533fn best_effort_zeroize_bytes(payload: Bytes) -> bool {
1534 match payload.try_into_mut() {
1535 Ok(mut writable) => {
1536 writable.as_mut().zeroize();
1537 true
1538 }
1539 Err(_) => false,
1540 }
1541}
1542
1543#[cfg(test)]
1544mod tests {
1545 use std::time::{Duration, Instant};
1546
1547 use bytes::Bytes;
1548
1549 use super::{QueuePayloadResult, RakPriority, Session, SessionState};
1550 use crate::protocol::ack::{AckNackPayload, SequenceRange};
1551 use crate::protocol::datagram::DatagramPayload;
1552 use crate::protocol::frame::{Frame, SplitInfo};
1553 use crate::protocol::frame_header::FrameHeader;
1554 use crate::protocol::reliability::Reliability;
1555 use crate::protocol::sequence24::Sequence24;
1556 use crate::session::tunables::{
1557 AckNackFlushProfile, AckNackPriority, BackpressureMode, CongestionProfile, SessionTunables,
1558 };
1559
    /// Drive a fresh session through the full handshake state machine so a
    /// test can start from `SessionState::Connected`.
    fn transition_to_connected(session: &mut Session) {
        assert!(session.transition_to(SessionState::Req1Recv));
        assert!(session.transition_to(SessionState::Reply1Sent));
        assert!(session.transition_to(SessionState::Req2Recv));
        assert!(session.transition_to(SessionState::Reply2Sent));
        assert!(session.transition_to(SessionState::ConnReqRecv));
        assert!(session.transition_to(SessionState::ConnReqAcceptedSent));
        assert!(session.transition_to(SessionState::NewIncomingRecv));
        assert!(session.transition_to(SessionState::Connected));
    }
1570
    // Idle time must be measured from the most recent touch_activity call and
    // reset once activity is touched again.
    #[test]
    fn idle_tracking_updates_when_activity_touched() {
        let mut session = Session::new(1400);
        let now = Instant::now();
        let old = now
            .checked_sub(Duration::from_secs(8))
            .expect("instant subtraction must succeed");
        session.touch_activity(old);
        assert!(session.idle_for(now) >= Duration::from_secs(8));

        session.touch_activity(now);
        assert!(session.idle_for(now) <= Duration::from_millis(1));
    }
1584
    // Keepalives require the Connected state and enough idle time, and are
    // suppressed again immediately after one is marked sent.
    #[test]
    fn keepalive_is_emitted_only_for_connected_idle_sessions() {
        let mut session = Session::new(1400);
        let now = Instant::now() + Duration::from_secs(6);
        let interval = Duration::from_secs(5);

        let old = now
            .checked_sub(Duration::from_secs(6))
            .expect("instant subtraction must succeed");
        session.touch_activity(old);
        // Idle but not yet connected: no keepalive.
        assert!(!session.should_send_keepalive(now, interval));

        transition_to_connected(&mut session);
        assert!(session.should_send_keepalive(now, interval));

        session.mark_keepalive_sent(now);
        assert!(!session.should_send_keepalive(now, interval));
    }
1603
    // Crossing the soft queue threshold (soft_ratio * hard limit) must defer
    // new payloads instead of enqueueing them, and count the deferral.
    #[test]
    fn soft_backpressure_delays_unreliable_payloads() {
        let tunables = SessionTunables {
            outgoing_queue_max_frames: 4,
            outgoing_queue_max_bytes: 8 * 1024,
            outgoing_queue_soft_ratio: 0.5,
            ..SessionTunables::default()
        };

        let mut session = Session::with_tunables(1400, tunables);

        assert!(matches!(
            session.queue_payload(
                Bytes::from_static(b"a"),
                Reliability::Reliable,
                0,
                RakPriority::High
            ),
            QueuePayloadResult::Enqueued { .. }
        ));
        assert!(matches!(
            session.queue_payload(
                Bytes::from_static(b"b"),
                Reliability::Reliable,
                0,
                RakPriority::High
            ),
            QueuePayloadResult::Enqueued { .. }
        ));

        // Third payload pushes past the soft limit (4 * 0.5 = 2 frames).
        assert!(matches!(
            session.queue_payload(
                Bytes::from_static(b"u"),
                Reliability::Unreliable,
                0,
                RakPriority::Low
            ),
            QueuePayloadResult::Deferred
        ));

        let snapshot = session.metrics_snapshot();
        assert_eq!(snapshot.outgoing_queue_defers, 1);
        assert_eq!(snapshot.backpressure_delays, 1);
    }
1648
    // In Disconnect mode, exceeding the hard queue limit must request a
    // session disconnect and record it in the metrics.
    #[test]
    fn hard_backpressure_disconnects_in_disconnect_mode() {
        let tunables = SessionTunables {
            outgoing_queue_max_frames: 1,
            outgoing_queue_max_bytes: 8 * 1024,
            outgoing_queue_soft_ratio: 0.5,
            backpressure_mode: BackpressureMode::Disconnect,
            ..SessionTunables::default()
        };

        let mut session = Session::with_tunables(1400, tunables);

        let _ = session.queue_payload(
            Bytes::from_static(b"a"),
            Reliability::Reliable,
            0,
            RakPriority::High,
        );

        assert!(matches!(
            session.queue_payload(
                Bytes::from_static(b"b"),
                Reliability::Reliable,
                0,
                RakPriority::Immediate
            ),
            QueuePayloadResult::DisconnectRequested
        ));
        assert!(session.take_backpressure_disconnect());
        let snapshot = session.metrics_snapshot();
        assert_eq!(snapshot.backpressure_disconnects, 1);
    }
1681
    // Shed mode must drop unreliable / low-priority traffic that exceeds the
    // hard queue limit, without requesting a disconnect.
    #[test]
    fn hard_backpressure_sheds_low_priority_in_shed_mode() {
        let tunables = SessionTunables {
            outgoing_queue_max_frames: 1,
            outgoing_queue_max_bytes: 8 * 1024,
            outgoing_queue_soft_ratio: 0.5,
            backpressure_mode: BackpressureMode::Shed,
            ..SessionTunables::default()
        };
        let mut session = Session::with_tunables(1400, tunables);

        let _ = session.queue_payload(
            Bytes::from_static(b"a"),
            Reliability::Reliable,
            0,
            RakPriority::High,
        );

        assert!(matches!(
            session.queue_payload(
                Bytes::from_static(b"b"),
                Reliability::Unreliable,
                0,
                RakPriority::Low
            ),
            QueuePayloadResult::Dropped
        ));
        assert!(!session.take_backpressure_disconnect());
        let snapshot = session.metrics_snapshot();
        assert_eq!(snapshot.backpressure_drops, 1);
    }
1713
    // Shed mode must still defer (not drop) reliable traffic at high
    // priorities even past the hard queue limit.
    #[test]
    fn hard_backpressure_shed_mode_defers_high_priority_reliable() {
        let tunables = SessionTunables {
            outgoing_queue_max_frames: 1,
            outgoing_queue_max_bytes: 8 * 1024,
            outgoing_queue_soft_ratio: 0.5,
            backpressure_mode: BackpressureMode::Shed,
            ..SessionTunables::default()
        };
        let mut session = Session::with_tunables(1400, tunables);

        let _ = session.queue_payload(
            Bytes::from_static(b"a"),
            Reliability::Reliable,
            0,
            RakPriority::High,
        );

        assert!(matches!(
            session.queue_payload(
                Bytes::from_static(b"b"),
                Reliability::Reliable,
                0,
                RakPriority::Immediate
            ),
            QueuePayloadResult::Deferred
        ));
        assert!(!session.take_backpressure_disconnect());
        let snapshot = session.metrics_snapshot();
        assert_eq!(snapshot.backpressure_delays, 1);
        assert_eq!(snapshot.backpressure_drops, 0);
    }
1746
    // Delay mode must defer everything past the hard limit rather than
    // dropping or disconnecting.
    #[test]
    fn hard_backpressure_delays_in_delay_mode() {
        let tunables = SessionTunables {
            outgoing_queue_max_frames: 1,
            outgoing_queue_max_bytes: 8 * 1024,
            outgoing_queue_soft_ratio: 0.5,
            backpressure_mode: BackpressureMode::Delay,
            ..SessionTunables::default()
        };
        let mut session = Session::with_tunables(1400, tunables);

        let _ = session.queue_payload(
            Bytes::from_static(b"a"),
            Reliability::Reliable,
            0,
            RakPriority::High,
        );

        assert!(matches!(
            session.queue_payload(
                Bytes::from_static(b"b"),
                Reliability::Reliable,
                0,
                RakPriority::Low
            ),
            QueuePayloadResult::Deferred
        ));
        assert!(!session.take_backpressure_disconnect());
        let snapshot = session.metrics_snapshot();
        assert_eq!(snapshot.backpressure_delays, 1);
    }
1778
    // An ACKed reliable datagram should produce an RTT sample (srtt > 0 and a
    // derived RTO), and a later retransmission timeout should shrink the
    // congestion window below its post-ACK value.
    #[test]
    fn ack_updates_rtt_and_timeout_reduces_congestion() {
        let mut session = Session::new(1400);
        let now = Instant::now();

        assert!(matches!(
            session.queue_payload(
                Bytes::from_static(b"payload"),
                Reliability::ReliableOrdered,
                0,
                RakPriority::High
            ),
            QueuePayloadResult::Enqueued { .. }
        ));

        let sent = session
            .on_tick(now, 1, 1400, 0, 0)
            .into_iter()
            .next()
            .expect("data datagram must be produced");
        let seq = sent.header.sequence;

        // ACK the datagram 120 ms later to generate an RTT sample.
        session.handle_ack_payload(AckNackPayload {
            ranges: vec![SequenceRange {
                start: seq,
                end: seq,
            }],
        });
        let _ = session.process_incoming_receipts(now + Duration::from_millis(120));

        let after_ack = session.metrics_snapshot();
        assert!(after_ack.srtt_ms > 0.0);
        assert!(after_ack.resend_rto_ms >= 80.0);

        let before_timeout_cwnd = after_ack.congestion_window_packets;

        assert!(matches!(
            session.queue_payload(
                Bytes::from_static(b"resend"),
                Reliability::ReliableOrdered,
                0,
                RakPriority::High
            ),
            QueuePayloadResult::Enqueued { .. }
        ));
        let _ = session.on_tick(now + Duration::from_millis(125), 1, 1400, 0, 0);

        // Collecting resendables well past the RTO triggers the timeout path.
        let _ = session.collect_resendable(now + Duration::from_millis(1000), 8, usize::MAX);
        let after_timeout = session.metrics_snapshot();
        assert!(after_timeout.congestion_window_packets < before_timeout_cwnd);
    }
1830
    // Two NACKs arriving inside the configured cooldown window must cut the
    // congestion window only once.
    #[test]
    fn nack_backoff_cooldown_prevents_repeated_window_cuts() {
        let tunables = SessionTunables {
            congestion_profile: CongestionProfile::Custom,
            initial_congestion_window: 64.0,
            min_congestion_window: 8.0,
            max_congestion_window: 512.0,
            congestion_multiplicative_decrease_nack: 0.8,
            congestion_nack_backoff_cooldown: Duration::from_millis(200),
            ..SessionTunables::default()
        };
        let mut session = Session::with_tunables(200, tunables);
        let now = Instant::now();

        assert!(matches!(
            session.queue_payload(
                Bytes::from(vec![0xA1; 150]),
                Reliability::ReliableOrdered,
                0,
                RakPriority::High
            ),
            QueuePayloadResult::Enqueued { .. }
        ));
        assert!(matches!(
            session.queue_payload(
                Bytes::from(vec![0xA2; 150]),
                Reliability::ReliableOrdered,
                0,
                RakPriority::High
            ),
            QueuePayloadResult::Enqueued { .. }
        ));

        let sent = session.on_tick(now, 2, 64 * 1024, 0, 0);
        assert_eq!(
            sent.len(),
            2,
            "mtu=200 should emit one payload per datagram"
        );
        let seq_a = sent[0].header.sequence;
        let seq_b = sent[1].header.sequence;

        let before = session.metrics_snapshot().congestion_window_packets;

        // First NACK: cuts the window.
        session.handle_nack_payload(AckNackPayload {
            ranges: vec![SequenceRange {
                start: seq_a,
                end: seq_a,
            }],
        });
        let first = session.process_incoming_receipts(now + Duration::from_millis(1));
        assert_eq!(first.nacked, 1);
        let after_first = session.metrics_snapshot().congestion_window_packets;
        assert!(
            after_first < before,
            "first nack must reduce congestion window"
        );

        // Second NACK 9 ms later is still inside the 200 ms cooldown.
        session.handle_nack_payload(AckNackPayload {
            ranges: vec![SequenceRange {
                start: seq_b,
                end: seq_b,
            }],
        });
        let second = session.process_incoming_receipts(now + Duration::from_millis(10));
        assert_eq!(second.nacked, 1);
        let after_second = session.metrics_snapshot().congestion_window_packets;
        assert!(
            (after_second - after_first).abs() < 0.0001,
            "second nack within cooldown must not cut cwnd again"
        );
    }
1903
    // With decrease factors 0.5 (timeout) vs 0.9 (NACK), a retransmission
    // timeout must leave a smaller congestion window than a NACK does.
    #[test]
    fn timeout_backoff_is_stronger_than_nack_backoff() {
        let tunables = SessionTunables {
            congestion_profile: CongestionProfile::Custom,
            initial_congestion_window: 100.0,
            min_congestion_window: 8.0,
            max_congestion_window: 512.0,
            congestion_multiplicative_decrease_nack: 0.9,
            congestion_multiplicative_decrease_timeout: 0.5,
            ..SessionTunables::default()
        };
        let now = Instant::now();

        // Session A: take a NACK-driven backoff.
        let mut nack_session = Session::with_tunables(200, tunables.clone());
        let _ = nack_session.queue_payload(
            Bytes::from(vec![0xB1; 150]),
            Reliability::ReliableOrdered,
            0,
            RakPriority::High,
        );
        let sent_nack = nack_session.on_tick(now, 1, 64 * 1024, 0, 0);
        let seq_nack = sent_nack[0].header.sequence;
        nack_session.handle_nack_payload(AckNackPayload {
            ranges: vec![SequenceRange {
                start: seq_nack,
                end: seq_nack,
            }],
        });
        let _ = nack_session.process_incoming_receipts(now + Duration::from_millis(1));
        let cwnd_after_nack = nack_session.metrics_snapshot().congestion_window_packets;

        // Session B: take a timeout-driven backoff.
        let mut timeout_session = Session::with_tunables(200, tunables);
        let _ = timeout_session.queue_payload(
            Bytes::from(vec![0xC1; 150]),
            Reliability::ReliableOrdered,
            0,
            RakPriority::High,
        );
        let _ = timeout_session.on_tick(now, 1, 64 * 1024, 0, 0);
        let _ = timeout_session.collect_resendable(now + Duration::from_secs(2), 8, usize::MAX);
        let cwnd_after_timeout = timeout_session.metrics_snapshot().congestion_window_packets;

        assert!(
            cwnd_after_timeout < cwnd_after_nack,
            "timeout loss must reduce cwnd more aggressively than nack loss"
        );
    }
1951
    // With an empty token bucket and a fixed 1 KiB/s rate, the first tick must
    // send nothing; a tick 250 ms later (after refill) must send the datagram.
    #[test]
    fn pacing_budget_throttles_non_immediate_send_until_budget_refills() {
        let tunables = SessionTunables {
            pacing_enabled: true,
            pacing_start_full: false,
            pacing_min_rate_bytes_per_sec: 1024.0,
            pacing_max_rate_bytes_per_sec: 1024.0,
            pacing_max_burst_bytes: 1024,
            ..SessionTunables::default()
        };
        let mut session = Session::with_tunables(200, tunables);
        let now = Instant::now();

        assert!(matches!(
            session.queue_payload(
                Bytes::from(vec![0xD1; 150]),
                Reliability::ReliableOrdered,
                0,
                RakPriority::High
            ),
            QueuePayloadResult::Enqueued { .. }
        ));

        let blocked = session.on_tick(now, 1, 64 * 1024, 0, 0);
        assert!(
            blocked.is_empty(),
            "with almost zero burst budget, first send must be paced"
        );

        let resumed = session.on_tick(now + Duration::from_millis(250), 1, 64 * 1024, 0, 0);
        assert_eq!(
            resumed.len(),
            1,
            "after budget refill, datagram must be sent"
        );
        let snapshot = session.metrics_snapshot();
        assert!(
            snapshot.pacing_rate_bytes_per_sec >= 1000.0,
            "pacing rate should be active in snapshot"
        );
    }
1993
    // Immediate-priority frames may bypass a fully drained pacing budget so
    // urgent traffic is never starved by the token bucket.
    #[test]
    fn immediate_priority_can_bypass_empty_pacing_budget() {
        let tunables = SessionTunables {
            pacing_enabled: true,
            pacing_start_full: false,
            pacing_min_rate_bytes_per_sec: 1.0,
            pacing_max_rate_bytes_per_sec: 1.0,
            pacing_max_burst_bytes: 1,
            ..SessionTunables::default()
        };
        let mut session = Session::with_tunables(200, tunables);
        let now = Instant::now();

        assert!(matches!(
            session.queue_payload(
                Bytes::from(vec![0xD2; 150]),
                Reliability::ReliableOrdered,
                0,
                RakPriority::Immediate
            ),
            QueuePayloadResult::Enqueued { .. }
        ));

        let sent = session.on_tick(now, 1, 64 * 1024, 0, 0);
        assert_eq!(
            sent.len(),
            1,
            "immediate packet should bypass drained pacing budget once"
        );
    }
2024
    // A receipt id attached to a split payload must only be reported after
    // every datagram carrying one of its parts has been ACKed.
    #[test]
    fn ack_receipt_id_is_reported_once_after_all_datagrams_are_acked() {
        let mut session = Session::new(1400);
        let now = Instant::now();

        // 6000 bytes > MTU 1400, so this payload must be split.
        assert!(matches!(
            session.queue_payload_with_receipt(
                Bytes::from(vec![0xAA; 6000]),
                Reliability::ReliableOrdered,
                0,
                RakPriority::High,
                Some(42)
            ),
            QueuePayloadResult::Enqueued { .. }
        ));

        let sent = session.on_tick(now, 16, usize::MAX, 0, 0);
        let mut data_sequences = Vec::new();
        for datagram in &sent {
            if matches!(
                datagram.payload,
                crate::protocol::datagram::DatagramPayload::Frames(_)
            ) {
                data_sequences.push(datagram.header.sequence);
            }
        }
        assert!(
            data_sequences.len() > 1,
            "split payload should span multiple datagrams"
        );

        // ACKing just the first datagram must not complete the receipt.
        session.handle_ack_payload(AckNackPayload {
            ranges: vec![SequenceRange {
                start: data_sequences[0],
                end: data_sequences[0],
            }],
        });
        let first_progress = session.process_incoming_receipts(now + Duration::from_millis(100));
        assert!(first_progress.acked_receipt_ids.is_empty());

        // ACK the remaining datagrams; only then is receipt 42 reported.
        for seq in data_sequences.iter().skip(1) {
            session.handle_ack_payload(AckNackPayload {
                ranges: vec![SequenceRange {
                    start: *seq,
                    end: *seq,
                }],
            });
        }
        let second_progress = session.process_incoming_receipts(now + Duration::from_millis(120));
        assert_eq!(second_progress.acked_receipt_ids, vec![42]);
    }
2076
    // Incomplete splits older than split_ttl must be pruned and counted in the
    // split_ttl_drops metric; younger splits survive.
    #[test]
    fn prune_split_state_increments_split_ttl_drop_metrics() {
        let tunables = SessionTunables {
            split_ttl: Duration::from_millis(20),
            max_split_parts: 8,
            max_concurrent_splits: 8,
            ..SessionTunables::default()
        };
        let mut session = Session::with_tunables(1400, tunables);
        let now = Instant::now();

        // First part of a 2-part split: reassembly stays pending.
        let split_frame = Frame {
            header: FrameHeader::new(Reliability::ReliableOrdered, true, false),
            bit_length: 8,
            reliable_index: None,
            sequence_index: None,
            ordering_index: None,
            ordering_channel: None,
            split: Some(SplitInfo {
                part_count: 2,
                part_id: 77,
                part_index: 0,
            }),
            payload: Bytes::from_static(b"a"),
        };

        assert!(matches!(
            session.split_assembler.add(split_frame, now),
            Ok(None)
        ));
        // 10 ms elapsed: still inside the 20 ms TTL.
        assert_eq!(
            session.prune_split_state(now + Duration::from_millis(10)),
            0
        );
        assert_eq!(session.metrics_snapshot().split_ttl_drops, 0);

        // 30 ms elapsed: past the TTL, so the split is dropped and counted.
        assert_eq!(
            session.prune_split_state(now + Duration::from_millis(30)),
            1
        );
        assert_eq!(session.metrics_snapshot().split_ttl_drops, 1);
    }
2119
2120 #[test]
2121 fn nack_marks_reliable_datagram_for_immediate_resend() {
2122 let mut session = Session::new(1400);
2123 let now = Instant::now();
2124
2125 assert!(matches!(
2126 session.queue_payload(
2127 Bytes::from_static(b"resend-me"),
2128 Reliability::ReliableOrdered,
2129 0,
2130 RakPriority::High
2131 ),
2132 QueuePayloadResult::Enqueued { .. }
2133 ));
2134
2135 let sent = session
2136 .on_tick(now, 1, usize::MAX, 0, 0)
2137 .into_iter()
2138 .next()
2139 .expect("reliable datagram should be emitted");
2140 let seq = sent.header.sequence;
2141
2142 session.handle_nack_payload(AckNackPayload {
2143 ranges: vec![SequenceRange {
2144 start: seq,
2145 end: seq,
2146 }],
2147 });
2148 let progress = session.process_incoming_receipts(now + Duration::from_millis(1));
2149 assert_eq!(progress.nacked, 1);
2150
2151 let resent = session.collect_resendable(now + Duration::from_millis(1), 8, usize::MAX);
2152 assert_eq!(resent.len(), 1);
2153 assert_eq!(resent[0].header.sequence, seq);
2154 }
2155
2156 #[test]
2157 fn nack_does_not_resend_unreliable_ack_receipt_datagrams() {
2158 let mut session = Session::new(1400);
2159 let now = Instant::now();
2160
2161 assert!(matches!(
2162 session.queue_payload_with_receipt(
2163 Bytes::from_static(b"fire-and-forget"),
2164 Reliability::UnreliableWithAckReceipt,
2165 0,
2166 RakPriority::Normal,
2167 Some(77)
2168 ),
2169 QueuePayloadResult::Enqueued { .. }
2170 ));
2171
2172 let sent = session
2173 .on_tick(now, 1, usize::MAX, 0, 0)
2174 .into_iter()
2175 .next()
2176 .expect("datagram should be emitted");
2177 let seq = sent.header.sequence;
2178
2179 session.handle_nack_payload(AckNackPayload {
2180 ranges: vec![SequenceRange {
2181 start: seq,
2182 end: seq,
2183 }],
2184 });
2185 let nack_progress = session.process_incoming_receipts(now + Duration::from_millis(1));
2186 assert_eq!(nack_progress.nacked, 0);
2187
2188 let resent = session.collect_resendable(now + Duration::from_millis(1), 8, usize::MAX);
2189 assert!(resent.is_empty());
2190
2191 session.handle_ack_payload(AckNackPayload {
2192 ranges: vec![SequenceRange {
2193 start: seq,
2194 end: seq,
2195 }],
2196 });
2197 let ack_progress = session.process_incoming_receipts(now + Duration::from_millis(10));
2198 assert_eq!(ack_progress.acked, 1);
2199 assert_eq!(ack_progress.acked_receipt_ids, vec![77]);
2200 }
2201
2202 #[test]
2203 fn multiple_receipt_ids_from_single_datagram_are_reported_once_each() {
2204 let mut session = Session::new(1400);
2205 let now = Instant::now();
2206
2207 assert!(matches!(
2208 session.queue_payload_with_receipt(
2209 Bytes::from_static(b"first"),
2210 Reliability::ReliableOrdered,
2211 0,
2212 RakPriority::High,
2213 Some(10)
2214 ),
2215 QueuePayloadResult::Enqueued { .. }
2216 ));
2217 assert!(matches!(
2218 session.queue_payload_with_receipt(
2219 Bytes::from_static(b"second"),
2220 Reliability::ReliableOrdered,
2221 0,
2222 RakPriority::High,
2223 Some(20)
2224 ),
2225 QueuePayloadResult::Enqueued { .. }
2226 ));
2227
2228 let sent = session
2229 .on_tick(now, 1, usize::MAX, 0, 0)
2230 .into_iter()
2231 .next()
2232 .expect("datagram should be emitted");
2233 let seq = sent.header.sequence;
2234
2235 session.handle_ack_payload(AckNackPayload {
2236 ranges: vec![SequenceRange {
2237 start: seq,
2238 end: seq,
2239 }],
2240 });
2241 let mut receipts = session
2242 .process_incoming_receipts(now + Duration::from_millis(5))
2243 .acked_receipt_ids;
2244 receipts.sort_unstable();
2245 assert_eq!(receipts, vec![10, 20]);
2246 }
2247
2248 #[test]
2249 fn ack_flush_interval_defers_ack_until_deadline() {
2250 let tunables = SessionTunables {
2251 ack_nack_flush_profile: AckNackFlushProfile::Custom,
2252 ack_flush_interval: Duration::from_millis(50),
2253 nack_flush_interval: Duration::from_millis(1),
2254 ack_max_ranges_per_datagram: 64,
2255 nack_max_ranges_per_datagram: 64,
2256 ack_nack_priority: AckNackPriority::NackFirst,
2257 ..SessionTunables::default()
2258 };
2259 let mut session = Session::with_tunables(1400, tunables);
2260 let now = Instant::now();
2261
2262 session.process_datagram_sequence(Sequence24::new(0), now);
2263 let early = session.on_tick(now + Duration::from_millis(10), 0, 0, 0, 0);
2264 assert!(
2265 early.is_empty(),
2266 "ack must not flush before configured interval"
2267 );
2268
2269 let due = session.on_tick(now + Duration::from_millis(50), 0, 0, 0, 0);
2270 assert_eq!(due.len(), 1, "ack should flush at deadline");
2271 assert!(
2272 matches!(due[0].payload, DatagramPayload::Ack(_)),
2273 "flushed control packet must be ACK"
2274 );
2275 }
2276
2277 #[test]
2278 fn ack_batch_max_ranges_splits_large_ack_queue_across_ticks() {
2279 let tunables = SessionTunables {
2280 ack_nack_flush_profile: AckNackFlushProfile::Custom,
2281 ack_flush_interval: Duration::from_millis(1),
2282 nack_flush_interval: Duration::from_millis(1),
2283 ack_max_ranges_per_datagram: 2,
2284 nack_max_ranges_per_datagram: 64,
2285 ack_nack_priority: AckNackPriority::NackFirst,
2286 ..SessionTunables::default()
2287 };
2288 let mut session = Session::with_tunables(1400, tunables);
2289 let now = Instant::now();
2290
2291 session.outgoing_acks.push(SequenceRange {
2292 start: Sequence24::new(1),
2293 end: Sequence24::new(1),
2294 });
2295 session.outgoing_acks.push(SequenceRange {
2296 start: Sequence24::new(3),
2297 end: Sequence24::new(3),
2298 });
2299 session.outgoing_acks.push(SequenceRange {
2300 start: Sequence24::new(5),
2301 end: Sequence24::new(5),
2302 });
2303
2304 let first = session.on_tick(now, 0, 0, 0, 0);
2305 assert_eq!(first.len(), 1);
2306 match &first[0].payload {
2307 DatagramPayload::Ack(payload) => assert_eq!(payload.ranges.len(), 2),
2308 _ => panic!("expected ACK payload"),
2309 }
2310
2311 let second = session.on_tick(now + Duration::from_millis(1), 0, 0, 0, 0);
2312 assert_eq!(second.len(), 1);
2313 match &second[0].payload {
2314 DatagramPayload::Ack(payload) => assert_eq!(payload.ranges.len(), 1),
2315 _ => panic!("expected ACK payload"),
2316 }
2317 }
2318
2319 #[test]
2320 fn nack_first_priority_flushes_nack_before_ack() {
2321 let mut session = Session::new(1400);
2322 let now = Instant::now();
2323
2324 session.process_datagram_sequence(Sequence24::new(2), now);
2325 let out = session.on_tick(now + Duration::from_millis(10), 0, 0, 0, 0);
2326 assert_eq!(out.len(), 2, "both NACK and ACK must be flushed");
2327 assert!(
2328 matches!(out[0].payload, DatagramPayload::Nack(_)),
2329 "NACK must be emitted before ACK when priority is NackFirst"
2330 );
2331 assert!(
2332 matches!(out[1].payload, DatagramPayload::Ack(_)),
2333 "ACK must be emitted after NACK"
2334 );
2335
2336 let snapshot = session.metrics_snapshot();
2337 assert_eq!(snapshot.ack_out_datagrams, 1);
2338 assert_eq!(snapshot.nack_out_datagrams, 1);
2339 }
2340
2341 #[test]
2342 fn ack_first_priority_flushes_ack_before_nack_in_custom_policy() {
2343 let tunables = SessionTunables {
2344 ack_nack_flush_profile: AckNackFlushProfile::Custom,
2345 ack_flush_interval: Duration::from_millis(1),
2346 nack_flush_interval: Duration::from_millis(1),
2347 ack_max_ranges_per_datagram: 64,
2348 nack_max_ranges_per_datagram: 64,
2349 ack_nack_priority: AckNackPriority::AckFirst,
2350 ..SessionTunables::default()
2351 };
2352 let mut session = Session::with_tunables(1400, tunables);
2353 let now = Instant::now();
2354
2355 session.process_datagram_sequence(Sequence24::new(2), now);
2356 let out = session.on_tick(now + Duration::from_millis(1), 0, 0, 0, 0);
2357 assert_eq!(out.len(), 2, "both ACK and NACK must be flushed");
2358 assert!(
2359 matches!(out[0].payload, DatagramPayload::Ack(_)),
2360 "ACK must be emitted before NACK when priority is AckFirst"
2361 );
2362 assert!(
2363 matches!(out[1].payload, DatagramPayload::Nack(_)),
2364 "NACK must be emitted after ACK"
2365 );
2366
2367 let snapshot = session.metrics_snapshot();
2368 assert_eq!(snapshot.ack_out_datagrams, 1);
2369 assert_eq!(snapshot.nack_out_datagrams, 1);
2370 }
2371
2372 #[test]
2373 fn best_effort_zeroize_bytes_reports_success_for_unique_buffer() {
2374 let payload = Bytes::from(vec![0xAA, 0xBB, 0xCC]);
2375 assert!(
2376 super::best_effort_zeroize_bytes(payload),
2377 "uniquely-owned buffer should be writable for zeroize"
2378 );
2379 }
2380
2381 #[test]
2382 fn best_effort_zeroize_bytes_reports_failure_for_shared_buffer() {
2383 let payload = Bytes::from(vec![0xAA, 0xBB, 0xCC]);
2384 let shared = payload.clone();
2385 assert!(
2386 !super::best_effort_zeroize_bytes(payload),
2387 "shared buffer cannot be zeroized in-place without unique ownership"
2388 );
2389 drop(shared);
2390 }
2391}