Skip to main content

raknet_rust/session/
mod.rs

1pub mod ack_queue;
2pub mod ordering_channels;
3pub mod priority;
4pub mod reliable_tracker;
5pub mod split_assembler;
6pub mod state;
7pub mod tunables;
8
9pub use priority::RakPriority;
10pub use state::SessionState;
11
12use std::cmp::Ordering;
13use std::collections::{BTreeMap, BinaryHeap, HashMap, VecDeque};
14use std::time::{Duration, Instant};
15
16use bytes::Bytes;
17use zeroize::Zeroize;
18
19use ack_queue::AckQueue;
20use ordering_channels::{OrderedResult, OrderingChannels, SequencedResult};
21use split_assembler::SplitAssembler;
22
23use crate::error::DecodeError;
24use crate::protocol::ack::{AckNackPayload, SequenceRange};
25use crate::protocol::constants::{DatagramFlags, MAX_ACK_SEQUENCES, RAKNET_DATAGRAM_HEADER_SIZE};
26use crate::protocol::datagram::{Datagram, DatagramHeader, DatagramPayload};
27use crate::protocol::frame::Frame;
28use crate::protocol::frame_header::FrameHeader;
29use crate::protocol::reliability::Reliability;
30use crate::protocol::sequence24::Sequence24;
31
32use self::reliable_tracker::ReliableTracker;
33use self::tunables::{AckNackPriority, BackpressureMode, SessionTunables};
34
35#[derive(Debug, Clone)]
36pub struct TrackedDatagram {
37    pub datagram: Datagram,
38    pub send_time: Instant,
39    pub next_send: Instant,
40    pub retries: u32,
41    pub nack_resend_pending: bool,
42    pub resendable: bool,
43    pub receipt_ids: Vec<u64>,
44}
45
46#[derive(Debug, Default, Clone)]
47pub struct ReceiptProgress {
48    pub acked: usize,
49    pub nacked: usize,
50    pub acked_receipt_ids: Vec<u64>,
51}
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq)]
54pub enum QueuePayloadResult {
55    Enqueued { reliable_bytes: usize },
56    Dropped,
57    Deferred,
58    DisconnectRequested,
59}
60
61#[derive(Debug, Default, Clone, Copy)]
62struct SessionMetrics {
63    ingress_datagrams: u64,
64    ingress_frames: u64,
65    duplicate_reliable_drops: u64,
66    ordered_stale_drops: u64,
67    ordered_buffer_full_drops: u64,
68    sequenced_stale_drops: u64,
69    sequenced_missing_index_drops: u64,
70    reliable_sent_datagrams: u64,
71    resent_datagrams: u64,
72    ack_out_datagrams: u64,
73    nack_out_datagrams: u64,
74    acked_datagrams: u64,
75    nacked_datagrams: u64,
76    split_ttl_drops: u64,
77    outgoing_queue_drops: u64,
78    outgoing_queue_defers: u64,
79    outgoing_queue_disconnects: u64,
80    backpressure_delays: u64,
81    backpressure_drops: u64,
82    backpressure_disconnects: u64,
83}
84
85#[derive(Debug, Default, Clone, Copy)]
86pub struct SessionMetricsSnapshot {
87    pub ingress_datagrams: u64,
88    pub ingress_frames: u64,
89    pub duplicate_reliable_drops: u64,
90    pub ordered_stale_drops: u64,
91    pub ordered_buffer_full_drops: u64,
92    pub sequenced_stale_drops: u64,
93    pub sequenced_missing_index_drops: u64,
94    pub reliable_sent_datagrams: u64,
95    pub resent_datagrams: u64,
96    pub ack_out_datagrams: u64,
97    pub nack_out_datagrams: u64,
98    pub acked_datagrams: u64,
99    pub nacked_datagrams: u64,
100    pub split_ttl_drops: u64,
101    pub pending_outgoing_frames: usize,
102    pub pending_outgoing_bytes: usize,
103    pub outgoing_queue_drops: u64,
104    pub outgoing_queue_defers: u64,
105    pub outgoing_queue_disconnects: u64,
106    pub backpressure_delays: u64,
107    pub backpressure_drops: u64,
108    pub backpressure_disconnects: u64,
109    pub srtt_ms: f64,
110    pub rttvar_ms: f64,
111    pub resend_rto_ms: f64,
112    pub congestion_window_packets: f64,
113    pub resend_ratio: f64,
114    pub pacing_budget_bytes: f64,
115    pub pacing_rate_bytes_per_sec: f64,
116}
117
118#[derive(Debug, Clone)]
119struct QueuedFrame {
120    weight: u64,
121    encoded_size: usize,
122    is_reliable: bool,
123    priority: RakPriority,
124    receipt_id: Option<u64>,
125    frame: Frame,
126}
127
128#[derive(Debug, Clone, Copy, PartialEq, Eq)]
129enum BackpressureAction {
130    Allow,
131    Drop,
132    Defer,
133    Disconnect,
134}
135
136impl PartialEq for QueuedFrame {
137    fn eq(&self, other: &Self) -> bool {
138        self.weight == other.weight
139    }
140}
141
142impl Eq for QueuedFrame {}
143
144impl PartialOrd for QueuedFrame {
145    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
146        Some(self.cmp(other))
147    }
148}
149
150impl Ord for QueuedFrame {
151    fn cmp(&self, other: &Self) -> Ordering {
152        // Lower weight is higher priority.
153        other.weight.cmp(&self.weight)
154    }
155}
156
157pub struct Session {
158    state: SessionState,
159    mtu: usize,
160    last_activity: Instant,
161    last_keepalive_sent: Instant,
162    datagram_read_index: Sequence24,
163    datagram_write_index: Sequence24,
164    reliable_write_index: Sequence24,
165    split_write_index: u16,
166    ordering_write_index: Vec<Sequence24>,
167    sequencing_write_index: Vec<Sequence24>,
168    reliable_tracker: ReliableTracker,
169    split_assembler: SplitAssembler,
170    ordering: OrderingChannels,
171    outgoing_heap: BinaryHeap<QueuedFrame>,
172    outgoing_next_weights: [u64; 4],
173    last_min_weight: u64,
174    outgoing_acks: AckQueue,
175    outgoing_nacks: AckQueue,
176    ack_flush_interval: Duration,
177    nack_flush_interval: Duration,
178    ack_max_ranges_per_datagram: usize,
179    nack_max_ranges_per_datagram: usize,
180    ack_nack_priority: AckNackPriority,
181    next_ack_flush_at: Instant,
182    next_nack_flush_at: Instant,
183    incoming_acks: VecDeque<SequenceRange>,
184    incoming_nacks: VecDeque<SequenceRange>,
185    sent_datagrams: BTreeMap<Sequence24, TrackedDatagram>,
186    receipt_tracking: HashMap<u64, usize>,
187    resend_rto: Duration,
188    min_resend_rto: Duration,
189    max_resend_rto: Duration,
190    srtt_ms: Option<f64>,
191    rttvar_ms: f64,
192    congestion_window_packets: f64,
193    min_congestion_window_packets: f64,
194    max_congestion_window_packets: f64,
195    slow_start_threshold_packets: f64,
196    congestion_additive_gain: f64,
197    congestion_multiplicative_decrease_nack: f64,
198    congestion_multiplicative_decrease_timeout: f64,
199    high_rtt_threshold_ms: f64,
200    high_rtt_additive_scale: f64,
201    nack_loss_backoff_cooldown: Duration,
202    next_nack_loss_backoff: Instant,
203    pacing_enabled: bool,
204    pacing_gain: f64,
205    pacing_min_rate_bytes_per_sec: f64,
206    pacing_max_rate_bytes_per_sec: f64,
207    pacing_budget_max_bytes: f64,
208    pacing_budget_bytes: f64,
209    pacing_rate_bytes_per_sec: f64,
210    last_pacing_update: Instant,
211    outgoing_queued_bytes: usize,
212    outgoing_queue_max_frames: usize,
213    outgoing_queue_max_bytes: usize,
214    outgoing_queue_soft_ratio: f64,
215    backpressure_mode: BackpressureMode,
216    best_effort_zeroize_dropped_payloads: bool,
217    disconnect_requested_by_backpressure: bool,
218    metrics: SessionMetrics,
219}
220
221impl Session {
222    pub fn new(mtu: usize) -> Self {
223        Self::with_tunables(mtu, SessionTunables::default())
224    }
225
226    pub fn with_tunables(mtu: usize, tunables: SessionTunables) -> Self {
227        let now = Instant::now();
228        let congestion = tunables.resolved_congestion_settings();
229        let min_cwnd = congestion.min_congestion_window.max(1.0);
230        let max_cwnd = congestion.max_congestion_window.max(min_cwnd);
231        let initial_cwnd = congestion
232            .initial_congestion_window
233            .clamp(min_cwnd, max_cwnd)
234            .max(1.0);
235        let slow_start_threshold = congestion
236            .congestion_slow_start_threshold
237            .clamp(min_cwnd, max_cwnd);
238        let pacing_min_rate = tunables.pacing_min_rate_bytes_per_sec.max(1.0);
239        let pacing_max_rate = tunables.pacing_max_rate_bytes_per_sec.max(pacing_min_rate);
240        let pacing_budget_max = tunables.pacing_max_burst_bytes.max(1) as f64;
241        let initial_pacing_budget = if tunables.pacing_start_full {
242            pacing_budget_max
243        } else {
244            0.0
245        };
246        let ack_nack_flush = tunables.resolved_ack_nack_flush_settings();
247        let mut s = Self {
248            state: SessionState::Offline,
249            mtu,
250            last_activity: now,
251            last_keepalive_sent: now,
252            datagram_read_index: Sequence24::new(0),
253            datagram_write_index: Sequence24::new(0),
254            reliable_write_index: Sequence24::new(0),
255            split_write_index: 0,
256            ordering_write_index: vec![Sequence24::new(0); 16],
257            sequencing_write_index: vec![Sequence24::new(0); 16],
258            reliable_tracker: ReliableTracker::new(tunables.reliable_window),
259            split_assembler: SplitAssembler::new(
260                tunables.split_ttl,
261                tunables.max_split_parts,
262                tunables.max_concurrent_splits,
263            ),
264            ordering: OrderingChannels::new(
265                tunables.max_ordering_channels,
266                tunables.max_ordered_pending_per_channel,
267                tunables.max_order_gap,
268            ),
269            outgoing_heap: BinaryHeap::new(),
270            outgoing_next_weights: [0; 4],
271            last_min_weight: 0,
272            outgoing_acks: AckQueue::new(tunables.ack_queue_capacity),
273            outgoing_nacks: AckQueue::new(tunables.ack_queue_capacity),
274            ack_flush_interval: ack_nack_flush.ack_flush_interval,
275            nack_flush_interval: ack_nack_flush.nack_flush_interval,
276            ack_max_ranges_per_datagram: ack_nack_flush.ack_max_ranges_per_datagram,
277            nack_max_ranges_per_datagram: ack_nack_flush.nack_max_ranges_per_datagram,
278            ack_nack_priority: ack_nack_flush.ack_nack_priority,
279            next_ack_flush_at: now,
280            next_nack_flush_at: now,
281            incoming_acks: VecDeque::new(),
282            incoming_nacks: VecDeque::new(),
283            sent_datagrams: BTreeMap::new(),
284            receipt_tracking: HashMap::new(),
285            resend_rto: congestion
286                .resend_rto
287                .clamp(congestion.min_resend_rto, congestion.max_resend_rto),
288            min_resend_rto: congestion.min_resend_rto,
289            max_resend_rto: congestion.max_resend_rto,
290            srtt_ms: None,
291            rttvar_ms: 0.0,
292            congestion_window_packets: initial_cwnd,
293            min_congestion_window_packets: min_cwnd,
294            max_congestion_window_packets: max_cwnd,
295            slow_start_threshold_packets: slow_start_threshold,
296            congestion_additive_gain: congestion.congestion_additive_gain.max(0.01),
297            congestion_multiplicative_decrease_nack: congestion
298                .congestion_multiplicative_decrease_nack
299                .clamp(0.1, 0.99),
300            congestion_multiplicative_decrease_timeout: congestion
301                .congestion_multiplicative_decrease_timeout
302                .clamp(0.1, 0.99),
303            high_rtt_threshold_ms: congestion.congestion_high_rtt_threshold_ms.max(1.0),
304            high_rtt_additive_scale: congestion
305                .congestion_high_rtt_additive_scale
306                .clamp(0.05, 1.0),
307            nack_loss_backoff_cooldown: congestion
308                .congestion_nack_backoff_cooldown
309                .max(Duration::from_millis(1)),
310            next_nack_loss_backoff: now,
311            pacing_enabled: tunables.pacing_enabled,
312            pacing_gain: tunables.pacing_gain.max(0.05),
313            pacing_min_rate_bytes_per_sec: pacing_min_rate,
314            pacing_max_rate_bytes_per_sec: pacing_max_rate,
315            pacing_budget_max_bytes: pacing_budget_max,
316            pacing_budget_bytes: initial_pacing_budget,
317            pacing_rate_bytes_per_sec: pacing_min_rate,
318            last_pacing_update: now,
319            outgoing_queued_bytes: 0,
320            outgoing_queue_max_frames: tunables.outgoing_queue_max_frames.max(1),
321            outgoing_queue_max_bytes: tunables.outgoing_queue_max_bytes.max(1),
322            outgoing_queue_soft_ratio: tunables.outgoing_queue_soft_ratio.clamp(0.05, 0.99),
323            backpressure_mode: tunables.backpressure_mode,
324            best_effort_zeroize_dropped_payloads: tunables.best_effort_zeroize_dropped_payloads,
325            disconnect_requested_by_backpressure: false,
326            metrics: SessionMetrics::default(),
327        };
328
329        for level in 0..4 {
330            s.outgoing_next_weights[level] = ((1u64 << level) * level as u64) + level as u64;
331        }
332
333        s
334    }
335
336    pub fn state(&self) -> SessionState {
337        self.state
338    }
339
340    pub fn transition_to(&mut self, next: SessionState) -> bool {
341        if self.state.can_transition_to(next) {
342            self.state = next;
343            true
344        } else {
345            false
346        }
347    }
348
349    pub fn mtu(&self) -> usize {
350        self.mtu
351    }
352
353    pub fn set_mtu(&mut self, mtu: usize) {
354        self.mtu = mtu;
355    }
356
357    pub fn touch_activity(&mut self, now: Instant) {
358        self.last_activity = now;
359    }
360
361    pub fn idle_for(&self, now: Instant) -> Duration {
362        now.saturating_duration_since(self.last_activity)
363    }
364
365    pub fn should_send_keepalive(&self, now: Instant, interval: Duration) -> bool {
366        if self.state != SessionState::Connected || interval.is_zero() {
367            return false;
368        }
369
370        self.idle_for(now) >= interval
371            && now.saturating_duration_since(self.last_keepalive_sent) >= interval
372    }
373
374    pub fn mark_keepalive_sent(&mut self, now: Instant) {
375        self.last_keepalive_sent = now;
376    }
377
378    pub fn next_datagram_sequence(&mut self) -> Sequence24 {
379        let seq = self.datagram_write_index;
380        self.datagram_write_index = self.datagram_write_index.next();
381        seq
382    }
383
384    pub fn pending_outgoing_frames(&self) -> usize {
385        self.outgoing_heap.len()
386    }
387
388    pub fn pending_outgoing_bytes(&self) -> usize {
389        self.outgoing_queued_bytes
390    }
391
392    pub fn force_control_flush_deadlines(&mut self, now: Instant) {
393        if !self.outgoing_acks.is_empty() {
394            self.next_ack_flush_at = now;
395        }
396        if !self.outgoing_nacks.is_empty() {
397            self.next_nack_flush_at = now;
398        }
399    }
400
401    pub fn take_backpressure_disconnect(&mut self) -> bool {
402        let should_disconnect = self.disconnect_requested_by_backpressure;
403        self.disconnect_requested_by_backpressure = false;
404        should_disconnect
405    }
406
407    pub fn metrics_snapshot(&self) -> SessionMetricsSnapshot {
408        let resend_ratio = if self.metrics.reliable_sent_datagrams == 0 {
409            0.0
410        } else {
411            self.metrics.resent_datagrams as f64 / self.metrics.reliable_sent_datagrams as f64
412        };
413
414        SessionMetricsSnapshot {
415            ingress_datagrams: self.metrics.ingress_datagrams,
416            ingress_frames: self.metrics.ingress_frames,
417            duplicate_reliable_drops: self.metrics.duplicate_reliable_drops,
418            ordered_stale_drops: self.metrics.ordered_stale_drops,
419            ordered_buffer_full_drops: self.metrics.ordered_buffer_full_drops,
420            sequenced_stale_drops: self.metrics.sequenced_stale_drops,
421            sequenced_missing_index_drops: self.metrics.sequenced_missing_index_drops,
422            reliable_sent_datagrams: self.metrics.reliable_sent_datagrams,
423            resent_datagrams: self.metrics.resent_datagrams,
424            ack_out_datagrams: self.metrics.ack_out_datagrams,
425            nack_out_datagrams: self.metrics.nack_out_datagrams,
426            acked_datagrams: self.metrics.acked_datagrams,
427            nacked_datagrams: self.metrics.nacked_datagrams,
428            split_ttl_drops: self.metrics.split_ttl_drops,
429            pending_outgoing_frames: self.pending_outgoing_frames(),
430            pending_outgoing_bytes: self.pending_outgoing_bytes(),
431            outgoing_queue_drops: self.metrics.outgoing_queue_drops,
432            outgoing_queue_defers: self.metrics.outgoing_queue_defers,
433            outgoing_queue_disconnects: self.metrics.outgoing_queue_disconnects,
434            backpressure_delays: self.metrics.backpressure_delays,
435            backpressure_drops: self.metrics.backpressure_drops,
436            backpressure_disconnects: self.metrics.backpressure_disconnects,
437            srtt_ms: self.srtt_ms.unwrap_or(0.0),
438            rttvar_ms: self.rttvar_ms,
439            resend_rto_ms: self.resend_rto.as_secs_f64() * 1000.0,
440            congestion_window_packets: self.congestion_window_packets,
441            resend_ratio,
442            pacing_budget_bytes: self.pacing_budget_bytes,
443            pacing_rate_bytes_per_sec: self.pacing_rate_bytes_per_sec,
444        }
445    }
446
447    pub fn process_datagram_sequence(&mut self, seq: Sequence24, now: Instant) {
448        let expected = self.datagram_read_index;
449
450        if expected > seq {
451            let ack_was_empty = self.outgoing_acks.is_empty();
452            self.outgoing_acks.push(SequenceRange {
453                start: seq,
454                end: seq,
455            });
456            if ack_was_empty {
457                self.next_ack_flush_at = now + self.ack_flush_interval;
458            }
459            return;
460        }
461
462        self.datagram_read_index = seq.next();
463
464        if seq == expected {
465            let ack_was_empty = self.outgoing_acks.is_empty();
466            self.outgoing_acks.push(SequenceRange {
467                start: seq,
468                end: seq,
469            });
470            if ack_was_empty {
471                self.next_ack_flush_at = now + self.ack_flush_interval;
472            }
473            return;
474        }
475
476        let mut nack_start = expected;
477        let nack_end = seq.prev();
478
479        loop {
480            let mut chunk_end = nack_start;
481            let mut count = 0;
482
483            while count < (MAX_ACK_SEQUENCES - 1) && chunk_end < nack_end {
484                chunk_end = chunk_end.next();
485                count += 1;
486            }
487
488            let nack_was_empty = self.outgoing_nacks.is_empty();
489            self.outgoing_nacks.push(SequenceRange {
490                start: nack_start,
491                end: chunk_end,
492            });
493            if nack_was_empty {
494                self.next_nack_flush_at = now + self.nack_flush_interval;
495            }
496
497            if chunk_end == nack_end {
498                break;
499            }
500
501            nack_start = chunk_end.next();
502        }
503
504        let ack_was_empty = self.outgoing_acks.is_empty();
505        self.outgoing_acks.push(SequenceRange {
506            start: seq,
507            end: seq,
508        });
509        if ack_was_empty {
510            self.next_ack_flush_at = now + self.ack_flush_interval;
511        }
512    }
513
514    pub fn ingest_datagram(
515        &mut self,
516        datagram: Datagram,
517        now: Instant,
518    ) -> Result<Vec<Frame>, DecodeError> {
519        self.metrics.ingress_datagrams = self.metrics.ingress_datagrams.saturating_add(1);
520
521        match datagram.payload {
522            DatagramPayload::Ack(payload) => {
523                self.handle_ack_payload(payload);
524                Ok(Vec::new())
525            }
526            DatagramPayload::Nack(payload) => {
527                self.handle_nack_payload(payload);
528                Ok(Vec::new())
529            }
530            DatagramPayload::Frames(frames) => {
531                self.metrics.ingress_frames = self
532                    .metrics
533                    .ingress_frames
534                    .saturating_add(frames.len() as u64);
535                self.process_datagram_sequence(datagram.header.sequence, now);
536                self.handle_frames(frames, now)
537            }
538        }
539    }
540
541    pub fn handle_ack_payload(&mut self, payload: AckNackPayload) {
542        self.incoming_acks.extend(payload.ranges);
543    }
544
545    pub fn handle_nack_payload(&mut self, payload: AckNackPayload) {
546        self.incoming_nacks.extend(payload.ranges);
547    }
548
549    fn handle_frames(
550        &mut self,
551        frames: Vec<Frame>,
552        now: Instant,
553    ) -> Result<Vec<Frame>, DecodeError> {
554        let mut out = Vec::new();
555
556        for frame in frames {
557            let is_split = frame.header.is_split;
558            let should_drop_duplicate = frame.header.reliability.is_reliable() && !is_split;
559
560            if should_drop_duplicate
561                && let Some(ridx) = frame.reliable_index
562                && self.reliable_tracker.has_seen(ridx)
563            {
564                self.metrics.duplicate_reliable_drops =
565                    self.metrics.duplicate_reliable_drops.saturating_add(1);
566                continue;
567            }
568
569            let assembled = self.split_assembler.add(frame, now)?;
570            let Some(frame) = assembled else {
571                continue;
572            };
573
574            if !is_split
575                && frame.header.reliability.is_reliable()
576                && let Some(ridx) = frame.reliable_index
577            {
578                let _ = self.reliable_tracker.see(ridx);
579            }
580
581            if frame.header.reliability.is_sequenced() {
582                match self.ordering.handle_sequenced(&frame) {
583                    SequencedResult::Accept => out.push(frame),
584                    SequencedResult::DropMissingSequence => {
585                        self.metrics.sequenced_missing_index_drops =
586                            self.metrics.sequenced_missing_index_drops.saturating_add(1);
587                    }
588                    SequencedResult::DropStale => {
589                        self.metrics.sequenced_stale_drops =
590                            self.metrics.sequenced_stale_drops.saturating_add(1);
591                    }
592                }
593                continue;
594            }
595
596            if frame.header.reliability.is_ordered() {
597                match self.ordering.handle_ordered(frame) {
598                    OrderedResult::Ready(mut ready) => out.append(&mut ready),
599                    OrderedResult::Buffered => {}
600                    OrderedResult::DroppedStale => {
601                        self.metrics.ordered_stale_drops =
602                            self.metrics.ordered_stale_drops.saturating_add(1);
603                    }
604                    OrderedResult::DroppedBufferFull => {
605                        self.metrics.ordered_buffer_full_drops =
606                            self.metrics.ordered_buffer_full_drops.saturating_add(1);
607                    }
608                }
609                continue;
610            }
611
612            out.push(frame);
613        }
614
615        Ok(out)
616    }
617
618    pub fn drain_ack_datagram(&mut self, now: Instant) -> Option<Datagram> {
619        let ranges = self
620            .outgoing_acks
621            .pop_for_mtu(self.mtu, 3, self.ack_max_ranges_per_datagram);
622        if ranges.is_empty() {
623            return None;
624        }
625        self.next_ack_flush_at = now + self.ack_flush_interval;
626        self.metrics.ack_out_datagrams = self.metrics.ack_out_datagrams.saturating_add(1);
627
628        Some(Datagram {
629            header: DatagramHeader {
630                flags: DatagramFlags::VALID | DatagramFlags::ACK,
631                sequence: Sequence24::new(0),
632            },
633            payload: DatagramPayload::Ack(AckNackPayload { ranges }),
634        })
635    }
636
637    pub fn drain_nack_datagram(&mut self, now: Instant) -> Option<Datagram> {
638        let ranges =
639            self.outgoing_nacks
640                .pop_for_mtu(self.mtu, 3, self.nack_max_ranges_per_datagram);
641        if ranges.is_empty() {
642            return None;
643        }
644        self.next_nack_flush_at = now + self.nack_flush_interval;
645        self.metrics.nack_out_datagrams = self.metrics.nack_out_datagrams.saturating_add(1);
646
647        Some(Datagram {
648            header: DatagramHeader {
649                flags: DatagramFlags::VALID | DatagramFlags::NACK,
650                sequence: Sequence24::new(0),
651            },
652            payload: DatagramPayload::Nack(AckNackPayload { ranges }),
653        })
654    }
655
656    pub fn track_sent_reliable_datagram(
657        &mut self,
658        datagram: Datagram,
659        now: Instant,
660        receipt_ids: Vec<u64>,
661    ) {
662        let seq = datagram.header.sequence;
663        let has_reliable = match &datagram.payload {
664            DatagramPayload::Frames(frames) => {
665                frames.iter().any(|f| f.header.reliability.is_reliable())
666            }
667            DatagramPayload::Ack(_) | DatagramPayload::Nack(_) => false,
668        };
669        let has_ack_receipt = match &datagram.payload {
670            DatagramPayload::Frames(frames) => frames
671                .iter()
672                .any(|f| f.header.reliability.is_with_ack_receipt()),
673            DatagramPayload::Ack(_) | DatagramPayload::Nack(_) => false,
674        };
675
676        if !has_reliable && !has_ack_receipt && receipt_ids.is_empty() {
677            return;
678        }
679
680        if has_reliable {
681            self.metrics.reliable_sent_datagrams =
682                self.metrics.reliable_sent_datagrams.saturating_add(1);
683        }
684
685        for receipt_id in &receipt_ids {
686            let counter = self.receipt_tracking.entry(*receipt_id).or_insert(0);
687            *counter = counter.saturating_add(1);
688        }
689
690        self.sent_datagrams.insert(
691            seq,
692            TrackedDatagram {
693                datagram,
694                send_time: now,
695                next_send: now + self.resend_rto,
696                retries: 0,
697                nack_resend_pending: false,
698                resendable: has_reliable,
699                receipt_ids,
700            },
701        );
702    }
703
704    pub fn queue_payload(
705        &mut self,
706        payload: Bytes,
707        reliability: Reliability,
708        channel: u8,
709        priority: RakPriority,
710    ) -> QueuePayloadResult {
711        self.queue_payload_with_receipt(payload, reliability, channel, priority, None)
712    }
713
714    pub fn queue_payload_with_receipt(
715        &mut self,
716        payload: Bytes,
717        reliability: Reliability,
718        channel: u8,
719        priority: RakPriority,
720        receipt_id: Option<u64>,
721    ) -> QueuePayloadResult {
722        let (estimated_frames, estimated_bytes, effective_reliability) =
723            self.estimate_queue_impact(payload.len(), reliability);
724
725        match self.evaluate_backpressure(
726            estimated_frames,
727            estimated_bytes,
728            effective_reliability,
729            priority,
730        ) {
731            BackpressureAction::Allow => {}
732            BackpressureAction::Drop => {
733                if self.best_effort_zeroize_dropped_payloads {
734                    let _ = best_effort_zeroize_bytes(payload);
735                }
736                self.metrics.outgoing_queue_drops =
737                    self.metrics.outgoing_queue_drops.saturating_add(1);
738                self.metrics.backpressure_drops = self.metrics.backpressure_drops.saturating_add(1);
739                return QueuePayloadResult::Dropped;
740            }
741            BackpressureAction::Defer => {
742                if self.best_effort_zeroize_dropped_payloads {
743                    let _ = best_effort_zeroize_bytes(payload);
744                }
745                self.metrics.outgoing_queue_defers =
746                    self.metrics.outgoing_queue_defers.saturating_add(1);
747                self.metrics.backpressure_delays =
748                    self.metrics.backpressure_delays.saturating_add(1);
749                return QueuePayloadResult::Deferred;
750            }
751            BackpressureAction::Disconnect => {
752                if self.best_effort_zeroize_dropped_payloads {
753                    let _ = best_effort_zeroize_bytes(payload);
754                }
755                self.metrics.outgoing_queue_disconnects =
756                    self.metrics.outgoing_queue_disconnects.saturating_add(1);
757                self.metrics.backpressure_disconnects =
758                    self.metrics.backpressure_disconnects.saturating_add(1);
759                self.disconnect_requested_by_backpressure = true;
760                return QueuePayloadResult::DisconnectRequested;
761            }
762        }
763
764        let max_single = self.max_payload_for(reliability, false);
765        if payload.len() <= max_single {
766            let reliable_bytes =
767                self.enqueue_single_frame(payload, reliability, channel, priority, receipt_id);
768            return QueuePayloadResult::Enqueued { reliable_bytes };
769        }
770
771        let reliable_bytes =
772            self.enqueue_split_frames(payload, reliability, channel, priority, receipt_id);
773        QueuePayloadResult::Enqueued { reliable_bytes }
774    }
775
776    pub fn process_incoming_receipts(&mut self, now: Instant) -> ReceiptProgress {
777        let mut progress = ReceiptProgress::default();
778
779        while let Some(range) = self.incoming_acks.pop_front() {
780            Self::for_each_sequence(range, |seq| {
781                if let Some(acked) = self.sent_datagrams.remove(&seq) {
782                    progress.acked += 1;
783                    self.on_reliable_ack();
784                    if acked.retries == 0 {
785                        let rtt_sample = now.saturating_duration_since(acked.send_time);
786                        self.observe_rtt_sample(rtt_sample);
787                    }
788
789                    for receipt_id in acked.receipt_ids {
790                        if let Some(pending) = self.receipt_tracking.get_mut(&receipt_id) {
791                            if *pending > 1 {
792                                *pending -= 1;
793                            } else {
794                                self.receipt_tracking.remove(&receipt_id);
795                                progress.acked_receipt_ids.push(receipt_id);
796                            }
797                        }
798                    }
799                }
800            });
801        }
802        self.metrics.acked_datagrams = self
803            .metrics
804            .acked_datagrams
805            .saturating_add(progress.acked as u64);
806
807        while let Some(range) = self.incoming_nacks.pop_front() {
808            Self::for_each_sequence(range, |seq| {
809                if let Some(entry) = self.sent_datagrams.get_mut(&seq)
810                    && entry.resendable
811                    && entry.next_send > now
812                {
813                    entry.next_send = now;
814                    entry.nack_resend_pending = true;
815                    progress.nacked += 1;
816                }
817            });
818        }
819        if progress.nacked > 0 {
820            self.on_nack_loss(now);
821        }
822        self.metrics.nacked_datagrams = self
823            .metrics
824            .nacked_datagrams
825            .saturating_add(progress.nacked as u64);
826
827        progress
828    }
829
830    pub fn collect_resendable(
831        &mut self,
832        now: Instant,
833        max_count: usize,
834        max_bytes: usize,
835    ) -> Vec<Datagram> {
836        let mut total_bytes = 0usize;
837        let mut selected = Vec::new();
838        let mut timeout_loss_observed = false;
839
840        for (&seq, tracked) in &self.sent_datagrams {
841            if selected.len() >= max_count {
842                break;
843            }
844            if !tracked.resendable {
845                continue;
846            }
847            if tracked.next_send > now {
848                continue;
849            }
850
851            let size = tracked.datagram.encoded_size();
852            if total_bytes + size > max_bytes {
853                break;
854            }
855
856            total_bytes += size;
857            timeout_loss_observed |= !tracked.nack_resend_pending;
858            selected.push(seq);
859        }
860
861        if selected.is_empty() {
862            return Vec::new();
863        }
864
865        if timeout_loss_observed {
866            self.on_timeout(now);
867        }
868
869        let next_send_at = now + self.resend_rto;
870        let mut out = Vec::with_capacity(selected.len());
871        for seq in selected {
872            let Some(tracked) = self.sent_datagrams.get_mut(&seq) else {
873                continue;
874            };
875            tracked.send_time = now;
876            tracked.next_send = next_send_at;
877            tracked.retries = tracked.retries.saturating_add(1);
878            tracked.nack_resend_pending = false;
879            out.push(tracked.datagram.clone());
880            self.metrics.resent_datagrams = self.metrics.resent_datagrams.saturating_add(1);
881        }
882
883        out
884    }
885
886    pub fn build_data_datagram(
887        &mut self,
888        now: Instant,
889        remaining_bytes_budget: &mut usize,
890    ) -> Option<Datagram> {
891        if self.outgoing_heap.is_empty() || *remaining_bytes_budget == 0 {
892            return None;
893        }
894
895        let mut frames = Vec::new();
896        let mut datagram_receipt_ids = Vec::new();
897        let mut datagram_size = RAKNET_DATAGRAM_HEADER_SIZE;
898        let mut has_reliable = false;
899        let mut has_split = false;
900
901        loop {
902            let allow_reliable = has_reliable || self.can_emit_new_reliable_datagram();
903            let Some(queued) = self.pop_next_frame_for_datagram(
904                allow_reliable,
905                datagram_size,
906                *remaining_bytes_budget,
907            ) else {
908                break;
909            };
910
911            datagram_size += queued.encoded_size;
912            *remaining_bytes_budget = remaining_bytes_budget.saturating_sub(queued.encoded_size);
913            self.outgoing_queued_bytes = self
914                .outgoing_queued_bytes
915                .saturating_sub(queued.encoded_size);
916
917            has_reliable |= queued.is_reliable;
918            has_split |= queued.frame.header.is_split;
919            if let Some(receipt_id) = queued.receipt_id
920                && !datagram_receipt_ids.contains(&receipt_id)
921            {
922                datagram_receipt_ids.push(receipt_id);
923            }
924            frames.push(queued.frame);
925        }
926
927        if frames.is_empty() {
928            return None;
929        }
930
931        let flags = if !self.outgoing_heap.is_empty() || has_split {
932            DatagramFlags::VALID | DatagramFlags::CONTINUOUS_SEND
933        } else {
934            DatagramFlags::VALID | DatagramFlags::HAS_B_AND_AS
935        };
936
937        let datagram = Datagram {
938            header: DatagramHeader {
939                flags,
940                sequence: self.next_datagram_sequence(),
941            },
942            payload: DatagramPayload::Frames(frames),
943        };
944
945        if has_reliable || !datagram_receipt_ids.is_empty() {
946            self.track_sent_reliable_datagram(datagram.clone(), now, datagram_receipt_ids);
947        }
948
949        Some(datagram)
950    }
951
952    pub fn on_tick(
953        &mut self,
954        now: Instant,
955        max_new_datagrams: usize,
956        max_new_bytes: usize,
957        max_resend_datagrams: usize,
958        max_resend_bytes: usize,
959    ) -> Vec<Datagram> {
960        let mut out = Vec::new();
961        self.refresh_pacing_budget(now);
962
963        match self.ack_nack_priority {
964            AckNackPriority::NackFirst => {
965                self.flush_nack_if_due(now, &mut out);
966                self.flush_ack_if_due(now, &mut out);
967            }
968            AckNackPriority::AckFirst => {
969                self.flush_ack_if_due(now, &mut out);
970                self.flush_nack_if_due(now, &mut out);
971            }
972        }
973
974        let pacing_budget = if self.pacing_enabled {
975            self.pacing_budget_bytes.floor() as usize
976        } else {
977            usize::MAX
978        };
979        let resend_bytes_budget = max_resend_bytes.min(pacing_budget);
980        let resend_datagrams =
981            self.collect_resendable(now, max_resend_datagrams, resend_bytes_budget);
982        let resend_bytes_used = resend_datagrams
983            .iter()
984            .map(Datagram::encoded_size)
985            .sum::<usize>();
986        self.consume_pacing_budget(resend_bytes_used);
987        out.extend(resend_datagrams);
988
989        let available_pacing_for_new = if self.pacing_enabled {
990            self.pacing_budget_bytes.floor() as usize
991        } else {
992            usize::MAX
993        };
994        let mut remaining_new_bytes = max_new_bytes.min(available_pacing_for_new);
995        let budget_too_small_for_frame = self
996            .min_queued_frame_size()
997            .is_some_and(|min_frame| remaining_new_bytes < min_frame);
998        let allow_immediate_bypass = self.pacing_enabled
999            && (remaining_new_bytes == 0 || budget_too_small_for_frame)
1000            && max_new_bytes > 0
1001            && self.has_immediate_outgoing_frame();
1002        if allow_immediate_bypass {
1003            remaining_new_bytes = self.mtu.min(max_new_bytes).max(1);
1004        }
1005
1006        let mut new_bytes_used = 0usize;
1007        let mut new_datagram_count = 0usize;
1008        while new_datagram_count < max_new_datagrams {
1009            let Some(datagram) = self.build_data_datagram(now, &mut remaining_new_bytes) else {
1010                break;
1011            };
1012            new_bytes_used = new_bytes_used.saturating_add(datagram.encoded_size());
1013            out.push(datagram);
1014            new_datagram_count += 1;
1015            if remaining_new_bytes == 0 {
1016                break;
1017            }
1018        }
1019        self.consume_pacing_budget(new_bytes_used);
1020
1021        self.prune_split_state(now);
1022        out
1023    }
1024
1025    fn flush_ack_if_due(&mut self, now: Instant, out: &mut Vec<Datagram>) {
1026        if self.outgoing_acks.is_empty() || now < self.next_ack_flush_at {
1027            return;
1028        }
1029        if let Some(ack) = self.drain_ack_datagram(now) {
1030            out.push(ack);
1031        }
1032    }
1033
1034    fn flush_nack_if_due(&mut self, now: Instant, out: &mut Vec<Datagram>) {
1035        if self.outgoing_nacks.is_empty() || now < self.next_nack_flush_at {
1036            return;
1037        }
1038        if let Some(nack) = self.drain_nack_datagram(now) {
1039            out.push(nack);
1040        }
1041    }
1042
1043    pub fn prune_split_state(&mut self, now: Instant) -> usize {
1044        let dropped = self.split_assembler.prune(now);
1045        self.metrics.split_ttl_drops = self.metrics.split_ttl_drops.saturating_add(dropped as u64);
1046        dropped
1047    }
1048
1049    fn enqueue_single_frame(
1050        &mut self,
1051        payload: Bytes,
1052        reliability: Reliability,
1053        channel: u8,
1054        priority: RakPriority,
1055        receipt_id: Option<u64>,
1056    ) -> usize {
1057        let ordering_index = self.next_ordering_index_if_needed(reliability, channel);
1058        let sequence_index = self.next_sequence_index_if_needed(reliability, channel);
1059        let reliable_index = if reliability.is_reliable() {
1060            Some(self.next_reliable_index())
1061        } else {
1062            None
1063        };
1064
1065        let frame = Frame {
1066            header: FrameHeader::new(reliability, false, false),
1067            bit_length: (payload.len() as u16) << 3,
1068            reliable_index,
1069            sequence_index,
1070            ordering_index,
1071            ordering_channel: ordering_index.map(|_| channel),
1072            split: None,
1073            payload,
1074        };
1075
1076        let size = frame.encoded_size();
1077        self.push_outgoing_frame(frame, priority, receipt_id);
1078        if reliability.is_reliable() { size } else { 0 }
1079    }
1080
1081    fn enqueue_split_frames(
1082        &mut self,
1083        mut payload: Bytes,
1084        reliability: Reliability,
1085        channel: u8,
1086        priority: RakPriority,
1087        receipt_id: Option<u64>,
1088    ) -> usize {
1089        let reliability = Self::normalize_reliability_for_split(reliability);
1090        let max_split_payload = self.max_payload_for(reliability, true).max(1);
1091        let part_count = payload.len().div_ceil(max_split_payload);
1092        let split_id = self.split_write_index;
1093        self.split_write_index = self.split_write_index.wrapping_add(1);
1094        let ordering_index = self.next_ordering_index_if_needed(reliability, channel);
1095        let sequence_index = self.next_sequence_index_if_needed(reliability, channel);
1096
1097        let mut reliable_bytes = 0usize;
1098
1099        for idx in 0..part_count {
1100            let take = payload.len().min(max_split_payload);
1101            let part = payload.split_to(take);
1102            let reliable_index = if reliability.is_reliable() {
1103                Some(self.next_reliable_index())
1104            } else {
1105                None
1106            };
1107
1108            let frame = Frame {
1109                header: FrameHeader::new(reliability, true, false),
1110                bit_length: (part.len() as u16) << 3,
1111                reliable_index,
1112                sequence_index,
1113                ordering_index,
1114                ordering_channel: ordering_index.map(|_| channel),
1115                split: Some(crate::protocol::frame::SplitInfo {
1116                    part_count: part_count as u32,
1117                    part_id: split_id,
1118                    part_index: idx as u32,
1119                }),
1120                payload: part,
1121            };
1122
1123            let size = frame.encoded_size();
1124            self.push_outgoing_frame(frame, priority, receipt_id);
1125            if reliability.is_reliable() {
1126                reliable_bytes += size;
1127            }
1128        }
1129
1130        reliable_bytes
1131    }
1132
1133    fn max_payload_for(&self, reliability: Reliability, is_split: bool) -> usize {
1134        let frame_overhead = self.frame_overhead(reliability, is_split);
1135        self.mtu
1136            .saturating_sub(RAKNET_DATAGRAM_HEADER_SIZE + frame_overhead)
1137            .max(1)
1138    }
1139
1140    fn frame_overhead(&self, reliability: Reliability, is_split: bool) -> usize {
1141        let mut size = 3usize;
1142        if reliability.is_reliable() {
1143            size += 3;
1144        }
1145        if reliability.is_sequenced() {
1146            size += 3;
1147        }
1148        if reliability.is_ordered() || reliability.is_sequenced() {
1149            size += 4;
1150        }
1151        if is_split {
1152            size += 10;
1153        }
1154        size
1155    }
1156
1157    fn next_reliable_index(&mut self) -> Sequence24 {
1158        let idx = self.reliable_write_index;
1159        self.reliable_write_index = self.reliable_write_index.next();
1160        idx
1161    }
1162
1163    fn next_ordering_index_if_needed(
1164        &mut self,
1165        reliability: Reliability,
1166        channel: u8,
1167    ) -> Option<Sequence24> {
1168        if !(reliability.is_ordered() || reliability.is_sequenced()) {
1169            return None;
1170        }
1171
1172        let ch = channel as usize;
1173        if ch >= self.ordering_write_index.len() {
1174            self.ordering_write_index.resize(ch + 1, Sequence24::new(0));
1175        }
1176
1177        let idx = self.ordering_write_index[ch];
1178        self.ordering_write_index[ch] = idx.next();
1179        Some(idx)
1180    }
1181
1182    fn next_sequence_index_if_needed(
1183        &mut self,
1184        reliability: Reliability,
1185        channel: u8,
1186    ) -> Option<Sequence24> {
1187        if !reliability.is_sequenced() {
1188            return None;
1189        }
1190
1191        let ch = channel as usize;
1192        if ch >= self.sequencing_write_index.len() {
1193            self.sequencing_write_index
1194                .resize(ch + 1, Sequence24::new(0));
1195        }
1196
1197        let idx = self.sequencing_write_index[ch];
1198        self.sequencing_write_index[ch] = idx.next();
1199        Some(idx)
1200    }
1201
1202    fn push_outgoing_frame(
1203        &mut self,
1204        frame: Frame,
1205        priority: RakPriority,
1206        receipt_id: Option<u64>,
1207    ) {
1208        let weight = self.next_weight(priority);
1209        let encoded_size = frame.encoded_size();
1210        let is_reliable = frame.header.reliability.is_reliable();
1211        self.outgoing_queued_bytes = self.outgoing_queued_bytes.saturating_add(encoded_size);
1212        self.outgoing_heap.push(QueuedFrame {
1213            weight,
1214            encoded_size,
1215            is_reliable,
1216            priority,
1217            receipt_id,
1218            frame,
1219        });
1220    }
1221
1222    fn next_weight(&mut self, priority: RakPriority) -> u64 {
1223        let level = priority.as_index();
1224        let mut next = self.outgoing_next_weights[level];
1225
1226        if !self.outgoing_heap.is_empty() {
1227            if next >= self.last_min_weight {
1228                next = self.last_min_weight + ((1u64 << level) * level as u64) + level as u64;
1229                self.outgoing_next_weights[level] =
1230                    next + ((1u64 << level) * (level as u64 + 1)) + level as u64;
1231            }
1232        } else {
1233            for p in 0..4 {
1234                self.outgoing_next_weights[p] = ((1u64 << p) * p as u64) + p as u64;
1235            }
1236        }
1237
1238        self.last_min_weight = next - ((1u64 << level) * level as u64) + level as u64;
1239        next
1240    }
1241
1242    fn normalize_reliability_for_split(reliability: Reliability) -> Reliability {
1243        match reliability {
1244            Reliability::Unreliable => Reliability::Reliable,
1245            Reliability::UnreliableSequenced => Reliability::ReliableSequenced,
1246            Reliability::UnreliableWithAckReceipt => Reliability::ReliableWithAckReceipt,
1247            v => v,
1248        }
1249    }
1250
1251    fn estimate_queue_impact(
1252        &self,
1253        payload_len: usize,
1254        reliability: Reliability,
1255    ) -> (usize, usize, Reliability) {
1256        let max_single = self.max_payload_for(reliability, false);
1257        if payload_len <= max_single {
1258            let bytes = self
1259                .frame_overhead(reliability, false)
1260                .saturating_add(payload_len);
1261            return (1, bytes, reliability);
1262        }
1263
1264        let effective = Self::normalize_reliability_for_split(reliability);
1265        let max_split_payload = self.max_payload_for(effective, true).max(1);
1266        let part_count = payload_len.div_ceil(max_split_payload);
1267        let bytes = payload_len
1268            .saturating_add(part_count.saturating_mul(self.frame_overhead(effective, true)));
1269
1270        (part_count, bytes, effective)
1271    }
1272
1273    fn evaluate_backpressure(
1274        &self,
1275        added_frames: usize,
1276        added_bytes: usize,
1277        reliability: Reliability,
1278        priority: RakPriority,
1279    ) -> BackpressureAction {
1280        let projected_frames = self.pending_outgoing_frames().saturating_add(added_frames);
1281        let projected_bytes = self.pending_outgoing_bytes().saturating_add(added_bytes);
1282
1283        let hard_frames = self.outgoing_queue_max_frames.max(1);
1284        let hard_bytes = self.outgoing_queue_max_bytes.max(1);
1285
1286        let soft_frames = ((hard_frames as f64) * self.outgoing_queue_soft_ratio)
1287            .floor()
1288            .max(1.0) as usize;
1289        let soft_bytes = ((hard_bytes as f64) * self.outgoing_queue_soft_ratio)
1290            .floor()
1291            .max(1.0) as usize;
1292
1293        let exceeds_hard = projected_frames > hard_frames || projected_bytes > hard_bytes;
1294        let exceeds_soft = projected_frames > soft_frames || projected_bytes > soft_bytes;
1295        let reliable = reliability.is_reliable();
1296
1297        if exceeds_hard {
1298            return match self.backpressure_mode {
1299                BackpressureMode::Delay => BackpressureAction::Defer,
1300                BackpressureMode::Shed => {
1301                    if !reliable || matches!(priority, RakPriority::Normal | RakPriority::Low) {
1302                        BackpressureAction::Drop
1303                    } else {
1304                        BackpressureAction::Defer
1305                    }
1306                }
1307                BackpressureMode::Disconnect => BackpressureAction::Disconnect,
1308            };
1309        }
1310
1311        if exceeds_soft {
1312            return BackpressureAction::Defer;
1313        }
1314
1315        BackpressureAction::Allow
1316    }
1317
1318    fn best_effort_zeroize_buffered_payloads(&mut self) {
1319        for queued in self.outgoing_heap.drain() {
1320            let _ = best_effort_zeroize_bytes(queued.frame.payload);
1321        }
1322
1323        for tracked in self.sent_datagrams.values_mut() {
1324            if let DatagramPayload::Frames(frames) = &mut tracked.datagram.payload {
1325                for frame in frames {
1326                    let payload = std::mem::take(&mut frame.payload);
1327                    let _ = best_effort_zeroize_bytes(payload);
1328                }
1329            }
1330        }
1331
1332        for frame in self.ordering.drain_pending_ordered_frames() {
1333            let _ = best_effort_zeroize_bytes(frame.payload);
1334        }
1335
1336        for part in self.split_assembler.drain_buffered_parts() {
1337            let _ = best_effort_zeroize_bytes(part);
1338        }
1339    }
1340
1341    fn can_emit_new_reliable_datagram(&self) -> bool {
1342        let in_flight = self.sent_datagrams.len() as f64;
1343        in_flight < self.congestion_window_packets.max(1.0).floor()
1344    }
1345
1346    fn refresh_pacing_budget(&mut self, now: Instant) {
1347        if !self.pacing_enabled {
1348            return;
1349        }
1350
1351        let elapsed = now
1352            .saturating_duration_since(self.last_pacing_update)
1353            .as_secs_f64();
1354        self.last_pacing_update = now;
1355
1356        let rate = self.compute_pacing_rate_bytes_per_sec();
1357        self.pacing_rate_bytes_per_sec = rate;
1358
1359        if elapsed > 0.0 {
1360            self.pacing_budget_bytes = (self.pacing_budget_bytes + elapsed * rate)
1361                .clamp(0.0, self.pacing_budget_max_bytes);
1362        }
1363    }
1364
1365    fn compute_pacing_rate_bytes_per_sec(&self) -> f64 {
1366        if !self.pacing_enabled {
1367            return f64::INFINITY;
1368        }
1369
1370        let reference_rtt_ms = self
1371            .srtt_ms
1372            .unwrap_or_else(|| (self.resend_rto.as_secs_f64() * 1000.0).max(1.0));
1373        let rtt_secs = (reference_rtt_ms.max(1.0)) / 1000.0;
1374        let cwnd_bytes = self.congestion_window_packets.max(1.0) * self.mtu as f64;
1375        let raw_rate = (cwnd_bytes / rtt_secs) * self.pacing_gain;
1376        raw_rate.clamp(
1377            self.pacing_min_rate_bytes_per_sec,
1378            self.pacing_max_rate_bytes_per_sec,
1379        )
1380    }
1381
1382    fn consume_pacing_budget(&mut self, bytes: usize) {
1383        if !self.pacing_enabled {
1384            return;
1385        }
1386        self.pacing_budget_bytes = (self.pacing_budget_bytes - bytes as f64).max(0.0);
1387    }
1388
1389    fn has_immediate_outgoing_frame(&self) -> bool {
1390        self.outgoing_heap
1391            .iter()
1392            .any(|entry| entry.priority == RakPriority::Immediate)
1393    }
1394
1395    fn min_queued_frame_size(&self) -> Option<usize> {
1396        self.outgoing_heap
1397            .iter()
1398            .map(|entry| entry.encoded_size)
1399            .min()
1400    }
1401
1402    fn pop_next_frame_for_datagram(
1403        &mut self,
1404        allow_reliable: bool,
1405        datagram_size: usize,
1406        remaining_bytes_budget: usize,
1407    ) -> Option<QueuedFrame> {
1408        let mut deferred = Vec::new();
1409        let mut selected = None;
1410
1411        while let Some(candidate) = self.outgoing_heap.pop() {
1412            let fits_mtu = datagram_size.saturating_add(candidate.encoded_size) <= self.mtu;
1413            let fits_budget = candidate.encoded_size <= remaining_bytes_budget;
1414            let reliability_ok = allow_reliable || !candidate.is_reliable;
1415
1416            if fits_mtu && fits_budget && reliability_ok {
1417                selected = Some(candidate);
1418                break;
1419            }
1420
1421            deferred.push(candidate);
1422        }
1423
1424        for item in deferred {
1425            self.outgoing_heap.push(item);
1426        }
1427
1428        selected
1429    }
1430
1431    fn observe_rtt_sample(&mut self, sample: Duration) {
1432        let sample_ms = (sample.as_secs_f64() * 1000.0).max(1.0);
1433        match self.srtt_ms {
1434            None => {
1435                self.srtt_ms = Some(sample_ms);
1436                self.rttvar_ms = sample_ms / 2.0;
1437            }
1438            Some(srtt) => {
1439                let alpha = 0.125;
1440                let beta = 0.25;
1441                let variation = (srtt - sample_ms).abs();
1442                self.rttvar_ms = (1.0 - beta) * self.rttvar_ms + beta * variation;
1443                let next_srtt = (1.0 - alpha) * srtt + alpha * sample_ms;
1444                self.srtt_ms = Some(next_srtt);
1445            }
1446        }
1447
1448        self.recompute_rto_from_rtt();
1449    }
1450
1451    fn recompute_rto_from_rtt(&mut self) {
1452        let Some(srtt_ms) = self.srtt_ms else {
1453            return;
1454        };
1455
1456        let rto_ms = srtt_ms + (4.0 * self.rttvar_ms).max(10.0);
1457        let clamped = rto_ms.clamp(
1458            self.min_resend_rto.as_secs_f64() * 1000.0,
1459            self.max_resend_rto.as_secs_f64() * 1000.0,
1460        );
1461        self.resend_rto = Duration::from_secs_f64(clamped / 1000.0);
1462    }
1463
1464    fn on_reliable_ack(&mut self) {
1465        let mut additive = if self.congestion_window_packets < self.slow_start_threshold_packets {
1466            self.congestion_additive_gain.max(1.0)
1467        } else {
1468            self.congestion_additive_gain / self.congestion_window_packets.max(1.0)
1469        };
1470
1471        if let Some(srtt_ms) = self.srtt_ms
1472            && srtt_ms >= self.high_rtt_threshold_ms
1473        {
1474            additive *= self.high_rtt_additive_scale;
1475        }
1476
1477        additive = additive.max(0.001);
1478        self.congestion_window_packets = (self.congestion_window_packets + additive).clamp(
1479            self.min_congestion_window_packets,
1480            self.max_congestion_window_packets,
1481        );
1482    }
1483
1484    fn on_timeout(&mut self, now: Instant) {
1485        self.apply_loss_backoff(self.congestion_multiplicative_decrease_timeout);
1486        let backed_off = self.resend_rto.saturating_mul(2);
1487        self.resend_rto = backed_off.min(self.max_resend_rto);
1488        self.next_nack_loss_backoff = now + self.nack_loss_backoff_cooldown;
1489    }
1490
1491    fn on_nack_loss(&mut self, now: Instant) {
1492        if now < self.next_nack_loss_backoff {
1493            return;
1494        }
1495
1496        self.apply_loss_backoff(self.congestion_multiplicative_decrease_nack);
1497        self.next_nack_loss_backoff = now + self.nack_loss_backoff_cooldown;
1498    }
1499
1500    fn apply_loss_backoff(&mut self, factor: f64) {
1501        let factor = factor.clamp(0.1, 0.99);
1502        let reduced = (self.congestion_window_packets * factor).clamp(
1503            self.min_congestion_window_packets,
1504            self.max_congestion_window_packets,
1505        );
1506        self.slow_start_threshold_packets = reduced;
1507        self.congestion_window_packets = reduced;
1508    }
1509
1510    fn for_each_sequence<F>(range: SequenceRange, mut f: F)
1511    where
1512        F: FnMut(Sequence24),
1513    {
1514        let mut seq = range.start;
1515        loop {
1516            f(seq);
1517            if seq == range.end {
1518                break;
1519            }
1520            seq = seq.next();
1521        }
1522    }
1523}
1524
1525impl Drop for Session {
1526    fn drop(&mut self) {
1527        if self.best_effort_zeroize_dropped_payloads {
1528            self.best_effort_zeroize_buffered_payloads();
1529        }
1530    }
1531}
1532
1533fn best_effort_zeroize_bytes(payload: Bytes) -> bool {
1534    match payload.try_into_mut() {
1535        Ok(mut writable) => {
1536            writable.as_mut().zeroize();
1537            true
1538        }
1539        Err(_) => false,
1540    }
1541}
1542
1543#[cfg(test)]
1544mod tests {
1545    use std::time::{Duration, Instant};
1546
1547    use bytes::Bytes;
1548
1549    use super::{QueuePayloadResult, RakPriority, Session, SessionState};
1550    use crate::protocol::ack::{AckNackPayload, SequenceRange};
1551    use crate::protocol::datagram::DatagramPayload;
1552    use crate::protocol::frame::{Frame, SplitInfo};
1553    use crate::protocol::frame_header::FrameHeader;
1554    use crate::protocol::reliability::Reliability;
1555    use crate::protocol::sequence24::Sequence24;
1556    use crate::session::tunables::{
1557        AckNackFlushProfile, AckNackPriority, BackpressureMode, CongestionProfile, SessionTunables,
1558    };
1559
1560    fn transition_to_connected(session: &mut Session) {
1561        assert!(session.transition_to(SessionState::Req1Recv));
1562        assert!(session.transition_to(SessionState::Reply1Sent));
1563        assert!(session.transition_to(SessionState::Req2Recv));
1564        assert!(session.transition_to(SessionState::Reply2Sent));
1565        assert!(session.transition_to(SessionState::ConnReqRecv));
1566        assert!(session.transition_to(SessionState::ConnReqAcceptedSent));
1567        assert!(session.transition_to(SessionState::NewIncomingRecv));
1568        assert!(session.transition_to(SessionState::Connected));
1569    }
1570
1571    #[test]
1572    fn idle_tracking_updates_when_activity_touched() {
1573        let mut session = Session::new(1400);
1574        let now = Instant::now();
1575        let old = now
1576            .checked_sub(Duration::from_secs(8))
1577            .expect("instant subtraction must succeed");
1578        session.touch_activity(old);
1579        assert!(session.idle_for(now) >= Duration::from_secs(8));
1580
1581        session.touch_activity(now);
1582        assert!(session.idle_for(now) <= Duration::from_millis(1));
1583    }
1584
1585    #[test]
1586    fn keepalive_is_emitted_only_for_connected_idle_sessions() {
1587        let mut session = Session::new(1400);
1588        let now = Instant::now() + Duration::from_secs(6);
1589        let interval = Duration::from_secs(5);
1590
1591        let old = now
1592            .checked_sub(Duration::from_secs(6))
1593            .expect("instant subtraction must succeed");
1594        session.touch_activity(old);
1595        assert!(!session.should_send_keepalive(now, interval));
1596
1597        transition_to_connected(&mut session);
1598        assert!(session.should_send_keepalive(now, interval));
1599
1600        session.mark_keepalive_sent(now);
1601        assert!(!session.should_send_keepalive(now, interval));
1602    }
1603
1604    #[test]
1605    fn soft_backpressure_delays_unreliable_payloads() {
1606        let tunables = SessionTunables {
1607            outgoing_queue_max_frames: 4,
1608            outgoing_queue_max_bytes: 8 * 1024,
1609            outgoing_queue_soft_ratio: 0.5,
1610            ..SessionTunables::default()
1611        };
1612
1613        let mut session = Session::with_tunables(1400, tunables);
1614
1615        assert!(matches!(
1616            session.queue_payload(
1617                Bytes::from_static(b"a"),
1618                Reliability::Reliable,
1619                0,
1620                RakPriority::High
1621            ),
1622            QueuePayloadResult::Enqueued { .. }
1623        ));
1624        assert!(matches!(
1625            session.queue_payload(
1626                Bytes::from_static(b"b"),
1627                Reliability::Reliable,
1628                0,
1629                RakPriority::High
1630            ),
1631            QueuePayloadResult::Enqueued { .. }
1632        ));
1633
1634        assert!(matches!(
1635            session.queue_payload(
1636                Bytes::from_static(b"u"),
1637                Reliability::Unreliable,
1638                0,
1639                RakPriority::Low
1640            ),
1641            QueuePayloadResult::Deferred
1642        ));
1643
1644        let snapshot = session.metrics_snapshot();
1645        assert_eq!(snapshot.outgoing_queue_defers, 1);
1646        assert_eq!(snapshot.backpressure_delays, 1);
1647    }
1648
1649    #[test]
1650    fn hard_backpressure_disconnects_in_disconnect_mode() {
1651        let tunables = SessionTunables {
1652            outgoing_queue_max_frames: 1,
1653            outgoing_queue_max_bytes: 8 * 1024,
1654            outgoing_queue_soft_ratio: 0.5,
1655            backpressure_mode: BackpressureMode::Disconnect,
1656            ..SessionTunables::default()
1657        };
1658
1659        let mut session = Session::with_tunables(1400, tunables);
1660
1661        let _ = session.queue_payload(
1662            Bytes::from_static(b"a"),
1663            Reliability::Reliable,
1664            0,
1665            RakPriority::High,
1666        );
1667
1668        assert!(matches!(
1669            session.queue_payload(
1670                Bytes::from_static(b"b"),
1671                Reliability::Reliable,
1672                0,
1673                RakPriority::Immediate
1674            ),
1675            QueuePayloadResult::DisconnectRequested
1676        ));
1677        assert!(session.take_backpressure_disconnect());
1678        let snapshot = session.metrics_snapshot();
1679        assert_eq!(snapshot.backpressure_disconnects, 1);
1680    }
1681
1682    #[test]
1683    fn hard_backpressure_sheds_low_priority_in_shed_mode() {
1684        let tunables = SessionTunables {
1685            outgoing_queue_max_frames: 1,
1686            outgoing_queue_max_bytes: 8 * 1024,
1687            outgoing_queue_soft_ratio: 0.5,
1688            backpressure_mode: BackpressureMode::Shed,
1689            ..SessionTunables::default()
1690        };
1691        let mut session = Session::with_tunables(1400, tunables);
1692
1693        let _ = session.queue_payload(
1694            Bytes::from_static(b"a"),
1695            Reliability::Reliable,
1696            0,
1697            RakPriority::High,
1698        );
1699
1700        assert!(matches!(
1701            session.queue_payload(
1702                Bytes::from_static(b"b"),
1703                Reliability::Unreliable,
1704                0,
1705                RakPriority::Low
1706            ),
1707            QueuePayloadResult::Dropped
1708        ));
1709        assert!(!session.take_backpressure_disconnect());
1710        let snapshot = session.metrics_snapshot();
1711        assert_eq!(snapshot.backpressure_drops, 1);
1712    }
1713
1714    #[test]
1715    fn hard_backpressure_shed_mode_defers_high_priority_reliable() {
1716        let tunables = SessionTunables {
1717            outgoing_queue_max_frames: 1,
1718            outgoing_queue_max_bytes: 8 * 1024,
1719            outgoing_queue_soft_ratio: 0.5,
1720            backpressure_mode: BackpressureMode::Shed,
1721            ..SessionTunables::default()
1722        };
1723        let mut session = Session::with_tunables(1400, tunables);
1724
1725        let _ = session.queue_payload(
1726            Bytes::from_static(b"a"),
1727            Reliability::Reliable,
1728            0,
1729            RakPriority::High,
1730        );
1731
1732        assert!(matches!(
1733            session.queue_payload(
1734                Bytes::from_static(b"b"),
1735                Reliability::Reliable,
1736                0,
1737                RakPriority::Immediate
1738            ),
1739            QueuePayloadResult::Deferred
1740        ));
1741        assert!(!session.take_backpressure_disconnect());
1742        let snapshot = session.metrics_snapshot();
1743        assert_eq!(snapshot.backpressure_delays, 1);
1744        assert_eq!(snapshot.backpressure_drops, 0);
1745    }
1746
1747    #[test]
1748    fn hard_backpressure_delays_in_delay_mode() {
1749        let tunables = SessionTunables {
1750            outgoing_queue_max_frames: 1,
1751            outgoing_queue_max_bytes: 8 * 1024,
1752            outgoing_queue_soft_ratio: 0.5,
1753            backpressure_mode: BackpressureMode::Delay,
1754            ..SessionTunables::default()
1755        };
1756        let mut session = Session::with_tunables(1400, tunables);
1757
1758        let _ = session.queue_payload(
1759            Bytes::from_static(b"a"),
1760            Reliability::Reliable,
1761            0,
1762            RakPriority::High,
1763        );
1764
1765        assert!(matches!(
1766            session.queue_payload(
1767                Bytes::from_static(b"b"),
1768                Reliability::Reliable,
1769                0,
1770                RakPriority::Low
1771            ),
1772            QueuePayloadResult::Deferred
1773        ));
1774        assert!(!session.take_backpressure_disconnect());
1775        let snapshot = session.metrics_snapshot();
1776        assert_eq!(snapshot.backpressure_delays, 1);
1777    }
1778
1779    #[test]
1780    fn ack_updates_rtt_and_timeout_reduces_congestion() {
1781        let mut session = Session::new(1400);
1782        let now = Instant::now();
1783
1784        assert!(matches!(
1785            session.queue_payload(
1786                Bytes::from_static(b"payload"),
1787                Reliability::ReliableOrdered,
1788                0,
1789                RakPriority::High
1790            ),
1791            QueuePayloadResult::Enqueued { .. }
1792        ));
1793
1794        let sent = session
1795            .on_tick(now, 1, 1400, 0, 0)
1796            .into_iter()
1797            .next()
1798            .expect("data datagram must be produced");
1799        let seq = sent.header.sequence;
1800
1801        session.handle_ack_payload(AckNackPayload {
1802            ranges: vec![SequenceRange {
1803                start: seq,
1804                end: seq,
1805            }],
1806        });
1807        let _ = session.process_incoming_receipts(now + Duration::from_millis(120));
1808
1809        let after_ack = session.metrics_snapshot();
1810        assert!(after_ack.srtt_ms > 0.0);
1811        assert!(after_ack.resend_rto_ms >= 80.0);
1812
1813        let before_timeout_cwnd = after_ack.congestion_window_packets;
1814
1815        assert!(matches!(
1816            session.queue_payload(
1817                Bytes::from_static(b"resend"),
1818                Reliability::ReliableOrdered,
1819                0,
1820                RakPriority::High
1821            ),
1822            QueuePayloadResult::Enqueued { .. }
1823        ));
1824        let _ = session.on_tick(now + Duration::from_millis(125), 1, 1400, 0, 0);
1825
1826        let _ = session.collect_resendable(now + Duration::from_millis(1000), 8, usize::MAX);
1827        let after_timeout = session.metrics_snapshot();
1828        assert!(after_timeout.congestion_window_packets < before_timeout_cwnd);
1829    }
1830
1831    #[test]
1832    fn nack_backoff_cooldown_prevents_repeated_window_cuts() {
1833        let tunables = SessionTunables {
1834            congestion_profile: CongestionProfile::Custom,
1835            initial_congestion_window: 64.0,
1836            min_congestion_window: 8.0,
1837            max_congestion_window: 512.0,
1838            congestion_multiplicative_decrease_nack: 0.8,
1839            congestion_nack_backoff_cooldown: Duration::from_millis(200),
1840            ..SessionTunables::default()
1841        };
1842        let mut session = Session::with_tunables(200, tunables);
1843        let now = Instant::now();
1844
1845        assert!(matches!(
1846            session.queue_payload(
1847                Bytes::from(vec![0xA1; 150]),
1848                Reliability::ReliableOrdered,
1849                0,
1850                RakPriority::High
1851            ),
1852            QueuePayloadResult::Enqueued { .. }
1853        ));
1854        assert!(matches!(
1855            session.queue_payload(
1856                Bytes::from(vec![0xA2; 150]),
1857                Reliability::ReliableOrdered,
1858                0,
1859                RakPriority::High
1860            ),
1861            QueuePayloadResult::Enqueued { .. }
1862        ));
1863
1864        let sent = session.on_tick(now, 2, 64 * 1024, 0, 0);
1865        assert_eq!(
1866            sent.len(),
1867            2,
1868            "mtu=200 should emit one payload per datagram"
1869        );
1870        let seq_a = sent[0].header.sequence;
1871        let seq_b = sent[1].header.sequence;
1872
1873        let before = session.metrics_snapshot().congestion_window_packets;
1874
1875        session.handle_nack_payload(AckNackPayload {
1876            ranges: vec![SequenceRange {
1877                start: seq_a,
1878                end: seq_a,
1879            }],
1880        });
1881        let first = session.process_incoming_receipts(now + Duration::from_millis(1));
1882        assert_eq!(first.nacked, 1);
1883        let after_first = session.metrics_snapshot().congestion_window_packets;
1884        assert!(
1885            after_first < before,
1886            "first nack must reduce congestion window"
1887        );
1888
1889        session.handle_nack_payload(AckNackPayload {
1890            ranges: vec![SequenceRange {
1891                start: seq_b,
1892                end: seq_b,
1893            }],
1894        });
1895        let second = session.process_incoming_receipts(now + Duration::from_millis(10));
1896        assert_eq!(second.nacked, 1);
1897        let after_second = session.metrics_snapshot().congestion_window_packets;
1898        assert!(
1899            (after_second - after_first).abs() < 0.0001,
1900            "second nack within cooldown must not cut cwnd again"
1901        );
1902    }
1903
1904    #[test]
1905    fn timeout_backoff_is_stronger_than_nack_backoff() {
1906        let tunables = SessionTunables {
1907            congestion_profile: CongestionProfile::Custom,
1908            initial_congestion_window: 100.0,
1909            min_congestion_window: 8.0,
1910            max_congestion_window: 512.0,
1911            congestion_multiplicative_decrease_nack: 0.9,
1912            congestion_multiplicative_decrease_timeout: 0.5,
1913            ..SessionTunables::default()
1914        };
1915        let now = Instant::now();
1916
1917        let mut nack_session = Session::with_tunables(200, tunables.clone());
1918        let _ = nack_session.queue_payload(
1919            Bytes::from(vec![0xB1; 150]),
1920            Reliability::ReliableOrdered,
1921            0,
1922            RakPriority::High,
1923        );
1924        let sent_nack = nack_session.on_tick(now, 1, 64 * 1024, 0, 0);
1925        let seq_nack = sent_nack[0].header.sequence;
1926        nack_session.handle_nack_payload(AckNackPayload {
1927            ranges: vec![SequenceRange {
1928                start: seq_nack,
1929                end: seq_nack,
1930            }],
1931        });
1932        let _ = nack_session.process_incoming_receipts(now + Duration::from_millis(1));
1933        let cwnd_after_nack = nack_session.metrics_snapshot().congestion_window_packets;
1934
1935        let mut timeout_session = Session::with_tunables(200, tunables);
1936        let _ = timeout_session.queue_payload(
1937            Bytes::from(vec![0xC1; 150]),
1938            Reliability::ReliableOrdered,
1939            0,
1940            RakPriority::High,
1941        );
1942        let _ = timeout_session.on_tick(now, 1, 64 * 1024, 0, 0);
1943        let _ = timeout_session.collect_resendable(now + Duration::from_secs(2), 8, usize::MAX);
1944        let cwnd_after_timeout = timeout_session.metrics_snapshot().congestion_window_packets;
1945
1946        assert!(
1947            cwnd_after_timeout < cwnd_after_nack,
1948            "timeout loss must reduce cwnd more aggressively than nack loss"
1949        );
1950    }
1951
1952    #[test]
1953    fn pacing_budget_throttles_non_immediate_send_until_budget_refills() {
1954        let tunables = SessionTunables {
1955            pacing_enabled: true,
1956            pacing_start_full: false,
1957            pacing_min_rate_bytes_per_sec: 1024.0,
1958            pacing_max_rate_bytes_per_sec: 1024.0,
1959            pacing_max_burst_bytes: 1024,
1960            ..SessionTunables::default()
1961        };
1962        let mut session = Session::with_tunables(200, tunables);
1963        let now = Instant::now();
1964
1965        assert!(matches!(
1966            session.queue_payload(
1967                Bytes::from(vec![0xD1; 150]),
1968                Reliability::ReliableOrdered,
1969                0,
1970                RakPriority::High
1971            ),
1972            QueuePayloadResult::Enqueued { .. }
1973        ));
1974
1975        let blocked = session.on_tick(now, 1, 64 * 1024, 0, 0);
1976        assert!(
1977            blocked.is_empty(),
1978            "with almost zero burst budget, first send must be paced"
1979        );
1980
1981        let resumed = session.on_tick(now + Duration::from_millis(250), 1, 64 * 1024, 0, 0);
1982        assert_eq!(
1983            resumed.len(),
1984            1,
1985            "after budget refill, datagram must be sent"
1986        );
1987        let snapshot = session.metrics_snapshot();
1988        assert!(
1989            snapshot.pacing_rate_bytes_per_sec >= 1000.0,
1990            "pacing rate should be active in snapshot"
1991        );
1992    }
1993
1994    #[test]
1995    fn immediate_priority_can_bypass_empty_pacing_budget() {
1996        let tunables = SessionTunables {
1997            pacing_enabled: true,
1998            pacing_start_full: false,
1999            pacing_min_rate_bytes_per_sec: 1.0,
2000            pacing_max_rate_bytes_per_sec: 1.0,
2001            pacing_max_burst_bytes: 1,
2002            ..SessionTunables::default()
2003        };
2004        let mut session = Session::with_tunables(200, tunables);
2005        let now = Instant::now();
2006
2007        assert!(matches!(
2008            session.queue_payload(
2009                Bytes::from(vec![0xD2; 150]),
2010                Reliability::ReliableOrdered,
2011                0,
2012                RakPriority::Immediate
2013            ),
2014            QueuePayloadResult::Enqueued { .. }
2015        ));
2016
2017        let sent = session.on_tick(now, 1, 64 * 1024, 0, 0);
2018        assert_eq!(
2019            sent.len(),
2020            1,
2021            "immediate packet should bypass drained pacing budget once"
2022        );
2023    }
2024
2025    #[test]
2026    fn ack_receipt_id_is_reported_once_after_all_datagrams_are_acked() {
2027        let mut session = Session::new(1400);
2028        let now = Instant::now();
2029
2030        assert!(matches!(
2031            session.queue_payload_with_receipt(
2032                Bytes::from(vec![0xAA; 6000]),
2033                Reliability::ReliableOrdered,
2034                0,
2035                RakPriority::High,
2036                Some(42)
2037            ),
2038            QueuePayloadResult::Enqueued { .. }
2039        ));
2040
2041        let sent = session.on_tick(now, 16, usize::MAX, 0, 0);
2042        let mut data_sequences = Vec::new();
2043        for datagram in &sent {
2044            if matches!(
2045                datagram.payload,
2046                crate::protocol::datagram::DatagramPayload::Frames(_)
2047            ) {
2048                data_sequences.push(datagram.header.sequence);
2049            }
2050        }
2051        assert!(
2052            data_sequences.len() > 1,
2053            "split payload should span multiple datagrams"
2054        );
2055
2056        session.handle_ack_payload(AckNackPayload {
2057            ranges: vec![SequenceRange {
2058                start: data_sequences[0],
2059                end: data_sequences[0],
2060            }],
2061        });
2062        let first_progress = session.process_incoming_receipts(now + Duration::from_millis(100));
2063        assert!(first_progress.acked_receipt_ids.is_empty());
2064
2065        for seq in data_sequences.iter().skip(1) {
2066            session.handle_ack_payload(AckNackPayload {
2067                ranges: vec![SequenceRange {
2068                    start: *seq,
2069                    end: *seq,
2070                }],
2071            });
2072        }
2073        let second_progress = session.process_incoming_receipts(now + Duration::from_millis(120));
2074        assert_eq!(second_progress.acked_receipt_ids, vec![42]);
2075    }
2076
2077    #[test]
2078    fn prune_split_state_increments_split_ttl_drop_metrics() {
2079        let tunables = SessionTunables {
2080            split_ttl: Duration::from_millis(20),
2081            max_split_parts: 8,
2082            max_concurrent_splits: 8,
2083            ..SessionTunables::default()
2084        };
2085        let mut session = Session::with_tunables(1400, tunables);
2086        let now = Instant::now();
2087
2088        let split_frame = Frame {
2089            header: FrameHeader::new(Reliability::ReliableOrdered, true, false),
2090            bit_length: 8,
2091            reliable_index: None,
2092            sequence_index: None,
2093            ordering_index: None,
2094            ordering_channel: None,
2095            split: Some(SplitInfo {
2096                part_count: 2,
2097                part_id: 77,
2098                part_index: 0,
2099            }),
2100            payload: Bytes::from_static(b"a"),
2101        };
2102
2103        assert!(matches!(
2104            session.split_assembler.add(split_frame, now),
2105            Ok(None)
2106        ));
2107        assert_eq!(
2108            session.prune_split_state(now + Duration::from_millis(10)),
2109            0
2110        );
2111        assert_eq!(session.metrics_snapshot().split_ttl_drops, 0);
2112
2113        assert_eq!(
2114            session.prune_split_state(now + Duration::from_millis(30)),
2115            1
2116        );
2117        assert_eq!(session.metrics_snapshot().split_ttl_drops, 1);
2118    }
2119
2120    #[test]
2121    fn nack_marks_reliable_datagram_for_immediate_resend() {
2122        let mut session = Session::new(1400);
2123        let now = Instant::now();
2124
2125        assert!(matches!(
2126            session.queue_payload(
2127                Bytes::from_static(b"resend-me"),
2128                Reliability::ReliableOrdered,
2129                0,
2130                RakPriority::High
2131            ),
2132            QueuePayloadResult::Enqueued { .. }
2133        ));
2134
2135        let sent = session
2136            .on_tick(now, 1, usize::MAX, 0, 0)
2137            .into_iter()
2138            .next()
2139            .expect("reliable datagram should be emitted");
2140        let seq = sent.header.sequence;
2141
2142        session.handle_nack_payload(AckNackPayload {
2143            ranges: vec![SequenceRange {
2144                start: seq,
2145                end: seq,
2146            }],
2147        });
2148        let progress = session.process_incoming_receipts(now + Duration::from_millis(1));
2149        assert_eq!(progress.nacked, 1);
2150
2151        let resent = session.collect_resendable(now + Duration::from_millis(1), 8, usize::MAX);
2152        assert_eq!(resent.len(), 1);
2153        assert_eq!(resent[0].header.sequence, seq);
2154    }
2155
2156    #[test]
2157    fn nack_does_not_resend_unreliable_ack_receipt_datagrams() {
2158        let mut session = Session::new(1400);
2159        let now = Instant::now();
2160
2161        assert!(matches!(
2162            session.queue_payload_with_receipt(
2163                Bytes::from_static(b"fire-and-forget"),
2164                Reliability::UnreliableWithAckReceipt,
2165                0,
2166                RakPriority::Normal,
2167                Some(77)
2168            ),
2169            QueuePayloadResult::Enqueued { .. }
2170        ));
2171
2172        let sent = session
2173            .on_tick(now, 1, usize::MAX, 0, 0)
2174            .into_iter()
2175            .next()
2176            .expect("datagram should be emitted");
2177        let seq = sent.header.sequence;
2178
2179        session.handle_nack_payload(AckNackPayload {
2180            ranges: vec![SequenceRange {
2181                start: seq,
2182                end: seq,
2183            }],
2184        });
2185        let nack_progress = session.process_incoming_receipts(now + Duration::from_millis(1));
2186        assert_eq!(nack_progress.nacked, 0);
2187
2188        let resent = session.collect_resendable(now + Duration::from_millis(1), 8, usize::MAX);
2189        assert!(resent.is_empty());
2190
2191        session.handle_ack_payload(AckNackPayload {
2192            ranges: vec![SequenceRange {
2193                start: seq,
2194                end: seq,
2195            }],
2196        });
2197        let ack_progress = session.process_incoming_receipts(now + Duration::from_millis(10));
2198        assert_eq!(ack_progress.acked, 1);
2199        assert_eq!(ack_progress.acked_receipt_ids, vec![77]);
2200    }
2201
2202    #[test]
2203    fn multiple_receipt_ids_from_single_datagram_are_reported_once_each() {
2204        let mut session = Session::new(1400);
2205        let now = Instant::now();
2206
2207        assert!(matches!(
2208            session.queue_payload_with_receipt(
2209                Bytes::from_static(b"first"),
2210                Reliability::ReliableOrdered,
2211                0,
2212                RakPriority::High,
2213                Some(10)
2214            ),
2215            QueuePayloadResult::Enqueued { .. }
2216        ));
2217        assert!(matches!(
2218            session.queue_payload_with_receipt(
2219                Bytes::from_static(b"second"),
2220                Reliability::ReliableOrdered,
2221                0,
2222                RakPriority::High,
2223                Some(20)
2224            ),
2225            QueuePayloadResult::Enqueued { .. }
2226        ));
2227
2228        let sent = session
2229            .on_tick(now, 1, usize::MAX, 0, 0)
2230            .into_iter()
2231            .next()
2232            .expect("datagram should be emitted");
2233        let seq = sent.header.sequence;
2234
2235        session.handle_ack_payload(AckNackPayload {
2236            ranges: vec![SequenceRange {
2237                start: seq,
2238                end: seq,
2239            }],
2240        });
2241        let mut receipts = session
2242            .process_incoming_receipts(now + Duration::from_millis(5))
2243            .acked_receipt_ids;
2244        receipts.sort_unstable();
2245        assert_eq!(receipts, vec![10, 20]);
2246    }
2247
2248    #[test]
2249    fn ack_flush_interval_defers_ack_until_deadline() {
2250        let tunables = SessionTunables {
2251            ack_nack_flush_profile: AckNackFlushProfile::Custom,
2252            ack_flush_interval: Duration::from_millis(50),
2253            nack_flush_interval: Duration::from_millis(1),
2254            ack_max_ranges_per_datagram: 64,
2255            nack_max_ranges_per_datagram: 64,
2256            ack_nack_priority: AckNackPriority::NackFirst,
2257            ..SessionTunables::default()
2258        };
2259        let mut session = Session::with_tunables(1400, tunables);
2260        let now = Instant::now();
2261
2262        session.process_datagram_sequence(Sequence24::new(0), now);
2263        let early = session.on_tick(now + Duration::from_millis(10), 0, 0, 0, 0);
2264        assert!(
2265            early.is_empty(),
2266            "ack must not flush before configured interval"
2267        );
2268
2269        let due = session.on_tick(now + Duration::from_millis(50), 0, 0, 0, 0);
2270        assert_eq!(due.len(), 1, "ack should flush at deadline");
2271        assert!(
2272            matches!(due[0].payload, DatagramPayload::Ack(_)),
2273            "flushed control packet must be ACK"
2274        );
2275    }
2276
2277    #[test]
2278    fn ack_batch_max_ranges_splits_large_ack_queue_across_ticks() {
2279        let tunables = SessionTunables {
2280            ack_nack_flush_profile: AckNackFlushProfile::Custom,
2281            ack_flush_interval: Duration::from_millis(1),
2282            nack_flush_interval: Duration::from_millis(1),
2283            ack_max_ranges_per_datagram: 2,
2284            nack_max_ranges_per_datagram: 64,
2285            ack_nack_priority: AckNackPriority::NackFirst,
2286            ..SessionTunables::default()
2287        };
2288        let mut session = Session::with_tunables(1400, tunables);
2289        let now = Instant::now();
2290
2291        session.outgoing_acks.push(SequenceRange {
2292            start: Sequence24::new(1),
2293            end: Sequence24::new(1),
2294        });
2295        session.outgoing_acks.push(SequenceRange {
2296            start: Sequence24::new(3),
2297            end: Sequence24::new(3),
2298        });
2299        session.outgoing_acks.push(SequenceRange {
2300            start: Sequence24::new(5),
2301            end: Sequence24::new(5),
2302        });
2303
2304        let first = session.on_tick(now, 0, 0, 0, 0);
2305        assert_eq!(first.len(), 1);
2306        match &first[0].payload {
2307            DatagramPayload::Ack(payload) => assert_eq!(payload.ranges.len(), 2),
2308            _ => panic!("expected ACK payload"),
2309        }
2310
2311        let second = session.on_tick(now + Duration::from_millis(1), 0, 0, 0, 0);
2312        assert_eq!(second.len(), 1);
2313        match &second[0].payload {
2314            DatagramPayload::Ack(payload) => assert_eq!(payload.ranges.len(), 1),
2315            _ => panic!("expected ACK payload"),
2316        }
2317    }
2318
2319    #[test]
2320    fn nack_first_priority_flushes_nack_before_ack() {
2321        let mut session = Session::new(1400);
2322        let now = Instant::now();
2323
2324        session.process_datagram_sequence(Sequence24::new(2), now);
2325        let out = session.on_tick(now + Duration::from_millis(10), 0, 0, 0, 0);
2326        assert_eq!(out.len(), 2, "both NACK and ACK must be flushed");
2327        assert!(
2328            matches!(out[0].payload, DatagramPayload::Nack(_)),
2329            "NACK must be emitted before ACK when priority is NackFirst"
2330        );
2331        assert!(
2332            matches!(out[1].payload, DatagramPayload::Ack(_)),
2333            "ACK must be emitted after NACK"
2334        );
2335
2336        let snapshot = session.metrics_snapshot();
2337        assert_eq!(snapshot.ack_out_datagrams, 1);
2338        assert_eq!(snapshot.nack_out_datagrams, 1);
2339    }
2340
2341    #[test]
2342    fn ack_first_priority_flushes_ack_before_nack_in_custom_policy() {
2343        let tunables = SessionTunables {
2344            ack_nack_flush_profile: AckNackFlushProfile::Custom,
2345            ack_flush_interval: Duration::from_millis(1),
2346            nack_flush_interval: Duration::from_millis(1),
2347            ack_max_ranges_per_datagram: 64,
2348            nack_max_ranges_per_datagram: 64,
2349            ack_nack_priority: AckNackPriority::AckFirst,
2350            ..SessionTunables::default()
2351        };
2352        let mut session = Session::with_tunables(1400, tunables);
2353        let now = Instant::now();
2354
2355        session.process_datagram_sequence(Sequence24::new(2), now);
2356        let out = session.on_tick(now + Duration::from_millis(1), 0, 0, 0, 0);
2357        assert_eq!(out.len(), 2, "both ACK and NACK must be flushed");
2358        assert!(
2359            matches!(out[0].payload, DatagramPayload::Ack(_)),
2360            "ACK must be emitted before NACK when priority is AckFirst"
2361        );
2362        assert!(
2363            matches!(out[1].payload, DatagramPayload::Nack(_)),
2364            "NACK must be emitted after ACK"
2365        );
2366
2367        let snapshot = session.metrics_snapshot();
2368        assert_eq!(snapshot.ack_out_datagrams, 1);
2369        assert_eq!(snapshot.nack_out_datagrams, 1);
2370    }
2371
2372    #[test]
2373    fn best_effort_zeroize_bytes_reports_success_for_unique_buffer() {
2374        let payload = Bytes::from(vec![0xAA, 0xBB, 0xCC]);
2375        assert!(
2376            super::best_effort_zeroize_bytes(payload),
2377            "uniquely-owned buffer should be writable for zeroize"
2378        );
2379    }
2380
2381    #[test]
2382    fn best_effort_zeroize_bytes_reports_failure_for_shared_buffer() {
2383        let payload = Bytes::from(vec![0xAA, 0xBB, 0xCC]);
2384        let shared = payload.clone();
2385        assert!(
2386            !super::best_effort_zeroize_bytes(payload),
2387            "shared buffer cannot be zeroized in-place without unique ownership"
2388        );
2389        drop(shared);
2390    }
2391}