1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
//! Minimal fire packet parsing and creation implementation.
//! This supports only the very minimal set of required features.
use bytes::{Buf, BufMut, Bytes};
use std::io;
use tdf::{serialize_vec, TdfSerialize};
use tokio_util::codec::{Decoder, Encoder};
/// The different types of frames that can be sent
///
/// The discriminant of each variant is the numeric value used for the
/// frame type, so a variant can be turned into its numeric form with
/// `as u8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum FrameType {
    /// Request to a server
    Request = 0x0,
    /// Response to a request
    Response = 0x1,
    /// Async notification from the server
    Notify = 0x2,
    /// Error response from the server
    Error = 0x3,
}
/// Conversion for bytes to frame type
impl From<u8> for FrameType {
fn from(value: u8) -> Self {
match value {
0x1 => FrameType::Response,
0x2 => FrameType::Notify,
0x3 => FrameType::Error,
_ => FrameType::Request,
}
}
}
/// The header for a fire frame
///
/// The fields are declared in the same order that the codec reads and
/// writes them.
pub struct FrameHeader {
/// The length of the frame contents in bytes, this does not include
/// the header itself. The codec reads and writes this as a 16-bit value
pub length: usize,
/// The component that should handle this frame
pub component: u16,
/// The command that should handle this frame
pub command: u16,
/// Error code if this is an error frame
pub error: u16,
/// The type of frame, the codec stores this in the upper four bits
/// of its byte
pub ty: FrameType,
/// Frame options bitset, the codec stores this in the upper four bits
/// of its byte so only the lower four bits of this field are encoded
pub options: u8,
/// Sequence number for tracking request and response mappings
pub seq: u16,
}
/// Packet framing structure, pairs a frame header with the bytes
/// that followed it
pub struct Frame {
/// Header for the frame
pub header: FrameHeader,
/// The encoded byte contents of the packet, the header is not
/// included in these bytes
pub contents: Bytes,
}
impl Frame {
    /// Builds a response frame for the frame described by `header` using
    /// the already encoded `contents` as the frame bytes.
    ///
    /// The component, command and sequence number are copied over from
    /// `header`, the error code and options are set to zero and the
    /// length is the number of bytes in `contents`.
    ///
    /// ## Arguments
    /// * `header` - The header of the frame to respond to
    /// * `contents` - The bytes of the frame
    pub fn response_raw(header: &FrameHeader, contents: Bytes) -> Frame {
        let response_header = FrameHeader {
            length: contents.len(),
            component: header.component,
            command: header.command,
            error: 0,
            ty: FrameType::Response,
            options: 0,
            seq: header.seq,
        };

        Frame {
            header: response_header,
            contents,
        }
    }

    /// Builds a response frame for the frame described by `header` where
    /// the frame bytes are produced by serializing `value`.
    ///
    /// ## Arguments
    /// * `header` - The header of the frame to respond to
    /// * `value` - The value to encode as the frame bytes
    #[inline]
    pub fn response<V>(header: &FrameHeader, value: V) -> Frame
    where
        V: TdfSerialize,
    {
        let encoded = serialize_vec(&value);
        Self::response_raw(header, Bytes::from(encoded))
    }

    /// Builds a response frame for the frame described by `header` that
    /// carries no contents.
    ///
    /// ## Arguments
    /// * `header` - The header of the frame to respond to
    pub fn response_empty(header: &FrameHeader) -> Frame {
        let no_contents = Bytes::new();
        Self::response_raw(header, no_contents)
    }
}
/// Codec for encoding and decoding fire frames
///
/// Decoding is stateful: a header that has been read is kept until the
/// contents it describes are available.
#[derive(Default)]
pub struct FireCodec {
/// Header of the incomplete frame that's currently being read, this is
/// `None` when the next bytes to decode are the start of a new frame
current_frame: Option<FrameHeader>,
}
impl FireCodec {
/// Number of bytes that must be buffered before a header is decoded.
/// Matches the bytes read for one header: four u16 fields, two u8
/// fields and a final u16 field (2 + 2 + 2 + 2 + 1 + 1 + 2)
const MIN_HEADER_SIZE: usize = 12;
}
impl Decoder for FireCodec {
    // Decoding has no failure cases of its own so the IO error type is reused
    type Error = io::Error;
    // Every successful decode produces a single fire frame
    type Item = Frame;

    /// Attempts to decode a single frame from `src`.
    ///
    /// Decoding happens in two stages: the fixed size header is read and
    /// stored as the current frame, then once `src` holds at least
    /// `length` bytes the contents are split off and the completed frame
    /// is returned. `Ok(None)` is returned whenever more bytes are needed,
    /// a header that was already read is kept for the next call.
    fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        // Find out how many content bytes are required, reading a new
        // header when there isn't one pending from an earlier call
        let required = match self.current_frame.as_ref() {
            Some(pending) => pending.length,
            None => {
                // Wait for the complete header to arrive
                if src.len() < Self::MIN_HEADER_SIZE {
                    return Ok(None);
                }

                // Field expressions are evaluated in the order written,
                // which has to match the order of the bytes in the header
                let header = FrameHeader {
                    length: usize::from(src.get_u16()),
                    component: src.get_u16(),
                    command: src.get_u16(),
                    error: src.get_u16(),
                    // Only the upper four bits of these two bytes are kept
                    ty: FrameType::from(src.get_u8() >> 4),
                    options: src.get_u8() >> 4,
                    seq: src.get_u16(),
                };

                let length = header.length;
                self.current_frame = Some(header);
                length
            }
        };

        // Wait for the complete frame contents to arrive
        if src.len() < required {
            return Ok(None);
        }

        // The frame is complete, clear the pending header for the next frame
        let header = self.current_frame.take().expect("Missing current frame");
        let contents = src.split_to(required).freeze();

        Ok(Some(Frame { header, contents }))
    }
}
impl Encoder<Frame> for FireCodec {
type Error = io::Error;
fn encode(&mut self, item: Frame, dst: &mut bytes::BytesMut) -> Result<(), Self::Error> {
let header = item.header;
dst.put_u16(header.length as u16);
dst.put_u16(header.component);
dst.put_u16(header.command);
dst.put_u16(header.error);
dst.put_u8((header.ty as u8) << 4);
dst.put_u8(header.options << 4);
dst.put_u16(header.seq);
dst.extend_from_slice(&item.contents);
Ok(())
}
}