pocket_relay_client_shared/
fire.rs1use bytes::{Buf, BufMut, Bytes};
5use std::io;
6use tdf::{serialize_vec, TdfSerialize};
7use tokio_util::codec::{Decoder, Encoder};
8
/// Kind of a frame, as carried in the frame header.
///
/// The discriminants are the raw values used on the wire; the codec
/// stores them in the upper four bits of the type byte.
#[repr(u8)]
pub enum FrameType {
    /// Wire value `0`.
    Request = 0,
    /// Wire value `1`.
    Response = 1,
    /// Wire value `2`.
    Notify = 2,
    /// Wire value `3`.
    Error = 3,
}
21
22impl From<u8> for FrameType {
24 fn from(value: u8) -> Self {
25 match value {
26 0x1 => FrameType::Response,
27 0x2 => FrameType::Notify,
28 0x3 => FrameType::Error,
29 _ => FrameType::Request,
30 }
31 }
32}
33
34pub struct FrameHeader {
36 pub length: usize,
38 pub component: u16,
40 pub command: u16,
42 pub error: u16,
44 pub ty: FrameType,
46 pub options: u8,
48 pub seq: u16,
50}
51
52pub struct Frame {
54 pub header: FrameHeader,
56 pub contents: Bytes,
58}
59
60impl Frame {
61 pub fn response_raw(header: &FrameHeader, contents: Bytes) -> Frame {
68 Frame {
69 header: FrameHeader {
70 length: contents.len(),
71 component: header.component,
72 command: header.command,
73 error: 0,
74 ty: FrameType::Response,
75 options: 0,
76 seq: header.seq,
77 },
78 contents,
79 }
80 }
81
82 #[inline]
89 pub fn response<V>(header: &FrameHeader, value: V) -> Frame
90 where
91 V: TdfSerialize,
92 {
93 Self::response_raw(header, Bytes::from(serialize_vec(&value)))
94 }
95
96 pub fn response_empty(header: &FrameHeader) -> Frame {
102 Self::response_raw(header, Bytes::new())
103 }
104}
105
106#[derive(Default)]
108pub struct FireCodec {
109 current_frame: Option<FrameHeader>,
111}
112
113impl FireCodec {
114 const MIN_HEADER_SIZE: usize = 12;
115}
116
117impl Decoder for FireCodec {
118 type Error = io::Error;
120 type Item = Frame;
122
123 fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
124 let current_frame = if let Some(current_frame) = self.current_frame.as_mut() {
125 current_frame
127 } else {
128 if src.len() < Self::MIN_HEADER_SIZE {
130 return Ok(None);
131 }
132
133 let length: usize = src.get_u16() as usize;
135 let component: u16 = src.get_u16();
136 let command: u16 = src.get_u16();
137 let error: u16 = src.get_u16();
138 let ty: FrameType = FrameType::from(src.get_u8() >> 4);
139 let options: u8 = src.get_u8() >> 4;
140 let seq: u16 = src.get_u16();
141
142 let header = FrameHeader {
143 length,
144 component,
145 command,
146 error,
147 ty,
148 options,
149 seq,
150 };
151
152 self.current_frame.insert(header)
153 };
154
155 if src.len() < current_frame.length {
157 return Ok(None);
158 }
159
160 let header = self.current_frame.take().expect("Missing current frame");
162 let buffer = src.split_to(header.length);
164
165 Ok(Some(Frame {
166 header,
167 contents: buffer.freeze(),
168 }))
169 }
170}
171
172impl Encoder<Frame> for FireCodec {
173 type Error = io::Error;
174
175 fn encode(&mut self, item: Frame, dst: &mut bytes::BytesMut) -> Result<(), Self::Error> {
176 let header = item.header;
177 dst.put_u16(header.length as u16);
178 dst.put_u16(header.component);
179 dst.put_u16(header.command);
180 dst.put_u16(header.error);
181 dst.put_u8((header.ty as u8) << 4);
182 dst.put_u8(header.options << 4);
183 dst.put_u16(header.seq);
184
185 dst.extend_from_slice(&item.contents);
186 Ok(())
187 }
188}