rvoip_rtp_core/session/mod.rs
1//! RTP Session Management
2//!
3//! This module provides functionality for managing RTP sessions, including
4//! configuration, packet sending/receiving, and jitter buffer management.
5
6mod stream;
7mod scheduling;
8
9pub use stream::{RtpStream, RtpStreamStats};
10pub use scheduling::{RtpScheduler, RtpSchedulerStats};
11
12use bytes::Bytes;
13use rand::Rng;
14use std::collections::HashMap;
15use std::net::SocketAddr;
16use std::sync::{Arc, Mutex};
17use std::time::{Duration, Instant};
18use tokio::net::UdpSocket;
19use tokio::sync::{mpsc, broadcast};
20use tokio::task::JoinHandle;
21use tracing::{error, warn, debug, trace, info};
22
23use crate::error::Error;
24use crate::packet::{RtpHeader, RtpPacket};
25use crate::transport::{RtpTransport, RtpTransportConfig, UdpRtpTransport};
26use crate::{Result, RtpSequenceNumber, RtpSsrc, RtpTimestamp, DEFAULT_MAX_PACKET_SIZE};
27
28// Define the constant locally since it's not publicly exported
29const RTP_MIN_HEADER_SIZE: usize = 12;
30
31/// Stats for an RTP session
32#[derive(Debug, Clone, Default)]
33pub struct RtpSessionStats {
34 /// Total packets sent
35 pub packets_sent: u64,
36
37 /// Total packets received
38 pub packets_received: u64,
39
40 /// Total bytes sent
41 pub bytes_sent: u64,
42
43 /// Total bytes received
44 pub bytes_received: u64,
45
46 /// Packets lost (based on sequence numbers)
47 pub packets_lost: u64,
48
49 /// Duplicate packets received
50 pub packets_duplicated: u64,
51
52 /// Out-of-order packets received
53 pub packets_out_of_order: u64,
54
55 /// Packets discarded by jitter buffer (too old)
56 pub packets_discarded_by_jitter: u64,
57
58 /// Current jitter estimate (in milliseconds)
59 pub jitter_ms: f64,
60
61 /// Remote address of the most recent packet
62 pub remote_addr: Option<SocketAddr>,
63}
64
65/// RTP session configuration options
66#[derive(Debug, Clone)]
67pub struct RtpSessionConfig {
68 /// Local address to bind to
69 pub local_addr: SocketAddr,
70
71 /// Remote address to send packets to
72 pub remote_addr: Option<SocketAddr>,
73
74 /// SSRC to use for sending packets
75 pub ssrc: Option<RtpSsrc>,
76
77 /// Payload type
78 pub payload_type: u8,
79
80 /// Clock rate for the payload type (needed for jitter buffer)
81 pub clock_rate: u32,
82
83 /// Jitter buffer size in packets
84 pub jitter_buffer_size: Option<usize>,
85
86 /// Maximum packet age in the jitter buffer (ms)
87 pub max_packet_age_ms: Option<u32>,
88
89 /// Enable jitter buffer
90 pub enable_jitter_buffer: bool,
91}
92
93impl Default for RtpSessionConfig {
94 fn default() -> Self {
95 Self {
96 local_addr: "0.0.0.0:0".parse().unwrap(),
97 remote_addr: None,
98 ssrc: None,
99 payload_type: 0,
100 clock_rate: 8000, // Default for most audio codecs (8kHz)
101 jitter_buffer_size: Some(50),
102 max_packet_age_ms: Some(200),
103 enable_jitter_buffer: true,
104 }
105 }
106}
107
108/// Events emitted by the RTP session
109#[derive(Debug, Clone)]
110pub enum RtpSessionEvent {
111 /// New packet received
112 PacketReceived(RtpPacket),
113
114 /// Error in the session
115 Error(Error),
116
117 /// BYE RTCP packet received (a party is leaving the session)
118 Bye {
119 /// SSRC of the source that sent the BYE
120 ssrc: RtpSsrc,
121
122 /// Optional reason text
123 reason: Option<String>,
124 },
125
126 /// New stream detected with a specific SSRC
127 /// This event is emitted as soon as the first packet for a new SSRC is received,
128 /// even if the packet is being held in a jitter buffer.
129 NewStreamDetected {
130 /// SSRC of the new stream
131 ssrc: RtpSsrc,
132 },
133
134 /// RTCP Sender Report received
135 RtcpSenderReport {
136 /// SSRC of the sender
137 ssrc: RtpSsrc,
138
139 /// NTP timestamp
140 ntp_timestamp: crate::packet::rtcp::NtpTimestamp,
141
142 /// RTP timestamp
143 rtp_timestamp: RtpTimestamp,
144
145 /// Packet count
146 packet_count: u32,
147
148 /// Octet count
149 octet_count: u32,
150
151 /// Report blocks
152 report_blocks: Vec<crate::packet::rtcp::RtcpReportBlock>,
153 },
154
155 /// RTCP Receiver Report received
156 RtcpReceiverReport {
157 /// SSRC of the receiver
158 ssrc: RtpSsrc,
159
160 /// Report blocks
161 report_blocks: Vec<crate::packet::rtcp::RtcpReportBlock>,
162 },
163}
164
165/// RTP session for sending and receiving RTP packets
166///
167/// This class manages an RTP session, including sending and receiving packets,
168/// jitter buffer management, and demultiplexing of multiple streams.
169///
170/// # SSRC Demultiplexing
171///
172/// An RTP session can receive packets from multiple sources, each identified by
173/// a unique Synchronization Source identifier (SSRC). This implementation
174/// automatically demultiplexes incoming packets based on their SSRC:
175///
176/// 1. When a packet arrives, its SSRC is extracted
177/// 2. If this is the first packet from this SSRC, a new stream is created
178/// 3. The packet is processed by the appropriate stream, which handles:
179/// - Sequence number tracking
180/// - Jitter calculation
181/// - Duplicate detection
182/// - Packet reordering (via jitter buffer)
183///
184/// Each stream maintains its own statistics and state. You can access information
185/// about individual streams using the `get_stream()`, `get_all_streams()`, and
186/// `stream_count()` methods.
187///
188/// This approach aligns with RFC 3550 Section 8.2, which describes how to handle
189/// multiple sources in a single RTP session.
190pub struct RtpSession {
191 /// Session configuration
192 config: RtpSessionConfig,
193
194 /// SSRC for this session
195 ssrc: RtpSsrc,
196
197 /// Transport for sending/receiving packets
198 transport: Arc<dyn RtpTransport>,
199
200 /// Map of received streams by SSRC
201 streams: Arc<Mutex<HashMap<RtpSsrc, RtpStream>>>,
202
203 /// Packet scheduler for sending packets
204 scheduler: Option<RtpScheduler>,
205
206 /// Channel for receiving packets
207 receiver: mpsc::Receiver<RtpPacket>,
208
209 /// Channel for sending packets
210 sender: mpsc::Sender<RtpPacket>,
211
212 /// Event broadcaster
213 event_tx: broadcast::Sender<RtpSessionEvent>,
214
215 /// Receiving task handle
216 recv_task: Option<JoinHandle<()>>,
217
218 /// Sending task handle
219 send_task: Option<JoinHandle<()>>,
220
221 /// Session statistics
222 stats: Arc<Mutex<RtpSessionStats>>,
223
224 /// Media synchronization context
225 media_sync: Option<Arc<std::sync::RwLock<crate::sync::MediaSync>>>,
226
227 /// Whether the session is active
228 active: bool,
229
230 /// RTCP report generator
231 rtcp_generator: Option<crate::stats::reports::RtcpReportGenerator>,
232
233 /// RTCP sender task
234 rtcp_task: Option<JoinHandle<()>>,
235
236 /// Session bandwidth (bits per second)
237 bandwidth_bps: u32,
238}
239
240impl RtpSession {
241 /// Create a new RTP session
242 pub async fn new(config: RtpSessionConfig) -> Result<Self> {
243 // Generate SSRC if not provided
244 let ssrc = config.ssrc.unwrap_or_else(|| {
245 let mut rng = rand::thread_rng();
246 rng.gen::<u32>()
247 });
248
249 // Create transport config - respect provided ports!
250 let transport_config = RtpTransportConfig {
251 local_rtp_addr: config.local_addr,
252 local_rtcp_addr: None, // RTCP on same port for now
253 symmetric_rtp: true,
254 rtcp_mux: true, // Enable RTCP multiplexing by default
255 session_id: Some(format!("rtp-session-{}", ssrc)),
256 // Don't allocate a new port - use the one provided in config
257 use_port_allocator: false,
258 };
259
260 // Create UDP transport
261 let transport = Arc::new(UdpRtpTransport::new(transport_config).await?);
262
263 // Create channels for internal communication
264 // Increased capacity to handle longer sessions without dropping packets
265 let (sender_tx, sender_rx) = mpsc::channel(1000);
266 let (receiver_tx, receiver_rx) = mpsc::channel(1000);
267 let (event_tx, _) = broadcast::channel(1000);
268
269 // Create scheduler if needed
270 let scheduler = Some(RtpScheduler::new(
271 config.clock_rate,
272 rand::thread_rng().gen::<u16>(), // Random starting sequence
273 rand::thread_rng().gen::<u32>(), // Random starting timestamp
274 ));
275
276 // Create RTCP report generator
277 let hostname = hostname::get().unwrap_or_else(|_| "unknown".into());
278 let hostname_str = hostname.to_string_lossy();
279 let cname = format!("{}@{}", std::env::var("USER").unwrap_or_else(|_| "user".to_string()), hostname_str);
280 let rtcp_generator = crate::stats::reports::RtcpReportGenerator::new(ssrc, cname);
281
282 let mut session = Self {
283 config,
284 ssrc,
285 transport,
286 streams: Arc::new(Mutex::new(HashMap::new())),
287 scheduler,
288 receiver: receiver_rx,
289 sender: sender_tx,
290 event_tx,
291 recv_task: None,
292 send_task: None,
293 stats: Arc::new(Mutex::new(RtpSessionStats::default())),
294 media_sync: None,
295 active: false,
296 rtcp_generator: Some(rtcp_generator),
297 rtcp_task: None,
298 bandwidth_bps: 64000, // Default bandwidth: 64 kbps
299 };
300
301 // Start the session
302 session.start(sender_rx, receiver_tx).await?;
303
304 Ok(session)
305 }
306
307 /// Start the session tasks
308 async fn start(
309 &mut self,
310 mut sender_rx: mpsc::Receiver<RtpPacket>,
311 receiver_tx: mpsc::Sender<RtpPacket>,
312 ) -> Result<()> {
313 if self.active {
314 return Ok(());
315 }
316
317 let transport = self.transport.clone();
318 let stats_send = self.stats.clone();
319 let stats_recv = self.stats.clone();
320 let remote_addr = self.config.remote_addr;
321 let event_tx_send = self.event_tx.clone();
322 let event_tx_recv = self.event_tx.clone();
323 let clock_rate = self.config.clock_rate;
324 let payload_type = self.config.payload_type;
325 let ssrc = self.ssrc;
326 let streams_map = self.streams.clone();
327 let jitter_buffer_enabled = self.config.enable_jitter_buffer;
328 let jitter_size = self.config.jitter_buffer_size.unwrap_or(50);
329 let max_age_ms = self.config.max_packet_age_ms.unwrap_or(200);
330
331 let media_sync = self.media_sync.clone();
332
333 // If we have a remote address, set it on the transport
334 if let Some(addr) = remote_addr {
335 // Set the remote RTP address on the UDP transport
336 if let Some(t) = transport.as_any().downcast_ref::<UdpRtpTransport>() {
337 t.set_remote_rtp_addr(addr).await;
338 }
339 }
340
341 // Start the scheduler if available
342 if let Some(scheduler) = &mut self.scheduler {
343 let sender_tx = self.sender.clone();
344 scheduler.set_sender(sender_tx);
345
346 // Set appropriate timestamp increment based on packet interval
347 let interval_ms = 20; // Default 20ms packet interval
348 let samples_per_packet = (clock_rate as f64 * (interval_ms as f64 / 1000.0)) as u32;
349 scheduler.set_interval(interval_ms, samples_per_packet);
350
351 scheduler.start()?;
352 }
353
354 // Start sending task
355 let send_transport = transport.clone();
356 let send_task = tokio::spawn(async move {
357 let mut last_remote_addr = remote_addr;
358
359 while let Some(packet) = sender_rx.recv().await {
360 // Always try to get the current remote address from transport first
361 let dest = if let Some(t) = send_transport.as_any().downcast_ref::<UdpRtpTransport>() {
362 // Check transport for current remote address
363 match t.remote_rtp_addr().await {
364 Some(addr) => {
365 // Update our cached value
366 last_remote_addr = Some(addr);
367 addr
368 }
369 None => {
370 // Fall back to cached value if transport doesn't have one
371 if let Some(addr) = last_remote_addr {
372 addr
373 } else {
374 // No destination address, can't send
375 warn!("No destination address for RTP packet, dropping");
376 continue;
377 }
378 }
379 }
380 } else {
381 // Not a UDP transport, use cached value
382 if let Some(addr) = last_remote_addr {
383 addr
384 } else {
385 // No destination address, can't send
386 warn!("No destination address for RTP packet, dropping");
387 continue;
388 }
389 };
390
391 // Send the packet
392 debug!("Sending RTP packet to {} (seq={}, timestamp={})",
393 dest, packet.header.sequence_number, packet.header.timestamp);
394
395 if let Err(e) = send_transport.send_rtp(&packet, dest).await {
396 error!("Failed to send RTP packet: {}", e);
397
398 // Broadcast error event
399 let _ = event_tx_send.send(RtpSessionEvent::Error(e));
400 continue;
401 }
402
403 debug!("Successfully sent RTP packet to {}", dest);
404
405 // Update stats
406 if let Ok(mut session_stats) = stats_send.lock() {
407 session_stats.packets_sent += 1;
408 session_stats.bytes_sent += packet.size() as u64;
409 }
410 }
411 });
412
413 // Start receiving task
414 let recv_transport = transport.clone();
415
416 // Subscribe to transport events to handle RTCP packets
417 let mut transport_events = recv_transport.subscribe();
418
419 let recv_task = tokio::spawn(async move {
420 // IMPORTANT: Only handle events from transport, no direct packet reception
421 // to avoid race conditions where two tasks read from the same socket
422 loop {
423 match transport_events.recv().await {
424 Ok(crate::traits::RtpEvent::RtcpReceived { data, source }) => {
425 // Try to parse the RTCP packet
426 if let Ok(rtcp_packet) = crate::packet::rtcp::RtcpPacket::parse(&data) {
427 // Handle the RTCP packet based on its type
428 match rtcp_packet {
429 crate::packet::rtcp::RtcpPacket::Goodbye(bye) => {
430 // Extract the SSRC and reason
431 if !bye.sources.is_empty() {
432 let source_ssrc = bye.sources[0];
433
434 // Broadcast BYE event
435 let _ = event_tx_recv.send(RtpSessionEvent::Bye {
436 ssrc: source_ssrc,
437 reason: bye.reason,
438 });
439
440 info!("Received RTCP BYE from SSRC={:08x}", source_ssrc);
441 }
442 },
443 crate::packet::rtcp::RtcpPacket::SenderReport(sr) => {
444 // Process sender report
445 let report_ssrc = sr.ssrc;
446
447 debug!("Received RTCP SR from SSRC={:08x}", report_ssrc);
448
449 // Update stream statistics if this stream exists
450 if let Ok(mut streams) = streams_map.lock() {
451 if let Some(stream) = streams.get_mut(&report_ssrc) {
452 // Update the stream's RTCP SR info
453 // This will be used for calculating round-trip time
454 stream.update_last_sr_info(
455 sr.ntp_timestamp.to_u32(),
456 std::time::Instant::now(),
457 );
458
459 debug!("Updated RTCP SR info for stream SSRC={:08x}", report_ssrc);
460 }
461 }
462
463 // If media sync is enabled, update it
464 if let Some(sync) = &media_sync {
465 if let Ok(mut media_sync) = sync.write() {
466 // Update synchronization data
467 media_sync.update_from_sr(report_ssrc, sr.ntp_timestamp, sr.rtp_timestamp);
468 }
469 }
470
471 // Emit SR event for external processing
472 let _ = event_tx_recv.send(RtpSessionEvent::RtcpSenderReport {
473 ssrc: report_ssrc,
474 ntp_timestamp: sr.ntp_timestamp,
475 rtp_timestamp: sr.rtp_timestamp,
476 packet_count: sr.sender_packet_count,
477 octet_count: sr.sender_octet_count,
478 report_blocks: sr.report_blocks,
479 });
480 },
481 crate::packet::rtcp::RtcpPacket::ReceiverReport(rr) => {
482 // Process receiver report
483 let report_ssrc = rr.ssrc;
484
485 debug!("Received RTCP RR from SSRC={:08x} with {} report blocks",
486 report_ssrc, rr.report_blocks.len());
487
488 // If there's a report block about our SSRC, process it
489 for block in &rr.report_blocks {
490 if block.ssrc == ssrc {
491 debug!("Processing report block about our SSRC={:08x}", ssrc);
492
493 // Update session stats with packet loss info
494 if let Ok(mut stats) = stats_recv.lock() {
495 stats.packets_lost = block.cumulative_lost as u64;
496
497 // Calculate packet loss percentage
498 let fraction_lost = block.fraction_lost as f64 / 256.0;
499 debug!("Packet loss: {}% (fraction={})",
500 fraction_lost * 100.0, block.fraction_lost);
501 }
502 }
503 }
504
505 // Emit RR event for external processing
506 let _ = event_tx_recv.send(RtpSessionEvent::RtcpReceiverReport {
507 ssrc: report_ssrc,
508 report_blocks: rr.report_blocks,
509 });
510 },
511 // Handle other RTCP packet types as needed
512 _ => {
513 // For now, we're just logging other packet types
514 trace!("Received RTCP packet: {:?}", rtcp_packet);
515 }
516 }
517 } else {
518 warn!("Failed to parse RTCP packet");
519 }
520 }
521 Ok(crate::traits::RtpEvent::MediaReceived { payload_type, timestamp, payload, source, ssrc: ssrc_from_event, marker, .. }) => {
522 // Handle RTP packets received via transport events
523 // This is the ONLY path for RTP packets to avoid race conditions
524
525 // Reconstruct minimal RTP header for processing
526 let header = RtpHeader {
527 version: 2,
528 padding: false,
529 extension: false,
530 cc: 0,
531 marker,
532 payload_type,
533 sequence_number: 0, // Will be set by stream tracking
534 timestamp,
535 ssrc,
536 csrc: vec![],
537 extensions: None,
538 };
539
540 let packet = RtpPacket {
541 header,
542 payload: payload.clone(),
543 };
544
545 // Update stats
546 if let Ok(mut session_stats) = stats_recv.lock() {
547 session_stats.packets_received += 1;
548 session_stats.bytes_received += payload.len() as u64 + 12; // payload + header
549 session_stats.remote_addr = Some(source);
550 }
551
552 // Use the SSRC from the event
553 let packet_ssrc = ssrc_from_event;
554
555 // Get or create the stream for this SSRC
556 let (is_new_stream, output_packet) = {
557 let mut streams = match streams_map.lock() {
558 Ok(streams) => streams,
559 Err(e) => {
560 error!("Failed to lock streams map: {}", e);
561 continue;
562 }
563 };
564
565 let is_new = !streams.contains_key(&packet_ssrc);
566 let stream = streams.entry(packet_ssrc).or_insert_with(|| {
567 info!("New RTP stream detected with SSRC={:08x}", packet_ssrc);
568 RtpStream::new(packet_ssrc, clock_rate)
569 });
570
571 // For simplified processing without full RTP header,
572 // just forward the packet immediately
573 (is_new, Some(packet.clone()))
574 };
575
576 // If this is a new stream, emit the NewStreamDetected event
577 if is_new_stream {
578 let _ = event_tx_recv.send(RtpSessionEvent::NewStreamDetected {
579 ssrc: packet_ssrc,
580 });
581 }
582
583 // Forward the packet
584 if let Some(output) = output_packet {
585 if let Err(e) = receiver_tx.send(output.clone()).await {
586 error!("Failed to forward RTP packet to receiver: {}", e);
587 }
588
589 // Broadcast packet received event
590 let _ = event_tx_recv.send(RtpSessionEvent::PacketReceived(output));
591 }
592 }
593 Ok(crate::traits::RtpEvent::Error(e)) => {
594 error!("Transport error: {}", e);
595 let _ = event_tx_recv.send(RtpSessionEvent::Error(e));
596 }
597 Err(e) => {
598 debug!("Transport event channel error: {}", e);
599 }
600 }
601 }
602 });
603
604 // Start RTCP sending task if we have a remote address and report generator
605 if let (Some(remote_addr), Some(mut rtcp_generator)) = (self.config.remote_addr, self.rtcp_generator.take()) {
606 let transport = self.transport.clone();
607 let ssrc = self.ssrc;
608 let event_tx = self.event_tx.clone();
609 let stats = self.stats.clone();
610 let active_state = Arc::new(tokio::sync::Mutex::new(true));
611 let active_state_clone = active_state.clone();
612 let bandwidth = self.bandwidth_bps;
613
614 // Set bandwidth in the generator
615 rtcp_generator.set_bandwidth(bandwidth);
616
617 // Start the RTCP task
618 let rtcp_task = tokio::spawn(async move {
619 debug!("RTCP scheduling task started");
620
621 // Initial interval calculation
622 let mut interval = rtcp_generator.calculate_interval();
623 debug!("Initial RTCP interval: {:?}", interval);
624
625 while *active_state.lock().await {
626 // Wait for the calculated interval
627 tokio::time::sleep(interval).await;
628
629 // Check if we should continue
630 if !*active_state.lock().await {
631 break;
632 }
633
634 // Update RTP statistics before sending the report
635 if let Ok(session_stats) = stats.lock() {
636 rtcp_generator.update_sent_stats(
637 session_stats.packets_sent as u32,
638 session_stats.bytes_sent as u32
639 );
640
641 // Log the current stats for debugging
642 debug!("Current stats for RTCP report: packets={}, bytes={}",
643 session_stats.packets_sent, session_stats.bytes_sent);
644 }
645
646 // Send an RTCP report regardless of should_send_report logic for this example
647 // We'll send a compound packet with SR and SDES
648 debug!("Sending RTCP report");
649
650 // Generate sender report
651 let rtp_timestamp = std::time::SystemTime::now()
652 .duration_since(std::time::UNIX_EPOCH)
653 .unwrap_or_default()
654 .as_millis() as u32;
655
656 let sr = rtcp_generator.generate_sender_report(rtp_timestamp);
657 let sdes = rtcp_generator.generate_sdes();
658
659 // Create compound packet
660 let mut compound = crate::packet::rtcp::RtcpCompoundPacket::new_with_sr(sr);
661 compound.add_sdes(sdes);
662
663 // Send the compound packet
664 if let Ok(data) = compound.serialize() {
665 if let Err(e) = transport.send_rtcp_bytes(&data, remote_addr).await {
666 warn!("Failed to send RTCP compound packet: {}", e);
667 } else {
668 info!("Sent RTCP compound packet of {} bytes", data.len());
669
670 // Emit SR event
671 if let Some(sr) = compound.get_sr() {
672 let _ = event_tx.send(RtpSessionEvent::RtcpSenderReport {
673 ssrc,
674 ntp_timestamp: sr.ntp_timestamp,
675 rtp_timestamp: sr.rtp_timestamp,
676 packet_count: sr.sender_packet_count,
677 octet_count: sr.sender_octet_count,
678 report_blocks: sr.report_blocks.clone(),
679 });
680 }
681 }
682 }
683
684 // Recalculate interval for next report
685 interval = rtcp_generator.calculate_interval();
686 debug!("Next RTCP report in {:?}", interval);
687 }
688
689 debug!("RTCP scheduling task ended");
690 });
691
692 self.rtcp_task = Some(rtcp_task);
693 }
694
695 self.recv_task = Some(recv_task);
696 self.send_task = Some(send_task);
697 self.active = true;
698
699 info!("Started RTP session with SSRC={:08x}", ssrc);
700 Ok(())
701 }
702
703 /// Send an RTP packet with payload
704 pub async fn send_packet(&mut self, timestamp: RtpTimestamp, payload: Bytes, marker: bool) -> Result<()> {
705 // Create RTP header
706 let mut header = RtpHeader::new(
707 self.config.payload_type,
708 0, // Sequence number will be set by scheduler
709 timestamp,
710 self.ssrc,
711 );
712
713 // Set marker bit if needed
714 header.marker = marker;
715
716 // Create packet
717 let packet = RtpPacket::new(header, payload);
718
719 // If using scheduler, schedule the packet
720 if let Some(scheduler) = &mut self.scheduler {
721 scheduler.schedule_packet(packet)
722 } else {
723 // Otherwise send directly
724 self.sender.send(packet)
725 .await
726 .map_err(|_| Error::SessionError("Failed to send packet".to_string()))
727 }
728 }
729
730 /// Receive an RTP packet
731 pub async fn receive_packet(&mut self) -> Result<RtpPacket> {
732 self.receiver.recv()
733 .await
734 .ok_or_else(|| Error::SessionError("Receiver channel closed".to_string()))
735 }
736
737 /// Get the session statistics
738 pub fn get_stats(&self) -> RtpSessionStats {
739 if let Ok(stats) = self.stats.lock() {
740 stats.clone()
741 } else {
742 RtpSessionStats::default()
743 }
744 }
745
746 /// Set the remote address
747 pub async fn set_remote_addr(&mut self, addr: SocketAddr) {
748 self.config.remote_addr = Some(addr);
749
750 // Update stats with remote address
751 if let Ok(mut stats) = self.stats.lock() {
752 stats.remote_addr = Some(addr);
753 }
754
755 // Update the transport's remote address
756 if let Some(t) = self.transport.as_any().downcast_ref::<UdpRtpTransport>() {
757 t.set_remote_rtp_addr(addr).await;
758 }
759 }
760
761 /// Get the local address
762 pub fn local_addr(&self) -> Result<SocketAddr> {
763 self.transport.local_rtp_addr()
764 }
765
766 /// Get the transport
767 pub fn transport(&self) -> Arc<dyn RtpTransport> {
768 self.transport.clone()
769 }
770
771 /// Close the session and clean up resources
772 pub async fn close(&mut self) -> Result<()> {
773 // Send BYE packet if we have a remote address
774 if let Some(remote_addr) = self.config.remote_addr {
775 // Create BYE packet
776 let bye = crate::packet::rtcp::RtcpGoodbye::new_with_reason(
777 self.ssrc,
778 "Session closed".to_string(),
779 );
780
781 // Create RTCP packet
782 let rtcp_packet = crate::packet::rtcp::RtcpPacket::Goodbye(bye);
783
784 // Serialize and send
785 match rtcp_packet.serialize() {
786 Ok(data) => {
787 // Send using transport (through RTCP port if available)
788 if let Err(e) = self.transport.send_rtcp_bytes(&data, remote_addr).await {
789 warn!("Failed to send RTCP BYE: {}", e);
790 }
791 }
792 Err(e) => {
793 warn!("Failed to serialize RTCP BYE: {}", e);
794 }
795 }
796 }
797
798 // Stop the scheduler if running
799 if let Some(scheduler) = &mut self.scheduler {
800 scheduler.stop().await;
801 }
802
803 // Stop the receive task
804 if let Some(handle) = self.recv_task.take() {
805 handle.abort();
806 }
807
808 // Stop the send task
809 if let Some(handle) = self.send_task.take() {
810 handle.abort();
811 }
812
813 // Stop the RTCP task
814 if let Some(handle) = self.rtcp_task.take() {
815 handle.abort();
816 }
817
818 // Close the transport
819 let _ = self.transport.close().await;
820
821 self.active = false;
822 info!("Closed RTP session with SSRC={:08x}", self.ssrc);
823
824 Ok(())
825 }
826
827 /// Get the current timestamp
828 pub fn get_timestamp(&self) -> RtpTimestamp {
829 if let Some(scheduler) = &self.scheduler {
830 scheduler.get_timestamp()
831 } else {
832 // Generate based on uptime if no scheduler
833 let now = std::time::SystemTime::now();
834 let since_epoch = now.duration_since(std::time::UNIX_EPOCH)
835 .unwrap_or_else(|_| Duration::from_secs(0));
836
837 let secs = since_epoch.as_secs();
838 let nanos = since_epoch.subsec_nanos();
839
840 // Convert to timestamp units (samples)
841 let timestamp_secs = secs * (self.config.clock_rate as u64);
842 let timestamp_fraction = ((nanos as u64) * (self.config.clock_rate as u64)) / 1_000_000_000;
843
844 (timestamp_secs + timestamp_fraction) as u32
845 }
846 }
847
848 /// Get the SSRC of this session
849 pub fn get_ssrc(&self) -> RtpSsrc {
850 self.ssrc
851 }
852
853 /// Subscribe to session events
854 pub fn subscribe(&self) -> broadcast::Receiver<RtpSessionEvent> {
855 self.event_tx.subscribe()
856 }
857
858 /// Get the current payload type
859 pub fn get_payload_type(&self) -> u8 {
860 self.config.payload_type
861 }
862
863 /// Set the payload type
864 pub fn set_payload_type(&mut self, payload_type: u8) {
865 self.config.payload_type = payload_type;
866 }
867
868 /// Get a stream by SSRC, if it exists
869 pub async fn get_stream(&self, ssrc: RtpSsrc) -> Option<RtpStreamStats> {
870 let streams = match self.streams.lock() {
871 Ok(streams) => streams,
872 Err(_) => return None,
873 };
874
875 streams.get(&ssrc).map(|stream| stream.get_stats())
876 }
877
878 /// Get a list of all current streams
879 pub async fn get_all_streams(&self) -> Vec<RtpStreamStats> {
880 let streams = match self.streams.lock() {
881 Ok(streams) => streams,
882 Err(_) => return Vec::new(),
883 };
884
885 streams.values().map(|stream| stream.get_stats()).collect()
886 }
887
888 /// Get the number of active streams
889 pub async fn stream_count(&self) -> usize {
890 let streams = match self.streams.lock() {
891 Ok(streams) => streams,
892 Err(_) => return 0,
893 };
894
895 streams.len()
896 }
897
898 /// Get a list of all SSRCs known to this session
899 ///
900 /// This returns all SSRCs that have been seen, even if their streams
901 /// haven't released any packets from their jitter buffers yet.
902 pub async fn get_all_ssrcs(&self) -> Vec<RtpSsrc> {
903 if let Ok(streams) = self.streams.lock() {
904 streams.keys().copied().collect()
905 } else {
906 Vec::new()
907 }
908 }
909
910 /// Force creation of a stream for a specific SSRC
911 ///
912 /// This is useful when we want to ensure a stream exists for an SSRC
913 /// even if no packets have been received yet.
914 pub async fn create_stream_for_ssrc(&mut self, ssrc: RtpSsrc) -> bool {
915 let mut streams = match self.streams.lock() {
916 Ok(streams) => streams,
917 Err(e) => {
918 error!("Failed to lock streams map: {}", e);
919 return false;
920 }
921 };
922
923 // Check if this SSRC already exists
924 if streams.contains_key(&ssrc) {
925 debug!("Stream for SSRC={:08x} already exists", ssrc);
926 return false;
927 }
928
929 // Create the stream
930 info!("Manually creating new RTP stream for SSRC={:08x}", ssrc);
931 let stream = if self.config.enable_jitter_buffer {
932 debug!("Creating stream with jitter buffer for SSRC={:08x}", ssrc);
933 RtpStream::with_jitter_buffer(
934 ssrc,
935 self.config.clock_rate,
936 self.config.jitter_buffer_size.unwrap_or(50),
937 self.config.max_packet_age_ms.unwrap_or(200) as u64
938 )
939 } else {
940 debug!("Creating stream without jitter buffer for SSRC={:08x}", ssrc);
941 RtpStream::new(ssrc, self.config.clock_rate)
942 };
943
944 // Add the stream
945 streams.insert(ssrc, stream);
946
947 // Emit the new stream event
948 debug!("Emitting NewStreamDetected event for SSRC={:08x}", ssrc);
949 let _ = self.event_tx.send(RtpSessionEvent::NewStreamDetected {
950 ssrc,
951 });
952
953 true
954 }
955
956 /// Send an RTCP BYE packet to notify that we're leaving the session
957 ///
958 /// This can be used to notify other participants that we're leaving the session
959 /// without closing the entire RtpSession. The BYE packet includes our SSRC and
960 /// an optional reason string.
961 ///
962 /// Returns an error if serialization fails or if there's no remote address configured.
963 pub async fn send_bye(&self, reason: Option<String>) -> Result<()> {
964 // Check if we have a remote address
965 let remote_addr = match self.config.remote_addr {
966 Some(addr) => addr,
967 None => return Err(Error::SessionError("No remote address configured".to_string())),
968 };
969
970 // Create BYE packet
971 let bye = crate::packet::rtcp::RtcpGoodbye::new_with_reason(
972 self.ssrc,
973 reason.unwrap_or_else(|| "Session terminated".to_string()),
974 );
975
976 // Create RTCP packet
977 let rtcp_packet = crate::packet::rtcp::RtcpPacket::Goodbye(bye);
978
979 // Serialize and send
980 match rtcp_packet.serialize() {
981 Ok(data) => {
982 // Send using transport
983 self.transport.send_rtcp_bytes(&data, remote_addr).await
984 }
985 Err(e) => {
986 Err(Error::SerializationError(format!("Failed to serialize RTCP BYE: {}", e)))
987 }
988 }
989 }
990
991 /// Send an RTCP Sender Report (SR) packet
992 ///
993 /// A Sender Report contains:
994 /// - Our SSRC
995 /// - Current NTP and RTP timestamps
996 /// - Packet and octet counts
997 /// - Optional report blocks with reception statistics about other sources
998 ///
999 /// This method generates an SR based on the current session statistics, which is useful
1000 /// for providing quality metrics to other participants.
1001 ///
1002 /// Returns an error if serialization fails or if there's no remote address configured.
1003 pub async fn send_sender_report(&self) -> Result<()> {
1004 // Check if we have a remote address
1005 let remote_addr = match self.config.remote_addr {
1006 Some(addr) => addr,
1007 None => return Err(Error::SessionError("No remote address configured".to_string())),
1008 };
1009
1010 // Get session stats
1011 let session_stats = if let Ok(stats) = self.stats.lock() {
1012 stats.clone()
1013 } else {
1014 RtpSessionStats::default()
1015 };
1016
1017 // Create a new SR packet
1018 let mut sr = crate::packet::rtcp::RtcpSenderReport::new(self.ssrc);
1019
1020 // Set current NTP timestamp
1021 sr.ntp_timestamp = crate::packet::rtcp::NtpTimestamp::now();
1022
1023 // Set current RTP timestamp (convert from NTP time)
1024 sr.rtp_timestamp = self.get_timestamp();
1025
1026 // Set packet and octet count from session stats
1027 sr.sender_packet_count = session_stats.packets_sent as u32;
1028 sr.sender_octet_count = session_stats.bytes_sent as u32;
1029
1030 // Add report blocks for active streams (remote SSRCs we're receiving from)
1031 if let Ok(streams) = self.streams.lock() {
1032 // Add report blocks for up to 31 streams (max allowed by RTCP)
1033 for (ssrc, stream) in streams.iter().take(31) {
1034 let stream_stats = stream.get_stats();
1035
1036 // Create a report block for this source
1037 let mut block = crate::packet::rtcp::RtcpReportBlock::new(*ssrc);
1038
1039 // Set statistics
1040 let expected_packets = stream_stats.highest_seq - stream_stats.first_seq + 1;
1041 let (fraction_lost, cumulative_lost) =
1042 block.calculate_packet_loss(expected_packets, stream_stats.received);
1043
1044 block.fraction_lost = fraction_lost;
1045 block.cumulative_lost = cumulative_lost as u32;
1046 block.highest_seq = stream_stats.highest_seq;
1047 block.jitter = stream_stats.jitter;
1048
1049 // TODO: Set last_sr and delay_since_last_sr when we process incoming SRs
1050
1051 // Add the block to the SR
1052 sr.add_report_block(block);
1053 }
1054 }
1055
1056 // **FIX: Update our own MediaSync context with the SR data we're sending**
1057 // This ensures our own timing data flows into MediaSync for API access
1058 if let Some(media_sync) = &self.media_sync {
1059 if let Ok(mut sync) = media_sync.write() {
1060 sync.update_from_sr(self.ssrc, sr.ntp_timestamp, sr.rtp_timestamp);
1061 debug!("Updated MediaSync with our own SR: SSRC={:08x}, NTP={:?}, RTP={}",
1062 self.ssrc, sr.ntp_timestamp, sr.rtp_timestamp);
1063 }
1064 }
1065
1066 // Create RTCP packet
1067 let rtcp_packet = crate::packet::rtcp::RtcpPacket::SenderReport(sr);
1068
1069 // Serialize and send
1070 match rtcp_packet.serialize() {
1071 Ok(data) => {
1072 self.transport.send_rtcp_bytes(&data, remote_addr).await
1073 }
1074 Err(e) => {
1075 Err(Error::SerializationError(format!("Failed to serialize RTCP SR: {}", e)))
1076 }
1077 }
1078 }
1079
1080 /// Send an RTCP Receiver Report (RR) packet
1081 ///
1082 /// A Receiver Report contains:
1083 /// - Our SSRC
1084 /// - Report blocks with reception statistics about other sources
1085 ///
1086 /// This method generates an RR based on the current stream statistics, which is useful
1087 /// for providing quality metrics to other participants when we're receiving but not sending.
1088 ///
1089 /// Returns an error if serialization fails or if there's no remote address configured.
1090 pub async fn send_receiver_report(&self) -> Result<()> {
1091 // Check if we have a remote address
1092 let remote_addr = match self.config.remote_addr {
1093 Some(addr) => addr,
1094 None => return Err(Error::SessionError("No remote address configured".to_string())),
1095 };
1096
1097 // Create a new RR packet
1098 let mut rr = crate::packet::rtcp::RtcpReceiverReport::new(self.ssrc);
1099
1100 // Add report blocks for active streams (remote SSRCs we're receiving from)
1101 if let Ok(streams) = self.streams.lock() {
1102 // Add report blocks for up to 31 streams (max allowed by RTCP)
1103 for (ssrc, stream) in streams.iter().take(31) {
1104 let stream_stats = stream.get_stats();
1105
1106 // Create a report block for this source
1107 let mut block = crate::packet::rtcp::RtcpReportBlock::new(*ssrc);
1108
1109 // Set statistics
1110 let expected_packets = stream_stats.highest_seq - stream_stats.first_seq + 1;
1111 let (fraction_lost, cumulative_lost) =
1112 block.calculate_packet_loss(expected_packets, stream_stats.received);
1113
1114 block.fraction_lost = fraction_lost;
1115 block.cumulative_lost = cumulative_lost as u32;
1116 block.highest_seq = stream_stats.highest_seq;
1117 block.jitter = stream_stats.jitter;
1118
1119 // TODO: Set last_sr and delay_since_last_sr when we process incoming SRs
1120
1121 // Add the block to the RR
1122 rr.add_report_block(block);
1123 }
1124 }
1125
1126 // Create RTCP packet
1127 let rtcp_packet = crate::packet::rtcp::RtcpPacket::ReceiverReport(rr);
1128
1129 // Serialize and send
1130 match rtcp_packet.serialize() {
1131 Ok(data) => {
1132 self.transport.send_rtcp_bytes(&data, remote_addr).await
1133 }
1134 Err(e) => {
1135 Err(Error::SerializationError(format!("Failed to serialize RTCP RR: {}", e)))
1136 }
1137 }
1138 }
1139
1140 /// Enable media synchronization
1141 pub fn enable_media_sync(&mut self) -> Arc<std::sync::RwLock<crate::sync::MediaSync>> {
1142 let sync = Arc::new(std::sync::RwLock::new(crate::sync::MediaSync::new()));
1143 self.media_sync = Some(sync.clone());
1144
1145 // Register our stream
1146 if let Ok(mut media_sync) = sync.write() {
1147 media_sync.register_stream(self.ssrc, self.config.clock_rate);
1148 }
1149
1150 sync
1151 }
1152
1153 /// Get the media synchronization context
1154 pub fn media_sync(&self) -> Option<Arc<std::sync::RwLock<crate::sync::MediaSync>>> {
1155 self.media_sync.clone()
1156 }
1157
1158 /// Set the session bandwidth in bits per second
1159 ///
1160 /// This affects the RTCP report interval calculation.
1161 /// Higher bandwidth means more frequent RTCP packets.
1162 pub fn set_bandwidth(&mut self, bandwidth_bps: u32) {
1163 self.bandwidth_bps = bandwidth_bps;
1164 }
1165
1166 /// Create a sender handle for this session
1167 ///
1168 /// This creates a lightweight handle that can be used to send RTP packets
1169 /// from another thread. This is useful when you need to send packets
1170 /// but don't want to clone the entire session.
1171 pub fn create_sender_handle(&self) -> RtpSessionSender {
1172 RtpSessionSender {
1173 sender: self.sender.clone(),
1174 ssrc: self.ssrc,
1175 payload_type: self.config.payload_type,
1176 clock_rate: self.config.clock_rate,
1177 }
1178 }
1179
1180 /// Get the UDP socket handle from the transport
1181 ///
1182 /// This method is used to access the underlying UDP socket when needed for
1183 /// other protocols that need to share the same socket (e.g., DTLS).
1184 pub async fn get_socket_handle(&self) -> Result<Arc<UdpSocket>> {
1185 // Try to get the socket from the UdpRtpTransport
1186 if let Some(t) = self.transport.as_any().downcast_ref::<UdpRtpTransport>() {
1187 // Clone and return the RTP socket using the public method
1188 let socket = t.get_socket();
1189 return Ok(socket);
1190 }
1191
1192 // If we get here, the transport is not UdpRtpTransport
1193 Err(Error::Transport("Transport is not a UDP transport".to_string()))
1194 }
1195}
1196
1197/// A lightweight sender handle for an RTP session
1198///
1199/// This handle can be used to send RTP packets to the session
1200/// from another thread without having to clone the entire session.
1201#[derive(Clone)]
1202pub struct RtpSessionSender {
1203 /// Channel for sending packets
1204 sender: mpsc::Sender<RtpPacket>,
1205
1206 /// SSRC for this session
1207 ssrc: RtpSsrc,
1208
1209 /// Payload type
1210 payload_type: u8,
1211
1212 /// Clock rate for the payload type
1213 clock_rate: u32,
1214}
1215
1216impl RtpSessionSender {
1217 /// Send an RTP packet with payload
1218 pub async fn send_packet(&self, timestamp: RtpTimestamp, payload: Bytes, marker: bool) -> Result<()> {
1219 // Create RTP header
1220 let mut header = RtpHeader::new(
1221 self.payload_type,
1222 0, // Sequence number will be set by scheduler
1223 timestamp,
1224 self.ssrc,
1225 );
1226
1227 // Set marker bit if needed
1228 header.marker = marker;
1229
1230 // Create packet
1231 let packet = RtpPacket::new(header, payload);
1232
1233 // Send the packet
1234 self.sender.send(packet)
1235 .await
1236 .map_err(|_| Error::SessionError("Failed to send packet".to_string()))
1237 }
1238}