msf_rtp/
rtp.rs

1use std::{borrow::Borrow, ops::Deref, time::Instant};
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4
5use crate::InvalidInput;
6
7/// Helper struct.
8#[repr(C, packed)]
9struct RawRtpHeader {
10    options: u16,
11    sequence_number: u16,
12    timestamp: u32,
13    ssrc: u32,
14}
15
16/// RTP header.
17#[derive(Clone)]
18pub struct RtpHeader {
19    options: u16,
20    sequence_number: u16,
21    timestamp: u32,
22    ssrc: u32,
23    csrcs: Vec<u32>,
24    extension: Option<RtpHeaderExtension>,
25}
26
27impl RtpHeader {
28    /// Create a new RTP header.
29    #[inline]
30    pub const fn new() -> Self {
31        Self {
32            options: 2 << 14,
33            sequence_number: 0,
34            timestamp: 0,
35            ssrc: 0,
36            csrcs: Vec::new(),
37            extension: None,
38        }
39    }
40
41    /// Decode an RTP header from given data.
42    pub fn decode(data: &mut Bytes) -> Result<Self, InvalidInput> {
43        let mut buffer = data.clone();
44
45        if buffer.len() < std::mem::size_of::<RawRtpHeader>() {
46            return Err(InvalidInput::new());
47        }
48
49        let ptr = buffer.as_ptr() as *const RawRtpHeader;
50
51        let raw = unsafe { ptr.read_unaligned() };
52
53        let mut res = Self {
54            options: u16::from_be(raw.options),
55            sequence_number: u16::from_be(raw.sequence_number),
56            timestamp: u32::from_be(raw.timestamp),
57            ssrc: u32::from_be(raw.ssrc),
58            csrcs: Vec::new(),
59            extension: None,
60        };
61
62        buffer.advance(std::mem::size_of::<RawRtpHeader>());
63
64        if (res.options >> 14) != 2 {
65            return Err(InvalidInput::new());
66        }
67
68        let csrc_count = ((res.options >> 8) & 0xf) as usize;
69
70        if buffer.len() < (csrc_count << 2) {
71            return Err(InvalidInput::new());
72        }
73
74        res.csrcs = Vec::with_capacity(csrc_count);
75
76        for _ in 0..csrc_count {
77            res.csrcs.push(buffer.get_u32());
78        }
79
80        if (res.options & 0x1000) != 0 {
81            res.extension = Some(RtpHeaderExtension::decode(&mut buffer)?);
82        }
83
84        *data = buffer;
85
86        Ok(res)
87    }
88
89    /// Encode the header.
90    pub fn encode(&self, buf: &mut BytesMut) {
91        buf.reserve(self.raw_size());
92
93        let raw = RawRtpHeader {
94            options: self.options.to_be(),
95            sequence_number: self.sequence_number.to_be(),
96            timestamp: self.timestamp.to_be(),
97            ssrc: self.ssrc.to_be(),
98        };
99
100        let ptr = &raw as *const _ as *const u8;
101
102        let data = unsafe { std::slice::from_raw_parts(ptr, std::mem::size_of::<RawRtpHeader>()) };
103
104        buf.extend_from_slice(data);
105
106        for csrc in &self.csrcs {
107            buf.put_u32(*csrc);
108        }
109
110        if let Some(extension) = self.extension.as_ref() {
111            extension.encode(buf);
112        }
113    }
114
115    /// Check if the RTP packet contains any padding.
116    #[inline]
117    pub fn padding(&self) -> bool {
118        (self.options & 0x2000) != 0
119    }
120
121    /// Set the padding bit.
122    #[inline]
123    pub fn with_padding(mut self, padding: bool) -> Self {
124        self.options &= !0x2000;
125        self.options |= (padding as u16) << 13;
126        self
127    }
128
129    /// Check if there is an RTP header extension.
130    #[inline]
131    pub fn extension(&self) -> Option<&RtpHeaderExtension> {
132        self.extension.as_ref()
133    }
134
135    /// Set the extension bit.
136    #[inline]
137    pub fn with_extension(mut self, extension: Option<RtpHeaderExtension>) -> Self {
138        self.options &= !0x1000;
139        self.options |= (extension.is_some() as u16) << 12;
140        self.extension = extension;
141        self
142    }
143
144    /// Check if the RTP marker bit is set.
145    #[inline]
146    pub fn marker(&self) -> bool {
147        (self.options & 0x0080) != 0
148    }
149
150    /// Set the marker bit.
151    #[inline]
152    pub fn with_marker(mut self, marker: bool) -> Self {
153        self.options &= !0x0080;
154        self.options |= (marker as u16) << 7;
155        self
156    }
157
158    /// Get RTP payload type.
159    ///
160    /// Note: Only the lower 7 bits are used.
161    #[inline]
162    pub fn payload_type(&self) -> u8 {
163        (self.options & 0x7f) as u8
164    }
165
166    /// Set the payload type.
167    ///
168    /// # Panics
169    /// The method panics if the payload type is greater than 127.
170    #[inline]
171    pub fn with_payload_type(mut self, payload_type: u8) -> Self {
172        assert!(payload_type < 128);
173
174        self.options &= !0x7f;
175        self.options |= (payload_type & 0x7f) as u16;
176        self
177    }
178
179    /// Get RTP sequence number.
180    #[inline]
181    pub fn sequence_number(&self) -> u16 {
182        self.sequence_number
183    }
184
185    /// Set the sequence number.
186    #[inline]
187    pub fn with_sequence_number(mut self, n: u16) -> Self {
188        self.sequence_number = n;
189        self
190    }
191
192    /// Get RTP timestamp.
193    #[inline]
194    pub fn timestamp(&self) -> u32 {
195        self.timestamp
196    }
197
198    /// Set RTP timestamp.
199    #[inline]
200    pub fn with_timestamp(mut self, timestamp: u32) -> Self {
201        self.timestamp = timestamp;
202        self
203    }
204
205    /// Get the SSRC identifier.
206    #[inline]
207    pub fn ssrc(&self) -> u32 {
208        self.ssrc
209    }
210
211    /// Set the SSRC identifier.
212    #[inline]
213    pub fn with_ssrc(mut self, ssrc: u32) -> Self {
214        self.ssrc = ssrc;
215        self
216    }
217
218    /// Get a list of CSRC identifiers.
219    #[inline]
220    pub fn csrcs(&self) -> &[u32] {
221        &self.csrcs
222    }
223
224    /// Set the CSRC identifiers.
225    ///
226    /// # Panics
227    /// The method panics if the number of identifiers is greater than 255.
228    pub fn with_csrcs<T>(mut self, csrcs: T) -> Self
229    where
230        T: Into<Vec<u32>>,
231    {
232        let csrcs = csrcs.into();
233
234        assert!(csrcs.len() <= 0xf);
235
236        self.csrcs = csrcs;
237        self.options &= !0xf00;
238        self.options |= (self.csrcs.len() as u16) << 8;
239        self
240    }
241
242    /// Get raw size of the header (i.e. byte length of the encoded header).
243    pub fn raw_size(&self) -> usize {
244        std::mem::size_of::<RawRtpHeader>()
245            + (self.csrcs.len() << 2)
246            + self.extension.as_ref().map(|e| e.raw_size()).unwrap_or(0)
247    }
248}
249
250impl Default for RtpHeader {
251    #[inline]
252    fn default() -> Self {
253        Self::new()
254    }
255}
256
257/// Helper struct.
258#[repr(C, packed)]
259struct RawHeaderExtension {
260    misc: u16,
261    length: u16,
262}
263
264/// RTP header extension.
265#[derive(Clone)]
266pub struct RtpHeaderExtension {
267    misc: u16,
268    data: Bytes,
269}
270
271impl RtpHeaderExtension {
272    /// Create a new header extension.
273    #[inline]
274    pub const fn new() -> Self {
275        Self {
276            misc: 0,
277            data: Bytes::new(),
278        }
279    }
280
281    /// Decode RTP header extension from given data.
282    pub fn decode(data: &mut Bytes) -> Result<Self, InvalidInput> {
283        let mut buffer = data.clone();
284
285        if buffer.len() < std::mem::size_of::<RawHeaderExtension>() {
286            return Err(InvalidInput::new());
287        }
288
289        let ptr = buffer.as_ptr() as *const RawHeaderExtension;
290
291        let raw = unsafe { ptr.read_unaligned() };
292
293        let extension_length = (u16::from_be(raw.length) as usize) << 2;
294        let misc = u16::from_be(raw.misc);
295
296        buffer.advance(std::mem::size_of::<RawHeaderExtension>());
297
298        if buffer.len() < extension_length {
299            return Err(InvalidInput::new());
300        }
301
302        let res = Self {
303            misc,
304            data: buffer.split_to(extension_length),
305        };
306
307        *data = buffer;
308
309        Ok(res)
310    }
311
312    /// Encode the header extension.
313    pub fn encode(&self, buf: &mut BytesMut) {
314        buf.reserve(self.raw_size());
315
316        let length = (self.data.len() >> 2) as u16;
317
318        let raw = RawHeaderExtension {
319            misc: self.misc.to_be(),
320            length: length.to_be(),
321        };
322
323        let ptr = &raw as *const _ as *const u8;
324
325        let header =
326            unsafe { std::slice::from_raw_parts(ptr, std::mem::size_of::<RawHeaderExtension>()) };
327
328        buf.extend_from_slice(header);
329        buf.extend_from_slice(&self.data);
330    }
331
332    /// Get the first 16 bits of the header extension.
333    #[inline]
334    pub fn misc(&self) -> u16 {
335        self.misc
336    }
337
338    /// Set the first 16 bits of the header extension.
339    #[inline]
340    pub fn with_misc(mut self, misc: u16) -> Self {
341        self.misc = misc;
342        self
343    }
344
345    /// Get header extension data.
346    #[inline]
347    pub fn data(&self) -> &Bytes {
348        &self.data
349    }
350
351    /// Set the extension data.
352    ///
353    /// # Panics
354    /// The method panics if the length of the data is not a multiple of four
355    /// or if the length is greater than 262140.
356    #[inline]
357    pub fn with_data(mut self, data: Bytes) -> Self {
358        assert_eq!(data.len() & 3, 0);
359
360        let words = data.len() >> 2;
361
362        assert!(words <= (u16::MAX as usize));
363
364        self.data = data;
365        self
366    }
367
368    /// Get raw size of the header extension (i.e. byte length of the encoded
369    /// header extension).
370    #[inline]
371    pub fn raw_size(&self) -> usize {
372        std::mem::size_of::<RawHeaderExtension>() + self.data.len()
373    }
374}
375
376impl Default for RtpHeaderExtension {
377    #[inline]
378    fn default() -> Self {
379        Self::new()
380    }
381}
382
383/// RTP packet.
384#[derive(Clone)]
385pub struct RtpPacket {
386    header: RtpHeader,
387    payload: Bytes,
388}
389
390impl RtpPacket {
391    /// Create a new RTP packet.
392    #[inline]
393    pub const fn new() -> Self {
394        Self {
395            header: RtpHeader::new(),
396            payload: Bytes::new(),
397        }
398    }
399
400    /// Create a new RTP packets from given parts.
401    pub fn from_parts(header: RtpHeader, payload: Bytes) -> Result<Self, InvalidInput> {
402        if header.padding() {
403            let padding_len = payload.last().copied().ok_or_else(InvalidInput::new)? as usize;
404
405            if padding_len == 0 || payload.len() < padding_len {
406                return Err(InvalidInput::new());
407            }
408        }
409
410        let res = Self { header, payload };
411
412        Ok(res)
413    }
414
415    /// Deconstruct the packet into its parts.
416    #[inline]
417    pub fn deconstruct(self) -> (RtpHeader, Bytes) {
418        (self.header, self.payload)
419    }
420
421    /// Decode RTP packet from given data frame.
422    pub fn decode(mut frame: Bytes) -> Result<Self, InvalidInput> {
423        let header = RtpHeader::decode(&mut frame)?;
424
425        let payload = frame;
426
427        Self::from_parts(header, payload)
428    }
429
430    /// Encode the packet.
431    pub fn encode(&self, buf: &mut BytesMut) {
432        buf.reserve(self.raw_size());
433
434        self.header.encode(buf);
435
436        buf.extend_from_slice(&self.payload);
437    }
438
439    /// Get the RTP header.
440    #[inline]
441    pub fn header(&self) -> &RtpHeader {
442        &self.header
443    }
444
445    /// Get the marker bit value.
446    #[inline]
447    pub fn marker(&self) -> bool {
448        self.header.marker()
449    }
450
451    /// Set the marker bit.
452    #[inline]
453    pub fn with_marker(mut self, marker: bool) -> Self {
454        self.header = self.header.with_marker(marker);
455        self
456    }
457
458    /// Get the payload type.
459    ///
460    /// Note: Only the lower 7 bits are used.
461    #[inline]
462    pub fn payload_type(&self) -> u8 {
463        self.header.payload_type()
464    }
465
466    /// Set the payload type.
467    ///
468    /// # Panics
469    /// The method panics if the payload type is greater than 127.
470    #[inline]
471    pub fn with_payload_type(mut self, payload_type: u8) -> Self {
472        self.header = self.header.with_payload_type(payload_type);
473        self
474    }
475
476    /// Get the RTP sequence number.
477    #[inline]
478    pub fn sequence_number(&self) -> u16 {
479        self.header.sequence_number()
480    }
481
482    /// Set the RTP sequence number.
483    #[inline]
484    pub fn with_sequence_number(mut self, sequence_number: u16) -> Self {
485        self.header = self.header.with_sequence_number(sequence_number);
486        self
487    }
488
489    /// Get the RTP timestamp.
490    #[inline]
491    pub fn timestamp(&self) -> u32 {
492        self.header.timestamp()
493    }
494
495    /// Set the RTP timestamp.
496    #[inline]
497    pub fn with_timestamp(mut self, timestamp: u32) -> Self {
498        self.header = self.header.with_timestamp(timestamp);
499        self
500    }
501
502    /// Get the SSRC identifier.
503    #[inline]
504    pub fn ssrc(&self) -> u32 {
505        self.header.ssrc()
506    }
507
508    /// Set the SSRC identifier.
509    #[inline]
510    pub fn with_ssrc(mut self, ssrc: u32) -> Self {
511        self.header = self.header.with_ssrc(ssrc);
512        self
513    }
514
515    /// Get the CSRC identifiers.
516    #[inline]
517    pub fn csrcs(&self) -> &[u32] {
518        self.header.csrcs()
519    }
520
521    /// Set the CSRC identifiers.
522    ///
523    /// # Panics
524    /// The method panics if the number of identifiers is greater than 255.
525    pub fn with_csrcs<T>(mut self, csrcs: T) -> Self
526    where
527        T: Into<Vec<u32>>,
528    {
529        self.header = self.header.with_csrcs(csrcs);
530        self
531    }
532
533    /// Get length of the optional padding.
534    ///
535    /// Zero means that the padding is not used at all.
536    #[inline]
537    pub fn padding(&self) -> u8 {
538        if self.header.padding() {
539            *self.payload.last().unwrap()
540        } else {
541            0
542        }
543    }
544
545    /// Get the packet payload including the optional padding.
546    #[inline]
547    pub fn payload(&self) -> &Bytes {
548        &self.payload
549    }
550
551    /// Get the packet payload without any padding.
552    #[inline]
553    pub fn stripped_payload(&self) -> Bytes {
554        let payload_len = self.payload.len();
555        let padding_len = self.padding() as usize;
556
557        let len = payload_len - padding_len;
558
559        self.payload.slice(..len)
560    }
561
562    /// Set the payload and add padding of a given length.
563    ///
564    /// If the padding is zero, no padding will be added and the padding bit in
565    /// the RTP header will be set to zero.
566    pub fn with_payload(mut self, payload: Bytes, padding: u8) -> Self {
567        if padding > 0 {
568            let len = payload.len() + (padding as usize);
569
570            let mut buffer = BytesMut::with_capacity(len);
571
572            buffer.extend_from_slice(&payload);
573            buffer.resize(len, 0);
574
575            buffer[len - 1] = padding;
576
577            self.header = self.header.with_padding(true);
578            self.payload = buffer.freeze();
579        } else {
580            self.header = self.header.with_padding(false);
581            self.payload = payload;
582        }
583
584        self
585    }
586
587    /// Set the payload that already includes padding.
588    ///
589    /// # Panics
590    /// The method panics if the given payload is empty, if the last byte is
591    /// zero or if the length of the padding is greater than the length of the
592    /// payload.
593    pub fn with_padded_payload(mut self, payload: Bytes) -> Self {
594        let padding_len = payload.last().copied().expect("empty payload") as usize;
595
596        assert!(padding_len > 0 && payload.len() >= padding_len);
597
598        self.header = self.header.with_padding(true);
599        self.payload = payload;
600        self
601    }
602
603    /// Get raw size of the packet (i.e. byte length of the encoded packet).
604    #[inline]
605    pub fn raw_size(&self) -> usize {
606        self.header.raw_size() + self.payload.len()
607    }
608}
609
610impl Default for RtpPacket {
611    #[inline]
612    fn default() -> Self {
613        Self::new()
614    }
615}
616
617/// RTP packet wrapper containing also the instant when the packet was
618/// received.
619#[derive(Clone)]
620pub struct IncomingRtpPacket {
621    inner: RtpPacket,
622    received_at: Instant,
623}
624
625impl IncomingRtpPacket {
626    /// Create a new incoming RTP packet.
627    #[inline]
628    pub const fn new(packet: RtpPacket, received_at: Instant) -> Self {
629        Self {
630            inner: packet,
631            received_at,
632        }
633    }
634
635    /// Get the instant when the packet was received.
636    #[inline]
637    pub fn received_at(&self) -> Instant {
638        self.received_at
639    }
640}
641
642impl AsRef<RtpPacket> for IncomingRtpPacket {
643    #[inline]
644    fn as_ref(&self) -> &RtpPacket {
645        &self.inner
646    }
647}
648
649impl Borrow<RtpPacket> for IncomingRtpPacket {
650    #[inline]
651    fn borrow(&self) -> &RtpPacket {
652        &self.inner
653    }
654}
655
656impl Deref for IncomingRtpPacket {
657    type Target = RtpPacket;
658
659    #[inline]
660    fn deref(&self) -> &Self::Target {
661        &self.inner
662    }
663}
664
665impl From<IncomingRtpPacket> for RtpPacket {
666    #[inline]
667    fn from(packet: IncomingRtpPacket) -> Self {
668        packet.inner
669    }
670}
671
672/// Ordered RTP packet.
673#[derive(Clone)]
674pub struct OrderedRtpPacket {
675    inner: IncomingRtpPacket,
676    index: u64,
677}
678
679impl OrderedRtpPacket {
680    /// Create a new ordered RTP packet.
681    #[inline]
682    pub const fn new(inner: IncomingRtpPacket, index: u64) -> Self {
683        Self { inner, index }
684    }
685
686    /// Get the estimated packet index (a.k.a. extended sequence number).
687    #[inline]
688    pub fn index(&self) -> u64 {
689        self.index
690    }
691}
692
693impl AsRef<RtpPacket> for OrderedRtpPacket {
694    #[inline]
695    fn as_ref(&self) -> &RtpPacket {
696        &self.inner
697    }
698}
699
700impl AsRef<IncomingRtpPacket> for OrderedRtpPacket {
701    #[inline]
702    fn as_ref(&self) -> &IncomingRtpPacket {
703        &self.inner
704    }
705}
706
707impl Borrow<RtpPacket> for OrderedRtpPacket {
708    #[inline]
709    fn borrow(&self) -> &RtpPacket {
710        &self.inner
711    }
712}
713
714impl Borrow<IncomingRtpPacket> for OrderedRtpPacket {
715    #[inline]
716    fn borrow(&self) -> &IncomingRtpPacket {
717        &self.inner
718    }
719}
720
721impl Deref for OrderedRtpPacket {
722    type Target = IncomingRtpPacket;
723
724    #[inline]
725    fn deref(&self) -> &Self::Target {
726        &self.inner
727    }
728}
729
730impl From<OrderedRtpPacket> for RtpPacket {
731    #[inline]
732    fn from(packet: OrderedRtpPacket) -> Self {
733        packet.inner.into()
734    }
735}
736
737impl From<OrderedRtpPacket> for IncomingRtpPacket {
738    #[inline]
739    fn from(packet: OrderedRtpPacket) -> Self {
740        packet.inner
741    }
742}