rtc_rtp/packetizer/
mod.rs

1#[cfg(test)]
2mod packetizer_test;
3
4use crate::{extension::abs_send_time_extension::*, header::*, packet::*, sequence::*};
5use shared::{
6    error::Result,
7    marshal::{Marshal, MarshalSize},
8    time::SystemInstant,
9};
10
11use bytes::{Bytes, BytesMut};
12use std::fmt;
13use std::sync::Arc;
14use std::time::Instant;
15
16/// Payloader payloads a byte array for use as rtp.Packet payloads
17pub trait Payloader: Send + Sync + fmt::Debug {
18    fn payload(&mut self, mtu: usize, b: &Bytes) -> Result<Vec<Bytes>>;
19    fn clone_to(&self) -> Box<dyn Payloader>;
20}
21
22impl Clone for Box<dyn Payloader> {
23    fn clone(&self) -> Box<dyn Payloader> {
24        self.clone_to()
25    }
26}
27
28/// Packetizer packetizes a payload
29pub trait Packetizer: fmt::Debug {
30    fn enable_abs_send_time(&mut self, value: u8);
31    fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>>;
32    fn skip_samples(&mut self, skipped_samples: u32);
33    fn clone_to(&self) -> Box<dyn Packetizer>;
34}
35
36impl Clone for Box<dyn Packetizer> {
37    fn clone(&self) -> Box<dyn Packetizer> {
38        self.clone_to()
39    }
40}
41
42/// Depacketizer depacketizes a RTP payload, removing any RTP specific data from the payload
43pub trait Depacketizer {
44    fn depacketize(&mut self, b: &Bytes) -> Result<Bytes>;
45
46    /// Checks if the packet is at the beginning of a partition.  This
47    /// should return false if the result could not be determined, in
48    /// which case the caller will detect timestamp discontinuities.
49    fn is_partition_head(&self, payload: &Bytes) -> bool;
50
51    /// Checks if the packet is at the end of a partition.  This should
52    /// return false if the result could not be determined.
53    fn is_partition_tail(&self, marker: bool, payload: &Bytes) -> bool;
54}
55
56/// FnTimeGen provides current time (Instant)
57pub type FnTimeGen = Arc<dyn (Fn() -> Instant) + Send + Sync>;
58
59#[derive(Clone)]
60pub(crate) struct PacketizerImpl {
61    pub(crate) mtu: usize,
62    pub(crate) payload_type: u8,
63    pub(crate) ssrc: u32,
64    pub(crate) payloader: Box<dyn Payloader>,
65    pub(crate) sequencer: Box<dyn Sequencer>,
66    pub(crate) timestamp: u32,
67    pub(crate) clock_rate: u32,
68    pub(crate) abs_send_time_ext_id: u8, //http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time
69    pub(crate) time_gen: Option<FnTimeGen>,
70    time_baseline: SystemInstant,
71}
72
73impl fmt::Debug for PacketizerImpl {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        f.debug_struct("PacketizerImpl")
76            .field("mtu", &self.mtu)
77            .field("payload_type", &self.payload_type)
78            .field("ssrc", &self.ssrc)
79            .field("timestamp", &self.timestamp)
80            .field("clock_rate", &self.clock_rate)
81            .field("abs_send_time_ext_id", &self.abs_send_time_ext_id)
82            .finish()
83    }
84}
85
86pub fn new_packetizer(
87    mtu: usize,
88    payload_type: u8,
89    ssrc: u32,
90    payloader: Box<dyn Payloader>,
91    sequencer: Box<dyn Sequencer>,
92    clock_rate: u32,
93) -> impl Packetizer {
94    PacketizerImpl {
95        mtu,
96        payload_type,
97        ssrc,
98        payloader,
99        sequencer,
100        timestamp: rand::random::<u32>(),
101        clock_rate,
102        abs_send_time_ext_id: 0,
103        time_gen: None,
104        time_baseline: SystemInstant::now(),
105    }
106}
107
108impl Packetizer for PacketizerImpl {
109    fn enable_abs_send_time(&mut self, id: u8) {
110        self.abs_send_time_ext_id = id
111    }
112
113    fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>> {
114        let payloads = self.payloader.payload(self.mtu - 12, payload)?;
115        let payloads_len = payloads.len();
116        let mut packets = Vec::with_capacity(payloads_len);
117        for (i, payload) in payloads.into_iter().enumerate() {
118            packets.push(Packet {
119                header: Header {
120                    version: 2,
121                    padding: false,
122                    extension: false,
123                    marker: i == payloads_len - 1,
124                    payload_type: self.payload_type,
125                    sequence_number: self.sequencer.next_sequence_number(),
126                    timestamp: self.timestamp, //TODO: Figure out how to do timestamps
127                    ssrc: self.ssrc,
128                    ..Default::default()
129                },
130                payload,
131            });
132        }
133
134        self.timestamp = self.timestamp.wrapping_add(samples);
135
136        if payloads_len != 0 && self.abs_send_time_ext_id != 0 {
137            let now = if let Some(fn_time_gen) = &self.time_gen {
138                fn_time_gen()
139            } else {
140                Instant::now()
141            };
142            let send_time = AbsSendTimeExtension::new(self.time_baseline.ntp(now));
143            //apply http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time
144            let mut raw = BytesMut::with_capacity(send_time.marshal_size());
145            raw.resize(send_time.marshal_size(), 0);
146            let _ = send_time.marshal_to(&mut raw)?;
147            packets[payloads_len - 1]
148                .header
149                .set_extension(self.abs_send_time_ext_id, raw.freeze())?;
150        }
151
152        Ok(packets)
153    }
154
155    /// skip_samples causes a gap in sample count between Packetize requests so the
156    /// RTP payloads produced have a gap in timestamps
157    fn skip_samples(&mut self, skipped_samples: u32) {
158        self.timestamp = self.timestamp.wrapping_add(skipped_samples);
159    }
160
161    fn clone_to(&self) -> Box<dyn Packetizer> {
162        Box::new(self.clone())
163    }
164}