sfu 0.0.3

WebRTC Selective Forwarding Unit (SFU) in Rust with Sans-IO
Documentation
use crate::interceptor::report::receiver_stream::ReceiverStream;
use crate::interceptor::report::ReportBuilder;
use crate::interceptor::{Interceptor, InterceptorEvent};
use crate::messages::{MessageEvent, RTPMessageEvent, TaggedMessageEvent};
use crate::types::FourTuple;
use retty::transport::TransportContext;
use std::collections::HashMap;
use std::time::{Duration, Instant};

pub(crate) struct ReceiverReport {
    pub(super) interval: Duration,
    pub(super) eto: Instant,
    pub(crate) streams: HashMap<u32, ReceiverStream>,
    pub(super) next: Option<Box<dyn Interceptor>>,
}

impl ReceiverReport {
    pub(crate) fn builder() -> ReportBuilder {
        ReportBuilder {
            is_rr: true,
            ..Default::default()
        }
    }
}

impl Interceptor for ReceiverReport {
    fn chain(mut self: Box<Self>, next: Box<dyn Interceptor>) -> Box<dyn Interceptor> {
        self.next = Some(next);
        self
    }

    fn next(&mut self) -> Option<&mut Box<dyn Interceptor>> {
        self.next.as_mut()
    }

    fn read(&mut self, msg: &mut TaggedMessageEvent) -> Vec<InterceptorEvent> {
        if let MessageEvent::Rtp(RTPMessageEvent::Rtcp(rtcp_packets)) = &msg.message {
            for rtcp_packet in rtcp_packets {
                if let Some(sr) = rtcp_packet
                    .as_any()
                    .downcast_ref::<rtcp::sender_report::SenderReport>()
                {
                    if let Some(stream) = self.streams.get_mut(&sr.ssrc) {
                        stream.process_sender_report(msg.now, sr);
                    }
                }
            }
        } else if let MessageEvent::Rtp(RTPMessageEvent::Rtp(rtp_packet)) = &msg.message {
            if let Some(stream) = self.streams.get_mut(&rtp_packet.header.ssrc) {
                stream.process_rtp(msg.now, rtp_packet);
            }
        }

        if let Some(next) = self.next() {
            next.read(msg)
        } else {
            vec![]
        }
    }

    fn handle_timeout(&mut self, now: Instant, four_tuples: &[FourTuple]) -> Vec<InterceptorEvent> {
        let mut interceptor_events = vec![];

        if self.eto <= now {
            self.eto = now + self.interval;

            for stream in self.streams.values_mut() {
                let rr = stream.generate_report(now);
                for four_tuple in four_tuples {
                    interceptor_events.push(InterceptorEvent::Outbound(TaggedMessageEvent {
                        now,
                        transport: TransportContext {
                            local_addr: four_tuple.local_addr,
                            peer_addr: four_tuple.peer_addr,
                            ecn: None,
                        },
                        message: MessageEvent::Rtp(RTPMessageEvent::Rtcp(vec![Box::new(
                            rr.clone(),
                        )])),
                    }));
                }
            }
        }

        if let Some(next) = self.next() {
            let mut events = next.handle_timeout(now, four_tuples);
            interceptor_events.append(&mut events);
        }
        interceptor_events
    }

    fn poll_timeout(&mut self, eto: &mut Instant) {
        if self.eto < *eto {
            *eto = self.eto
        }

        if let Some(next) = self.next() {
            next.poll_timeout(eto);
        }
    }
}