str0m/
session.rs

1use std::collections::{HashMap, VecDeque};
2use std::time::{Duration, Instant};
3
4use crate::bwe::BweKind;
5use crate::crypto::SrtpProfile;
6use crate::crypto::{KeyingMaterial, SrtpCrypto};
7use crate::format::CodecConfig;
8use crate::format::PayloadParams;
9use crate::io::{DatagramSend, DATAGRAM_MTU, DATAGRAM_MTU_WARN};
10use crate::media::KeyframeRequestKind;
11use crate::media::Media;
12use crate::media::{MediaAdded, MediaChanged};
13use crate::packet::SendSideBandwithEstimator;
14use crate::packet::{LeakyBucketPacer, NullPacer, Pacer, PacerImpl};
15use crate::rtp::{Extension, RawPacket};
16use crate::rtp_::Direction;
17use crate::rtp_::MidRid;
18use crate::rtp_::Pt;
19use crate::rtp_::SeqNo;
20use crate::rtp_::SRTCP_OVERHEAD;
21use crate::rtp_::{extend_u16, RtpHeader, SessionId, TwccRecvRegister, TwccSendRegister};
22use crate::rtp_::{Bitrate, ExtensionMap, Mid, Rtcp, RtcpFb};
23use crate::rtp_::{SrtpContext, Ssrc};
24use crate::stats::StatsSnapshot;
25use crate::streams::{RtpPacket, Streams};
26use crate::util::{already_happened, not_happening, Soonest};
27use crate::Event;
28use crate::{net, Reason};
29use crate::{RtcConfig, RtcError};
30
31/// Minimum time we delay between sending nacks. This should be
32/// set high enough to not cause additional problems in very bad
33/// network conditions.
34const NACK_MIN_INTERVAL: Duration = Duration::from_millis(33);
35
36/// Delay between reports of TWCC. This is deliberately very low.
37const TWCC_INTERVAL: Duration = Duration::from_millis(100);
38
39/// Multiplier applied to a bitrate (such as the current_bitrate value)
/// when deriving the rate handed to the pacer.
40const PACING_FACTOR: f64 = 1.1;
41
42/// Amount of deviation needed to emit a new BWE value. This is to reduce
43/// the total number of BWE events to only fire when there is a substantial change.
44const ESTIMATE_TOLERANCE: f64 = 0.05;
45
46pub(crate) struct Session {
47    id: SessionId,
48
49    // These fields are pub to allow session_sdp.rs modify them.
50    // Notice the fields are maybe not in m-line index order since the app
51    // might be spliced in somewhere.
52    pub medias: Vec<Media>,
53
54    // The actual RTP encoded streams.
55    pub streams: Streams,
56
57    /// The app m-line. Spliced into medias above.
58    app: Option<(Mid, usize)>,
59
60    reordering_size_audio: usize,
61    reordering_size_video: usize,
62    pub send_buffer_audio: usize,
63    pub send_buffer_video: usize,
64
65    /// Extension mappings are _per BUNDLE_, but we can only have one a=group BUNDLE
66    /// in WebRTC (one ice connection), so they are effectively per session.
67    pub exts: ExtensionMap,
68
69    // Configuration of how we are sending/receiving media.
70    pub codec_config: CodecConfig,
71
72    srtp_rx: Option<SrtpContext>,
73    srtp_tx: Option<SrtpContext>,
74    last_nack: Instant,
75    last_twcc: Instant,
76    twcc: u64,
77    twcc_rx_register: TwccRecvRegister,
78    twcc_tx_register: TwccSendRegister,
79    max_rx_seq_lookup: HashMap<Ssrc, SeqNo>,
80
81    bwe: Option<Bwe>,
82
83    enable_twcc_feedback: bool,
84
85    /// A pacer for sending RTP at specific rate.
86    pacer: PacerImpl,
87
88    // temporary buffer when getting the next (unencrypted) RTP packet from Media line.
89    poll_packet_buf: Vec<u8>,
90
91    // Next packet for RtpPacket event.
92    pending_packet: Option<RtpPacket>,
93
94    pub ice_lite: bool,
95
96    /// Whether we are running in RTP-mode.
97    pub rtp_mode: bool,
98
99    feedback_tx: VecDeque<Rtcp>,
100    feedback_rx: VecDeque<Rtcp>,
101
102    raw_packets: Option<VecDeque<Box<RawPacket>>>,
103}
104
105impl Session {
106    pub fn new(config: &RtcConfig) -> Self {
107        let mut id = SessionId::new();
108        // Max 2^62 - 1: https://bugzilla.mozilla.org/show_bug.cgi?id=861895
109        const MAX_ID: u64 = 2_u64.pow(62) - 1;
110        while *id > MAX_ID {
111            id = (*id >> 1).into();
112        }
113        let (pacer, bwe) = if let Some(config) = &config.bwe_config {
114            let rate = config.initial_bitrate;
115            let pacer = PacerImpl::LeakyBucket(LeakyBucketPacer::new(rate * PACING_FACTOR * 2.0));
116
117            let send_side_bwe = SendSideBandwithEstimator::new(rate, config.enable_loss_controller);
118            let bwe = Bwe {
119                bwe: send_side_bwe,
120                desired_bitrate: Bitrate::ZERO,
121                current_bitrate: rate,
122
123                last_emitted_estimate: Bitrate::ZERO,
124            };
125
126            (pacer, Some(bwe))
127        } else {
128            (PacerImpl::Null(NullPacer::default()), None)
129        };
130
131        let enable_stats = config.stats_interval.is_some();
132
133        Session {
134            id,
135            medias: vec![],
136            streams: Streams::new(enable_stats),
137            app: None,
138            reordering_size_audio: config.reordering_size_audio,
139            reordering_size_video: config.reordering_size_video,
140            send_buffer_audio: config.send_buffer_audio,
141            send_buffer_video: config.send_buffer_video,
142            exts: config.exts.clone(),
143
144            // Both sending and receiving starts from the configured codecs.
145            // These can then be changed in the SDP OFFER/ANSWER dance.
146            codec_config: config.codec_config.clone(),
147
148            srtp_rx: None,
149            srtp_tx: None,
150            last_nack: already_happened(),
151            last_twcc: already_happened(),
152            twcc: 0,
153            twcc_rx_register: TwccRecvRegister::new(100),
154            twcc_tx_register: TwccSendRegister::new(1000),
155            max_rx_seq_lookup: HashMap::new(),
156            bwe,
157            enable_twcc_feedback: false,
158            pacer,
159            poll_packet_buf: vec![0; 2000],
160            pending_packet: None,
161            ice_lite: config.ice_lite,
162            rtp_mode: config.rtp_mode,
163            feedback_tx: VecDeque::new(),
164            feedback_rx: VecDeque::new(),
165            raw_packets: if config.enable_raw_packets {
166                Some(VecDeque::new())
167            } else {
168                None
169            },
170        }
171    }
172
173    pub fn id(&self) -> SessionId {
174        self.id
175    }
176
177    pub fn set_app(&mut self, mid: Mid, index: usize) -> Result<(), String> {
178        if let Some((mid_existing, index_existing)) = self.app {
179            if mid_existing != mid {
180                return Err(format!("App mid changed {} != {}", mid, mid_existing,));
181            }
182            if index_existing != index {
183                return Err(format!("App index changed {} != {}", index, index_existing,));
184            }
185        } else {
186            self.app = Some((mid, index));
187        }
188        Ok(())
189    }
190
191    pub fn app(&self) -> &Option<(Mid, usize)> {
192        &self.app
193    }
194
195    pub fn set_keying_material(
196        &mut self,
197        mat: KeyingMaterial,
198        srtp_crypto: &SrtpCrypto,
199        srtp_profile: SrtpProfile,
200        active: bool,
201    ) {
202        // TODO: rename this to `initialise_srtp_context`?
203        // Whether we're active or passive determines if we use the left or right
204        // hand side of the key material to derive input/output.
205        let left = active;
206
207        self.srtp_rx = Some(SrtpContext::new(srtp_crypto, srtp_profile, &mat, !left));
208        self.srtp_tx = Some(SrtpContext::new(srtp_crypto, srtp_profile, &mat, left));
209    }
210
211    pub fn handle_timeout(&mut self, now: Instant) -> Result<(), RtcError> {
212        // Payload any waiting samples
213        self.do_payload()?;
214
215        let sender_ssrc = self.streams.first_ssrc_local();
216
217        let do_nack = now >= self.nack_at().unwrap_or(not_happening());
218
219        self.streams.handle_timeout(
220            now,
221            sender_ssrc,
222            do_nack,
223            &self.medias,
224            &self.codec_config,
225            &mut self.feedback_tx,
226        );
227
228        if do_nack {
229            self.last_nack = now;
230        }
231
232        self.update_queue_state(now);
233
234        if let Some(twcc_at) = self.twcc_at() {
235            if now >= twcc_at {
236                self.create_twcc_feedback(sender_ssrc, now);
237            }
238        }
239
240        if let Some(bwe) = self.bwe.as_mut() {
241            bwe.handle_timeout(now);
242        }
243
244        Ok(())
245    }
246
247    fn update_queue_state(&mut self, now: Instant) {
248        let iter = self.streams.streams_tx().map(|m| m.queue_state(now));
249
250        let Some(padding_request) = self.pacer.handle_timeout(now, iter) else {
251            return;
252        };
253
254        let stream = self
255            .streams
256            .stream_tx_by_midrid(padding_request.midrid)
257            .expect("pacer to use an existing stream");
258
259        stream.generate_padding(padding_request.padding);
260    }
261
262    fn create_twcc_feedback(&mut self, sender_ssrc: Ssrc, now: Instant) -> Option<()> {
263        self.last_twcc = now;
264        let mut twcc = self.twcc_rx_register.build_report(DATAGRAM_MTU - 100)?;
265
266        // These SSRC are on media level, but twcc is on session level,
267        // we fill in the first discovered media SSRC in each direction.
268        twcc.sender_ssrc = sender_ssrc;
269        twcc.ssrc = self.streams.first_ssrc_remote();
270
271        trace!("Created feedback TWCC: {:?}", twcc);
272        self.feedback_tx.push_front(Rtcp::Twcc(twcc));
273        Some(())
274    }
275
276    pub fn handle_rtp_receive(&mut self, now: Instant, message: &[u8]) {
277        let Some(header) = RtpHeader::parse(message, &self.exts) else {
278            trace!("Failed to parse RTP header");
279            return;
280        };
281
282        self.handle_rtp(now, header, message);
283    }
284
285    pub fn handle_rtcp_receive(&mut self, now: Instant, message: &[u8]) {
286        // According to spec, the outer enclosing SRTCP packet should always be a SR or RR,
287        // even if it's irrelevant and empty.
288        // In practice I'm not sure that is happening, because libWebRTC hates empty packets.
289        self.handle_rtcp(now, message);
290    }
291
292    fn mid_and_ssrc_for_header(&mut self, now: Instant, header: &RtpHeader) -> Option<(Mid, Ssrc)> {
293        let ssrc_header = header.ssrc;
294
295        if let Some(r) = self.streams.mid_ssrc_rx_by_ssrc_or_rtx(now, ssrc_header) {
296            return Some(r);
297        }
298
299        // Attempt to dynamically map this header to some Media/ReceiveStream.
300        self.map_dynamic(header);
301
302        // The dynamic mapping might have added an entry by now.
303        self.streams.mid_ssrc_rx_by_ssrc_or_rtx(now, ssrc_header)
304    }
305
306    fn map_dynamic(&mut self, header: &RtpHeader) {
307        // There are two strategies for dynamically mapping SSRC. Both use the RTP "mid"
308        // header extension.
309        // A) Mid+Rid - used when doing simulcast. Rid points out which
310        //              simulcast layer is in use. There is a separate header
311        //              to indicate repair (RTX) stream.
312        // B) Mid+PT - when not doing simulcast, the PT identifies whether
313        //             this is a repair stream.
314
315        let Some(mid) = header.ext_vals.mid else {
316            return;
317        };
318        let rid = header.ext_vals.rid.or(header.ext_vals.rid_repair);
319
320        // The media the mid points out. Bail if the mid points to something
321        // we don't know about.
322        let Some(media) = self.medias.iter_mut().find(|m| m.mid() == mid) else {
323            return;
324        };
325
326        // Figure out which payload the PT maps to. Either main or RTX.
327        let maybe_payload = self
328            .codec_config
329            .iter()
330            .find(|p| p.pt() == header.payload_type || p.resend() == Some(header.payload_type));
331
332        // If we don't find it, bail out.
333        let Some(payload) = maybe_payload else {
334            return;
335        };
336
337        if let Some(rid) = rid {
338            // Case A - use the rid_repair header to identify RTX.
339            let is_main = header.ext_vals.rid.is_some();
340
341            let midrid = MidRid(mid, Some(rid));
342
343            self.streams
344                .map_dynamic_by_rid(header.ssrc, midrid, media, *payload, is_main);
345        } else {
346            // Case B - the payload type identifies RTX.
347            let is_main = payload.pt() == header.payload_type;
348
349            let midrid = MidRid(mid, None);
350
351            self.streams
352                .map_dynamic_by_pt(header.ssrc, midrid, media, *payload, is_main);
353        }
354    }
355
356    pub(crate) fn handle_rtp(&mut self, now: Instant, mut header: RtpHeader, buf: &[u8]) {
357        // Rewrite absolute-send-time (if present) to be relative to now.
358        header.ext_vals.update_absolute_send_time(now);
359
360        trace!("Handle RTP: {:?}", header);
361
362        // The ssrc is the _main_ ssrc (no the rtx, that might be in the header).
363        let Some((mid, ssrc)) = self.mid_and_ssrc_for_header(now, &header) else {
364            debug!("No mid/SSRC for header: {:?}", header);
365            return;
366        };
367
368        let srtp = match self.srtp_rx.as_mut() {
369            Some(v) => v,
370            None => {
371                trace!("Rejecting SRTP while missing SrtpContext");
372                return;
373            }
374        };
375
376        // Both of these unwraps are fine because mid_and_ssrc_for_header guarantees it.
377        let media = self.medias.iter_mut().find(|m| m.mid() == mid).unwrap();
378        let stream = self.streams.stream_rx(&ssrc).unwrap();
379
380        let params = match main_payload_params(&self.codec_config, header.payload_type) {
381            Some(p) => p,
382            None => {
383                trace!(
384                    "No payload params could be found (main or RTX) for {:?}",
385                    header.payload_type
386                );
387                return;
388            }
389        };
390        let clock_rate = params.spec().clock_rate;
391        let pt = params.pt();
392        let is_repair = pt != header.payload_type;
393
394        let max_seq_lookup = make_max_seq_lookup(&self.max_rx_seq_lookup);
395
396        // is_repair controls whether update is updating the main register or the RTX register.
397        // Either way we get a seq_no_outer which is used to decrypt the SRTP.
398        let mut seq_no = stream.extend_seq(&header, is_repair, max_seq_lookup);
399
400        if !stream.is_new_packet(is_repair, seq_no) {
401            // Dupe packet. This could be a potential SRTP replay attack, which means
402            // we should not spend any CPU cycles towards decrypting it.
403            trace!(
404                "Ignoring dupe packet mid: {} seq_no: {} is_repair: {}",
405                mid,
406                seq_no,
407                is_repair
408            );
409            return;
410        }
411
412        let mut data = match srtp.unprotect_rtp(buf, &header, *seq_no) {
413            Some(v) => v,
414            None => {
415                trace!(
416                    "Failed to unprotect SRTP for SSRC: {} pt: {}  mid: {} \
417                    rid: {:?} seq_no: {} is_repair: {}",
418                    header.ssrc,
419                    pt,
420                    stream.mid(),
421                    stream.rid(),
422                    seq_no,
423                    is_repair
424                );
425                return;
426            }
427        };
428
429        if header.has_padding && !RtpHeader::unpad_payload(&mut data) {
430            // Unpadding failed. Broken data?
431            trace!("unpadding of unprotected payload failed");
432            return;
433        }
434
435        if let Some(raw_packets) = &mut self.raw_packets {
436            raw_packets.push_back(Box::new(RawPacket::RtpRx(header.clone(), data.clone())));
437        }
438
439        // Mark as received for TWCC purposes
440        if let Some(transport_cc) = header.ext_vals.transport_cc {
441            let prev = self.twcc_rx_register.max_seq();
442            let extended = extend_u16(Some(*prev), transport_cc);
443            self.twcc_rx_register.update_seq(extended.into(), now);
444        }
445
446        // Store largest seen seq_no for the SSRC. This is used in case we get SSRC changes
447        // like A -> B -> A. When we go back to A, we must keep the ROC.
448        update_max_seq(&mut self.max_rx_seq_lookup, header.ssrc, seq_no);
449
450        // Register reception in nack registers.
451        let receipt_outer = stream.update_register(now, &header, clock_rate, is_repair, seq_no);
452
453        // RTX packets must be rewritten to be a normal packet. This only changes the
454        // the seq_no, however MediaTime might be different when interpreted against the
455        // the "main" register.
456        let receipt = if is_repair {
457            // Drop RTX packets that are just empty padding. The payload here
458            // is empty because we would have done RtpHeader::unpad_payload above.
459            // For unpausing, it's enough with the stream.update() already done above.
460            if data.is_empty() {
461                return;
462            }
463
464            // Rewrite the header, and removes the resent seq_no from the body.
465            stream.un_rtx(&mut header, &mut data, pt);
466
467            let max_seq_lookup = make_max_seq_lookup(&self.max_rx_seq_lookup);
468
469            // Header has changed, which means we extend a new seq_no. This time
470            // without is_repair since this is the wrapped resend. This is the
471            // extended number of the main stream.
472            seq_no = stream.extend_seq(&header, false, max_seq_lookup);
473
474            // header.ssrc is changed by un_rtx() above to be main SSRC.
475            update_max_seq(&mut self.max_rx_seq_lookup, header.ssrc, seq_no);
476
477            // Now update the "main" register with the repaired packet info.
478            stream.update_register(now, &header, clock_rate, false, seq_no)
479        } else {
480            // This is not RTX, the outer seq and time is what we use. The first
481            // stream.update will have updated the main register.
482            receipt_outer
483        };
484
485        let packet = stream.handle_rtp(now, header, data, seq_no, receipt.time);
486
487        if self.rtp_mode {
488            // In RTP mode, we store the packet temporarily here for the next poll_output().
489            // However only if this is a packet not seen before. This filters out spurious
490            // resends for padding.
491            if receipt.is_new_packet {
492                self.pending_packet = Some(packet);
493            }
494        } else {
495            // In non-RTP mode, we let the Media use a Depayloader.
496            media.depayload(
497                stream.rid(),
498                packet,
499                self.reordering_size_audio,
500                self.reordering_size_video,
501                &self.codec_config,
502            );
503        }
504    }
505
506    fn handle_rtcp(&mut self, now: Instant, buf: &[u8]) -> Option<()> {
507        let srtp: &mut SrtpContext = self.srtp_rx.as_mut()?;
508        let unprotected = srtp.unprotect_rtcp(buf)?;
509
510        Rtcp::read_packet(&unprotected, &mut self.feedback_rx);
511        let mut need_configure_pacer = false;
512
513        if let Some(raw_packets) = &mut self.raw_packets {
514            for fb in &self.feedback_rx {
515                raw_packets.push_back(Box::new(RawPacket::RtcpRx(fb.clone())));
516            }
517        }
518
519        for fb in RtcpFb::from_rtcp(self.feedback_rx.drain(..)) {
520            if let RtcpFb::Twcc(twcc) = fb {
521                trace!("Handle TWCC: {:?}", twcc);
522                let maybe_records = self.twcc_tx_register.apply_report(twcc, now);
523
524                if let (Some(maybe_records), Some(bwe)) = (maybe_records, &mut self.bwe) {
525                    bwe.update(maybe_records, now);
526                }
527                need_configure_pacer = true;
528
529                // The funky thing about TWCC reports is that they are never stapled
530                // together with other RTCP packet. If they were though, we want to
531                // handle more packets.
532                continue;
533            }
534
535            if fb.is_for_rx() {
536                let Some(stream) = self.streams.stream_rx(&fb.ssrc()) else {
537                    continue;
538                };
539                stream.handle_rtcp(now, fb);
540            } else {
541                let Some(stream) = self.streams.stream_tx(&fb.ssrc()) else {
542                    continue;
543                };
544                stream.handle_rtcp(now, fb);
545            }
546        }
547
548        // Not in the above if due to lifetime issues, still okay because the method
549        // doesn't do anything when BWE isn't configured.
550        if need_configure_pacer {
551            self.configure_pacer();
552        }
553
554        Some(())
555    }
556
557    pub fn poll_event(&mut self) -> Option<Event> {
558        if let Some(bitrate_estimate) = self.bwe.as_mut().and_then(|bwe| bwe.poll_estimate()) {
559            return Some(Event::EgressBitrateEstimate(BweKind::Twcc(
560                bitrate_estimate,
561            )));
562        }
563
564        // If we're not ready to flow media, don't send any events.
565        if !self.ready_for_srtp() {
566            return None;
567        }
568
569        if let Some(raw_packets) = &mut self.raw_packets {
570            if let Some(p) = raw_packets.pop_front() {
571                return Some(Event::RawPacket(p));
572            }
573        }
574
575        // This must be before pending_packet.take() since we need to emit the unpaused event
576        // before the first packet causing the unpause.
577        if let Some(paused) = self.streams.poll_stream_paused() {
578            return Some(Event::StreamPaused(paused));
579        }
580
581        if self.rtp_mode {
582            if let Some(packet) = self.pending_packet.take() {
583                return Some(Event::RtpPacket(packet));
584            }
585        }
586
587        if let Some(req) = self.streams.poll_keyframe_request() {
588            return Some(Event::KeyframeRequest(req));
589        }
590
591        if let Some((mid, bitrate)) = self.streams.poll_remb_request() {
592            return Some(Event::EgressBitrateEstimate(BweKind::Remb(mid, bitrate)));
593        }
594
595        for media in &mut self.medias {
596            if media.need_open_event {
597                media.need_open_event = false;
598
599                return Some(Event::MediaAdded(MediaAdded {
600                    mid: media.mid(),
601                    kind: media.kind(),
602                    direction: media.direction(),
603                    simulcast: media.simulcast().map(|s| s.clone().into()),
604                }));
605            }
606
607            if media.need_changed_event {
608                media.need_changed_event = false;
609                return Some(Event::MediaChanged(MediaChanged {
610                    mid: media.mid(),
611                    direction: media.direction(),
612                }));
613            }
614        }
615
616        None
617    }
618
619    pub fn poll_event_fallible(&mut self) -> Result<Option<Event>, RtcError> {
620        // Not relevant in rtp_mode, where the packets are picked up by poll_event().
621        if self.rtp_mode {
622            return Ok(None);
623        }
624
625        for media in &mut self.medias {
626            if let Some(e) = media.poll_sample(&self.codec_config)? {
627                return Ok(Some(Event::MediaData(e)));
628            }
629        }
630
631        Ok(None)
632    }
633
634    fn ready_for_srtp(&self) -> bool {
635        self.srtp_rx.is_some() && self.srtp_tx.is_some()
636    }
637
638    pub fn poll_datagram(&mut self, now: Instant) -> Option<net::DatagramSend> {
639        // Time must have progressed forward from start value.
640        if now == already_happened() {
641            return None;
642        }
643
644        let x = None
645            .or_else(|| self.poll_feedback())
646            .or_else(|| self.poll_packet(now));
647
648        if let Some(x) = &x {
649            // In RTP mode we trust the API user feeds the RTP packet sizes they
650            // need for the MTU they are targeting. This warning is only for when
651            // str0m does the RTP packetization.
652            if !self.rtp_mode && x.len() > DATAGRAM_MTU_WARN {
653                warn!("RTP above MTU {}: {}", DATAGRAM_MTU_WARN, x.len());
654            }
655        }
656
657        x
658    }
659
660    /// To be called in lieu of [`Self::poll_datagram`] when the owner is not in a position to transmit any
661    /// generated feedback, and thus such feedback should be dropped.
662    pub fn clear_feedback(&mut self) {
663        self.feedback_rx.clear();
664        self.feedback_tx.clear();
665    }
666
667    fn poll_feedback(&mut self) -> Option<net::DatagramSend> {
668        if self.feedback_tx.is_empty() {
669            return None;
670        }
671
672        // Round to nearest multiple of 4 bytes.
673        const ENCRYPTABLE_MTU: usize = (DATAGRAM_MTU - SRTCP_OVERHEAD) & !3;
674        assert!(ENCRYPTABLE_MTU % 4 == 0);
675
676        let mut data = vec![0_u8; ENCRYPTABLE_MTU];
677
678        let mut raw_packets = self.raw_packets.as_mut();
679        let output = move |fb| {
680            if let Some(raw_packets) = &mut raw_packets {
681                raw_packets.push_back(Box::new(RawPacket::RtcpTx(fb)));
682            }
683        };
684
685        let len = Rtcp::write_packet(&mut self.feedback_tx, &mut data, output);
686
687        if len == 0 {
688            return None;
689        }
690
691        data.truncate(len);
692
693        let srtp = self.srtp_tx.as_mut()?;
694        let protected = srtp.protect_rtcp(&data);
695
696        assert!(
697            protected.len() < DATAGRAM_MTU,
698            "Encrypted SRTCP should be less than MTU"
699        );
700
701        Some(protected.into())
702    }
703
704    fn poll_packet(&mut self, now: Instant) -> Option<DatagramSend> {
705        let srtp_tx = self.srtp_tx.as_mut()?;
706
707        // Figure out which, if any, queue to poll
708        let midrid = self.pacer.poll_queue()?;
709        let media = self
710            .medias
711            .iter()
712            .find(|m| m.mid() == midrid.mid())
713            .expect("index is media");
714
715        let buf = &mut self.poll_packet_buf;
716        let twcc_seq = self.twcc;
717
718        let stream = self.streams.stream_tx_by_midrid(midrid)?;
719
720        let params = &self.codec_config;
721        let exts = media.remote_extmap();
722
723        // TWCC might not be enabled for this m-line. Firefox do use TWCC, but not
724        // for audio. This is indiciated via the SDP.
725        let twcc_enabled = exts.id_of(Extension::TransportSequenceNumber).is_some();
726        let twcc = twcc_enabled.then_some(&mut self.twcc);
727
728        let receipt = stream.poll_packet(now, exts, twcc, params, buf)?;
729
730        let PacketReceipt {
731            header,
732            seq_no,
733            is_padding,
734            payload_size,
735        } = receipt;
736
737        trace!(payload_size, is_padding, "Poll RTP: {:?}", header);
738
739        #[cfg(feature = "_internal_dont_use_log_stats")]
740        {
741            let kind = if is_padding { "padding" } else { "media" };
742
743            crate::log_stat!("PACKET_SENT", header.ssrc, payload_size, kind);
744        }
745
746        self.pacer.register_send(now, payload_size.into(), midrid);
747
748        if let Some(raw_packets) = &mut self.raw_packets {
749            raw_packets.push_back(Box::new(RawPacket::RtpTx(header.clone(), buf.clone())));
750        }
751
752        let protected = srtp_tx.protect_rtp(buf, &header, *seq_no);
753
754        if twcc_enabled {
755            self.twcc_tx_register
756                .register_seq(twcc_seq.into(), now, payload_size);
757        }
758
759        // Technically we should wait for the next handle_timeout, but this speeds things up a bit
760        // avoiding an extra poll_timeout.
761        self.update_queue_state(now);
762
763        Some(protected.into())
764    }
765
766    pub fn poll_timeout(&mut self) -> (Option<Instant>, Reason) {
767        let feedback_at = self.regular_feedback_at();
768        let nack_at = self.nack_at();
769        let twcc_at = self.twcc_at();
770        let pacing_at = self.pacer.poll_timeout();
771        let packetize_at = self.medias.iter().flat_map(|m| m.poll_timeout()).next();
772        let bwe_at = self.bwe.as_ref().map(|bwe| bwe.poll_timeout());
773        let paused_at = self.paused_at();
774        let send_stream_at = self.streams.send_stream();
775
776        (feedback_at, Reason::Feedback)
777            .soonest((nack_at, Reason::Nack))
778            .soonest((twcc_at, Reason::Twcc))
779            .soonest((pacing_at, Reason::Pacing))
780            .soonest((packetize_at, Reason::Packetize))
781            .soonest((bwe_at, Reason::Bwe))
782            .soonest((paused_at, Reason::PauseCheck))
783            .soonest((send_stream_at, Reason::SendStream))
784    }
785
786    pub fn has_mid(&self, mid: Mid) -> bool {
787        self.medias.iter().any(|m| m.mid() == mid)
788    }
789
790    fn regular_feedback_at(&self) -> Option<Instant> {
791        self.streams.regular_feedback_at()
792    }
793
794    fn paused_at(&self) -> Option<Instant> {
795        self.streams.paused_at()
796    }
797
798    fn nack_at(&mut self) -> Option<Instant> {
799        if !self.streams.any_nack_enabled() {
800            return None;
801        }
802
803        Some(self.last_nack + NACK_MIN_INTERVAL)
804    }
805
806    fn twcc_at(&self) -> Option<Instant> {
807        let is_receiving = self.streams.is_receiving();
808        if is_receiving && self.enable_twcc_feedback && self.twcc_rx_register.has_unreported() {
809            Some(self.last_twcc + TWCC_INTERVAL)
810        } else {
811            None
812        }
813    }
814
815    pub fn enable_twcc_feedback(&mut self) {
816        if !self.enable_twcc_feedback {
817            debug!("Enable TWCC feedback");
818            self.enable_twcc_feedback = true;
819        }
820    }
821
822    pub fn visit_stats(&mut self, now: Instant, snapshot: &mut StatsSnapshot) {
823        for stream in self.streams.streams_tx() {
824            stream.visit_stats(snapshot, now);
825        }
826
827        for stream in self.streams.streams_rx() {
828            stream.visit_stats(snapshot, now);
829        }
830
831        snapshot.tx = snapshot.egress.values().map(|s| s.bytes).sum();
832        snapshot.rx = snapshot.ingress.values().map(|s| s.bytes).sum();
833        snapshot.bwe_tx = self.bwe.as_ref().and_then(|bwe| bwe.last_estimate());
834
835        snapshot.egress_loss_fraction = self.twcc_tx_register.loss(Duration::from_secs(1), now);
836        snapshot.rtt = self.twcc_tx_register.rtt();
837        snapshot.ingress_loss_fraction = self.twcc_rx_register.loss();
838    }
839
840    pub fn set_bwe_current_bitrate(&mut self, current_bitrate: Bitrate) {
841        if let Some(bwe) = self.bwe.as_mut() {
842            bwe.current_bitrate = current_bitrate;
843            self.configure_pacer();
844        }
845    }
846
847    pub fn set_bwe_desired_bitrate(&mut self, desired_bitrate: Bitrate) {
848        if let Some(bwe) = self.bwe.as_mut() {
849            bwe.desired_bitrate = desired_bitrate;
850            self.configure_pacer();
851        }
852    }
853
854    pub fn reset_bwe(&mut self, init_bitrate: Bitrate) {
855        if let Some(bwe) = self.bwe.as_mut() {
856            bwe.reset(init_bitrate);
857        }
858    }
859
860    pub fn line_count(&self) -> usize {
861        self.medias.len() + if self.app.is_some() { 1 } else { 0 }
862    }
863
864    pub fn add_media(&mut self, media: Media) {
865        self.medias.push(media);
866    }
867
868    pub fn medias(&self) -> &[Media] {
869        &self.medias
870    }
871
872    pub fn remove_media(&mut self, mid: Mid) {
873        self.medias.retain(|media| media.mid() != mid);
874        self.streams.remove_streams_by_mid(mid);
875    }
876
877    fn configure_pacer(&mut self) {
878        let Some(bwe) = self.bwe.as_ref() else {
879            return;
880        };
881
882        let padding_rate = bwe
883            .last_estimate()
884            .map(|estimate| estimate.min(bwe.desired_bitrate))
885            .unwrap_or(Bitrate::ZERO);
886
887        self.pacer.set_padding_rate(padding_rate);
888
889        // We pad up to the pacing rate, therefore we need to increase pacing if the estimate, and
890        // thus the padding rate, exceeds the current bitrate adjusted with the pacing factor.
891        // Otherwise we can have a case where the current bitrate is 250Kbit/s resulting in a
892        // pacing rate of 275KBit/s which means we'll only ever pad about 25Kbit/s. If the estimate
893        // is actually 600Kbit/s we need to use that for the pacing rate to ensure we send as much as
894        // we think the link capacity can sustain, if not the estimate is a lie.
895        let pacing_rate = (bwe.current_bitrate * PACING_FACTOR).max(padding_rate);
896        self.pacer.set_pacing_rate(pacing_rate);
897    }
898
899    pub fn media_by_mid(&self, mid: Mid) -> Option<&Media> {
900        self.medias.iter().find(|m| m.mid() == mid)
901    }
902
903    pub fn media_by_mid_mut(&mut self, mid: Mid) -> Option<&mut Media> {
904        self.medias.iter_mut().find(|m| m.mid() == mid)
905    }
906
907    fn do_payload(&mut self) -> Result<(), RtcError> {
908        for m in &mut self.medias {
909            m.do_payload(&mut self.streams, &self.codec_config)?;
910        }
911
912        Ok(())
913    }
914
915    pub fn set_direction(&mut self, mid: Mid, direction: Direction) -> bool {
916        let Some(media) = self.media_by_mid_mut(mid) else {
917            return false;
918        };
919        let old_dir = media.direction();
920        if old_dir == direction {
921            return false;
922        }
923
924        media.set_direction(direction);
925
926        if old_dir.is_sending() && !direction.is_sending() {
927            self.streams.reset_buffers_tx(mid);
928        }
929
930        let max_seq_lookup = make_max_seq_lookup(&self.max_rx_seq_lookup);
931        if old_dir.is_receiving() && !direction.is_receiving() {
932            self.streams.reset_buffers_rx(mid, max_seq_lookup);
933        }
934
935        true
936    }
937
938    pub fn is_request_keyframe_possible(&self, kind: KeyframeRequestKind) -> bool {
939        // TODO: It's possible to have different set of feedback enabled for different
940        // payload types. I.e. we could have FIR enabled for H264, but not for VP8.
941        // We might want to make this check more fine grained by testing which PT is
942        // in "active use" right now.
943        self.codec_config.iter().any(|r| match kind {
944            KeyframeRequestKind::Pli => r.fb_pli,
945            KeyframeRequestKind::Fir => r.fb_fir,
946        })
947    }
948
949    /// Checks whether the SRTP contexts are up.
950    pub fn is_connected(&self) -> bool {
951        self.srtp_rx.is_some() && self.srtp_tx.is_some()
952    }
953}
954
/// Send-side bandwidth estimator together with the bitrates the session uses
/// to configure its pacer.
struct Bwe {
    /// The wrapped estimator that the methods of `Bwe` delegate to.
    bwe: SendSideBandwithEstimator,
    /// Caps the estimate when the padding rate is derived.
    desired_bitrate: Bitrate,
    /// Multiplied by `PACING_FACTOR` when the pacing rate is derived.
    current_bitrate: Bitrate,

