1use std::{
4 collections::{HashMap, VecDeque},
5 convert::Infallible,
6 ops::{Deref, DerefMut},
7 pin::Pin,
8 sync::Arc,
9 task::{Context, Poll},
10 time::Instant,
11};
12
13use futures::{ready, Sink, Stream};
14
15use crate::{
16 rtcp::{RtcpContext, RtcpContextHandle},
17 rtp::{IncomingRtpPacket, OrderedRtpPacket, RtpPacket},
18 utils::reorder::{ReorderingError, ReorderingMultiBuffer},
19};
20
21pub trait RtpTransceiver {
23 fn rtcp_context(&self) -> RtcpContextHandle;
30}
31
32#[derive(Debug, Copy, Clone, PartialEq, Eq)]
34pub enum SSRCMode {
35 Ignore,
41
42 Any,
44
45 Specific,
47}
48
49#[derive(Clone)]
51pub struct SSRC2ClockRate {
52 inner: HashMap<u32, u32>,
53}
54
55impl SSRC2ClockRate {
56 fn new() -> Self {
58 Self {
59 inner: HashMap::new(),
60 }
61 }
62
63 fn from_iter<T>(items: T) -> Self
65 where
66 T: IntoIterator<Item = (u32, u32)>,
67 {
68 Self {
69 inner: HashMap::from_iter(items),
70 }
71 }
72
73 #[inline]
75 pub fn len(&self) -> usize {
76 self.inner.len()
77 }
78
79 #[inline]
81 pub fn is_empty(&self) -> bool {
82 self.inner.is_empty()
83 }
84
85 #[inline]
87 pub fn clock_rate(&self, ssrc: u32) -> Option<u32> {
88 self.inner.get(&ssrc).copied()
89 }
90
91 #[inline]
93 pub fn contains(&self, ssrc: u32) -> bool {
94 self.inner.contains_key(&ssrc)
95 }
96
97 pub fn iter(&self) -> impl Iterator<Item = (u32, u32)> + use<'_> {
99 self.inner
100 .iter()
101 .map(|(&ssrc, &clock_rate)| (ssrc, clock_rate))
102 }
103}
104
105#[derive(Clone)]
107pub struct RtpTransceiverOptions {
108 inner: ArcInnerTransceiverOptions,
109}
110
111impl RtpTransceiverOptions {
112 pub fn new() -> Self {
114 Self {
115 inner: ArcInnerTransceiverOptions::new(InnerTransceiverOptions::new()),
116 }
117 }
118
119 #[inline]
121 pub fn primary_sender_ssrc(&self) -> u32 {
122 self.inner.primary_sender_ssrc
123 }
124
125 pub fn with_primary_sender_ssrc(mut self, ssrc: u32) -> Self {
130 self.inner.primary_sender_ssrc = ssrc;
131 self
132 }
133
134 #[inline]
136 pub fn reordering_buffer_depth(&self) -> usize {
137 self.inner.reordering_buffer_depth
138 }
139
140 pub fn with_reordering_buffer_depth(mut self, depth: usize) -> Self {
144 self.inner.reordering_buffer_depth = depth;
145 self
146 }
147
148 #[inline]
150 pub fn default_clock_rate(&self) -> u32 {
151 self.inner.default_clock_rate
152 }
153
154 pub fn with_default_clock_rate(mut self, clock_rate: u32) -> Self {
160 self.inner.default_clock_rate = clock_rate;
161 self
162 }
163
164 #[inline]
166 pub fn input_ssrc_mode(&self) -> SSRCMode {
167 self.inner.input_ssrc_mode
168 }
169
170 pub fn with_input_ssrc_mode(mut self, mode: SSRCMode) -> Self {
174 self.inner.input_ssrc_mode = mode;
175 self
176 }
177
178 #[inline]
180 pub fn max_input_ssrcs(&self) -> Option<usize> {
181 self.inner.max_input_ssrcs
182 }
183
184 pub fn with_max_input_ssrcs(mut self, max: Option<usize>) -> Self {
194 self.inner.max_input_ssrcs = max;
195 self
196 }
197
198 #[inline]
200 pub fn input_ssrcs(&self) -> &SSRC2ClockRate {
201 &self.inner.input_ssrcs
202 }
203
204 pub fn with_input_ssrcs<T>(mut self, ssrcs: T) -> Self
214 where
215 T: IntoIterator<Item = (u32, u32)>,
216 {
217 self.inner.input_ssrcs = SSRC2ClockRate::from_iter(ssrcs);
218 self
219 }
220
221 #[inline]
223 pub fn output_ssrcs(&self) -> &SSRC2ClockRate {
224 &self.inner.output_ssrcs
225 }
226
227 pub fn with_output_ssrcs<T>(mut self, ssrcs: T) -> Self
237 where
238 T: IntoIterator<Item = (u32, u32)>,
239 {
240 self.inner.output_ssrcs = SSRC2ClockRate::from_iter(ssrcs);
241 self
242 }
243
244 #[inline]
246 pub fn max_rtcp_packet_size(&self) -> usize {
247 self.inner.max_rtcp_packet_size
248 }
249
250 pub fn with_max_rtcp_packet_size(mut self, size: usize) -> Self {
257 self.inner.max_rtcp_packet_size = size;
258 self
259 }
260}
261
262impl Default for RtpTransceiverOptions {
263 #[inline]
264 fn default() -> Self {
265 Self::new()
266 }
267}
268
269#[derive(Clone)]
273struct ArcInnerTransceiverOptions {
274 inner: Arc<InnerTransceiverOptions>,
275}
276
277impl ArcInnerTransceiverOptions {
278 fn new(inner: InnerTransceiverOptions) -> Self {
280 Self {
281 inner: Arc::new(inner),
282 }
283 }
284}
285
286impl Deref for ArcInnerTransceiverOptions {
287 type Target = InnerTransceiverOptions;
288
289 #[inline]
290 fn deref(&self) -> &Self::Target {
291 &self.inner
292 }
293}
294
295impl DerefMut for ArcInnerTransceiverOptions {
296 fn deref_mut(&mut self) -> &mut Self::Target {
297 Arc::make_mut(&mut self.inner)
298 }
299}
300
301#[derive(Clone)]
303struct InnerTransceiverOptions {
304 primary_sender_ssrc: u32,
305 reordering_buffer_depth: usize,
306 default_clock_rate: u32,
307 input_ssrc_mode: SSRCMode,
308 max_input_ssrcs: Option<usize>,
309 input_ssrcs: SSRC2ClockRate,
310 output_ssrcs: SSRC2ClockRate,
311 max_rtcp_packet_size: usize,
312}
313
314impl InnerTransceiverOptions {
315 fn new() -> Self {
317 Self {
318 primary_sender_ssrc: rand::random(),
319 reordering_buffer_depth: 64,
320 default_clock_rate: 90000,
321 input_ssrc_mode: SSRCMode::Any,
322 max_input_ssrcs: Some(64),
323 input_ssrcs: SSRC2ClockRate::new(),
324 output_ssrcs: SSRC2ClockRate::new(),
325 max_rtcp_packet_size: 1200,
326 }
327 }
328}
329
330pin_project_lite::pin_project! {
331 pub struct DefaultRtpTransceiver<T, E = Infallible> {
333 #[pin]
334 inner: T,
335 context: TransceiverContext,
336 error: Option<E>,
337 eof: bool,
338 }
339}
340
341impl<T, E> DefaultRtpTransceiver<T, E> {
342 pub fn new(stream: T, options: RtpTransceiverOptions) -> Self {
344 Self {
345 inner: stream,
346 context: TransceiverContext::new(options),
347 error: None,
348 eof: false,
349 }
350 }
351}
352
353impl<T, E> Stream for DefaultRtpTransceiver<T, E>
354where
355 T: Stream<Item = Result<RtpPacket, E>>,
356{
357 type Item = Result<OrderedRtpPacket, E>;
358
359 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
360 let mut this = self.project();
361
362 loop {
363 if let Some(packet) = this.context.poll_next_ordered_packet() {
364 return Poll::Ready(Some(Ok(packet)));
365 }
366
367 let inner = this.inner.as_mut();
368
369 if !*this.eof {
370 let ready = if this.context.end_of_stream() {
371 None
372 } else {
373 ready!(inner.poll_next(cx))
374 };
375
376 match ready {
377 Some(Ok(packet)) => this.context.process_incoming_packet(packet),
378 other => {
379 if let Some(Err(err)) = other {
380 *this.error = Some(err);
381 }
382
383 *this.eof = true;
384 }
385 }
386 } else if let Some(packet) = this.context.take_next_ordered_packet() {
387 return Poll::Ready(Some(Ok(packet)));
388 } else if let Some(err) = this.error.take() {
389 return Poll::Ready(Some(Err(err)));
390 } else {
391 return Poll::Ready(None);
392 }
393 }
394 }
395}
396
397impl<T, E> Sink<RtpPacket> for DefaultRtpTransceiver<T, E>
398where
399 T: Sink<RtpPacket>,
400{
401 type Error = T::Error;
402
403 #[inline]
404 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
405 let this = self.project();
406
407 this.inner.poll_ready(cx)
408 }
409
410 fn start_send(self: Pin<&mut Self>, packet: RtpPacket) -> Result<(), Self::Error> {
411 let this = self.project();
412
413 this.context.process_outgoing_packet(&packet);
414
415 this.inner.start_send(packet)?;
416
417 Ok(())
418 }
419
420 #[inline]
421 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
422 let this = self.project();
423
424 this.inner.poll_flush(cx)
425 }
426
427 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
428 let this = self.project();
429
430 ready!(this.inner.poll_close(cx))?;
431
432 this.context.close();
433
434 Poll::Ready(Ok(()))
435 }
436}
437
438impl<T, E> RtpTransceiver for DefaultRtpTransceiver<T, E> {
439 #[inline]
440 fn rtcp_context(&self) -> RtcpContextHandle {
441 self.context.rtcp_context()
442 }
443}
444
445struct TransceiverContext {
447 options: RtpTransceiverOptions,
448 rtcp: RtcpContext,
449 buffer: ReorderingMultiBuffer,
450 output: VecDeque<OrderedRtpPacket>,
451}
452
453impl TransceiverContext {
454 fn new(options: RtpTransceiverOptions) -> Self {
456 let input_ssrcs = options.input_ssrcs();
457 let expected_ssrcs = input_ssrcs.len();
458
459 let max_input_ssrcs = options.max_input_ssrcs().map(|max| max.max(expected_ssrcs));
460
461 let max_ssrc_buffers = match options.input_ssrc_mode() {
462 SSRCMode::Ignore => Some(1),
463 SSRCMode::Any => max_input_ssrcs,
464 SSRCMode::Specific => Some(expected_ssrcs),
465 };
466
467 let reordering_buffer_depth = options.reordering_buffer_depth();
468
469 Self {
470 options: options.clone(),
471 rtcp: RtcpContext::new(options),
472 buffer: ReorderingMultiBuffer::new(reordering_buffer_depth, max_ssrc_buffers),
473 output: VecDeque::new(),
474 }
475 }
476
477 fn process_outgoing_packet(&mut self, packet: &RtpPacket) {
479 self.rtcp.process_outgoing_rtp_packet(packet);
480 }
481
482 fn process_incoming_packet(&mut self, packet: RtpPacket) {
484 let ssrc = packet.ssrc();
485
486 let input_ssrcs = self.options.input_ssrcs();
487 let input_ssrc_mode = self.options.input_ssrc_mode();
488
489 if input_ssrc_mode == SSRCMode::Specific && !input_ssrcs.contains(ssrc) {
490 return;
491 }
492
493 let now = Instant::now();
494
495 let packet = IncomingRtpPacket::new(packet, now);
496
497 self.rtcp.process_incoming_rtp_packet(&packet);
499
500 let mut packet = RtpPacket::from(packet);
501
502 if input_ssrc_mode == SSRCMode::Ignore {
504 packet = packet.with_ssrc(0);
505 }
506
507 let mut packet = IncomingRtpPacket::new(packet, now);
508
509 while let Err(ReorderingError::BufferFull(tmp)) = self.buffer.push(packet) {
512 if let Some(p) = self.buffer.take() {
513 self.process_ordered_packet(p);
514 }
515
516 packet = tmp;
517 }
518
519 while let Some(p) = self.buffer.next() {
521 self.process_ordered_packet(p);
522 }
523 }
524
525 fn process_ordered_packet(&mut self, packet: OrderedRtpPacket) {
527 self.rtcp.process_ordered_rtp_packet(&packet);
528 self.output.push_back(packet);
529 }
530
531 fn poll_next_ordered_packet(&mut self) -> Option<OrderedRtpPacket> {
533 self.output.pop_front()
534 }
535
536 fn take_next_ordered_packet(&mut self) -> Option<OrderedRtpPacket> {
540 while self.output.is_empty() {
541 if self.buffer.is_empty() {
542 break;
543 } else if let Some(packet) = self.buffer.take() {
544 self.process_ordered_packet(packet);
545 }
546 }
547
548 self.output.pop_front()
549 }
550
551 fn end_of_stream(&self) -> bool {
554 self.rtcp.end_of_stream()
555 }
556
557 fn close(&mut self) {
559 self.rtcp.close();
560 }
561
562 #[inline]
564 fn rtcp_context(&self) -> RtcpContextHandle {
565 self.rtcp.handle()
566 }
567}