1use std::{cell::RefCell, rc::Rc};
2
3use ntex_bytes::BytesMut;
4use ntex_codec::{Decoder, Encoder};
5
6mod error;
7mod length_delimited;
8
9pub use self::error::EncoderError;
10
11use self::length_delimited::LengthDelimitedCodec;
12use crate::{consts, frame, frame::Frame, frame::Kind, hpack};
13
/// Frame-type byte identifying a PUSH_PROMISE frame.
///
/// `decode` compares this against byte 3 of every raw frame and rejects a
/// match with `FrameError::UnexpectedPushPromise`.
const PUSH_PROMISE: u8 = 5;
16
/// Frame codec handle implementing both `Decoder` and `Encoder` for `Frame`.
///
/// The state lives behind `Rc<RefCell<..>>`, so cloning a `Codec` produces
/// another handle to the same underlying `CodecInner` rather than an
/// independent copy.
#[derive(Clone, Debug)]
pub struct Codec(Rc<RefCell<CodecInner>>);
19
/// A HEADERS frame whose header block was not terminated by the HEADERS
/// frame itself and is still being completed by CONTINUATION frames.
#[derive(Debug)]
struct Partial {
    /// The HEADERS frame that opened the header block.
    frame: frame::Headers,
    /// Header-block bytes collected so far that have not been HPACK-decoded.
    buf: BytesMut,
    /// CONTINUATION frames counted for this block; only incremented while a
    /// continuation limit greater than zero is configured.
    count: usize,
}
30
/// Mutable state shared by every clone of a `Codec`.
#[derive(Debug)]
struct CodecInner {
    // --- encoder state ---
    /// HPACK encoder passed to `Headers::encode` for outgoing HEADERS frames.
    encoder_hpack: hpack::Encoder,
    /// The DATA frame most recently written by `encode`.
    encoder_last_data_frame: Option<frame::Data>,
    /// Upper bound on outgoing DATA payload length; also handed to
    /// `Headers::encode` as its maximum size.
    encoder_max_frame_size: frame::FrameSize,