    /// Reference value for `poll_estimate`: a new estimate is only returned (and
    /// stored here) when it deviates from this by more than `ESTIMATE_TOLERANCE`.
    last_emitted_estimate: Bitrate,
}
962
963impl Bwe {
964    fn handle_timeout(&mut self, now: Instant) {
965        self.bwe.handle_timeout(now);
966    }
967
968    fn reset(&mut self, init_bitrate: Bitrate) {
969        self.bwe.reset(init_bitrate);
970    }
971
972    fn update<'t>(
973        &mut self,
974        records: impl Iterator<Item = &'t crate::rtp_::TwccSendRecord>,
975        now: Instant,
976    ) {
977        self.bwe.update(records, now);
978    }
979
980    fn poll_estimate(&mut self) -> Option<Bitrate> {
981        let estimate = self.bwe.last_estimate()?;
982
983        let min = self.last_emitted_estimate * (1.0 - ESTIMATE_TOLERANCE);
984        let max = self.last_emitted_estimate * (1.0 + ESTIMATE_TOLERANCE);
985
986        if estimate < min || estimate > max {
987            self.last_emitted_estimate = estimate;
988            Some(estimate)
989        } else {
990            // Estimate is within tolerances.
991            None
992        }
993    }
994
995    fn poll_timeout(&self) -> Instant {
996        self.bwe.poll_timeout()
997    }
998
999    fn last_estimate(&self) -> Option<Bitrate> {
1000        self.bwe.last_estimate()
1001    }
1002}
1003
/// Details about a single RTP packet.
///
/// NOTE(review): presumably produced when the session handles an incoming
/// packet; the construction site is not visible here — confirm.
pub struct PacketReceipt {
    /// The RTP header of the packet.
    pub header: RtpHeader,
    /// The packet's sequence number as a `SeqNo`.
    pub seq_no: SeqNo,
    /// Whether the packet is flagged as padding.
    pub is_padding: bool,
    /// Size of the payload (presumably in bytes — confirm).
    pub payload_size: usize,
}
1010
1011/// Find the PayloadParams for the given Pt, either when the Pt is the main Pt for the Codec or
1012/// when it's the RTX Pt.
1013fn main_payload_params(c: &CodecConfig, pt: Pt) -> Option<&PayloadParams> {
1014    c.iter().find(|p| p.pt == pt || p.resend == Some(pt))
1015}
1016
1017fn make_max_seq_lookup(map: &HashMap<Ssrc, SeqNo>) -> impl Fn(Ssrc) -> Option<SeqNo> + '_ {
1018    |ssrc| map.get(&ssrc).cloned()
1019}
1020
1021fn update_max_seq(map: &mut HashMap<Ssrc, SeqNo>, ssrc: Ssrc, seq_no: SeqNo) {
1022    let current = map.entry(ssrc).or_insert(seq_no);
1023    if seq_no > *current {
1024        *current = seq_no;
1025    }
1026}