1use std::{borrow::Borrow, ops::Deref, time::Instant};
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4use zerocopy::{
5 byteorder::network_endian::{U16, U32},
6 FromBytes, Immutable, IntoBytes, KnownLayout, SizeError, Unaligned,
7};
8
9use crate::InvalidInput;
10
11#[derive(Copy, Clone, KnownLayout, Immutable, Unaligned, IntoBytes, FromBytes)]
13#[repr(C)]
14struct RawRtpHeader {
15 options: U16,
16 sequence_number: U16,
17 timestamp: U32,
18 ssrc: U32,
19}
20
21#[derive(Clone)]
23pub struct RtpHeader {
24 options: u16,
25 sequence_number: u16,
26 timestamp: u32,
27 ssrc: u32,
28 csrcs: Vec<u32>,
29 extension: Option<RtpHeaderExtension>,
30}
31
32impl RtpHeader {
33 #[inline]
35 pub const fn new() -> Self {
36 Self {
37 options: 2 << 14,
38 sequence_number: 0,
39 timestamp: 0,
40 ssrc: 0,
41 csrcs: Vec::new(),
42 extension: None,
43 }
44 }
45
46 pub fn decode(data: &mut Bytes) -> Result<Self, InvalidInput> {
48 let mut buffer = data.clone();
49
50 let (raw, _) = RawRtpHeader::ref_from_prefix(&buffer)
51 .map_err(SizeError::from)
52 .map_err(|_| InvalidInput::new())?;
53
54 let mut res = Self {
55 options: raw.options.get(),
56 sequence_number: raw.sequence_number.get(),
57 timestamp: raw.timestamp.get(),
58 ssrc: raw.ssrc.get(),
59 csrcs: Vec::new(),
60 extension: None,
61 };
62
63 buffer.advance(std::mem::size_of::<RawRtpHeader>());
64
65 if (res.options >> 14) != 2 {
66 return Err(InvalidInput::new());
67 }
68
69 let csrc_count = ((res.options >> 8) & 0xf) as usize;
70
71 let (csrcs, _) = <[U32]>::ref_from_prefix_with_elems(&buffer, csrc_count)
72 .map_err(SizeError::from)
73 .map_err(|_| InvalidInput::new())?;
74
75 res.csrcs = csrcs.iter().copied().map(U32::get).collect();
76
77 buffer.advance(csrc_count << 2);
78
79 if (res.options & 0x1000) != 0 {
80 res.extension = Some(RtpHeaderExtension::decode(&mut buffer)?);
81 }
82
83 *data = buffer;
84
85 Ok(res)
86 }
87
88 pub fn encode(&self, buf: &mut BytesMut) {
90 buf.reserve(self.raw_size());
91
92 let raw = RawRtpHeader {
93 options: U16::new(self.options),
94 sequence_number: U16::new(self.sequence_number),
95 timestamp: U32::new(self.timestamp),
96 ssrc: U32::new(self.ssrc),
97 };
98
99 buf.extend_from_slice(raw.as_bytes());
100
101 for csrc in &self.csrcs {
102 buf.put_u32(*csrc);
103 }
104
105 if let Some(extension) = self.extension.as_ref() {
106 extension.encode(buf);
107 }
108 }
109
110 #[inline]
112 pub fn padding(&self) -> bool {
113 (self.options & 0x2000) != 0
114 }
115
116 #[inline]
118 pub fn with_padding(mut self, padding: bool) -> Self {
119 self.options &= !0x2000;
120 self.options |= (padding as u16) << 13;
121 self
122 }
123
124 #[inline]
126 pub fn extension(&self) -> Option<&RtpHeaderExtension> {
127 self.extension.as_ref()
128 }
129
130 #[inline]
132 pub fn with_extension(mut self, extension: Option<RtpHeaderExtension>) -> Self {
133 self.options &= !0x1000;
134 self.options |= (extension.is_some() as u16) << 12;
135 self.extension = extension;
136 self
137 }
138
139 #[inline]
141 pub fn marker(&self) -> bool {
142 (self.options & 0x0080) != 0
143 }
144
145 #[inline]
147 pub fn with_marker(mut self, marker: bool) -> Self {
148 self.options &= !0x0080;
149 self.options |= (marker as u16) << 7;
150 self
151 }
152
153 #[inline]
157 pub fn payload_type(&self) -> u8 {
158 (self.options & 0x7f) as u8
159 }
160
161 #[inline]
166 pub fn with_payload_type(mut self, payload_type: u8) -> Self {
167 assert!(payload_type < 128);
168
169 self.options &= !0x7f;
170 self.options |= (payload_type & 0x7f) as u16;
171 self
172 }
173
174 #[inline]
176 pub fn sequence_number(&self) -> u16 {
177 self.sequence_number
178 }
179
180 #[inline]
182 pub fn with_sequence_number(mut self, n: u16) -> Self {
183 self.sequence_number = n;
184 self
185 }
186
187 #[inline]
189 pub fn timestamp(&self) -> u32 {
190 self.timestamp
191 }
192
193 #[inline]
195 pub fn with_timestamp(mut self, timestamp: u32) -> Self {
196 self.timestamp = timestamp;
197 self
198 }
199
200 #[inline]
202 pub fn ssrc(&self) -> u32 {
203 self.ssrc
204 }
205
206 #[inline]
208 pub fn with_ssrc(mut self, ssrc: u32) -> Self {
209 self.ssrc = ssrc;
210 self
211 }
212
213 #[inline]
215 pub fn csrcs(&self) -> &[u32] {
216 &self.csrcs
217 }
218
219 pub fn with_csrcs<T>(mut self, csrcs: T) -> Self
224 where
225 T: Into<Vec<u32>>,
226 {
227 let csrcs = csrcs.into();
228
229 assert!(csrcs.len() <= 0xf);
230
231 self.csrcs = csrcs;
232 self.options &= !0xf00;
233 self.options |= (self.csrcs.len() as u16) << 8;
234 self
235 }
236
237 pub fn raw_size(&self) -> usize {
239 std::mem::size_of::<RawRtpHeader>()
240 + (self.csrcs.len() << 2)
241 + self.extension.as_ref().map(|e| e.raw_size()).unwrap_or(0)
242 }
243}
244
245impl Default for RtpHeader {
246 #[inline]
247 fn default() -> Self {
248 Self::new()
249 }
250}
251
252#[derive(Copy, Clone, KnownLayout, Immutable, Unaligned, IntoBytes, FromBytes)]
254#[repr(C)]
255struct RawHeaderExtension {
256 misc: U16,
257 length: U16,
258}
259
260#[derive(Clone)]
262pub struct RtpHeaderExtension {
263 misc: u16,
264 data: Bytes,
265}
266
267impl RtpHeaderExtension {
268 #[inline]
270 pub const fn new() -> Self {
271 Self {
272 misc: 0,
273 data: Bytes::new(),
274 }
275 }
276
277 pub fn decode(data: &mut Bytes) -> Result<Self, InvalidInput> {
279 let mut buffer = data.clone();
280
281 let (raw, _) = RawHeaderExtension::ref_from_prefix(&buffer)
282 .map_err(SizeError::from)
283 .map_err(|_| InvalidInput::new())?;
284
285 let extension_length = (raw.length.get() as usize) << 2;
286 let misc = raw.misc.get();
287
288 buffer.advance(std::mem::size_of::<RawHeaderExtension>());
289
290 if buffer.len() < extension_length {
291 return Err(InvalidInput::new());
292 }
293
294 let res = Self {
295 misc,
296 data: buffer.split_to(extension_length),
297 };
298
299 *data = buffer;
300
301 Ok(res)
302 }
303
304 pub fn encode(&self, buf: &mut BytesMut) {
306 buf.reserve(self.raw_size());
307
308 let length = (self.data.len() >> 2) as u16;
309
310 let raw = RawHeaderExtension {
311 misc: U16::new(self.misc),
312 length: U16::new(length),
313 };
314
315 buf.extend_from_slice(raw.as_bytes());
316 buf.extend_from_slice(&self.data);
317 }
318
319 #[inline]
321 pub fn misc(&self) -> u16 {
322 self.misc
323 }
324
325 #[inline]
327 pub fn with_misc(mut self, misc: u16) -> Self {
328 self.misc = misc;
329 self
330 }
331
332 #[inline]
334 pub fn data(&self) -> &Bytes {
335 &self.data
336 }
337
338 #[inline]
344 pub fn with_data(mut self, data: Bytes) -> Self {
345 assert_eq!(data.len() & 3, 0);
346
347 let words = data.len() >> 2;
348
349 assert!(words <= (u16::MAX as usize));
350
351 self.data = data;
352 self
353 }
354
355 #[inline]
358 pub fn raw_size(&self) -> usize {
359 std::mem::size_of::<RawHeaderExtension>() + self.data.len()
360 }
361}
362
363impl Default for RtpHeaderExtension {
364 #[inline]
365 fn default() -> Self {
366 Self::new()
367 }
368}
369
370#[derive(Clone)]
372pub struct RtpPacket {
373 header: RtpHeader,
374 payload: Bytes,
375}
376
377impl RtpPacket {
378 #[inline]
380 pub const fn new() -> Self {
381 Self {
382 header: RtpHeader::new(),
383 payload: Bytes::new(),
384 }
385 }
386
387 pub fn from_parts(header: RtpHeader, payload: Bytes) -> Result<Self, InvalidInput> {
389 if header.padding() {
390 let padding_len = payload.last().copied().ok_or_else(InvalidInput::new)? as usize;
391
392 if padding_len == 0 || payload.len() < padding_len {
393 return Err(InvalidInput::new());
394 }
395 }
396
397 let res = Self { header, payload };
398
399 Ok(res)
400 }
401
402 #[inline]
404 pub fn deconstruct(self) -> (RtpHeader, Bytes) {
405 (self.header, self.payload)
406 }
407
408 pub fn decode(mut frame: Bytes) -> Result<Self, InvalidInput> {
410 let header = RtpHeader::decode(&mut frame)?;
411
412 let payload = frame;
413
414 Self::from_parts(header, payload)
415 }
416
417 pub fn encode(&self, buf: &mut BytesMut) {
419 buf.reserve(self.raw_size());
420
421 self.header.encode(buf);
422
423 buf.extend_from_slice(&self.payload);
424 }
425
426 #[inline]
428 pub fn header(&self) -> &RtpHeader {
429 &self.header
430 }
431
432 #[inline]
434 pub fn marker(&self) -> bool {
435 self.header.marker()
436 }
437
438 #[inline]
440 pub fn with_marker(mut self, marker: bool) -> Self {
441 self.header = self.header.with_marker(marker);
442 self
443 }
444
445 #[inline]
449 pub fn payload_type(&self) -> u8 {
450 self.header.payload_type()
451 }
452
453 #[inline]
458 pub fn with_payload_type(mut self, payload_type: u8) -> Self {
459 self.header = self.header.with_payload_type(payload_type);
460 self
461 }
462
463 #[inline]
465 pub fn sequence_number(&self) -> u16 {
466 self.header.sequence_number()
467 }
468
469 #[inline]
471 pub fn with_sequence_number(mut self, sequence_number: u16) -> Self {
472 self.header = self.header.with_sequence_number(sequence_number);
473 self
474 }
475
476 #[inline]
478 pub fn timestamp(&self) -> u32 {
479 self.header.timestamp()
480 }
481
482 #[inline]
484 pub fn with_timestamp(mut self, timestamp: u32) -> Self {
485 self.header = self.header.with_timestamp(timestamp);
486 self
487 }
488
489 #[inline]
491 pub fn ssrc(&self) -> u32 {
492 self.header.ssrc()
493 }
494
495 #[inline]
497 pub fn with_ssrc(mut self, ssrc: u32) -> Self {
498 self.header = self.header.with_ssrc(ssrc);
499 self
500 }
501
502 #[inline]
504 pub fn csrcs(&self) -> &[u32] {
505 self.header.csrcs()
506 }
507
508 pub fn with_csrcs<T>(mut self, csrcs: T) -> Self
513 where
514 T: Into<Vec<u32>>,
515 {
516 self.header = self.header.with_csrcs(csrcs);
517 self
518 }
519
520 #[inline]
524 pub fn padding(&self) -> u8 {
525 if self.header.padding() {
526 *self.payload.last().unwrap()
527 } else {
528 0
529 }
530 }
531
532 #[inline]
534 pub fn payload(&self) -> &Bytes {
535 &self.payload
536 }
537
538 #[inline]
540 pub fn stripped_payload(&self) -> Bytes {
541 let payload_len = self.payload.len();
542 let padding_len = self.padding() as usize;
543
544 let len = payload_len - padding_len;
545
546 self.payload.slice(..len)
547 }
548
549 pub fn with_payload(mut self, payload: Bytes, padding: u8) -> Self {
554 if padding > 0 {
555 let len = payload.len() + (padding as usize);
556
557 let mut buffer = BytesMut::with_capacity(len);
558
559 buffer.extend_from_slice(&payload);
560 buffer.resize(len, 0);
561
562 buffer[len - 1] = padding;
563
564 self.header = self.header.with_padding(true);
565 self.payload = buffer.freeze();
566 } else {
567 self.header = self.header.with_padding(false);
568 self.payload = payload;
569 }
570
571 self
572 }
573
574 pub fn with_padded_payload(mut self, payload: Bytes) -> Self {
581 let padding_len = payload.last().copied().expect("empty payload") as usize;
582
583 assert!(padding_len > 0 && payload.len() >= padding_len);
584
585 self.header = self.header.with_padding(true);
586 self.payload = payload;
587 self
588 }
589
590 #[inline]
592 pub fn raw_size(&self) -> usize {
593 self.header.raw_size() + self.payload.len()
594 }
595}
596
597impl Default for RtpPacket {
598 #[inline]
599 fn default() -> Self {
600 Self::new()
601 }
602}
603
604#[derive(Clone)]
607pub struct IncomingRtpPacket {
608 inner: RtpPacket,
609 received_at: Instant,
610}
611
612impl IncomingRtpPacket {
613 #[inline]
615 pub const fn new(packet: RtpPacket, received_at: Instant) -> Self {
616 Self {
617 inner: packet,
618 received_at,
619 }
620 }
621
622 #[inline]
624 pub fn received_at(&self) -> Instant {
625 self.received_at
626 }
627}
628
629impl AsRef<RtpPacket> for IncomingRtpPacket {
630 #[inline]
631 fn as_ref(&self) -> &RtpPacket {
632 &self.inner
633 }
634}
635
636impl Borrow<RtpPacket> for IncomingRtpPacket {
637 #[inline]
638 fn borrow(&self) -> &RtpPacket {
639 &self.inner
640 }
641}
642
643impl Deref for IncomingRtpPacket {
644 type Target = RtpPacket;
645
646 #[inline]
647 fn deref(&self) -> &Self::Target {
648 &self.inner
649 }
650}
651
652impl From<IncomingRtpPacket> for RtpPacket {
653 #[inline]
654 fn from(packet: IncomingRtpPacket) -> Self {
655 packet.inner
656 }
657}
658
659#[derive(Clone)]
661pub struct OrderedRtpPacket {
662 inner: IncomingRtpPacket,
663 index: u64,
664}
665
666impl OrderedRtpPacket {
667 #[inline]
669 pub const fn new(inner: IncomingRtpPacket, index: u64) -> Self {
670 Self { inner, index }
671 }
672
673 #[inline]
675 pub fn index(&self) -> u64 {
676 self.index
677 }
678}
679
680impl AsRef<RtpPacket> for OrderedRtpPacket {
681 #[inline]
682 fn as_ref(&self) -> &RtpPacket {
683 &self.inner
684 }
685}
686
687impl AsRef<IncomingRtpPacket> for OrderedRtpPacket {
688 #[inline]
689 fn as_ref(&self) -> &IncomingRtpPacket {
690 &self.inner
691 }
692}
693
694impl Borrow<RtpPacket> for OrderedRtpPacket {
695 #[inline]
696 fn borrow(&self) -> &RtpPacket {
697 &self.inner
698 }
699}
700
701impl Borrow<IncomingRtpPacket> for OrderedRtpPacket {
702 #[inline]
703 fn borrow(&self) -> &IncomingRtpPacket {
704 &self.inner
705 }
706}
707
708impl Deref for OrderedRtpPacket {
709 type Target = IncomingRtpPacket;
710
711 #[inline]
712 fn deref(&self) -> &Self::Target {
713 &self.inner
714 }
715}
716
717impl From<OrderedRtpPacket> for RtpPacket {
718 #[inline]
719 fn from(packet: OrderedRtpPacket) -> Self {
720 packet.inner.into()
721 }
722}
723
724impl From<OrderedRtpPacket> for IncomingRtpPacket {
725 #[inline]
726 fn from(packet: OrderedRtpPacket) -> Self {
727 packet.inner
728 }
729}