1use std::collections::{VecDeque, BTreeMap};
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use tokio::sync::{Mutex, Semaphore, Notify};
13use futures::future::poll_fn;
14use tokio::time::sleep;
15use std::task::{Poll, Context};
16use std::pin::Pin;
17
18use bytes::Bytes;
19use tracing::{debug, warn, info, trace, error};
20
21use crate::packet::{RtpPacket, rtcp::RtcpPacket};
22use crate::RtpSsrc;
23use crate::RtpTimestamp;
24
25use super::{BufferLimits, GlobalBufferManager, MemoryPermit, BufferPool, PooledBuffer};
26
27pub const DEFAULT_TRANSMIT_BUFFER_CAPACITY: usize = 1000;
29
30pub const DEFAULT_CONGESTION_WINDOW: usize = 64;
32
33pub const DEFAULT_MIN_RTO_MS: u64 = 70;
35
36pub const DEFAULT_MAX_RTO_MS: u64 = 1000;
38
39pub const DEFAULT_INITIAL_RTO_MS: u64 = 200;
41
42pub const DEFAULT_MAX_BURST: usize = 16;
44
45#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
47pub enum PacketPriority {
48 Control = 0,
50
51 High = 1,
53
54 Normal = 2,
56
57 Low = 3,
59}
60
61impl Default for PacketPriority {
62 fn default() -> Self {
63 Self::Normal
64 }
65}
66
67struct QueuedPacket {
69 packet: RtpPacket,
71
72 queue_time: Instant,
74
75 priority: PacketPriority,
77
78 is_retransmission: bool,
80
81 metadata: Option<PacketMetadata>,
83}
84
85#[derive(Debug, Clone)]
87pub struct PacketMetadata {
88 pub first_send_time: Option<Instant>,
90
91 pub transmit_count: u32,
93
94 pub acknowledged: bool,
96
97 pub last_send_time: Option<Instant>,
99}
100
101struct CongestionState {
103 cwnd: usize,
105
106 ssthresh: usize,
108
109 rto_ms: u64,
111
112 srtt_ms: Option<f64>,
114
115 rttvar_ms: Option<f64>,
117
118 last_window_reduction: Instant,
120
121 estimated_bps: u64,
123
124 in_flight: usize,
126
127 lost_packets: u64,
129
130 total_sent: u64,
132
133 last_seq_sent: u16,
135
136 in_slow_start: bool,
138}
139
140impl Default for CongestionState {
141 fn default() -> Self {
142 Self {
143 cwnd: DEFAULT_CONGESTION_WINDOW,
144 ssthresh: usize::MAX,
145 rto_ms: DEFAULT_INITIAL_RTO_MS,
146 srtt_ms: None,
147 rttvar_ms: None,
148 last_window_reduction: Instant::now(),
149 estimated_bps: 1_000_000, in_flight: 0,
151 lost_packets: 0,
152 total_sent: 0,
153 last_seq_sent: 0,
154 in_slow_start: true,
155 }
156 }
157}
158
159#[derive(Debug, Clone)]
161pub struct TransmitBufferConfig {
162 pub max_packets: usize,
164
165 pub initial_cwnd: usize,
167
168 pub min_rto_ms: u64,
170
171 pub max_rto_ms: u64,
173
174 pub initial_rto_ms: u64,
176
177 pub congestion_control_enabled: bool,
179
180 pub max_burst: usize,
182
183 pub prioritize_packets: bool,
185
186 pub max_packet_age_ms: u64,
188
189 pub clock_rate: u32,
191}
192
193impl Default for TransmitBufferConfig {
194 fn default() -> Self {
195 Self {
196 max_packets: DEFAULT_TRANSMIT_BUFFER_CAPACITY,
197 initial_cwnd: DEFAULT_CONGESTION_WINDOW,
198 min_rto_ms: DEFAULT_MIN_RTO_MS,
199 max_rto_ms: DEFAULT_MAX_RTO_MS,
200 initial_rto_ms: DEFAULT_INITIAL_RTO_MS,
201 congestion_control_enabled: true,
202 max_burst: DEFAULT_MAX_BURST,
203 prioritize_packets: true,
204 max_packet_age_ms: 1000, clock_rate: 8000, }
207 }
208}
209
210#[derive(Debug, Clone)]
212pub struct TransmitBufferStats {
213 pub queued_packets: usize,
215
216 pub packets_sent: u64,
218
219 pub packets_dropped_overflow: u64,
221
222 pub packets_dropped_aged: u64,
224
225 pub packets_retransmitted: u64,
227
228 pub cwnd: usize,
230
231 pub rto_ms: u64,
233
234 pub srtt_ms: Option<f64>,
236
237 pub estimated_bps: u64,
239
240 pub in_flight: usize,
242
243 pub loss_rate: f64,
245
246 pub buffer_fullness: f32,
248
249 pub packets_queued: usize,
251
252 pub packets_dropped: u64,
254}
255
256impl Default for TransmitBufferStats {
257 fn default() -> Self {
258 Self {
259 queued_packets: 0,
260 packets_sent: 0,
261 packets_dropped_overflow: 0,
262 packets_dropped_aged: 0,
263 packets_retransmitted: 0,
264 cwnd: DEFAULT_CONGESTION_WINDOW,
265 rto_ms: DEFAULT_INITIAL_RTO_MS,
266 srtt_ms: None,
267 estimated_bps: 1_000_000, in_flight: 0,
269 loss_rate: 0.0,
270 buffer_fullness: 0.0,
271 packets_queued: 0,
272 packets_dropped: 0,
273 }
274 }
275}
276
277pub struct TransmitBuffer {
286 config: TransmitBufferConfig,
288
289 queues: BTreeMap<PacketPriority, VecDeque<QueuedPacket>>,
292
293 congestion: CongestionState,
295
296 stats: TransmitBufferStats,
298
299 buffer_manager: Option<Arc<GlobalBufferManager>>,
301
302 packet_notify: Arc<Notify>,
304
305 packet_tracking: BTreeMap<u16, PacketMetadata>,
307
308 cwnd_semaphore: Arc<Semaphore>,
310
311 packet_buffer: Option<Arc<BufferPool>>,
313
314 ssrc: RtpSsrc,
316
317 last_send_time: Option<Instant>,
319
320 pacing_interval_us: u64,
322}
323
324impl TransmitBuffer {
325 pub fn new(ssrc: RtpSsrc, config: TransmitBufferConfig) -> Self {
327 let mut queues = BTreeMap::new();
328
329 queues.insert(PacketPriority::Control, VecDeque::with_capacity(100));
331 queues.insert(PacketPriority::High, VecDeque::with_capacity(config.max_packets / 4));
332 queues.insert(PacketPriority::Normal, VecDeque::with_capacity(config.max_packets / 2));
333 queues.insert(PacketPriority::Low, VecDeque::with_capacity(config.max_packets / 4));
334
335 let mut congestion = CongestionState::default();
337 congestion.cwnd = config.initial_cwnd;
338 congestion.rto_ms = config.initial_rto_ms;
339
340 let stats = TransmitBufferStats {
341 queued_packets: 0,
342 packets_sent: 0,
343 packets_dropped_overflow: 0,
344 packets_dropped_aged: 0,
345 packets_retransmitted: 0,
346 cwnd: config.initial_cwnd,
347 rto_ms: config.initial_rto_ms,
348 srtt_ms: None,
349 estimated_bps: 1_000_000, in_flight: 0,
351 loss_rate: 0.0,
352 buffer_fullness: 0.0,
353 packets_queued: 0,
354 packets_dropped: 0,
355 };
356
357 let cwnd_semaphore = Arc::new(Semaphore::new(config.initial_cwnd));
359
360 Self {
361 config,
362 queues,
363 congestion,
364 stats,
365 buffer_manager: None,
366 packet_notify: Arc::new(Notify::new()),
367 packet_tracking: BTreeMap::new(),
368 cwnd_semaphore,
369 packet_buffer: None,
370 ssrc,
371 last_send_time: None,
372 pacing_interval_us: 0,
373 }
374 }
375
376 pub fn with_buffer_manager(
378 ssrc: RtpSsrc,
379 config: TransmitBufferConfig,
380 buffer_manager: Arc<GlobalBufferManager>,
381 packet_buffer: Arc<BufferPool>,
382 ) -> Self {
383 let mut buffer = Self::new(ssrc, config);
384 buffer.buffer_manager = Some(buffer_manager);
385 buffer.packet_buffer = Some(packet_buffer);
386 buffer
387 }
388
389 pub async fn queue_packet(
393 &mut self,
394 packet: RtpPacket,
395 priority: PacketPriority
396 ) -> bool {
397 let total_packets = self.total_queued_packets();
399
400 if total_packets >= self.config.max_packets {
402 if priority != PacketPriority::High {
405 trace!("Buffer full ({}/{}), dropping {:?} priority packet with seq={}",
407 total_packets, self.config.max_packets,
408 priority, packet.header.sequence_number);
409 self.stats.packets_dropped_overflow += 1;
410 return false;
411 } else {
412 if !self.drop_low_priority_packets() {
414 trace!("Buffer full, nowhere to make room for high priority packet");
416 self.stats.packets_dropped_overflow += 1;
417 return false;
418 }
419 }
420 }
421
422 let metadata = PacketMetadata {
426 first_send_time: None,
427 transmit_count: 0,
428 acknowledged: false,
429 last_send_time: None,
430 };
431
432 let queued = QueuedPacket {
434 packet,
435 queue_time: Instant::now(),
436 priority,
437 is_retransmission: false,
438 metadata: Some(metadata),
439 };
440
441 let queue = self.queues.entry(priority).or_insert_with(|| VecDeque::new());
443
444 queue.push_back(queued);
446
447 self.stats.queued_packets = self.total_queued_packets();
449
450 self.packet_notify.notify_one();
452
453 true
454 }
455
456 pub async fn schedule_retransmission(&mut self, seq: u16) -> bool {
460 if let Some(metadata) = self.packet_tracking.get_mut(&seq) {
462 if !metadata.acknowledged {
464 for (priority, queue) in self.queues.iter() {
467 for queued_packet in queue {
468 if queued_packet.packet.header.sequence_number == seq {
469 trace!("Packet seq={} already queued for retransmission", seq);
471 return true;
472 }
473 }
474 }
475
476 warn!("Requested retransmission for seq={} but packet not available", seq);
483
484 self.stats.packets_retransmitted += 1;
486
487 return false;
488 }
489 }
490
491 false
492 }
493
494 pub async fn get_next_packet(&mut self) -> Option<RtpPacket> {
498 if self.total_queued_packets() == 0 {
500 return None;
501 }
502
503 if !self.config.congestion_control_enabled {
505 return self.get_packet_without_congestion_control().await;
506 }
507
508 if self.congestion.in_flight > 0 {
510 if self.congestion.in_flight >= self.congestion.cwnd {
512 trace!("Congestion window full ({}/{}), not sending new packets",
513 self.congestion.in_flight, self.congestion.cwnd);
514 return None;
515 }
516 }
517
518 match self.cwnd_semaphore.try_acquire() {
520 Ok(permit) => {
521 drop(permit);
523 }
524 Err(_) => {
525 trace!("No congestion permits available, not sending");
527 return None;
528 }
529 }
530
531 if self.pacing_interval_us > 0 {
533 self.apply_pacing().await;
534 }
535
536 let packet_option = self.dequeue_highest_priority_packet();
538
539 if let Some(packet) = &packet_option {
540 self.congestion.in_flight += 1;
542 self.congestion.total_sent += 1;
543
544 self.stats.in_flight = self.congestion.in_flight;
546 self.stats.packets_sent += 1;
547
548 self.last_send_time = Some(Instant::now());
550 }
551
552 packet_option
553 }
554
555 pub async fn wait_for_packet(&self, timeout: Duration) -> bool {
559 if self.total_queued_packets() > 0 && self.cwnd_semaphore.available_permits() > 0 {
561 return true;
562 }
563
564 let notify = self.packet_notify.clone();
566 tokio::select! {
567 _ = notify.notified() => true,
568 _ = tokio::time::sleep(timeout) => false,
569 }
570 }
571
572 pub fn process_rtcp_feedback(&mut self, rtcp: &RtcpPacket) {
574 match rtcp {
576 RtcpPacket::ReceiverReport(_) => {
577 let rtt_ms = 50.0; self.update_rtt_estimate(rtt_ms);
583
584 self.stats.loss_rate = 0.01; self.update_congestion_window(None);
589 },
590 RtcpPacket::SenderReport(_) => {
591 },
593 _ => {
594 },
596 }
597 }
598
599 pub fn acknowledge_packet(&mut self, seq: u16) {
601 if let Some(metadata) = self.packet_tracking.get_mut(&seq) {
602 metadata.acknowledged = true;
603
604 if self.congestion.in_flight > 0 {
606 self.congestion.in_flight -= 1;
607 self.stats.in_flight = self.congestion.in_flight;
608 }
609
610 self.cwnd_semaphore.add_permits(1);
612
613 if self.config.congestion_control_enabled {
615 self.update_congestion_window(Some(seq));
616 }
617 }
618 }
619
620 fn update_rtt_estimate(&mut self, rtt_ms: f64) {
622 if let (Some(srtt), Some(rttvar)) = (self.congestion.srtt_ms, self.congestion.rttvar_ms) {
623 let alpha = 0.125;
629 let beta = 0.25;
630
631 let new_rttvar = (1.0 - beta) * rttvar + beta * (srtt - rtt_ms).abs();
632 let new_srtt = (1.0 - alpha) * srtt + alpha * rtt_ms;
633
634 self.congestion.rttvar_ms = Some(new_rttvar);
635 self.congestion.srtt_ms = Some(new_srtt);
636
637 let new_rto = new_srtt + 4.0 * new_rttvar;
639
640 let clamped_rto = (new_rto.round() as u64)
642 .max(self.config.min_rto_ms)
643 .min(self.config.max_rto_ms);
644
645 self.congestion.rto_ms = clamped_rto;
646 } else {
647 let srtt = rtt_ms;
649 let rttvar = rtt_ms / 2.0;
650
651 self.congestion.srtt_ms = Some(srtt);
652 self.congestion.rttvar_ms = Some(rttvar);
653
654 let new_rto = srtt + 4.0 * rttvar;
656
657 let clamped_rto = (new_rto.round() as u64)
659 .max(self.config.min_rto_ms)
660 .min(self.config.max_rto_ms);
661
662 self.congestion.rto_ms = clamped_rto;
663 }
664
665 self.stats.srtt_ms = self.congestion.srtt_ms;
667 self.stats.rto_ms = self.congestion.rto_ms;
668 }
669
670 fn congestion_event(&mut self) {
672 let now = Instant::now();
673
674 if now.duration_since(self.congestion.last_window_reduction).as_millis() < self.congestion.rto_ms as u128 {
676 return;
677 }
678
679 self.congestion.last_window_reduction = now;
681
682 let new_cwnd = (self.congestion.cwnd / 2).max(2);
684
685 if self.congestion.in_slow_start {
686 self.congestion.in_slow_start = false;
688
689 self.congestion.ssthresh = new_cwnd;
691 }
692
693 self.congestion.cwnd = new_cwnd;
695
696 self.update_pacing();
698
699 let in_flight = self.congestion.in_flight;
701
702 let available = if in_flight < self.congestion.cwnd {
704 self.congestion.cwnd - in_flight
705 } else {
706 0
707 };
708
709 self.cwnd_semaphore = Arc::new(Semaphore::new(available));
711
712 self.stats.cwnd = self.congestion.cwnd;
714
715 debug!(
716 "Congestion event: cwnd={} -> {}, in_flight={}, loss_rate={:.2}%",
717 self.congestion.cwnd * 2,
718 self.congestion.cwnd,
719 in_flight,
720 self.stats.loss_rate * 100.0
721 );
722 }
723
724 fn update_congestion_window(&mut self, seq: Option<u16>) {
726 if self.congestion.in_slow_start {
727 let new_cwnd = self.congestion.cwnd + 1;
730
731 if new_cwnd >= self.congestion.ssthresh {
733 self.congestion.in_slow_start = false;
734 }
735
736 self.congestion.cwnd = new_cwnd;
737 } else {
738 if let Some(ack_seq) = seq {
742 let cwnd = self.congestion.cwnd;
743 if ack_seq % (cwnd as u16) == 0 {
744 self.congestion.cwnd += 1;
745 }
746 }
747 }
748
749 self.update_pacing();
751
752 self.stats.cwnd = self.congestion.cwnd;
754 }
755
756 fn update_pacing(&mut self) {
758 if let Some(srtt_ms) = self.congestion.srtt_ms {
760 let srtt_us = (srtt_ms * 1000.0) as u64;
762
763 const MIN_INTERVAL_US: u64 = 100;
765
766 let interval = if self.congestion.cwnd > 0 {
768 srtt_us / self.congestion.cwnd as u64
769 } else {
770 srtt_us
771 };
772
773 self.pacing_interval_us = interval.max(MIN_INTERVAL_US);
775 } else {
776 self.pacing_interval_us = 1000; }
779 }
780
781 async fn apply_pacing(&mut self) {
783 if self.pacing_interval_us == 0 {
784 return;
785 }
786
787 if let Some(last_send_time) = self.last_send_time {
788 let now = Instant::now();
789 let elapsed_us = now.duration_since(last_send_time).as_micros() as u64;
790
791 if elapsed_us < self.pacing_interval_us {
792 let wait_us = self.pacing_interval_us - elapsed_us;
794
795 if wait_us > 100 { sleep(Duration::from_micros(wait_us)).await;
797 }
798 }
799 }
800
801 self.last_send_time = Some(Instant::now());
803 }
804
805 fn total_queued_packets(&self) -> usize {
807 self.queues.values().map(|q| q.len()).sum()
808 }
809
810 fn drop_low_priority_packets(&mut self) -> bool {
814 let mut dropped = false;
815
816 if let Some(queue) = self.queues.get_mut(&PacketPriority::Low) {
818 if !queue.is_empty() {
819 queue.pop_front();
820 self.stats.packets_dropped_overflow += 1;
821 self.stats.queued_packets = self.total_queued_packets();
822 dropped = true;
823 trace!("Dropped low priority packet to make room");
824 return dropped;
825 }
826 }
827
828 if let Some(queue) = self.queues.get_mut(&PacketPriority::Normal) {
830 if !queue.is_empty() {
831 queue.pop_front();
832 self.stats.packets_dropped_overflow += 1;
833 self.stats.queued_packets = self.total_queued_packets();
834 dropped = true;
835 trace!("Dropped normal priority packet to make room");
836 return dropped;
837 }
838 }
839
840 trace!("No low/normal priority packets available to drop");
842 dropped
843 }
844
845 fn dequeue_highest_priority_packet(&mut self) -> Option<RtpPacket> {
847 for priority in [
849 PacketPriority::Control,
850 PacketPriority::High,
851 PacketPriority::Normal,
852 PacketPriority::Low,
853 ] {
854 if let Some(queue) = self.queues.get_mut(&priority) {
855 if !queue.is_empty() {
856 let queued_packet = queue.pop_front().unwrap();
857
858 self.stats.queued_packets = self.total_queued_packets();
860
861 if let Some(mut metadata) = queued_packet.metadata {
863 let now = Instant::now();
864 let seq = queued_packet.packet.header.sequence_number;
865
866 if metadata.first_send_time.is_none() {
867 metadata.first_send_time = Some(now);
868 }
869
870 metadata.transmit_count += 1;
871 metadata.last_send_time = Some(now);
872
873 self.packet_tracking.insert(seq, metadata);
875
876 self.congestion.last_seq_sent = seq;
878 }
879
880 return Some(queued_packet.packet);
881 }
882 }
883 }
884
885 None
886 }
887
888 async fn get_packet_without_congestion_control(&mut self) -> Option<RtpPacket> {
892 let packet_option = self.dequeue_highest_priority_packet();
894
895 if packet_option.is_some() {
896 self.stats.packets_sent += 1;
898
899 self.last_send_time = Some(Instant::now());
901 }
902
903 packet_option
904 }
905
906 pub fn purge_expired_packets(&mut self) {
910 let now = Instant::now();
911 let max_age = Duration::from_millis(self.config.max_packet_age_ms);
912
913 for queue in self.queues.values_mut() {
915 let mut i = 0;
917 while i < queue.len() {
918 if now.duration_since(queue[i].queue_time) > max_age {
919 queue.remove(i);
920 self.stats.packets_dropped_aged += 1;
921 } else {
922 i += 1;
923 }
924 }
925 }
926
927 self.stats.queued_packets = self.total_queued_packets();
929 }
930
931 pub fn clear(&mut self) {
933 for queue in self.queues.values_mut() {
935 queue.clear();
936 }
937
938 self.packet_tracking.clear();
940
941 self.stats.queued_packets = 0;
943 }
944
945 pub fn get_stats(&self) -> TransmitBufferStats {
947 let total_capacity = self.config.max_packets;
949 let current_queued = self.total_queued_packets();
950 let buffer_fullness = if total_capacity > 0 {
951 current_queued as f32 / total_capacity as f32
952 } else {
953 0.0
954 };
955
956 let mut stats = self.stats.clone();
958 stats.queued_packets = current_queued;
959 stats.buffer_fullness = buffer_fullness;
960 stats.packets_queued = current_queued;
961 stats.packets_dropped = stats.packets_dropped_overflow + stats.packets_dropped_aged;
962
963 stats
964 }
965
966 pub fn update_config(&mut self, config: TransmitBufferConfig) {
968 let old_max_packets = self.config.max_packets;
970 let old_cwnd = self.config.initial_cwnd;
971 let old_cc_enabled = self.config.congestion_control_enabled;
972
973 self.config = config;
975
976 if old_cwnd != self.config.initial_cwnd {
978 if !old_cc_enabled || !self.config.congestion_control_enabled {
980 self.congestion.cwnd = self.config.initial_cwnd;
981
982 let in_flight = self.congestion.in_flight;
984 let available = if in_flight < self.congestion.cwnd {
985 self.congestion.cwnd - in_flight
986 } else {
987 0
988 };
989
990 self.cwnd_semaphore = Arc::new(Semaphore::new(available));
992
993 self.stats.cwnd = self.congestion.cwnd;
995 }
996 }
997
998 self.update_pacing();
1000
1001 debug!("Updated transmit buffer config: max_packets={}, cwnd={}, cc_enabled={}",
1002 self.config.max_packets, self.congestion.cwnd, self.config.congestion_control_enabled);
1003 }
1004
1005 pub fn set_priority_threshold(&mut self, buffer_fullness: f32, priority: PacketPriority) {
1011 debug!("Setting priority threshold: at {:.1}% fullness, only {:?} or higher priority will be sent",
1013 buffer_fullness * 100.0, priority);
1014
1015 }
1022}
1023
1024#[cfg(test)]
1025mod tests {
1026 use super::*;
1027 use bytes::Bytes;
1028 use crate::packet::{RtpHeader, RtpPacket};
1029
1030 fn create_test_packet(seq: u16, ts: u32, ssrc: RtpSsrc) -> RtpPacket {
1031 let header = RtpHeader::new(96, seq, ts, ssrc);
1032 let payload = Bytes::from_static(b"test");
1033 RtpPacket::new(header, payload)
1034 }
1035
1036 #[tokio::test]
1037 async fn test_basic_queuing() {
1038 let config = TransmitBufferConfig::default();
1039 let mut buffer = TransmitBuffer::new(0x12345678, config);
1040
1041 buffer.queue_packet(
1043 create_test_packet(1, 0, 0x12345678),
1044 PacketPriority::Normal
1045 ).await;
1046
1047 buffer.queue_packet(
1048 create_test_packet(2, 160, 0x12345678),
1049 PacketPriority::High
1050 ).await;
1051
1052 let p1 = buffer.get_next_packet().await;
1054 let p2 = buffer.get_next_packet().await;
1055
1056 assert!(p1.is_some());
1057 assert!(p2.is_some());
1058
1059 assert_eq!(p1.unwrap().header.sequence_number, 2);
1061 assert_eq!(p2.unwrap().header.sequence_number, 1);
1062 }
1063
1064 #[tokio::test]
1065 async fn test_buffer_overflow() {
1066 let config = TransmitBufferConfig {
1067 max_packets: 2,
1068 ..Default::default()
1069 };
1070
1071 let mut buffer = TransmitBuffer::new(0x12345678, config);
1072
1073 assert!(buffer.queue_packet(
1075 create_test_packet(1, 0, 0x12345678),
1076 PacketPriority::Normal
1077 ).await, "First packet should be queued");
1078
1079 assert!(buffer.queue_packet(
1080 create_test_packet(2, 160, 0x12345678),
1081 PacketPriority::Normal
1082 ).await, "Second packet should be queued");
1083
1084 let stats = buffer.get_stats();
1086 assert_eq!(stats.queued_packets, 2, "Buffer should have 2 packets");
1087
1088 assert!(!buffer.queue_packet(
1090 create_test_packet(3, 320, 0x12345678),
1091 PacketPriority::Normal
1092 ).await, "Third normal packet should be rejected");
1093
1094 assert!(buffer.queue_packet(
1096 create_test_packet(3, 320, 0x12345678),
1097 PacketPriority::High
1098 ).await, "High priority packet should be accepted");
1099
1100 let stats = buffer.get_stats();
1102 assert_eq!(stats.queued_packets, 2, "Buffer should still have 2 packets");
1103
1104 let p1 = buffer.get_next_packet().await;
1106 let p2 = buffer.get_next_packet().await;
1107 let p3 = buffer.get_next_packet().await;
1108
1109 assert!(p1.is_some(), "First packet (high priority) should be available");
1110 assert!(p2.is_some(), "Second packet should be available");
1111 assert!(p3.is_none(), "Buffer should be empty after 2 packets");
1112
1113 assert_eq!(p1.unwrap().header.sequence_number, 3, "First packet should be the high priority one");
1115 assert_eq!(p2.unwrap().header.sequence_number, 2, "Second packet should be the remaining normal one");
1116 }
1117
1118 #[tokio::test]
1119 async fn test_congestion_control() {
1120 let config = TransmitBufferConfig {
1121 initial_cwnd: 2, congestion_control_enabled: true,
1123 ..Default::default()
1124 };
1125
1126 let mut buffer = TransmitBuffer::new(0x12345678, config);
1127
1128 for i in 1..=5 {
1130 assert!(buffer.queue_packet(
1131 create_test_packet(i, (i as u32) * 160, 0x12345678),
1132 PacketPriority::Normal
1133 ).await, "Packet {} should be queued", i);
1134 }
1135
1136 let stats = buffer.get_stats();
1138 assert_eq!(stats.queued_packets, 5, "Buffer should have 5 queued packets");
1139
1140 let p1 = buffer.get_next_packet().await;
1142 assert!(p1.is_some(), "First packet should be available");
1143
1144 let p2 = buffer.get_next_packet().await;
1145 assert!(p2.is_some(), "Second packet should be available");
1146
1147 let stats = buffer.get_stats();
1149 assert_eq!(stats.in_flight, 2, "Should have 2 packets in flight");
1150
1151 let p3 = buffer.get_next_packet().await;
1153 assert!(p3.is_none(), "Third packet should be blocked by congestion window");
1154
1155 buffer.acknowledge_packet(1);
1157
1158 let stats = buffer.get_stats();
1160 assert_eq!(stats.in_flight, 1, "Should have 1 packet in flight after ACK");
1161
1162 let p3 = buffer.get_next_packet().await;
1164 assert!(p3.is_some(), "Third packet should be available after ACK");
1165 assert_eq!(p3.unwrap().header.sequence_number, 3, "Third packet should have seq=3");
1166 }
1167}