1use crate::buffers::BufFactory;
28
29use super::Error;
30use super::Result;
31
32use super::frame;
33
/// Stream type value that `Type::deserialize` maps to `Type::Control`.
pub const HTTP3_CONTROL_STREAM_TYPE_ID: u64 = 0x00;
/// Stream type value that `Type::deserialize` maps to `Type::Push`.
pub const HTTP3_PUSH_STREAM_TYPE_ID: u64 = 0x01;
/// Stream type value that `Type::deserialize` maps to `Type::QpackEncoder`.
pub const QPACK_ENCODER_STREAM_TYPE_ID: u64 = 0x02;
/// Stream type value that `Type::deserialize` maps to `Type::QpackDecoder`.
pub const QPACK_DECODER_STREAM_TYPE_ID: u64 = 0x03;
38
/// Largest size (2^24 - 1 bytes) the per-stream state buffer may be resized
/// to; `Stream::state_transition` rejects bigger resize requests with
/// `Error::ExcessiveLoad`.
const MAX_STATE_BUF_SIZE: usize = 0x00FF_FFFF;
40
/// Kind of HTTP/3 stream handled by the stream state machine.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Type {
    /// Control stream (stream type `HTTP3_CONTROL_STREAM_TYPE_ID`).
    Control,
    /// Request stream; `Stream::new` assigns this to bidirectional stream
    /// IDs.
    Request,
    /// Push stream (stream type `HTTP3_PUSH_STREAM_TYPE_ID`).
    Push,
    /// QPACK encoder stream (stream type `QPACK_ENCODER_STREAM_TYPE_ID`).
    QpackEncoder,
    /// QPACK decoder stream (stream type `QPACK_DECODER_STREAM_TYPE_ID`).
    QpackDecoder,
    /// Any stream type value that `Type::deserialize` does not recognize.
    Unknown,
}
50
51impl Type {
52 #[cfg(feature = "qlog")]
53 pub fn to_qlog(self) -> qlog::events::http3::StreamType {
54 match self {
55 Type::Control => qlog::events::http3::StreamType::Control,
56 Type::Request => qlog::events::http3::StreamType::Request,
57 Type::Push => qlog::events::http3::StreamType::Push,
58 Type::QpackEncoder => qlog::events::http3::StreamType::QpackEncode,
59 Type::QpackDecoder => qlog::events::http3::StreamType::QpackDecode,
60 Type::Unknown => qlog::events::http3::StreamType::Unknown,
61 }
62 }
63}
64
/// Parsing state of a [`Stream`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum State {
    /// Waiting for the stream type; initial state of unidirectional
    /// streams.
    StreamType,

    /// Waiting for a frame type; initial state of bidirectional streams and
    /// the state entered once a frame has been fully consumed.
    FrameType,

    /// Waiting for the frame payload length; entered by
    /// `Stream::set_frame_type`.
    FramePayloadLen,

    /// Buffering the payload of a non-DATA frame in the state buffer.
    FramePayload,

    /// Reading the payload of a DATA frame; the state buffer is not resized
    /// for it.
    Data,

    /// Waiting for the push ID; entered when the stream type is
    /// `Type::Push`.
    PushId,

    /// Entered when the stream type is a QPACK encoder or decoder stream.
    QpackInstruction,

    /// Entered when the stream type is `Type::Unknown`.
    Drain,

    /// Entered by `Stream::finished`.
    Finished,
}
94
95impl Type {
96 pub fn deserialize(v: u64) -> Result<Type> {
97 match v {
98 HTTP3_CONTROL_STREAM_TYPE_ID => Ok(Type::Control),
99 HTTP3_PUSH_STREAM_TYPE_ID => Ok(Type::Push),
100 QPACK_ENCODER_STREAM_TYPE_ID => Ok(Type::QpackEncoder),
101 QPACK_DECODER_STREAM_TYPE_ID => Ok(Type::QpackDecoder),
102
103 _ => Ok(Type::Unknown),
104 }
105 }
106}
107
/// Per-stream parsing state for an HTTP/3 stream.
///
/// Tracks the stream's type, the current [`State`] of the parser, and the
/// buffer used to accumulate the bytes the current state needs.
#[derive(Debug)]
pub struct Stream {
    /// Stream ID, passed to the connection when receiving stream data.
    id: u64,

    /// Stream type; `None` for unidirectional streams until `set_ty()` is
    /// called.
    ty: Option<Type>,

    /// Current parsing state.
    state: State,

    /// Buffer holding the bytes read for the current state.
    state_buf: Vec<u8>,

    /// Number of bytes the current state expects.
    state_len: usize,

    /// Number of bytes received so far for the current state.
    state_off: usize,

    /// Type of the frame currently being parsed, recorded by
    /// `set_frame_type()`.
    frame_type: Option<u64>,

    /// Value supplied to `new()`; when `true`, `set_frame_type()` skips the
    /// request-stream frame checks.
    is_local: bool,

    /// Set once the first SETTINGS frame (control stream) or first HEADERS
    /// frame (non-local request stream) is seen, or when the stream type is
    /// set to a QPACK stream.
    remote_initialized: bool,

    /// Set by `initialize_local()`.
    local_initialized: bool,

    /// Whether a data event is currently marked as triggered; see
    /// `try_trigger_data_event()`.
    data_event_triggered: bool,

    /// Most recently stored priority update payload, if any.
    last_priority_update: Option<Vec<u8>>,

    /// Counter bumped by `increment_headers_received()`.
    headers_received_count: usize,

    /// Set when a DATA frame type is accepted on a non-local request stream.
    data_received: bool,

    /// Set by `mark_trailers_sent()`.
    trailers_sent: bool,

