ntex_h2/codec/
mod.rs

1use std::{cell::RefCell, rc::Rc};
2
3use ntex_bytes::BytesMut;
4use ntex_codec::{Decoder, Encoder};
5
6mod error;
7mod length_delimited;
8
9pub use self::error::EncoderError;
10
11use self::length_delimited::LengthDelimitedCodec;
12use crate::{consts, frame, frame::Frame, frame::Kind, hpack};
13
14// Push promise frame kind
15const PUSH_PROMISE: u8 = 5;
16
17#[derive(Clone, Debug)]
18pub struct Codec(Rc<RefCell<CodecInner>>);
19
20/// Partially loaded headers frame
21#[derive(Debug)]
22struct Partial {
23    /// Empty frame
24    frame: frame::Headers,
25    /// Partial header payload
26    buf: BytesMut,
27    /// Number of continuations
28    count: usize,
29}
30
31#[derive(Debug)]
32struct CodecInner {
33    // encoder state
34    encoder_hpack: hpack::Encoder,
35    encoder_last_data_frame: Option<frame::Data>,
36    encoder_max_frame_size: frame::FrameSize, // Max frame size, this is specified by the peer
37
38    // decoder state
39    decoder: LengthDelimitedCodec,
40    decoder_hpack: hpack::Decoder,
41    decoder_max_header_list_size: usize,
42    decoder_max_header_continuations: usize,
43    partial: Option<Partial>, // Partially loaded headers frame
44}
45
46impl Default for Codec {
47    #[inline]
48    /// Returns a new `Codec` with the default max frame size
49    fn default() -> Self {
50        // Delimit the frames
51        let decoder = self::length_delimited::Builder::new()
52            .length_field_length(3)
53            .length_adjustment(9)
54            .max_frame_length(frame::DEFAULT_MAX_FRAME_SIZE as usize)
55            .num_skip(0) // Don't skip the header
56            .new_codec();
57
58        Codec(Rc::new(RefCell::new(CodecInner {
59            decoder,
60            decoder_hpack: hpack::Decoder::new(frame::DEFAULT_SETTINGS_HEADER_TABLE_SIZE),
61            decoder_max_header_list_size: consts::DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE as usize,
62            decoder_max_header_continuations: consts::DEFAULT_MAX_COUNTINUATIONS,
63            partial: None,
64
65            encoder_hpack: hpack::Encoder::default(),
66            encoder_last_data_frame: None,
67            encoder_max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE,
68        })))
69    }
70}
71
72impl Codec {
73    /// Updates the max received frame size.
74    ///
75    /// The change takes effect the next time a frame is decoded. In other
76    /// words, if a frame is currently in process of being decoded with a frame
77    /// size greater than `val` but less than the max frame size in effect
78    /// before calling this function, then the frame will be allowed.
79    #[inline]
80    pub fn set_recv_frame_size(&self, val: usize) {
81        assert!(
82            frame::DEFAULT_MAX_FRAME_SIZE as usize <= val
83                && val <= frame::MAX_MAX_FRAME_SIZE as usize
84        );
85        self.0.borrow_mut().decoder.set_max_frame_length(val);
86    }
87
88    /// Local max frame size.
89    pub fn recv_frame_size(&self) -> u32 {
90        self.0.borrow_mut().decoder.max_frame_length() as u32
91    }
92
93    /// Set the max header list size that can be received.
94    ///
95    /// By default value is set to 48kb
96    pub fn set_recv_header_list_size(&self, val: usize) {
97        self.0.borrow_mut().decoder_max_header_list_size = val;
98    }
99
100    /// Set the max header continuation frames.
101    ///
102    /// By default value is set to 5
103    pub fn set_max_header_continuations(&self, val: usize) {
104        self.0.borrow_mut().decoder_max_header_continuations = val;
105    }
106
107    /// Set the peer's max frame size.
108    pub fn set_send_frame_size(&self, val: usize) {
109        assert!(val <= frame::MAX_MAX_FRAME_SIZE as usize);
110        self.0.borrow_mut().encoder_max_frame_size = val as frame::FrameSize;
111    }
112
113    /// Set the peer's header table size size.
114    pub fn set_send_header_table_size(&self, val: usize) {
115        self.0.borrow_mut().encoder_hpack.update_max_size(val);
116    }
117
118    /// Remote max frame size.
119    pub fn send_frame_size(&self) -> u32 {
120        self.0.borrow_mut().encoder_max_frame_size
121    }
122}
123
124impl Decoder for Codec {
125    type Item = Frame;
126    type Error = frame::FrameError;
127
128    /// Decodes a frame.
129    ///
130    /// This method is intentionally de-generified and outlined because it is very large.
131    fn decode(&self, src: &mut BytesMut) -> Result<Option<Frame>, frame::FrameError> {
132        let mut inner = self.0.borrow_mut();
133        loop {
134            let mut bytes = if let Some(bytes) = inner.decoder.decode(src)? {
135                bytes
136            } else {
137                return Ok(None);
138            };
139
140            // check push promise, we do not support push
141            if bytes[3] == PUSH_PROMISE {
142                return Err(frame::FrameError::UnexpectedPushPromise);
143            }
144
145            // Parse the head
146            let head = frame::Head::parse(&bytes);
147            let kind = head.kind();
148
149            if inner.partial.is_some() && kind != Kind::Continuation {
150                proto_err!(conn: "expected CONTINUATION, got {:?}", kind);
151                return Err(frame::FrameError::Continuation(
152                    frame::FrameContinuationError::Expected,
153                ));
154            }
155
156            log::trace!("decoding {:?} frame, frame buf len {}", kind, bytes.len());
157
158            let frame = match kind {
159                Kind::Settings => frame::Settings::load(head, &bytes[frame::HEADER_LEN..])
160                    .inspect_err(|e| {
161                        proto_err!(conn: "failed to load SETTINGS frame; err={:?}", e);
162                    })?
163                    .into(),
164                Kind::Ping => frame::Ping::load(head, &bytes[frame::HEADER_LEN..])
165                    .inspect_err(|e| {
166                        proto_err!(conn: "failed to load PING frame; err={:?}", e);
167                    })?
168                    .into(),
169                Kind::WindowUpdate => frame::WindowUpdate::load(head, &bytes[frame::HEADER_LEN..])
170                    .inspect_err(|e| {
171                        proto_err!(conn: "failed to load WINDOW_UPDATE frame; err={:?}", e);
172                    })?
173                    .into(),
174                Kind::Data => {
175                    let _ = bytes.split_to(frame::HEADER_LEN);
176
177                    frame::Data::load(head, bytes.freeze())
178                        // TODO: Should this always be connection level? Probably not...
179                        .inspect_err(|e| {
180                            proto_err!(conn: "failed to load DATA frame; err={:?}", e);
181                        })?
182                        .into()
183                }
184                Kind::Headers => {
185                    // Drop the frame header
186                    let _ = bytes.split_to(frame::HEADER_LEN);
187
188                    // Parse the header frame w/o parsing the payload
189                    let mut frame = match frame::Headers::load(head, &mut bytes) {
190                        Ok(res) => Ok(res),
191                        Err(frame::FrameError::InvalidDependencyId) => {
192                            proto_err!(stream: "invalid HEADERS dependency ID");
193                            // A stream cannot depend on itself. An endpoint MUST
194                            // treat this as a stream error (Section 5.4.2) of type `PROTOCOL_ERROR`.
195                            Err(frame::FrameError::InvalidDependencyId)
196                        }
197                        Err(e) => {
198                            proto_err!(conn: "failed to load frame; err={:?}", e);
199                            Err(e)
200                        }
201                    }?;
202
203                    if frame.is_end_headers() {
204                        // Load the HPACK encoded headers
205                        match frame.load_hpack(&mut bytes, &mut inner.decoder_hpack) {
206                            Ok(_) => {}
207                            Err(frame::FrameError::MalformedMessage) => {
208                                let id = head.stream_id();
209                                proto_err!(stream: "malformed header block; stream={:?}", id);
210                                return Err(frame::FrameError::MalformedMessage);
211                            }
212                            Err(e) => {
213                                proto_err!(conn: "failed HPACK decoding; err={:?}", e);
214                                return Err(e);
215                            }
216                        }
217                        frame.into()
218                    } else {
219                        log::trace!("loaded partial header block");
220                        // Defer returning the frame
221                        inner.partial = Some(Partial {
222                            frame,
223                            buf: bytes.split(),
224                            count: 0,
225                        });
226
227                        continue;
228                    }
229                }
230                Kind::Reset => frame::Reset::load(head, &bytes[frame::HEADER_LEN..])
231                    .inspect_err(|e| {
232                        proto_err!(conn: "failed to load RESET frame; err={:?}", e);
233                    })?
234                    .into(),
235                Kind::GoAway => frame::GoAway::load(&bytes[frame::HEADER_LEN..])
236                    .inspect_err(|e| {
237                        proto_err!(conn: "failed to load GO_AWAY frame; err={:?}", e);
238                    })?
239                    .into(),
240                Kind::Priority => {
241                    if head.stream_id() == 0 {
242                        // Invalid stream identifier
243                        proto_err!(conn: "invalid stream ID 0");
244                        return Err(frame::FrameError::InvalidStreamId);
245                    }
246
247                    match frame::Priority::load(head, &bytes[frame::HEADER_LEN..]) {
248                        Ok(frame) => frame.into(),
249                        Err(frame::FrameError::InvalidDependencyId) => {
250                            // A stream cannot depend on itself. An endpoint MUST
251                            // treat this as a stream error (Section 5.4.2) of type
252                            // `PROTOCOL_ERROR`.
253                            let id = head.stream_id();
254                            proto_err!(stream: "PRIORITY invalid dependency ID; stream={:?}", id);
255                            return Err(frame::FrameError::InvalidDependencyId);
256                        }
257                        Err(e) => {
258                            proto_err!(conn: "failed to load PRIORITY frame; err={:?};", e);
259                            return Err(e);
260                        }
261                    }
262                }
263                Kind::Continuation => {
264                    let mut partial = inner.partial.take().ok_or_else(|| {
265                        proto_err!(conn: "received unexpected CONTINUATION frame");
266                        frame::FrameError::Continuation(frame::FrameContinuationError::Unexpected)
267                    })?;
268
269                    // The stream identifiers must match
270                    if partial.frame.stream_id() != head.stream_id() {
271                        proto_err!(conn: "CONTINUATION frame stream ID does not match previous frame stream ID");
272                        return Err(frame::FrameError::Continuation(
273                            frame::FrameContinuationError::UnknownStreamId,
274                        ));
275                    }
276
277                    if inner.decoder_max_header_continuations > 0 {
278                        // Check count of continuation frames
279                        partial.count += 1;
280                        if partial.count > inner.decoder_max_header_continuations {
281                            proto_err!(conn: "received excessive amount of CONTINUATION frames");
282                            return Err(frame::FrameError::Continuation(
283                                frame::FrameContinuationError::MaxContinuations,
284                            ));
285                        }
286                    }
287
288                    // Extend the buf
289                    if partial.buf.is_empty() {
290                        partial.buf = bytes.split_off(frame::HEADER_LEN);
291                    } else {
292                        // If there was left over bytes previously, they may be
293                        // needed to continue decoding, even though we will
294                        // be ignoring this frame. This is done to keep the HPACK
295                        // decoder state up-to-date.
296                        //
297                        // Still, we need to be careful, because if a malicious
298                        // attacker were to try to send a gigantic string, such
299                        // that it fits over multiple header blocks.
300                        //
301                        // Instead, we use a simple heuristic to determine if
302                        // we should continue to ignore decoding, or to tell
303                        // the attacker to go away.
304                        if partial.buf.len() + bytes.len() > inner.decoder_max_header_list_size {
305                            proto_err!(conn: "CONTINUATION frame header block size over ignorable limit");
306                            return Err(frame::FrameError::Continuation(
307                                frame::FrameContinuationError::MaxLeftoverSize,
308                            ));
309                        }
310                        partial.buf.extend_from_slice(&bytes[frame::HEADER_LEN..]);
311                    }
312
313                    if (head.flag() & 0x4) == 0x4 {
314                        match partial
315                            .frame
316                            .load_hpack(&mut partial.buf, &mut inner.decoder_hpack)
317                        {
318                            Ok(_) => {}
319                            Err(frame::FrameError::MalformedMessage) => {
320                                let id = head.stream_id();
321                                proto_err!(stream: "malformed CONTINUATION frame; stream={:?}", id);
322                                return Err(frame::FrameContinuationError::Malformed.into());
323                            }
324                            Err(e) => {
325                                proto_err!(conn: "failed HPACK decoding; err={:?}", e);
326                                return Err(e);
327                            }
328                        }
329
330                        partial.frame.into()
331                    } else {
332                        inner.partial = Some(partial);
333                        continue;
334                    }
335                }
336                Kind::Unknown => {
337                    // Unknown frames are ignored
338                    continue;
339                }
340            };
341
342            return Ok(Some(frame));
343        }
344    }
345}
346
347impl Encoder for Codec {
348    type Item = Frame;
349    type Error = error::EncoderError;
350
351    fn encode(&self, item: Frame, buf: &mut BytesMut) -> Result<(), error::EncoderError> {
352        // Ensure that we have enough capacity to accept the write.
353        // log::debug!(frame = ?item, "send");
354
355        let mut inner = self.0.borrow_mut();
356
357        match item {
358            Frame::Data(v) => {
359                // Ensure that the payload is not greater than the max frame.
360                let len = v.payload().len();
361                if len > inner.encoder_max_frame_size as usize {
362                    return Err(error::EncoderError::MaxSizeExceeded);
363                }
364                v.encode(buf);
365
366                // Save off the last frame...
367                inner.encoder_last_data_frame = Some(v);
368            }
369            Frame::Headers(v) => {
370                let max_size = inner.encoder_max_frame_size as usize;
371                v.encode(&mut inner.encoder_hpack, buf, max_size);
372            }
373            Frame::Settings(v) => {
374                v.encode(buf);
375            }
376            Frame::GoAway(v) => {
377                v.encode(buf);
378            }
379            Frame::Ping(v) => {
380                v.encode(buf);
381            }
382            Frame::WindowUpdate(v) => {
383                v.encode(buf);
384            }
385
386            Frame::Priority(_) => (),
387            Frame::Reset(v) => {
388                v.encode(buf);
389            }
390        }
391
392        Ok(())
393    }
394}