oddity_rtsp_protocol/
tokio.rs1use tokio_util::codec::{Decoder, Encoder};
2
3use bytes::BytesMut;
4
5use super::{
6 error::Error,
7 interleaved::{self, InterleavedParser, MaybeInterleaved},
8 io::Target,
9 parse::{Parser, Status},
10 serialize::Serialize,
11};
12
13pub struct Codec<T: Target> {
14 state: State,
15 parser: Parser<T::Inbound>,
16 interleaved_parser: InterleavedParser,
17}
18
/// Decoding state of the codec.
///
/// The state records which parser owns the bytes currently in the read
/// buffer, so that a frame split across several reads keeps being fed to the
/// same parser until it completes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// No frame is in progress; the next decode call inspects the first
    /// buffered byte to choose between the two parsing states.
    Init,
    /// A message is in progress and is being fed to the message parser.
    ParseMessage,
    /// An interleaved frame is in progress and is being fed to the
    /// interleaved parser.
    ParseInterleaved,
}
24
25impl<T: Target> Codec<T> {
26 pub fn new() -> Self {
27 Self {
28 state: State::Init,
29 parser: Parser::<T::Inbound>::new(),
30 interleaved_parser: InterleavedParser::new(),
31 }
32 }
33}
34
35impl<T: Target> Default for Codec<T> {
36 fn default() -> Self {
37 Self::new()
38 }
39}
40
41impl<T: Target> Decoder for Codec<T> {
42 type Item = MaybeInterleaved<T::Inbound>;
43 type Error = Error;
44
45 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
46 if let State::Init = self.state {
47 if !src.is_empty() {
48 if src[0] == interleaved::MAGIC {
49 self.state = State::ParseInterleaved;
50 } else {
51 self.state = State::ParseMessage;
52 }
53 } else {
54 return Ok(None);
55 }
56 };
57
58 match &mut self.state {
59 State::Init => unreachable!(),
60 State::ParseMessage => match self.parser.parse(src)? {
61 Status::Done => {
62 self.state = State::Init;
63 let parser = std::mem::take(&mut self.parser);
64 Ok(Some(
65 parser
66 .into_message()
67 .map(MaybeInterleaved::<T::Inbound>::Message)?,
68 ))
69 }
70 Status::Hungry => Ok(None),
71 },
72 State::ParseInterleaved => match self.interleaved_parser.parse(src) {
73 Some(parsed) => {
74 let (channel, payload) = parsed?;
75 self.state = State::Init;
76 self.interleaved_parser = InterleavedParser::new();
77 Ok(Some(MaybeInterleaved::<T::Inbound>::Interleaved {
78 channel,
79 payload,
80 }))
81 }
82 None => Ok(None),
83 },
84 }
85 }
86}
87
88impl<T: Target> Encoder<MaybeInterleaved<T::Outbound>> for Codec<T> {
89 type Error = Error;
90
91 fn encode(
92 &mut self,
93 item: MaybeInterleaved<T::Outbound>,
94 dst: &mut BytesMut,
95 ) -> Result<(), Self::Error> {
96 item.serialize(dst)
97 }
98}