rvoip_rtp_core/buffer/
transmit.rs

1//! Transmit buffer for outbound RTP packets
2//!
3//! This module provides a high-performance transmit buffer with:
4//! - Prioritization of different packet types
5//! - Congestion control using RTCP feedback
6//! - Buffer limits to prevent memory exhaustion
7//! - Efficient packet scheduling
8
9use std::collections::{VecDeque, BTreeMap};
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use tokio::sync::{Mutex, Semaphore, Notify};
13use futures::future::poll_fn;
14use tokio::time::sleep;
15use std::task::{Poll, Context};
16use std::pin::Pin;
17
18use bytes::Bytes;
19use tracing::{debug, warn, info, trace, error};
20
21use crate::packet::{RtpPacket, rtcp::RtcpPacket};
22use crate::RtpSsrc;
23use crate::RtpTimestamp;
24
25use super::{BufferLimits, GlobalBufferManager, MemoryPermit, BufferPool, PooledBuffer};
26
/// Default transmit buffer capacity (packets)
pub const DEFAULT_TRANSMIT_BUFFER_CAPACITY: usize = 1000;

/// Default congestion window size (packets)
pub const DEFAULT_CONGESTION_WINDOW: usize = 64;

/// Default minimum retransmission timeout (milliseconds)
pub const DEFAULT_MIN_RTO_MS: u64 = 70;

/// Default maximum retransmission timeout (milliseconds)
pub const DEFAULT_MAX_RTO_MS: u64 = 1000;

/// Default initial retransmission timeout (milliseconds), used until the
/// first RTT sample is available
pub const DEFAULT_INITIAL_RTO_MS: u64 = 200;

/// Default maximum burst size (packets)
pub const DEFAULT_MAX_BURST: usize = 16;
/// Packet priority levels
///
/// A lower discriminant means a *higher* priority: the transmit queues are
/// drained in ascending order (`Control` first, `Low` last), and overflow
/// eviction drops `Low`/`Normal` packets first.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum PacketPriority {
    /// Critical control packets (RTCP BYE, SR, RR)
    Control = 0,
    
    /// High priority media (e.g., I-frames in video)
    High = 1,
    
    /// Normal priority media (typical packets)
    Normal = 2,
    
    /// Low priority (can be dropped first)
    Low = 3,
}
60
61impl Default for PacketPriority {
62    fn default() -> Self {
63        Self::Normal
64    }
65}
66
/// Packet in the transmit queue
///
/// Internal wrapper pairing an RTP packet with its queue-entry time,
/// priority, and send-tracking metadata. The metadata is moved into the
/// buffer's `packet_tracking` map when the packet is dequeued for sending.
struct QueuedPacket {
    /// The RTP packet to transmit
    packet: RtpPacket,
    
    /// When the packet was queued (used by `purge_expired_packets`)
    queue_time: Instant,
    
    /// Priority of this packet (selects which queue it sits in)
    priority: PacketPriority,
    
    /// Whether this is a retransmission
    // NOTE(review): only ever set to false in the visible code paths -
    // confirm whether a retransmission path elsewhere sets this.
    is_retransmission: bool,
    
    /// Metadata for the packet (None once moved into tracking)
    metadata: Option<PacketMetadata>,
}
84
/// Metadata for packet tracking
///
/// Created when a packet is queued; moved into the buffer's sequence-keyed
/// tracking map on first send so RTCP acknowledgements and retransmission
/// requests can find the packet's history.
#[derive(Debug, Clone)]
pub struct PacketMetadata {
    /// Timestamp when packet was first sent (`None` until first dequeue)
    pub first_send_time: Option<Instant>,
    
    /// Number of transmission attempts
    pub transmit_count: u32,
    
    /// Whether the packet was acknowledged (set by `acknowledge_packet`)
    pub acknowledged: bool,
    
    /// Last time the packet was sent
    pub last_send_time: Option<Instant>,
}
100
/// Congestion control state
///
/// Implements a TCP-like AIMD scheme: exponential window growth during
/// slow start, linear growth in congestion avoidance, multiplicative
/// decrease on congestion events, with RFC 6298-style RTO estimation.
struct CongestionState {
    /// Current congestion window size (packets)
    cwnd: usize,
    
    /// Slow start threshold (window size at which slow start ends)
    ssthresh: usize,
    
    /// Current retransmission timeout (ms), clamped to configured min/max
    rto_ms: u64,
    
    /// Smoothed round-trip time (ms); `None` until the first RTT sample
    srtt_ms: Option<f64>,
    
    /// RTT variation (ms); `None` until the first RTT sample
    rttvar_ms: Option<f64>,
    
    /// Last time window was reduced (rate-limits reactions to one per RTO)
    last_window_reduction: Instant,
    
    /// Current estimate of network bandwidth (bps)
    // NOTE(review): never updated in the visible code - confirm intent.
    estimated_bps: u64,
    
    /// Packets sent but not yet acknowledged
    in_flight: usize,
    
    /// Lost packets detected
    // NOTE(review): never incremented in the visible code.
    lost_packets: u64,
    
    /// Total packets sent
    total_sent: u64,
    
    /// Last sequence number sent
    last_seq_sent: u16,
    
