dnstap_utils/
framestreams_codec.rs

1// Copyright 2021 Fastly, Inc.
2
3use bytes::{Buf, BufMut, BytesMut};
4use tokio_util::codec::{Decoder, Encoder};
5
// Implementation defined limits.

/// Largest control frame length, in bytes, that the decoder in this file accepts. The length
/// counts the 4-byte control frame type plus the control frame payload.
pub const CONTROL_FRAME_LENGTH_MAX: usize = 512;

/// Largest frame length, in bytes, that the decoder in this file accepts.
pub const FRAME_LENGTH_MAX: usize = 128 * 1024;

// Protocol constants.

/// Control frame type decoded as [`Frame::ControlAccept`].
pub const FRAMESTREAMS_CONTROL_ACCEPT: u32 = 0x0000_0001;

/// Control frame type decoded as [`Frame::ControlStart`].
pub const FRAMESTREAMS_CONTROL_START: u32 = 0x0000_0002;

/// Control frame type decoded as [`Frame::ControlStop`].
pub const FRAMESTREAMS_CONTROL_STOP: u32 = 0x0000_0003;

/// Control frame type decoded as [`Frame::ControlReady`].
pub const FRAMESTREAMS_CONTROL_READY: u32 = 0x0000_0004;

/// Control frame type decoded as [`Frame::ControlFinish`].
pub const FRAMESTREAMS_CONTROL_FINISH: u32 = 0x0000_0005;

/// Field type tag written in front of a "content type" value inside a control frame payload.
pub const FRAMESTREAMS_CONTROL_FIELD_CONTENT_TYPE: u32 = 0x0000_0001;

