Skip to main content

ezk_rtp/
depacketizer.rs

1use crate::{DePayloader, Payloadable, Rtp};
2use ezk::{ConfigRange, Frame, NextEventIsCancelSafe, Result, Source, SourceEvent};
3
4pub struct DePacketizer<S: Source<MediaType = Rtp>, M: Payloadable> {
5    source: S,
6
7    stream: Option<Stream<M>>,
8}
9
10impl<S: Source<MediaType = Rtp> + NextEventIsCancelSafe, M: Payloadable> NextEventIsCancelSafe
11    for DePacketizer<S, M>
12{
13}
14
15struct Stream<M: Payloadable> {
16    depayloader: M::DePayloader,
17}
18
19impl<S, M> DePacketizer<S, M>
20where
21    S: Source<MediaType = Rtp>,
22    M: Payloadable,
23{
24    pub fn new(source: S) -> Self {
25        Self {
26            source,
27            stream: None,
28        }
29    }
30}
31
32impl<S, M> Source for DePacketizer<S, M>
33where
34    S: Source<MediaType = Rtp>,
35    M: Payloadable,
36{
37    type MediaType = M;
38
39    async fn capabilities(&mut self) -> Result<Vec<M::ConfigRange>> {
40        let _capabilities = self.source.capabilities().await?;
41
42        Ok(vec![M::ConfigRange::any()])
43    }
44
45    async fn negotiate_config(&mut self, available: Vec<M::ConfigRange>) -> Result<M::Config> {
46        let (config, depayloader) = M::make_depayloader(available);
47
48        self.stream = Some(Stream { depayloader });
49
50        Ok(config)
51    }
52
53    async fn next_event(&mut self) -> Result<SourceEvent<Self::MediaType>> {
54        let Some(stream) = &mut self.stream else {
55            return Ok(SourceEvent::RenegotiationNeeded);
56        };
57
58        let frame = match self.source.next_event().await? {
59            SourceEvent::Frame(frame) => frame,
60            SourceEvent::EndOfData => return Ok(SourceEvent::EndOfData),
61            SourceEvent::RenegotiationNeeded => return Ok(SourceEvent::RenegotiationNeeded),
62        };
63
64        let frame_timestamp = frame.timestamp;
65        let rtp_packet = frame.into_data();
66
67        let data = stream.depayloader.depayload(rtp_packet.get().payload());
68
69        Ok(SourceEvent::Frame(Frame::new(data, frame_timestamp)))
70    }
71}