oddity_rtsp_protocol/
interleaved.rs

1use std::fmt;
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4
5use super::{
6    error::{Error, Result},
7    message::Message,
8    request::Request,
9    response::Response,
10    serialize::Serialize,
11};
12
/// Marker byte (ASCII `$`) that starts the header of an interleaved frame.
pub const MAGIC: u8 = b'$';

/// Identifier of an interleaved channel; it occupies one byte of the frame header.
pub type ChannelId = u8;
16
17pub type RequestMaybeInterleaved = MaybeInterleaved<Request>;
18pub type ResponseMaybeInterleaved = MaybeInterleaved<Response>;
19
20pub enum MaybeInterleaved<M: Message> {
21    Message(M),
22    Interleaved { channel: ChannelId, payload: Bytes },
23}
24
25impl<M: Message> fmt::Display for MaybeInterleaved<M> {
26    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
27        match self {
28            Self::Message(message) => write!(f, "{}", message),
29            Self::Interleaved { channel, payload } => write!(
30                f,
31                "interleaved payload over channel: {}, size: {}",
32                channel,
33                payload.len()
34            ),
35        }
36    }
37}
38
39impl<M: Message> Serialize for MaybeInterleaved<M> {
40    fn serialize(self, dst: &mut BytesMut) -> Result<()> {
41        match self {
42            Self::Message(response) => response.serialize(dst),
43            Self::Interleaved { channel, payload } => {
44                dst.put_u8(MAGIC); // $
45                dst.put_u8(channel);
46                dst.put_u16(
47                    payload
48                        .len()
49                        .try_into()
50                        .map_err(|_| Error::InterleavedPayloadTooLarge)?,
51                );
52                dst.put(payload);
53
54                Ok(())
55            }
56        }
57    }
58}
59
/// Incremental parser for interleaved frames.
///
/// The parser can be fed input piecewise: it keeps the decoded frame header
/// between calls while the payload has not fully arrived.
#[derive(Debug, Default)]
pub struct InterleavedParser {
    /// Channel and payload size decoded from a frame header, kept so that
    /// parsing can resume once more input is available.
    channel_and_size: Option<(u8, u16)>,
}
63
64impl InterleavedParser {
65    pub fn new() -> Self {
66        Self {
67            channel_and_size: None,
68        }
69    }
70
71    pub fn parse(&mut self, buffer: &mut impl Buf) -> Option<Result<(ChannelId, Bytes)>> {
72        if let Some((channel, size)) = self.channel_and_size {
73            if buffer.remaining() >= size.into() {
74                let payload = buffer.copy_to_bytes(size.into());
75                Some(Ok((channel, payload)))
76            } else {
77                None
78            }
79        } else if buffer.remaining() >= 4 {
80            let header = &buffer.chunk()[..4];
81            if header[0] != MAGIC {
82                return Some(Err(Error::InterleavedInvalid));
83            }
84
85            let channel = header[1];
86            let size = u16::from_be_bytes([header[2], header[3]]);
87
88            self.channel_and_size = Some((channel, size));
89
90            buffer.advance(4);
91
92            self.parse(buffer)
93        } else {
94            None
95        }
96    }
97}