1use std::collections::{HashMap, VecDeque};
2use std::time::{Duration, Instant};
3
4use crate::bwe::BweKind;
5use crate::crypto::SrtpProfile;
6use crate::crypto::{KeyingMaterial, SrtpCrypto};
7use crate::format::CodecConfig;
8use crate::format::PayloadParams;
9use crate::io::{DatagramSend, DATAGRAM_MTU, DATAGRAM_MTU_WARN};
10use crate::media::KeyframeRequestKind;
11use crate::media::Media;
12use crate::media::{MediaAdded, MediaChanged};
13use crate::packet::SendSideBandwithEstimator;
14use crate::packet::{LeakyBucketPacer, NullPacer, Pacer, PacerImpl};
15use crate::rtp::{Extension, RawPacket};
16use crate::rtp_::Direction;
17use crate::rtp_::MidRid;
18use crate::rtp_::Pt;
19use crate::rtp_::SeqNo;
20use crate::rtp_::SRTCP_OVERHEAD;
21use crate::rtp_::{extend_u16, RtpHeader, SessionId, TwccRecvRegister, TwccSendRegister};
22use crate::rtp_::{Bitrate, ExtensionMap, Mid, Rtcp, RtcpFb};
23use crate::rtp_::{SrtpContext, Ssrc};
24use crate::stats::StatsSnapshot;
25use crate::streams::{RtpPacket, Streams};
26use crate::util::{already_happened, not_happening, Soonest};
27use crate::Event;
28use crate::{net, Reason};
29use crate::{RtcConfig, RtcError};
30
/// Spacing used when scheduling NACK handling: `Session::nack_at` returns
/// `last_nack + NACK_MIN_INTERVAL`.
const NACK_MIN_INTERVAL: Duration = Duration::from_millis(33);

/// Spacing used when scheduling TWCC feedback: `Session::twcc_at` returns
/// `last_twcc + TWCC_INTERVAL`.
const TWCC_INTERVAL: Duration = Duration::from_millis(100);

/// Multiplier applied to a bitrate when deriving the pacing rate
/// (see `Session::new` and `Session::configure_pacer`).
const PACING_FACTOR: f64 = 1.1;

/// Relative band around the last emitted estimate; `Bwe::poll_estimate` only
/// emits a new estimate when it falls outside this band.
const ESTIMATE_TOLERANCE: f64 = 0.05;
45
/// State for one RTP/RTCP session: media entries, streams, SRTP contexts,
/// TWCC bookkeeping, bandwidth estimation, pacing and queued RTCP feedback.
pub(crate) struct Session {
    /// Identifier returned by `id()`; `new()` keeps it at or below 2^62 - 1.
    id: SessionId,

    /// Media entries, looked up by their mid.
    pub medias: Vec<Media>,

    /// Receive and send streams; queried by SSRC or by mid/rid.
    pub streams: Streams,

    /// Mid and index recorded by `set_app`; counted as one line in `line_count`.
    app: Option<(Mid, usize)>,

    /// Passed to `Media::depayload` for incoming packets when not in RTP mode.
    reordering_size_audio: usize,
    /// Passed to `Media::depayload` for incoming packets when not in RTP mode.
    reordering_size_video: usize,
    /// Copied from `RtcConfig::send_buffer_audio`.
    pub send_buffer_audio: usize,
    /// Copied from `RtcConfig::send_buffer_video`.
    pub send_buffer_video: usize,

    /// Extension map used when parsing incoming RTP headers.
    pub exts: ExtensionMap,

    /// Codec configuration, searched by payload type for incoming packets.
    pub codec_config: CodecConfig,

    /// SRTP context for incoming traffic; `None` until `set_keying_material`.
    srtp_rx: Option<SrtpContext>,
    /// SRTP context for outgoing traffic; `None` until `set_keying_material`.
    srtp_tx: Option<SrtpContext>,
    /// Time of the last timeout on which NACK handling ran.
    last_nack: Instant,
    /// Time of the last attempt to build a TWCC report.
    last_twcc: Instant,
    /// Counter handed to the stream in `poll_packet` when the remote extension map
    /// contains the transport sequence number; its pre-poll value is registered
    /// in `twcc_tx_register`.
    twcc: u64,
    /// Records received transport-cc sequence numbers and builds TWCC reports.
    twcc_rx_register: TwccRecvRegister,
    /// Records sent transport-cc sequence numbers and applies incoming TWCC reports.
    twcc_tx_register: TwccSendRegister,
    /// Highest sequence number observed so far per SSRC (see `update_max_seq`).
    max_rx_seq_lookup: HashMap<Ssrc, SeqNo>,

