dnstap_utils/
framestreams_codec.rs1use bytes::{Buf, BufMut, BytesMut};
4use tokio_util::codec::{Decoder, Encoder};
5
/// Largest control frame length, in bytes, that the decoder accepts. The
/// length counts the 4-byte control type word plus the control payload.
pub const CONTROL_FRAME_LENGTH_MAX: usize = 512;
/// Largest value of the leading frame length word, in bytes, that the decoder
/// accepts; anything larger is reported as a decode error.
pub const FRAME_LENGTH_MAX: usize = 128 * 1024;

/// Control frame type code for ACCEPT frames.
pub const FRAMESTREAMS_CONTROL_ACCEPT: u32 = 0x01;
/// Control frame type code for START frames.
pub const FRAMESTREAMS_CONTROL_START: u32 = 0x02;
/// Control frame type code for STOP frames.
pub const FRAMESTREAMS_CONTROL_STOP: u32 = 0x03;
/// Control frame type code for READY frames.
pub const FRAMESTREAMS_CONTROL_READY: u32 = 0x04;
/// Control frame type code for FINISH frames.
pub const FRAMESTREAMS_CONTROL_FINISH: u32 = 0x05;

/// Field type code written at the start of a content type control payload.
pub const FRAMESTREAMS_CONTROL_FIELD_CONTENT_TYPE: u32 = 0x01;

/// Value of the leading length word that marks a control frame instead of a
/// data frame.
pub const FRAMESTREAMS_ESCAPE_SEQUENCE: u32 = 0x00;
22
/// A single frame as produced by the decoder or accepted by the encoder.
///
/// For the control variants that carry a `BytesMut`, the buffer holds the
/// bytes that follow the 4-byte control type word inside the control frame.
#[derive(Debug)]
pub enum Frame {
    /// READY control frame with its payload bytes.
    ControlReady(BytesMut),
    /// ACCEPT control frame with its payload bytes.
    ControlAccept(BytesMut),
    /// START control frame with its payload bytes.
    ControlStart(BytesMut),
    /// STOP control frame; the decoder drops any payload bytes it carried.
    ControlStop,
    /// FINISH control frame; the decoder drops any payload bytes it carried.
    ControlFinish,
    /// Control frame whose type code is not one of the five known codes.
    /// Only the payload is kept; the type code itself is not retained.
    ControlUnknown(BytesMut),
    /// Data frame carrying the frame body.
    Data(BytesMut),
}
33
/// Codec that converts between raw bytes and [`Frame`] values. It has no
/// fields, so it carries no state between calls.
pub struct FrameStreamsCodec {}
35
36impl Encoder<Frame> for FrameStreamsCodec {
37 type Error = std::io::Error;
38
39 fn encode(&mut self, frame: Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
40 match frame {
41 Frame::ControlReady(payload) => {
42 dst.put_u32(FRAMESTREAMS_ESCAPE_SEQUENCE);
43 dst.put_u32(4);
44 dst.put_u32(FRAMESTREAMS_CONTROL_READY);
45 dst.put(payload);
46 }
47 Frame::ControlAccept(payload) => {
48 dst.put_u32(FRAMESTREAMS_ESCAPE_SEQUENCE);
49 dst.put_u32(4 + payload.len() as u32);
50 dst.put_u32(FRAMESTREAMS_CONTROL_ACCEPT);
51 dst.put(payload);
52 }
53 Frame::ControlStart(payload) => {
54 dst.put_u32(FRAMESTREAMS_ESCAPE_SEQUENCE);
55 dst.put_u32(4 + payload.len() as u32);
56 dst.put_u32(FRAMESTREAMS_CONTROL_START);
57 dst.put(payload);
58 }
59 Frame::ControlStop => {
60 dst.put_u32(FRAMESTREAMS_ESCAPE_SEQUENCE);
61 dst.put_u32(4);
62 dst.put_u32(FRAMESTREAMS_CONTROL_STOP);
63 }
64 Frame::ControlFinish => {
65 dst.put_u32(FRAMESTREAMS_ESCAPE_SEQUENCE);
66 dst.put_u32(4);
67 dst.put_u32(FRAMESTREAMS_CONTROL_FINISH);
68 }
69 Frame::ControlUnknown(_) => todo!(),
70 Frame::Data(payload) => {
71 dst.put_u32(payload.len() as u32);
72 dst.put(payload);
73 }
74 }
75 Ok(())
76 }
77}
78
79impl Decoder for FrameStreamsCodec {
80 type Item = Frame;
81 type Error = std::io::Error;
82
83 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
84 if src.len() < 4 {
86 return Ok(None);
87 }
88
89 let mut len_frame_bytes = [0u8; 4];
91 len_frame_bytes.copy_from_slice(&src[..4]);
92 let len_frame = u32::from_be_bytes(len_frame_bytes) as usize;
93
94 if len_frame > FRAME_LENGTH_MAX {
96 return Err(std::io::Error::new(
97 std::io::ErrorKind::InvalidData,
98 format!(
99 "Frame of length {} is too large for this implementation",
100 len_frame
101 ),
102 ));
103 }
104
105 if len_frame == FRAMESTREAMS_ESCAPE_SEQUENCE as usize {
107 if src.len() < 4 + 4 {
110 return Ok(None);
111 }
112
113 let mut len_control_bytes = [0u8; 4];
115 len_control_bytes.copy_from_slice(&src[4..8]);
116 let len_control = u32::from_be_bytes(len_control_bytes) as usize;
117
118 if len_control < 4 {
121 return Err(std::io::Error::new(
122 std::io::ErrorKind::InvalidData,
123 format!("Control frame of length {} is too small", len_control),
124 ));
125 }
126
127 if len_control > CONTROL_FRAME_LENGTH_MAX {
129 return Err(std::io::Error::new(
130 std::io::ErrorKind::InvalidData,
131 format!("Control frame of length {} is too large", len_control),
132 ));
133 }
134
135 if src.len() < 4 + 4 + len_control {
139 return Ok(None);
140 }
141
142 src.advance(4 + 4);
144
145 let control_frame_type = src.get_u32();
147
148 let payload = src.split_to(len_control - 4);
151
152 Ok(Some(match control_frame_type {
153 FRAMESTREAMS_CONTROL_READY => Frame::ControlReady(payload),
154 FRAMESTREAMS_CONTROL_ACCEPT => Frame::ControlAccept(payload),
155 FRAMESTREAMS_CONTROL_START => Frame::ControlStart(payload),
156 FRAMESTREAMS_CONTROL_STOP => Frame::ControlStop,
157 FRAMESTREAMS_CONTROL_FINISH => Frame::ControlFinish,
158 _ => Frame::ControlUnknown(payload),
159 }))
160 } else {
161 if src.len() < 4 + len_frame {
165 return Ok(None);
167 }
168
169 src.advance(4);
171
172 let data = src.split_to(len_frame);
174
175 Ok(Some(Frame::Data(data)))
177 }
178 }
179}
180
181pub fn encode_content_type_payload(content_type: &[u8]) -> BytesMut {
184 let mut buf = BytesMut::with_capacity(4 + 4 + content_type.len());
185 buf.put_u32(FRAMESTREAMS_CONTROL_FIELD_CONTENT_TYPE);
186 buf.put_u32(content_type.len() as u32);
187 buf.put(content_type);
188 buf
189}