1use std::{cell::RefCell, rc::Rc};
2
3use ntex_bytes::BytesMut;
4use ntex_codec::{Decoder, Encoder};
5
6mod error;
7mod length_delimited;
8
9pub use self::error::EncoderError;
10
11use self::length_delimited::LengthDelimitedCodec;
12use crate::{consts, frame, frame::Frame, frame::Kind, hpack};
13
/// Frame type byte that identifies a PUSH_PROMISE frame.
///
/// `Codec::decode` compares the byte at offset 3 of every incoming frame
/// against this value and fails with `FrameError::UnexpectedPushPromise`
/// on a match.
const PUSH_PROMISE: u8 = 5;
16
/// HTTP/2 frame codec: decodes raw bytes into [`Frame`] values and encodes
/// [`Frame`] values back into bytes.
///
/// The codec is a handle around `Rc<RefCell<CodecInner>>`, so cloning it
/// yields another handle to the *same* encoder/decoder state rather than an
/// independent copy.
#[derive(Clone, Debug)]
pub struct Codec(Rc<RefCell<CodecInner>>);
19
/// A header block that was opened by a HEADERS frame which did not report
/// `is_end_headers()` and is still waiting for CONTINUATION frames.
#[derive(Debug)]
struct Partial {
    /// The HEADERS frame that opened the header block.
    frame: frame::Headers,
    /// Header block bytes accumulated so far; handed to the HPACK decoder
    /// once the CONTINUATION frame carrying the `0x4` flag arrives.
    buf: BytesMut,
    /// CONTINUATION frames counted for this block. Only incremented while
    /// `decoder_max_header_continuations` is non-zero.
    count: usize,
}
30
/// Shared mutable state behind a [`Codec`] handle: encoder settings on one
/// side, decoder settings plus in-flight header block on the other.
#[derive(Debug)]
struct CodecInner {
    /// HPACK encoder used when serializing HEADERS frames.
    encoder_hpack: hpack::Encoder,
    /// The DATA frame most recently passed through `encode`.
    encoder_last_data_frame: Option<frame::Data>,
    /// Largest DATA payload `encode` accepts before returning
    /// `EncoderError::MaxSizeExceeded`; also passed to HEADERS encoding.
    encoder_max_frame_size: frame::FrameSize,
    /// Length-delimited decoder that splits the byte stream into raw frames.
    decoder: LengthDelimitedCodec,
    /// HPACK decoder used to decode received header blocks.
    decoder_hpack: hpack::Decoder,
    /// Upper bound applied to the accumulated header block buffer when
    /// CONTINUATION payloads are appended to it.
    decoder_max_header_list_size: usize,
    /// Maximum number of CONTINUATION frames accepted per header block;
    /// `0` disables the check.
    decoder_max_header_continuations: usize,
    /// Header block currently being assembled, if any.
    partial: Option<Partial>,
}
45
46impl Default for Codec {
47 #[inline]
48 fn default() -> Self {
50 let decoder = self::length_delimited::Builder::new()
52 .length_field_length(3)
53 .length_adjustment(9)
54 .max_frame_length(frame::DEFAULT_MAX_FRAME_SIZE as usize)
55 .num_skip(0) .new_codec();
57
58 Codec(Rc::new(RefCell::new(CodecInner {
59 decoder,
60 decoder_hpack: hpack::Decoder::new(frame::DEFAULT_SETTINGS_HEADER_TABLE_SIZE),
61 decoder_max_header_list_size: consts::DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE as usize,
62 decoder_max_header_continuations: consts::DEFAULT_MAX_COUNTINUATIONS,
63 partial: None,
64
65 encoder_hpack: hpack::Encoder::default(),
66 encoder_last_data_frame: None,
67 encoder_max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE,
68 })))
69 }
70}
71
72impl Codec {
73 #[inline]
80 pub fn set_recv_frame_size(&self, val: usize) {
81 assert!(
82 frame::DEFAULT_MAX_FRAME_SIZE as usize <= val
83 && val <= frame::MAX_MAX_FRAME_SIZE as usize
84 );
85 self.0.borrow_mut().decoder.set_max_frame_length(val);
86 }
87
88 pub fn recv_frame_size(&self) -> u32 {
90 self.0.borrow_mut().decoder.max_frame_length() as u32
91 }
92
93 pub fn set_recv_header_list_size(&self, val: usize) {
97 self.0.borrow_mut().decoder_max_header_list_size = val;
98 }
99
100 pub fn set_max_header_continuations(&self, val: usize) {
104 self.0.borrow_mut().decoder_max_header_continuations = val;
105 }
106
107 pub fn set_send_frame_size(&self, val: usize) {
109 assert!(val <= frame::MAX_MAX_FRAME_SIZE as usize);
110 self.0.borrow_mut().encoder_max_frame_size = val as frame::FrameSize;
111 }
112
113 pub fn set_send_header_table_size(&self, val: usize) {
115 self.0.borrow_mut().encoder_hpack.update_max_size(val);
116 }
117
118 pub fn send_frame_size(&self) -> u32 {
120 self.0.borrow_mut().encoder_max_frame_size
121 }
122}
123
impl Decoder for Codec {
    type Item = Frame;
    type Error = frame::FrameError;

