Skip to main content

ezk_rtp/
packetizer.rs

1use crate::{Payloadable, Payloader, Rtp, RtpConfig, RtpConfigRange, RtpPacket};
2use ezk::{ConfigRange, Frame, NextEventIsCancelSafe, Result, Source, SourceEvent, ValueRange};
3use std::collections::VecDeque;
4
5pub struct Packetizer<S: Source<MediaType: Payloadable>> {
6    source: S,
7    mtu: usize,
8    stream: Option<Stream<S::MediaType>>,
9}
10
11impl<S: Source<MediaType: Payloadable> + NextEventIsCancelSafe> NextEventIsCancelSafe
12    for Packetizer<S>
13{
14}
15
16struct Stream<M: Payloadable> {
17    config: RtpConfig,
18    sequence_number: u16,
19
20    queue: VecDeque<RtpPacket>,
21    payloader: M::Payloader,
22}
23
24impl<S> Packetizer<S>
25where
26    S: Source<MediaType: Payloadable>,
27{
28    pub fn new(source: S) -> Self {
29        Self {
30            source,
31            mtu: 1400,
32            stream: None,
33        }
34    }
35
36    pub fn with_mtu(mut self, mtu: usize) -> Self {
37        self.mtu = mtu;
38        self
39    }
40}
41
42impl<S> Source for Packetizer<S>
43where
44    S: Source<MediaType: Payloadable>,
45{
46    type MediaType = Rtp;
47
48    async fn capabilities(&mut self) -> Result<Vec<RtpConfigRange>> {
49        let _capabilities = self.source.capabilities().await?;
50
51        if let Some(static_pt) = S::MediaType::STATIC_PT {
52            Ok(vec![RtpConfigRange {
53                pt: ValueRange::Value(static_pt),
54            }])
55        } else {
56            Ok(vec![RtpConfigRange {
57                pt: ValueRange::range(96, 127),
58            }])
59        }
60    }
61
62    async fn negotiate_config(&mut self, available: Vec<RtpConfigRange>) -> Result<RtpConfig> {
63        let config_ = self
64            .source
65            .negotiate_config(vec![ConfigRange::any()])
66            .await?;
67
68        let pt = available[0].pt.first_value();
69
70        let config = RtpConfig { pt };
71
72        self.stream = Some(Stream {
73            config,
74            sequence_number: rand::random(),
75            queue: VecDeque::new(),
76            payloader: S::MediaType::make_payloader(config_),
77        });
78
79        Ok(config)
80    }
81
82    async fn next_event(&mut self) -> Result<SourceEvent<Self::MediaType>> {
83        let Some(stream) = &mut self.stream else {
84            return Ok(SourceEvent::RenegotiationNeeded);
85        };
86
87        loop {
88            if let Some(packet) = stream.queue.pop_front() {
89                let timestamp = packet.get().timestamp();
90
91                return Ok(SourceEvent::Frame(Frame::new(packet, timestamp as u64)));
92            }
93
94            let frame = match self.source.next_event().await? {
95                SourceEvent::Frame(frame) => frame,
96                SourceEvent::EndOfData => return Ok(SourceEvent::EndOfData),
97                SourceEvent::RenegotiationNeeded => return Ok(SourceEvent::RenegotiationNeeded),
98            };
99
100            let timestamp = (frame.timestamp & u64::from(u32::MAX)) as u32;
101
102            for payload in stream.payloader.payload(frame, self.mtu) {
103                stream.sequence_number = stream.sequence_number.wrapping_add(1);
104
105                let packet = RtpPacket::new(
106                    &rtp_types::RtpPacketBuilder::new()
107                        .sequence_number(stream.sequence_number)
108                        .timestamp(timestamp)
109                        .payload_type(stream.config.pt)
110                        .payload(&payload),
111                );
112
113                stream.queue.push_back(packet);
114            }
115        }
116    }
117}