1use crate::{DePayloader, Payloadable, Rtp};
2use ezk::{ConfigRange, Frame, NextEventIsCancelSafe, Result, Source, SourceEvent};
3
4pub struct DePacketizer<S: Source<MediaType = Rtp>, M: Payloadable> {
5 source: S,
6
7 stream: Option<Stream<M>>,
8}
9
10impl<S: Source<MediaType = Rtp> + NextEventIsCancelSafe, M: Payloadable> NextEventIsCancelSafe
11 for DePacketizer<S, M>
12{
13}
14
15struct Stream<M: Payloadable> {
16 depayloader: M::DePayloader,
17}
18
19impl<S, M> DePacketizer<S, M>
20where
21 S: Source<MediaType = Rtp>,
22 M: Payloadable,
23{
24 pub fn new(source: S) -> Self {
25 Self {
26 source,
27 stream: None,
28 }
29 }
30}
31
32impl<S, M> Source for DePacketizer<S, M>
33where
34 S: Source<MediaType = Rtp>,
35 M: Payloadable,
36{
37 type MediaType = M;
38
39 async fn capabilities(&mut self) -> Result<Vec<M::ConfigRange>> {
40 let _capabilities = self.source.capabilities().await?;
41
42 Ok(vec![M::ConfigRange::any()])
43 }
44
45 async fn negotiate_config(&mut self, available: Vec<M::ConfigRange>) -> Result<M::Config> {
46 let (config, depayloader) = M::make_depayloader(available);
47
48 self.stream = Some(Stream { depayloader });
49
50 Ok(config)
51 }
52
53 async fn next_event(&mut self) -> Result<SourceEvent<Self::MediaType>> {
54 let Some(stream) = &mut self.stream else {
55 return Ok(SourceEvent::RenegotiationNeeded);
56 };
57
58 let frame = match self.source.next_event().await? {
59 SourceEvent::Frame(frame) => frame,
60 SourceEvent::EndOfData => return Ok(SourceEvent::EndOfData),
61 SourceEvent::RenegotiationNeeded => return Ok(SourceEvent::RenegotiationNeeded),
62 };
63
64 let frame_timestamp = frame.timestamp;
65 let rtp_packet = frame.into_data();
66
67 let data = stream.depayloader.depayload(rtp_packet.get().payload());
68
69 Ok(SourceEvent::Frame(Frame::new(data, frame_timestamp)))
70 }
71}