    /// Decodes the next frame from `src`.
    ///
    /// Raw frames are pulled from the inner length-delimited decoder in a
    /// loop. Frames that cannot be returned on their own — a HEADERS frame
    /// whose header block is not finished, a non-final CONTINUATION frame, or
    /// a frame of unknown kind — are buffered or skipped and the loop moves
    /// on to the next raw frame.
    ///
    /// Returns `Ok(None)` when the inner decoder has no complete frame
    /// available.
    ///
    /// # Errors
    ///
    /// * any error produced by the inner decoder, a frame `load` function or
    ///   HPACK decoding;
    /// * `FrameError::UnexpectedPushPromise` for a PUSH_PROMISE frame;
    /// * `FrameError::Continuation(..)` when CONTINUATION sequencing is
    ///   violated (missing, unexpected, wrong stream, too many, too large,
    ///   or malformed);
    /// * `FrameError::InvalidStreamId` for a PRIORITY frame on stream 0.
    ///
    /// # Panics
    ///
    /// Panics if the codec state is already borrowed.
    fn decode(&self, src: &mut BytesMut) -> Result<Option<Frame>, frame::FrameError> {
        let mut inner = self.0.borrow_mut();
        loop {
            // Fetch the next raw frame (header included); stop when there is none.
            let mut bytes = if let Some(bytes) = inner.decoder.decode(src)? {
                bytes
            } else {
                return Ok(None);
            };

            // Byte 3 follows the 3-byte length field and holds the frame type.
            // NOTE(review): indexing assumes the inner decoder never yields a
            // buffer shorter than the frame header — confirm.
            if bytes[3] == PUSH_PROMISE {
                return Err(frame::FrameError::UnexpectedPushPromise);
            }

            let head = frame::Head::parse(&bytes);
            let kind = head.kind();

            // While a header block is open, only CONTINUATION may follow.
            if inner.partial.is_some() && kind != Kind::Continuation {
                proto_err!(conn: "expected CONTINUATION, got {:?}", kind);
                return Err(frame::FrameError::Continuation(
                    frame::FrameContinuationError::Expected,
                ));
            }

            log::trace!("decoding {:?} frame, frame buf len {}", kind, bytes.len());

            let frame = match kind {
                Kind::Settings => frame::Settings::load(head, &bytes[frame::HEADER_LEN..])
                    .map_err(|e| {
                        proto_err!(conn: "failed to load SETTINGS frame; err={:?}", e);
                        e
                    })?
                    .into(),
                Kind::Ping => frame::Ping::load(head, &bytes[frame::HEADER_LEN..])
                    .map_err(|e| {
                        proto_err!(conn: "failed to load PING frame; err={:?}", e);
                        e
                    })?
                    .into(),
                Kind::WindowUpdate => frame::WindowUpdate::load(head, &bytes[frame::HEADER_LEN..])
                    .map_err(|e| {
                        proto_err!(conn: "failed to load WINDOW_UPDATE frame; err={:?}", e);
                        e
                    })?
                    .into(),
                Kind::Data => {
                    // Drop the frame header so only the payload is handed over.
                    let _ = bytes.split_to(frame::HEADER_LEN);

                    frame::Data::load(head, bytes.freeze())
                        .map_err(|e| {
                            proto_err!(conn: "failed to load DATA frame; err={:?}", e);
                            e
                        })?
                        .into()
                }
                Kind::Headers => {
                    // Drop the frame header; `bytes` now starts at the payload.
                    let _ = bytes.split_to(frame::HEADER_LEN);

                    // Both error arms propagate the error; they differ only in
                    // what is reported through `proto_err!`.
                    let mut frame = match frame::Headers::load(head, &mut bytes) {
                        Ok(res) => Ok(res),
                        Err(frame::FrameError::InvalidDependencyId) => {
                            proto_err!(stream: "invalid HEADERS dependency ID");
                            Err(frame::FrameError::InvalidDependencyId)
                        }
                        Err(e) => {
                            proto_err!(conn: "failed to load frame; err={:?}", e);
                            Err(e)
                        }
                    }?;

                    if frame.is_end_headers() {
                        // Header block is complete: decode it right away.
                        match frame.load_hpack(&mut bytes, &mut inner.decoder_hpack) {
                            Ok(_) => {}
                            Err(frame::FrameError::MalformedMessage) => {
                                let id = head.stream_id();
                                proto_err!(stream: "malformed header block; stream={:?}", id);
                                return Err(frame::FrameError::MalformedMessage);
                            }
                            Err(e) => {
                                proto_err!(conn: "failed HPACK decoding; err={:?}", e);
                                return Err(e);
                            }
                        }
                        frame.into()
                    } else {
                        // Header block continues in CONTINUATION frames: park
                        // the frame together with whatever is left in `bytes`
                        // and read the next raw frame.
                        log::trace!("loaded partial header block");
                        inner.partial = Some(Partial {
                            frame,
                            buf: bytes.split(),
                            count: 0,
                        });

                        continue;
                    }
                }
                Kind::Reset => frame::Reset::load(head, &bytes[frame::HEADER_LEN..])
                    .map_err(|e| {
                        proto_err!(conn: "failed to load RESET frame; err={:?}", e);
                        e
                    })?
                    .into(),
                Kind::GoAway => frame::GoAway::load(&bytes[frame::HEADER_LEN..])
                    .map_err(|e| {
                        proto_err!(conn: "failed to load GO_AWAY frame; err={:?}", e);
                        e
                    })?
                    .into(),
                Kind::Priority => {
                    // PRIORITY on stream 0 is rejected before loading.
                    if head.stream_id() == 0 {
                        proto_err!(conn: "invalid stream ID 0");
                        return Err(frame::FrameError::InvalidStreamId);
                    }

                    match frame::Priority::load(head, &bytes[frame::HEADER_LEN..]) {
                        Ok(frame) => frame.into(),
                        Err(frame::FrameError::InvalidDependencyId) => {
                            let id = head.stream_id();
                            proto_err!(stream: "PRIORITY invalid dependency ID; stream={:?}", id);
                            return Err(frame::FrameError::InvalidDependencyId);
                        }
                        Err(e) => {
                            proto_err!(conn: "failed to load PRIORITY frame; err={:?};", e);
                            return Err(e);
                        }
                    }
                }
                Kind::Continuation => {
                    // Take ownership of the open header block. On every error
                    // return below it is dropped, not put back.
                    let mut partial = inner.partial.take().ok_or_else(|| {
                        proto_err!(conn: "received unexpected CONTINUATION frame");
                        frame::FrameError::Continuation(frame::FrameContinuationError::Unexpected)
                    })?;

                    // The CONTINUATION must belong to the stream that opened the block.
                    if partial.frame.stream_id() != head.stream_id() {
                        proto_err!(conn: "CONTINUATION frame stream ID does not match previous frame stream ID");
                        return Err(frame::FrameError::Continuation(
                            frame::FrameContinuationError::UnknownStreamId,
                        ));
                    }

                    // Enforce the CONTINUATION count limit; 0 means "no limit".
                    if inner.decoder_max_header_continuations > 0 {
                        partial.count += 1;
                        if partial.count > inner.decoder_max_header_continuations {
                            proto_err!(conn: "received excessive amount of CONTINUATION frames");
                            return Err(frame::FrameError::Continuation(
                                frame::FrameContinuationError::MaxContinuations,
                            ));
                        }
                    }

                    if partial.buf.is_empty() {
                        // Nothing buffered yet: reuse this frame's payload
                        // buffer instead of copying.
                        partial.buf = bytes.split_off(frame::HEADER_LEN);
                    } else {
                        // Note: `bytes.len()` still includes the frame header
                        // here, so the check is slightly conservative.
                        if partial.buf.len() + bytes.len() > inner.decoder_max_header_list_size {
                            proto_err!(conn: "CONTINUATION frame header block size over ignorable limit");
                            return Err(frame::FrameError::Continuation(
                                frame::FrameContinuationError::MaxLeftoverSize,
                            ));
                        }
                        partial.buf.extend_from_slice(&bytes[frame::HEADER_LEN..]);
                    }

                    // Flag 0x4 marks the last frame of the header block.
                    if (head.flag() & 0x4) == 0x4 {
                        match partial
                            .frame
                            .load_hpack(&mut partial.buf, &mut inner.decoder_hpack)
                        {
                            Ok(_) => {}
                            Err(frame::FrameError::MalformedMessage) => {
                                let id = head.stream_id();
                                proto_err!(stream: "malformed CONTINUATION frame; stream={:?}", id);
                                return Err(frame::FrameContinuationError::Malformed.into());
                            }
                            Err(e) => {
                                proto_err!(conn: "failed HPACK decoding; err={:?}", e);
                                return Err(e);
                            }
                        }

                        partial.frame.into()
                    } else {
                        // More CONTINUATION frames expected: put the block back.
                        inner.partial = Some(partial);
                        continue;
                    }
                }
                Kind::Unknown => {
                    // Frames of unknown kind are skipped.
                    continue;
                }
            };

            return Ok(Some(frame));
        }
    }
}
352
353impl Encoder for Codec {
354 type Item = Frame;
355 type Error = error::EncoderError;
356
357 fn encode(&self, item: Frame, buf: &mut BytesMut) -> Result<(), error::EncoderError> {
358 let mut inner = self.0.borrow_mut();
362
363 match item {
364 Frame::Data(v) => {
365 let len = v.payload().len();
367 if len > inner.encoder_max_frame_size as usize {
368 return Err(error::EncoderError::MaxSizeExceeded);
369 }
370 v.encode(buf);
371
372 inner.encoder_last_data_frame = Some(v);
374 }
375 Frame::Headers(v) => {
376 let max_size = inner.encoder_max_frame_size as usize;
377 v.encode(&mut inner.encoder_hpack, buf, max_size);
378 }
379 Frame::Settings(v) => {
380 v.encode(buf);
381 }
382 Frame::GoAway(v) => {
383 v.encode(buf);
384 }
385 Frame::Ping(v) => {
386 v.encode(buf);
387 }
388 Frame::WindowUpdate(v) => {
389 v.encode(buf);
390 }
391
392 Frame::Priority(_) => (),
393 Frame::Reset(v) => {
394 v.encode(buf);
395 }
396 }
397
398 Ok(())
399 }
400}