1use bytes::BytesMut;
4use futures::{
5 Stream,
6 channel::mpsc::{Receiver, UnboundedSender},
7 stream::FusedStream,
8 task::Waker,
9};
10
11use std::{
12 io,
13 pin::Pin,
14 task::{Context, Poll},
15};
16
17use futures::future;
18use log::{debug, trace};
19use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
20
21use crate::{
22 StreamId,
23 config::INITIAL_STREAM_WINDOW,
24 error::Error,
25 frame::{Flag, Flags, Frame, Type},
26};
27
28#[derive(Debug)]
30pub struct StreamHandle {
31 id: StreamId,
32 state: StreamState,
33
34 max_recv_window: u32,
35 pub(crate) recv_window: u32,
36 send_window: u32,
37 read_buf: Vec<BytesMut>,
38
39 unbound_event_sender: UnboundedSender<StreamEvent>,
41
42 frame_receiver: Receiver<Frame>,
45
46 writeable_wake: Option<Waker>,
48
49 readable_wake: Option<Waker>,
51}
52
53impl StreamHandle {
54 pub(crate) fn new(
56 id: StreamId,
57 unbound_event_sender: UnboundedSender<StreamEvent>,
58 frame_receiver: Receiver<Frame>,
59 state: StreamState,
60 max_window_size: u32,
61 ) -> StreamHandle {
62 assert!(state == StreamState::Init || state == StreamState::SynReceived);
63 StreamHandle {
64 id,
65 state,
66 max_recv_window: max_window_size,
67 recv_window: INITIAL_STREAM_WINDOW,
68 send_window: INITIAL_STREAM_WINDOW,
69 read_buf: Vec::new(),
70 unbound_event_sender,
71 frame_receiver,
72 writeable_wake: None,
73 readable_wake: None,
74 }
75 }
76
77 pub fn id(&self) -> StreamId {
79 self.id
80 }
81 pub fn state(&self) -> StreamState {
83 self.state
84 }
85 pub fn recv_window(&self) -> u32 {
87 self.recv_window
88 }
89 pub fn send_window(&self) -> u32 {
91 self.send_window
92 }
93
94 fn close(&mut self) -> Result<(), Error> {
95 match self.state {
96 StreamState::SynSent
97 | StreamState::SynReceived
98 | StreamState::Established
99 | StreamState::Init => {
100 self.state = StreamState::LocalClosing;
101 self.send_close()?;
102 }
103 StreamState::RemoteClosing => {
104 self.state = StreamState::Closed;
105 self.send_close()?;
106 let event = StreamEvent::Closed(self.id);
107 self.unbound_send_event(event)?;
108 }
109 StreamState::Reset | StreamState::Closed => {
110 self.state = StreamState::Closed;
111 let event = StreamEvent::Closed(self.id);
112 self.unbound_send_event(event)?;
113 }
114 StreamState::LocalClosing => {
115 self.state = StreamState::Closed;
116 let event = StreamEvent::Closed(self.id);
117 self.unbound_send_event(event)?;
118 }
119 }
120 Ok(())
121 }
122
123 fn send_go_away(&mut self) {
124 self.state = StreamState::LocalClosing;
125 let _ignore = self
126 .unbound_event_sender
127 .unbounded_send(StreamEvent::GoAway);
128 }
129
130 fn unbound_send_event(&mut self, event: StreamEvent) -> Result<(), Error> {
131 self.unbound_event_sender
132 .unbounded_send(event)
133 .map_err(|_| Error::SessionShutdown)
134 }
135
136 #[inline]
137 fn unbound_send_frame(&mut self, frame: Frame) -> Result<(), Error> {
138 trace!(
139 "stream-handle({}) send_frame ty={:?}, size={}",
140 self.id,
141 frame.ty(),
142 frame.size()
143 );
144 let event = StreamEvent::Frame(frame);
145 self.unbound_send_event(event)
146 }
147
148 pub(crate) fn send_window_update(&mut self) -> Result<(), Error> {
150 let buf_len = self.read_buf.iter().map(|b| b.len()).sum::<usize>() as u32;
151 let delta = self.max_recv_window - buf_len - self.recv_window;
152
153 let flags = self.get_flags();
155 if delta < (self.max_recv_window / 2) && flags.value() == 0 {
156 return Ok(());
157 }
158 self.recv_window += delta;
160 let frame = Frame::new_window_update(flags, self.id, delta);
161 self.unbound_event_sender
162 .unbounded_send(StreamEvent::Frame(frame))
163 .map_err(|_| Error::SessionShutdown)
164 }
165
166 fn send_data(&mut self, data: &[u8]) -> Result<(), Error> {
167 let flags = self.get_flags();
168 let frame = Frame::new_data(flags, self.id, BytesMut::from(data));
169 self.unbound_send_frame(frame)
170 }
171
172 fn send_close(&mut self) -> Result<(), Error> {
173 let mut flags = self.get_flags();
174 flags.add(Flag::Fin);
175 let frame = Frame::new_window_update(flags, self.id, 0);
176 self.unbound_send_frame(frame)
177 }
178
179 fn process_flags(&mut self, flags: Flags) -> Result<(), Error> {
180 if flags.contains(Flag::Ack) && self.state == StreamState::SynSent {
181 self.state = StreamState::Established;
182 }
183 if flags.contains(Flag::Fin) {
184 match self.state {
185 StreamState::Init
186 | StreamState::SynSent
187 | StreamState::SynReceived
188 | StreamState::Established => {
189 self.state = StreamState::RemoteClosing;
190 }
191 StreamState::LocalClosing => {
192 self.state = StreamState::Closed;
193 }
194 _ => return Err(Error::UnexpectedFlag),
195 }
196 }
197 if flags.contains(Flag::Rst) {
198 self.state = StreamState::Reset;
199 }
200 Ok(())
201 }
202
203 fn get_flags(&mut self) -> Flags {
204 match self.state {
205 StreamState::Init => {
206 self.state = StreamState::SynSent;
207 Flags::from(Flag::Syn)
208 }
209 StreamState::SynReceived => {
210 self.state = StreamState::Established;
211 Flags::from(Flag::Ack)
212 }
213 _ => Flags::default(),
214 }
215 }
216
217 fn handle_frame(&mut self, frame: Frame) -> Result<(), Error> {
218 trace!(
219 "stream-handle({}) handle_frame ty={:?}, size={}",
220 self.id,
221 frame.ty(),
222 frame.size()
223 );
224 match frame.ty() {
225 Type::WindowUpdate => {
226 self.handle_window_update(&frame)?;
227 }
228 Type::Data => {
229 self.handle_data(frame)?;
230 }
231 _ => {
232 return Err(Error::InvalidMsgType);
233 }
234 }
235 Ok(())
236 }
237
238 fn handle_window_update(&mut self, frame: &Frame) -> Result<(), Error> {
239 self.process_flags(frame.flags())?;
240 self.send_window = self
241 .send_window
242 .checked_add(frame.length())
243 .ok_or(Error::InvalidMsgType)?;
244 if let Some(waker) = self.writeable_wake.take() {
246 waker.wake()
247 }
248 Ok(())
249 }
250
251 fn handle_data(&mut self, frame: Frame) -> Result<(), Error> {
252 self.process_flags(frame.flags())?;
253 let length = frame.length();
254 if length > self.recv_window {
255 return Err(Error::RecvWindowExceeded);
256 }
257
258 let (_, body) = frame.into_parts();
259 if let Some(data) = body {
260 if length > 0 {
263 self.read_buf.push(data);
264 }
265 }
266 self.recv_window -= length;
267 Ok(())
268 }
269
270 fn recv_frames(&mut self, cx: &mut Context) -> Result<bool, Error> {
271 trace!("stream-handle({}) recv_frames", self.id);
272 let mut has_new_frame = false;
273 loop {
274 match self.state {
275 StreamState::RemoteClosing => {
276 return Err(Error::SubStreamRemoteClosing);
277 }
278 StreamState::Reset | StreamState::Closed => {
279 return Err(Error::SessionShutdown);
280 }
281 _ => {}
282 }
283
284 if self.frame_receiver.is_terminated() {
285 self.state = StreamState::RemoteClosing;
286 return Err(Error::SubStreamRemoteClosing);
287 }
288
289 match Pin::new(&mut self.frame_receiver).as_mut().poll_next(cx) {
290 Poll::Ready(Some(frame)) => {
291 self.handle_frame(frame)?;
292 has_new_frame = true;
293 }
294 Poll::Ready(None) => {
295 self.state = StreamState::RemoteClosing;
296 return Err(Error::SubStreamRemoteClosing);
297 }
298 Poll::Pending => break,
299 }
300 }
301 Ok(has_new_frame)
302 }
303
304 fn recv_frames_wake(&mut self, cx: &mut Context) -> Result<(), Error> {
305 let buf_len = self.read_buf.len();
306 let state = self.state;
307 match self.recv_frames(cx) {
308 Ok(should_wake_read) => {
309 if (self.state == StreamState::RemoteClosing && state != StreamState::RemoteClosing)
312 || (should_wake_read && buf_len != self.read_buf.len())
313 {
314 if let Some(waker) = self.readable_wake.take() {
315 waker.wake();
316 }
317 }
318
319 Ok(())
320 }
321 Err(e) => {
322 if self.state == StreamState::RemoteClosing && state != StreamState::RemoteClosing {
324 if let Some(waker) = self.readable_wake.take() {
325 waker.wake();
326 }
327 }
328
329 Err(e)
330 }
331 }
332 }
333
334 fn check_self_state(&mut self) -> io::Result<bool> {
336 if self.read_buf.is_empty() {
338 match self.state {
339 StreamState::RemoteClosing | StreamState::Closed => {
340 debug!("closed(EOF)");
341 Ok(true)
343 }
344 StreamState::Reset => {
345 debug!("connection reset");
346 Err(io::ErrorKind::ConnectionReset.into())
347 }
348 _ => Ok(false),
349 }
350 } else {
351 Ok(false)
352 }
353 }
354
355 pub fn poll_peek(
358 &mut self,
359 cx: &mut Context<'_>,
360 buf: &mut ReadBuf<'_>,
361 ) -> Poll<io::Result<usize>> {
362 if self.check_self_state()? {
363 return Poll::Ready(Ok(0));
364 }
365
366 if let Err(Error::UnexpectedFlag | Error::RecvWindowExceeded | Error::InvalidMsgType) =
367 self.recv_frames(cx)
368 {
369 self.send_go_away();
371 return Poll::Ready(Err(io::ErrorKind::InvalidData.into()));
372 }
373
374 if self.check_self_state()? {
375 return Poll::Ready(Ok(0));
376 }
377
378 if self.read_buf.is_empty() {
379 self.readable_wake = Some(cx.waker().clone());
380 return Poll::Pending;
381 }
382
383 let mut total_read = 0;
384 for read_buf in self.read_buf.iter() {
385 let n = buf.remaining().min(read_buf.len());
386 if n == 0 {
387 break;
388 }
389 total_read += n;
390 let b = &read_buf[..n];
391 buf.put_slice(b);
392 }
393
394 trace!(
395 "stream-handle({}) poll_peek self.read_buf.len={}, buf.len={}, n={}",
396 self.id,
397 self.read_buf.len(),
398 buf.remaining(),
399 total_read,
400 );
401
402 Poll::Ready(Ok(total_read))
403 }
404
405 pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
408 let mut read_buf = ReadBuf::new(buf);
409 future::poll_fn(|cx| self.poll_peek(cx, &mut read_buf)).await
410 }
411}
412
413impl AsyncRead for StreamHandle {
414 fn poll_read(
415 mut self: Pin<&mut Self>,
416 cx: &mut Context,
417 buf: &mut ReadBuf<'_>,
418 ) -> Poll<io::Result<()>> {
419 if self.check_self_state()? {
420 return Poll::Ready(Ok(()));
421 }
422
423 if let Err(Error::UnexpectedFlag | Error::RecvWindowExceeded | Error::InvalidMsgType) =
424 self.recv_frames(cx)
425 {
426 self.send_go_away();
428 return Poll::Ready(Err(io::ErrorKind::InvalidData.into()));
429 }
430
431 if self.check_self_state()? {
432 return Poll::Ready(Ok(()));
433 }
434
435 if self.read_buf.is_empty() {
436 self.readable_wake = Some(cx.waker().clone());
437 return Poll::Pending;
438 }
439
440 let mut offset = None;
441 let mut total_read = 0;
442 for (index, read_buf) in self.read_buf.iter_mut().enumerate() {
443 let n = buf.remaining().min(read_buf.len());
444 if n == 0 {
445 break;
446 }
447 buf.put_slice(&read_buf.split_to(n));
448 if read_buf.is_empty() {
449 offset = Some(index);
450 }
451 total_read += n;
452 }
453 if let Some(offset) = offset {
454 self.read_buf.drain(..=offset);
455 if self.read_buf.capacity() > 24
457 && self.read_buf.capacity() / (self.read_buf.len() + 1) > 4
458 {
459 self.read_buf.shrink_to_fit();
460 }
461 }
462
463 trace!(
464 "stream-handle({}) poll_read self.read_buf.len={}, buf.len={}, n={}",
465 self.id,
466 self.read_buf.len(),
467 buf.remaining(),
468 total_read,
469 );
470
471 match self.state {
472 StreamState::RemoteClosing | StreamState::Closed | StreamState::Reset => {
473 debug!("this branch should be unreachable")
474 }
475 _ => {
476 if self.send_window_update().is_err() {
477 return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
478 }
479 }
480 }
481
482 Poll::Ready(Ok(()))
483 }
484}
485
486impl AsyncWrite for StreamHandle {
487 fn poll_write(
488 mut self: Pin<&mut Self>,
489 cx: &mut Context,
490 buf: &[u8],
491 ) -> Poll<io::Result<usize>> {
492 if let Err(Error::UnexpectedFlag | Error::RecvWindowExceeded | Error::InvalidMsgType) =
507 self.recv_frames_wake(cx)
508 {
509 self.send_go_away();
511 }
512
513 match self.state {
514 StreamState::Reset => return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
515 StreamState::LocalClosing | StreamState::Closed => {
516 return Poll::Ready(Err(io::Error::new(
517 io::ErrorKind::BrokenPipe,
518 "The local is closed and data cannot be written.",
519 )));
520 }
521 _ => (),
522 }
523
524 if self.send_window == 0 {
525 self.writeable_wake = Some(cx.waker().clone());
528 return Poll::Pending;
529 }
530 let n = ::std::cmp::min(self.send_window as usize, buf.len());
532 trace!(
533 "stream-hanlde({}) poll_write self.send_window={}, buf.len={}, n={}",
534 self.id,
535 self.send_window,
536 buf.len(),
537 n,
538 );
539 let data = &buf[0..n];
540 match self.send_data(data) {
541 Ok(_) => {
542 self.send_window -= n as u32;
543
544 Poll::Ready(Ok(n))
545 }
546 Err(Error::WouldBlock) => Poll::Pending,
547 _ => Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
548 }
549 }
550
551 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
552 Poll::Ready(Ok(()))
553 }
554
555 fn poll_shutdown(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
556 debug!("[{}] StreamHandle.shutdown()", self.id);
557 match self.close() {
558 Err(_) => Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
559 Ok(()) => Poll::Ready(Ok(())),
560 }
561 }
562}
563
564impl Drop for StreamHandle {
565 fn drop(&mut self) {
566 if !self.unbound_event_sender.is_closed() && self.state != StreamState::Closed {
567 match self.state {
568 StreamState::LocalClosing | StreamState::Reset => (),
570 StreamState::Established
572 | StreamState::Init
573 | StreamState::RemoteClosing
574 | StreamState::SynReceived
575 | StreamState::SynSent => {
576 let mut flags = self.get_flags();
577 flags.add(Flag::Rst);
578 let frame = Frame::new_window_update(flags, self.id, 0);
579 let rst_event = StreamEvent::Frame(frame);
580
581 let _ignore = self.unbound_event_sender.unbounded_send(rst_event);
583 }
584 StreamState::Closed => unreachable!(),
585 }
586
587 let event = StreamEvent::Closed(self.id);
588 let _ignore = self.unbound_event_sender.unbounded_send(event);
589 }
590 }
591}
592
593#[derive(Debug, Eq, PartialEq)]
595pub(crate) enum StreamEvent {
596 Frame(Frame),
597 Closed(StreamId),
598 GoAway,
600}
601
/// Lifecycle states of a [`StreamHandle`].
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum StreamState {
    /// Freshly created; the first outgoing frame carries SYN.
    Init,
    /// SYN has been sent; an incoming ACK establishes the stream.
    SynSent,
    /// Created in response to a remote SYN; the first outgoing frame carries ACK.
    SynReceived,
    /// Handshake finished in either direction.
    Established,
    /// The local side has closed (or sent go-away); writes are rejected.
    LocalClosing,
    /// The remote side has sent FIN or the frame channel has ended.
    RemoteClosing,
    /// Both sides are closed.
    Closed,
    /// An RST flag was received.
    Reset,
}
622
623#[cfg(test)]
624mod test {
625 use super::{StreamEvent, StreamHandle, StreamState};
626 use crate::{
627 config::INITIAL_STREAM_WINDOW,
628 frame::{Flag, Flags, Frame, Type},
629 session::rt,
630 };
631 use bytes::BytesMut;
632 use futures::{
633 SinkExt, StreamExt,
634 channel::mpsc::{channel, unbounded},
635 };
636 use std::io::ErrorKind;
637 use tokio::io::{AsyncReadExt, AsyncWriteExt};
638
/// Dropping a fresh stream must emit an RST frame followed by `Closed`.
#[test]
fn test_drop() {
    rt().block_on(async {
        let (_frame_tx, frame_rx) = channel(2);
        let (event_tx, mut event_rx) = unbounded();
        let stream = StreamHandle::new(
            0,
            event_tx,
            frame_rx,
            StreamState::Init,
            INITIAL_STREAM_WINDOW,
        );

        drop(stream);

        let frame = match event_rx.next().await.unwrap() {
            StreamEvent::Frame(frame) => frame,
            _ => panic!("must be a frame msg contain RST"),
        };
        assert!(frame.flags().contains(Flag::Rst));

        if !matches!(event_rx.next().await.unwrap(), StreamEvent::Closed(_)) {
            panic!("must be state closed");
        }
    });
}
666
/// A stream reset by the peer must report `ConnectionReset` on read and
/// emit only `Closed` (no RST frame) when dropped.
#[test]
fn test_drop_with_state_reset() {
    rt().block_on(async {
        let (mut frame_tx, frame_rx) = channel(2);
        let (event_tx, mut event_rx) = unbounded();
        let mut stream = StreamHandle::new(
            0,
            event_tx,
            frame_rx,
            StreamState::Init,
            INITIAL_STREAM_WINDOW,
        );

        // Deliver a SYN|RST window update to the stream.
        let mut syn_rst = Flags::from(Flag::Syn);
        syn_rst.add(Flag::Rst);
        frame_tx
            .send(Frame::new_window_update(syn_rst, 0, 0))
            .await
            .unwrap();

        let mut scratch = [0; 1024];
        assert_eq!(
            stream.read(&mut scratch).await.unwrap_err().kind(),
            ErrorKind::ConnectionReset
        );

        assert_eq!(stream.state, StreamState::Reset);

        drop(stream);

        if !matches!(event_rx.next().await.unwrap(), StreamEvent::Closed(_)) {
            panic!("must be state closed");
        }
    });
}
704
/// Shutting down sends a FIN window update; dropping afterwards emits only
/// `Closed`.
#[test]
fn test_drop_with_state_local_close() {
    rt().block_on(async {
        let (_frame_tx, frame_rx) = channel(2);
        let (event_tx, mut event_rx) = unbounded();
        let mut stream = StreamHandle::new(
            0,
            event_tx,
            frame_rx,
            StreamState::Init,
            INITIAL_STREAM_WINDOW,
        );

        let _ = stream.shutdown().await;

        let fin = match event_rx.next().await.unwrap() {
            StreamEvent::Frame(frame) => frame,
            _ => panic!("must be fin window update"),
        };
        assert!(fin.flags().contains(Flag::Fin));
        assert_eq!(fin.ty(), Type::WindowUpdate);

        drop(stream);

        if !matches!(event_rx.next().await.unwrap(), StreamEvent::Closed(_)) {
            panic!("must be state closed");
        }
    });
}
738
/// A data frame longer than the receive window must fail the read with
/// `InvalidData` and trigger a go-away event.
#[test]
fn test_data_large_than_recv_window() {
    rt().block_on(async {
        let (mut frame_tx, frame_rx) = channel(2);
        let (event_tx, mut event_rx) = unbounded();
        let mut stream = StreamHandle::new(
            0,
            event_tx,
            frame_rx,
            StreamState::Init,
            INITIAL_STREAM_WINDOW,
        );

        // Shrink the window below the payload size used next.
        stream.recv_window = 2;

        let oversized = Frame::new_data(Flags::from(Flag::Syn), 0, BytesMut::from("1234"));
        frame_tx.send(oversized).await.unwrap();

        let mut scratch = [0; 1024];
        assert_eq!(
            stream.read(&mut scratch).await.unwrap_err().kind(),
            ErrorKind::InvalidData
        );

        if !matches!(event_rx.next().await.unwrap(), StreamEvent::GoAway) {
            panic!("must be go away");
        }
    });
}
773
/// On a fresh stream a window update followed by a write must produce a
/// window-update frame and then a data frame, in that order.
#[test]
fn test_open_stream_with_data() {
    rt().block_on(async {
        let (_frame_tx, frame_rx) = channel(2);
        let (event_tx, mut event_rx) = unbounded();
        let mut stream = StreamHandle::new(
            0,
            event_tx,
            frame_rx,
            StreamState::Init,
            INITIAL_STREAM_WINDOW,
        );

        let payload = [0; 8];

        stream.send_window_update().unwrap();
        stream.write_all(&payload).await.unwrap();

        let first = match event_rx.next().await.unwrap() {
            StreamEvent::Frame(frame) => frame,
            _ => panic!("must be a window update msg"),
        };
        assert!(first.ty() == Type::WindowUpdate);

        let second = match event_rx.next().await.unwrap() {
            StreamEvent::Frame(frame) => frame,
            _ => panic!("must be a frame msg"),
        };
        assert!(second.ty() == Type::Data);
    });
}
814
/// After a local shutdown the stream must still be able to read data sent
/// by the peer, and must stay in `LocalClosing`.
#[test]
fn test_read_with_half_close() {
    rt().block_on(async {
        let (mut frame_tx, frame_rx) = channel(2);
        let (event_tx, _event_rx) = unbounded();
        let mut stream = StreamHandle::new(
            0,
            event_tx,
            frame_rx,
            StreamState::Init,
            INITIAL_STREAM_WINDOW,
        );

        stream.shutdown().await.unwrap();

        assert_eq!(stream.state, StreamState::LocalClosing);

        let incoming = Frame::new_data(Flags::from(Flag::Syn), 0, BytesMut::from("1234"));
        frame_tx.send(incoming).await.unwrap();

        let mut scratch = [0; 1024];
        assert_eq!(stream.read(&mut scratch).await.unwrap(), 4);
        assert_eq!(&scratch[..4], b"1234");

        assert_eq!(stream.state, StreamState::LocalClosing);
    });
}
844
/// After the peer has sent FIN the stream must still accept writes and stay
/// in `RemoteClosing`.
#[test]
fn test_write_with_half_close() {
    rt().block_on(async {
        let (mut frame_tx, frame_rx) = channel(2);
        let (event_tx, mut event_rx) = unbounded();
        let mut stream = StreamHandle::new(
            0,
            event_tx,
            frame_rx,
            StreamState::Init,
            INITIAL_STREAM_WINDOW,
        );

        // The peer closes its side.
        let fin = Frame::new_window_update(Flags::from(Flag::Fin), 0, 0);
        frame_tx.send(fin).await.unwrap();

        let mut scratch = [0; 1024];
        assert_eq!(stream.read(&mut scratch).await.unwrap(), 0);
        assert_eq!(stream.state, StreamState::RemoteClosing);

        const TEXT: &[u8] = b"testtext";

        // Background task polling the event channel until the frame carrying
        // `TEXT` shows up; any other event is a failure.
        let watcher = async move {
            loop {
                match event_rx.try_next() {
                    // Nothing queued yet: retry after a short pause.
                    Err(_) => (),
                    Ok(Some(StreamEvent::Frame(frame)))
                        if frame.length() == TEXT.len() as u32 =>
                    {
                        break;
                    }
                    Ok(_) => panic!("must be frame with written text"),
                }
                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
            }
        };
        let jh = tokio::spawn(tokio::time::timeout(
            std::time::Duration::from_secs(4),
            watcher,
        ));

        stream.write_all(TEXT).await.unwrap();

        jh.await.unwrap().expect("not tiemout");

        assert_eq!(stream.state, StreamState::RemoteClosing);
    });
}
887
/// Reads that span several buffered chunks must return the bytes in order,
/// skip empty payloads and keep the chunk vector's length and capacity as
/// expected.
#[test]
fn test_frame_read_more_than_one() {
    rt().block_on(async {
        let (mut frame_tx, frame_rx) = channel(3);
        let (event_tx, _event_rx) = unbounded();
        let mut stream = StreamHandle::new(
            0,
            event_tx,
            frame_rx,
            StreamState::Init,
            INITIAL_STREAM_WINDOW,
        );

        // Builds a SYN-flagged data frame for stream 0 around `body`.
        let syn_data = |body: BytesMut| Frame::new_data(Flags::from(Flag::Syn), 0, body);

        for body in [
            BytesMut::from("1234"),
            BytesMut::default(),
            BytesMut::from("5678"),
        ] {
            frame_tx.send(syn_data(body)).await.unwrap();
        }
        let mut pair = [0; 2];

        assert_eq!(stream.read(&mut pair).await.unwrap(), 2);
        assert_eq!(&pair[..2], b"12");
        assert_eq!(stream.read_buf.len(), 2);
        assert_eq!(stream.read_buf.capacity(), 4);

        assert_eq!(stream.read(&mut pair).await.unwrap(), 2);
        assert_eq!(&pair[..2], b"34");
        assert_eq!(stream.read_buf.len(), 1);
        assert_eq!(stream.read_buf.capacity(), 4);

        for body in [BytesMut::default(), BytesMut::from("1234")] {
            frame_tx.send(syn_data(body)).await.unwrap();
        }
        let mut five = [0; 5];

        assert_eq!(stream.read(&mut five).await.unwrap(), 5);
        assert_eq!(&five[..5], b"56781");
        assert_eq!(stream.read_buf.len(), 1);
        assert_eq!(stream.read_buf.capacity(), 4);

        assert_eq!(stream.read(&mut pair).await.unwrap(), 2);
        assert_eq!(&pair[..2], b"23");
        assert_eq!(stream.read_buf.len(), 1);
        assert_eq!(stream.read_buf.capacity(), 4);
    });
}
943}