rtc_rtp/
header.rs

1use shared::{
2    error::{Error, Result},
3    marshal::{Marshal, MarshalSize, Unmarshal},
4};
5
6use bytes::{Buf, BufMut, Bytes};
7
8pub const HEADER_LENGTH: usize = 4;
9pub const VERSION_SHIFT: u8 = 6;
10pub const VERSION_MASK: u8 = 0x3;
11pub const PADDING_SHIFT: u8 = 5;
12pub const PADDING_MASK: u8 = 0x1;
13pub const EXTENSION_SHIFT: u8 = 4;
14pub const EXTENSION_MASK: u8 = 0x1;
15pub const EXTENSION_PROFILE_ONE_BYTE: u16 = 0xBEDE;
16pub const EXTENSION_PROFILE_TWO_BYTE: u16 = 0x1000;
17pub const EXTENSION_ID_RESERVED: u8 = 0xF;
18pub const CC_MASK: u8 = 0xF;
19pub const MARKER_SHIFT: u8 = 7;
20pub const MARKER_MASK: u8 = 0x1;
21pub const PT_MASK: u8 = 0x7F;
22pub const SEQ_NUM_OFFSET: usize = 2;
23pub const SEQ_NUM_LENGTH: usize = 2;
24pub const TIMESTAMP_OFFSET: usize = 4;
25pub const TIMESTAMP_LENGTH: usize = 4;
26pub const SSRC_OFFSET: usize = 8;
27pub const SSRC_LENGTH: usize = 4;
28pub const CSRC_OFFSET: usize = 12;
29pub const CSRC_LENGTH: usize = 4;
30
31#[derive(Debug, Eq, PartialEq, Default, Clone)]
32pub struct Extension {
33    pub id: u8,
34    pub payload: Bytes,
35}
36
37/// Header represents an RTP packet header
38/// NOTE: PayloadOffset is populated by Marshal/Unmarshal and should not be modified
39#[derive(Debug, Eq, PartialEq, Default, Clone)]
40pub struct Header {
41    pub version: u8,
42    pub padding: bool,
43    pub extension: bool,
44    pub marker: bool,
45    pub payload_type: u8,
46    pub sequence_number: u16,
47    pub timestamp: u32,
48    pub ssrc: u32,
49    pub csrc: Vec<u32>,
50    pub extension_profile: u16,
51    pub extensions: Vec<Extension>,
52}
53
54impl Unmarshal for Header {
55    /// Unmarshal parses the passed byte slice and stores the result in the Header this method is called upon
56    fn unmarshal<B>(raw_packet: &mut B) -> Result<Self>
57    where
58        Self: Sized,
59        B: Buf,
60    {
61        let raw_packet_len = raw_packet.remaining();
62        if raw_packet_len < HEADER_LENGTH {
63            return Err(Error::ErrHeaderSizeInsufficient);
64        }
65        /*
66         *  0                   1                   2                   3
67         *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
68         * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
69         * |V=2|P|X|  CC   |M|     PT      |       sequence number         |
70         * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
71         * |                           timestamp                           |
72         * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
73         * |           synchronization source (SSRC) identifier            |
74         * +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
75         * |            contributing source (CSRC) identifiers             |
76         * |                             ....                              |
77         * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
78         */
79        let b0 = raw_packet.get_u8();
80        let version = b0 >> VERSION_SHIFT & VERSION_MASK;
81        let padding = (b0 >> PADDING_SHIFT & PADDING_MASK) > 0;
82        let extension = (b0 >> EXTENSION_SHIFT & EXTENSION_MASK) > 0;
83        let cc = (b0 & CC_MASK) as usize;
84
85        let mut curr_offset = CSRC_OFFSET + (cc * CSRC_LENGTH);
86        if raw_packet_len < curr_offset {
87            return Err(Error::ErrHeaderSizeInsufficient);
88        }
89
90        let b1 = raw_packet.get_u8();
91        let marker = (b1 >> MARKER_SHIFT & MARKER_MASK) > 0;
92        let payload_type = b1 & PT_MASK;
93
94        let sequence_number = raw_packet.get_u16();
95        let timestamp = raw_packet.get_u32();
96        let ssrc = raw_packet.get_u32();
97
98        let mut csrc = Vec::with_capacity(cc);
99        for _ in 0..cc {
100            csrc.push(raw_packet.get_u32());
101        }
102
103        let (extension_profile, extensions) = if extension {
104            let expected = curr_offset + 4;
105            if raw_packet_len < expected {
106                return Err(Error::ErrHeaderSizeInsufficientForExtension);
107            }
108            let extension_profile = raw_packet.get_u16();
109            curr_offset += 2;
110            let extension_length = raw_packet.get_u16() as usize * 4;
111            curr_offset += 2;
112
113            let expected = curr_offset + extension_length;
114            if raw_packet_len < expected {
115                return Err(Error::ErrHeaderSizeInsufficientForExtension);
116            }
117
118            let mut extensions = vec![];
119            match extension_profile {
120                // RFC 8285 RTP One Byte Header Extension
121                EXTENSION_PROFILE_ONE_BYTE => {
122                    let end = curr_offset + extension_length;
123                    while curr_offset < end {
124                        let b = raw_packet.get_u8();
125                        if b == 0x00 {
126                            // padding
127                            curr_offset += 1;
128                            continue;
129                        }
130
131                        let extid = b >> 4;
132                        let len = ((b & (0xFF ^ 0xF0)) + 1) as usize;
133                        curr_offset += 1;
134
135                        if extid == EXTENSION_ID_RESERVED {
136                            break;
137                        }
138
139                        extensions.push(Extension {
140                            id: extid,
141                            payload: raw_packet.copy_to_bytes(len),
142                        });
143                        curr_offset += len;
144                    }
145                }
146                // RFC 8285 RTP Two Byte Header Extension
147                EXTENSION_PROFILE_TWO_BYTE => {
148                    let end = curr_offset + extension_length;
149                    while curr_offset < end {
150                        let b = raw_packet.get_u8();
151                        if b == 0x00 {
152                            // padding
153                            curr_offset += 1;
154                            continue;
155                        }
156
157                        let extid = b;
158                        curr_offset += 1;
159
160                        let len = raw_packet.get_u8() as usize;
161                        curr_offset += 1;
162
163                        extensions.push(Extension {
164                            id: extid,
165                            payload: raw_packet.copy_to_bytes(len),
166                        });
167                        curr_offset += len;
168                    }
169                }
170                // RFC3550 Extension
171                _ => {
172                    if raw_packet_len < curr_offset + extension_length {
173                        return Err(Error::ErrHeaderSizeInsufficientForExtension);
174                    }
175                    extensions.push(Extension {
176                        id: 0,
177                        payload: raw_packet.copy_to_bytes(extension_length),
178                    });
179                }
180            };
181
182            (extension_profile, extensions)
183        } else {
184            (0, vec![])
185        };
186
187        Ok(Header {
188            version,
189            padding,
190            extension,
191            marker,
192            payload_type,
193            sequence_number,
194            timestamp,
195            ssrc,
196            csrc,
197            extension_profile,
198            extensions,
199        })
200    }
201}
202
203impl MarshalSize for Header {
204    /// MarshalSize returns the size of the packet once marshaled.
205    fn marshal_size(&self) -> usize {
206        let mut head_size = 12 + (self.csrc.len() * CSRC_LENGTH);
207        if self.extension {
208            let extension_payload_len = self.get_extension_payload_len();
209            let extension_payload_size = (extension_payload_len + 3) / 4;
210            head_size += 4 + extension_payload_size * 4;
211        }
212        head_size
213    }
214}
215
216impl Marshal for Header {
217    /// Marshal serializes the header and writes to the buffer.
218    fn marshal_to(&self, mut buf: &mut [u8]) -> Result<usize> {
219        /*
220         *  0                   1                   2                   3
221         *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
222         * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
223         * |V=2|P|X|  CC   |M|     PT      |       sequence number         |
224         * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
225         * |                           timestamp                           |
226         * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
227         * |           synchronization source (SSRC) identifier            |
228         * +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
229         * |            contributing source (CSRC) identifiers             |
230         * |                             ....                              |
231         * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
232         */
233        let remaining_before = buf.remaining_mut();
234        if remaining_before < self.marshal_size() {
235            return Err(Error::ErrBufferTooSmall);
236        }
237
238        // The first byte contains the version, padding bit, extension bit, and csrc size
239        let mut b0 = (self.version << VERSION_SHIFT) | self.csrc.len() as u8;
240        if self.padding {
241            b0 |= 1 << PADDING_SHIFT;
242        }
243
244        if self.extension {
245            b0 |= 1 << EXTENSION_SHIFT;
246        }
247        buf.put_u8(b0);
248
249        // The second byte contains the marker bit and payload type.
250        let mut b1 = self.payload_type;
251        if self.marker {
252            b1 |= 1 << MARKER_SHIFT;
253        }
254        buf.put_u8(b1);
255
256        buf.put_u16(self.sequence_number);
257        buf.put_u32(self.timestamp);
258        buf.put_u32(self.ssrc);
259
260        for csrc in &self.csrc {
261            buf.put_u32(*csrc);
262        }
263
264        if self.extension {
265            buf.put_u16(self.extension_profile);
266
267            // calculate extensions size and round to 4 bytes boundaries
268            let extension_payload_len = self.get_extension_payload_len();
269            if self.extension_profile != EXTENSION_PROFILE_ONE_BYTE
270                && self.extension_profile != EXTENSION_PROFILE_TWO_BYTE
271                && extension_payload_len % 4 != 0
272            {
273                //the payload must be in 32-bit words.
274                return Err(Error::HeaderExtensionPayloadNot32BitWords);
275            }
276            let extension_payload_size = (extension_payload_len as u16 + 3) / 4;
277            buf.put_u16(extension_payload_size);
278
279            match self.extension_profile {
280                // RFC 8285 RTP One Byte Header Extension
281                EXTENSION_PROFILE_ONE_BYTE => {
282                    for extension in &self.extensions {
283                        buf.put_u8((extension.id << 4) | (extension.payload.len() as u8 - 1));
284                        buf.put(&*extension.payload);
285                    }
286                }
287                // RFC 8285 RTP Two Byte Header Extension
288                EXTENSION_PROFILE_TWO_BYTE => {
289                    for extension in &self.extensions {
290                        buf.put_u8(extension.id);
291                        buf.put_u8(extension.payload.len() as u8);
292                        buf.put(&*extension.payload);
293                    }
294                }
295                // RFC3550 Extension
296                _ => {
297                    if self.extensions.len() != 1 {
298                        return Err(Error::ErrRfc3550headerIdrange);
299                    }
300
301                    if let Some(extension) = self.extensions.first() {
302                        let ext_len = extension.payload.len();
303                        if ext_len % 4 != 0 {
304                            return Err(Error::HeaderExtensionPayloadNot32BitWords);
305                        }
306                        buf.put(&*extension.payload);
307                    }
308                }
309            };
310
311            // add padding to reach 4 bytes boundaries
312            for _ in extension_payload_len..extension_payload_size as usize * 4 {
313                buf.put_u8(0);
314            }
315        }
316
317        let remaining_after = buf.remaining_mut();
318        Ok(remaining_before - remaining_after)
319    }
320}
321
322impl Header {
323    pub fn get_extension_payload_len(&self) -> usize {
324        let payload_len: usize = self
325            .extensions
326            .iter()
327            .map(|extension| extension.payload.len())
328            .sum();
329
330        let profile_len = self.extensions.len()
331            * match self.extension_profile {
332                EXTENSION_PROFILE_ONE_BYTE => 1,
333                EXTENSION_PROFILE_TWO_BYTE => 2,
334                _ => 0,
335            };
336
337        payload_len + profile_len
338    }
339
340    /// SetExtension sets an RTP header extension
341    pub fn set_extension(&mut self, id: u8, payload: Bytes) -> Result<()> {
342        if self.extension {
343            match self.extension_profile {
344                EXTENSION_PROFILE_ONE_BYTE => {
345                    if !(1..=14).contains(&id) {
346                        return Err(Error::ErrRfc8285oneByteHeaderIdrange);
347                    }
348                    if payload.len() > 16 {
349                        return Err(Error::ErrRfc8285oneByteHeaderSize);
350                    }
351                }
352                EXTENSION_PROFILE_TWO_BYTE => {
353                    if id < 1 {
354                        return Err(Error::ErrRfc8285twoByteHeaderIdrange);
355                    }
356                    if payload.len() > 255 {
357                        return Err(Error::ErrRfc8285twoByteHeaderSize);
358                    }
359                }
360                _ => {
361                    if id != 0 {
362                        return Err(Error::ErrRfc3550headerIdrange);
363                    }
364                }
365            };
366
367            // Update existing if it exists else add new extension
368            if let Some(extension) = self
369                .extensions
370                .iter_mut()
371                .find(|extension| extension.id == id)
372            {
373                extension.payload = payload;
374            } else {
375                self.extensions.push(Extension { id, payload });
376            }
377        } else {
378            // No existing header extensions
379            self.extension = true;
380
381            self.extension_profile = match payload.len() {
382                0..=16 => EXTENSION_PROFILE_ONE_BYTE,
383                17..=255 => EXTENSION_PROFILE_TWO_BYTE,
384                _ => self.extension_profile,
385            };
386
387            self.extensions.push(Extension { id, payload });
388        }
389        Ok(())
390    }
391
392    /// returns an extension id array
393    pub fn get_extension_ids(&self) -> Vec<u8> {
394        if self.extension {
395            self.extensions.iter().map(|e| e.id).collect()
396        } else {
397            vec![]
398        }
399    }
400
401    /// returns an RTP header extension
402    pub fn get_extension(&self, id: u8) -> Option<Bytes> {
403        if self.extension {
404            self.extensions
405                .iter()
406                .find(|extension| extension.id == id)
407                .map(|extension| extension.payload.clone())
408        } else {
409            None
410        }
411    }
412
413    /// Removes an RTP Header extension
414    pub fn del_extension(&mut self, id: u8) -> Result<()> {
415        if self.extension {
416            if let Some(index) = self
417                .extensions
418                .iter()
419                .position(|extension| extension.id == id)
420            {
421                self.extensions.remove(index);
422                Ok(())
423            } else {
424                Err(Error::ErrHeaderExtensionNotFound)
425            }
426        } else {
427            Err(Error::ErrHeaderExtensionsNotEnabled)
428        }
429    }
430}