rvoip_rtp_core/session/
mod.rs

1//! RTP Session Management
2//!
3//! This module provides functionality for managing RTP sessions, including
4//! configuration, packet sending/receiving, and jitter buffer management.
5
6mod stream;
7mod scheduling;
8
9pub use stream::{RtpStream, RtpStreamStats};
10pub use scheduling::{RtpScheduler, RtpSchedulerStats};
11
12use bytes::Bytes;
13use rand::Rng;
14use std::collections::HashMap;
15use std::net::SocketAddr;
16use std::sync::{Arc, Mutex};
17use std::time::{Duration, Instant};
18use tokio::net::UdpSocket;
19use tokio::sync::{mpsc, broadcast};
20use tokio::task::JoinHandle;
21use tracing::{error, warn, debug, trace, info};
22
23use crate::error::Error;
24use crate::packet::{RtpHeader, RtpPacket};
25use crate::transport::{RtpTransport, RtpTransportConfig, UdpRtpTransport};
26use crate::{Result, RtpSequenceNumber, RtpSsrc, RtpTimestamp, DEFAULT_MAX_PACKET_SIZE};
27
28// Define the constant locally since it's not publicly exported
29const RTP_MIN_HEADER_SIZE: usize = 12;
30
31/// Stats for an RTP session
32#[derive(Debug, Clone, Default)]
33pub struct RtpSessionStats {
34    /// Total packets sent
35    pub packets_sent: u64,
36    
37    /// Total packets received
38    pub packets_received: u64,
39    
40    /// Total bytes sent
41    pub bytes_sent: u64,
42    
43    /// Total bytes received
44    pub bytes_received: u64,
45    
46    /// Packets lost (based on sequence numbers)
47    pub packets_lost: u64,
48    
49    /// Duplicate packets received
50    pub packets_duplicated: u64,
51    
52    /// Out-of-order packets received
53    pub packets_out_of_order: u64,
54    
55    /// Packets discarded by jitter buffer (too old)
56    pub packets_discarded_by_jitter: u64,
57    
58    /// Current jitter estimate (in milliseconds)
59    pub jitter_ms: f64,
60    
61    /// Remote address of the most recent packet
62    pub remote_addr: Option<SocketAddr>,
63}
64
65/// RTP session configuration options
66#[derive(Debug, Clone)]
67pub struct RtpSessionConfig {
68    /// Local address to bind to
69    pub local_addr: SocketAddr,
70    
71    /// Remote address to send packets to
72    pub remote_addr: Option<SocketAddr>,
73    
74    /// SSRC to use for sending packets
75    pub ssrc: Option<RtpSsrc>,
76    
77    /// Payload type
78    pub payload_type: u8,
79    
80    /// Clock rate for the payload type (needed for jitter buffer)
81    pub clock_rate: u32,
82    
83    /// Jitter buffer size in packets
84    pub jitter_buffer_size: Option<usize>,
85    
86    /// Maximum packet age in the jitter buffer (ms)
87    pub max_packet_age_ms: Option<u32>,
88    
89    /// Enable jitter buffer
90    pub enable_jitter_buffer: bool,
91}
92
93impl Default for RtpSessionConfig {
94    fn default() -> Self {
95        Self {
96            local_addr: "0.0.0.0:0".parse().unwrap(),
97            remote_addr: None,
98            ssrc: None,
99            payload_type: 0,
100            clock_rate: 8000, // Default for most audio codecs (8kHz)
101            jitter_buffer_size: Some(50),
102            max_packet_age_ms: Some(200),
103            enable_jitter_buffer: true,
104        }
105    }
106}
107
108/// Events emitted by the RTP session
109#[derive(Debug, Clone)]
110pub enum RtpSessionEvent {
111    /// New packet received
112    PacketReceived(RtpPacket),
113    
114    /// Error in the session
115    Error(Error),
116    
117    /// BYE RTCP packet received (a party is leaving the session)
118    Bye {
119        /// SSRC of the source that sent the BYE
120        ssrc: RtpSsrc,
121        
122        /// Optional reason text
123        reason: Option<String>,
124    },
125    
126    /// New stream detected with a specific SSRC
127    /// This event is emitted as soon as the first packet for a new SSRC is received,
128    /// even if the packet is being held in a jitter buffer.
129    NewStreamDetected {
130        /// SSRC of the new stream
131        ssrc: RtpSsrc,
132    },
133    
134    /// RTCP Sender Report received
135    RtcpSenderReport {
136        /// SSRC of the sender
137        ssrc: RtpSsrc,
138        
139        /// NTP timestamp
140        ntp_timestamp: crate::packet::rtcp::NtpTimestamp,
141        
142        /// RTP timestamp
143        rtp_timestamp: RtpTimestamp,
144        
145        /// Packet count
146        packet_count: u32,
147        
148        /// Octet count
149        octet_count: u32,
150        
151        /// Report blocks
152        report_blocks: Vec<crate::packet::rtcp::RtcpReportBlock>,
153    },
154    
155    /// RTCP Receiver Report received
156    RtcpReceiverReport {
157        /// SSRC of the receiver
158        ssrc: RtpSsrc,
159        
160        /// Report blocks
161        report_blocks: Vec<crate::packet::rtcp::RtcpReportBlock>,
162    },
163}
164
165/// RTP session for sending and receiving RTP packets
166///
167/// This class manages an RTP session, including sending and receiving packets,
168/// jitter buffer management, and demultiplexing of multiple streams.
169///
170/// # SSRC Demultiplexing
171///
172/// An RTP session can receive packets from multiple sources, each identified by
173/// a unique Synchronization Source identifier (SSRC). This implementation
174/// automatically demultiplexes incoming packets based on their SSRC:
175///
176/// 1. When a packet arrives, its SSRC is extracted
177/// 2. If this is the first packet from this SSRC, a new stream is created
178/// 3. The packet is processed by the appropriate stream, which handles:
179///    - Sequence number tracking
180///    - Jitter calculation
181///    - Duplicate detection
182///    - Packet reordering (via jitter buffer)
183///
184/// Each stream maintains its own statistics and state. You can access information
185/// about individual streams using the `get_stream()`, `get_all_streams()`, and
186/// `stream_count()` methods.
187///
188/// This approach aligns with RFC 3550 Section 8.2, which describes how to handle
189/// multiple sources in a single RTP session.
190pub struct RtpSession {
191    /// Session configuration
192    config: RtpSessionConfig,
193    
194    /// SSRC for this session
195    ssrc: RtpSsrc,
196    
197    /// Transport for sending/receiving packets
198    transport: Arc<dyn RtpTransport>,
199    
200    /// Map of received streams by SSRC
201    streams: Arc<Mutex<HashMap<RtpSsrc, RtpStream>>>,
202    
203    /// Packet scheduler for sending packets
204    scheduler: Option<RtpScheduler>,
205    
206    /// Channel for receiving packets
207    receiver: mpsc::Receiver<RtpPacket>,
208    
209    /// Channel for sending packets
210    sender: mpsc::Sender<RtpPacket>,
211    
212    /// Event broadcaster
213    event_tx: broadcast::Sender<RtpSessionEvent>,
214    
215    /// Receiving task handle
216    recv_task: Option<JoinHandle<()>>,
217    
218    /// Sending task handle
219    send_task: Option<JoinHandle<()>>,
220    
221    /// Session statistics
222    stats: Arc<Mutex<RtpSessionStats>>,
223    
224    /// Media synchronization context
225    media_sync: Option<Arc<std::sync::RwLock<crate::sync::MediaSync>>>,
226    
227    /// Whether the session is active
228    active: bool,
229    
230    /// RTCP report generator
231    rtcp_generator: Option<crate::stats::reports::RtcpReportGenerator>,
232    
233    /// RTCP sender task
234    rtcp_task: Option<JoinHandle<()>>,
235    
236    /// Session bandwidth (bits per second)
237    bandwidth_bps: u32,
238}
239
240impl RtpSession {
241    /// Create a new RTP session
242    pub async fn new(config: RtpSessionConfig) -> Result<Self> {
243        // Generate SSRC if not provided
244        let ssrc = config.ssrc.unwrap_or_else(|| {
245            let mut rng = rand::thread_rng();
246            rng.gen::<u32>()
247        });
248        
249        // Create transport config - respect provided ports!
250        let transport_config = RtpTransportConfig {
251            local_rtp_addr: config.local_addr,
252            local_rtcp_addr: None, // RTCP on same port for now
253            symmetric_rtp: true,
254            rtcp_mux: true, // Enable RTCP multiplexing by default
255            session_id: Some(format!("rtp-session-{}", ssrc)),
256            // Don't allocate a new port - use the one provided in config
257            use_port_allocator: false,
258        };
259        
260        // Create UDP transport
261        let transport = Arc::new(UdpRtpTransport::new(transport_config).await?);
262        
263        // Create channels for internal communication
264        // Increased capacity to handle longer sessions without dropping packets
265        let (sender_tx, sender_rx) = mpsc::channel(1000);
266        let (receiver_tx, receiver_rx) = mpsc::channel(1000);
267        let (event_tx, _) = broadcast::channel(1000);
268        
269        // Create scheduler if needed
270        let scheduler = Some(RtpScheduler::new(
271            config.clock_rate,
272            rand::thread_rng().gen::<u16>(), // Random starting sequence
273            rand::thread_rng().gen::<u32>(), // Random starting timestamp
274        ));
275        
276        // Create RTCP report generator
277        let hostname = hostname::get().unwrap_or_else(|_| "unknown".into());
278        let hostname_str = hostname.to_string_lossy();
279        let cname = format!("{}@{}", std::env::var("USER").unwrap_or_else(|_| "user".to_string()), hostname_str);
280        let rtcp_generator = crate::stats::reports::RtcpReportGenerator::new(ssrc, cname);
281        
282        let mut session = Self {
283            config,
284            ssrc,
285            transport,
286            streams: Arc::new(Mutex::new(HashMap::new())),
287            scheduler,
288            receiver: receiver_rx,
289            sender: sender_tx,
290            event_tx,
291            recv_task: None,
292            send_task: None,
293            stats: Arc::new(Mutex::new(RtpSessionStats::default())),
294            media_sync: None,
295            active: false,
296            rtcp_generator: Some(rtcp_generator),
297            rtcp_task: None,
298            bandwidth_bps: 64000, // Default bandwidth: 64 kbps
299        };
300        
301        // Start the session
302        session.start(sender_rx, receiver_tx).await?;
303        
304        Ok(session)
305    }
306    
307    /// Start the session tasks
308    async fn start(
309        &mut self,
310        mut sender_rx: mpsc::Receiver<RtpPacket>,
311        receiver_tx: mpsc::Sender<RtpPacket>,
312    ) -> Result<()> {
313        if self.active {
314            return Ok(());
315        }
316        
317        let transport = self.transport.clone();
318        let stats_send = self.stats.clone();
319        let stats_recv = self.stats.clone();
320        let remote_addr = self.config.remote_addr;
321        let event_tx_send = self.event_tx.clone();
322        let event_tx_recv = self.event_tx.clone();
323        let clock_rate = self.config.clock_rate;
324        let payload_type = self.config.payload_type;
325        let ssrc = self.ssrc;
326        let streams_map = self.streams.clone();
327        let jitter_buffer_enabled = self.config.enable_jitter_buffer;
328        let jitter_size = self.config.jitter_buffer_size.unwrap_or(50);
329        let max_age_ms = self.config.max_packet_age_ms.unwrap_or(200);
330        
331        let media_sync = self.media_sync.clone();
332        
333        // If we have a remote address, set it on the transport
334        if let Some(addr) = remote_addr {
335            // Set the remote RTP address on the UDP transport
336            if let Some(t) = transport.as_any().downcast_ref::<UdpRtpTransport>() {
337                t.set_remote_rtp_addr(addr).await;
338            }
339        }
340        
341        // Start the scheduler if available
342        if let Some(scheduler) = &mut self.scheduler {
343            let sender_tx = self.sender.clone();
344            scheduler.set_sender(sender_tx);
345            
346            // Set appropriate timestamp increment based on packet interval
347            let interval_ms = 20; // Default 20ms packet interval
348            let samples_per_packet = (clock_rate as f64 * (interval_ms as f64 / 1000.0)) as u32;
349            scheduler.set_interval(interval_ms, samples_per_packet);
350            
351            scheduler.start()?;
352        }
353        
354        // Start sending task
355        let send_transport = transport.clone();
356        let send_task = tokio::spawn(async move {
357            let mut last_remote_addr = remote_addr;
358            
359            while let Some(packet) = sender_rx.recv().await {
360                // Always try to get the current remote address from transport first
361                let dest = if let Some(t) = send_transport.as_any().downcast_ref::<UdpRtpTransport>() {
362                    // Check transport for current remote address
363                    match t.remote_rtp_addr().await {
364                        Some(addr) => {
365                            // Update our cached value
366                            last_remote_addr = Some(addr);
367                            addr
368                        }
369                        None => {
370                            // Fall back to cached value if transport doesn't have one
371                            if let Some(addr) = last_remote_addr {
372                                addr
373                            } else {
374                                // No destination address, can't send
375                                warn!("No destination address for RTP packet, dropping");
376                                continue;
377                            }
378                        }
379                    }
380                } else {
381                    // Not a UDP transport, use cached value
382                    if let Some(addr) = last_remote_addr {
383                        addr
384                    } else {
385                        // No destination address, can't send
386                        warn!("No destination address for RTP packet, dropping");
387                        continue;
388                    }
389                };
390                
391                // Send the packet
392                debug!("Sending RTP packet to {} (seq={}, timestamp={})", 
393                       dest, packet.header.sequence_number, packet.header.timestamp);
394                       
395                if let Err(e) = send_transport.send_rtp(&packet, dest).await {
396                    error!("Failed to send RTP packet: {}", e);
397                    
398                    // Broadcast error event
399                    let _ = event_tx_send.send(RtpSessionEvent::Error(e));
400                    continue;
401                }
402                
403                debug!("Successfully sent RTP packet to {}", dest);
404                
405                // Update stats
406                if let Ok(mut session_stats) = stats_send.lock() {
407                    session_stats.packets_sent += 1;
408                    session_stats.bytes_sent += packet.size() as u64;
409                }
410            }
411        });
412        
413        // Start receiving task
414        let recv_transport = transport.clone();
415        
416        // Subscribe to transport events to handle RTCP packets
417        let mut transport_events = recv_transport.subscribe();
418        
419        let recv_task = tokio::spawn(async move {
420            // IMPORTANT: Only handle events from transport, no direct packet reception
421            // to avoid race conditions where two tasks read from the same socket
422            loop {
423                match transport_events.recv().await {
424                    Ok(crate::traits::RtpEvent::RtcpReceived { data, source }) => {
425                        // Try to parse the RTCP packet
426                        if let Ok(rtcp_packet) = crate::packet::rtcp::RtcpPacket::parse(&data) {
427                            // Handle the RTCP packet based on its type
428                            match rtcp_packet {
429                                crate::packet::rtcp::RtcpPacket::Goodbye(bye) => {
430                                    // Extract the SSRC and reason
431                                    if !bye.sources.is_empty() {
432                                        let source_ssrc = bye.sources[0];
433                                        
434                                        // Broadcast BYE event
435                                        let _ = event_tx_recv.send(RtpSessionEvent::Bye {
436                                            ssrc: source_ssrc,
437                                            reason: bye.reason,
438                                        });
439                                        
440                                        info!("Received RTCP BYE from SSRC={:08x}", source_ssrc);
441                                    }
442                                },
443                                crate::packet::rtcp::RtcpPacket::SenderReport(sr) => {
444                                    // Process sender report
445                                    let report_ssrc = sr.ssrc;
446                                    
447                                    debug!("Received RTCP SR from SSRC={:08x}", report_ssrc);
448                                    
449                                    // Update stream statistics if this stream exists
450                                    if let Ok(mut streams) = streams_map.lock() {
451                                        if let Some(stream) = streams.get_mut(&report_ssrc) {
452                                            // Update the stream's RTCP SR info
453                                            // This will be used for calculating round-trip time
454                                            stream.update_last_sr_info(
455                                                sr.ntp_timestamp.to_u32(),
456                                                std::time::Instant::now(),
457                                            );
458                                            
459                                            debug!("Updated RTCP SR info for stream SSRC={:08x}", report_ssrc);
460                                        }
461                                    }
462                                    
463                                    // If media sync is enabled, update it
464                                    if let Some(sync) = &media_sync {
465                                        if let Ok(mut media_sync) = sync.write() {
466                                            // Update synchronization data
467                                            media_sync.update_from_sr(report_ssrc, sr.ntp_timestamp, sr.rtp_timestamp);
468                                        }
469                                    }
470                                    
471                                    // Emit SR event for external processing
472                                    let _ = event_tx_recv.send(RtpSessionEvent::RtcpSenderReport { 
473                                        ssrc: report_ssrc,
474                                        ntp_timestamp: sr.ntp_timestamp,
475                                        rtp_timestamp: sr.rtp_timestamp,
476                                        packet_count: sr.sender_packet_count,
477                                        octet_count: sr.sender_octet_count,
478                                        report_blocks: sr.report_blocks,
479                                    });
480                                },
481                                crate::packet::rtcp::RtcpPacket::ReceiverReport(rr) => {
482                                    // Process receiver report
483                                    let report_ssrc = rr.ssrc;
484                                    
485                                    debug!("Received RTCP RR from SSRC={:08x} with {} report blocks", 
486                                          report_ssrc, rr.report_blocks.len());
487                                    
488                                    // If there's a report block about our SSRC, process it
489                                    for block in &rr.report_blocks {
490                                        if block.ssrc == ssrc {
491                                            debug!("Processing report block about our SSRC={:08x}", ssrc);
492                                            
493                                            // Update session stats with packet loss info
494                                            if let Ok(mut stats) = stats_recv.lock() {
495                                                stats.packets_lost = block.cumulative_lost as u64;
496                                                
497                                                // Calculate packet loss percentage
498                                                let fraction_lost = block.fraction_lost as f64 / 256.0;
499                                                debug!("Packet loss: {}% (fraction={})", 
500                                                       fraction_lost * 100.0, block.fraction_lost);
501                                            }
502                                        }
503                                    }
504                                    
505                                    // Emit RR event for external processing
506                                    let _ = event_tx_recv.send(RtpSessionEvent::RtcpReceiverReport { 
507                                        ssrc: report_ssrc,
508                                        report_blocks: rr.report_blocks,
509                                    });
510                                },
511                                // Handle other RTCP packet types as needed
512                                _ => {
513                                    // For now, we're just logging other packet types
514                                    trace!("Received RTCP packet: {:?}", rtcp_packet);
515                                }
516                            }
517                        } else {
518                            warn!("Failed to parse RTCP packet");
519                        }
520                    }
521                    Ok(crate::traits::RtpEvent::MediaReceived { payload_type, timestamp, payload, source, ssrc: ssrc_from_event, marker, .. }) => {
522                        // Handle RTP packets received via transport events
523                        // This is the ONLY path for RTP packets to avoid race conditions
524                        
525                        // Reconstruct minimal RTP header for processing
526                        let header = RtpHeader {
527                            version: 2,
528                            padding: false,
529                            extension: false,
530                            cc: 0,
531                            marker,
532                            payload_type,
533                            sequence_number: 0, // Will be set by stream tracking
534                            timestamp,
535                            ssrc,
536                            csrc: vec![],
537                            extensions: None,
538                        };
539                        
540                        let packet = RtpPacket {
541                            header,
542                            payload: payload.clone(),
543                        };
544                        
545                        // Update stats
546                        if let Ok(mut session_stats) = stats_recv.lock() {
547                            session_stats.packets_received += 1;
548                            session_stats.bytes_received += payload.len() as u64 + 12; // payload + header
549                            session_stats.remote_addr = Some(source);
550                        }
551                        
552                        // Use the SSRC from the event
553                        let packet_ssrc = ssrc_from_event;
554                        
555                        // Get or create the stream for this SSRC
556                        let (is_new_stream, output_packet) = {
557                            let mut streams = match streams_map.lock() {
558                                Ok(streams) => streams,
559                                Err(e) => {
560                                    error!("Failed to lock streams map: {}", e);
561                                    continue;
562                                }
563                            };
564                            
565                            let is_new = !streams.contains_key(&packet_ssrc);
566                            let stream = streams.entry(packet_ssrc).or_insert_with(|| {
567                                info!("New RTP stream detected with SSRC={:08x}", packet_ssrc);
568                                RtpStream::new(packet_ssrc, clock_rate)
569                            });
570                            
571                            // For simplified processing without full RTP header,
572                            // just forward the packet immediately
573                            (is_new, Some(packet.clone()))
574                        };
575                        
576                        // If this is a new stream, emit the NewStreamDetected event
577                        if is_new_stream {
578                            let _ = event_tx_recv.send(RtpSessionEvent::NewStreamDetected {
579                                ssrc: packet_ssrc,
580                            });
581                        }
582                        
583                        // Forward the packet
584                        if let Some(output) = output_packet {
585                            if let Err(e) = receiver_tx.send(output.clone()).await {
586                                error!("Failed to forward RTP packet to receiver: {}", e);
587                            }
588                            
589                            // Broadcast packet received event
590                            let _ = event_tx_recv.send(RtpSessionEvent::PacketReceived(output));
591                        }
592                    }
593                    Ok(crate::traits::RtpEvent::Error(e)) => {
594                        error!("Transport error: {}", e);
595                        let _ = event_tx_recv.send(RtpSessionEvent::Error(e));
596                    }
597                    Err(e) => {
598                        debug!("Transport event channel error: {}", e);
599                    }
600                }
601            }
602        });
603        
604        // Start RTCP sending task if we have a remote address and report generator
605        if let (Some(remote_addr), Some(mut rtcp_generator)) = (self.config.remote_addr, self.rtcp_generator.take()) {
606            let transport = self.transport.clone();
607            let ssrc = self.ssrc;
608            let event_tx = self.event_tx.clone();
609            let stats = self.stats.clone();
610            let active_state = Arc::new(tokio::sync::Mutex::new(true));
611            let active_state_clone = active_state.clone();
612            let bandwidth = self.bandwidth_bps;
613            
614            // Set bandwidth in the generator
615            rtcp_generator.set_bandwidth(bandwidth);
616            
617            // Start the RTCP task
618            let rtcp_task = tokio::spawn(async move {
619                debug!("RTCP scheduling task started");
620                
621                // Initial interval calculation
622                let mut interval = rtcp_generator.calculate_interval();
623                debug!("Initial RTCP interval: {:?}", interval);
624                
625                while *active_state.lock().await {
626                    // Wait for the calculated interval
627                    tokio::time::sleep(interval).await;
628                    
629                    // Check if we should continue
630                    if !*active_state.lock().await {
631                        break;
632                    }
633                    
634                    // Update RTP statistics before sending the report
635                    if let Ok(session_stats) = stats.lock() {
636                        rtcp_generator.update_sent_stats(
637                            session_stats.packets_sent as u32,
638                            session_stats.bytes_sent as u32
639                        );
640                        
641                        // Log the current stats for debugging
642                        debug!("Current stats for RTCP report: packets={}, bytes={}", 
643                               session_stats.packets_sent, session_stats.bytes_sent);
644                    }
645                    
646                    // Send an RTCP report regardless of should_send_report logic for this example
647                    // We'll send a compound packet with SR and SDES
648                    debug!("Sending RTCP report");
649                    
650                    // Generate sender report
651                    let rtp_timestamp = std::time::SystemTime::now()
652                        .duration_since(std::time::UNIX_EPOCH)
653                        .unwrap_or_default()
654                        .as_millis() as u32;
655                        
656                    let sr = rtcp_generator.generate_sender_report(rtp_timestamp);
657                    let sdes = rtcp_generator.generate_sdes();
658                    
659                    // Create compound packet
660                    let mut compound = crate::packet::rtcp::RtcpCompoundPacket::new_with_sr(sr);
661                    compound.add_sdes(sdes);
662                    
663                    // Send the compound packet
664                    if let Ok(data) = compound.serialize() {
665                        if let Err(e) = transport.send_rtcp_bytes(&data, remote_addr).await {
666                            warn!("Failed to send RTCP compound packet: {}", e);
667                        } else {
668                            info!("Sent RTCP compound packet of {} bytes", data.len());
669                            
670                            // Emit SR event
671                            if let Some(sr) = compound.get_sr() {
672                                let _ = event_tx.send(RtpSessionEvent::RtcpSenderReport {
673                                    ssrc,
674                                    ntp_timestamp: sr.ntp_timestamp,
675                                    rtp_timestamp: sr.rtp_timestamp,
676                                    packet_count: sr.sender_packet_count,
677                                    octet_count: sr.sender_octet_count,
678                                    report_blocks: sr.report_blocks.clone(),
679                                });
680                            }
681                        }
682                    }
683                    
684                    // Recalculate interval for next report
685                    interval = rtcp_generator.calculate_interval();
686                    debug!("Next RTCP report in {:?}", interval);
687                }
688                
689                debug!("RTCP scheduling task ended");
690            });
691            
692            self.rtcp_task = Some(rtcp_task);
693        }
694        
695        self.recv_task = Some(recv_task);
696        self.send_task = Some(send_task);
697        self.active = true;
698        
699        info!("Started RTP session with SSRC={:08x}", ssrc);
700        Ok(())
701    }
702    
703    /// Send an RTP packet with payload
704    pub async fn send_packet(&mut self, timestamp: RtpTimestamp, payload: Bytes, marker: bool) -> Result<()> {
705        // Create RTP header
706        let mut header = RtpHeader::new(
707            self.config.payload_type,
708            0, // Sequence number will be set by scheduler
709            timestamp,
710            self.ssrc,
711        );
712        
713        // Set marker bit if needed
714        header.marker = marker;
715        
716        // Create packet
717        let packet = RtpPacket::new(header, payload);
718        
719        // If using scheduler, schedule the packet
720        if let Some(scheduler) = &mut self.scheduler {
721            scheduler.schedule_packet(packet)
722        } else {
723            // Otherwise send directly
724            self.sender.send(packet)
725                .await
726                .map_err(|_| Error::SessionError("Failed to send packet".to_string()))
727        }
728    }
729    
730    /// Receive an RTP packet
731    pub async fn receive_packet(&mut self) -> Result<RtpPacket> {
732        self.receiver.recv()
733            .await
734            .ok_or_else(|| Error::SessionError("Receiver channel closed".to_string()))
735    }
736    
737    /// Get the session statistics
738    pub fn get_stats(&self) -> RtpSessionStats {
739        if let Ok(stats) = self.stats.lock() {
740            stats.clone()
741        } else {
742            RtpSessionStats::default()
743        }
744    }
745    
746    /// Set the remote address
747    pub async fn set_remote_addr(&mut self, addr: SocketAddr) {
748        self.config.remote_addr = Some(addr);
749        
750        // Update stats with remote address
751        if let Ok(mut stats) = self.stats.lock() {
752            stats.remote_addr = Some(addr);
753        }
754        
755        // Update the transport's remote address
756        if let Some(t) = self.transport.as_any().downcast_ref::<UdpRtpTransport>() {
757            t.set_remote_rtp_addr(addr).await;
758        }
759    }
760    
761    /// Get the local address
762    pub fn local_addr(&self) -> Result<SocketAddr> {
763        self.transport.local_rtp_addr()
764    }
765    
766    /// Get the transport
767    pub fn transport(&self) -> Arc<dyn RtpTransport> {
768        self.transport.clone()
769    }
770    
771    /// Close the session and clean up resources
772    pub async fn close(&mut self) -> Result<()> {
773        // Send BYE packet if we have a remote address
774        if let Some(remote_addr) = self.config.remote_addr {
775            // Create BYE packet
776            let bye = crate::packet::rtcp::RtcpGoodbye::new_with_reason(
777                self.ssrc,
778                "Session closed".to_string(),
779            );
780            
781            // Create RTCP packet
782            let rtcp_packet = crate::packet::rtcp::RtcpPacket::Goodbye(bye);
783            
784            // Serialize and send
785            match rtcp_packet.serialize() {
786                Ok(data) => {
787                    // Send using transport (through RTCP port if available)
788                    if let Err(e) = self.transport.send_rtcp_bytes(&data, remote_addr).await {
789                        warn!("Failed to send RTCP BYE: {}", e);
790                    }
791                }
792                Err(e) => {
793                    warn!("Failed to serialize RTCP BYE: {}", e);
794                }
795            }
796        }
797        
798        // Stop the scheduler if running
799        if let Some(scheduler) = &mut self.scheduler {
800            scheduler.stop().await;
801        }
802        
803        // Stop the receive task
804        if let Some(handle) = self.recv_task.take() {
805            handle.abort();
806        }
807        
808        // Stop the send task
809        if let Some(handle) = self.send_task.take() {
810            handle.abort();
811        }
812        
813        // Stop the RTCP task
814        if let Some(handle) = self.rtcp_task.take() {
815            handle.abort();
816        }
817        
818        // Close the transport
819        let _ = self.transport.close().await;
820        
821        self.active = false;
822        info!("Closed RTP session with SSRC={:08x}", self.ssrc);
823        
824        Ok(())
825    }
826    
827    /// Get the current timestamp
828    pub fn get_timestamp(&self) -> RtpTimestamp {
829        if let Some(scheduler) = &self.scheduler {
830            scheduler.get_timestamp()
831        } else {
832            // Generate based on uptime if no scheduler
833            let now = std::time::SystemTime::now();
834            let since_epoch = now.duration_since(std::time::UNIX_EPOCH)
835                .unwrap_or_else(|_| Duration::from_secs(0));
836            
837            let secs = since_epoch.as_secs();
838            let nanos = since_epoch.subsec_nanos();
839            
840            // Convert to timestamp units (samples)
841            let timestamp_secs = secs * (self.config.clock_rate as u64);
842            let timestamp_fraction = ((nanos as u64) * (self.config.clock_rate as u64)) / 1_000_000_000;
843            
844            (timestamp_secs + timestamp_fraction) as u32
845        }
846    }
847    
848    /// Get the SSRC of this session
849    pub fn get_ssrc(&self) -> RtpSsrc {
850        self.ssrc
851    }
852    
853    /// Subscribe to session events
854    pub fn subscribe(&self) -> broadcast::Receiver<RtpSessionEvent> {
855        self.event_tx.subscribe()
856    }
857    
858    /// Get the current payload type
859    pub fn get_payload_type(&self) -> u8 {
860        self.config.payload_type
861    }
862    
863    /// Set the payload type
864    pub fn set_payload_type(&mut self, payload_type: u8) {
865        self.config.payload_type = payload_type;
866    }
867    
868    /// Get a stream by SSRC, if it exists
869    pub async fn get_stream(&self, ssrc: RtpSsrc) -> Option<RtpStreamStats> {
870        let streams = match self.streams.lock() {
871            Ok(streams) => streams,
872            Err(_) => return None,
873        };
874        
875        streams.get(&ssrc).map(|stream| stream.get_stats())
876    }
877    
878    /// Get a list of all current streams
879    pub async fn get_all_streams(&self) -> Vec<RtpStreamStats> {
880        let streams = match self.streams.lock() {
881            Ok(streams) => streams,
882            Err(_) => return Vec::new(),
883        };
884        
885        streams.values().map(|stream| stream.get_stats()).collect()
886    }
887    
888    /// Get the number of active streams
889    pub async fn stream_count(&self) -> usize {
890        let streams = match self.streams.lock() {
891            Ok(streams) => streams,
892            Err(_) => return 0,
893        };
894        
895        streams.len()
896    }
897    
898    /// Get a list of all SSRCs known to this session
899    /// 
900    /// This returns all SSRCs that have been seen, even if their streams
901    /// haven't released any packets from their jitter buffers yet.
902    pub async fn get_all_ssrcs(&self) -> Vec<RtpSsrc> {
903        if let Ok(streams) = self.streams.lock() {
904            streams.keys().copied().collect()
905        } else {
906            Vec::new()
907        }
908    }
909    
910    /// Force creation of a stream for a specific SSRC
911    /// 
912    /// This is useful when we want to ensure a stream exists for an SSRC
913    /// even if no packets have been received yet.
914    pub async fn create_stream_for_ssrc(&mut self, ssrc: RtpSsrc) -> bool {
915        let mut streams = match self.streams.lock() {
916            Ok(streams) => streams,
917            Err(e) => {
918                error!("Failed to lock streams map: {}", e);
919                return false;
920            }
921        };
922        
923        // Check if this SSRC already exists
924        if streams.contains_key(&ssrc) {
925            debug!("Stream for SSRC={:08x} already exists", ssrc);
926            return false;
927        }
928        
929        // Create the stream
930        info!("Manually creating new RTP stream for SSRC={:08x}", ssrc);
931        let stream = if self.config.enable_jitter_buffer {
932            debug!("Creating stream with jitter buffer for SSRC={:08x}", ssrc);
933            RtpStream::with_jitter_buffer(
934                ssrc, 
935                self.config.clock_rate, 
936                self.config.jitter_buffer_size.unwrap_or(50), 
937                self.config.max_packet_age_ms.unwrap_or(200) as u64
938            )
939        } else {
940            debug!("Creating stream without jitter buffer for SSRC={:08x}", ssrc);
941            RtpStream::new(ssrc, self.config.clock_rate)
942        };
943        
944        // Add the stream
945        streams.insert(ssrc, stream);
946        
947        // Emit the new stream event
948        debug!("Emitting NewStreamDetected event for SSRC={:08x}", ssrc);
949        let _ = self.event_tx.send(RtpSessionEvent::NewStreamDetected {
950            ssrc,
951        });
952        
953        true
954    }
955    
956    /// Send an RTCP BYE packet to notify that we're leaving the session
957    /// 
958    /// This can be used to notify other participants that we're leaving the session
959    /// without closing the entire RtpSession. The BYE packet includes our SSRC and 
960    /// an optional reason string.
961    /// 
962    /// Returns an error if serialization fails or if there's no remote address configured.
963    pub async fn send_bye(&self, reason: Option<String>) -> Result<()> {
964        // Check if we have a remote address
965        let remote_addr = match self.config.remote_addr {
966            Some(addr) => addr,
967            None => return Err(Error::SessionError("No remote address configured".to_string())),
968        };
969        
970        // Create BYE packet
971        let bye = crate::packet::rtcp::RtcpGoodbye::new_with_reason(
972            self.ssrc,
973            reason.unwrap_or_else(|| "Session terminated".to_string()),
974        );
975        
976        // Create RTCP packet
977        let rtcp_packet = crate::packet::rtcp::RtcpPacket::Goodbye(bye);
978        
979        // Serialize and send
980        match rtcp_packet.serialize() {
981            Ok(data) => {
982                // Send using transport
983                self.transport.send_rtcp_bytes(&data, remote_addr).await
984            }
985            Err(e) => {
986                Err(Error::SerializationError(format!("Failed to serialize RTCP BYE: {}", e)))
987            }
988        }
989    }
990    
991    /// Send an RTCP Sender Report (SR) packet
992    /// 
993    /// A Sender Report contains:
994    /// - Our SSRC
995    /// - Current NTP and RTP timestamps
996    /// - Packet and octet counts
997    /// - Optional report blocks with reception statistics about other sources
998    /// 
999    /// This method generates an SR based on the current session statistics, which is useful
1000    /// for providing quality metrics to other participants.
1001    /// 
1002    /// Returns an error if serialization fails or if there's no remote address configured.
1003    pub async fn send_sender_report(&self) -> Result<()> {
1004        // Check if we have a remote address
1005        let remote_addr = match self.config.remote_addr {
1006            Some(addr) => addr,
1007            None => return Err(Error::SessionError("No remote address configured".to_string())),
1008        };
1009        
1010        // Get session stats
1011        let session_stats = if let Ok(stats) = self.stats.lock() {
1012            stats.clone()
1013        } else {
1014            RtpSessionStats::default()
1015        };
1016        
1017        // Create a new SR packet
1018        let mut sr = crate::packet::rtcp::RtcpSenderReport::new(self.ssrc);
1019        
1020        // Set current NTP timestamp
1021        sr.ntp_timestamp = crate::packet::rtcp::NtpTimestamp::now();
1022        
1023        // Set current RTP timestamp (convert from NTP time)
1024        sr.rtp_timestamp = self.get_timestamp();
1025        
1026        // Set packet and octet count from session stats
1027        sr.sender_packet_count = session_stats.packets_sent as u32;
1028        sr.sender_octet_count = session_stats.bytes_sent as u32;
1029        
1030        // Add report blocks for active streams (remote SSRCs we're receiving from)
1031        if let Ok(streams) = self.streams.lock() {
1032            // Add report blocks for up to 31 streams (max allowed by RTCP)
1033            for (ssrc, stream) in streams.iter().take(31) {
1034                let stream_stats = stream.get_stats();
1035                
1036                // Create a report block for this source
1037                let mut block = crate::packet::rtcp::RtcpReportBlock::new(*ssrc);
1038                
1039                // Set statistics
1040                let expected_packets = stream_stats.highest_seq - stream_stats.first_seq + 1;
1041                let (fraction_lost, cumulative_lost) = 
1042                    block.calculate_packet_loss(expected_packets, stream_stats.received);
1043                
1044                block.fraction_lost = fraction_lost;
1045                block.cumulative_lost = cumulative_lost as u32;
1046                block.highest_seq = stream_stats.highest_seq;
1047                block.jitter = stream_stats.jitter;
1048                
1049                // TODO: Set last_sr and delay_since_last_sr when we process incoming SRs
1050                
1051                // Add the block to the SR
1052                sr.add_report_block(block);
1053            }
1054        }
1055        
1056        // **FIX: Update our own MediaSync context with the SR data we're sending**
1057        // This ensures our own timing data flows into MediaSync for API access
1058        if let Some(media_sync) = &self.media_sync {
1059            if let Ok(mut sync) = media_sync.write() {
1060                sync.update_from_sr(self.ssrc, sr.ntp_timestamp, sr.rtp_timestamp);
1061                debug!("Updated MediaSync with our own SR: SSRC={:08x}, NTP={:?}, RTP={}", 
1062                       self.ssrc, sr.ntp_timestamp, sr.rtp_timestamp);
1063            }
1064        }
1065        
1066        // Create RTCP packet
1067        let rtcp_packet = crate::packet::rtcp::RtcpPacket::SenderReport(sr);
1068        
1069        // Serialize and send
1070        match rtcp_packet.serialize() {
1071            Ok(data) => {
1072                self.transport.send_rtcp_bytes(&data, remote_addr).await
1073            }
1074            Err(e) => {
1075                Err(Error::SerializationError(format!("Failed to serialize RTCP SR: {}", e)))
1076            }
1077        }
1078    }
1079    
1080    /// Send an RTCP Receiver Report (RR) packet
1081    /// 
1082    /// A Receiver Report contains:
1083    /// - Our SSRC
1084    /// - Report blocks with reception statistics about other sources
1085    /// 
1086    /// This method generates an RR based on the current stream statistics, which is useful
1087    /// for providing quality metrics to other participants when we're receiving but not sending.
1088    /// 
1089    /// Returns an error if serialization fails or if there's no remote address configured.
1090    pub async fn send_receiver_report(&self) -> Result<()> {
1091        // Check if we have a remote address
1092        let remote_addr = match self.config.remote_addr {
1093            Some(addr) => addr,
1094            None => return Err(Error::SessionError("No remote address configured".to_string())),
1095        };
1096        
1097        // Create a new RR packet
1098        let mut rr = crate::packet::rtcp::RtcpReceiverReport::new(self.ssrc);
1099        
1100        // Add report blocks for active streams (remote SSRCs we're receiving from)
1101        if let Ok(streams) = self.streams.lock() {
1102            // Add report blocks for up to 31 streams (max allowed by RTCP)
1103            for (ssrc, stream) in streams.iter().take(31) {
1104                let stream_stats = stream.get_stats();
1105                
1106                // Create a report block for this source
1107                let mut block = crate::packet::rtcp::RtcpReportBlock::new(*ssrc);
1108                
1109                // Set statistics
1110                let expected_packets = stream_stats.highest_seq - stream_stats.first_seq + 1;
1111                let (fraction_lost, cumulative_lost) = 
1112                    block.calculate_packet_loss(expected_packets, stream_stats.received);
1113                
1114                block.fraction_lost = fraction_lost;
1115                block.cumulative_lost = cumulative_lost as u32;
1116                block.highest_seq = stream_stats.highest_seq;
1117                block.jitter = stream_stats.jitter;
1118                
1119                // TODO: Set last_sr and delay_since_last_sr when we process incoming SRs
1120                
1121                // Add the block to the RR
1122                rr.add_report_block(block);
1123            }
1124        }
1125        
1126        // Create RTCP packet
1127        let rtcp_packet = crate::packet::rtcp::RtcpPacket::ReceiverReport(rr);
1128        
1129        // Serialize and send
1130        match rtcp_packet.serialize() {
1131            Ok(data) => {
1132                self.transport.send_rtcp_bytes(&data, remote_addr).await
1133            }
1134            Err(e) => {
1135                Err(Error::SerializationError(format!("Failed to serialize RTCP RR: {}", e)))
1136            }
1137        }
1138    }
1139    
1140    /// Enable media synchronization
1141    pub fn enable_media_sync(&mut self) -> Arc<std::sync::RwLock<crate::sync::MediaSync>> {
1142        let sync = Arc::new(std::sync::RwLock::new(crate::sync::MediaSync::new()));
1143        self.media_sync = Some(sync.clone());
1144        
1145        // Register our stream
1146        if let Ok(mut media_sync) = sync.write() {
1147            media_sync.register_stream(self.ssrc, self.config.clock_rate);
1148        }
1149        
1150        sync
1151    }
1152    
1153    /// Get the media synchronization context
1154    pub fn media_sync(&self) -> Option<Arc<std::sync::RwLock<crate::sync::MediaSync>>> {
1155        self.media_sync.clone()
1156    }
1157    
1158    /// Set the session bandwidth in bits per second
1159    ///
1160    /// This affects the RTCP report interval calculation.
1161    /// Higher bandwidth means more frequent RTCP packets.
1162    pub fn set_bandwidth(&mut self, bandwidth_bps: u32) {
1163        self.bandwidth_bps = bandwidth_bps;
1164    }
1165    
1166    /// Create a sender handle for this session
1167    /// 
1168    /// This creates a lightweight handle that can be used to send RTP packets
1169    /// from another thread. This is useful when you need to send packets
1170    /// but don't want to clone the entire session.
1171    pub fn create_sender_handle(&self) -> RtpSessionSender {
1172        RtpSessionSender {
1173            sender: self.sender.clone(),
1174            ssrc: self.ssrc,
1175            payload_type: self.config.payload_type,
1176            clock_rate: self.config.clock_rate,
1177        }
1178    }
1179    
1180    /// Get the UDP socket handle from the transport
1181    /// 
1182    /// This method is used to access the underlying UDP socket when needed for
1183    /// other protocols that need to share the same socket (e.g., DTLS).
1184    pub async fn get_socket_handle(&self) -> Result<Arc<UdpSocket>> {
1185        // Try to get the socket from the UdpRtpTransport
1186        if let Some(t) = self.transport.as_any().downcast_ref::<UdpRtpTransport>() {
1187            // Clone and return the RTP socket using the public method
1188            let socket = t.get_socket();
1189            return Ok(socket);
1190        }
1191        
1192        // If we get here, the transport is not UdpRtpTransport
1193        Err(Error::Transport("Transport is not a UDP transport".to_string()))
1194    }
1195}
1196
1197/// A lightweight sender handle for an RTP session
1198///
1199/// This handle can be used to send RTP packets to the session
1200/// from another thread without having to clone the entire session.
1201#[derive(Clone)]
1202pub struct RtpSessionSender {
1203    /// Channel for sending packets
1204    sender: mpsc::Sender<RtpPacket>,
1205    
1206    /// SSRC for this session
1207    ssrc: RtpSsrc,
1208    
1209    /// Payload type
1210    payload_type: u8,
1211    
1212    /// Clock rate for the payload type
1213    clock_rate: u32,
1214}
1215
1216impl RtpSessionSender {
1217    /// Send an RTP packet with payload
1218    pub async fn send_packet(&self, timestamp: RtpTimestamp, payload: Bytes, marker: bool) -> Result<()> {
1219        // Create RTP header
1220        let mut header = RtpHeader::new(
1221            self.payload_type,
1222            0, // Sequence number will be set by scheduler
1223            timestamp,
1224            self.ssrc,
1225        );
1226        
1227        // Set marker bit if needed
1228        header.marker = marker;
1229        
1230        // Create packet
1231        let packet = RtpPacket::new(header, payload);
1232        
1233        // Send the packet
1234        self.sender.send(packet)
1235            .await
1236            .map_err(|_| Error::SessionError("Failed to send packet".to_string()))
1237    }
1238}