rtc_interceptor/report/
sender.rs

//! Sender Report Interceptor - Generates periodic RTCP Sender Reports for bound local streams.
2
3use super::sender_stream::SenderStream;
4use crate::stream_info::StreamInfo;
5use crate::{Interceptor, Packet, TaggedPacket};
6use rtcp::header::PacketType;
7use shared::TransportContext;
8use shared::error::Error;
9use std::collections::{HashMap, VecDeque};
10use std::marker::PhantomData;
11use std::time::{Duration, Instant};
12
/// Configures and constructs a [`SenderReportInterceptor`].
///
/// Two settings can be adjusted before building:
/// - the interval between generated sender reports (1 second by default), and
/// - whether timestamp tracking always follows the latest packet, even when
///   that packet is out-of-order (disabled by default).
///
/// # Example
///
/// ```ignore
/// use rtc_interceptor::{Registry, SenderReportBuilder};
/// use std::time::Duration;
///
/// // Default interval (1 second)
/// let chain = Registry::new()
///     .with(SenderReportBuilder::new().build())
///     .build();
///
/// // Custom interval
/// let chain = Registry::new()
///     .with(SenderReportBuilder::new().with_interval(Duration::from_millis(500)).build())
///     .build();
///
/// // Always track the latest packet
/// let chain = Registry::new()
///     .with(SenderReportBuilder::new().with_use_latest_packet().build())
///     .build();
/// ```
pub struct SenderReportBuilder<P> {
    /// Time between two consecutive rounds of sender reports.
    interval: Duration,
    /// When `true`, the latest packet is always used, even if it is out-of-order.
    use_latest_packet: bool,
    /// Ties the builder to the inner protocol type `P` without storing a value of it.
    _phantom: PhantomData<P>,
}
43
44impl<P> Default for SenderReportBuilder<P> {
45    fn default() -> Self {
46        Self {
47            interval: Duration::from_secs(1),
48            use_latest_packet: false,
49            _phantom: PhantomData,
50        }
51    }
52}
53
54impl<P> SenderReportBuilder<P> {
55    /// Create a new builder with default settings.
56    ///
57    /// Default interval is 1 second.
58    pub fn new() -> Self {
59        Self::default()
60    }
61
62    /// Set a custom interval between sender reports.
63    ///
64    /// # Example
65    ///
66    /// ```ignore
67    /// use std::time::Duration;
68    /// use rtc_interceptor::SenderReportBuilder;
69    ///
70    /// let builder = SenderReportBuilder::new()
71    ///     .with_interval(Duration::from_millis(500));
72    /// ```
73    pub fn with_interval(mut self, interval: Duration) -> Self {
74        self.interval = interval;
75        self
76    }
77
78    /// Enable always using the latest packet for timestamp tracking,
79    /// even if it appears to be out-of-order based on sequence numbers.
80    ///
81    /// By default (disabled), only in-order packets update the RTP↔NTP
82    /// timestamp correlation. This prevents out-of-order packets from
83    /// corrupting the timestamp mapping.
84    ///
85    /// Enable this option when:
86    /// - Packets are guaranteed to arrive in order
87    /// - The application reorders packets before the interceptor
88    /// - You want the sender report to always reflect the most recent packet
89    ///
90    /// # Example
91    ///
92    /// ```ignore
93    /// use rtc_interceptor::SenderReportBuilder;
94    ///
95    /// let builder = SenderReportBuilder::new()
96    ///     .with_use_latest_packet();
97    /// ```
98    pub fn with_use_latest_packet(mut self) -> Self {
99        self.use_latest_packet = true;
100        self
101    }
102
103    /// Create a builder function for use with Registry.
104    ///
105    /// This returns a closure that can be passed to `Registry::with()`.
106    ///
107    /// # Example
108    ///
109    /// ```ignore
110    /// use rtc_interceptor::{Registry, SenderReportBuilder};
111    ///
112    /// let registry = Registry::new()
113    ///     .with(SenderReportBuilder::new().build());
114    /// ```
115    pub fn build(self) -> impl FnOnce(P) -> SenderReportInterceptor<P> {
116        move |inner| SenderReportInterceptor::new(inner, self.interval, self.use_latest_packet)
117    }
118}
119
120/// Interceptor that filters hop-by-hop RTCP reports.
121///
122/// This interceptor filters out RTCP Receiver Reports and Transport-Specific
123/// Feedback, which are hop-by-hop reports that should not be forwarded
124/// end-to-end.
125///
126/// # Type Parameters
127///
128/// - `P`: The inner protocol being wrapped
129///
130/// # Example
131///
132/// ```ignore
133/// use rtc_interceptor::{Registry, SenderReportBuilder};
134///
135/// let chain = Registry::new()
136///     .with(SenderReportBuilder::new().build())
137///     .build();
138/// ```
139pub struct SenderReportInterceptor<P> {
140    inner: P,
141
142    interval: Duration,
143    eto: Instant,
144
145    /// Whether to always use the latest packet, even if out-of-order.
146    use_latest_packet: bool,
147
148    streams: HashMap<u32, SenderStream>,
149
150    read_queue: VecDeque<TaggedPacket>,
151    write_queue: VecDeque<TaggedPacket>,
152}
153
154impl<P> SenderReportInterceptor<P> {
155    /// Create a new SenderReportInterceptor.
156    fn new(inner: P, interval: Duration, use_latest_packet: bool) -> Self {
157        Self {
158            inner,
159
160            interval,
161            eto: Instant::now(),
162
163            use_latest_packet,
164
165            streams: HashMap::new(),
166
167            read_queue: VecDeque::new(),
168            write_queue: VecDeque::new(),
169        }
170    }
171
172    /// Check if an RTCP packet type should be filtered.
173    ///
174    /// Returns `true` for hop-by-hop report types that should not be forwarded:
175    /// - Receiver Report (201)
176    /// - Transport-Specific Feedback (205)
177    fn should_filter(packet_type: PacketType) -> bool {
178        packet_type == PacketType::ReceiverReport
179            || (packet_type == PacketType::TransportSpecificFeedback)
180    }
181
182    /// Get a reference to the inner protocol.
183    fn inner(&self) -> &P {
184        &self.inner
185    }
186
187    /// Get a mutable reference to the inner protocol.
188    fn inner_mut(&mut self) -> &mut P {
189        &mut self.inner
190    }
191}
192
193impl<P: Interceptor> sansio::Protocol<TaggedPacket, TaggedPacket, ()>
194    for SenderReportInterceptor<P>
195{
196    type Rout = TaggedPacket;
197    type Wout = TaggedPacket;
198    type Eout = ();
199    type Error = Error;
200    type Time = Instant;
201
202    fn handle_read(&mut self, msg: TaggedPacket) -> Result<(), Self::Error> {
203        self.inner.handle_read(msg)
204    }
205
206    fn poll_read(&mut self) -> Option<Self::Rout> {
207        self.inner.poll_read()
208    }
209
210    fn handle_write(&mut self, msg: TaggedPacket) -> Result<(), Self::Error> {
211        if let Packet::Rtp(rtp_packet) = &msg.message
212            && let Some(stream) = self.streams.get_mut(&rtp_packet.header.ssrc)
213        {
214            stream.process_rtp(msg.now, rtp_packet);
215        }
216
217        self.inner.handle_write(msg)
218    }
219
220    fn poll_write(&mut self) -> Option<Self::Wout> {
221        // First drain generated RTCP reports
222        if let Some(pkt) = self.write_queue.pop_front() {
223            return Some(pkt);
224        }
225        self.inner.poll_write()
226    }
227
228    fn handle_timeout(&mut self, now: Self::Time) -> Result<(), Self::Error> {
229        if self.eto <= now {
230            self.eto = now + self.interval;
231
232            for stream in self.streams.values_mut() {
233                let rr = stream.generate_report(now);
234                self.write_queue.push_back(TaggedPacket {
235                    now,
236                    transport: TransportContext::default(),
237                    message: Packet::Rtcp(vec![Box::new(rr)]),
238                });
239            }
240        }
241
242        self.inner.handle_timeout(now)
243    }
244
245    fn poll_timeout(&mut self) -> Option<Self::Time> {
246        if let Some(eto) = self.inner.poll_timeout()
247            && eto < self.eto
248        {
249            Some(eto)
250        } else {
251            Some(self.eto)
252        }
253    }
254}
255
256impl<P: Interceptor> Interceptor for SenderReportInterceptor<P> {
257    fn bind_local_stream(&mut self, info: &StreamInfo) {
258        let stream = SenderStream::new(info.ssrc, info.clock_rate, self.use_latest_packet);
259        self.streams.insert(info.ssrc, stream);
260
261        self.inner.bind_local_stream(info);
262    }
263    fn unbind_local_stream(&mut self, info: &StreamInfo) {
264        self.streams.remove(&info.ssrc);
265
266        self.inner.unbind_local_stream(info);
267    }
268    fn bind_remote_stream(&mut self, info: &StreamInfo) {
269        self.inner.bind_remote_stream(info);
270    }
271    fn unbind_remote_stream(&mut self, info: &StreamInfo) {
272        self.inner.unbind_remote_stream(info);
273    }
274}
275
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{NoopInterceptor, Registry};
    use sansio::Protocol;

    /// Builds a default (empty) RTP packet stamped with the current time.
    fn dummy_rtp_packet() -> TaggedPacket {
        TaggedPacket {
            now: Instant::now(),
            transport: Default::default(),
            message: crate::Packet::Rtp(rtp::Packet::default()),
        }
    }

    /// Builds an RTP packet for `ssrc` carrying `payload_len` zero bytes.
    fn rtp_packet_for(
        now: Instant,
        ssrc: u32,
        sequence_number: u16,
        timestamp: u32,
        payload_len: usize,
    ) -> TaggedPacket {
        TaggedPacket {
            now,
            transport: Default::default(),
            message: Packet::Rtp(rtp::Packet {
                header: rtp::header::Header {
                    ssrc,
                    sequence_number,
                    timestamp,
                    ..Default::default()
                },
                payload: vec![0u8; payload_len].into(),
                ..Default::default()
            }),
        }
    }

    #[test]
    fn test_sender_report_builder_default() {
        // Build with default interval (1 second)
        let chain = Registry::new()
            .with(SenderReportBuilder::default().build())
            .build();

        assert_eq!(chain.interval, Duration::from_secs(1));
    }

    #[test]
    fn test_sender_report_builder_with_custom_interval() {
        // Build with custom interval
        let chain = Registry::new()
            .with(
                SenderReportBuilder::default()
                    .with_interval(Duration::from_millis(500))
                    .build(),
            )
            .build();

        assert_eq!(chain.interval, Duration::from_millis(500));
    }

    #[test]
    fn test_sender_report_chain_handle_read_write() {
        // Build a chain and test packet flow
        let mut chain = Registry::new()
            .with(SenderReportBuilder::default().build())
            .build();

        // Read path: the packet comes back out unchanged
        let pkt = dummy_rtp_packet();
        let pkt_message = pkt.message.clone();
        chain.handle_read(pkt).unwrap();
        assert_eq!(chain.poll_read().unwrap().message, pkt_message);

        // Write path: the packet comes back out unchanged
        let pkt2 = dummy_rtp_packet();
        let pkt2_message = pkt2.message.clone();
        chain.handle_write(pkt2).unwrap();
        assert_eq!(chain.poll_write().unwrap().message, pkt2_message);
    }

    #[test]
    fn test_should_filter() {
        // Receiver Report (RR) - should filter
        assert!(SenderReportInterceptor::<NoopInterceptor>::should_filter(
            PacketType::ReceiverReport
        ));

        // Transport-Specific Feedback - should filter
        assert!(SenderReportInterceptor::<NoopInterceptor>::should_filter(
            PacketType::TransportSpecificFeedback
        ));

        // Sender Report (SR) - should NOT filter
        assert!(!SenderReportInterceptor::<NoopInterceptor>::should_filter(
            PacketType::SenderReport
        ));

        // Source Description (SDES) - should NOT filter
        assert!(!SenderReportInterceptor::<NoopInterceptor>::should_filter(
            PacketType::SourceDescription
        ));

        // Goodbye (BYE) - should NOT filter
        assert!(!SenderReportInterceptor::<NoopInterceptor>::should_filter(
            PacketType::Goodbye
        ));
    }

    #[test]
    fn test_inner_access() {
        let mut chain = Registry::new()
            .with(SenderReportBuilder::default().build())
            .build();

        // Test immutable access
        let _ = chain.inner();

        // Test mutable access - can modify inner
        let pkt = dummy_rtp_packet();
        let pkt_message = pkt.message.clone();
        chain.inner_mut().handle_write(pkt).unwrap();
        assert_eq!(chain.inner_mut().poll_write().unwrap().message, pkt_message);
    }

    #[test]
    fn test_use_latest_packet_option() {
        // Build with use_latest_packet enabled
        let chain = Registry::new()
            .with(
                SenderReportBuilder::default()
                    .with_use_latest_packet()
                    .build(),
            )
            .build();

        assert!(chain.use_latest_packet);

        // Build without use_latest_packet (default)
        let chain_default = Registry::new()
            .with(SenderReportBuilder::default().build())
            .build();

        assert!(!chain_default.use_latest_packet);
    }

    #[test]
    fn test_use_latest_packet_combined_options() {
        // Test combining multiple options
        let chain = Registry::new()
            .with(
                SenderReportBuilder::default()
                    .with_interval(Duration::from_millis(250))
                    .with_use_latest_packet()
                    .build(),
            )
            .build();

        assert_eq!(chain.interval, Duration::from_millis(250));
        assert!(chain.use_latest_packet);
    }

    #[test]
    fn test_sender_report_generation_on_timeout() {
        // Port of pion's TestSenderInterceptor - tests full timeout/report cycle
        // No ticker mocking needed - sans-I/O pattern lets us control time directly
        let mut chain = Registry::new()
            .with(
                SenderReportBuilder::default()
                    .with_interval(Duration::from_secs(1))
                    .build(),
            )
            .build();

        // Bind a local stream
        let info = StreamInfo {
            ssrc: 123456,
            clock_rate: 90000,
            ..Default::default()
        };
        chain.bind_local_stream(&info);

        let base_time = Instant::now();

        // Send some RTP packets through the write path
        for i in 0..5u16 {
            let pkt = rtp_packet_for(base_time, 123456, i, u32::from(i) * 3000, 100);
            chain.handle_write(pkt).unwrap();
            // Drain the write queue
            chain.poll_write();
        }

        // First timeout triggers report generation (eto was set at construction)
        chain.handle_timeout(base_time).unwrap();

        // Drain any reports from initial timeout
        while chain.poll_write().is_some() {}

        // Advance time past the interval
        let later_time = base_time + Duration::from_secs(2);
        chain.handle_timeout(later_time).unwrap();

        // Now a sender report must be available; fail loudly if it is not
        let tagged = chain
            .poll_write()
            .expect("a sender report should be queued after the interval elapsed");
        let Packet::Rtcp(rtcp_packets) = tagged.message else {
            panic!("Expected RTCP packet");
        };

        assert_eq!(rtcp_packets.len(), 1);
        let sr = rtcp_packets[0]
            .as_any()
            .downcast_ref::<rtcp::sender_report::SenderReport>()
            .expect("Expected SenderReport");
        assert_eq!(sr.ssrc, 123456);
        assert_eq!(sr.packet_count, 5);
        assert_eq!(sr.octet_count, 500);
    }

    #[test]
    fn test_sender_report_multiple_streams() {
        // Test that multiple streams each generate their own sender reports
        let mut chain = Registry::new()
            .with(
                SenderReportBuilder::default()
                    .with_interval(Duration::from_secs(1))
                    .build(),
            )
            .build();

        // Bind two local streams
        let info1 = StreamInfo {
            ssrc: 111111,
            clock_rate: 90000,
            ..Default::default()
        };
        let info2 = StreamInfo {
            ssrc: 222222,
            clock_rate: 48000,
            ..Default::default()
        };
        chain.bind_local_stream(&info1);
        chain.bind_local_stream(&info2);

        let base_time = Instant::now();

        // Send packets on stream 1
        for i in 0..3u16 {
            let pkt = rtp_packet_for(base_time, 111111, i, u32::from(i) * 3000, 50);
            chain.handle_write(pkt).unwrap();
            chain.poll_write();
        }

        // Send packets on stream 2
        for i in 0..7u16 {
            let pkt = rtp_packet_for(base_time, 222222, i, u32::from(i) * 960, 200);
            chain.handle_write(pkt).unwrap();
            chain.poll_write();
        }

        // Trigger timeout
        let later_time = base_time + Duration::from_secs(2);
        chain.handle_timeout(later_time).unwrap();

        // Collect (ssrc, packet_count, octet_count) for every sender report
        let mut reports = vec![];
        while let Some(tagged) = chain.poll_write() {
            if let Packet::Rtcp(rtcp_packets) = tagged.message {
                for rtcp_pkt in rtcp_packets {
                    if let Some(sr) = rtcp_pkt
                        .as_any()
                        .downcast_ref::<rtcp::sender_report::SenderReport>()
                    {
                        reports.push((sr.ssrc, sr.packet_count, sr.octet_count));
                    }
                }
            }
        }

        // Should get exactly two sender reports, one per stream
        assert_eq!(reports.len(), 2);

        // Stream 1's report
        let stream1 = reports
            .iter()
            .find(|report| report.0 == 111111)
            .expect("missing sender report for stream 1");
        assert_eq!(stream1.1, 3);
        assert_eq!(stream1.2, 150);

        // Stream 2's report
        let stream2 = reports
            .iter()
            .find(|report| report.0 == 222222)
            .expect("missing sender report for stream 2");
        assert_eq!(stream2.1, 7);
        assert_eq!(stream2.2, 1400);
    }

    #[test]
    fn test_sender_report_unbind_stream() {
        // Test that unbinding a stream stops generating reports for it
        let mut chain = Registry::new()
            .with(
                SenderReportBuilder::default()
                    .with_interval(Duration::from_secs(1))
                    .build(),
            )
            .build();

        let info = StreamInfo {
            ssrc: 123456,
            clock_rate: 90000,
            ..Default::default()
        };
        chain.bind_local_stream(&info);

        let base_time = Instant::now();

        // Send a packet and drain it from the write path
        chain
            .handle_write(rtp_packet_for(base_time, 123456, 0, 0, 100))
            .unwrap();
        chain.poll_write();

        // Unbind the stream
        chain.unbind_local_stream(&info);

        // Trigger timeout
        let later_time = base_time + Duration::from_secs(2);
        chain.handle_timeout(later_time).unwrap();

        // No report should be generated (stream was unbound)
        assert!(chain.poll_write().is_none());
    }

    #[test]
    fn test_poll_timeout_returns_earliest() {
        // Test that poll_timeout returns the earliest timeout
        let mut chain = Registry::new()
            .with(
                SenderReportBuilder::default()
                    .with_interval(Duration::from_secs(5))
                    .build(),
            )
            .build();

        let own_deadline = chain.eto;

        // poll_timeout always yields a deadline, and it is never later than the
        // interceptor's own report deadline regardless of what the inner returns.
        let timeout = chain
            .poll_timeout()
            .expect("poll_timeout always yields a deadline");
        assert!(timeout <= own_deadline);
    }
}