1use std::{borrow::Borrow, ops::Deref, time::Instant};
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4
5use crate::InvalidInput;
6
7#[repr(C, packed)]
9struct RawRtpHeader {
10 options: u16,
11 sequence_number: u16,
12 timestamp: u32,
13 ssrc: u32,
14}
15
16#[derive(Clone)]
18pub struct RtpHeader {
19 options: u16,
20 sequence_number: u16,
21 timestamp: u32,
22 ssrc: u32,
23 csrcs: Vec<u32>,
24 extension: Option<RtpHeaderExtension>,
25}
26
27impl RtpHeader {
28 #[inline]
30 pub const fn new() -> Self {
31 Self {
32 options: 2 << 14,
33 sequence_number: 0,
34 timestamp: 0,
35 ssrc: 0,
36 csrcs: Vec::new(),
37 extension: None,
38 }
39 }
40
41 pub fn decode(data: &mut Bytes) -> Result<Self, InvalidInput> {
43 let mut buffer = data.clone();
44
45 if buffer.len() < std::mem::size_of::<RawRtpHeader>() {
46 return Err(InvalidInput::new());
47 }
48
49 let ptr = buffer.as_ptr() as *const RawRtpHeader;
50
51 let raw = unsafe { ptr.read_unaligned() };
52
53 let mut res = Self {
54 options: u16::from_be(raw.options),
55 sequence_number: u16::from_be(raw.sequence_number),
56 timestamp: u32::from_be(raw.timestamp),
57 ssrc: u32::from_be(raw.ssrc),
58 csrcs: Vec::new(),
59 extension: None,
60 };
61
62 buffer.advance(std::mem::size_of::<RawRtpHeader>());
63
64 if (res.options >> 14) != 2 {
65 return Err(InvalidInput::new());
66 }
67
68 let csrc_count = ((res.options >> 8) & 0xf) as usize;
69
70 if buffer.len() < (csrc_count << 2) {
71 return Err(InvalidInput::new());
72 }
73
74 res.csrcs = Vec::with_capacity(csrc_count);
75
76 for _ in 0..csrc_count {
77 res.csrcs.push(buffer.get_u32());
78 }
79
80 if (res.options & 0x1000) != 0 {
81 res.extension = Some(RtpHeaderExtension::decode(&mut buffer)?);
82 }
83
84 *data = buffer;
85
86 Ok(res)
87 }
88
89 pub fn encode(&self, buf: &mut BytesMut) {
91 buf.reserve(self.raw_size());
92
93 let raw = RawRtpHeader {
94 options: self.options.to_be(),
95 sequence_number: self.sequence_number.to_be(),
96 timestamp: self.timestamp.to_be(),
97 ssrc: self.ssrc.to_be(),
98 };
99
100 let ptr = &raw as *const _ as *const u8;
101
102 let data = unsafe { std::slice::from_raw_parts(ptr, std::mem::size_of::<RawRtpHeader>()) };
103
104 buf.extend_from_slice(data);
105
106 for csrc in &self.csrcs {
107 buf.put_u32(*csrc);
108 }
109
110 if let Some(extension) = self.extension.as_ref() {
111 extension.encode(buf);
112 }
113 }
114
115 #[inline]
117 pub fn padding(&self) -> bool {
118 (self.options & 0x2000) != 0
119 }
120
121 #[inline]
123 pub fn with_padding(mut self, padding: bool) -> Self {
124 self.options &= !0x2000;
125 self.options |= (padding as u16) << 13;
126 self
127 }
128
129 #[inline]
131 pub fn extension(&self) -> Option<&RtpHeaderExtension> {
132 self.extension.as_ref()
133 }
134
135 #[inline]
137 pub fn with_extension(mut self, extension: Option<RtpHeaderExtension>) -> Self {
138 self.options &= !0x1000;
139 self.options |= (extension.is_some() as u16) << 12;
140 self.extension = extension;
141 self
142 }
143
144 #[inline]
146 pub fn marker(&self) -> bool {
147 (self.options & 0x0080) != 0
148 }
149
150 #[inline]
152 pub fn with_marker(mut self, marker: bool) -> Self {
153 self.options &= !0x0080;
154 self.options |= (marker as u16) << 7;
155 self
156 }
157
158 #[inline]
162 pub fn payload_type(&self) -> u8 {
163 (self.options & 0x7f) as u8
164 }
165
166 #[inline]
171 pub fn with_payload_type(mut self, payload_type: u8) -> Self {
172 assert!(payload_type < 128);
173
174 self.options &= !0x7f;
175 self.options |= (payload_type & 0x7f) as u16;
176 self
177 }
178
179 #[inline]
181 pub fn sequence_number(&self) -> u16 {
182 self.sequence_number
183 }
184
185 #[inline]
187 pub fn with_sequence_number(mut self, n: u16) -> Self {
188 self.sequence_number = n;
189 self
190 }
191
192 #[inline]
194 pub fn timestamp(&self) -> u32 {
195 self.timestamp
196 }
197
198 #[inline]
200 pub fn with_timestamp(mut self, timestamp: u32) -> Self {
201 self.timestamp = timestamp;
202 self
203 }
204
205 #[inline]
207 pub fn ssrc(&self) -> u32 {
208 self.ssrc
209 }
210
211 #[inline]
213 pub fn with_ssrc(mut self, ssrc: u32) -> Self {
214 self.ssrc = ssrc;
215 self
216 }
217
218 #[inline]
220 pub fn csrcs(&self) -> &[u32] {
221 &self.csrcs
222 }
223
224 pub fn with_csrcs<T>(mut self, csrcs: T) -> Self
229 where
230 T: Into<Vec<u32>>,
231 {
232 let csrcs = csrcs.into();
233
234 assert!(csrcs.len() <= 0xf);
235
236 self.csrcs = csrcs;
237 self.options &= !0xf00;
238 self.options |= (self.csrcs.len() as u16) << 8;
239 self
240 }
241
242 pub fn raw_size(&self) -> usize {
244 std::mem::size_of::<RawRtpHeader>()
245 + (self.csrcs.len() << 2)
246 + self.extension.as_ref().map(|e| e.raw_size()).unwrap_or(0)
247 }
248}
249
250impl Default for RtpHeader {
251 #[inline]
252 fn default() -> Self {
253 Self::new()
254 }
255}
256
257#[repr(C, packed)]
259struct RawHeaderExtension {
260 misc: u16,
261 length: u16,
262}
263
264#[derive(Clone)]
266pub struct RtpHeaderExtension {
267 misc: u16,
268 data: Bytes,
269}
270
271impl RtpHeaderExtension {
272 #[inline]
274 pub const fn new() -> Self {
275 Self {
276 misc: 0,
277 data: Bytes::new(),
278 }
279 }
280
281 pub fn decode(data: &mut Bytes) -> Result<Self, InvalidInput> {
283 let mut buffer = data.clone();
284
285 if buffer.len() < std::mem::size_of::<RawHeaderExtension>() {
286 return Err(InvalidInput::new());
287 }
288
289 let ptr = buffer.as_ptr() as *const RawHeaderExtension;
290
291 let raw = unsafe { ptr.read_unaligned() };
292
293 let extension_length = (u16::from_be(raw.length) as usize) << 2;
294 let misc = u16::from_be(raw.misc);
295
296 buffer.advance(std::mem::size_of::<RawHeaderExtension>());
297
298 if buffer.len() < extension_length {
299 return Err(InvalidInput::new());
300 }
301
302 let res = Self {
303 misc,
304 data: buffer.split_to(extension_length),
305 };
306
307 *data = buffer;
308
309 Ok(res)
310 }
311
312 pub fn encode(&self, buf: &mut BytesMut) {
314 buf.reserve(self.raw_size());
315
316 let length = (self.data.len() >> 2) as u16;
317
318 let raw = RawHeaderExtension {
319 misc: self.misc.to_be(),
320 length: length.to_be(),
321 };
322
323 let ptr = &raw as *const _ as *const u8;
324
325 let header =
326 unsafe { std::slice::from_raw_parts(ptr, std::mem::size_of::<RawHeaderExtension>()) };
327
328 buf.extend_from_slice(header);
329 buf.extend_from_slice(&self.data);
330 }
331
332 #[inline]
334 pub fn misc(&self) -> u16 {
335 self.misc
336 }
337
338 #[inline]
340 pub fn with_misc(mut self, misc: u16) -> Self {
341 self.misc = misc;
342 self
343 }
344
345 #[inline]
347 pub fn data(&self) -> &Bytes {
348 &self.data
349 }
350
351 #[inline]
357 pub fn with_data(mut self, data: Bytes) -> Self {
358 assert_eq!(data.len() & 3, 0);
359
360 let words = data.len() >> 2;
361
362 assert!(words <= (u16::MAX as usize));
363
364 self.data = data;
365 self
366 }
367
368 #[inline]
371 pub fn raw_size(&self) -> usize {
372 std::mem::size_of::<RawHeaderExtension>() + self.data.len()
373 }
374}
375
376impl Default for RtpHeaderExtension {
377 #[inline]
378 fn default() -> Self {
379 Self::new()
380 }
381}
382
383#[derive(Clone)]
385pub struct RtpPacket {
386 header: RtpHeader,
387 payload: Bytes,
388}
389
390impl RtpPacket {
391 #[inline]
393 pub const fn new() -> Self {
394 Self {
395 header: RtpHeader::new(),
396 payload: Bytes::new(),
397 }
398 }
399
400 pub fn from_parts(header: RtpHeader, payload: Bytes) -> Result<Self, InvalidInput> {
402 if header.padding() {
403 let padding_len = payload.last().copied().ok_or_else(InvalidInput::new)? as usize;
404
405 if padding_len == 0 || payload.len() < padding_len {
406 return Err(InvalidInput::new());
407 }
408 }
409
410 let res = Self { header, payload };
411
412 Ok(res)
413 }
414
415 #[inline]
417 pub fn deconstruct(self) -> (RtpHeader, Bytes) {
418 (self.header, self.payload)
419 }
420
421 pub fn decode(mut frame: Bytes) -> Result<Self, InvalidInput> {
423 let header = RtpHeader::decode(&mut frame)?;
424
425 let payload = frame;
426
427 Self::from_parts(header, payload)
428 }
429
430 pub fn encode(&self, buf: &mut BytesMut) {
432 buf.reserve(self.raw_size());
433
434 self.header.encode(buf);
435
436 buf.extend_from_slice(&self.payload);
437 }
438
439 #[inline]
441 pub fn header(&self) -> &RtpHeader {
442 &self.header
443 }
444
445 #[inline]
447 pub fn marker(&self) -> bool {
448 self.header.marker()
449 }
450
451 #[inline]
453 pub fn with_marker(mut self, marker: bool) -> Self {
454 self.header = self.header.with_marker(marker);
455 self
456 }
457
458 #[inline]
462 pub fn payload_type(&self) -> u8 {
463 self.header.payload_type()
464 }
465
466 #[inline]
471 pub fn with_payload_type(mut self, payload_type: u8) -> Self {
472 self.header = self.header.with_payload_type(payload_type);
473 self
474 }
475
476 #[inline]
478 pub fn sequence_number(&self) -> u16 {
479 self.header.sequence_number()
480 }
481
482 #[inline]
484 pub fn with_sequence_number(mut self, sequence_number: u16) -> Self {
485 self.header = self.header.with_sequence_number(sequence_number);
486 self
487 }
488
489 #[inline]
491 pub fn timestamp(&self) -> u32 {
492 self.header.timestamp()
493 }
494
495 #[inline]
497 pub fn with_timestamp(mut self, timestamp: u32) -> Self {
498 self.header = self.header.with_timestamp(timestamp);
499 self
500 }
501
502 #[inline]
504 pub fn ssrc(&self) -> u32 {
505 self.header.ssrc()
506 }
507
508 #[inline]
510 pub fn with_ssrc(mut self, ssrc: u32) -> Self {
511 self.header = self.header.with_ssrc(ssrc);
512 self
513 }
514
515 #[inline]
517 pub fn csrcs(&self) -> &[u32] {
518 self.header.csrcs()
519 }
520
521 pub fn with_csrcs<T>(mut self, csrcs: T) -> Self
526 where
527 T: Into<Vec<u32>>,
528 {
529 self.header = self.header.with_csrcs(csrcs);
530 self
531 }
532
533 #[inline]
537 pub fn padding(&self) -> u8 {
538 if self.header.padding() {
539 *self.payload.last().unwrap()
540 } else {
541 0
542 }
543 }
544
545 #[inline]
547 pub fn payload(&self) -> &Bytes {
548 &self.payload
549 }
550
551 #[inline]
553 pub fn stripped_payload(&self) -> Bytes {
554 let payload_len = self.payload.len();
555 let padding_len = self.padding() as usize;
556
557 let len = payload_len - padding_len;
558
559 self.payload.slice(..len)
560 }
561
562 pub fn with_payload(mut self, payload: Bytes, padding: u8) -> Self {
567 if padding > 0 {
568 let len = payload.len() + (padding as usize);
569
570 let mut buffer = BytesMut::with_capacity(len);
571
572 buffer.extend_from_slice(&payload);
573 buffer.resize(len, 0);
574
575 buffer[len - 1] = padding;
576
577 self.header = self.header.with_padding(true);
578 self.payload = buffer.freeze();
579 } else {
580 self.header = self.header.with_padding(false);
581 self.payload = payload;
582 }
583
584 self
585 }
586
587 pub fn with_padded_payload(mut self, payload: Bytes) -> Self {
594 let padding_len = payload.last().copied().expect("empty payload") as usize;
595
596 assert!(padding_len > 0 && payload.len() >= padding_len);
597
598 self.header = self.header.with_padding(true);
599 self.payload = payload;
600 self
601 }
602
603 #[inline]
605 pub fn raw_size(&self) -> usize {
606 self.header.raw_size() + self.payload.len()
607 }
608}
609
610impl Default for RtpPacket {
611 #[inline]
612 fn default() -> Self {
613 Self::new()
614 }
615}
616
617#[derive(Clone)]
620pub struct IncomingRtpPacket {
621 inner: RtpPacket,
622 received_at: Instant,
623}
624
625impl IncomingRtpPacket {
626 #[inline]
628 pub const fn new(packet: RtpPacket, received_at: Instant) -> Self {
629 Self {
630 inner: packet,
631 received_at,
632 }
633 }
634
635 #[inline]
637 pub fn received_at(&self) -> Instant {
638 self.received_at
639 }
640}
641
642impl AsRef<RtpPacket> for IncomingRtpPacket {
643 #[inline]
644 fn as_ref(&self) -> &RtpPacket {
645 &self.inner
646 }
647}
648
649impl Borrow<RtpPacket> for IncomingRtpPacket {
650 #[inline]
651 fn borrow(&self) -> &RtpPacket {
652 &self.inner
653 }
654}
655
656impl Deref for IncomingRtpPacket {
657 type Target = RtpPacket;
658
659 #[inline]
660 fn deref(&self) -> &Self::Target {
661 &self.inner
662 }
663}
664
665impl From<IncomingRtpPacket> for RtpPacket {
666 #[inline]
667 fn from(packet: IncomingRtpPacket) -> Self {
668 packet.inner
669 }
670}
671
672#[derive(Clone)]
674pub struct OrderedRtpPacket {
675 inner: IncomingRtpPacket,
676 index: u64,
677}
678
679impl OrderedRtpPacket {
680 #[inline]
682 pub const fn new(inner: IncomingRtpPacket, index: u64) -> Self {
683 Self { inner, index }
684 }
685
686 #[inline]
688 pub fn index(&self) -> u64 {
689 self.index
690 }
691}
692
693impl AsRef<RtpPacket> for OrderedRtpPacket {
694 #[inline]
695 fn as_ref(&self) -> &RtpPacket {
696 &self.inner
697 }
698}
699
700impl AsRef<IncomingRtpPacket> for OrderedRtpPacket {
701 #[inline]
702 fn as_ref(&self) -> &IncomingRtpPacket {
703 &self.inner
704 }
705}
706
707impl Borrow<RtpPacket> for OrderedRtpPacket {
708 #[inline]
709 fn borrow(&self) -> &RtpPacket {
710 &self.inner
711 }
712}
713
714impl Borrow<IncomingRtpPacket> for OrderedRtpPacket {
715 #[inline]
716 fn borrow(&self) -> &IncomingRtpPacket {
717 &self.inner
718 }
719}
720
721impl Deref for OrderedRtpPacket {
722 type Target = IncomingRtpPacket;
723
724 #[inline]
725 fn deref(&self) -> &Self::Target {
726 &self.inner
727 }
728}
729
730impl From<OrderedRtpPacket> for RtpPacket {
731 #[inline]
732 fn from(packet: OrderedRtpPacket) -> Self {
733 packet.inner.into()
734 }
735}
736
737impl From<OrderedRtpPacket> for IncomingRtpPacket {
738 #[inline]
739 fn from(packet: OrderedRtpPacket) -> Self {
740 packet.inner
741 }
742}