    /// Bandwidth estimation state; `Some` only when `RtcConfig::bwe_config` is set.
    bwe: Option<Bwe>,

    /// Gate for scheduling TWCC feedback; starts `false`, set by `enable_twcc_feedback()`.
    enable_twcc_feedback: bool,

    /// Decides which send queue to poll and when padding is requested.
    pacer: PacerImpl,

    /// Scratch buffer handed to the stream when polling an outgoing packet.
    poll_packet_buf: Vec<u8>,

    /// In RTP mode, the most recently received new packet, waiting for `poll_event`.
    pending_packet: Option<RtpPacket>,

    /// Copied from `RtcConfig::ice_lite`.
    pub ice_lite: bool,

    /// When `true`, received packets are surfaced as `Event::RtpPacket` instead of
    /// being depayloaded by the media.
    pub rtp_mode: bool,

    /// RTCP feedback queued for sending.
    feedback_tx: VecDeque<Rtcp>,
    /// RTCP feedback parsed from the wire, drained in `handle_rtcp`.
    feedback_rx: VecDeque<Rtcp>,

    /// Queue of raw packet records; `Some` only when `RtcConfig::enable_raw_packets` is set.
    raw_packets: Option<VecDeque<Box<RawPacket>>>,
}
104
105impl Session {
106 pub fn new(config: &RtcConfig) -> Self {
107 let mut id = SessionId::new();
108 const MAX_ID: u64 = 2_u64.pow(62) - 1;
110 while *id > MAX_ID {
111 id = (*id >> 1).into();
112 }
113 let (pacer, bwe) = if let Some(config) = &config.bwe_config {
114 let rate = config.initial_bitrate;
115 let pacer = PacerImpl::LeakyBucket(LeakyBucketPacer::new(rate * PACING_FACTOR * 2.0));
116
117 let send_side_bwe = SendSideBandwithEstimator::new(rate, config.enable_loss_controller);
118 let bwe = Bwe {
119 bwe: send_side_bwe,
120 desired_bitrate: Bitrate::ZERO,
121 current_bitrate: rate,
122
123 last_emitted_estimate: Bitrate::ZERO,
124 };
125
126 (pacer, Some(bwe))
127 } else {
128 (PacerImpl::Null(NullPacer::default()), None)
129 };
130
131 let enable_stats = config.stats_interval.is_some();
132
133 Session {
134 id,
135 medias: vec![],
136 streams: Streams::new(enable_stats),
137 app: None,
138 reordering_size_audio: config.reordering_size_audio,
139 reordering_size_video: config.reordering_size_video,
140 send_buffer_audio: config.send_buffer_audio,
141 send_buffer_video: config.send_buffer_video,
142 exts: config.exts.clone(),
143
144 codec_config: config.codec_config.clone(),
147
148 srtp_rx: None,
149 srtp_tx: None,
150 last_nack: already_happened(),
151 last_twcc: already_happened(),
152 twcc: 0,
153 twcc_rx_register: TwccRecvRegister::new(100),
154 twcc_tx_register: TwccSendRegister::new(1000),
155 max_rx_seq_lookup: HashMap::new(),
156 bwe,
157 enable_twcc_feedback: false,
158 pacer,
159 poll_packet_buf: vec![0; 2000],
160 pending_packet: None,
161 ice_lite: config.ice_lite,
162 rtp_mode: config.rtp_mode,
163 feedback_tx: VecDeque::new(),
164 feedback_rx: VecDeque::new(),
165 raw_packets: if config.enable_raw_packets {
166 Some(VecDeque::new())
167 } else {
168 None
169 },
170 }
171 }
172
/// Returns this session's identifier.
pub fn id(&self) -> SessionId {
    self.id
}
176
177 pub fn set_app(&mut self, mid: Mid, index: usize) -> Result<(), String> {
178 if let Some((mid_existing, index_existing)) = self.app {
179 if mid_existing != mid {
180 return Err(format!("App mid changed {} != {}", mid, mid_existing,));
181 }
182 if index_existing != index {
183 return Err(format!("App index changed {} != {}", index, index_existing,));
184 }
185 } else {
186 self.app = Some((mid, index));
187 }
188 Ok(())
189 }
190
/// Returns the application mid and index recorded by `set_app`, if any.
pub fn app(&self) -> &Option<(Mid, usize)> {
    &self.app
}
194
195 pub fn set_keying_material(
196 &mut self,
197 mat: KeyingMaterial,
198 srtp_crypto: &SrtpCrypto,
199 srtp_profile: SrtpProfile,
200 active: bool,
201 ) {
202 let left = active;
206
207 self.srtp_rx = Some(SrtpContext::new(srtp_crypto, srtp_profile, &mat, !left));
208 self.srtp_tx = Some(SrtpContext::new(srtp_crypto, srtp_profile, &mat, left));
209 }
210
211 pub fn handle_timeout(&mut self, now: Instant) -> Result<(), RtcError> {
212 self.do_payload()?;
214
215 let sender_ssrc = self.streams.first_ssrc_local();
216
217 let do_nack = now >= self.nack_at().unwrap_or(not_happening());
218
219 self.streams.handle_timeout(
220 now,
221 sender_ssrc,
222 do_nack,
223 &self.medias,
224 &self.codec_config,
225 &mut self.feedback_tx,
226 );
227
228 if do_nack {
229 self.last_nack = now;
230 }
231
232 self.update_queue_state(now);
233
234 if let Some(twcc_at) = self.twcc_at() {
235 if now >= twcc_at {
236 self.create_twcc_feedback(sender_ssrc, now);
237 }
238 }
239
240 if let Some(bwe) = self.bwe.as_mut() {
241 bwe.handle_timeout(now);
242 }
243
244 Ok(())
245 }
246
247 fn update_queue_state(&mut self, now: Instant) {
248 let iter = self.streams.streams_tx().map(|m| m.queue_state(now));
249
250 let Some(padding_request) = self.pacer.handle_timeout(now, iter) else {
251 return;
252 };
253
254 let stream = self
255 .streams
256 .stream_tx_by_midrid(padding_request.midrid)
257 .expect("pacer to use an existing stream");
258
259 stream.generate_padding(padding_request.padding);
260 }
261
262 fn create_twcc_feedback(&mut self, sender_ssrc: Ssrc, now: Instant) -> Option<()> {
263 self.last_twcc = now;
264 let mut twcc = self.twcc_rx_register.build_report(DATAGRAM_MTU - 100)?;
265
266 twcc.sender_ssrc = sender_ssrc;
269 twcc.ssrc = self.streams.first_ssrc_remote();
270
271 trace!("Created feedback TWCC: {:?}", twcc);
272 self.feedback_tx.push_front(Rtcp::Twcc(twcc));
273 Some(())
274 }
275
276 pub fn handle_rtp_receive(&mut self, now: Instant, message: &[u8]) {
277 let Some(header) = RtpHeader::parse(message, &self.exts) else {
278 trace!("Failed to parse RTP header");
279 return;
280 };
281
282 self.handle_rtp(now, header, message);
283 }
284
/// Entry point for an incoming RTCP datagram.
///
/// Delegates to `handle_rtcp`; its `Option` result is intentionally discarded,
/// so a datagram that cannot be handled is silently ignored.
pub fn handle_rtcp_receive(&mut self, now: Instant, message: &[u8]) {
    self.handle_rtcp(now, message);
}
291
292 fn mid_and_ssrc_for_header(&mut self, now: Instant, header: &RtpHeader) -> Option<(Mid, Ssrc)> {
293 let ssrc_header = header.ssrc;
294
295 if let Some(r) = self.streams.mid_ssrc_rx_by_ssrc_or_rtx(now, ssrc_header) {
296 return Some(r);
297 }
298
299 self.map_dynamic(header);
301
302 self.streams.mid_ssrc_rx_by_ssrc_or_rtx(now, ssrc_header)
304 }
305
306 fn map_dynamic(&mut self, header: &RtpHeader) {
307 let Some(mid) = header.ext_vals.mid else {
316 return;
317 };
318 let rid = header.ext_vals.rid.or(header.ext_vals.rid_repair);
319
320 let Some(media) = self.medias.iter_mut().find(|m| m.mid() == mid) else {
323 return;
324 };
325
326 let maybe_payload = self
328 .codec_config
329 .iter()
330 .find(|p| p.pt() == header.payload_type || p.resend() == Some(header.payload_type));
331
332 let Some(payload) = maybe_payload else {
334 return;
335 };
336
337 if let Some(rid) = rid {
338 let is_main = header.ext_vals.rid.is_some();
340
341 let midrid = MidRid(mid, Some(rid));
342
343 self.streams
344 .map_dynamic_by_rid(header.ssrc, midrid, media, *payload, is_main);
345 } else {
346 let is_main = payload.pt() == header.payload_type;
348
349 let midrid = MidRid(mid, None);
350
351 self.streams
352 .map_dynamic_by_pt(header.ssrc, midrid, media, *payload, is_main);
353 }
354 }
355
356 pub(crate) fn handle_rtp(&mut self, now: Instant, mut header: RtpHeader, buf: &[u8]) {
357 header.ext_vals.update_absolute_send_time(now);
359
360 trace!("Handle RTP: {:?}", header);
361
362 let Some((mid, ssrc)) = self.mid_and_ssrc_for_header(now, &header) else {
364 debug!("No mid/SSRC for header: {:?}", header);
365 return;
366 };
367
368 let srtp = match self.srtp_rx.as_mut() {
369 Some(v) => v,
370 None => {
371 trace!("Rejecting SRTP while missing SrtpContext");
372 return;
373 }
374 };
375
376 let media = self.medias.iter_mut().find(|m| m.mid() == mid).unwrap();
378 let stream = self.streams.stream_rx(&ssrc).unwrap();
379
380 let params = match main_payload_params(&self.codec_config, header.payload_type) {
381 Some(p) => p,
382 None => {
383 trace!(
384 "No payload params could be found (main or RTX) for {:?}",
385 header.payload_type
386 );
387 return;
388 }
389 };
390 let clock_rate = params.spec().clock_rate;
391 let pt = params.pt();
392 let is_repair = pt != header.payload_type;
393
394 let max_seq_lookup = make_max_seq_lookup(&self.max_rx_seq_lookup);
395
396 let mut seq_no = stream.extend_seq(&header, is_repair, max_seq_lookup);
399
400 if !stream.is_new_packet(is_repair, seq_no) {
401 trace!(
404 "Ignoring dupe packet mid: {} seq_no: {} is_repair: {}",
405 mid,
406 seq_no,
407 is_repair
408 );
409 return;
410 }
411
412 let mut data = match srtp.unprotect_rtp(buf, &header, *seq_no) {
413 Some(v) => v,
414 None => {
415 trace!(
416 "Failed to unprotect SRTP for SSRC: {} pt: {} mid: {} \
417 rid: {:?} seq_no: {} is_repair: {}",
418 header.ssrc,
419 pt,
420 stream.mid(),
421 stream.rid(),
422 seq_no,
423 is_repair
424 );
425 return;
426 }
427 };
428
429 if header.has_padding && !RtpHeader::unpad_payload(&mut data) {
430 trace!("unpadding of unprotected payload failed");
432 return;
433 }
434
435 if let Some(raw_packets) = &mut self.raw_packets {
436 raw_packets.push_back(Box::new(RawPacket::RtpRx(header.clone(), data.clone())));
437 }
438
439 if let Some(transport_cc) = header.ext_vals.transport_cc {
441 let prev = self.twcc_rx_register.max_seq();
442 let extended = extend_u16(Some(*prev), transport_cc);
443 self.twcc_rx_register.update_seq(extended.into(), now);
444 }
445
446 update_max_seq(&mut self.max_rx_seq_lookup, header.ssrc, seq_no);
449
450 let receipt_outer = stream.update_register(now, &header, clock_rate, is_repair, seq_no);
452
453 let receipt = if is_repair {
457 if data.is_empty() {
461 return;
462 }
463
464 stream.un_rtx(&mut header, &mut data, pt);
466
467 let max_seq_lookup = make_max_seq_lookup(&self.max_rx_seq_lookup);
468
469 seq_no = stream.extend_seq(&header, false, max_seq_lookup);
473
474 update_max_seq(&mut self.max_rx_seq_lookup, header.ssrc, seq_no);
476
477 stream.update_register(now, &header, clock_rate, false, seq_no)
479 } else {
480 receipt_outer
483 };
484
485 let packet = stream.handle_rtp(now, header, data, seq_no, receipt.time);
486
487 if self.rtp_mode {
488 if receipt.is_new_packet {
492 self.pending_packet = Some(packet);
493 }
494 } else {
495 media.depayload(
497 stream.rid(),
498 packet,
499 self.reordering_size_audio,
500 self.reordering_size_video,
501 &self.codec_config,
502 );
503 }
504 }
505
506 fn handle_rtcp(&mut self, now: Instant, buf: &[u8]) -> Option<()> {
507 let srtp: &mut SrtpContext = self.srtp_rx.as_mut()?;
508 let unprotected = srtp.unprotect_rtcp(buf)?;
509
510 Rtcp::read_packet(&unprotected, &mut self.feedback_rx);
511 let mut need_configure_pacer = false;
512
513 if let Some(raw_packets) = &mut self.raw_packets {
514 for fb in &self.feedback_rx {
515 raw_packets.push_back(Box::new(RawPacket::RtcpRx(fb.clone())));
516 }
517 }
518
519 for fb in RtcpFb::from_rtcp(self.feedback_rx.drain(..)) {
520 if let RtcpFb::Twcc(twcc) = fb {
521 trace!("Handle TWCC: {:?}", twcc);
522 let maybe_records = self.twcc_tx_register.apply_report(twcc, now);
523
524 if let (Some(maybe_records), Some(bwe)) = (maybe_records, &mut self.bwe) {
525 bwe.update(maybe_records, now);
526 }
527 need_configure_pacer = true;
528
529 continue;
533 }
534
535 if fb.is_for_rx() {
536 let Some(stream) = self.streams.stream_rx(&fb.ssrc()) else {
537 continue;
538 };
539 stream.handle_rtcp(now, fb);
540 } else {
541 let Some(stream) = self.streams.stream_tx(&fb.ssrc()) else {
542 continue;
543 };
544 stream.handle_rtcp(now, fb);
545 }
546 }
547
548 if need_configure_pacer {
551 self.configure_pacer();
552 }
553
554 Some(())
555 }
556
557 pub fn poll_event(&mut self) -> Option<Event> {
558 if let Some(bitrate_estimate) = self.bwe.as_mut().and_then(|bwe| bwe.poll_estimate()) {
559 return Some(Event::EgressBitrateEstimate(BweKind::Twcc(
560 bitrate_estimate,
561 )));
562 }
563
564 if !self.ready_for_srtp() {
566 return None;
567 }
568
569 if let Some(raw_packets) = &mut self.raw_packets {
570 if let Some(p) = raw_packets.pop_front() {
571 return Some(Event::RawPacket(p));
572 }
573 }
574
575 if let Some(paused) = self.streams.poll_stream_paused() {
578 return Some(Event::StreamPaused(paused));
579 }
580
581 if self.rtp_mode {
582 if let Some(packet) = self.pending_packet.take() {
583 return Some(Event::RtpPacket(packet));
584 }
585 }
586
587 if let Some(req) = self.streams.poll_keyframe_request() {
588 return Some(Event::KeyframeRequest(req));
589 }
590
591 if let Some((mid, bitrate)) = self.streams.poll_remb_request() {
592 return Some(Event::EgressBitrateEstimate(BweKind::Remb(mid, bitrate)));
593 }
594
595 for media in &mut self.medias {
596 if media.need_open_event {
597 media.need_open_event = false;
598
599 return Some(Event::MediaAdded(MediaAdded {
600 mid: media.mid(),
601 kind: media.kind(),
602 direction: media.direction(),
603 simulcast: media.simulcast().map(|s| s.clone().into()),
604 }));
605 }
606
607 if media.need_changed_event {
608 media.need_changed_event = false;
609 return Some(Event::MediaChanged(MediaChanged {
610 mid: media.mid(),
611 direction: media.direction(),
612 }));
613 }
614 }
615
616 None
617 }
618
619 pub fn poll_event_fallible(&mut self) -> Result<Option<Event>, RtcError> {
620 if self.rtp_mode {
622 return Ok(None);
623 }
624
625 for media in &mut self.medias {
626 if let Some(e) = media.poll_sample(&self.codec_config)? {
627 return Ok(Some(Event::MediaData(e)));
628 }
629 }
630
631 Ok(None)
632 }
633
634 fn ready_for_srtp(&self) -> bool {
635 self.srtp_rx.is_some() && self.srtp_tx.is_some()
636 }
637
638 pub fn poll_datagram(&mut self, now: Instant) -> Option<net::DatagramSend> {
639 if now == already_happened() {
641 return None;
642 }
643
644 let x = None
645 .or_else(|| self.poll_feedback())
646 .or_else(|| self.poll_packet(now));
647
648 if let Some(x) = &x {
649 if !self.rtp_mode && x.len() > DATAGRAM_MTU_WARN {
653 warn!("RTP above MTU {}: {}", DATAGRAM_MTU_WARN, x.len());
654 }
655 }
656
657 x
658 }
659
/// Discards all queued RTCP feedback, both received and pending for sending.
pub fn clear_feedback(&mut self) {
    self.feedback_rx.clear();
    self.feedback_tx.clear();
}
666
667 fn poll_feedback(&mut self) -> Option<net::DatagramSend> {
668 if self.feedback_tx.is_empty() {
669 return None;
670 }
671
672 const ENCRYPTABLE_MTU: usize = (DATAGRAM_MTU - SRTCP_OVERHEAD) & !3;
674 assert!(ENCRYPTABLE_MTU % 4 == 0);
675
676 let mut data = vec![0_u8; ENCRYPTABLE_MTU];
677
678 let mut raw_packets = self.raw_packets.as_mut();
679 let output = move |fb| {
680 if let Some(raw_packets) = &mut raw_packets {
681 raw_packets.push_back(Box::new(RawPacket::RtcpTx(fb)));
682 }
683 };
684
685 let len = Rtcp::write_packet(&mut self.feedback_tx, &mut data, output);
686
687 if len == 0 {
688 return None;
689 }
690
691 data.truncate(len);
692
693 let srtp = self.srtp_tx.as_mut()?;
694 let protected = srtp.protect_rtcp(&data);
695
696 assert!(
697 protected.len() < DATAGRAM_MTU,
698 "Encrypted SRTCP should be less than MTU"
699 );
700
701 Some(protected.into())
702 }
703
704 fn poll_packet(&mut self, now: Instant) -> Option<DatagramSend> {
705 let srtp_tx = self.srtp_tx.as_mut()?;
706
707 let midrid = self.pacer.poll_queue()?;
709 let media = self
710 .medias
711 .iter()
712 .find(|m| m.mid() == midrid.mid())
713 .expect("index is media");
714
715 let buf = &mut self.poll_packet_buf;
716 let twcc_seq = self.twcc;
717
718 let stream = self.streams.stream_tx_by_midrid(midrid)?;
719
720 let params = &self.codec_config;
721 let exts = media.remote_extmap();
722
723 let twcc_enabled = exts.id_of(Extension::TransportSequenceNumber).is_some();
726 let twcc = twcc_enabled.then_some(&mut self.twcc);
727
728 let receipt = stream.poll_packet(now, exts, twcc, params, buf)?;
729
730 let PacketReceipt {
731 header,
732 seq_no,
733 is_padding,
734 payload_size,
735 } = receipt;
736
737 trace!(payload_size, is_padding, "Poll RTP: {:?}", header);
738
739 #[cfg(feature = "_internal_dont_use_log_stats")]
740 {
741 let kind = if is_padding { "padding" } else { "media" };
742
743 crate::log_stat!("PACKET_SENT", header.ssrc, payload_size, kind);
744 }
745
746 self.pacer.register_send(now, payload_size.into(), midrid);
747
748 if let Some(raw_packets) = &mut self.raw_packets {
749 raw_packets.push_back(Box::new(RawPacket::RtpTx(header.clone(), buf.clone())));
750 }
751
752 let protected = srtp_tx.protect_rtp(buf, &header, *seq_no);
753
754 if twcc_enabled {
755 self.twcc_tx_register
756 .register_seq(twcc_seq.into(), now, payload_size);
757 }
758
759 self.update_queue_state(now);
762
763 Some(protected.into())
764 }
765
766 pub fn poll_timeout(&mut self) -> (Option<Instant>, Reason) {
767 let feedback_at = self.regular_feedback_at();
768 let nack_at = self.nack_at();
769 let twcc_at = self.twcc_at();
770 let pacing_at = self.pacer.poll_timeout();
771 let packetize_at = self.medias.iter().flat_map(|m| m.poll_timeout()).next();
772 let bwe_at = self.bwe.as_ref().map(|bwe| bwe.poll_timeout());
773 let paused_at = self.paused_at();
774 let send_stream_at = self.streams.send_stream();
775
776 (feedback_at, Reason::Feedback)
777 .soonest((nack_at, Reason::Nack))
778 .soonest((twcc_at, Reason::Twcc))
779 .soonest((pacing_at, Reason::Pacing))
780 .soonest((packetize_at, Reason::Packetize))
781 .soonest((bwe_at, Reason::Bwe))
782 .soonest((paused_at, Reason::PauseCheck))
783 .soonest((send_stream_at, Reason::SendStream))
784 }
785
786 pub fn has_mid(&self, mid: Mid) -> bool {
787 self.medias.iter().any(|m| m.mid() == mid)
788 }
789
/// Deadline for regular feedback, as reported by the streams.
fn regular_feedback_at(&self) -> Option<Instant> {
    self.streams.regular_feedback_at()
}
793
/// Deadline for the pause check, as reported by the streams.
fn paused_at(&self) -> Option<Instant> {
    self.streams.paused_at()
}
797
798 fn nack_at(&mut self) -> Option<Instant> {
799 if !self.streams.any_nack_enabled() {
800 return None;
801 }
802
803 Some(self.last_nack + NACK_MIN_INTERVAL)
804 }
805
806 fn twcc_at(&self) -> Option<Instant> {
807 let is_receiving = self.streams.is_receiving();
808 if is_receiving && self.enable_twcc_feedback && self.twcc_rx_register.has_unreported() {
809 Some(self.last_twcc + TWCC_INTERVAL)
810 } else {
811 None
812 }
813 }
814
815 pub fn enable_twcc_feedback(&mut self) {
816 if !self.enable_twcc_feedback {
817 debug!("Enable TWCC feedback");
818 self.enable_twcc_feedback = true;
819 }
820 }
821
822 pub fn visit_stats(&mut self, now: Instant, snapshot: &mut StatsSnapshot) {
823 for stream in self.streams.streams_tx() {
824 stream.visit_stats(snapshot, now);
825 }
826
827 for stream in self.streams.streams_rx() {
828 stream.visit_stats(snapshot, now);
829 }
830
831 snapshot.tx = snapshot.egress.values().map(|s| s.bytes).sum();
832 snapshot.rx = snapshot.ingress.values().map(|s| s.bytes).sum();
833 snapshot.bwe_tx = self.bwe.as_ref().and_then(|bwe| bwe.last_estimate());
834
835 snapshot.egress_loss_fraction = self.twcc_tx_register.loss(Duration::from_secs(1), now);
836 snapshot.rtt = self.twcc_tx_register.rtt();
837 snapshot.ingress_loss_fraction = self.twcc_rx_register.loss();
838 }
839
840 pub fn set_bwe_current_bitrate(&mut self, current_bitrate: Bitrate) {
841 if let Some(bwe) = self.bwe.as_mut() {
842 bwe.current_bitrate = current_bitrate;
843 self.configure_pacer();
844 }
845 }
846
847 pub fn set_bwe_desired_bitrate(&mut self, desired_bitrate: Bitrate) {
848 if let Some(bwe) = self.bwe.as_mut() {
849 bwe.desired_bitrate = desired_bitrate;
850 self.configure_pacer();
851 }
852 }
853
854 pub fn reset_bwe(&mut self, init_bitrate: Bitrate) {
855 if let Some(bwe) = self.bwe.as_mut() {
856 bwe.reset(init_bitrate);
857 }
858 }
859
860 pub fn line_count(&self) -> usize {
861 self.medias.len() + if self.app.is_some() { 1 } else { 0 }
862 }
863
/// Appends `media` to the session's medias.
pub fn add_media(&mut self, media: Media) {
    self.medias.push(media);
}
867
/// Returns all medias of the session, in insertion order.
pub fn medias(&self) -> &[Media] {
    &self.medias
}
871
/// Removes every media with the given mid, along with the streams that belong
/// to that mid.
pub fn remove_media(&mut self, mid: Mid) {
    self.medias.retain(|media| media.mid() != mid);
    self.streams.remove_streams_by_mid(mid);
}
876
877 fn configure_pacer(&mut self) {
878 let Some(bwe) = self.bwe.as_ref() else {
879 return;
880 };
881
882 let padding_rate = bwe
883 .last_estimate()
884 .map(|estimate| estimate.min(bwe.desired_bitrate))
885 .unwrap_or(Bitrate::ZERO);
886
887 self.pacer.set_padding_rate(padding_rate);
888
889 let pacing_rate = (bwe.current_bitrate * PACING_FACTOR).max(padding_rate);
896 self.pacer.set_pacing_rate(pacing_rate);
897 }
898
/// Returns the first media with the given mid, if any.
pub fn media_by_mid(&self, mid: Mid) -> Option<&Media> {
    self.medias.iter().find(|m| m.mid() == mid)
}
902
/// Returns a mutable reference to the first media with the given mid, if any.
pub fn media_by_mid_mut(&mut self, mid: Mid) -> Option<&mut Media> {
    self.medias.iter_mut().find(|m| m.mid() == mid)
}
906
907 fn do_payload(&mut self) -> Result<(), RtcError> {
908 for m in &mut self.medias {
909 m.do_payload(&mut self.streams, &self.codec_config)?;
910 }
911
912 Ok(())
913 }
914
915 pub fn set_direction(&mut self, mid: Mid, direction: Direction) -> bool {
916 let Some(media) = self.media_by_mid_mut(mid) else {
917 return false;
918 };
919 let old_dir = media.direction();
920 if old_dir == direction {
921 return false;
922 }
923
924 media.set_direction(direction);
925
926 if old_dir.is_sending() && !direction.is_sending() {
927 self.streams.reset_buffers_tx(mid);
928 }
929
930 let max_seq_lookup = make_max_seq_lookup(&self.max_rx_seq_lookup);
931 if old_dir.is_receiving() && !direction.is_receiving() {
932 self.streams.reset_buffers_rx(mid, max_seq_lookup);
933 }
934
935 true
936 }
937
938 pub fn is_request_keyframe_possible(&self, kind: KeyframeRequestKind) -> bool {
939 self.codec_config.iter().any(|r| match kind {
944 KeyframeRequestKind::Pli => r.fb_pli,
945 KeyframeRequestKind::Fir => r.fb_fir,
946 })
947 }
948
949 pub fn is_connected(&self) -> bool {
951 self.srtp_rx.is_some() && self.srtp_tx.is_some()
952 }
953}
954
/// Bandwidth estimation state kept by the session.
struct Bwe {
    /// The wrapped send side estimator that all methods delegate to.
    bwe: SendSideBandwithEstimator,
    /// Upper bound applied to the estimate when deriving the padding rate.
    desired_bitrate: Bitrate,
    /// Bitrate scaled by `PACING_FACTOR` when deriving the pacing rate.
    current_bitrate: Bitrate,