/// A frame length of zero is not a data frame: it marks the start of a control frame.
pub const FRAMESTREAMS_ESCAPE_SEQUENCE: u32 = 0x0000_0000;
22
23#[derive(Debug)]
24pub enum Frame {
25    ControlReady(BytesMut),
26    ControlAccept(BytesMut),
27    ControlStart(BytesMut),
28    ControlStop,
29    ControlFinish,
30    ControlUnknown(BytesMut),
31    Data(BytesMut),
32}
33
/// Stateless codec for the Frame Streams wire format.
///
/// It implements both `Encoder<Frame>` and `Decoder`, so it can be used for either direction of
/// a stream. The type has no fields; `FrameStreamsCodec {}` and `FrameStreamsCodec::default()`
/// are equivalent.
#[derive(Debug, Default, Clone)]
pub struct FrameStreamsCodec {}
35
36impl Encoder<Frame> for FrameStreamsCodec {
37    type Error = std::io::Error;
38
39    fn encode(&mut self, frame: Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
40        match frame {
41            Frame::ControlReady(payload) => {
42                dst.put_u32(FRAMESTREAMS_ESCAPE_SEQUENCE);
43                dst.put_u32(4);
44                dst.put_u32(FRAMESTREAMS_CONTROL_READY);
45                dst.put(payload);
46            }
47            Frame::ControlAccept(payload) => {
48                dst.put_u32(FRAMESTREAMS_ESCAPE_SEQUENCE);
49                dst.put_u32(4 + payload.len() as u32);
50                dst.put_u32(FRAMESTREAMS_CONTROL_ACCEPT);
51                dst.put(payload);
52            }
53            Frame::ControlStart(payload) => {
54                dst.put_u32(FRAMESTREAMS_ESCAPE_SEQUENCE);
55                dst.put_u32(4 + payload.len() as u32);
56                dst.put_u32(FRAMESTREAMS_CONTROL_START);
57                dst.put(payload);
58            }
59            Frame::ControlStop => {
60                dst.put_u32(FRAMESTREAMS_ESCAPE_SEQUENCE);
61                dst.put_u32(4);
62                dst.put_u32(FRAMESTREAMS_CONTROL_STOP);
63            }
64            Frame::ControlFinish => {
65                dst.put_u32(FRAMESTREAMS_ESCAPE_SEQUENCE);
66                dst.put_u32(4);
67                dst.put_u32(FRAMESTREAMS_CONTROL_FINISH);
68            }
69            Frame::ControlUnknown(_) => todo!(),
70            Frame::Data(payload) => {
71                dst.put_u32(payload.len() as u32);
72                dst.put(payload);
73            }
74        }
75        Ok(())
76    }
77}
78
79impl Decoder for FrameStreamsCodec {
80    type Item = Frame;
81    type Error = std::io::Error;
82
83    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
84        // Check if there is enough data to read the frame length.
85        if src.len() < 4 {
86            return Ok(None);
87        }
88
89        // Read the frame length.
90        let mut len_frame_bytes = [0u8; 4];
91        len_frame_bytes.copy_from_slice(&src[..4]);
92        let len_frame = u32::from_be_bytes(len_frame_bytes) as usize;
93
94        // Enforce the maximum frame size.
95        if len_frame > FRAME_LENGTH_MAX {
96            return Err(std::io::Error::new(
97                std::io::ErrorKind::InvalidData,
98                format!(
99                    "Frame of length {} is too large for this implementation",
100                    len_frame
101                ),
102            ));
103        }
104
105        // Check if this is a control frame.
106        if len_frame == FRAMESTREAMS_ESCAPE_SEQUENCE as usize {
107            // Check if there is enough data to read the control frame escape (4 bytes) + the
108            // control frame length (4 bytes).
109            if src.len() < 4 + 4 {
110                return Ok(None);
111            }
112
113            // Read the control frame length.
114            let mut len_control_bytes = [0u8; 4];
115            len_control_bytes.copy_from_slice(&src[4..8]);
116            let len_control = u32::from_be_bytes(len_control_bytes) as usize;
117
118            // Check that the control frame length is large enough. The minimum control frame
119            // length is 4 bytes (to encode the control frame types that have no payload).
120            if len_control < 4 {
121                return Err(std::io::Error::new(
122                    std::io::ErrorKind::InvalidData,
123                    format!("Control frame of length {} is too small", len_control),
124                ));
125            }
126
127            // Check that the control frame length is not too large.
128            if len_control > CONTROL_FRAME_LENGTH_MAX {
129                return Err(std::io::Error::new(
130                    std::io::ErrorKind::InvalidData,
131                    format!("Control frame of length {} is too large", len_control),
132                ));
133            }
134
135            // Check if there is enough data to read the full control frame sequence of the control
136            // frame escape (4 bytes) + the control frame length (4 bytes) + the actual length of
137            // the control frame.
138            if src.len() < 4 + 4 + len_control {
139                return Ok(None);
140            }
141
142            // Detach the control frame escape (4 bytes) and the control frame length (4 bytes).
143            src.advance(4 + 4);
144
145            // Read the control frame type. This is the first 4 bytes of the control frame.
146            let control_frame_type = src.get_u32();
147
148            // Detach the control frame payload. This is the remainder of the control frame after
149            // the control frame type.
150            let payload = src.split_to(len_control - 4);
151
152            Ok(Some(match control_frame_type {
153                FRAMESTREAMS_CONTROL_READY => Frame::ControlReady(payload),
154                FRAMESTREAMS_CONTROL_ACCEPT => Frame::ControlAccept(payload),
155                FRAMESTREAMS_CONTROL_START => Frame::ControlStart(payload),
156                FRAMESTREAMS_CONTROL_STOP => Frame::ControlStop,
157                FRAMESTREAMS_CONTROL_FINISH => Frame::ControlFinish,
158                _ => Frame::ControlUnknown(payload),
159            }))
160        } else {
161            // This is a data frame.
162
163            // Check if there is enough data to read the frame length plus the data frame.
164            if src.len() < 4 + len_frame {
165                // Inform the Framed that more data is needed.
166                return Ok(None);
167            }
168
169            // Detach the frame length.
170            src.advance(4);
171
172            // Detach the data frame.
173            let data = src.split_to(len_frame);
174
175            // Return the bytes of the frame.
176            Ok(Some(Frame::Data(data)))
177        }
178    }
179}
180
181/// Helper function for encoding the "content type" payload used in the Frame Streams Ready,
182/// Accept, and Start control frames.
183pub fn encode_content_type_payload(content_type: &[u8]) -> BytesMut {
184    let mut buf = BytesMut::with_capacity(4 + 4 + content_type.len());
185    buf.put_u32(FRAMESTREAMS_CONTROL_FIELD_CONTENT_TYPE);
186    buf.put_u32(content_type.len() as u32);
187    buf.put(content_type);
188    buf
189}