pocket_relay_client_shared/
fire.rs

//! Minimal implementation of fire packet parsing and creation;
//! only the bare minimum of required features is supported
3
4use bytes::{Buf, BufMut, Bytes};
5use std::io;
6use tdf::{serialize_vec, TdfSerialize};
7use tokio_util::codec::{Decoder, Encoder};
8
/// The different types of frames that can be sent
///
/// The discriminants are the raw values carried in the frame header; the
/// codec in this file stores them in the upper four bits of the type byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum FrameType {
    /// Request to a server
    Request = 0x0,
    /// Response to a request
    Response = 0x1,
    /// Async notification from the server
    Notify = 0x2,
    /// Error response from the server
    Error = 0x3,
}
21
22/// Conversion for bytes to frame type
23impl From<u8> for FrameType {
24    fn from(value: u8) -> Self {
25        match value {
26            0x1 => FrameType::Response,
27            0x2 => FrameType::Notify,
28            0x3 => FrameType::Error,
29            _ => FrameType::Request,
30        }
31    }
32}
33
/// The header for a fire frame
///
/// The fields are declared in the same order that the codec in this
/// file reads them from and writes them to the wire.
pub struct FrameHeader {
    /// The length of the frame contents in bytes (the header itself is
    /// not counted); the codec reads and writes this as a 16-bit value
    pub length: usize,
    /// The component that should handle this frame
    pub component: u16,
    /// The command that should handle this frame
    pub command: u16,
    /// Error code if this is an error frame
    pub error: u16,
    /// The type of frame; stored in the upper four bits of its header byte
    pub ty: FrameType,
    /// Frame options bitset; the codec shifts this by four bits when
    /// reading and writing, so only four bits survive a round trip
    pub options: u8,
    /// Sequence number for tracking request and response mappings
    pub seq: u16,
}
51
/// Packet framing structure: a parsed header paired with the raw
/// bytes that follow it
pub struct Frame {
    /// Header for the frame
    pub header: FrameHeader,
    /// The encoded byte contents of the packet, excluding the header
    pub contents: Bytes,
}
59
60impl Frame {
61    /// Creates a new response frame responding to the provided `header`
62    /// with the bytes contents of `value`
63    ///
64    /// ## Arguments
65    /// * `header`   - The header of the frame to respond to
66    /// * `contents` - The bytes of the frame
67    pub fn response_raw(header: &FrameHeader, contents: Bytes) -> Frame {
68        Frame {
69            header: FrameHeader {
70                length: contents.len(),
71                component: header.component,
72                command: header.command,
73                error: 0,
74                ty: FrameType::Response,
75                options: 0,
76                seq: header.seq,
77            },
78            contents,
79        }
80    }
81
82    /// Creates a new response frame responding to the provided `header`
83    /// with the encoded contents of the `value`
84    ///
85    /// ## Arguments
86    /// * `header` - The header of the frame to respond to
87    /// * `value`  - The value to encode as the frame bytes
88    #[inline]
89    pub fn response<V>(header: &FrameHeader, value: V) -> Frame
90    where
91        V: TdfSerialize,
92    {
93        Self::response_raw(header, Bytes::from(serialize_vec(&value)))
94    }
95
96    /// Creates a new response frame responding to the provided `header`
97    /// with empty contents
98    ///
99    /// ## Arguments
100    /// * `header` - The header of the frame to respond to
101    pub fn response_empty(header: &FrameHeader) -> Frame {
102        Self::response_raw(header, Bytes::new())
103    }
104}
105
/// Codec for encoding and decoding fire frames
///
/// The default value starts with no partially read frame.
#[derive(Default)]
pub struct FireCodec {
    /// Header of the frame that is currently being read; it is kept
    /// between decode calls until the whole frame contents are available
    current_frame: Option<FrameHeader>,
}
112
113impl FireCodec {
114    const MIN_HEADER_SIZE: usize = 12;
115}
116
117impl Decoder for FireCodec {
118    // The codec doesn't have any errors of its own so IO error is used
119    type Error = io::Error;
120    // The decoder provides fire frames
121    type Item = Frame;
122
123    fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
124        let current_frame = if let Some(current_frame) = self.current_frame.as_mut() {
125            // Use the existing frame
126            current_frame
127        } else {
128            // Ensure there is at least enough bytes for the header
129            if src.len() < Self::MIN_HEADER_SIZE {
130                return Ok(None);
131            }
132
133            // Read the length of the fire frame
134            let length: usize = src.get_u16() as usize;
135            let component: u16 = src.get_u16();
136            let command: u16 = src.get_u16();
137            let error: u16 = src.get_u16();
138            let ty: FrameType = FrameType::from(src.get_u8() >> 4);
139            let options: u8 = src.get_u8() >> 4;
140            let seq: u16 = src.get_u16();
141
142            let header = FrameHeader {
143                length,
144                component,
145                command,
146                error,
147                ty,
148                options,
149                seq,
150            };
151
152            self.current_frame.insert(header)
153        };
154
155        // Ensure there are enough bytes for the entire frame
156        if src.len() < current_frame.length {
157            return Ok(None);
158        }
159
160        // Take the frame we are currently working on
161        let header = self.current_frame.take().expect("Missing current frame");
162        // Take all the frame bytes
163        let buffer = src.split_to(header.length);
164
165        Ok(Some(Frame {
166            header,
167            contents: buffer.freeze(),
168        }))
169    }
170}
171
172impl Encoder<Frame> for FireCodec {
173    type Error = io::Error;
174
175    fn encode(&mut self, item: Frame, dst: &mut bytes::BytesMut) -> Result<(), Self::Error> {
176        let header = item.header;
177        dst.put_u16(header.length as u16);
178        dst.put_u16(header.component);
179        dst.put_u16(header.command);
180        dst.put_u16(header.error);
181        dst.put_u8((header.ty as u8) << 4);
182        dst.put_u8(header.options << 4);
183        dst.put_u16(header.seq);
184
185        dst.extend_from_slice(&item.contents);
186        Ok(())
187    }
188}