    /// Estimate most recently returned by `poll_estimate`; starts at zero.
    last_emitted_estimate: Bitrate,
}
962
963impl Bwe {
964 fn handle_timeout(&mut self, now: Instant) {
965 self.bwe.handle_timeout(now);
966 }
967
968 fn reset(&mut self, init_bitrate: Bitrate) {
969 self.bwe.reset(init_bitrate);
970 }
971
972 fn update<'t>(
973 &mut self,
974 records: impl Iterator<Item = &'t crate::rtp_::TwccSendRecord>,
975 now: Instant,
976 ) {
977 self.bwe.update(records, now);
978 }
979
980 fn poll_estimate(&mut self) -> Option<Bitrate> {
981 let estimate = self.bwe.last_estimate()?;
982
983 let min = self.last_emitted_estimate * (1.0 - ESTIMATE_TOLERANCE);
984 let max = self.last_emitted_estimate * (1.0 + ESTIMATE_TOLERANCE);
985
986 if estimate < min || estimate > max {
987 self.last_emitted_estimate = estimate;
988 Some(estimate)
989 } else {
990 None
992 }
993 }
994
995 fn poll_timeout(&self) -> Instant {
996 self.bwe.poll_timeout()
997 }
998
999 fn last_estimate(&self) -> Option<Bitrate> {
1000 self.bwe.last_estimate()
1001 }
1002}
1003
/// Result of polling a send stream for one packet, as consumed by
/// `Session::poll_packet`.
pub struct PacketReceipt {
    /// Header of the packet; used when protecting it and when recording it as a raw packet.
    pub header: RtpHeader,
    /// Sequence number passed to SRTP protection.
    pub seq_no: SeqNo,
    /// Whether the packet is padding rather than media.
    pub is_padding: bool,
    /// Payload size reported to the pacer and to the TWCC send register.
    pub payload_size: usize,
}
1010
1011fn main_payload_params(c: &CodecConfig, pt: Pt) -> Option<&PayloadParams> {
1014 c.iter().find(|p| p.pt == pt || p.resend == Some(pt))
1015}
1016
1017fn make_max_seq_lookup(map: &HashMap<Ssrc, SeqNo>) -> impl Fn(Ssrc) -> Option<SeqNo> + '_ {
1018 |ssrc| map.get(&ssrc).cloned()
1019}
1020
1021fn update_max_seq(map: &mut HashMap<Ssrc, SeqNo>, ssrc: Ssrc, seq_no: SeqNo) {
1022 let current = map.entry(ssrc).or_insert(seq_no);
1023 if seq_no > *current {
1024 *current = seq_no;
1025 }
1026}