    // --- decoder state ---
    /// Length-delimited splitter that cuts the byte stream into raw frames.
    decoder: LengthDelimitedCodec,
    /// HPACK decoder used to decode received header blocks.
    decoder_hpack: hpack::Decoder,
    /// Byte limit applied while accumulating CONTINUATION payloads.
    decoder_max_header_list_size: usize,
    /// Maximum number of CONTINUATION frames per header block; `0` disables
    /// the check.
    decoder_max_header_continuations: usize,
    /// Header block currently being assembled, if any.
    partial: Option<Partial>,
}
45
46impl Default for Codec {
47 #[inline]
48 fn default() -> Self {
50 let decoder = self::length_delimited::Builder::new()
52 .length_field_length(3)
53 .length_adjustment(9)
54 .max_frame_length(frame::DEFAULT_MAX_FRAME_SIZE as usize)
55 .num_skip(0) .new_codec();
57
58 Codec(Rc::new(RefCell::new(CodecInner {
59 decoder,
60 decoder_hpack: hpack::Decoder::new(frame::DEFAULT_SETTINGS_HEADER_TABLE_SIZE),
61 decoder_max_header_list_size: consts::DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE as usize,
62 decoder_max_header_continuations: consts::DEFAULT_MAX_COUNTINUATIONS,
63 partial: None,
64
65 encoder_hpack: hpack::Encoder::default(),
66 encoder_last_data_frame: None,
67 encoder_max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE,
68 })))
69 }
70}
71
72impl Codec {
73 #[inline]
80 pub fn set_recv_frame_size(&self, val: usize) {
81 assert!(
82 frame::DEFAULT_MAX_FRAME_SIZE as usize <= val
83 && val <= frame::MAX_MAX_FRAME_SIZE as usize
84 );
85 self.0.borrow_mut().decoder.set_max_frame_length(val);
86 }
87
88 pub fn recv_frame_size(&self) -> u32 {
90 self.0.borrow_mut().decoder.max_frame_length() as u32
91 }
92
93 pub fn set_recv_header_list_size(&self, val: usize) {
97 self.0.borrow_mut().decoder_max_header_list_size = val;
98 }
99
100 pub fn set_max_header_continuations(&self, val: usize) {
104 self.0.borrow_mut().decoder_max_header_continuations = val;
105 }
106
107 pub fn set_send_frame_size(&self, val: usize) {
109 assert!(val <= frame::MAX_MAX_FRAME_SIZE as usize);
110 self.0.borrow_mut().encoder_max_frame_size = val as frame::FrameSize;
111 }
112
113 pub fn set_send_header_table_size(&self, val: usize) {
115 self.0.borrow_mut().encoder_hpack.update_max_size(val);
116 }
117
118 pub fn send_frame_size(&self) -> u32 {
120 self.0.borrow_mut().encoder_max_frame_size
121 }
122}
123
124impl Decoder for Codec {
125 type Item = Frame;
126 type Error = frame::FrameError;
127
128 fn decode(&self, src: &mut BytesMut) -> Result<Option<Frame>, frame::FrameError> {
132 let mut inner = self.0.borrow_mut();
133 loop {
134 let mut bytes = if let Some(bytes) = inner.decoder.decode(src)? {
135 bytes
136 } else {
137 return Ok(None);
138 };
139
140 if bytes[3] == PUSH_PROMISE {
142 return Err(frame::FrameError::UnexpectedPushPromise);
143 }
144
145 let head = frame::Head::parse(&bytes);
147 let kind = head.kind();
148
149 if inner.partial.is_some() && kind != Kind::Continuation {
150 proto_err!(conn: "expected CONTINUATION, got {:?}", kind);
151 return Err(frame::FrameError::Continuation(
152 frame::FrameContinuationError::Expected,
153 ));
154 }
155
156 log::trace!("decoding {:?} frame, frame buf len {}", kind, bytes.len());
157
158 let frame = match kind {
159 Kind::Settings => frame::Settings::load(head, &bytes[frame::HEADER_LEN..])
160 .inspect_err(|e| {
161 proto_err!(conn: "failed to load SETTINGS frame; err={:?}", e);
162 })?
163 .into(),
164 Kind::Ping => frame::Ping::load(head, &bytes[frame::HEADER_LEN..])
165 .inspect_err(|e| {
166 proto_err!(conn: "failed to load PING frame; err={:?}", e);
167 })?
168 .into(),
169 Kind::WindowUpdate => frame::WindowUpdate::load(head, &bytes[frame::HEADER_LEN..])
170 .inspect_err(|e| {
171 proto_err!(conn: "failed to load WINDOW_UPDATE frame; err={:?}", e);
172 })?
173 .into(),
174 Kind::Data => {
175 let _ = bytes.split_to(frame::HEADER_LEN);
176
177 frame::Data::load(head, bytes.freeze())
178 .inspect_err(|e| {
180 proto_err!(conn: "failed to load DATA frame; err={:?}", e);
181 })?
182 .into()
183 }
184 Kind::Headers => {
185 let _ = bytes.split_to(frame::HEADER_LEN);
187
188 let mut frame = match frame::Headers::load(head, &mut bytes) {
190 Ok(res) => Ok(res),
191 Err(frame::FrameError::InvalidDependencyId) => {
192 proto_err!(stream: "invalid HEADERS dependency ID");
193 Err(frame::FrameError::InvalidDependencyId)
196 }
197 Err(e) => {
198 proto_err!(conn: "failed to load frame; err={:?}", e);
199 Err(e)
200 }
201 }?;
202
203 if frame.is_end_headers() {
204 match frame.load_hpack(&mut bytes, &mut inner.decoder_hpack) {
206 Ok(_) => {}
207 Err(frame::FrameError::MalformedMessage) => {
208 let id = head.stream_id();
209 proto_err!(stream: "malformed header block; stream={:?}", id);
210 return Err(frame::FrameError::MalformedMessage);
211 }
212 Err(e) => {
213 proto_err!(conn: "failed HPACK decoding; err={:?}", e);
214 return Err(e);
215 }
216 }
217 frame.into()
218 } else {
219 log::trace!("loaded partial header block");
220 inner.partial = Some(Partial {
222 frame,
223 buf: bytes.split(),
224 count: 0,
225 });
226
227 continue;
228 }
229 }
230 Kind::Reset => frame::Reset::load(head, &bytes[frame::HEADER_LEN..])
231 .inspect_err(|e| {
232 proto_err!(conn: "failed to load RESET frame; err={:?}", e);
233 })?
234 .into(),
235 Kind::GoAway => frame::GoAway::load(&bytes[frame::HEADER_LEN..])
236 .inspect_err(|e| {
237 proto_err!(conn: "failed to load GO_AWAY frame; err={:?}", e);
238 })?
239 .into(),
240 Kind::Priority => {
241 if head.stream_id() == 0 {
242 proto_err!(conn: "invalid stream ID 0");
244 return Err(frame::FrameError::InvalidStreamId);
245 }
246
247 match frame::Priority::load(head, &bytes[frame::HEADER_LEN..]) {
248 Ok(frame) => frame.into(),
249 Err(frame::FrameError::InvalidDependencyId) => {
250 let id = head.stream_id();
254 proto_err!(stream: "PRIORITY invalid dependency ID; stream={:?}", id);
255 return Err(frame::FrameError::InvalidDependencyId);
256 }
257 Err(e) => {
258 proto_err!(conn: "failed to load PRIORITY frame; err={:?};", e);
259 return Err(e);
260 }
261 }
262 }
263 Kind::Continuation => {
264 let mut partial = inner.partial.take().ok_or_else(|| {
265 proto_err!(conn: "received unexpected CONTINUATION frame");
266 frame::FrameError::Continuation(frame::FrameContinuationError::Unexpected)
267 })?;
268
269 if partial.frame.stream_id() != head.stream_id() {
271 proto_err!(conn: "CONTINUATION frame stream ID does not match previous frame stream ID");
272 return Err(frame::FrameError::Continuation(
273 frame::FrameContinuationError::UnknownStreamId,
274 ));
275 }
276
277 if inner.decoder_max_header_continuations > 0 {
278 partial.count += 1;
280 if partial.count > inner.decoder_max_header_continuations {
281 proto_err!(conn: "received excessive amount of CONTINUATION frames");
282 return Err(frame::FrameError::Continuation(
283 frame::FrameContinuationError::MaxContinuations,
284 ));
285 }
286 }
287
288 if partial.buf.is_empty() {
290 partial.buf = bytes.split_off(frame::HEADER_LEN);
291 } else {
292 if partial.buf.len() + bytes.len() > inner.decoder_max_header_list_size {
305 proto_err!(conn: "CONTINUATION frame header block size over ignorable limit");
306 return Err(frame::FrameError::Continuation(
307 frame::FrameContinuationError::MaxLeftoverSize,
308 ));
309 }
310 partial.buf.extend_from_slice(&bytes[frame::HEADER_LEN..]);
311 }
312
313 if (head.flag() & 0x4) == 0x4 {
314 match partial
315 .frame
316 .load_hpack(&mut partial.buf, &mut inner.decoder_hpack)
317 {
318 Ok(_) => {}
319 Err(frame::FrameError::MalformedMessage) => {
320 let id = head.stream_id();
321 proto_err!(stream: "malformed CONTINUATION frame; stream={:?}", id);
322 return Err(frame::FrameContinuationError::Malformed.into());
323 }
324 Err(e) => {
325 proto_err!(conn: "failed HPACK decoding; err={:?}", e);
326 return Err(e);
327 }
328 }
329
330 partial.frame.into()
331 } else {
332 inner.partial = Some(partial);
333 continue;
334 }
335 }
336 Kind::Unknown => {
337 continue;
339 }
340 };
341
342 return Ok(Some(frame));
343 }
344 }
345}
346
347impl Encoder for Codec {
348 type Item = Frame;
349 type Error = error::EncoderError;
350
351 fn encode(&self, item: Frame, buf: &mut BytesMut) -> Result<(), error::EncoderError> {
352 let mut inner = self.0.borrow_mut();
356
357 match item {
358 Frame::Data(v) => {
359 let len = v.payload().len();
361 if len > inner.encoder_max_frame_size as usize {
362 return Err(error::EncoderError::MaxSizeExceeded);
363 }
364 v.encode(buf);
365
366 inner.encoder_last_data_frame = Some(v);
368 }
369 Frame::Headers(v) => {
370 let max_size = inner.encoder_max_frame_size as usize;
371 v.encode(&mut inner.encoder_hpack, buf, max_size);
372 }
373 Frame::Settings(v) => {
374 v.encode(buf);
375 }
376 Frame::GoAway(v) => {
377 v.encode(buf);
378 }
379 Frame::Ping(v) => {
380 v.encode(buf);
381 }
382 Frame::WindowUpdate(v) => {
383 v.encode(buf);
384 }
385
386 Frame::Priority(_) => (),
387 Frame::Reset(v) => {
388 v.encode(buf);
389 }
390 }
391
392 Ok(())
393 }
394}