rtc_rtp/packetizer/
mod.rs1#[cfg(test)]
2mod packetizer_test;
3
4use crate::{extension::abs_send_time_extension::*, header::*, packet::*, sequence::*};
5use shared::{
6 error::Result,
7 marshal::{Marshal, MarshalSize},
8 time::SystemInstant,
9};
10
11use bytes::{Bytes, BytesMut};
12use std::fmt;
13use std::sync::Arc;
14use std::time::Instant;
15
16pub trait Payloader: Send + Sync + fmt::Debug {
18 fn payload(&mut self, mtu: usize, b: &Bytes) -> Result<Vec<Bytes>>;
19 fn clone_to(&self) -> Box<dyn Payloader>;
20}
21
22impl Clone for Box<dyn Payloader> {
23 fn clone(&self) -> Box<dyn Payloader> {
24 self.clone_to()
25 }
26}
27
28pub trait Packetizer: fmt::Debug {
30 fn enable_abs_send_time(&mut self, value: u8);
31 fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>>;
32 fn skip_samples(&mut self, skipped_samples: u32);
33 fn clone_to(&self) -> Box<dyn Packetizer>;
34}
35
36impl Clone for Box<dyn Packetizer> {
37 fn clone(&self) -> Box<dyn Packetizer> {
38 self.clone_to()
39 }
40}
41
42pub trait Depacketizer {
44 fn depacketize(&mut self, b: &Bytes) -> Result<Bytes>;
45
46 fn is_partition_head(&self, payload: &Bytes) -> bool;
50
51 fn is_partition_tail(&self, marker: bool, payload: &Bytes) -> bool;
54}
55
56pub type FnTimeGen = Arc<dyn (Fn() -> Instant) + Send + Sync>;
58
59#[derive(Clone)]
60pub(crate) struct PacketizerImpl {
61 pub(crate) mtu: usize,
62 pub(crate) payload_type: u8,
63 pub(crate) ssrc: u32,
64 pub(crate) payloader: Box<dyn Payloader>,
65 pub(crate) sequencer: Box<dyn Sequencer>,
66 pub(crate) timestamp: u32,
67 pub(crate) clock_rate: u32,
68 pub(crate) abs_send_time_ext_id: u8, pub(crate) time_gen: Option<FnTimeGen>,
70 time_baseline: SystemInstant,
71}
72
73impl fmt::Debug for PacketizerImpl {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 f.debug_struct("PacketizerImpl")
76 .field("mtu", &self.mtu)
77 .field("payload_type", &self.payload_type)
78 .field("ssrc", &self.ssrc)
79 .field("timestamp", &self.timestamp)
80 .field("clock_rate", &self.clock_rate)
81 .field("abs_send_time_ext_id", &self.abs_send_time_ext_id)
82 .finish()
83 }
84}
85
86pub fn new_packetizer(
87 mtu: usize,
88 payload_type: u8,
89 ssrc: u32,
90 payloader: Box<dyn Payloader>,
91 sequencer: Box<dyn Sequencer>,
92 clock_rate: u32,
93) -> impl Packetizer {
94 PacketizerImpl {
95 mtu,
96 payload_type,
97 ssrc,
98 payloader,
99 sequencer,
100 timestamp: rand::random::<u32>(),
101 clock_rate,
102 abs_send_time_ext_id: 0,
103 time_gen: None,
104 time_baseline: SystemInstant::now(),
105 }
106}
107
108impl Packetizer for PacketizerImpl {
109 fn enable_abs_send_time(&mut self, id: u8) {
110 self.abs_send_time_ext_id = id
111 }
112
113 fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>> {
114 let payloads = self.payloader.payload(self.mtu - 12, payload)?;
115 let payloads_len = payloads.len();
116 let mut packets = Vec::with_capacity(payloads_len);
117 for (i, payload) in payloads.into_iter().enumerate() {
118 packets.push(Packet {
119 header: Header {
120 version: 2,
121 padding: false,
122 extension: false,
123 marker: i == payloads_len - 1,
124 payload_type: self.payload_type,
125 sequence_number: self.sequencer.next_sequence_number(),
126 timestamp: self.timestamp, ssrc: self.ssrc,
128 ..Default::default()
129 },
130 payload,
131 });
132 }
133
134 self.timestamp = self.timestamp.wrapping_add(samples);
135
136 if payloads_len != 0 && self.abs_send_time_ext_id != 0 {
137 let now = if let Some(fn_time_gen) = &self.time_gen {
138 fn_time_gen()
139 } else {
140 Instant::now()
141 };
142 let send_time = AbsSendTimeExtension::new(self.time_baseline.ntp(now));
143 let mut raw = BytesMut::with_capacity(send_time.marshal_size());
145 raw.resize(send_time.marshal_size(), 0);
146 let _ = send_time.marshal_to(&mut raw)?;
147 packets[payloads_len - 1]
148 .header
149 .set_extension(self.abs_send_time_ext_id, raw.freeze())?;
150 }
151
152 Ok(packets)
153 }
154
155 fn skip_samples(&mut self, skipped_samples: u32) {
158 self.timestamp = self.timestamp.wrapping_add(skipped_samples);
159 }
160
161 fn clone_to(&self) -> Box<dyn Packetizer> {
162 Box::new(self.clone())
163 }
164}