rtc_rtp/packetizer/
mod.rs1#[cfg(test)]
2mod packetizer_test;
3
4use crate::{extension::abs_send_time_extension::*, header::*, packet::*, sequence::*};
5use shared::{
6 error::Result,
7 marshal::{Marshal, MarshalSize},
8};
9
10use bytes::{Bytes, BytesMut};
11use std::fmt;
12use std::sync::Arc;
13use std::time::SystemTime;
14
15pub trait Payloader: fmt::Debug {
17 fn payload(&mut self, mtu: usize, b: &Bytes) -> Result<Vec<Bytes>>;
18 fn clone_to(&self) -> Box<dyn Payloader>;
19}
20
21impl Clone for Box<dyn Payloader> {
22 fn clone(&self) -> Box<dyn Payloader> {
23 self.clone_to()
24 }
25}
26
27pub trait Packetizer: fmt::Debug {
29 fn enable_abs_send_time(&mut self, value: u8);
30 fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>>;
31 fn skip_samples(&mut self, skipped_samples: u32);
32 fn clone_to(&self) -> Box<dyn Packetizer>;
33}
34
35impl Clone for Box<dyn Packetizer> {
36 fn clone(&self) -> Box<dyn Packetizer> {
37 self.clone_to()
38 }
39}
40
41pub trait Depacketizer {
43 fn depacketize(&mut self, b: &Bytes) -> Result<Bytes>;
44
45 fn is_partition_head(&self, payload: &Bytes) -> bool;
49
50 fn is_partition_tail(&self, marker: bool, payload: &Bytes) -> bool;
53}
54
/// Shared, reference-counted clock callback returning the current [`SystemTime`].
///
/// `PacketizerImpl` consults an optional `FnTimeGen` in place of `SystemTime::now()`
/// when it stamps the absolute-send-time extension.
pub type FnTimeGen = Arc<dyn Fn() -> SystemTime>;
59
60#[derive(Clone)]
61pub(crate) struct PacketizerImpl {
62 pub(crate) mtu: usize,
63 pub(crate) payload_type: u8,
64 pub(crate) ssrc: u32,
65 pub(crate) payloader: Box<dyn Payloader>,
66 pub(crate) sequencer: Box<dyn Sequencer>,
67 pub(crate) timestamp: u32,
68 pub(crate) clock_rate: u32,
69 pub(crate) abs_send_time: u8, pub(crate) time_gen: Option<FnTimeGen>,
71}
72
73impl fmt::Debug for PacketizerImpl {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 f.debug_struct("PacketizerImpl")
76 .field("mtu", &self.mtu)
77 .field("payload_type", &self.payload_type)
78 .field("ssrc", &self.ssrc)
79 .field("timestamp", &self.timestamp)
80 .field("clock_rate", &self.clock_rate)
81 .field("abs_send_time", &self.abs_send_time)
82 .finish()
83 }
84}
85
86pub fn new_packetizer(
87 mtu: usize,
88 payload_type: u8,
89 ssrc: u32,
90 payloader: Box<dyn Payloader>,
91 sequencer: Box<dyn Sequencer>,
92 clock_rate: u32,
93) -> impl Packetizer {
94 PacketizerImpl {
95 mtu,
96 payload_type,
97 ssrc,
98 payloader,
99 sequencer,
100 timestamp: rand::random::<u32>(),
101 clock_rate,
102 abs_send_time: 0,
103 time_gen: None,
104 }
105}
106
107impl Packetizer for PacketizerImpl {
108 fn enable_abs_send_time(&mut self, value: u8) {
109 self.abs_send_time = value
110 }
111
112 fn packetize(&mut self, payload: &Bytes, samples: u32) -> Result<Vec<Packet>> {
113 let payloads = self.payloader.payload(self.mtu - 12, payload)?;
114 let payloads_len = payloads.len();
115 let mut packets = Vec::with_capacity(payloads_len);
116 for (i, payload) in payloads.into_iter().enumerate() {
117 packets.push(Packet {
118 header: Header {
119 version: 2,
120 padding: false,
121 extension: false,
122 marker: i == payloads_len - 1,
123 payload_type: self.payload_type,
124 sequence_number: self.sequencer.next_sequence_number(),
125 timestamp: self.timestamp, ssrc: self.ssrc,
127 ..Default::default()
128 },
129 payload,
130 });
131 }
132
133 self.timestamp = self.timestamp.wrapping_add(samples);
134
135 if payloads_len != 0 && self.abs_send_time != 0 {
136 let st = if let Some(fn_time_gen) = &self.time_gen {
137 fn_time_gen()
138 } else {
139 SystemTime::now()
140 };
141 let send_time = AbsSendTimeExtension::new(st);
142 let mut raw = BytesMut::with_capacity(send_time.marshal_size());
144 raw.resize(send_time.marshal_size(), 0);
145 let _ = send_time.marshal_to(&mut raw)?;
146 packets[payloads_len - 1]
147 .header
148 .set_extension(self.abs_send_time, raw.freeze())?;
149 }
150
151 Ok(packets)
152 }
153
154 fn skip_samples(&mut self, skipped_samples: u32) {
157 self.timestamp = self.timestamp.wrapping_add(skipped_samples);
158 }
159
160 fn clone_to(&self) -> Box<dyn Packetizer> {
161 Box::new(self.clone())
162 }
163}