    /// Whether we're in slow start
    in_slow_start: bool,
}
139
140impl Default for CongestionState {
141    fn default() -> Self {
142        Self {
143            cwnd: DEFAULT_CONGESTION_WINDOW,
144            ssthresh: usize::MAX,
145            rto_ms: DEFAULT_INITIAL_RTO_MS,
146            srtt_ms: None,
147            rttvar_ms: None,
148            last_window_reduction: Instant::now(),
149            estimated_bps: 1_000_000, // 1 Mbps initial guess
150            in_flight: 0,
151            lost_packets: 0,
152            total_sent: 0,
153            last_seq_sent: 0,
154            in_slow_start: true,
155        }
156    }
157}
158
/// Transmit buffer configuration
///
/// All timeouts are in milliseconds; capacities are in packets.
#[derive(Debug, Clone)]
pub struct TransmitBufferConfig {
    /// Maximum packets to buffer for transmission (across all priorities)
    pub max_packets: usize,
    
    /// Initial congestion window size (packets)
    pub initial_cwnd: usize,
    
    /// Minimum retransmission timeout (ms)
    pub min_rto_ms: u64,
    
    /// Maximum retransmission timeout (ms)
    pub max_rto_ms: u64,
    
    /// Initial retransmission timeout (ms), used before any RTT sample
    pub initial_rto_ms: u64,
    
    /// Whether to enable congestion control
    pub congestion_control_enabled: bool,
    
    /// Maximum burst size (packets)
    // NOTE(review): not consulted in the visible code - confirm usage.
    pub max_burst: usize,
    
    /// Whether to prioritize packets
    // NOTE(review): queues are always drained in priority order here;
    // confirm where this flag takes effect.
    pub prioritize_packets: bool,
    
    /// Maximum packet age before dropping (ms); see `purge_expired_packets`
    pub max_packet_age_ms: u64,
    
    /// Clock rate for timestamp calculations (Hz)
    pub clock_rate: u32,
}
192
193impl Default for TransmitBufferConfig {
194    fn default() -> Self {
195        Self {
196            max_packets: DEFAULT_TRANSMIT_BUFFER_CAPACITY,
197            initial_cwnd: DEFAULT_CONGESTION_WINDOW,
198            min_rto_ms: DEFAULT_MIN_RTO_MS,
199            max_rto_ms: DEFAULT_MAX_RTO_MS,
200            initial_rto_ms: DEFAULT_INITIAL_RTO_MS,
201            congestion_control_enabled: true,
202            max_burst: DEFAULT_MAX_BURST,
203            prioritize_packets: true,
204            max_packet_age_ms: 1000, // 1 second
205            clock_rate: 8000, // Default for audio
206        }
207    }
208}
209
/// Statistics for the transmit buffer
///
/// A snapshot of queue occupancy, drop counters, and congestion-control
/// state. Some fields exist in two spellings for the API layer (see
/// `packets_queued` / `packets_dropped`).
#[derive(Debug, Clone)]
pub struct TransmitBufferStats {
    /// Current number of packets in the queue
    pub queued_packets: usize,
    
    /// Total packets sent
    pub packets_sent: u64,
    
    /// Packets dropped due to queue overflow
    pub packets_dropped_overflow: u64,
    
    /// Packets dropped due to age
    pub packets_dropped_aged: u64,
    
    /// Packets retransmitted
    pub packets_retransmitted: u64,
    
    /// Current congestion window size
    pub cwnd: usize,
    
    /// Current retransmission timeout (ms)
    pub rto_ms: u64,
    
    /// Current estimated RTT (ms); `None` before the first sample
    pub srtt_ms: Option<f64>,
    
    /// Estimated bandwidth (bps)
    pub estimated_bps: u64,
    
    /// Packets currently in flight
    pub in_flight: usize,
    
    /// Packet loss rate (0.0-1.0)
    pub loss_rate: f64,
    
    /// Buffer fullness percentage (0.0-1.0)
    // NOTE(review): never updated in the visible code - confirm.
    pub buffer_fullness: f32,
    
    /// Access for the API layer to these stats
    // NOTE(review): apparent duplicate of `queued_packets`; not kept in
    // sync by the visible code.
    pub packets_queued: usize,
    
    /// Total number of packets that have been dropped
    // NOTE(review): apparent aggregate of the two drop counters above;
    // not kept in sync by the visible code.
    pub packets_dropped: u64,
}
255
256impl Default for TransmitBufferStats {
257    fn default() -> Self {
258        Self {
259            queued_packets: 0,
260            packets_sent: 0,
261            packets_dropped_overflow: 0,
262            packets_dropped_aged: 0,
263            packets_retransmitted: 0,
264            cwnd: DEFAULT_CONGESTION_WINDOW,
265            rto_ms: DEFAULT_INITIAL_RTO_MS,
266            srtt_ms: None,
267            estimated_bps: 1_000_000, // 1 Mbps initial guess
268            in_flight: 0,
269            loss_rate: 0.0,
270            buffer_fullness: 0.0,
271            packets_queued: 0,
272            packets_dropped: 0,
273        }
274    }
275}
276
/// High-performance transmit buffer for RTP packets
///
/// This implementation provides:
/// - Efficient packet queuing and sending
/// - Congestion control and bandwidth estimation
/// - Packet prioritization
/// - RTCP-based feedback handling
/// - Memory management with global limits
pub struct TransmitBuffer {
    /// Configuration
    config: TransmitBufferConfig,
    
