rtc_rtp/packetizer/
mod.rs

1#[cfg(test)]
2mod packetizer_test;
3
4use crate::{extension::abs_send_time_extension::*, header::*, packet::*, sequence::*};
5use shared::{
6    error::Result,
7    marshal::{Marshal, MarshalSize},
8};
9
10use bytes::{Bytes, BytesMut};
11use std::fmt;
12use std::sync::Arc;
13use std::time::SystemTime;
14
15/// Payloader payloads a byte array for use as rtp.Packet payloads
16pub trait Payloader: fmt::Debug {
17    fn payload(&mut self, mtu: usize, b: &Bytes) -> Result<Vec<Bytes>>;
18    fn clone_to(&self) -> Box<dyn Payloader>;
19}
20
21impl Clone for Box<dyn Payloader> {
22    fn clone(&self) -> Box<dyn Payloader> {
23        self.clone_to()
24    }
25}
26
27/// Packetizer packetizes a payload
28pub trait Packetizer: fmt::Debug {
29    fn enable_abs_send_time(&mut self, value: u8);
30    fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>>;
31    fn skip_samples(&mut self, skipped_samples: u32);
32    fn clone_to(&self) -> Box<dyn Packetizer>;
33}
34
35impl Clone for Box<dyn Packetizer> {
36    fn clone(&self) -> Box<dyn Packetizer> {
37        self.clone_to()
38    }
39}
40
41/// Depacketizer depacketizes a RTP payload, removing any RTP specific data from the payload
42pub trait Depacketizer {
43    fn depacketize(&mut self, b: &Bytes) -> Result<Bytes>;
44
45    /// Checks if the packet is at the beginning of a partition.  This
46    /// should return false if the result could not be determined, in
47    /// which case the caller will detect timestamp discontinuities.
48    fn is_partition_head(&self, payload: &Bytes) -> bool;
49
50    /// Checks if the packet is at the end of a partition.  This should
51    /// return false if the result could not be determined.
52    fn is_partition_tail(&self, marker: bool, payload: &Bytes) -> bool;
53}
54
55//TODO: SystemTime vs Instant?
56// non-monotonic clock vs monotonically non-decreasing clock
57/// FnTimeGen provides current SystemTime
58pub type FnTimeGen = Arc<dyn (Fn() -> SystemTime)>;
59
60#[derive(Clone)]
61pub(crate) struct PacketizerImpl {
62    pub(crate) mtu: usize,
63    pub(crate) payload_type: u8,
64    pub(crate) ssrc: u32,
65    pub(crate) payloader: Box<dyn Payloader>,
66    pub(crate) sequencer: Box<dyn Sequencer>,
67    pub(crate) timestamp: u32,
68    pub(crate) clock_rate: u32,
69    pub(crate) abs_send_time: u8, //http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time
70    pub(crate) time_gen: Option<FnTimeGen>,
71}
72
73impl fmt::Debug for PacketizerImpl {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        f.debug_struct("PacketizerImpl")
76            .field("mtu", &self.mtu)
77            .field("payload_type", &self.payload_type)
78            .field("ssrc", &self.ssrc)
79            .field("timestamp", &self.timestamp)
80            .field("clock_rate", &self.clock_rate)
81            .field("abs_send_time", &self.abs_send_time)
82            .finish()
83    }
84}
85
86pub fn new_packetizer(
87    mtu: usize,
88    payload_type: u8,
89    ssrc: u32,
90    payloader: Box<dyn Payloader>,
91    sequencer: Box<dyn Sequencer>,
92    clock_rate: u32,
93) -> impl Packetizer {
94    PacketizerImpl {
95        mtu,
96        payload_type,
97        ssrc,
98        payloader,
99        sequencer,
100        timestamp: rand::random::<u32>(),
101        clock_rate,
102        abs_send_time: 0,
103        time_gen: None,
104    }
105}
106
107impl Packetizer for PacketizerImpl {
108    fn enable_abs_send_time(&mut self, value: u8) {
109        self.abs_send_time = value
110    }
111
112    fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>> {
113        let payloads = self.payloader.payload(self.mtu - 12, payload)?;
114        let payloads_len = payloads.len();
115        let mut packets = Vec::with_capacity(payloads_len);
116        for (i, payload) in payloads.into_iter().enumerate() {
117            packets.push(Packet {
118                header: Header {
119                    version: 2,
120                    padding: false,
121                    extension: false,
122                    marker: i == payloads_len - 1,
123                    payload_type: self.payload_type,
124                    sequence_number: self.sequencer.next_sequence_number(),
125                    timestamp: self.timestamp, //TODO: Figure out how to do timestamps
126                    ssrc: self.ssrc,
127                    ..Default::default()
128                },
129                payload,
130            });
131        }
132
133        self.timestamp = self.timestamp.wrapping_add(samples);
134
135        if payloads_len != 0 && self.abs_send_time != 0 {
136            let st = if let Some(fn_time_gen) = &self.time_gen {
137                fn_time_gen()
138            } else {
139                SystemTime::now()
140            };
141            let send_time = AbsSendTimeExtension::new(st);
142            //apply http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time
143            let mut raw = BytesMut::with_capacity(send_time.marshal_size());
144            raw.resize(send_time.marshal_size(), 0);
145            let _ = send_time.marshal_to(&mut raw)?;
146            packets[payloads_len - 1]
147                .header
148                .set_extension(self.abs_send_time, raw.freeze())?;
149        }
150
151        Ok(packets)
152    }
153
154    /// skip_samples causes a gap in sample count between Packetize requests so the
155    /// RTP payloads produced have a gap in timestamps
156    fn skip_samples(&mut self, skipped_samples: u32) {
157        self.timestamp = self.timestamp.wrapping_add(skipped_samples);
158    }
159
160    fn clone_to(&self) -> Box<dyn Packetizer> {
161        Box::new(self.clone())
162    }
163}