1use crate::{Payloadable, Payloader, Rtp, RtpConfig, RtpConfigRange, RtpPacket};
2use ezk::{ConfigRange, Frame, NextEventIsCancelSafe, Result, Source, SourceEvent, ValueRange};
3use std::collections::VecDeque;
4
5pub struct Packetizer<S: Source<MediaType: Payloadable>> {
6 source: S,
7 mtu: usize,
8 stream: Option<Stream<S::MediaType>>,
9}
10
11impl<S: Source<MediaType: Payloadable> + NextEventIsCancelSafe> NextEventIsCancelSafe
12 for Packetizer<S>
13{
14}
15
16struct Stream<M: Payloadable> {
17 config: RtpConfig,
18 sequence_number: u16,
19
20 queue: VecDeque<RtpPacket>,
21 payloader: M::Payloader,
22}
23
24impl<S> Packetizer<S>
25where
26 S: Source<MediaType: Payloadable>,
27{
28 pub fn new(source: S) -> Self {
29 Self {
30 source,
31 mtu: 1400,
32 stream: None,
33 }
34 }
35
36 pub fn with_mtu(mut self, mtu: usize) -> Self {
37 self.mtu = mtu;
38 self
39 }
40}
41
42impl<S> Source for Packetizer<S>
43where
44 S: Source<MediaType: Payloadable>,
45{
46 type MediaType = Rtp;
47
48 async fn capabilities(&mut self) -> Result<Vec<RtpConfigRange>> {
49 let _capabilities = self.source.capabilities().await?;
50
51 if let Some(static_pt) = S::MediaType::STATIC_PT {
52 Ok(vec![RtpConfigRange {
53 pt: ValueRange::Value(static_pt),
54 }])
55 } else {
56 Ok(vec![RtpConfigRange {
57 pt: ValueRange::range(96, 127),
58 }])
59 }
60 }
61
62 async fn negotiate_config(&mut self, available: Vec<RtpConfigRange>) -> Result<RtpConfig> {
63 let config_ = self
64 .source
65 .negotiate_config(vec![ConfigRange::any()])
66 .await?;
67
68 let pt = available[0].pt.first_value();
69
70 let config = RtpConfig { pt };
71
72 self.stream = Some(Stream {
73 config,
74 sequence_number: rand::random(),
75 queue: VecDeque::new(),
76 payloader: S::MediaType::make_payloader(config_),
77 });
78
79 Ok(config)
80 }
81
82 async fn next_event(&mut self) -> Result<SourceEvent<Self::MediaType>> {
83 let Some(stream) = &mut self.stream else {
84 return Ok(SourceEvent::RenegotiationNeeded);
85 };
86
87 loop {
88 if let Some(packet) = stream.queue.pop_front() {
89 let timestamp = packet.get().timestamp();
90
91 return Ok(SourceEvent::Frame(Frame::new(packet, timestamp as u64)));
92 }
93
94 let frame = match self.source.next_event().await? {
95 SourceEvent::Frame(frame) => frame,
96 SourceEvent::EndOfData => return Ok(SourceEvent::EndOfData),
97 SourceEvent::RenegotiationNeeded => return Ok(SourceEvent::RenegotiationNeeded),
98 };
99
100 let timestamp = (frame.timestamp & u64::from(u32::MAX)) as u32;
101
102 for payload in stream.payloader.payload(frame, self.mtu) {
103 stream.sequence_number = stream.sequence_number.wrapping_add(1);
104
105 let packet = RtpPacket::new(
106 &rtp_types::RtpPacketBuilder::new()
107 .sequence_number(stream.sequence_number)
108 .timestamp(timestamp)
109 .payload_type(stream.config.pt)
110 .payload(&payload),
111 );
112
113 stream.queue.push_back(packet);
114 }
115 }
116 }
117}