    /// Priority queues for packets
    /// Lower priority value = higher priority (will be sent first)
    queues: BTreeMap<PacketPriority, VecDeque<QueuedPacket>>,
    
    /// Congestion control state
    congestion: CongestionState,
    
    /// Statistics
    stats: TransmitBufferStats,
    
    /// Reference to global buffer manager
    // NOTE(review): stored but not consulted in the visible code.
    buffer_manager: Option<Arc<GlobalBufferManager>>,
    
    /// Notification for new packets (woken by `queue_packet`,
    /// awaited by `wait_for_packet`)
    packet_notify: Arc<Notify>,
    
    /// Sequence -> packet metadata map for tracking sent packets
    packet_tracking: BTreeMap<u16, PacketMetadata>,
    
    /// Permits for congestion window; consumed on send, credited on ACK
    cwnd_semaphore: Arc<Semaphore>,
    
    /// Buffer for efficient packet ordering
    // NOTE(review): stored but not consulted in the visible code.
    packet_buffer: Option<Arc<BufferPool>>,
    
    /// SSRC for this transmit buffer
    ssrc: RtpSsrc,
    
    /// Last packet send time (drives pacing)
    last_send_time: Option<Instant>,
    
    /// Pacing state: minimum microseconds between sends (0 = no pacing)
    pacing_interval_us: u64,
}
323
324impl TransmitBuffer {
325    /// Create a new transmit buffer
326    pub fn new(ssrc: RtpSsrc, config: TransmitBufferConfig) -> Self {
327        let mut queues = BTreeMap::new();
328        
329        // Initialize priority queues
330        queues.insert(PacketPriority::Control, VecDeque::with_capacity(100));
331        queues.insert(PacketPriority::High, VecDeque::with_capacity(config.max_packets / 4));
332        queues.insert(PacketPriority::Normal, VecDeque::with_capacity(config.max_packets / 2));
333        queues.insert(PacketPriority::Low, VecDeque::with_capacity(config.max_packets / 4));
334        
335        // Initialize congestion state
336        let mut congestion = CongestionState::default();
337        congestion.cwnd = config.initial_cwnd;
338        congestion.rto_ms = config.initial_rto_ms;
339        
340        let stats = TransmitBufferStats {
341            queued_packets: 0,
342            packets_sent: 0,
343            packets_dropped_overflow: 0,
344            packets_dropped_aged: 0,
345            packets_retransmitted: 0,
346            cwnd: config.initial_cwnd,
347            rto_ms: config.initial_rto_ms,
348            srtt_ms: None,
349            estimated_bps: 1_000_000, // 1 Mbps initial guess
350            in_flight: 0,
351            loss_rate: 0.0,
352            buffer_fullness: 0.0,
353            packets_queued: 0,
354            packets_dropped: 0,
355        };
356        
357        // Create congestion window semaphore
358        let cwnd_semaphore = Arc::new(Semaphore::new(config.initial_cwnd));
359        
360        Self {
361            config,
362            queues,
363            congestion,
364            stats,
365            buffer_manager: None,
366            packet_notify: Arc::new(Notify::new()),
367            packet_tracking: BTreeMap::new(),
368            cwnd_semaphore,
369            packet_buffer: None,
370            ssrc,
371            last_send_time: None,
372            pacing_interval_us: 0,
373        }
374    }
375    
376    /// Create a new transmit buffer with global buffer management
377    pub fn with_buffer_manager(
378        ssrc: RtpSsrc,
379        config: TransmitBufferConfig,
380        buffer_manager: Arc<GlobalBufferManager>,
381        packet_buffer: Arc<BufferPool>,
382    ) -> Self {
383        let mut buffer = Self::new(ssrc, config);
384        buffer.buffer_manager = Some(buffer_manager);
385        buffer.packet_buffer = Some(packet_buffer);
386        buffer
387    }
388    
389    /// Queue a packet for transmission
390    ///
391    /// Returns true if the packet was queued, false if it was dropped.
392    pub async fn queue_packet(
393        &mut self, 
394        packet: RtpPacket, 
395        priority: PacketPriority
396    ) -> bool {
397        // Check if buffer has reached maximum capacity
398        let total_packets = self.total_queued_packets();
399        
400        // Fast reject if at max capacity and priority isn't high enough to drop other packets
401        if total_packets >= self.config.max_packets {
402            // For normal or low priority packets, drop if we're at capacity
403            // High priority packets can try to make room by dropping lower priority ones
404            if priority != PacketPriority::High {
405                // Normal or low priority when buffer is full - drop immediately
406                trace!("Buffer full ({}/{}), dropping {:?} priority packet with seq={}",
407                      total_packets, self.config.max_packets,
408                      priority, packet.header.sequence_number);
409                self.stats.packets_dropped_overflow += 1;
410                return false;
411            } else {
412                // High priority packet - try to drop something to make room
413                if !self.drop_low_priority_packets() {
414                    // If there's nothing to drop, drop this packet too
415                    trace!("Buffer full, nowhere to make room for high priority packet");
416                    self.stats.packets_dropped_overflow += 1;
417                    return false;
418                }
419            }
420        }
421        
422        // At this point we have room (or made room) in the buffer
423        
424        // Create packet metadata for tracking
425        let metadata = PacketMetadata {
426            first_send_time: None,
427            transmit_count: 0,
428            acknowledged: false,
429            last_send_time: None,
430        };
431        
432        // Create queued packet entry
433        let queued = QueuedPacket {
434            packet,
435            queue_time: Instant::now(),
436            priority,
437            is_retransmission: false,
438            metadata: Some(metadata),
439        };
440        
441        // Get or create queue for this priority
442        let queue = self.queues.entry(priority).or_insert_with(|| VecDeque::new());
443        
444        // Add to queue
445        queue.push_back(queued);
446        
447        // Update stats
448        self.stats.queued_packets = self.total_queued_packets();
449        
450        // Notify waiters
451        self.packet_notify.notify_one();
452        
453        true
454    }
455    
456    /// Schedule a packet retransmission
457    ///
458    /// Returns true if the retransmission was scheduled, false otherwise.
459    pub async fn schedule_retransmission(&mut self, seq: u16) -> bool {
460        // Check if we're tracking this packet
461        if let Some(metadata) = self.packet_tracking.get_mut(&seq) {
462            // Only retransmit if not already acknowledged
463            if !metadata.acknowledged {
464                // Find the packet to retransmit (we may still have it)
465                // Look through all queues in order of priority
466                for (priority, queue) in self.queues.iter() {
467                    for queued_packet in queue {
468                        if queued_packet.packet.header.sequence_number == seq {
469                            // Packet is already queued, just update stats
470                            trace!("Packet seq={} already queued for retransmission", seq);
471                            return true;
472                        }
473                    }
474                }
475                
476                // We don't have the packet in queue, need to rebuild it
477                // This would require access to the original data
478                // In a real implementation, we'd need to cache the packets
479                // or have the application rebuild it
480                
481                // For now, just log it
482                warn!("Requested retransmission for seq={} but packet not available", seq);
483                
484                // Update stats
485                self.stats.packets_retransmitted += 1;
486                
487                return false;
488            }
489        }
490        
491        false
492    }
493    
494    /// Get the next packet to send
495    ///
496    /// This handles congestion control if enabled.
497    pub async fn get_next_packet(&mut self) -> Option<RtpPacket> {
498        // Nothing to send
499        if self.total_queued_packets() == 0 {
500            return None;
501        }
502        
503        // If congestion control is not enabled, bypass window checks
504        if !self.config.congestion_control_enabled {
505            return self.get_packet_without_congestion_control().await;
506        }
507        
508        // Skip this check if we have no packets in flight yet
509        if self.congestion.in_flight > 0 {
510            // Check if window is already full
511            if self.congestion.in_flight >= self.congestion.cwnd {
512                trace!("Congestion window full ({}/{}), not sending new packets", 
513                      self.congestion.in_flight, self.congestion.cwnd);
514                return None;
515            }
516        }
517        
518        // Try to acquire a congestion window permit
519        match self.cwnd_semaphore.try_acquire() {
520            Ok(permit) => {
521                // Permit acquired, we can send
522                drop(permit);
523            }
524            Err(_) => {
525                // No permits available, congestion window is full
526                trace!("No congestion permits available, not sending");
527                return None;
528            }
529        }
530        
531        // Apply pacing if needed
532        if self.pacing_interval_us > 0 {
533            self.apply_pacing().await;
534        }
535        
536        // Get highest priority packet
537        let packet_option = self.dequeue_highest_priority_packet();
538        
539        if let Some(packet) = &packet_option {
540            // Update in-flight count
541            self.congestion.in_flight += 1;
542            self.congestion.total_sent += 1;
543            
544            // Update stats
545            self.stats.in_flight = self.congestion.in_flight;
546            self.stats.packets_sent += 1;
547            
548            // Update last send time
549            self.last_send_time = Some(Instant::now());
550        }
551        
552        packet_option
553    }
554    
555    /// Wait until either a packet is available or timeout occurs
556    ///
557    /// Returns true if a packet is available, false if timeout occurred.
558    pub async fn wait_for_packet(&self, timeout: Duration) -> bool {
559        // If we already have packets and congestion window permits, return immediately
560        if self.total_queued_packets() > 0 && self.cwnd_semaphore.available_permits() > 0 {
561            return true;
562        }
563        
564        // Wait for notification with timeout
565        let notify = self.packet_notify.clone();
566        tokio::select! {
567            _ = notify.notified() => true,
568            _ = tokio::time::sleep(timeout) => false,
569        }
570    }
571    
572    /// Process RTCP feedback packets to update congestion control
573    pub fn process_rtcp_feedback(&mut self, rtcp: &RtcpPacket) {
574        // Simplified implementation that doesn't rely on specific RTCP methods
575        match rtcp {
576            RtcpPacket::ReceiverReport(_) => {
577                // In a real implementation, we would extract RTT and loss data 
578                // from the receiver report. For this example, we'll simulate it.
579                
580                // Simulate RTT measurement
581                let rtt_ms = 50.0; // Assume 50ms RTT
582                self.update_rtt_estimate(rtt_ms);
583                
584                // Simulate packet loss information
585                self.stats.loss_rate = 0.01; // 1% loss rate
586                
587                // Update congestion window
588                self.update_congestion_window(None);
589            },
590            RtcpPacket::SenderReport(_) => {
591                // Nothing specific to do for sender reports
592            },
593            _ => {
594                // Other RTCP packet types not handled
595            },
596        }
597    }
598    
599    /// Signal a packet has been acknowledged (e.g., via RTCP)
600    pub fn acknowledge_packet(&mut self, seq: u16) {
601        if let Some(metadata) = self.packet_tracking.get_mut(&seq) {
602            metadata.acknowledged = true;
603            
604            // Update in-flight count if needed
605            if self.congestion.in_flight > 0 {
606                self.congestion.in_flight -= 1;
607                self.stats.in_flight = self.congestion.in_flight;
608            }
609            
610            // Release a congestion window permit
611            self.cwnd_semaphore.add_permits(1);
612            
613            // Adjust congestion window for successful transmission
614            if self.config.congestion_control_enabled {
615                self.update_congestion_window(Some(seq));
616            }
617        }
618    }
619    
620    /// Update RTT estimate using RFC 6298 algorithm
621    fn update_rtt_estimate(&mut self, rtt_ms: f64) {
622        if let (Some(srtt), Some(rttvar)) = (self.congestion.srtt_ms, self.congestion.rttvar_ms) {
623            // Update RTTVAR and SRTT
624            // RTTVAR = (1 - beta) * RTTVAR + beta * |SRTT - R|
625            // SRTT = (1 - alpha) * SRTT + alpha * R
626            // where alpha = 1/8 and beta = 1/4
627            
628            let alpha = 0.125;
629            let beta = 0.25;
630            
631            let new_rttvar = (1.0 - beta) * rttvar + beta * (srtt - rtt_ms).abs();
632            let new_srtt = (1.0 - alpha) * srtt + alpha * rtt_ms;
633            
634            self.congestion.rttvar_ms = Some(new_rttvar);
635            self.congestion.srtt_ms = Some(new_srtt);
636            
637            // Update RTO based on RFC 6298 formula: RTO = SRTT + 4 * RTTVAR
638            let new_rto = new_srtt + 4.0 * new_rttvar;
639            
640            // Clamp RTO to min/max values
641            let clamped_rto = (new_rto.round() as u64)
642                .max(self.config.min_rto_ms)
643                .min(self.config.max_rto_ms);
644            
645            self.congestion.rto_ms = clamped_rto;
646        } else {
647            // First RTT measurement
648            let srtt = rtt_ms;
649            let rttvar = rtt_ms / 2.0;
650            
651            self.congestion.srtt_ms = Some(srtt);
652            self.congestion.rttvar_ms = Some(rttvar);
653            
654            // Initial RTO = SRTT + 4 * RTTVAR
655            let new_rto = srtt + 4.0 * rttvar;
656            
657            // Clamp RTO to min/max values
658            let clamped_rto = (new_rto.round() as u64)
659                .max(self.config.min_rto_ms)
660                .min(self.config.max_rto_ms);
661            
662            self.congestion.rto_ms = clamped_rto;
663        }
664        
665        // Update stats
666        self.stats.srtt_ms = self.congestion.srtt_ms;
667        self.stats.rto_ms = self.congestion.rto_ms;
668    }
669    
670    /// Handle a congestion event (packet loss or timeout)
671    fn congestion_event(&mut self) {
672        let now = Instant::now();
673        
674        // Avoid reacting to multiple congestion events in a short time
675        if now.duration_since(self.congestion.last_window_reduction).as_millis() < self.congestion.rto_ms as u128 {
676            return;
677        }
678        
679        // Record the time
680        self.congestion.last_window_reduction = now;
681        
682        // Cut congestion window in half (but minimum of 2)
683        let new_cwnd = (self.congestion.cwnd / 2).max(2);
684        
685        if self.congestion.in_slow_start {
686            // Exit slow start
687            self.congestion.in_slow_start = false;
688            
689            // Set slow start threshold to half of current window
690            self.congestion.ssthresh = new_cwnd;
691        }
692        
693        // Update congestion window
694        self.congestion.cwnd = new_cwnd;
695        
696        // Recompute pacing interval
697        self.update_pacing();
698        
699        // Update semaphore (may need to invalidate permits)
700        let in_flight = self.congestion.in_flight;
701        
702        // Reset semaphore to current window size minus in-flight packets
703        let available = if in_flight < self.congestion.cwnd {
704            self.congestion.cwnd - in_flight
705        } else {
706            0
707        };
708        
709        // Reset semaphore to new permitted value
710        self.cwnd_semaphore = Arc::new(Semaphore::new(available));
711        
712        // Update stats
713        self.stats.cwnd = self.congestion.cwnd;
714        
715        debug!(
716            "Congestion event: cwnd={} -> {}, in_flight={}, loss_rate={:.2}%",
717            self.congestion.cwnd * 2,
718            self.congestion.cwnd,
719            in_flight,
720            self.stats.loss_rate * 100.0
721        );
722    }
723    
724    /// Update congestion window after successful transmission
725    fn update_congestion_window(&mut self, seq: Option<u16>) {
726        if self.congestion.in_slow_start {
727            // In slow start, grow window exponentially
728            // Increase by 1 for each ACK
729            let new_cwnd = self.congestion.cwnd + 1;
730            
731            // Check if we should exit slow start
732            if new_cwnd >= self.congestion.ssthresh {
733                self.congestion.in_slow_start = false;
734            }
735            
736            self.congestion.cwnd = new_cwnd;
737        } else {
738            // In congestion avoidance, grow window linearly
739            // Increase by 1/cwnd for each ACK, so cwnd += 1 after a full window
740            // We approximate this by adding 1 every cwnd packets
741            if let Some(ack_seq) = seq {
742                let cwnd = self.congestion.cwnd;
743                if ack_seq % (cwnd as u16) == 0 {
744                    self.congestion.cwnd += 1;
745                }
746            }
747        }
748        
749        // Update pacing
750        self.update_pacing();
751        
752        // Update stats
753        self.stats.cwnd = self.congestion.cwnd;
754    }
755    
756    /// Calculate and update pacing interval
757    fn update_pacing(&mut self) {
758        // Rough approximation: pace packets evenly across the RTT or a minimum interval
759        if let Some(srtt_ms) = self.congestion.srtt_ms {
760            // Convert to microseconds for precision
761            let srtt_us = (srtt_ms * 1000.0) as u64;
762            
763            // Minimum sensible interval (100 microseconds)
764            const MIN_INTERVAL_US: u64 = 100;
765            
766            // Calculate pacing interval: RTT / cwnd
767            let interval = if self.congestion.cwnd > 0 {
768                srtt_us / self.congestion.cwnd as u64
769            } else {
770                srtt_us
771            };
772            
773            // Use a reasonable minimum
774            self.pacing_interval_us = interval.max(MIN_INTERVAL_US);
775        } else {
776            // No RTT estimate yet, use a reasonable default
777            self.pacing_interval_us = 1000; // 1ms
778        }
779    }
780    
781    /// Apply pacing to avoid bursts
782    async fn apply_pacing(&mut self) {
783        if self.pacing_interval_us == 0 {
784            return;
785        }
786        
787        if let Some(last_send_time) = self.last_send_time {
788            let now = Instant::now();
789            let elapsed_us = now.duration_since(last_send_time).as_micros() as u64;
790            
791            if elapsed_us < self.pacing_interval_us {
792                // Need to wait for pacing
793                let wait_us = self.pacing_interval_us - elapsed_us;
794                
795                if wait_us > 100 { // Only wait if it's significant (>100µs)
796                    sleep(Duration::from_micros(wait_us)).await;
797                }
798            }
799        }
800        
801        // Update last send time
802        self.last_send_time = Some(Instant::now());
803    }
804    
805    /// Total number of packets queued across all priorities
806    fn total_queued_packets(&self) -> usize {
807        self.queues.values().map(|q| q.len()).sum()
808    }
809    
810    /// Try to drop low priority packets to make room for higher priority ones
811    ///
812    /// Returns true if packets were dropped, false otherwise.
813    fn drop_low_priority_packets(&mut self) -> bool {
814        let mut dropped = false;
815        
816        // Try low priority first
817        if let Some(queue) = self.queues.get_mut(&PacketPriority::Low) {
818            if !queue.is_empty() {
819                queue.pop_front();
820                self.stats.packets_dropped_overflow += 1;
821                self.stats.queued_packets = self.total_queued_packets();
822                dropped = true;
823                trace!("Dropped low priority packet to make room");
824                return dropped;
825            }
826        }
827        
828        // If we couldn't drop low priority, try normal priority
829        if let Some(queue) = self.queues.get_mut(&PacketPriority::Normal) {
830            if !queue.is_empty() {
831                queue.pop_front();
832                self.stats.packets_dropped_overflow += 1;
833                self.stats.queued_packets = self.total_queued_packets();
834                dropped = true;
835                trace!("Dropped normal priority packet to make room");
836                return dropped;
837            }
838        }
839        
840        // Don't drop high priority or control packets
841        trace!("No low/normal priority packets available to drop");
842        dropped
843    }
844    
845    /// Dequeue the highest priority packet
846    fn dequeue_highest_priority_packet(&mut self) -> Option<RtpPacket> {
847        // Try each queue in priority order
848        for priority in [
849            PacketPriority::Control,
850            PacketPriority::High,
851            PacketPriority::Normal,
852            PacketPriority::Low,
853        ] {
854            if let Some(queue) = self.queues.get_mut(&priority) {
855                if !queue.is_empty() {
856                    let queued_packet = queue.pop_front().unwrap();
857                    
858                    // Update stats
859                    self.stats.queued_packets = self.total_queued_packets();
860                    
861                    // Update metadata and tracking
862                    if let Some(mut metadata) = queued_packet.metadata {
863                        let now = Instant::now();
864                        let seq = queued_packet.packet.header.sequence_number;
865                        
866                        if metadata.first_send_time.is_none() {
867                            metadata.first_send_time = Some(now);
868                        }
869                        
870                        metadata.transmit_count += 1;
871                        metadata.last_send_time = Some(now);
872                        
873                        // Track the packet
874                        self.packet_tracking.insert(seq, metadata);
875                        
876                        // Update last sequence sent
877                        self.congestion.last_seq_sent = seq;
878                    }
879                    
880                    return Some(queued_packet.packet);
881                }
882            }
883        }
884        
885        None
886    }
887    
888    /// Get a packet without congestion control
889    ///
890    /// This is used when congestion control is disabled.
891    async fn get_packet_without_congestion_control(&mut self) -> Option<RtpPacket> {
892        // Get highest priority packet
893        let packet_option = self.dequeue_highest_priority_packet();
894        
895        if packet_option.is_some() {
896            // Update stats
897            self.stats.packets_sent += 1;
898            
899            // Update last send time
900            self.last_send_time = Some(Instant::now());
901        }
902        
903        packet_option
904    }
905    
906    /// Purge expired packets from the queue
907    ///
908    /// This is useful to periodically call to avoid memory leaks.
909    pub fn purge_expired_packets(&mut self) {
910        let now = Instant::now();
911        let max_age = Duration::from_millis(self.config.max_packet_age_ms);
912        
913        // Check each queue
914        for queue in self.queues.values_mut() {
915            // Remove packets that are too old
916            let mut i = 0;
917            while i < queue.len() {
918                if now.duration_since(queue[i].queue_time) > max_age {
919                    queue.remove(i);
920                    self.stats.packets_dropped_aged += 1;
921                } else {
922                    i += 1;
923                }
924            }
925        }
926        
927        // Update stats
928        self.stats.queued_packets = self.total_queued_packets();
929    }
930    
931    /// Clear the transmit buffer
932    pub fn clear(&mut self) {
933        // Clear all queues
934        for queue in self.queues.values_mut() {
935            queue.clear();
936        }
937        
938        // Clear packet tracking
939        self.packet_tracking.clear();
940        
941        // Reset stats
942        self.stats.queued_packets = 0;
943    }
944    
945    /// Get statistics for the transmit buffer
946    pub fn get_stats(&self) -> TransmitBufferStats {
947        // Calculate additional stats
948        let total_capacity = self.config.max_packets;
949        let current_queued = self.total_queued_packets();
950        let buffer_fullness = if total_capacity > 0 {
951            current_queued as f32 / total_capacity as f32
952        } else {
953            0.0
954        };
955        
956        // Update the queued_packets count
957        let mut stats = self.stats.clone();
958        stats.queued_packets = current_queued;
959        stats.buffer_fullness = buffer_fullness;
960        stats.packets_queued = current_queued;
961        stats.packets_dropped = stats.packets_dropped_overflow + stats.packets_dropped_aged;
962        
963        stats
964    }
965    
966    /// Update the configuration of the transmit buffer
967    pub fn update_config(&mut self, config: TransmitBufferConfig) {
968        // Store current values for comparison
969        let old_max_packets = self.config.max_packets;
970        let old_cwnd = self.config.initial_cwnd;
971        let old_cc_enabled = self.config.congestion_control_enabled;
972        
973        // Update the configuration
974        self.config = config;
975        
976        // If the congestion window size changed, update the semaphore
977        if old_cwnd != self.config.initial_cwnd {
978            // Only if not in active congestion control
979            if !old_cc_enabled || !self.config.congestion_control_enabled {
980                self.congestion.cwnd = self.config.initial_cwnd;
981                
982                // Reset semaphore to current window size minus in-flight packets
983                let in_flight = self.congestion.in_flight;
984                let available = if in_flight < self.congestion.cwnd {
985                    self.congestion.cwnd - in_flight
986                } else {
987                    0
988                };
989                
990                // Reset semaphore to new permitted value
991                self.cwnd_semaphore = Arc::new(Semaphore::new(available));
992                
993                // Update stats
994                self.stats.cwnd = self.congestion.cwnd;
995            }
996        }
997        
998        // Update pacing
999        self.update_pacing();
1000        
1001        debug!("Updated transmit buffer config: max_packets={}, cwnd={}, cc_enabled={}",
1002              self.config.max_packets, self.congestion.cwnd, self.config.congestion_control_enabled);
1003    }
1004    
1005    /// Set the priority threshold for a specific buffer fullness level
1006    ///
1007    /// When the buffer reaches the specified fullness level (0.0-1.0),
1008    /// only packets with priority greater than or equal to the threshold
1009    /// will be transmitted.
1010    pub fn set_priority_threshold(&mut self, buffer_fullness: f32, priority: PacketPriority) {
1011        // Store this as a configuration option
1012        debug!("Setting priority threshold: at {:.1}% fullness, only {:?} or higher priority will be sent",
1013              buffer_fullness * 100.0, priority);
1014        
1015        // We could implement more sophisticated logic here, like 
1016        // storing multiple thresholds for different fullness levels
1017        
1018        // For this simple implementation, we just note it in the log
1019        // A real implementation would check buffer fullness in get_next_packet
1020        // and only return packets above the threshold
1021    }
1022}
1023
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use crate::packet::{RtpHeader, RtpPacket};
    
    /// Build a minimal RTP packet (payload type 96, 4-byte static payload)
    /// for use as test input.
    fn create_test_packet(seq: u16, ts: u32, ssrc: RtpSsrc) -> RtpPacket {
        let header = RtpHeader::new(96, seq, ts, ssrc);
        let payload = Bytes::from_static(b"test");
        RtpPacket::new(header, payload)
    }
    
    /// Packets queued at different priorities must dequeue in priority
    /// order (High before Normal), regardless of insertion order.
    #[tokio::test]
    async fn test_basic_queuing() {
        let config = TransmitBufferConfig::default();
        let mut buffer = TransmitBuffer::new(0x12345678, config);
        
        // Queue some packets: Normal first, then High
        buffer.queue_packet(
            create_test_packet(1, 0, 0x12345678),
            PacketPriority::Normal
        ).await;
        
        buffer.queue_packet(
            create_test_packet(2, 160, 0x12345678),
            PacketPriority::High
        ).await;
        
        // Get packets - should come in priority order
        let p1 = buffer.get_next_packet().await;
        let p2 = buffer.get_next_packet().await;
        
        assert!(p1.is_some());
        assert!(p2.is_some());
        
        // High priority (seq 2) should be first despite being queued second
        assert_eq!(p1.unwrap().header.sequence_number, 2);
        assert_eq!(p2.unwrap().header.sequence_number, 1);
    }
    
    /// At capacity, a Normal-priority packet is rejected, but a
    /// High-priority packet is accepted by evicting a Normal one; the
    /// buffer never exceeds `max_packets`.
    #[tokio::test]
    async fn test_buffer_overflow() {
        // Tiny capacity so overflow is easy to trigger
        let config = TransmitBufferConfig {
            max_packets: 2,
            ..Default::default()
        };
        
        let mut buffer = TransmitBuffer::new(0x12345678, config);
        
        // Queue up to capacity with normal priority
        assert!(buffer.queue_packet(
            create_test_packet(1, 0, 0x12345678),
            PacketPriority::Normal
        ).await, "First packet should be queued");
        
        assert!(buffer.queue_packet(
            create_test_packet(2, 160, 0x12345678),
            PacketPriority::Normal
        ).await, "Second packet should be queued");
        
        // Verify buffer state
        let stats = buffer.get_stats();
        assert_eq!(stats.queued_packets, 2, "Buffer should have 2 packets");
        
        // Third normal priority packet should be rejected as we're at capacity
        assert!(!buffer.queue_packet(
            create_test_packet(3, 320, 0x12345678),
            PacketPriority::Normal
        ).await, "Third normal packet should be rejected");
        
        // High priority packet should be accepted by dropping a normal one
        assert!(buffer.queue_packet(
            create_test_packet(3, 320, 0x12345678),
            PacketPriority::High
        ).await, "High priority packet should be accepted");
        
        // Verify we still have max 2 packets
        let stats = buffer.get_stats();
        assert_eq!(stats.queued_packets, 2, "Buffer should still have 2 packets");
        
        // We should now have packets 2 and 3 (high priority);
        // packet 1 was the one evicted to make room
        let p1 = buffer.get_next_packet().await;
        let p2 = buffer.get_next_packet().await;
        let p3 = buffer.get_next_packet().await;
        
        assert!(p1.is_some(), "First packet (high priority) should be available");
        assert!(p2.is_some(), "Second packet should be available");
        assert!(p3.is_none(), "Buffer should be empty after 2 packets");
        
        // High priority should be first
        assert_eq!(p1.unwrap().header.sequence_number, 3, "First packet should be the high priority one");
        assert_eq!(p2.unwrap().header.sequence_number, 2, "Second packet should be the remaining normal one");
    }
    
    /// With a window of 2, only two packets may be in flight; a third is
    /// released only after one in-flight packet is acknowledged.
    #[tokio::test]
    async fn test_congestion_control() {
        let config = TransmitBufferConfig {
            initial_cwnd: 2,              // Small window for testing 
            congestion_control_enabled: true,
            ..Default::default()
        };
        
        let mut buffer = TransmitBuffer::new(0x12345678, config);
        
        // Queue several packets (more than the window allows in flight)
        for i in 1..=5 {
            assert!(buffer.queue_packet(
                create_test_packet(i, (i as u32) * 160, 0x12345678),
                PacketPriority::Normal
            ).await, "Packet {} should be queued", i);
        }
        
        // Verify all 5 packets are queued
        let stats = buffer.get_stats();
        assert_eq!(stats.queued_packets, 5, "Buffer should have 5 queued packets");
        
        // But congestion window should only allow sending 2
        let p1 = buffer.get_next_packet().await;
        assert!(p1.is_some(), "First packet should be available");
        
        let p2 = buffer.get_next_packet().await;
        assert!(p2.is_some(), "Second packet should be available");
        
        // Verify in-flight count
        let stats = buffer.get_stats();
        assert_eq!(stats.in_flight, 2, "Should have 2 packets in flight");
        
        // Third packet should be blocked by congestion window
        let p3 = buffer.get_next_packet().await;
        assert!(p3.is_none(), "Third packet should be blocked by congestion window");
        
        // Acknowledge one packet to open a window slot
        buffer.acknowledge_packet(1);
        
        // Verify in-flight count decreased
        let stats = buffer.get_stats();
        assert_eq!(stats.in_flight, 1, "Should have 1 packet in flight after ACK");
        
        // Now we should be able to get another packet
        // (seq 3 is next since 1 and 2 were already dequeued)
        let p3 = buffer.get_next_packet().await;
        assert!(p3.is_some(), "Third packet should be available after ACK");
        assert_eq!(p3.unwrap().header.sequence_number, 3, "Third packet should have seq=3");
    }
}