hydra/frame/
mod.rs

1use std::io::Error;
2use std::io::ErrorKind;
3
4use bincode::Decode;
5use bincode::Encode;
6
7use bincode::config;
8use bincode::config::Configuration;
9use bincode::config::Fixint;
10use bincode::config::LittleEndian;
11use bincode::error::DecodeError;
12
13use bytes::Buf;
14use bytes::BufMut;
15use bytes::BytesMut;
16
17use tokio_util::codec::Decoder;
18use tokio_util::codec::Encoder;
19
20mod exit;
21mod hello;
22mod link;
23mod link_down;
24mod monitor;
25mod monitor_down;
26mod monitor_update;
27mod ping;
28mod pong;
29mod send;
30
31pub use exit::*;
32pub use hello::*;
33pub use link::*;
34pub use link_down::*;
35pub use monitor::*;
36pub use monitor_down::*;
37pub use monitor_update::*;
38pub use ping::*;
39pub use pong::*;
40pub use send::*;
41
/// Bincode configuration for the frame codec.
///
/// Starts from bincode's standard configuration and switches it to
/// fixed-width integer encoding with little-endian byte order. The same
/// configuration is used for both encoding and decoding frames in this module.
const FRAME_CONFIG: Configuration<LittleEndian, Fixint> = config::standard()
    .with_fixed_int_encoding()
    .with_little_endian();
46
/// The size, in bytes, of the length marker that precedes every encoded frame.
///
/// The marker is a `u32`, so this is the width of a `u32`.
const MARKER_LENGTH: usize = std::mem::size_of::<u32>();
49
/// A frame value for the codec.
///
/// Each variant other than [`Frame::Ping`] and [`Frame::Pong`] wraps the
/// payload struct of the same name, re-exported from this module's submodules.
///
/// NOTE(review): the `Encode`/`Decode` impls are derived, so the variant order
/// is presumably part of the wire format; reordering or inserting variants
/// likely breaks compatibility with existing peers — confirm before changing.
#[derive(Debug, Encode, Decode)]
pub enum Frame {
    /// Carries a [`Hello`] payload.
    Hello(Hello),
    /// Payload-free ping frame.
    Ping,
    /// Payload-free pong frame.
    Pong,
    /// Carries a [`Send`] payload.
    Send(Send),
    /// Carries a [`Monitor`] payload.
    Monitor(Monitor),
    /// Carries a [`MonitorDown`] payload.
    MonitorDown(MonitorDown),
    /// Carries a [`MonitorUpdate`] payload.
    MonitorUpdate(MonitorUpdate),
    /// Carries a [`Link`] payload.
    Link(Link),
    /// Carries a [`LinkDown`] payload.
    LinkDown(LinkDown),
    /// Carries an [`Exit`] payload.
    Exit(Exit),
}
64
65impl From<Hello> for Frame {
66    fn from(value: Hello) -> Self {
67        Self::Hello(value)
68    }
69}
70
71impl From<Ping> for Frame {
72    fn from(_: Ping) -> Self {
73        Self::Ping
74    }
75}
76
77impl From<Pong> for Frame {
78    fn from(_: Pong) -> Self {
79        Self::Pong
80    }
81}
82
83impl From<Send> for Frame {
84    fn from(value: Send) -> Self {
85        Self::Send(value)
86    }
87}
88
89impl From<Monitor> for Frame {
90    fn from(value: Monitor) -> Self {
91        Self::Monitor(value)
92    }
93}
94
95impl From<MonitorDown> for Frame {
96    fn from(value: MonitorDown) -> Self {
97        Self::MonitorDown(value)
98    }
99}
100
101impl From<MonitorUpdate> for Frame {
102    fn from(value: MonitorUpdate) -> Self {
103        Self::MonitorUpdate(value)
104    }
105}
106
107impl From<Link> for Frame {
108    fn from(value: Link) -> Self {
109        Self::Link(value)
110    }
111}
112
113impl From<LinkDown> for Frame {
114    fn from(value: LinkDown) -> Self {
115        Self::LinkDown(value)
116    }
117}
118
119impl From<Exit> for Frame {
120    fn from(value: Exit) -> Self {
121        Self::Exit(value)
122    }
123}
124
/// The frame codec.
///
/// A stateless unit type; it implements `Encoder<Frame>` and `Decoder` to
/// translate between [`Frame`] values and length-prefixed bytes.
#[derive(Default)]
pub struct Codec;
128
129impl Codec {
130    /// Constructs a new instance of [Codec] with default settings.
131    pub const fn new() -> Self {
132        Self
133    }
134}
135
136impl Encoder<Frame> for Codec {
137    type Error = Error;
138
139    fn encode(&mut self, item: Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
140        let marker = dst.len();
141
142        dst.put_u32_le(0);
143
144        let size = bincode::encode_into_std_write(item, &mut dst.writer(), FRAME_CONFIG)
145            .map_err(|e| Error::new(ErrorKind::InvalidInput, e))?;
146
147        dst[marker..marker + MARKER_LENGTH].copy_from_slice(&(size as u32).to_le_bytes());
148
149        Ok(())
150    }
151}
152
153impl Decoder for Codec {
154    type Item = Frame;
155    type Error = Error;
156
157    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
158        if src.len() < 4 {
159            return Ok(None);
160        }
161
162        let mut length_marker = [0u8; MARKER_LENGTH];
163
164        length_marker.copy_from_slice(&src[0..MARKER_LENGTH]);
165
166        let length = u32::from_le_bytes(length_marker) as usize;
167
168        if src.len() < MARKER_LENGTH + length {
169            src.reserve(MARKER_LENGTH + length - src.len());
170            return Ok(None);
171        }
172
173        let result = bincode::decode_from_slice(&src[4..], FRAME_CONFIG);
174
175        match result {
176            Ok((frame, _)) => {
177                src.advance(MARKER_LENGTH + length);
178                Ok(Some(frame))
179            }
180            Err(DecodeError::UnexpectedEnd { additional }) => {
181                src.reserve(additional);
182                Ok(None)
183            }
184            Err(e) => Err(Error::new(ErrorKind::InvalidData, e)),
185        }
186    }
187}