1use super::define;
2use super::errors::PackerError;
3use super::errors::UnPackerError;
4use super::utils;
5use super::utils::OnFrameFn;
6use super::utils::OnRtpPacketFn;
7use super::utils::OnRtpPacketFn2;
8use super::utils::TPacker;
9use super::utils::TRtpReceiverForRtcp;
10use super::utils::TUnPacker;
11use super::utils::TVideoPacker;
12use super::utils::Unmarshal;
13use super::RtpHeader;
14use super::RtpPacket;
15use async_trait::async_trait;
16use byteorder::BigEndian;
17use bytes::{BufMut, BytesMut};
18use bytesio::bytes_reader::BytesReader;
19use bytesio::bytesio::TNetIO;
20use std::sync::Arc;
21use streamhub::define::FrameData;
22use tokio::sync::Mutex;
23
/// Packs H.265 NAL units into RTP packets.
///
/// A NAL unit is sent as a single packet when its length plus the fixed RTP
/// header length does not exceed `mtu`; otherwise it is split into
/// fragmentation units (see `pack_nalu`, `pack_single` and `pack_fu`).
pub struct RtpH265Packer {
    /// Header template cloned into every outgoing packet. `seq_number` is
    /// advanced after each packet and `timestamp` is set by `pack`.
    header: RtpHeader,
    /// Size limit that NAL length plus RTP header overhead is compared against.
    mtu: usize,
    /// Shared network IO handle; a clone of the `Arc` is passed to
    /// `on_packet_handler` together with every packet.
    io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
    /// Optional callback that receives the IO handle and each built packet.
    on_packet_handler: Option<OnRtpPacketFn>,
    /// Optional callback that receives a clone of each built packet before
    /// `on_packet_handler` is invoked.
    on_packet_for_rtcp_handler: Option<OnRtpPacketFn2>,
}
31
32impl RtpH265Packer {
33 pub fn new(
34 payload_type: u8,
35 ssrc: u32,
36 init_seq: u16,
37 mtu: usize,
38 io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
39 ) -> Self {
40 RtpH265Packer {
41 header: RtpHeader {
42 payload_type,
43 seq_number: init_seq,
44 ssrc,
45 version: 2,
46 ..Default::default()
47 },
48 mtu,
49 io,
50 on_packet_handler: None,
51 on_packet_for_rtcp_handler: None,
52 }
53 }
54
55 pub async fn pack_fu(&mut self, nalu: BytesMut) -> Result<(), PackerError> {
56 let mut nalu_reader = BytesReader::new(nalu);
57 let nalu_header_1st_byte = nalu_reader.read_u8()?;
70 let nalu_header_2nd_byte = nalu_reader.read_u8()?;
71
72 let payload_hdr: u16 = ((nalu_header_1st_byte as u16 & 0x81) | ((define::FU as u16) << 1))
74 << 8
75 | nalu_header_2nd_byte as u16;
76 let mut fu_header = (nalu_header_1st_byte >> 1) & 0x3F | define::FU_START;
85
86 let mut left_nalu_bytes: usize = nalu_reader.len();
87 let mut fu_payload_len: usize;
88
89 while left_nalu_bytes > 0 {
90 if left_nalu_bytes + define::RTP_FIXED_HEADER_LEN <= self.mtu - 3 {
92 fu_header = (nalu_header_1st_byte & 0x1F) | define::FU_END;
93 fu_payload_len = left_nalu_bytes;
94 } else {
95 fu_payload_len = self.mtu - define::RTP_FIXED_HEADER_LEN - 3;
96 }
97
98 let fu_payload = nalu_reader.read_bytes(fu_payload_len)?;
99
100 let mut packet = RtpPacket::new(self.header.clone());
101 packet.payload.put_u16(payload_hdr);
102 packet.payload.put_u8(fu_header);
103 packet.payload.put(fu_payload);
104 packet.header.marker = if fu_header & define::FU_END > 0 { 1 } else { 0 };
105
106 if fu_header & define::FU_START > 0 {
107 fu_header &= 0x7F
108 }
109
110 if let Some(f) = &self.on_packet_for_rtcp_handler {
111 f(packet.clone()).await;
112 }
113
114 if let Some(f) = &self.on_packet_handler {
115 f(self.io.clone(), packet).await?;
116 }
117 left_nalu_bytes = nalu_reader.len();
118 self.header.seq_number += 1;
119 }
120
121 Ok(())
122 }
123 pub async fn pack_single(&mut self, nalu: BytesMut) -> Result<(), PackerError> {
124 let mut packet = RtpPacket::new(self.header.clone());
125 packet.header.marker = 1;
126 packet.payload.put(nalu);
127
128 self.header.seq_number += 1;
129
130 if let Some(f) = &self.on_packet_for_rtcp_handler {
131 f(packet.clone()).await;
132 }
133
134 if let Some(f) = &self.on_packet_handler {
135 return f(self.io.clone(), packet).await;
136 }
137 Ok(())
138 }
139}
140
141#[async_trait]
142impl TPacker for RtpH265Packer {
143 async fn pack(&mut self, nalus: &mut BytesMut, timestamp: u32) -> Result<(), PackerError> {
144 self.header.timestamp = timestamp;
145 utils::split_annexb_and_process(nalus, self).await?;
146 Ok(())
147 }
148 fn on_packet_handler(&mut self, f: OnRtpPacketFn) {
149 self.on_packet_handler = Some(f);
150 }
151}
152
153impl TRtpReceiverForRtcp for RtpH265Packer {
154 fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2) {
155 self.on_packet_for_rtcp_handler = Some(f);
156 }
157}
158
159#[async_trait]
160impl TVideoPacker for RtpH265Packer {
161 async fn pack_nalu(&mut self, nalu: BytesMut) -> Result<(), PackerError> {
162 if nalu.len() + define::RTP_FIXED_HEADER_LEN <= self.mtu {
163 self.pack_single(nalu).await?;
164 } else {
165 self.pack_fu(nalu).await?;
166 }
167 Ok(())
168 }
169}
170
/// Turns incoming H.265 RTP packets back into Annex-B framed NAL units and
/// reports them through `on_frame_handler`.
#[derive(Default)]
pub struct RtpH265UnPacker {
    /// Sequence number of the packet most recently passed to `unpack`.
    sequence_number: u16,
    /// Timestamp of the packet most recently passed to `unpack`; attached to
    /// every emitted frame.
    timestamp: u32,
    /// Accumulates the fragments of the NAL unit currently being reassembled.
    fu_buffer: BytesMut,
    /// When true, DONL bytes are skipped while parsing AP and FU payloads.
    /// Defaults to false; nothing in this file sets it.
    using_donl_field: bool,
    /// Optional callback that receives each recovered NAL unit as
    /// `FrameData::Video`.
    on_frame_handler: Option<OnFrameFn>,
    /// Optional callback that receives a clone of every unmarshalled packet.
    on_packet_for_rtcp_handler: Option<OnRtpPacketFn2>,
}
180
181#[async_trait]
182impl TUnPacker for RtpH265UnPacker {
183 async fn unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError> {
184 let rtp_packet = RtpPacket::unmarshal(reader)?;
185
186 if let Some(f) = &self.on_packet_for_rtcp_handler {
187 f(rtp_packet.clone()).await;
188 }
189
190 self.timestamp = rtp_packet.header.timestamp;
191 self.sequence_number = rtp_packet.header.seq_number;
192
193 if let Some(packet_type) = rtp_packet.payload.first() {
194 match *packet_type >> 1 & 0x3F {
195 define::FU => {
196 return self.unpack_fu(rtp_packet.payload.clone());
197 }
198 define::AP => {
199 return self.unpack_ap(rtp_packet.payload);
200 }
201 define::PACI => return Ok(()),
202
203 _ => {
204 return self.unpack_single(rtp_packet.payload.clone());
205 }
206 }
207 }
208
209 Ok(())
210 }
211
212 fn on_frame_handler(&mut self, f: OnFrameFn) {
213 self.on_frame_handler = Some(f);
214 }
215}
216
217impl RtpH265UnPacker {
218 pub fn new() -> Self {
219 RtpH265UnPacker::default()
220 }
221
222 fn unpack_single(&mut self, payload: BytesMut) -> Result<(), UnPackerError> {
223 let mut annexb_payload = BytesMut::new();
224 annexb_payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE);
225 annexb_payload.put(payload);
226
227 if let Some(f) = &self.on_frame_handler {
228 f(FrameData::Video {
229 timestamp: self.timestamp,
230 data: annexb_payload,
231 })?;
232 }
233 Ok(())
234 }
235
236 fn unpack_ap(&mut self, rtp_payload: BytesMut) -> Result<(), UnPackerError> {
261 let mut payload_reader = BytesReader::new(rtp_payload);
262 payload_reader.read_bytes(2)?;
264
265 while !payload_reader.is_empty() {
266 if self.using_donl_field {
267 payload_reader.read_bytes(2)?;
269 }
270 let nalu_len = payload_reader.read_u16::<BigEndian>()? as usize;
272 let nalu = payload_reader.read_bytes(nalu_len)?;
274
275 let mut payload = BytesMut::new();
276 payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE);
277 payload.put(nalu);
278
279 if let Some(f) = &self.on_frame_handler {
280 f(FrameData::Video {
281 timestamp: self.timestamp,
282 data: payload,
283 })?;
284 }
285 }
286
287 Ok(())
288 }
289
290 fn unpack_fu(&mut self, rtp_payload: BytesMut) -> Result<(), UnPackerError> {
324 let mut payload_reader = BytesReader::new(rtp_payload);
325 let payload_header_1st_byte = payload_reader.read_u8()?;
326 let payload_header_2nd_byte = payload_reader.read_u8()?;
327 let fu_header = payload_reader.read_u8()?;
328 if self.using_donl_field {
329 payload_reader.read_bytes(2)?;
330 }
331
332 if utils::is_fu_start(fu_header) {
333 let nal_1st_byte = (payload_header_1st_byte & 0x81) | ((fu_header & 0x3F) << 1);
336 self.fu_buffer.put_u8(nal_1st_byte);
337 self.fu_buffer.put_u8(payload_header_2nd_byte);
338 }
339
340 self.fu_buffer.put(payload_reader.extract_remaining_bytes());
341
342 if utils::is_fu_end(fu_header) {
343 let mut payload = BytesMut::new();
344 payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE);
345 payload.put(self.fu_buffer.clone());
346 self.fu_buffer.clear();
347
348 if let Some(f) = &self.on_frame_handler {
349 f(FrameData::Video {
350 timestamp: self.timestamp,
351 data: payload,
352 })?;
353 }
354 }
355
356 Ok(())
357 }
358}
359
360impl TRtpReceiverForRtcp for RtpH265UnPacker {
361 fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2) {
362 self.on_packet_for_rtcp_handler = Some(f);
363 }
364}