Skip to main content

msf_rtp/
rtp.rs

1use std::{borrow::Borrow, ops::Deref, time::Instant};
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4use zerocopy::{
5    byteorder::network_endian::{U16, U32},
6    FromBytes, Immutable, IntoBytes, KnownLayout, SizeError, Unaligned,
7};
8
9use crate::InvalidInput;
10
11/// Helper struct.
12#[derive(Copy, Clone, KnownLayout, Immutable, Unaligned, IntoBytes, FromBytes)]
13#[repr(C)]
14struct RawRtpHeader {
15    options: U16,
16    sequence_number: U16,
17    timestamp: U32,
18    ssrc: U32,
19}
20
21/// RTP header.
22#[derive(Clone)]
23pub struct RtpHeader {
24    options: u16,
25    sequence_number: u16,
26    timestamp: u32,
27    ssrc: u32,
28    csrcs: Vec<u32>,
29    extension: Option<RtpHeaderExtension>,
30}
31
32impl RtpHeader {
33    /// Create a new RTP header.
34    #[inline]
35    pub const fn new() -> Self {
36        Self {
37            options: 2 << 14,
38            sequence_number: 0,
39            timestamp: 0,
40            ssrc: 0,
41            csrcs: Vec::new(),
42            extension: None,
43        }
44    }
45
46    /// Decode an RTP header from given data.
47    pub fn decode(data: &mut Bytes) -> Result<Self, InvalidInput> {
48        let mut buffer = data.clone();
49
50        let (raw, _) = RawRtpHeader::ref_from_prefix(&buffer)
51            .map_err(SizeError::from)
52            .map_err(|_| InvalidInput::new())?;
53
54        let mut res = Self {
55            options: raw.options.get(),
56            sequence_number: raw.sequence_number.get(),
57            timestamp: raw.timestamp.get(),
58            ssrc: raw.ssrc.get(),
59            csrcs: Vec::new(),
60            extension: None,
61        };
62
63        buffer.advance(std::mem::size_of::<RawRtpHeader>());
64
65        if (res.options >> 14) != 2 {
66            return Err(InvalidInput::new());
67        }
68
69        let csrc_count = ((res.options >> 8) & 0xf) as usize;
70
71        let (csrcs, _) = <[U32]>::ref_from_prefix_with_elems(&buffer, csrc_count)
72            .map_err(SizeError::from)
73            .map_err(|_| InvalidInput::new())?;
74
75        res.csrcs = csrcs.iter().copied().map(U32::get).collect();
76
77        buffer.advance(csrc_count << 2);
78
79        if (res.options & 0x1000) != 0 {
80            res.extension = Some(RtpHeaderExtension::decode(&mut buffer)?);
81        }
82
83        *data = buffer;
84
85        Ok(res)
86    }
87
88    /// Encode the header.
89    pub fn encode(&self, buf: &mut BytesMut) {
90        buf.reserve(self.raw_size());
91
92        let raw = RawRtpHeader {
93            options: U16::new(self.options),
94            sequence_number: U16::new(self.sequence_number),
95            timestamp: U32::new(self.timestamp),
96            ssrc: U32::new(self.ssrc),
97        };
98
99        buf.extend_from_slice(raw.as_bytes());
100
101        for csrc in &self.csrcs {
102            buf.put_u32(*csrc);
103        }
104
105        if let Some(extension) = self.extension.as_ref() {
106            extension.encode(buf);
107        }
108    }
109
110    /// Check if the RTP packet contains any padding.
111    #[inline]
112    pub fn padding(&self) -> bool {
113        (self.options & 0x2000) != 0
114    }
115
116    /// Set the padding bit.
117    #[inline]
118    pub fn with_padding(mut self, padding: bool) -> Self {
119        self.options &= !0x2000;
120        self.options |= (padding as u16) << 13;
121        self
122    }
123
124    /// Check if there is an RTP header extension.
125    #[inline]
126    pub fn extension(&self) -> Option<&RtpHeaderExtension> {
127        self.extension.as_ref()
128    }
129
130    /// Set the extension bit.
131    #[inline]
132    pub fn with_extension(mut self, extension: Option<RtpHeaderExtension>) -> Self {
133        self.options &= !0x1000;
134        self.options |= (extension.is_some() as u16) << 12;
135        self.extension = extension;
136        self
137    }
138
139    /// Check if the RTP marker bit is set.
140    #[inline]
141    pub fn marker(&self) -> bool {
142        (self.options & 0x0080) != 0
143    }
144
145    /// Set the marker bit.
146    #[inline]
147    pub fn with_marker(mut self, marker: bool) -> Self {
148        self.options &= !0x0080;
149        self.options |= (marker as u16) << 7;
150        self
151    }
152
153    /// Get RTP payload type.
154    ///
155    /// Note: Only the lower 7 bits are used.
156    #[inline]
157    pub fn payload_type(&self) -> u8 {
158        (self.options & 0x7f) as u8
159    }
160
161    /// Set the payload type.
162    ///
163    /// # Panics
164    /// The method panics if the payload type is greater than 127.
165    #[inline]
166    pub fn with_payload_type(mut self, payload_type: u8) -> Self {
167        assert!(payload_type < 128);
168
169        self.options &= !0x7f;
170        self.options |= (payload_type & 0x7f) as u16;
171        self
172    }
173
174    /// Get RTP sequence number.
175    #[inline]
176    pub fn sequence_number(&self) -> u16 {
177        self.sequence_number
178    }
179
180    /// Set the sequence number.
181    #[inline]
182    pub fn with_sequence_number(mut self, n: u16) -> Self {
183        self.sequence_number = n;
184        self
185    }
186
187    /// Get RTP timestamp.
188    #[inline]
189    pub fn timestamp(&self) -> u32 {
190        self.timestamp
191    }
192
193    /// Set RTP timestamp.
194    #[inline]
195    pub fn with_timestamp(mut self, timestamp: u32) -> Self {
196        self.timestamp = timestamp;
197        self
198    }
199
200    /// Get the SSRC identifier.
201    #[inline]
202    pub fn ssrc(&self) -> u32 {
203        self.ssrc
204    }
205
206    /// Set the SSRC identifier.
207    #[inline]
208    pub fn with_ssrc(mut self, ssrc: u32) -> Self {
209        self.ssrc = ssrc;
210        self
211    }
212
213    /// Get a list of CSRC identifiers.
214    #[inline]
215    pub fn csrcs(&self) -> &[u32] {
216        &self.csrcs
217    }
218
219    /// Set the CSRC identifiers.
220    ///
221    /// # Panics
222    /// The method panics if the number of identifiers is greater than 255.
223    pub fn with_csrcs<T>(mut self, csrcs: T) -> Self
224    where
225        T: Into<Vec<u32>>,
226    {
227        let csrcs = csrcs.into();
228
229        assert!(csrcs.len() <= 0xf);
230
231        self.csrcs = csrcs;
232        self.options &= !0xf00;
233        self.options |= (self.csrcs.len() as u16) << 8;
234        self
235    }
236
237    /// Get raw size of the header (i.e. byte length of the encoded header).
238    pub fn raw_size(&self) -> usize {
239        std::mem::size_of::<RawRtpHeader>()
240            + (self.csrcs.len() << 2)
241            + self.extension.as_ref().map(|e| e.raw_size()).unwrap_or(0)
242    }
243}
244
245impl Default for RtpHeader {
246    #[inline]
247    fn default() -> Self {
248        Self::new()
249    }
250}
251
252/// Helper struct.
253#[derive(Copy, Clone, KnownLayout, Immutable, Unaligned, IntoBytes, FromBytes)]
254#[repr(C)]
255struct RawHeaderExtension {
256    misc: U16,
257    length: U16,
258}
259
260/// RTP header extension.
261#[derive(Clone)]
262pub struct RtpHeaderExtension {
263    misc: u16,
264    data: Bytes,
265}
266
267impl RtpHeaderExtension {
268    /// Create a new header extension.
269    #[inline]
270    pub const fn new() -> Self {
271        Self {
272            misc: 0,
273            data: Bytes::new(),
274        }
275    }
276
277    /// Decode RTP header extension from given data.
278    pub fn decode(data: &mut Bytes) -> Result<Self, InvalidInput> {
279        let mut buffer = data.clone();
280
281        let (raw, _) = RawHeaderExtension::ref_from_prefix(&buffer)
282            .map_err(SizeError::from)
283            .map_err(|_| InvalidInput::new())?;
284
285        let extension_length = (raw.length.get() as usize) << 2;
286        let misc = raw.misc.get();
287
288        buffer.advance(std::mem::size_of::<RawHeaderExtension>());
289
290        if buffer.len() < extension_length {
291            return Err(InvalidInput::new());
292        }
293
294        let res = Self {
295            misc,
296            data: buffer.split_to(extension_length),
297        };
298
299        *data = buffer;
300
301        Ok(res)
302    }
303
304    /// Encode the header extension.
305    pub fn encode(&self, buf: &mut BytesMut) {
306        buf.reserve(self.raw_size());
307
308        let length = (self.data.len() >> 2) as u16;
309
310        let raw = RawHeaderExtension {
311            misc: U16::new(self.misc),
312            length: U16::new(length),
313        };
314
315        buf.extend_from_slice(raw.as_bytes());
316        buf.extend_from_slice(&self.data);
317    }
318
319    /// Get the first 16 bits of the header extension.
320    #[inline]
321    pub fn misc(&self) -> u16 {
322        self.misc
323    }
324
325    /// Set the first 16 bits of the header extension.
326    #[inline]
327    pub fn with_misc(mut self, misc: u16) -> Self {
328        self.misc = misc;
329        self
330    }
331
332    /// Get header extension data.
333    #[inline]
334    pub fn data(&self) -> &Bytes {
335        &self.data
336    }
337
338    /// Set the extension data.
339    ///
340    /// # Panics
341    /// The method panics if the length of the data is not a multiple of four
342    /// or if the length is greater than 262140.
343    #[inline]
344    pub fn with_data(mut self, data: Bytes) -> Self {
345        assert_eq!(data.len() & 3, 0);
346
347        let words = data.len() >> 2;
348
349        assert!(words <= (u16::MAX as usize));
350
351        self.data = data;
352        self
353    }
354
355    /// Get raw size of the header extension (i.e. byte length of the encoded
356    /// header extension).
357    #[inline]
358    pub fn raw_size(&self) -> usize {
359        std::mem::size_of::<RawHeaderExtension>() + self.data.len()
360    }
361}
362
363impl Default for RtpHeaderExtension {
364    #[inline]
365    fn default() -> Self {
366        Self::new()
367    }
368}
369
370/// RTP packet.
371#[derive(Clone)]
372pub struct RtpPacket {
373    header: RtpHeader,
374    payload: Bytes,
375}
376
377impl RtpPacket {
378    /// Create a new RTP packet.
379    #[inline]
380    pub const fn new() -> Self {
381        Self {
382            header: RtpHeader::new(),
383            payload: Bytes::new(),
384        }
385    }
386
387    /// Create a new RTP packets from given parts.
388    pub fn from_parts(header: RtpHeader, payload: Bytes) -> Result<Self, InvalidInput> {
389        if header.padding() {
390            let padding_len = payload.last().copied().ok_or_else(InvalidInput::new)? as usize;
391
392            if padding_len == 0 || payload.len() < padding_len {
393                return Err(InvalidInput::new());
394            }
395        }
396
397        let res = Self { header, payload };
398
399        Ok(res)
400    }
401
402    /// Deconstruct the packet into its parts.
403    #[inline]
404    pub fn deconstruct(self) -> (RtpHeader, Bytes) {
405        (self.header, self.payload)
406    }
407
408    /// Decode RTP packet from given data frame.
409    pub fn decode(mut frame: Bytes) -> Result<Self, InvalidInput> {
410        let header = RtpHeader::decode(&mut frame)?;
411
412        let payload = frame;
413
414        Self::from_parts(header, payload)
415    }
416
417    /// Encode the packet.
418    pub fn encode(&self, buf: &mut BytesMut) {
419        buf.reserve(self.raw_size());
420
421        self.header.encode(buf);
422
423        buf.extend_from_slice(&self.payload);
424    }
425
426    /// Get the RTP header.
427    #[inline]
428    pub fn header(&self) -> &RtpHeader {
429        &self.header
430    }
431
432    /// Get the marker bit value.
433    #[inline]
434    pub fn marker(&self) -> bool {
435        self.header.marker()
436    }
437
438    /// Set the marker bit.
439    #[inline]
440    pub fn with_marker(mut self, marker: bool) -> Self {
441        self.header = self.header.with_marker(marker);
442        self
443    }
444
445    /// Get the payload type.
446    ///
447    /// Note: Only the lower 7 bits are used.
448    #[inline]
449    pub fn payload_type(&self) -> u8 {
450        self.header.payload_type()
451    }
452
453    /// Set the payload type.
454    ///
455    /// # Panics
456    /// The method panics if the payload type is greater than 127.
457    #[inline]
458    pub fn with_payload_type(mut self, payload_type: u8) -> Self {
459        self.header = self.header.with_payload_type(payload_type);
460        self
461    }
462
463    /// Get the RTP sequence number.
464    #[inline]
465    pub fn sequence_number(&self) -> u16 {
466        self.header.sequence_number()
467    }
468
469    /// Set the RTP sequence number.
470    #[inline]
471    pub fn with_sequence_number(mut self, sequence_number: u16) -> Self {
472        self.header = self.header.with_sequence_number(sequence_number);
473        self
474    }
475
476    /// Get the RTP timestamp.
477    #[inline]
478    pub fn timestamp(&self) -> u32 {
479        self.header.timestamp()
480    }
481
482    /// Set the RTP timestamp.
483    #[inline]
484    pub fn with_timestamp(mut self, timestamp: u32) -> Self {
485        self.header = self.header.with_timestamp(timestamp);
486        self
487    }
488
489    /// Get the SSRC identifier.
490    #[inline]
491    pub fn ssrc(&self) -> u32 {
492        self.header.ssrc()
493    }
494
495    /// Set the SSRC identifier.
496    #[inline]
497    pub fn with_ssrc(mut self, ssrc: u32) -> Self {
498        self.header = self.header.with_ssrc(ssrc);
499        self
500    }
501
502    /// Get the CSRC identifiers.
503    #[inline]
504    pub fn csrcs(&self) -> &[u32] {
505        self.header.csrcs()
506    }
507
508    /// Set the CSRC identifiers.
509    ///
510    /// # Panics
511    /// The method panics if the number of identifiers is greater than 255.
512    pub fn with_csrcs<T>(mut self, csrcs: T) -> Self
513    where
514        T: Into<Vec<u32>>,
515    {
516        self.header = self.header.with_csrcs(csrcs);
517        self
518    }
519
520    /// Get length of the optional padding.
521    ///
522    /// Zero means that the padding is not used at all.
523    #[inline]
524    pub fn padding(&self) -> u8 {
525        if self.header.padding() {
526            *self.payload.last().unwrap()
527        } else {
528            0
529        }
530    }
531
532    /// Get the packet payload including the optional padding.
533    #[inline]
534    pub fn payload(&self) -> &Bytes {
535        &self.payload
536    }
537
538    /// Get the packet payload without any padding.
539    #[inline]
540    pub fn stripped_payload(&self) -> Bytes {
541        let payload_len = self.payload.len();
542        let padding_len = self.padding() as usize;
543
544        let len = payload_len - padding_len;
545
546        self.payload.slice(..len)
547    }
548
549    /// Set the payload and add padding of a given length.
550    ///
551    /// If the padding is zero, no padding will be added and the padding bit in
552    /// the RTP header will be set to zero.
553    pub fn with_payload(mut self, payload: Bytes, padding: u8) -> Self {
554        if padding > 0 {
555            let len = payload.len() + (padding as usize);
556
557            let mut buffer = BytesMut::with_capacity(len);
558
559            buffer.extend_from_slice(&payload);
560            buffer.resize(len, 0);
561
562            buffer[len - 1] = padding;
563
564            self.header = self.header.with_padding(true);
565            self.payload = buffer.freeze();
566        } else {
567            self.header = self.header.with_padding(false);
568            self.payload = payload;
569        }
570
571        self
572    }
573
574    /// Set the payload that already includes padding.
575    ///
576    /// # Panics
577    /// The method panics if the given payload is empty, if the last byte is
578    /// zero or if the length of the padding is greater than the length of the
579    /// payload.
580    pub fn with_padded_payload(mut self, payload: Bytes) -> Self {
581        let padding_len = payload.last().copied().expect("empty payload") as usize;
582
583        assert!(padding_len > 0 && payload.len() >= padding_len);
584
585        self.header = self.header.with_padding(true);
586        self.payload = payload;
587        self
588    }
589
590    /// Get raw size of the packet (i.e. byte length of the encoded packet).
591    #[inline]
592    pub fn raw_size(&self) -> usize {
593        self.header.raw_size() + self.payload.len()
594    }
595}
596
597impl Default for RtpPacket {
598    #[inline]
599    fn default() -> Self {
600        Self::new()
601    }
602}
603
604/// RTP packet wrapper containing also the instant when the packet was
605/// received.
606#[derive(Clone)]
607pub struct IncomingRtpPacket {
608    inner: RtpPacket,
609    received_at: Instant,
610}
611
612impl IncomingRtpPacket {
613    /// Create a new incoming RTP packet.
614    #[inline]
615    pub const fn new(packet: RtpPacket, received_at: Instant) -> Self {
616        Self {
617            inner: packet,
618            received_at,
619        }
620    }
621
622    /// Get the instant when the packet was received.
623    #[inline]
624    pub fn received_at(&self) -> Instant {
625        self.received_at
626    }
627}
628
629impl AsRef<RtpPacket> for IncomingRtpPacket {
630    #[inline]
631    fn as_ref(&self) -> &RtpPacket {
632        &self.inner
633    }
634}
635
636impl Borrow<RtpPacket> for IncomingRtpPacket {
637    #[inline]
638    fn borrow(&self) -> &RtpPacket {
639        &self.inner
640    }
641}
642
643impl Deref for IncomingRtpPacket {
644    type Target = RtpPacket;
645
646    #[inline]
647    fn deref(&self) -> &Self::Target {
648        &self.inner
649    }
650}
651
652impl From<IncomingRtpPacket> for RtpPacket {
653    #[inline]
654    fn from(packet: IncomingRtpPacket) -> Self {
655        packet.inner
656    }
657}
658
659/// Ordered RTP packet.
660#[derive(Clone)]
661pub struct OrderedRtpPacket {
662    inner: IncomingRtpPacket,
663    index: u64,
664}
665
666impl OrderedRtpPacket {
667    /// Create a new ordered RTP packet.
668    #[inline]
669    pub const fn new(inner: IncomingRtpPacket, index: u64) -> Self {
670        Self { inner, index }
671    }
672
673    /// Get the estimated packet index (a.k.a. extended sequence number).
674    #[inline]
675    pub fn index(&self) -> u64 {
676        self.index
677    }
678}
679
680impl AsRef<RtpPacket> for OrderedRtpPacket {
681    #[inline]
682    fn as_ref(&self) -> &RtpPacket {
683        &self.inner
684    }
685}
686
687impl AsRef<IncomingRtpPacket> for OrderedRtpPacket {
688    #[inline]
689    fn as_ref(&self) -> &IncomingRtpPacket {
690        &self.inner
691    }
692}
693
694impl Borrow<RtpPacket> for OrderedRtpPacket {
695    #[inline]
696    fn borrow(&self) -> &RtpPacket {
697        &self.inner
698    }
699}
700
701impl Borrow<IncomingRtpPacket> for OrderedRtpPacket {
702    #[inline]
703    fn borrow(&self) -> &IncomingRtpPacket {
704        &self.inner
705    }
706}
707
708impl Deref for OrderedRtpPacket {
709    type Target = IncomingRtpPacket;
710
711    #[inline]
712    fn deref(&self) -> &Self::Target {
713        &self.inner
714    }
715}
716
717impl From<OrderedRtpPacket> for RtpPacket {
718    #[inline]
719    fn from(packet: OrderedRtpPacket) -> Self {
720        packet.inner.into()
721    }
722}
723
724impl From<OrderedRtpPacket> for IncomingRtpPacket {
725    #[inline]
726    fn from(packet: OrderedRtpPacket) -> Self {
727        packet.inner
728    }
729}