    /// Set when a HEADERS frame type is accepted after DATA on a non-local
    /// request stream.
    trailers_received: bool,
}
174
175impl Stream {
176 pub fn new(id: u64, is_local: bool) -> Stream {
181 let (ty, state) = if crate::stream::is_bidi(id) {
182 (Some(Type::Request), State::FrameType)
185 } else {
186 (None, State::StreamType)
188 };
189
190 Stream {
191 id,
192 ty,
193
194 state,
195
196 state_buf: vec![0; 16],
198
199 state_len: 1,
202 state_off: 0,
203
204 frame_type: None,
205
206 is_local,
207 remote_initialized: false,
208 local_initialized: false,
209
210 data_event_triggered: false,
211
212 last_priority_update: None,
213
214 headers_received_count: 0,
215
216 data_received: false,
217
218 trailers_sent: false,
219 trailers_received: false,
220 }
221 }
222
223 pub fn ty(&self) -> Option<Type> {
224 self.ty
225 }
226
227 pub fn state(&self) -> State {
228 self.state
229 }
230
231 pub fn set_ty(&mut self, ty: Type) -> Result<()> {
233 assert_eq!(self.state, State::StreamType);
234
235 self.ty = Some(ty);
236
237 let state = match ty {
238 Type::Control | Type::Request => State::FrameType,
239
240 Type::Push => State::PushId,
241
242 Type::QpackEncoder | Type::QpackDecoder => {
243 self.remote_initialized = true;
244
245 State::QpackInstruction
246 },
247
248 Type::Unknown => State::Drain,
249 };
250
251 self.state_transition(state, 1, true)?;
252
253 Ok(())
254 }
255
256 pub fn set_push_id(&mut self, _id: u64) -> Result<()> {
258 assert_eq!(self.state, State::PushId);
259
260 self.state_transition(State::FrameType, 1, true)?;
263
264 Ok(())
265 }
266
267 pub fn set_frame_type(&mut self, ty: u64) -> Result<()> {
269 assert_eq!(self.state, State::FrameType);
270
271 match self.ty {
273 Some(Type::Control) => {
274 match (ty, self.remote_initialized) {
278 (frame::SETTINGS_FRAME_TYPE_ID, false) =>
280 self.remote_initialized = true,
281
282 (_, false) => return Err(Error::MissingSettings),
285
286 (frame::SETTINGS_FRAME_TYPE_ID, true) =>
288 return Err(Error::FrameUnexpected),
289
290 (frame::DATA_FRAME_TYPE_ID, true) =>
293 return Err(Error::FrameUnexpected),
294
295 (frame::HEADERS_FRAME_TYPE_ID, true) =>
296 return Err(Error::FrameUnexpected),
297
298 (frame::PUSH_PROMISE_FRAME_TYPE_ID, true) =>
299 return Err(Error::FrameUnexpected),
300
301 (_, true) => (),
303 }
304 },
305
306 Some(Type::Request) => {
307 if !self.is_local {
314 match (ty, self.remote_initialized) {
315 (frame::HEADERS_FRAME_TYPE_ID, false) => {
316 self.remote_initialized = true;
317 },
318
319 (frame::DATA_FRAME_TYPE_ID, false) =>
320 return Err(Error::FrameUnexpected),
321
322 (frame::HEADERS_FRAME_TYPE_ID, true) => {
323 if self.trailers_received {
324 return Err(Error::FrameUnexpected);
325 }
326
327 if self.data_received {
328 self.trailers_received = true;
329 }
330 },
331
332 (frame::DATA_FRAME_TYPE_ID, true) => {
333 if self.trailers_received {
334 return Err(Error::FrameUnexpected);
335 }
336
337 self.data_received = true;
338 },
339
340 (frame::CANCEL_PUSH_FRAME_TYPE_ID, _) =>
341 return Err(Error::FrameUnexpected),
342
343 (frame::SETTINGS_FRAME_TYPE_ID, _) =>
344 return Err(Error::FrameUnexpected),
345
346 (frame::GOAWAY_FRAME_TYPE_ID, _) =>
347 return Err(Error::FrameUnexpected),
348
349 (frame::MAX_PUSH_FRAME_TYPE_ID, _) =>
350 return Err(Error::FrameUnexpected),
351
352 (frame::PRIORITY_UPDATE_FRAME_REQUEST_TYPE_ID, _) =>
353 return Err(Error::FrameUnexpected),
354
355 (frame::PRIORITY_UPDATE_FRAME_PUSH_TYPE_ID, _) =>
356 return Err(Error::FrameUnexpected),
357
358 _ => (),
361 }
362 }
363 },
364
365 Some(Type::Push) => {
366 match ty {
367 frame::CANCEL_PUSH_FRAME_TYPE_ID =>
369 return Err(Error::FrameUnexpected),
370
371 frame::SETTINGS_FRAME_TYPE_ID =>
372 return Err(Error::FrameUnexpected),
373
374 frame::PUSH_PROMISE_FRAME_TYPE_ID =>
375 return Err(Error::FrameUnexpected),
376
377 frame::GOAWAY_FRAME_TYPE_ID =>
378 return Err(Error::FrameUnexpected),
379
380 frame::MAX_PUSH_FRAME_TYPE_ID =>
381 return Err(Error::FrameUnexpected),
382
383 _ => (),
384 }
385 },
386
387 _ => return Err(Error::FrameUnexpected),
388 }
389
390 self.frame_type = Some(ty);
391
392 self.state_transition(State::FramePayloadLen, 1, true)?;
393
394 Ok(())
395 }
396
397 pub fn frame_type(&self) -> Option<u64> {
399 self.frame_type
400 }
401
402 pub fn set_frame_payload_len(&mut self, len: u64) -> Result<()> {
404 assert_eq!(self.state, State::FramePayloadLen);
405
406 if matches!(self.ty, Some(Type::Control | Type::Request | Type::Push)) {
408 let (state, resize) = match self.frame_type {
409 Some(frame::DATA_FRAME_TYPE_ID) => (State::Data, false),
410
411 Some(
414 frame::GOAWAY_FRAME_TYPE_ID |
415 frame::PUSH_PROMISE_FRAME_TYPE_ID |
416 frame::CANCEL_PUSH_FRAME_TYPE_ID |
417 frame::MAX_PUSH_FRAME_TYPE_ID,
418 ) => {
419 if len == 0 {
420 return Err(Error::FrameError);
421 }
422
423 (State::FramePayload, true)
424 },
425
426 _ => (State::FramePayload, true),
427 };
428
429 self.state_transition(state, len as usize, resize)?;
430
431 return Ok(());
432 }
433
434 Err(Error::InternalError)
435 }
436
437 pub fn try_fill_buffer<F: BufFactory>(
443 &mut self, conn: &mut crate::Connection<F>,
444 ) -> Result<()> {
445 if self.state_buffer_complete() {
447 return Ok(());
448 }
449
450 let buf = &mut self.state_buf[self.state_off..self.state_len];
451
452 let read = match conn.stream_recv(self.id, buf) {
453 Ok((len, fin)) => {
454 if fin &&
456 matches!(
457 self.ty,
458 Some(Type::Control) |
459 Some(Type::QpackEncoder) |
460 Some(Type::QpackDecoder)
461 )
462 {
463 super::close_conn_critical_stream(conn)?;
464 }
465
466 len
467 },
468
469 Err(e @ crate::Error::StreamReset(_)) => {
470 if matches!(
472 self.ty,
473 Some(Type::Control) |
474 Some(Type::QpackEncoder) |
475 Some(Type::QpackDecoder)
476 ) {
477 super::close_conn_critical_stream(conn)?;
478 }
479
480 return Err(e.into());
481 },
482
483 Err(e) => {
484 if e == crate::Error::Done {
486 self.reset_data_event();
487 }
488
489 return Err(e.into());
490 },
491 };
492
493 trace!(
494 "{} read {} bytes on stream {}",
495 conn.trace_id(),
496 read,
497 self.id,
498 );
499
500 self.state_off += read;
501
502 if !self.state_buffer_complete() {
503 self.reset_data_event();
504
505 return Err(Error::Done);
506 }
507
508 Ok(())
509 }
510
511 pub fn initialize_local(&mut self) {
513 self.local_initialized = true
514 }
515
516 pub fn local_initialized(&self) -> bool {
518 self.local_initialized
519 }
520
521 pub fn increment_headers_received(&mut self) {
522 self.headers_received_count =
523 self.headers_received_count.saturating_add(1);
524 }
525
526 pub fn headers_received_count(&self) -> usize {
527 self.headers_received_count
528 }
529
530 pub fn mark_trailers_sent(&mut self) {
531 self.trailers_sent = true;
532 }
533
534 pub fn trailers_sent(&self) -> bool {
535 self.trailers_sent
536 }
537
/// Test-only variant of `try_fill_buffer()` that reads from an in-memory
/// cursor instead of a connection.
///
/// Returns `Err(Error::Done)` while the state buffer is still incomplete.
#[cfg(test)]
fn try_fill_buffer_for_tests(
    &mut self, stream: &mut std::io::Cursor<Vec<u8>>,
) -> Result<()> {
    if !self.state_buffer_complete() {
        let pending = &mut self.state_buf[self.state_off..self.state_len];
        let n = std::io::Read::read(stream, pending).unwrap();

        self.state_off += n;

        if !self.state_buffer_complete() {
            return Err(Error::Done);
        }
    }

    Ok(())
}
563
564 pub fn try_consume_varint(&mut self) -> Result<u64> {
566 if self.state_off == 1 {
567 self.state_len = octets::varint_parse_len(self.state_buf[0]);
568 self.state_buf.resize(self.state_len, 0);
569 }
570
571 if !self.state_buffer_complete() {
574 return Err(Error::Done);
575 }
576
577 let varint = octets::Octets::with_slice(&self.state_buf).get_varint()?;
578
579 Ok(varint)
580 }
581
582 pub fn try_consume_frame(&mut self) -> Result<(frame::Frame, u64)> {
586 debug_assert_eq!(self.state, State::FramePayload);
587 self.reset_data_event();
589
590 let payload_len = self.state_len as u64;
591
592 let frame = frame::Frame::from_bytes(
594 self.frame_type.unwrap(),
595 payload_len,
596 &self.state_buf,
597 )?;
598
599 self.state_transition(State::FrameType, 1, true)?;
600
601 Ok((frame, payload_len))
602 }
603
604 pub fn try_consume_data<F: BufFactory, OUT: bytes::BufMut>(
606 &mut self, conn: &mut crate::Connection<F>, out: OUT,
607 ) -> Result<(usize, bool)> {
608 debug_assert_eq!(self.state, State::Data);
609 let out = out.limit(self.state_len - self.state_off);
610
611 let (len, fin) = match conn.stream_recv_buf(self.id, out) {
612 Ok(v) => v,
613
614 Err(e) => {
615 if e == crate::Error::Done {
617 self.reset_data_event();
618 }
619
620 return Err(e.into());
621 },
622 };
623
624 self.state_off += len;
625 debug_assert!(self.state_len >= self.state_off);
626
627 if !conn.stream_readable(self.id) {
629 self.reset_data_event();
630 }
631
632 if self.state_buffer_complete() {
633 self.state_transition(State::FrameType, 1, true)?;
634 }
635
636 Ok((len, fin))
637 }
638
639 pub fn finished(&mut self) {
641 let _ = self.state_transition(State::Finished, 0, false);
642 }
643
/// Test-only variant of `try_consume_data()` that reads from an in-memory
/// cursor instead of a connection.
///
/// Returns the number of bytes copied into `out`.
#[cfg(test)]
fn try_consume_data_for_tests(
    &mut self, stream: &mut std::io::Cursor<Vec<u8>>, out: &mut [u8],
) -> Result<usize> {
    use std::io::Read;

    // Never read past the end of the current frame payload.
    let room = (self.state_len - self.state_off).min(out.len());

    let len = stream.read(&mut out[..room]).unwrap();

    self.state_off += len;

    if self.state_buffer_complete() {
        self.state_transition(State::FrameType, 1, true)?;
    }

    Ok(len)
}
664
665 pub fn try_trigger_data_event(&mut self) -> bool {
670 if self.data_event_triggered {
671 return false;
672 }
673
674 self.data_event_triggered = true;
675
676 true
677 }
678
679 fn reset_data_event(&mut self) {
681 self.data_event_triggered = false;
682 }
683
684 pub fn set_last_priority_update(&mut self, priority_update: Option<Vec<u8>>) {
686 self.last_priority_update = priority_update;
687 }
688
689 pub fn take_last_priority_update(&mut self) -> Option<Vec<u8>> {
691 self.last_priority_update.take()
692 }
693
694 pub fn has_last_priority_update(&self) -> bool {
696 self.last_priority_update.is_some()
697 }
698
699 fn state_buffer_complete(&self) -> bool {
701 self.state_off == self.state_len
702 }
703
704 fn state_transition(
707 &mut self, new_state: State, expected_len: usize, resize: bool,
708 ) -> Result<()> {
709 if resize {
712 if expected_len > MAX_STATE_BUF_SIZE {
716 return Err(Error::ExcessiveLoad);
717 }
718
719 self.state_buf.resize(expected_len, 0);
720 }
721
722 self.state = new_state;
723 self.state_off = 0;
724 self.state_len = expected_len;
725
726 Ok(())
727 }
728}
729
730#[cfg(test)]
731mod tests {
732 use crate::h3::frame::*;
733
734 use super::*;
735
736 fn open_uni(b: &mut octets::OctetsMut, ty: u64) -> Result<Stream> {
737 let stream = <Stream>::new(2, false);
738 assert_eq!(stream.state, State::StreamType);
739
740 b.put_varint(ty)?;
741
742 Ok(stream)
743 }
744
745 fn parse_uni(
746 stream: &mut Stream, ty: u64, cursor: &mut std::io::Cursor<Vec<u8>>,
747 ) -> Result<()> {
748 stream.try_fill_buffer_for_tests(cursor)?;
749
750 let stream_ty = stream.try_consume_varint()?;
751 assert_eq!(stream_ty, ty);
752 stream.set_ty(Type::deserialize(stream_ty).unwrap())?;
753
754 Ok(())
755 }
756
757 fn parse_skip_frame(
758 stream: &mut Stream, cursor: &mut std::io::Cursor<Vec<u8>>,
759 ) -> Result<()> {
760 stream.try_fill_buffer_for_tests(cursor)?;
762
763 let frame_ty = stream.try_consume_varint()?;
764
765 stream.set_frame_type(frame_ty)?;
766 assert_eq!(stream.state, State::FramePayloadLen);
767
768 stream.try_fill_buffer_for_tests(cursor)?;
770
771 let frame_payload_len = stream.try_consume_varint()?;
772 stream.set_frame_payload_len(frame_payload_len)?;
773 assert_eq!(stream.state, State::FramePayload);
774
775 stream.try_fill_buffer_for_tests(cursor)?;
777
778 stream.try_consume_frame()?;
779 assert_eq!(stream.state, State::FrameType);
780
781 Ok(())
782 }
783
/// A control stream carrying a single well-formed SETTINGS frame parses
/// successfully.
#[test]
fn control_good() {
    // Parses a frame header and checks the type, length and resulting state.
    fn read_header(
        s: &mut Stream, c: &mut std::io::Cursor<Vec<u8>>, want_ty: u64,
        want_len: u64, want_state: State,
    ) {
        s.try_fill_buffer_for_tests(c).unwrap();
        let ty = s.try_consume_varint().unwrap();
        assert_eq!(ty, want_ty);

        s.set_frame_type(ty).unwrap();
        assert_eq!(s.state, State::FramePayloadLen);

        s.try_fill_buffer_for_tests(c).unwrap();
        let len = s.try_consume_varint().unwrap();
        assert_eq!(len, want_len);

        s.set_frame_payload_len(len).unwrap();
        assert_eq!(s.state, want_state);
    }

    let mut bytes = vec![42; 40];
    let mut writer = octets::OctetsMut::with_slice(&mut bytes);

    let settings = Frame::Settings {
        max_field_section_size: Some(0),
        qpack_max_table_capacity: Some(0),
        qpack_blocked_streams: Some(0),
        connect_protocol_enabled: None,
        h3_datagram: None,
        grease: None,
        additional_settings: None,
        raw: Some(vec![
            (SETTINGS_MAX_FIELD_SECTION_SIZE, 0),
            (SETTINGS_QPACK_MAX_TABLE_CAPACITY, 0),
            (SETTINGS_QPACK_BLOCKED_STREAMS, 0),
        ]),
    };

    let mut stream =
        open_uni(&mut writer, HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
    settings.to_bytes(&mut writer).unwrap();

    let mut cursor = std::io::Cursor::new(bytes);

    parse_uni(&mut stream, HTTP3_CONTROL_STREAM_TYPE_ID, &mut cursor)
        .unwrap();
    assert_eq!(stream.state, State::FrameType);

    read_header(
        &mut stream,
        &mut cursor,
        SETTINGS_FRAME_TYPE_ID,
        6,
        State::FramePayload,
    );

    // Parse the SETTINGS payload.
    stream.try_fill_buffer_for_tests(&mut cursor).unwrap();

    assert_eq!(stream.try_consume_frame(), Ok((settings, 6)));
    assert_eq!(stream.state, State::FrameType);
}
839
/// A control stream carrying an empty SETTINGS frame parses successfully.
#[test]
fn control_empty_settings() {
    // Parses a frame header and checks the type, length and resulting state.
    fn read_header(
        s: &mut Stream, c: &mut std::io::Cursor<Vec<u8>>, want_ty: u64,
        want_len: u64, want_state: State,
    ) {
        s.try_fill_buffer_for_tests(c).unwrap();
        let ty = s.try_consume_varint().unwrap();
        assert_eq!(ty, want_ty);

        s.set_frame_type(ty).unwrap();
        assert_eq!(s.state, State::FramePayloadLen);

        s.try_fill_buffer_for_tests(c).unwrap();
        let len = s.try_consume_varint().unwrap();
        assert_eq!(len, want_len);

        s.set_frame_payload_len(len).unwrap();
        assert_eq!(s.state, want_state);
    }

    let mut bytes = vec![42; 40];
    let mut writer = octets::OctetsMut::with_slice(&mut bytes);

    let settings = Frame::Settings {
        max_field_section_size: None,
        qpack_max_table_capacity: None,
        qpack_blocked_streams: None,
        connect_protocol_enabled: None,
        h3_datagram: None,
        grease: None,
        additional_settings: None,
        raw: Some(vec![]),
    };

    let mut stream =
        open_uni(&mut writer, HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
    settings.to_bytes(&mut writer).unwrap();

    let mut cursor = std::io::Cursor::new(bytes);

    parse_uni(&mut stream, HTTP3_CONTROL_STREAM_TYPE_ID, &mut cursor)
        .unwrap();
    assert_eq!(stream.state, State::FrameType);

    read_header(
        &mut stream,
        &mut cursor,
        SETTINGS_FRAME_TYPE_ID,
        0,
        State::FramePayload,
    );

    // A zero-length payload needs no further bytes.
    stream.try_fill_buffer_for_tests(&mut cursor).unwrap();

    assert_eq!(stream.try_consume_frame(), Ok((settings, 0)));
    assert_eq!(stream.state, State::FrameType);
}
889
/// A second SETTINGS frame on the control stream is rejected.
#[test]
fn control_bad_multiple_settings() {
    // Reads the next varint from the cursor through the stream buffer.
    fn next_varint(s: &mut Stream, c: &mut std::io::Cursor<Vec<u8>>) -> u64 {
        s.try_fill_buffer_for_tests(c).unwrap();
        s.try_consume_varint().unwrap()
    }

    let mut bytes = vec![42; 40];
    let mut writer = octets::OctetsMut::with_slice(&mut bytes);

    let settings = Frame::Settings {
        max_field_section_size: Some(0),
        qpack_max_table_capacity: Some(0),
        qpack_blocked_streams: Some(0),
        connect_protocol_enabled: None,
        h3_datagram: None,
        grease: None,
        additional_settings: None,
        raw: Some(vec![
            (SETTINGS_MAX_FIELD_SECTION_SIZE, 0),
            (SETTINGS_QPACK_MAX_TABLE_CAPACITY, 0),
            (SETTINGS_QPACK_BLOCKED_STREAMS, 0),
        ]),
    };

    let mut stream =
        open_uni(&mut writer, HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();

    // The same SETTINGS frame is written twice.
    settings.to_bytes(&mut writer).unwrap();
    settings.to_bytes(&mut writer).unwrap();

    let mut cursor = std::io::Cursor::new(bytes);

    parse_uni(&mut stream, HTTP3_CONTROL_STREAM_TYPE_ID, &mut cursor)
        .unwrap();
    assert_eq!(stream.state, State::FrameType);

    // First SETTINGS frame: type.
    let first_ty = next_varint(&mut stream, &mut cursor);
    assert_eq!(first_ty, SETTINGS_FRAME_TYPE_ID);

    stream.set_frame_type(first_ty).unwrap();
    assert_eq!(stream.state, State::FramePayloadLen);

    // First SETTINGS frame: payload length.
    let first_len = next_varint(&mut stream, &mut cursor);
    assert_eq!(first_len, 6);
    stream.set_frame_payload_len(first_len).unwrap();
    assert_eq!(stream.state, State::FramePayload);

    // First SETTINGS frame: payload.
    stream.try_fill_buffer_for_tests(&mut cursor).unwrap();

    assert_eq!(stream.try_consume_frame(), Ok((settings, 6)));
    assert_eq!(stream.state, State::FrameType);

    // Second SETTINGS frame: rejected as soon as its type is seen.
    let second_ty = next_varint(&mut stream, &mut cursor);
    assert_eq!(
        stream.set_frame_type(second_ty),
        Err(Error::FrameUnexpected)
    );
}
952
/// A control stream whose first frame is not SETTINGS is rejected.
#[test]
fn control_bad_late_settings() {
    let mut bytes = vec![42; 40];
    let mut writer = octets::OctetsMut::with_slice(&mut bytes);

    let early_goaway = Frame::GoAway { id: 0 };

    let late_settings = Frame::Settings {
        max_field_section_size: Some(0),
        qpack_max_table_capacity: Some(0),
        qpack_blocked_streams: Some(0),
        connect_protocol_enabled: None,
        h3_datagram: None,
        grease: None,
        additional_settings: None,
        raw: Some(vec![
            (SETTINGS_MAX_FIELD_SECTION_SIZE, 0),
            (SETTINGS_QPACK_MAX_TABLE_CAPACITY, 0),
            (SETTINGS_QPACK_BLOCKED_STREAMS, 0),
        ]),
    };

    let mut stream =
        open_uni(&mut writer, HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();

    // GOAWAY is written ahead of SETTINGS.
    early_goaway.to_bytes(&mut writer).unwrap();
    late_settings.to_bytes(&mut writer).unwrap();

    let mut cursor = std::io::Cursor::new(bytes);

    parse_uni(&mut stream, HTTP3_CONTROL_STREAM_TYPE_ID, &mut cursor)
        .unwrap();
    assert_eq!(stream.state, State::FrameType);

    // The GOAWAY frame type is rejected because SETTINGS is missing.
    stream.try_fill_buffer_for_tests(&mut cursor).unwrap();

    let first_ty = stream.try_consume_varint().unwrap();
    assert_eq!(stream.set_frame_type(first_ty), Err(Error::MissingSettings));
}
994
/// A HEADERS frame on the control stream is rejected after SETTINGS.
#[test]
fn control_bad_frame() {
    // Reads the next varint from the cursor through the stream buffer.
    fn next_varint(s: &mut Stream, c: &mut std::io::Cursor<Vec<u8>>) -> u64 {
        s.try_fill_buffer_for_tests(c).unwrap();
        s.try_consume_varint().unwrap()
    }

    let mut bytes = vec![42; 40];
    let mut writer = octets::OctetsMut::with_slice(&mut bytes);

    let headers = Frame::Headers {
        header_block: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
    };

    let settings = Frame::Settings {
        max_field_section_size: Some(0),
        qpack_max_table_capacity: Some(0),
        qpack_blocked_streams: Some(0),
        connect_protocol_enabled: None,
        h3_datagram: None,
        grease: None,
        additional_settings: None,
        raw: Some(vec![
            (SETTINGS_MAX_FIELD_SECTION_SIZE, 0),
            (SETTINGS_QPACK_MAX_TABLE_CAPACITY, 0),
            (SETTINGS_QPACK_BLOCKED_STREAMS, 0),
            (33, 33),
        ]),
    };

    let mut stream =
        open_uni(&mut writer, HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
    settings.to_bytes(&mut writer).unwrap();
    headers.to_bytes(&mut writer).unwrap();

    let mut cursor = std::io::Cursor::new(bytes);

    parse_uni(&mut stream, HTTP3_CONTROL_STREAM_TYPE_ID, &mut cursor)
        .unwrap();
    assert_eq!(stream.state, State::FrameType);

    // SETTINGS frame: type, payload length, payload.
    let settings_ty = next_varint(&mut stream, &mut cursor);
    stream.set_frame_type(settings_ty).unwrap();

    let settings_len = next_varint(&mut stream, &mut cursor);
    stream.set_frame_payload_len(settings_len).unwrap();

    stream.try_fill_buffer_for_tests(&mut cursor).unwrap();

    assert!(stream.try_consume_frame().is_ok());

    // HEADERS frame: rejected as soon as its type is seen.
    let headers_ty = next_varint(&mut stream, &mut cursor);
    assert_eq!(
        stream.set_frame_type(headers_ty),
        Err(Error::FrameUnexpected)
    );
}
1053
/// A fresh request stream with no buffered bytes reports `Done`.
#[test]
fn request_no_data() {
    let mut request = Stream::new(0, false);

    assert_eq!(request.ty, Some(Type::Request));
    assert_eq!(request.state, State::FrameType);

    let outcome = request.try_consume_varint();
    assert_eq!(outcome, Err(Error::Done));
}
1063
/// A request stream carrying HEADERS followed by DATA parses
/// successfully.
#[test]
fn request_good() {
    // Parses a frame header and checks the type, length and resulting state.
    fn read_header(
        s: &mut Stream, c: &mut std::io::Cursor<Vec<u8>>, want_ty: u64,
        want_len: u64, want_state: State,
    ) {
        s.try_fill_buffer_for_tests(c).unwrap();
        let ty = s.try_consume_varint().unwrap();
        assert_eq!(ty, want_ty);

        s.set_frame_type(ty).unwrap();
        assert_eq!(s.state, State::FramePayloadLen);

        s.try_fill_buffer_for_tests(c).unwrap();
        let len = s.try_consume_varint().unwrap();
        assert_eq!(len, want_len);

        s.set_frame_payload_len(len).unwrap();
        assert_eq!(s.state, want_state);
    }

    let mut stream = Stream::new(0, false);

    let mut bytes = vec![42; 128];
    let mut writer = octets::OctetsMut::with_slice(&mut bytes);

    let body = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
    let headers = Frame::Headers {
        header_block: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
    };
    let data = Frame::Data {
        payload: body.clone(),
    };

    headers.to_bytes(&mut writer).unwrap();
    data.to_bytes(&mut writer).unwrap();

    let mut cursor = std::io::Cursor::new(bytes);

    // HEADERS frame.
    read_header(
        &mut stream,
        &mut cursor,
        HEADERS_FRAME_TYPE_ID,
        12,
        State::FramePayload,
    );

    stream.try_fill_buffer_for_tests(&mut cursor).unwrap();

    assert_eq!(stream.try_consume_frame(), Ok((headers, 12)));
    assert_eq!(stream.state, State::FrameType);

    // DATA frame.
    read_header(
        &mut stream,
        &mut cursor,
        DATA_FRAME_TYPE_ID,
        12,
        State::Data,
    );

    let mut received = vec![0; body.len()];
    assert_eq!(
        stream.try_consume_data_for_tests(&mut cursor, &mut received),
        Ok(body.len())
    );
    assert_eq!(body, received);

    assert_eq!(stream.state, State::FrameType);
}
1135
/// A push stream carrying a push ID, HEADERS and DATA parses
/// successfully.
#[test]
fn push_good() {
    // Parses a frame header and checks the type, length and resulting state.
    fn read_header(
        s: &mut Stream, c: &mut std::io::Cursor<Vec<u8>>, want_ty: u64,
        want_len: u64, want_state: State,
    ) {
        s.try_fill_buffer_for_tests(c).unwrap();
        let ty = s.try_consume_varint().unwrap();
        assert_eq!(ty, want_ty);

        s.set_frame_type(ty).unwrap();
        assert_eq!(s.state, State::FramePayloadLen);

        s.try_fill_buffer_for_tests(c).unwrap();
        let len = s.try_consume_varint().unwrap();
        assert_eq!(len, want_len);

        s.set_frame_payload_len(len).unwrap();
        assert_eq!(s.state, want_state);
    }

    let mut bytes = vec![42; 128];
    let mut writer = octets::OctetsMut::with_slice(&mut bytes);

    let body = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
    let headers = Frame::Headers {
        header_block: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
    };
    let data = Frame::Data {
        payload: body.clone(),
    };

    let mut stream = open_uni(&mut writer, HTTP3_PUSH_STREAM_TYPE_ID).unwrap();
    writer.put_varint(1).unwrap();
    headers.to_bytes(&mut writer).unwrap();
    data.to_bytes(&mut writer).unwrap();

    let mut cursor = std::io::Cursor::new(bytes);

    parse_uni(&mut stream, HTTP3_PUSH_STREAM_TYPE_ID, &mut cursor).unwrap();
    assert_eq!(stream.state, State::PushId);

    // Push ID.
    stream.try_fill_buffer_for_tests(&mut cursor).unwrap();

    let push_id = stream.try_consume_varint().unwrap();
    assert_eq!(push_id, 1);

    stream.set_push_id(push_id).unwrap();
    assert_eq!(stream.state, State::FrameType);

    // HEADERS frame.
    read_header(
        &mut stream,
        &mut cursor,
        HEADERS_FRAME_TYPE_ID,
        12,
        State::FramePayload,
    );

    stream.try_fill_buffer_for_tests(&mut cursor).unwrap();

    assert_eq!(stream.try_consume_frame(), Ok((headers, 12)));
    assert_eq!(stream.state, State::FrameType);

    // DATA frame.
    read_header(
        &mut stream,
        &mut cursor,
        DATA_FRAME_TYPE_ID,
        12,
        State::Data,
    );

    let mut received = vec![0; body.len()];
    assert_eq!(
        stream.try_consume_data_for_tests(&mut cursor, &mut received),
        Ok(body.len())
    );
    assert_eq!(body, received);

    assert_eq!(stream.state, State::FrameType);
}
1219
/// An unknown stream type (33) puts the stream into the drain state.
#[test]
fn grease() {
    const GREASE_TYPE: u64 = 33;

    let mut bytes = vec![42; 20];
    let mut writer = octets::OctetsMut::with_slice(&mut bytes);

    let mut stream = open_uni(&mut writer, GREASE_TYPE).unwrap();

    let mut cursor = std::io::Cursor::new(bytes);

    // Read the stream type.
    stream.try_fill_buffer_for_tests(&mut cursor).unwrap();

    let parsed_ty = stream.try_consume_varint().unwrap();
    assert_eq!(parsed_ty, GREASE_TYPE);

    let kind = Type::deserialize(parsed_ty).unwrap();
    stream.set_ty(kind).unwrap();

    assert_eq!(stream.state, State::Drain);
}
1239
/// A DATA frame arriving before any HEADERS on a request stream is
/// rejected.
#[test]
fn data_before_headers() {
    let mut request = Stream::new(0, false);

    let mut bytes = vec![42; 128];
    let mut writer = octets::OctetsMut::with_slice(&mut bytes);

    let early_data = Frame::Data {
        payload: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
    };

    early_data.to_bytes(&mut writer).unwrap();

    let mut cursor = std::io::Cursor::new(bytes);

    // Read the frame type.
    request.try_fill_buffer_for_tests(&mut cursor).unwrap();

    let first_ty = request.try_consume_varint().unwrap();
    assert_eq!(first_ty, DATA_FRAME_TYPE_ID);

    let verdict = request.set_frame_type(first_ty);
    assert_eq!(verdict, Err(Error::FrameUnexpected));
}
1263
/// A request stream carrying two HEADERS frames, DATA, and a trailing
/// HEADERS frame parses successfully.
#[test]
fn additional_headers() {
    // Parses a frame header and checks the type, length and resulting state.
    fn read_header(
        s: &mut Stream, c: &mut std::io::Cursor<Vec<u8>>, want_ty: u64,
        want_len: u64, want_state: State,
    ) {
        s.try_fill_buffer_for_tests(c).unwrap();
        let ty = s.try_consume_varint().unwrap();
        assert_eq!(ty, want_ty);

        s.set_frame_type(ty).unwrap();
        assert_eq!(s.state, State::FramePayloadLen);

        s.try_fill_buffer_for_tests(c).unwrap();
        let len = s.try_consume_varint().unwrap();
        assert_eq!(len, want_len);

        s.set_frame_payload_len(len).unwrap();
        assert_eq!(s.state, want_state);
    }

    // Parses one complete 12-byte HEADERS frame and compares it to `want`.
    fn read_headers_frame(
        s: &mut Stream, c: &mut std::io::Cursor<Vec<u8>>, want: Frame,
    ) {
        read_header(s, c, HEADERS_FRAME_TYPE_ID, 12, State::FramePayload);

        s.try_fill_buffer_for_tests(c).unwrap();

        assert_eq!(s.try_consume_frame(), Ok((want, 12)));
        assert_eq!(s.state, State::FrameType);
    }

    let mut stream = Stream::new(0, false);

    let mut bytes = vec![42; 128];
    let mut writer = octets::OctetsMut::with_slice(&mut bytes);

    let block = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
    let body = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];

    let info_hdrs = Frame::Headers {
        header_block: block.clone(),
    };
    let non_info_hdrs = Frame::Headers {
        header_block: block.clone(),
    };
    let trailers = Frame::Headers {
        header_block: block,
    };
    let data = Frame::Data {
        payload: body.clone(),
    };

    info_hdrs.to_bytes(&mut writer).unwrap();
    non_info_hdrs.to_bytes(&mut writer).unwrap();
    data.to_bytes(&mut writer).unwrap();
    trailers.to_bytes(&mut writer).unwrap();

    let mut cursor = std::io::Cursor::new(bytes);

    // First and second HEADERS frames.
    read_headers_frame(&mut stream, &mut cursor, info_hdrs);
    read_headers_frame(&mut stream, &mut cursor, non_info_hdrs);

    // DATA frame.
    read_header(
        &mut stream,
        &mut cursor,
        DATA_FRAME_TYPE_ID,
        12,
        State::Data,
    );

    let mut received = vec![0; body.len()];
    assert_eq!(
        stream.try_consume_data_for_tests(&mut cursor, &mut received),
        Ok(body.len())
    );
    assert_eq!(body, received);

    assert_eq!(stream.state, State::FrameType);

    // Trailing HEADERS frame.
    read_headers_frame(&mut stream, &mut cursor, trailers);
}
1391
#[test]
fn zero_length_goaway() {
    // Checks that applying a zero payload length to a GOAWAY frame read from
    // a control stream is reported as `Error::FrameError`.
    let mut storage = vec![42; 128];
    let mut writer = octets::OctetsMut::with_slice(&mut storage);

    // SETTINGS frame with every setting unset and an empty raw list.
    let settings = Frame::Settings {
        max_field_section_size: None,
        qpack_max_table_capacity: None,
        qpack_blocked_streams: None,
        connect_protocol_enabled: None,
        h3_datagram: None,
        grease: None,
        additional_settings: None,
        raw: Some(vec![]),
    };

    // Open the control stream first, then append the SETTINGS frame.
    let mut stream =
        open_uni(&mut writer, HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
    settings.to_bytes(&mut writer).unwrap();

    // GOAWAY frame header: the frame type followed by a payload length of 0.
    b_put_goaway_header(&mut writer);

    let mut reader = std::io::Cursor::new(storage);

    // Consume the stream type, then step over the frame written before the
    // GOAWAY header.
    parse_uni(&mut stream, HTTP3_CONTROL_STREAM_TYPE_ID, &mut reader)
        .unwrap();
    parse_skip_frame(&mut stream, &mut reader).unwrap();

    // Read the frame type.
    stream.try_fill_buffer_for_tests(&mut reader).unwrap();
    let parsed_ty = stream.try_consume_varint().unwrap();
    assert_eq!(parsed_ty, GOAWAY_FRAME_TYPE_ID);

    stream.set_frame_type(parsed_ty).unwrap();
    assert_eq!(stream.state, State::FramePayloadLen);

    // Read the payload length; handing it to the stream must fail.
    stream.try_fill_buffer_for_tests(&mut reader).unwrap();
    let parsed_len = stream.try_consume_varint().unwrap();
    let outcome = stream.set_frame_payload_len(parsed_len);
    assert_eq!(Err(Error::FrameError), outcome);

    // Writes the GOAWAY frame type and a zero payload length, in that order.
    fn b_put_goaway_header(out: &mut octets::OctetsMut) {
        out.put_varint(GOAWAY_FRAME_TYPE_ID).unwrap();
        out.put_varint(0).unwrap();
    }
}
1439
#[test]
fn zero_length_push_promise() {
    // Checks that applying a zero payload length to a PUSH_PROMISE frame
    // read from a request stream is reported as `Error::FrameError`.
    let mut storage = vec![42; 128];
    let mut writer = octets::OctetsMut::with_slice(&mut storage);

    let mut stream = <Stream>::new(0, false);

    // A stream built with id 0 starts out as a request stream that expects
    // a frame type next.
    assert_eq!(stream.ty, Some(Type::Request));
    assert_eq!(stream.state, State::FrameType);

    // PUSH_PROMISE frame header: the frame type followed by a payload
    // length of 0.
    writer.put_varint(PUSH_PROMISE_FRAME_TYPE_ID).unwrap();
    writer.put_varint(0).unwrap();

    let mut reader = std::io::Cursor::new(storage);

    // Read the frame type.
    stream.try_fill_buffer_for_tests(&mut reader).unwrap();
    let parsed_ty = stream.try_consume_varint().unwrap();
    assert_eq!(parsed_ty, PUSH_PROMISE_FRAME_TYPE_ID);

    stream.set_frame_type(parsed_ty).unwrap();
    assert_eq!(stream.state, State::FramePayloadLen);

    // Read the payload length; handing it to the stream must fail.
    stream.try_fill_buffer_for_tests(&mut reader).unwrap();
    let parsed_len = stream.try_consume_varint().unwrap();
    let outcome = stream.set_frame_payload_len(parsed_len);
    assert_eq!(Err(Error::FrameError), outcome);
}
1472
#[test]
fn zero_length_cancel_push() {
    // Checks that applying a zero payload length to a CANCEL_PUSH frame read
    // from a control stream is reported as `Error::FrameError`.
    let mut storage = vec![42; 128];
    let mut writer = octets::OctetsMut::with_slice(&mut storage);

    // SETTINGS frame with every setting unset and an empty raw list.
    let settings = Frame::Settings {
        max_field_section_size: None,
        qpack_max_table_capacity: None,
        qpack_blocked_streams: None,
        connect_protocol_enabled: None,
        h3_datagram: None,
        grease: None,
        additional_settings: None,
        raw: Some(vec![]),
    };

    // Open the control stream first, then append the SETTINGS frame.
    let mut stream =
        open_uni(&mut writer, HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
    settings.to_bytes(&mut writer).unwrap();

    // CANCEL_PUSH frame header: the frame type followed by a payload length
    // of 0.
    writer.put_varint(CANCEL_PUSH_FRAME_TYPE_ID).unwrap();
    writer.put_varint(0).unwrap();

    let mut reader = std::io::Cursor::new(storage);

    // Consume the stream type, then step over the frame written before the
    // CANCEL_PUSH header.
    parse_uni(&mut stream, HTTP3_CONTROL_STREAM_TYPE_ID, &mut reader)
        .unwrap();
    parse_skip_frame(&mut stream, &mut reader).unwrap();

    // Read the frame type.
    stream.try_fill_buffer_for_tests(&mut reader).unwrap();
    let parsed_ty = stream.try_consume_varint().unwrap();
    assert_eq!(parsed_ty, CANCEL_PUSH_FRAME_TYPE_ID);

    stream.set_frame_type(parsed_ty).unwrap();
    assert_eq!(stream.state, State::FramePayloadLen);

    // Read the payload length; handing it to the stream must fail.
    stream.try_fill_buffer_for_tests(&mut reader).unwrap();
    let parsed_len = stream.try_consume_varint().unwrap();
    let outcome = stream.set_frame_payload_len(parsed_len);
    assert_eq!(Err(Error::FrameError), outcome);
}
1520
#[test]
fn zero_length_max_push_id() {
    // Checks that applying a zero payload length to a frame of type
    // `MAX_PUSH_FRAME_TYPE_ID` read from a control stream is reported as
    // `Error::FrameError`.
    let mut storage = vec![42; 128];
    let mut writer = octets::OctetsMut::with_slice(&mut storage);

    // SETTINGS frame with every setting unset and an empty raw list.
    let settings = Frame::Settings {
        max_field_section_size: None,
        qpack_max_table_capacity: None,
        qpack_blocked_streams: None,
        connect_protocol_enabled: None,
        h3_datagram: None,
        grease: None,
        additional_settings: None,
        raw: Some(vec![]),
    };

    // Open the control stream first, then append the SETTINGS frame.
    let mut stream =
        open_uni(&mut writer, HTTP3_CONTROL_STREAM_TYPE_ID).unwrap();
    settings.to_bytes(&mut writer).unwrap();

    // Frame header under test: the frame type followed by a payload length
    // of 0.
    writer.put_varint(MAX_PUSH_FRAME_TYPE_ID).unwrap();
    writer.put_varint(0).unwrap();

    let mut reader = std::io::Cursor::new(storage);

    // Consume the stream type, then step over the frame written before the
    // header under test.
    parse_uni(&mut stream, HTTP3_CONTROL_STREAM_TYPE_ID, &mut reader)
        .unwrap();
    parse_skip_frame(&mut stream, &mut reader).unwrap();

    // Read the frame type.
    stream.try_fill_buffer_for_tests(&mut reader).unwrap();
    let parsed_ty = stream.try_consume_varint().unwrap();
    assert_eq!(parsed_ty, MAX_PUSH_FRAME_TYPE_ID);

    stream.set_frame_type(parsed_ty).unwrap();
    assert_eq!(stream.state, State::FramePayloadLen);

    // Read the payload length; handing it to the stream must fail.
    stream.try_fill_buffer_for_tests(&mut reader).unwrap();
    let parsed_len = stream.try_consume_varint().unwrap();
    let outcome = stream.set_frame_payload_len(parsed_len);
    assert_eq!(Err(Error::FrameError), outcome);
}
1568}