oddity_rtsp_protocol/
tokio.rs

1use tokio_util::codec::{Decoder, Encoder};
2
3use bytes::BytesMut;
4
5use super::{
6    error::Error,
7    interleaved::{self, InterleavedParser, MaybeInterleaved},
8    io::Target,
9    parse::{Parser, Status},
10    serialize::Serialize,
11};
12
13pub struct Codec<T: Target> {
14    state: State,
15    parser: Parser<T::Inbound>,
16    interleaved_parser: InterleavedParser,
17}
18
/// Decoding progress of a [`Codec`].
enum State {
    /// No item is in progress; the next byte decides which parser to use.
    Init,
    /// A message is being parsed.
    ParseMessage,
    /// An interleaved packet is being parsed.
    ParseInterleaved,
}
24
25impl<T: Target> Codec<T> {
26    pub fn new() -> Self {
27        Self {
28            state: State::Init,
29            parser: Parser::<T::Inbound>::new(),
30            interleaved_parser: InterleavedParser::new(),
31        }
32    }
33}
34
35impl<T: Target> Default for Codec<T> {
36    fn default() -> Self {
37        Self::new()
38    }
39}
40
41impl<T: Target> Decoder for Codec<T> {
42    type Item = MaybeInterleaved<T::Inbound>;
43    type Error = Error;
44
45    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
46        if let State::Init = self.state {
47            if !src.is_empty() {
48                if src[0] == interleaved::MAGIC {
49                    self.state = State::ParseInterleaved;
50                } else {
51                    self.state = State::ParseMessage;
52                }
53            } else {
54                return Ok(None);
55            }
56        };
57
58        match &mut self.state {
59            State::Init => unreachable!(),
60            State::ParseMessage => match self.parser.parse(src)? {
61                Status::Done => {
62                    self.state = State::Init;
63                    let parser = std::mem::take(&mut self.parser);
64                    Ok(Some(
65                        parser
66                            .into_message()
67                            .map(MaybeInterleaved::<T::Inbound>::Message)?,
68                    ))
69                }
70                Status::Hungry => Ok(None),
71            },
72            State::ParseInterleaved => match self.interleaved_parser.parse(src) {
73                Some(parsed) => {
74                    let (channel, payload) = parsed?;
75                    self.state = State::Init;
76                    self.interleaved_parser = InterleavedParser::new();
77                    Ok(Some(MaybeInterleaved::<T::Inbound>::Interleaved {
78                        channel,
79                        payload,
80                    }))
81                }
82                None => Ok(None),
83            },
84        }
85    }
86}
87
88impl<T: Target> Encoder<MaybeInterleaved<T::Outbound>> for Codec<T> {
89    type Error = Error;
90
91    fn encode(
92        &mut self,
93        item: MaybeInterleaved<T::Outbound>,
94        dst: &mut BytesMut,
95    ) -> Result<(), Self::Error> {
96        item.serialize(dst)
97    }
98}