xrtsp/rtp/
rtp_h265.rs

1use super::define;
2use super::errors::PackerError;
3use super::errors::UnPackerError;
4use super::utils;
5use super::utils::OnFrameFn;
6use super::utils::OnRtpPacketFn;
7use super::utils::OnRtpPacketFn2;
8use super::utils::TPacker;
9use super::utils::TRtpReceiverForRtcp;
10use super::utils::TUnPacker;
11use super::utils::TVideoPacker;
12use super::utils::Unmarshal;
13use super::RtpHeader;
14use super::RtpPacket;
15use async_trait::async_trait;
16use byteorder::BigEndian;
17use bytes::{BufMut, BytesMut};
18use bytesio::bytes_reader::BytesReader;
19use bytesio::bytesio::TNetIO;
20use std::sync::Arc;
21use streamhub::define::FrameData;
22use tokio::sync::Mutex;
23
24pub struct RtpH265Packer {
25    header: RtpHeader,
26    mtu: usize,
27    io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
28    on_packet_handler: Option<OnRtpPacketFn>,
29    on_packet_for_rtcp_handler: Option<OnRtpPacketFn2>,
30}
31
32impl RtpH265Packer {
33    pub fn new(
34        payload_type: u8,
35        ssrc: u32,
36        init_seq: u16,
37        mtu: usize,
38        io: Arc<Mutex<Box<dyn TNetIO + Send + Sync>>>,
39    ) -> Self {
40        RtpH265Packer {
41            header: RtpHeader {
42                payload_type,
43                seq_number: init_seq,
44                ssrc,
45                version: 2,
46                ..Default::default()
47            },
48            mtu,
49            io,
50            on_packet_handler: None,
51            on_packet_for_rtcp_handler: None,
52        }
53    }
54
55    pub async fn pack_fu(&mut self, nalu: BytesMut) -> Result<(), PackerError> {
56        let mut nalu_reader = BytesReader::new(nalu);
57        /* NALU header
58        0               1
59        0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
60        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
61        |F|    Type   |  LayerId  | TID |
62        +-------------+-----------------+
63
64        Forbidden zero(F) : 1 bit
65        NAL unit type(Type) : 6 bits
66        NUH layer ID(LayerId) : 6 bits
67        NUH temporal ID plus 1 (TID) : 3 bits
68        */
69        let nalu_header_1st_byte = nalu_reader.read_u8()?;
70        let nalu_header_2nd_byte = nalu_reader.read_u8()?;
71
72        /* The PayloadHdr needs replace Type with the FU type value(49) */
73        let payload_hdr: u16 = ((nalu_header_1st_byte as u16 & 0x81) | ((define::FU as u16) << 1))
74            << 8
75            | nalu_header_2nd_byte as u16;
76        /* FU header
77        +---------------+
78        |0|1|2|3|4|5|6|7|
79        +-+-+-+-+-+-+-+-+
80        |S|E|   FuType  |
81        +---------------+
82        */
83        /*set FuType from NALU header's Type */
84        let mut fu_header = (nalu_header_1st_byte >> 1) & 0x3F | define::FU_START;
85
86        let mut left_nalu_bytes: usize = nalu_reader.len();
87        let mut fu_payload_len: usize;
88
89        while left_nalu_bytes > 0 {
90            /* 3 = PayloadHdr(2 bytes) + FU header(1 byte) */
91            if left_nalu_bytes + define::RTP_FIXED_HEADER_LEN <= self.mtu - 3 {
92                fu_header = (nalu_header_1st_byte & 0x1F) | define::FU_END;
93                fu_payload_len = left_nalu_bytes;
94            } else {
95                fu_payload_len = self.mtu - define::RTP_FIXED_HEADER_LEN - 3;
96            }
97
98            let fu_payload = nalu_reader.read_bytes(fu_payload_len)?;
99
100            let mut packet = RtpPacket::new(self.header.clone());
101            packet.payload.put_u16(payload_hdr);
102            packet.payload.put_u8(fu_header);
103            packet.payload.put(fu_payload);
104            packet.header.marker = if fu_header & define::FU_END > 0 { 1 } else { 0 };
105
106            if fu_header & define::FU_START > 0 {
107                fu_header &= 0x7F
108            }
109
110            if let Some(f) = &self.on_packet_for_rtcp_handler {
111                f(packet.clone()).await;
112            }
113
114            if let Some(f) = &self.on_packet_handler {
115                f(self.io.clone(), packet).await?;
116            }
117            left_nalu_bytes = nalu_reader.len();
118            self.header.seq_number += 1;
119        }
120
121        Ok(())
122    }
123    pub async fn pack_single(&mut self, nalu: BytesMut) -> Result<(), PackerError> {
124        let mut packet = RtpPacket::new(self.header.clone());
125        packet.header.marker = 1;
126        packet.payload.put(nalu);
127
128        self.header.seq_number += 1;
129
130        if let Some(f) = &self.on_packet_for_rtcp_handler {
131            f(packet.clone()).await;
132        }
133
134        if let Some(f) = &self.on_packet_handler {
135            return f(self.io.clone(), packet).await;
136        }
137        Ok(())
138    }
139}
140
141#[async_trait]
142impl TPacker for RtpH265Packer {
143    async fn pack(&mut self, nalus: &mut BytesMut, timestamp: u32) -> Result<(), PackerError> {
144        self.header.timestamp = timestamp;
145        utils::split_annexb_and_process(nalus, self).await?;
146        Ok(())
147    }
148    fn on_packet_handler(&mut self, f: OnRtpPacketFn) {
149        self.on_packet_handler = Some(f);
150    }
151}
152
153impl TRtpReceiverForRtcp for RtpH265Packer {
154    fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2) {
155        self.on_packet_for_rtcp_handler = Some(f);
156    }
157}
158
159#[async_trait]
160impl TVideoPacker for RtpH265Packer {
161    async fn pack_nalu(&mut self, nalu: BytesMut) -> Result<(), PackerError> {
162        if nalu.len() + define::RTP_FIXED_HEADER_LEN <= self.mtu {
163            self.pack_single(nalu).await?;
164        } else {
165            self.pack_fu(nalu).await?;
166        }
167        Ok(())
168    }
169}
170
171#[derive(Default)]
172pub struct RtpH265UnPacker {
173    sequence_number: u16,
174    timestamp: u32,
175    fu_buffer: BytesMut,
176    using_donl_field: bool,
177    on_frame_handler: Option<OnFrameFn>,
178    on_packet_for_rtcp_handler: Option<OnRtpPacketFn2>,
179}
180
181#[async_trait]
182impl TUnPacker for RtpH265UnPacker {
183    async fn unpack(&mut self, reader: &mut BytesReader) -> Result<(), UnPackerError> {
184        let rtp_packet = RtpPacket::unmarshal(reader)?;
185
186        if let Some(f) = &self.on_packet_for_rtcp_handler {
187            f(rtp_packet.clone()).await;
188        }
189
190        self.timestamp = rtp_packet.header.timestamp;
191        self.sequence_number = rtp_packet.header.seq_number;
192
193        if let Some(packet_type) = rtp_packet.payload.first() {
194            match *packet_type >> 1 & 0x3F {
195                define::FU => {
196                    return self.unpack_fu(rtp_packet.payload.clone());
197                }
198                define::AP => {
199                    return self.unpack_ap(rtp_packet.payload);
200                }
201                define::PACI => return Ok(()),
202
203                _ => {
204                    return self.unpack_single(rtp_packet.payload.clone());
205                }
206            }
207        }
208
209        Ok(())
210    }
211
212    fn on_frame_handler(&mut self, f: OnFrameFn) {
213        self.on_frame_handler = Some(f);
214    }
215}
216
217impl RtpH265UnPacker {
218    pub fn new() -> Self {
219        RtpH265UnPacker::default()
220    }
221
222    fn unpack_single(&mut self, payload: BytesMut) -> Result<(), UnPackerError> {
223        let mut annexb_payload = BytesMut::new();
224        annexb_payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE);
225        annexb_payload.put(payload);
226
227        if let Some(f) = &self.on_frame_handler {
228            f(FrameData::Video {
229                timestamp: self.timestamp,
230                data: annexb_payload,
231            })?;
232        }
233        Ok(())
234    }
235
236    /*
237     0               1               2               3
238     0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
239    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
240    |                          RTP Header                           |
241    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
242    |      PayloadHdr (Type=48)     |           NALU 1 DONL         |
243    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
244    |           NALU 1 Size         |            NALU 1 HDR         |
245    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
246    |                                                               |
247    |                         NALU 1 Data . . .                     |
248    |                                                               |
249    +     . . .     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
250    |               |  NALU 2 DOND  |            NALU 2 Size        |
251    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
252    |          NALU 2 HDR           |                               |
253    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+            NALU 2 Data        |
254    |                                                               |
255    |         . . .                 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
256    |                               :    ...OPTIONAL RTP padding    |
257    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
258    */
259
260    fn unpack_ap(&mut self, rtp_payload: BytesMut) -> Result<(), UnPackerError> {
261        let mut payload_reader = BytesReader::new(rtp_payload);
262        /*read PayloadHdr*/
263        payload_reader.read_bytes(2)?;
264
265        while !payload_reader.is_empty() {
266            if self.using_donl_field {
267                /*read DONL*/
268                payload_reader.read_bytes(2)?;
269            }
270            /*read NALU Size*/
271            let nalu_len = payload_reader.read_u16::<BigEndian>()? as usize;
272            /*read NALU HDR + Data */
273            let nalu = payload_reader.read_bytes(nalu_len)?;
274
275            let mut payload = BytesMut::new();
276            payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE);
277            payload.put(nalu);
278
279            if let Some(f) = &self.on_frame_handler {
280                f(FrameData::Video {
281                    timestamp: self.timestamp,
282                    data: payload,
283                })?;
284            }
285        }
286
287        Ok(())
288    }
289
290    /*
291    0               1
292    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
293    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
294    |F|    Type   |  LayerId  | TID |
295    +-------------+-----------------+
296
297    Forbidden zero(F) : 1 bit
298    NAL unit type(Type) : 6 bits
299    NUH layer ID(LayerId) : 6 bits
300    NUH temporal ID plus 1 (TID) : 3 bits
301    */
302
303    /*
304     0               1               2               3
305     0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
306    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
307    |     PayloadHdr (Type=49)      |    FU header  |  DONL (cond)  |
308    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-|
309    |  DONL (cond)  |                                               |
310    |-+-+-+-+-+-+-+-+                                               |
311    |                           FU payload                          |
312    |                                                               |
313    |                               +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
314    |                               :    ...OPTIONAL RTP padding    |
315    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
316    /* FU header */
317    +---------------+
318    |0|1|2|3|4|5|6|7|
319    +-+-+-+-+-+-+-+-+
320    |S|E|   FuType  |
321    +---------------+
322    */
323    fn unpack_fu(&mut self, rtp_payload: BytesMut) -> Result<(), UnPackerError> {
324        let mut payload_reader = BytesReader::new(rtp_payload);
325        let payload_header_1st_byte = payload_reader.read_u8()?;
326        let payload_header_2nd_byte = payload_reader.read_u8()?;
327        let fu_header = payload_reader.read_u8()?;
328        if self.using_donl_field {
329            payload_reader.read_bytes(2)?;
330        }
331
332        if utils::is_fu_start(fu_header) {
333            /*set NAL UNIT type 2 bytes */
334            //replace Type of PayloadHdr with the FuType of FU header
335            let nal_1st_byte = (payload_header_1st_byte & 0x81) | ((fu_header & 0x3F) << 1);
336            self.fu_buffer.put_u8(nal_1st_byte);
337            self.fu_buffer.put_u8(payload_header_2nd_byte);
338        }
339
340        self.fu_buffer.put(payload_reader.extract_remaining_bytes());
341
342        if utils::is_fu_end(fu_header) {
343            let mut payload = BytesMut::new();
344            payload.extend_from_slice(&define::ANNEXB_NALU_START_CODE);
345            payload.put(self.fu_buffer.clone());
346            self.fu_buffer.clear();
347
348            if let Some(f) = &self.on_frame_handler {
349                f(FrameData::Video {
350                    timestamp: self.timestamp,
351                    data: payload,
352                })?;
353            }
354        }
355
356        Ok(())
357    }
358}
359
360impl TRtpReceiverForRtcp for RtpH265UnPacker {
361    fn on_packet_for_rtcp_handler(&mut self, f: OnRtpPacketFn2) {
362        self.on_packet_for_rtcp_handler = Some(f);
363    }
364}