//! `ntex_h2/codec/mod.rs`: HTTP/2 frame codec.

use std::{cell::RefCell, rc::Rc};

use ntex_bytes::{BytePages, Bytes, BytesMut};
use ntex_codec::{Decoder, Encoder};

mod error;
mod length_delimited;

pub use self::error::EncoderError;

use self::length_delimited::LengthDelimitedCodec;
use crate::{consts, frame, frame::Frame, frame::Kind, hpack};
13
/// Frame type code of a PUSH_PROMISE frame.
///
/// The decoder compares it against byte 3 of every raw frame and rejects the
/// frame on a match, because server push is not supported.
const PUSH_PROMISE: u8 = 5;
16
17#[derive(Clone, Debug)]
18pub struct Codec(Rc<RefCell<CodecInner>>);
19
20/// Partially loaded headers frame
21#[derive(Debug)]
22struct Partial {
23    /// Empty frame
24    frame: frame::Headers,
25    /// Partial header payload
26    buf: Bytes,
27    /// Number of continuations
28    count: usize,
29}
30
31#[derive(Debug)]
32struct CodecInner {
33    // encoder state
34    encoder_hpack: hpack::Encoder,
35    encoder_max_frame_size: frame::FrameSize, // Max frame size, this is specified by the peer
36
37    // decoder state
38    decoder: LengthDelimitedCodec,
39    decoder_hpack: hpack::Decoder,
40    decoder_max_header_list_size: usize,
41    decoder_max_header_continuations: usize,
42    partial: Option<Partial>, // Partially loaded headers frame
43}
44
45impl Default for Codec {
46    #[inline]
47    /// Returns a new `Codec` with the default max frame size
48    fn default() -> Self {
49        // Delimit the frames
50        let decoder = self::length_delimited::Builder::new()
51            .length_field_length(3)
52            .length_adjustment(9)
53            .max_frame_length(frame::DEFAULT_MAX_FRAME_SIZE as usize)
54            .num_skip(0) // Don't skip the header
55            .new_codec();
56
57        Codec(Rc::new(RefCell::new(CodecInner {
58            decoder,
59            decoder_hpack: hpack::Decoder::new(frame::DEFAULT_SETTINGS_HEADER_TABLE_SIZE),
60            decoder_max_header_list_size: consts::DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE as usize,
61            decoder_max_header_continuations: consts::DEFAULT_MAX_COUNTINUATIONS,
62            partial: None,
63
64            encoder_hpack: hpack::Encoder::default(),
65            encoder_max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE,
66        })))
67    }
68}
69
70impl Codec {
71    /// Updates the max received frame size.
72    ///
73    /// The change takes effect the next time a frame is decoded. In other
74    /// words, if a frame is currently in process of being decoded with a frame
75    /// size greater than `val` but less than the max frame size in effect
76    /// before calling this function, then the frame will be allowed.
77    ///
78    /// # Panics
79    ///
80    /// Panics if size is greater than `16_777_215`.
81    #[inline]
82    pub fn set_recv_frame_size(&self, val: usize) {
83        assert!(
84            frame::DEFAULT_MAX_FRAME_SIZE as usize <= val
85                && val <= frame::MAX_MAX_FRAME_SIZE as usize
86        );
87        self.0.borrow_mut().decoder.set_max_frame_length(val);
88    }
89
90    /// Local max frame size.
91    pub fn recv_frame_size(&self) -> u32 {
92        self.0.borrow_mut().decoder.max_frame_length() as u32
93    }
94
95    /// Set the max header list size that can be received.
96    ///
97    /// By default value is set to 48kb
98    pub fn set_recv_header_list_size(&self, val: usize) {
99        self.0.borrow_mut().decoder_max_header_list_size = val;
100    }
101
102    /// Set the max header continuation frames.
103    ///
104    /// By default value is set to 5
105    pub fn set_max_header_continuations(&self, val: usize) {
106        self.0.borrow_mut().decoder_max_header_continuations = val;
107    }
108
109    /// Set the peer's max frame size.
110    ///
111    /// # Panics
112    ///
113    /// Panics if size is greater than `16_777_215`.
114    pub fn set_send_frame_size(&self, val: usize) {
115        assert!(val <= frame::MAX_MAX_FRAME_SIZE as usize);
116        self.0.borrow_mut().encoder_max_frame_size = val as frame::FrameSize;
117    }
118
119    /// Set the peer's header table size size.
120    pub fn set_send_header_table_size(&self, val: usize) {
121        self.0.borrow_mut().encoder_hpack.update_max_size(val);
122    }
123
124    /// Remote max frame size.
125    pub fn send_frame_size(&self) -> u32 {
126        self.0.borrow_mut().encoder_max_frame_size
127    }
128}
129
130impl Decoder for Codec {
131    type Item = Frame;
132    type Error = frame::FrameError;
133
134    #[allow(clippy::too_many_lines)]
135    /// Decodes a frame.
136    ///
137    /// This method is intentionally de-generified and outlined because it is very large.
138    fn decode(&self, src: &mut BytesMut) -> Result<Option<Frame>, frame::FrameError> {
139        let mut inner = self.0.borrow_mut();
140        loop {
141            let Some(mut bytes) = inner.decoder.decode(src)? else {
142                return Ok(None);
143            };
144
145            // check push promise, we do not support push
146            if bytes[3] == PUSH_PROMISE {
147                return Err(frame::FrameError::UnexpectedPushPromise);
148            }
149
150            // Parse the head
151            let head = frame::Head::parse(&bytes);
152            let kind = head.kind();
153
154            if inner.partial.is_some() && kind != Kind::Continuation {
155                proto_err!(conn: "expected CONTINUATION, got {:?}", kind);
156                return Err(frame::FrameError::Continuation(
157                    frame::FrameContinuationError::Expected,
158                ));
159            }
160
161            log::trace!("decoding {:?} frame, frame buf len {}", kind, bytes.len());
162
163            let frame = match kind {
164                Kind::Settings => frame::Settings::load(head, &bytes[frame::HEADER_LEN..])
165                    .inspect_err(|e| {
166                        proto_err!(conn: "failed to load SETTINGS frame; err={:?}", e);
167                    })?
168                    .into(),
169                Kind::Ping => frame::Ping::load(head, &bytes[frame::HEADER_LEN..])
170                    .inspect_err(|e| {
171                        proto_err!(conn: "failed to load PING frame; err={:?}", e);
172                    })?
173                    .into(),
174                Kind::WindowUpdate => frame::WindowUpdate::load(head, &bytes[frame::HEADER_LEN..])
175                    .inspect_err(|e| {
176                        proto_err!(conn: "failed to load WINDOW_UPDATE frame; err={:?}", e);
177                    })?
178                    .into(),
179                Kind::Data => {
180                    bytes.advance_to(frame::HEADER_LEN);
181
182                    frame::Data::load(head, bytes)
183                        // TODO: Should this always be connection level? Probably not...
184                        .inspect_err(|e| {
185                            proto_err!(conn: "failed to load DATA frame; err={:?}", e);
186                        })?
187                        .into()
188                }
189                Kind::Headers => {
190                    // Drop the frame header
191                    bytes.advance_to(frame::HEADER_LEN);
192
193                    // Parse the header frame w/o parsing the payload
194                    let mut frame = match frame::Headers::load(head, &mut bytes) {
195                        Ok(res) => Ok(res),
196                        Err(frame::FrameError::InvalidDependencyId) => {
197                            proto_err!(stream: "invalid HEADERS dependency ID");
198                            // A stream cannot depend on itself. An endpoint MUST
199                            // treat this as a stream error (Section 5.4.2) of type `PROTOCOL_ERROR`.
200                            Err(frame::FrameError::InvalidDependencyId)
201                        }
202                        Err(e) => {
203                            proto_err!(conn: "failed to load frame; err={:?}", e);
204                            Err(e)
205                        }
206                    }?;
207
208                    if frame.is_end_headers() {
209                        // Load the HPACK encoded headers
210                        match frame.load_hpack(&mut bytes, &mut inner.decoder_hpack) {
211                            Ok(()) => {}
212                            Err(frame::FrameError::MalformedMessage) => {
213                                let id = head.stream_id();
214                                proto_err!(stream: "malformed header block; stream={:?}", id);
215                                return Err(frame::FrameError::MalformedMessage);
216                            }
217                            Err(e) => {
218                                proto_err!(conn: "failed HPACK decoding; err={:?}", e);
219                                return Err(e);
220                            }
221                        }
222                        frame.into()
223                    } else {
224                        log::trace!("loaded partial header block");
225                        // Defer returning the frame
226                        inner.partial = Some(Partial {
227                            frame,
228                            buf: bytes,
229                            count: 0,
230                        });
231
232                        continue;
233                    }
234                }
235                Kind::Reset => frame::Reset::load(head, &bytes[frame::HEADER_LEN..])
236                    .inspect_err(|e| {
237                        proto_err!(conn: "failed to load RESET frame; err={:?}", e);
238                    })?
239                    .into(),
240                Kind::GoAway => frame::GoAway::load(&bytes[frame::HEADER_LEN..])
241                    .inspect_err(|e| {
242                        proto_err!(conn: "failed to load GO_AWAY frame; err={:?}", e);
243                    })?
244                    .into(),
245                Kind::Priority => {
246                    if head.stream_id() == 0 {
247                        // Invalid stream identifier
248                        proto_err!(conn: "invalid stream ID 0");
249                        return Err(frame::FrameError::InvalidStreamId);
250                    }
251
252                    match frame::Priority::load(head, &bytes[frame::HEADER_LEN..]) {
253                        Ok(frame) => frame.into(),
254                        Err(frame::FrameError::InvalidDependencyId) => {
255                            // A stream cannot depend on itself. An endpoint MUST
256                            // treat this as a stream error (Section 5.4.2) of type
257                            // `PROTOCOL_ERROR`.
258                            let id = head.stream_id();
259                            proto_err!(stream: "PRIORITY invalid dependency ID; stream={:?}", id);
260                            return Err(frame::FrameError::InvalidDependencyId);
261                        }
262                        Err(e) => {
263                            proto_err!(conn: "failed to load PRIORITY frame; err={:?};", e);
264                            return Err(e);
265                        }
266                    }
267                }
268                Kind::Continuation => {
269                    let mut partial = inner.partial.take().ok_or_else(|| {
270                        proto_err!(conn: "received unexpected CONTINUATION frame");
271                        frame::FrameError::Continuation(frame::FrameContinuationError::Unexpected)
272                    })?;
273
274                    // The stream identifiers must match
275                    if partial.frame.stream_id() != head.stream_id() {
276                        proto_err!(conn: "CONTINUATION frame stream ID does not match previous frame stream ID");
277                        return Err(frame::FrameError::Continuation(
278                            frame::FrameContinuationError::UnknownStreamId,
279                        ));
280                    }
281
282                    if inner.decoder_max_header_continuations > 0 {
283                        // Check count of continuation frames
284                        partial.count += 1;
285                        if partial.count > inner.decoder_max_header_continuations {
286                            proto_err!(conn: "received excessive amount of CONTINUATION frames");
287                            return Err(frame::FrameError::Continuation(
288                                frame::FrameContinuationError::MaxContinuations,
289                            ));
290                        }
291                    }
292
293                    // Extend the buf
294                    if partial.buf.is_empty() {
295                        partial.buf = bytes.split_off(frame::HEADER_LEN);
296                    } else {
297                        // If there was left over bytes previously, they may be
298                        // needed to continue decoding, even though we will
299                        // be ignoring this frame. This is done to keep the HPACK
300                        // decoder state up-to-date.
301                        //
302                        // Still, we need to be careful, because if a malicious
303                        // attacker were to try to send a gigantic string, such
304                        // that it fits over multiple header blocks.
305                        //
306                        // Instead, we use a simple heuristic to determine if
307                        // we should continue to ignore decoding, or to tell
308                        // the attacker to go away.
309                        if partial.buf.len() + bytes.len() > inner.decoder_max_header_list_size {
310                            proto_err!(conn: "CONTINUATION frame header block size over ignorable limit");
311                            return Err(frame::FrameError::Continuation(
312                                frame::FrameContinuationError::MaxLeftoverSize,
313                            ));
314                        }
315                        let mut buf = BytesMut::with_capacity(
316                            partial.buf.len() + bytes.len() - frame::HEADER_LEN,
317                        );
318                        buf.extend_from_slice(&partial.buf);
319                        buf.extend_from_slice(&bytes[frame::HEADER_LEN..]);
320                        partial.buf = buf.into();
321                    }
322
323                    if (head.flag() & 0x4) == 0x4 {
324                        match partial
325                            .frame
326                            .load_hpack(&mut partial.buf, &mut inner.decoder_hpack)
327                        {
328                            Ok(()) => {}
329                            Err(frame::FrameError::MalformedMessage) => {
330                                let id = head.stream_id();
331                                proto_err!(stream: "malformed CONTINUATION frame; stream={:?}", id);
332                                return Err(frame::FrameContinuationError::Malformed.into());
333                            }
334                            Err(e) => {
335                                proto_err!(conn: "failed HPACK decoding; err={:?}", e);
336                                return Err(e);
337                            }
338                        }
339
340                        partial.frame.into()
341                    } else {
342                        inner.partial = Some(partial);
343                        continue;
344                    }
345                }
346                Kind::Unknown => {
347                    // Unknown frames are ignored
348                    continue;
349                }
350            };
351
352            return Ok(Some(frame));
353        }
354    }
355}
356
357impl Encoder for Codec {
358    type Item = Frame;
359    type Error = error::EncoderError;
360
361    fn encodev(&self, item: Frame, buf: &mut BytePages) -> Result<(), error::EncoderError> {
362        // Ensure that we have enough capacity to accept the write.
363        // log::debug!(frame = ?item, "send");
364
365        let mut inner = self.0.borrow_mut();
366
367        match item {
368            Frame::Data(v) => {
369                // Ensure that the payload is not greater than the max frame.
370                let len = v.payload().len();
371                if len > inner.encoder_max_frame_size as usize {
372                    return Err(error::EncoderError::MaxSizeExceeded);
373                }
374                v.encode(buf);
375            }
376            Frame::Headers(v) => {
377                let max_size = inner.encoder_max_frame_size as usize;
378                v.encode(&mut inner.encoder_hpack, buf, max_size);
379            }
380            Frame::Settings(v) => {
381                v.encode(buf);
382            }
383            Frame::GoAway(v) => {
384                v.encode(buf);
385            }
386            Frame::Ping(v) => {
387                v.encode(buf);
388            }
389            Frame::WindowUpdate(v) => {
390                v.encode(buf);
391            }
392
393            Frame::Priority(_) => (),
394            Frame::Reset(v) => {
395                v.encode(buf);
396            }
397        }
398
399        Ok(())
400    }
401}