tokio_yamux/
stream.rs

1//! The substream, the main interface is AsyncRead/AsyncWrite
2
3use bytes::BytesMut;
4use futures::{
5    Stream,
6    channel::mpsc::{Receiver, UnboundedSender},
7    stream::FusedStream,
8    task::Waker,
9};
10
11use std::{
12    io,
13    pin::Pin,
14    task::{Context, Poll},
15};
16
17use futures::future;
18use log::{debug, trace};
19use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
20
21use crate::{
22    StreamId,
23    config::INITIAL_STREAM_WINDOW,
24    error::Error,
25    frame::{Flag, Flags, Frame, Type},
26};
27
/// The substream
///
/// One multiplexed stream. Data is read and written through the
/// `AsyncRead` / `AsyncWrite` implementations; events produced by the stream
/// are handed to the parent session through `unbound_event_sender`.
#[derive(Debug)]
pub struct StreamHandle {
    // Identifier of this stream
    id: StreamId,
    // Current position in the stream life cycle (see `StreamState`)
    state: StreamState,

    // Upper bound used when computing how much receive window to hand back
    max_recv_window: u32,
    // Bytes the remote may still send; reduced by every data frame received
    pub(crate) recv_window: u32,
    // Bytes we may still send; raised by window update frames we receive
    send_window: u32,
    // Received data chunks that have not been read yet, oldest first
    read_buf: Vec<BytesMut>,

    // Send stream event to parent session
    unbound_event_sender: UnboundedSender<StreamEvent>,

    // Receive frame of current stream from parent session
    // (if the sender is closed, the session is closed and the stream should close too)
    frame_receiver: Receiver<Frame>,

    // Waker of a writer parked because the send window is exhausted;
    // woken when a window update frame is handled
    writeable_wake: Option<Waker>,

    // Waker of a reader parked because no data is buffered;
    // woken when the write path receives frames on the reader's behalf
    readable_wake: Option<Waker>,
}
52
53impl StreamHandle {
54    // Create a StreamHandle from session
55    pub(crate) fn new(
56        id: StreamId,
57        unbound_event_sender: UnboundedSender<StreamEvent>,
58        frame_receiver: Receiver<Frame>,
59        state: StreamState,
60        max_window_size: u32,
61    ) -> StreamHandle {
62        assert!(state == StreamState::Init || state == StreamState::SynReceived);
63        StreamHandle {
64            id,
65            state,
66            max_recv_window: max_window_size,
67            recv_window: INITIAL_STREAM_WINDOW,
68            send_window: INITIAL_STREAM_WINDOW,
69            read_buf: Vec::new(),
70            unbound_event_sender,
71            frame_receiver,
72            writeable_wake: None,
73            readable_wake: None,
74        }
75    }
76
    /// Get the stream id
    ///
    /// Returns a copy of the id stored in this handle.
    pub fn id(&self) -> StreamId {
        self.id
    }
    /// Get the stream state
    ///
    /// Returns the current [`StreamState`] by value; it changes as frames are
    /// sent and received.
    pub fn state(&self) -> StreamState {
        self.state
    }
    /// Get the receive window size
    ///
    /// This is the number of bytes the remote is still allowed to send. It is
    /// reduced by every data frame received and raised when a window update
    /// is sent.
    pub fn recv_window(&self) -> u32 {
        self.recv_window
    }
    /// Get the send window size
    ///
    /// This is the number of bytes that can still be written before the writer
    /// has to wait for a window update from the remote.
    pub fn send_window(&self) -> u32 {
        self.send_window
    }
93
94    fn close(&mut self) -> Result<(), Error> {
95        match self.state {
96            StreamState::SynSent
97            | StreamState::SynReceived
98            | StreamState::Established
99            | StreamState::Init => {
100                self.state = StreamState::LocalClosing;
101                self.send_close()?;
102            }
103            StreamState::RemoteClosing => {
104                self.state = StreamState::Closed;
105                self.send_close()?;
106                let event = StreamEvent::Closed(self.id);
107                self.unbound_send_event(event)?;
108            }
109            StreamState::Reset | StreamState::Closed => {
110                self.state = StreamState::Closed;
111                let event = StreamEvent::Closed(self.id);
112                self.unbound_send_event(event)?;
113            }
114            StreamState::LocalClosing => {
115                self.state = StreamState::Closed;
116                let event = StreamEvent::Closed(self.id);
117                self.unbound_send_event(event)?;
118            }
119        }
120        Ok(())
121    }
122
123    fn send_go_away(&mut self) {
124        self.state = StreamState::LocalClosing;
125        let _ignore = self
126            .unbound_event_sender
127            .unbounded_send(StreamEvent::GoAway);
128    }
129
130    fn unbound_send_event(&mut self, event: StreamEvent) -> Result<(), Error> {
131        self.unbound_event_sender
132            .unbounded_send(event)
133            .map_err(|_| Error::SessionShutdown)
134    }
135
136    #[inline]
137    fn unbound_send_frame(&mut self, frame: Frame) -> Result<(), Error> {
138        trace!(
139            "stream-handle({}) send_frame ty={:?}, size={}",
140            self.id,
141            frame.ty(),
142            frame.size()
143        );
144        let event = StreamEvent::Frame(frame);
145        self.unbound_send_event(event)
146    }
147
148    // Send a window update
149    pub(crate) fn send_window_update(&mut self) -> Result<(), Error> {
150        let buf_len = self.read_buf.iter().map(|b| b.len()).sum::<usize>() as u32;
151        let delta = self.max_recv_window - buf_len - self.recv_window;
152
153        // Check if we can omit the update
154        let flags = self.get_flags();
155        if delta < (self.max_recv_window / 2) && flags.value() == 0 {
156            return Ok(());
157        }
158        // Update our window
159        self.recv_window += delta;
160        let frame = Frame::new_window_update(flags, self.id, delta);
161        self.unbound_event_sender
162            .unbounded_send(StreamEvent::Frame(frame))
163            .map_err(|_| Error::SessionShutdown)
164    }
165
166    fn send_data(&mut self, data: &[u8]) -> Result<(), Error> {
167        let flags = self.get_flags();
168        let frame = Frame::new_data(flags, self.id, BytesMut::from(data));
169        self.unbound_send_frame(frame)
170    }
171
172    fn send_close(&mut self) -> Result<(), Error> {
173        let mut flags = self.get_flags();
174        flags.add(Flag::Fin);
175        let frame = Frame::new_window_update(flags, self.id, 0);
176        self.unbound_send_frame(frame)
177    }
178
179    fn process_flags(&mut self, flags: Flags) -> Result<(), Error> {
180        if flags.contains(Flag::Ack) && self.state == StreamState::SynSent {
181            self.state = StreamState::Established;
182        }
183        if flags.contains(Flag::Fin) {
184            match self.state {
185                StreamState::Init
186                | StreamState::SynSent
187                | StreamState::SynReceived
188                | StreamState::Established => {
189                    self.state = StreamState::RemoteClosing;
190                }
191                StreamState::LocalClosing => {
192                    self.state = StreamState::Closed;
193                }
194                _ => return Err(Error::UnexpectedFlag),
195            }
196        }
197        if flags.contains(Flag::Rst) {
198            self.state = StreamState::Reset;
199        }
200        Ok(())
201    }
202
203    fn get_flags(&mut self) -> Flags {
204        match self.state {
205            StreamState::Init => {
206                self.state = StreamState::SynSent;
207                Flags::from(Flag::Syn)
208            }
209            StreamState::SynReceived => {
210                self.state = StreamState::Established;
211                Flags::from(Flag::Ack)
212            }
213            _ => Flags::default(),
214        }
215    }
216
217    fn handle_frame(&mut self, frame: Frame) -> Result<(), Error> {
218        trace!(
219            "stream-handle({}) handle_frame ty={:?}, size={}",
220            self.id,
221            frame.ty(),
222            frame.size()
223        );
224        match frame.ty() {
225            Type::WindowUpdate => {
226                self.handle_window_update(&frame)?;
227            }
228            Type::Data => {
229                self.handle_data(frame)?;
230            }
231            _ => {
232                return Err(Error::InvalidMsgType);
233            }
234        }
235        Ok(())
236    }
237
238    fn handle_window_update(&mut self, frame: &Frame) -> Result<(), Error> {
239        self.process_flags(frame.flags())?;
240        self.send_window = self
241            .send_window
242            .checked_add(frame.length())
243            .ok_or(Error::InvalidMsgType)?;
244        // wake writer continue
245        if let Some(waker) = self.writeable_wake.take() {
246            waker.wake()
247        }
248        Ok(())
249    }
250
251    fn handle_data(&mut self, frame: Frame) -> Result<(), Error> {
252        self.process_flags(frame.flags())?;
253        let length = frame.length();
254        if length > self.recv_window {
255            return Err(Error::RecvWindowExceeded);
256        }
257
258        let (_, body) = frame.into_parts();
259        if let Some(data) = body {
260            // yamux allows empty data frame
261            // but here we just drop it
262            if length > 0 {
263                self.read_buf.push(data);
264            }
265        }
266        self.recv_window -= length;
267        Ok(())
268    }
269
270    fn recv_frames(&mut self, cx: &mut Context) -> Result<bool, Error> {
271        trace!("stream-handle({}) recv_frames", self.id);
272        let mut has_new_frame = false;
273        loop {
274            match self.state {
275                StreamState::RemoteClosing => {
276                    return Err(Error::SubStreamRemoteClosing);
277                }
278                StreamState::Reset | StreamState::Closed => {
279                    return Err(Error::SessionShutdown);
280                }
281                _ => {}
282            }
283
284            if self.frame_receiver.is_terminated() {
285                self.state = StreamState::RemoteClosing;
286                return Err(Error::SubStreamRemoteClosing);
287            }
288
289            match Pin::new(&mut self.frame_receiver).as_mut().poll_next(cx) {
290                Poll::Ready(Some(frame)) => {
291                    self.handle_frame(frame)?;
292                    has_new_frame = true;
293                }
294                Poll::Ready(None) => {
295                    self.state = StreamState::RemoteClosing;
296                    return Err(Error::SubStreamRemoteClosing);
297                }
298                Poll::Pending => break,
299            }
300        }
301        Ok(has_new_frame)
302    }
303
304    fn recv_frames_wake(&mut self, cx: &mut Context) -> Result<(), Error> {
305        let buf_len = self.read_buf.len();
306        let state = self.state;
307        match self.recv_frames(cx) {
308            Ok(should_wake_read) => {
309                // if state change to RemoteClosing, wake read
310                // if read buf len change, wake read
311                if (self.state == StreamState::RemoteClosing && state != StreamState::RemoteClosing)
312                    || (should_wake_read && buf_len != self.read_buf.len())
313                {
314                    if let Some(waker) = self.readable_wake.take() {
315                        waker.wake();
316                    }
317                }
318
319                Ok(())
320            }
321            Err(e) => {
322                // if state change to RemoteClosing, wake read
323                if self.state == StreamState::RemoteClosing && state != StreamState::RemoteClosing {
324                    if let Some(waker) = self.readable_wake.take() {
325                        waker.wake();
326                    }
327                }
328
329                Err(e)
330            }
331        }
332    }
333
334    // Returns Ok(true) only if eof is reached.
335    fn check_self_state(&mut self) -> io::Result<bool> {
336        // if read buf is empty and state is close, return close error
337        if self.read_buf.is_empty() {
338            match self.state {
339                StreamState::RemoteClosing | StreamState::Closed => {
340                    debug!("closed(EOF)");
341                    // an empty read indicates that EOF is reached.
342                    Ok(true)
343                }
344                StreamState::Reset => {
345                    debug!("connection reset");
346                    Err(io::ErrorKind::ConnectionReset.into())
347                }
348                _ => Ok(false),
349            }
350        } else {
351            Ok(false)
352        }
353    }
354
355    /// Attempts to receive data on the socket, without removing that data from the queue,
356    /// registering the current task for wakeup if data is not yet available.
357    pub fn poll_peek(
358        &mut self,
359        cx: &mut Context<'_>,
360        buf: &mut ReadBuf<'_>,
361    ) -> Poll<io::Result<usize>> {
362        if self.check_self_state()? {
363            return Poll::Ready(Ok(0));
364        }
365
366        if let Err(Error::UnexpectedFlag | Error::RecvWindowExceeded | Error::InvalidMsgType) =
367            self.recv_frames(cx)
368        {
369            // read flag error or read data error
370            self.send_go_away();
371            return Poll::Ready(Err(io::ErrorKind::InvalidData.into()));
372        }
373
374        if self.check_self_state()? {
375            return Poll::Ready(Ok(0));
376        }
377
378        if self.read_buf.is_empty() {
379            self.readable_wake = Some(cx.waker().clone());
380            return Poll::Pending;
381        }
382
383        let mut total_read = 0;
384        for read_buf in self.read_buf.iter() {
385            let n = buf.remaining().min(read_buf.len());
386            if n == 0 {
387                break;
388            }
389            total_read += n;
390            let b = &read_buf[..n];
391            buf.put_slice(b);
392        }
393
394        trace!(
395            "stream-handle({}) poll_peek self.read_buf.len={}, buf.len={}, n={}",
396            self.id,
397            self.read_buf.len(),
398            buf.remaining(),
399            total_read,
400        );
401
402        Poll::Ready(Ok(total_read))
403    }
404
405    /// Receives data on the socket from the remote address to which it is connected,
406    /// without removing that data from the queue. On success, returns the number of bytes peeked.
407    pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
408        let mut read_buf = ReadBuf::new(buf);
409        future::poll_fn(|cx| self.poll_peek(cx, &mut read_buf)).await
410    }
411}
412
413impl AsyncRead for StreamHandle {
414    fn poll_read(
415        mut self: Pin<&mut Self>,
416        cx: &mut Context,
417        buf: &mut ReadBuf<'_>,
418    ) -> Poll<io::Result<()>> {
419        if self.check_self_state()? {
420            return Poll::Ready(Ok(()));
421        }
422
423        if let Err(Error::UnexpectedFlag | Error::RecvWindowExceeded | Error::InvalidMsgType) =
424            self.recv_frames(cx)
425        {
426            // read flag error or read data error
427            self.send_go_away();
428            return Poll::Ready(Err(io::ErrorKind::InvalidData.into()));
429        }
430
431        if self.check_self_state()? {
432            return Poll::Ready(Ok(()));
433        }
434
435        if self.read_buf.is_empty() {
436            self.readable_wake = Some(cx.waker().clone());
437            return Poll::Pending;
438        }
439
440        let mut offset = None;
441        let mut total_read = 0;
442        for (index, read_buf) in self.read_buf.iter_mut().enumerate() {
443            let n = buf.remaining().min(read_buf.len());
444            if n == 0 {
445                break;
446            }
447            buf.put_slice(&read_buf.split_to(n));
448            if read_buf.is_empty() {
449                offset = Some(index);
450            }
451            total_read += n;
452        }
453        if let Some(offset) = offset {
454            self.read_buf.drain(..=offset);
455            // drain does not shrink the capacity, if the capacity is too large, shrink it
456            if self.read_buf.capacity() > 24
457                && self.read_buf.capacity() / (self.read_buf.len() + 1) > 4
458            {
459                self.read_buf.shrink_to_fit();
460            }
461        }
462
463        trace!(
464            "stream-handle({}) poll_read self.read_buf.len={}, buf.len={}, n={}",
465            self.id,
466            self.read_buf.len(),
467            buf.remaining(),
468            total_read,
469        );
470
471        match self.state {
472            StreamState::RemoteClosing | StreamState::Closed | StreamState::Reset => {
473                debug!("this branch should be unreachable")
474            }
475            _ => {
476                if self.send_window_update().is_err() {
477                    return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
478                }
479            }
480        }
481
482        Poll::Ready(Ok(()))
483    }
484}
485
486impl AsyncWrite for StreamHandle {
487    fn poll_write(
488        mut self: Pin<&mut Self>,
489        cx: &mut Context,
490        buf: &[u8],
491    ) -> Poll<io::Result<usize>> {
492        // https://github.com/driftluo/tentacle/issues/33
493        // read frame from session is necessary.
494        // The window update message of Yamux must be updated normally.
495        // If the user only writes but does not read, the entire stream will be stuck.
496        // To avoid this, read operations are required when there is a frame in the session.
497        //
498        // Another solution to avoid this problem is to let the session and stream share the state.
499        // In the rust implementation, at least the following three states are required:
500        // 1. writeable_wake
501        // 2. send_window
502        // 3. state
503        //
504        // When the session receives a window update frame, it can update the state of the stream.
505        // In the implementation here, we try not to share state between the session and the stream.
506        if let Err(Error::UnexpectedFlag | Error::RecvWindowExceeded | Error::InvalidMsgType) =
507            self.recv_frames_wake(cx)
508        {
509            // read flag error or read data error
510            self.send_go_away();
511        }
512
513        match self.state {
514            StreamState::Reset => return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
515            StreamState::LocalClosing | StreamState::Closed => {
516                return Poll::Ready(Err(io::Error::new(
517                    io::ErrorKind::BrokenPipe,
518                    "The local is closed and data cannot be written.",
519                )));
520            }
521            _ => (),
522        }
523
524        if self.send_window == 0 {
525            // register writer context waker
526            // when write buf become empty, it can wake the upper layer to write the message again
527            self.writeable_wake = Some(cx.waker().clone());
528            return Poll::Pending;
529        }
530        // Allow n = 0, send an empty frame to remote
531        let n = ::std::cmp::min(self.send_window as usize, buf.len());
532        trace!(
533            "stream-hanlde({}) poll_write self.send_window={}, buf.len={}, n={}",
534            self.id,
535            self.send_window,
536            buf.len(),
537            n,
538        );
539        let data = &buf[0..n];
540        match self.send_data(data) {
541            Ok(_) => {
542                self.send_window -= n as u32;
543
544                Poll::Ready(Ok(n))
545            }
546            Err(Error::WouldBlock) => Poll::Pending,
547            _ => Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
548        }
549    }
550
551    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
552        Poll::Ready(Ok(()))
553    }
554
555    fn poll_shutdown(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
556        debug!("[{}] StreamHandle.shutdown()", self.id);
557        match self.close() {
558            Err(_) => Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
559            Ok(()) => Poll::Ready(Ok(())),
560        }
561    }
562}
563
564impl Drop for StreamHandle {
565    fn drop(&mut self) {
566        if !self.unbound_event_sender.is_closed() && self.state != StreamState::Closed {
567            match self.state {
568                // LocalClosing means that local have sent Fin to the remote and waiting for a response.
569                StreamState::LocalClosing | StreamState::Reset => (),
570                // if not, we should send Rst first
571                StreamState::Established
572                | StreamState::Init
573                | StreamState::RemoteClosing
574                | StreamState::SynReceived
575                | StreamState::SynSent => {
576                    let mut flags = self.get_flags();
577                    flags.add(Flag::Rst);
578                    let frame = Frame::new_window_update(flags, self.id, 0);
579                    let rst_event = StreamEvent::Frame(frame);
580
581                    // Always successful unless the session is dropped
582                    let _ignore = self.unbound_event_sender.unbounded_send(rst_event);
583                }
584                StreamState::Closed => unreachable!(),
585            }
586
587            let event = StreamEvent::Closed(self.id);
588            let _ignore = self.unbound_event_sender.unbounded_send(event);
589        }
590    }
591}
592
// Stream event
//
// Messages a `StreamHandle` sends to its parent session.
#[derive(Debug, Eq, PartialEq)]
pub(crate) enum StreamEvent {
    // A frame produced by the stream (data or window update)
    Frame(Frame),
    // The stream with this id was shut down or dropped
    Closed(StreamId),
    // Only use on protocol error
    GoAway,
}
601
/// The stream state
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum StreamState {
    /// Just created, nothing has been sent yet
    Init,
    /// We sent a Syn message
    SynSent,
    /// We received a Syn message
    SynReceived,
    /// Stream established
    Established,
    /// We closed the stream
    LocalClosing,
    /// Remote closed the stream
    RemoteClosing,
    /// Both side of the stream closed
    Closed,
    /// Stream rejected by remote (a frame carrying the Rst flag was received)
    Reset,
}
622
623#[cfg(test)]
624mod test {
625    use super::{StreamEvent, StreamHandle, StreamState};
626    use crate::{
627        config::INITIAL_STREAM_WINDOW,
628        frame::{Flag, Flags, Frame, Type},
629        session::rt,
630    };
631    use bytes::BytesMut;
632    use futures::{
633        SinkExt, StreamExt,
634        channel::mpsc::{channel, unbounded},
635    };
636    use std::io::ErrorKind;
637    use tokio::io::{AsyncReadExt, AsyncWriteExt};
638
    /// Dropping a stream that is still in `Init` must first emit a frame
    /// carrying the Rst flag and then a `Closed` event.
    #[test]
    fn test_drop() {
        let rt = rt();
        rt.block_on(async {
            let (_frame_sender, frame_receiver) = channel(2);
            let (unbound_sender, mut unbound_receiver) = unbounded();
            let stream = StreamHandle::new(
                0,
                unbound_sender,
                frame_receiver,
                StreamState::Init,
                INITIAL_STREAM_WINDOW,
            );

            drop(stream);
            // first event: the Rst frame
            let event = unbound_receiver.next().await.unwrap();
            match event {
                StreamEvent::Frame(frame) => assert!(frame.flags().contains(Flag::Rst)),
                _ => panic!("must be a frame msg contain RST"),
            }
            // second event: the close notification
            let event = unbound_receiver.next().await.unwrap();
            match event {
                StreamEvent::Closed(_) => (),
                _ => panic!("must be state closed"),
            }
        });
    }
666
    /// A stream that was reset by the remote must not send an Rst frame when
    /// dropped; the first event the session sees is `Closed`.
    #[test]
    fn test_drop_with_state_reset() {
        let rt = rt();
        rt.block_on(async {
            let (mut frame_sender, frame_receiver) = channel(2);
            let (unbound_sender, mut unbound_receiver) = unbounded();
            let mut stream = StreamHandle::new(
                0,
                unbound_sender,
                frame_receiver,
                StreamState::Init,
                INITIAL_STREAM_WINDOW,
            );

            // window update frame carrying Syn + Rst
            let mut flags = Flags::from(Flag::Syn);
            flags.add(Flag::Rst);
            let frame = Frame::new_window_update(flags, 0, 0);
            frame_sender.send(frame).await.unwrap();
            let mut b = [0; 1024];

            // try poll stream handle, then it will recv RST frame and set self state to reset
            assert_eq!(
                stream.read(&mut b).await.unwrap_err().kind(),
                ErrorKind::ConnectionReset
            );

            assert_eq!(stream.state, StreamState::Reset);

            drop(stream);

            let event = unbound_receiver.next().await.unwrap();
            match event {
                StreamEvent::Closed(_) => (),
                _ => panic!("must be state closed"),
            }
        });
    }
704
    /// After a local shutdown (a Fin window update was sent) dropping the
    /// stream only emits the `Closed` event, no Rst frame.
    #[test]
    fn test_drop_with_state_local_close() {
        let rt = rt();
        rt.block_on(async {
            let (_frame_sender, frame_receiver) = channel(2);
            let (unbound_sender, mut unbound_receiver) = unbounded();
            let mut stream = StreamHandle::new(
                0,
                unbound_sender,
                frame_receiver,
                StreamState::Init,
                INITIAL_STREAM_WINDOW,
            );

            let _ignore = stream.shutdown().await;

            // shutdown is announced with a window update carrying Fin
            let event = unbound_receiver.next().await.unwrap();
            match event {
                StreamEvent::Frame(frame) => {
                    assert!(frame.flags().contains(Flag::Fin));
                    assert_eq!(frame.ty(), Type::WindowUpdate);
                }
                _ => panic!("must be fin window update"),
            }

            drop(stream);
            let event = unbound_receiver.next().await.unwrap();
            match event {
                StreamEvent::Closed(_) => (),
                _ => panic!("must be state closed"),
            }
        });
    }
738
    /// A data frame larger than the remaining receive window is a protocol
    /// violation: the read fails with `InvalidData` and a `GoAway` event is
    /// sent to the session.
    #[test]
    fn test_data_large_than_recv_window() {
        let rt = rt();
        rt.block_on(async {
            let (mut frame_sender, frame_receiver) = channel(2);
            let (unbound_sender, mut unbound_receiver) = unbounded();
            let mut stream = StreamHandle::new(
                0,
                unbound_sender,
                frame_receiver,
                StreamState::Init,
                INITIAL_STREAM_WINDOW,
            );

            // shrink the window below the size of the frame sent next (4 bytes)
            stream.recv_window = 2;

            let flags = Flags::from(Flag::Syn);
            let frame = Frame::new_data(flags, 0, BytesMut::from("1234"));
            frame_sender.send(frame).await.unwrap();
            let mut b = [0; 1024];

            // try poll stream handle, then it will recv data frame and return Err
            assert_eq!(
                stream.read(&mut b).await.unwrap_err().kind(),
                ErrorKind::InvalidData
            );

            let event = unbound_receiver.next().await.unwrap();
            match event {
                StreamEvent::GoAway => (),
                _ => panic!("must be go away"),
            }
        });
    }
773
    // https://github.com/nervosnetwork/tentacle/issues/297
    //
    // As the issue describes, the real cause of the problem was that two
    // separate channels cannot guarantee a consistent sending order: the
    // message that opens the stream and the message that carries data could
    // be reordered, so the remote end received a frame for a stream it did
    // not know and silently discarded it, and the protocol could not be
    // opened.
    //
    // This test checks that the window update sent first and the data written
    // afterwards reach the session in that order.
    #[test]
    fn test_open_stream_with_data() {
        let rt = rt();
        rt.block_on(async {
            let (_frame_sender, frame_receiver) = channel(2);
            let (unbound_sender, mut unbound_receiver) = unbounded();
            let mut stream = StreamHandle::new(
                0,
                unbound_sender,
                frame_receiver,
                StreamState::Init,
                INITIAL_STREAM_WINDOW,
            );

            let data = [0; 8];

            stream.send_window_update().unwrap();
            stream.write_all(&data).await.unwrap();

            let event = unbound_receiver.next().await.unwrap();
            match event {
                StreamEvent::Frame(frame) => assert!(frame.ty() == Type::WindowUpdate),
                _ => panic!("must be a window update msg"),
            }

            let event = unbound_receiver.next().await.unwrap();
            match event {
                StreamEvent::Frame(frame) => assert!(frame.ty() == Type::Data),
                _ => panic!("must be a frame msg"),
            }
        });
    }
814
    /// Half close, local side: after a local shutdown the stream can still
    /// read data sent by the remote and stays in `LocalClosing`.
    #[test]
    fn test_read_with_half_close() {
        let rt = rt();
        rt.block_on(async {
            let (mut frame_sender, frame_receiver) = channel(2);
            let (unbound_sender, _unbound_receiver) = unbounded();
            let mut stream = StreamHandle::new(
                0,
                unbound_sender,
                frame_receiver,
                StreamState::Init,
                INITIAL_STREAM_WINDOW,
            );

            stream.shutdown().await.unwrap();

            assert_eq!(stream.state, StreamState::LocalClosing);

            let flags = Flags::from(Flag::Syn);
            let frame = Frame::new_data(flags, 0, BytesMut::from("1234"));
            frame_sender.send(frame).await.unwrap();
            let mut b = [0; 1024];

            assert_eq!(stream.read(&mut b).await.unwrap(), 4);
            assert_eq!(&b[..4], b"1234");

            // reading does not change the closing state
            assert_eq!(stream.state, StreamState::LocalClosing);
        });
    }
844
    /// Half close, remote side: after the remote sent Fin a read reports EOF,
    /// but writing is still possible and the stream stays in `RemoteClosing`.
    #[test]
    fn test_write_with_half_close() {
        let rt = rt();
        rt.block_on(async {
            let (mut frame_sender, frame_receiver) = channel(2);
            let (unbound_sender, mut unbound_receiver) = unbounded();
            let mut stream = StreamHandle::new(
                0,
                unbound_sender,
                frame_receiver,
                StreamState::Init,
                INITIAL_STREAM_WINDOW,
            );

            let flags = Flags::from(Flag::Fin);
            let frame = Frame::new_window_update(flags, 0, 0);
            frame_sender.send(frame).await.unwrap();
            let mut b = [0; 1024];

            // EOF: the remote closed and nothing is buffered
            assert_eq!(stream.read(&mut b).await.unwrap(), 0);
            assert_eq!(stream.state, StreamState::RemoteClosing);

            const TEXT: &[u8] = b"testtext";

            // Background task: poll the session side until a frame with the
            // length of TEXT shows up, giving up after 4 seconds.
            let jh = tokio::spawn(tokio::time::timeout(std::time::Duration::from_secs(4), async move {
                loop {
                    match unbound_receiver.try_next() {
                        Ok(Some(ref event)) if matches!(event, StreamEvent::Frame(frame) if frame.length() == TEXT.len() as u32) => break,
                        // nothing queued yet, retry after a short sleep
                        Err(_) => (),
                        _ => panic!("must be frame with written text"),
                    }
                    tokio::time::sleep(std::time::Duration::from_millis(20)).await;
                }
            }));

            stream.write_all(TEXT).await.unwrap();

            jh.await.unwrap().expect("not tiemout");

            assert_eq!(stream.state, StreamState::RemoteClosing);
        });
    }
887
    /// Reads that span several buffered frames: empty data frames are
    /// dropped, partially read chunks stay at the front of `read_buf`, and
    /// fully consumed chunks are removed.
    #[test]
    fn test_frame_read_more_than_one() {
        let rt = rt();
        rt.block_on(async {
            let (mut frame_sender, frame_receiver) = channel(3);
            let (unbound_sender, _unbound_receiver) = unbounded();
            let mut stream = StreamHandle::new(
                0,
                unbound_sender,
                frame_receiver,
                StreamState::Init,
                INITIAL_STREAM_WINDOW,
            );

            // "1234", an empty frame, "5678"
            let flags = Flags::from(Flag::Syn);
            let frame = Frame::new_data(flags, 0, BytesMut::from("1234"));
            frame_sender.send(frame).await.unwrap();
            let flags = Flags::from(Flag::Syn);
            let frame = Frame::new_data(flags, 0, BytesMut::default());
            frame_sender.send(frame).await.unwrap();
            let flags = Flags::from(Flag::Syn);
            let frame = Frame::new_data(flags, 0, BytesMut::from("5678"));
            frame_sender.send(frame).await.unwrap();
            let mut b = [0; 2];

            // the empty frame is not buffered, so two chunks remain
            assert_eq!(stream.read(&mut b).await.unwrap(), 2);
            assert_eq!(&b[..2], b"12");
            assert_eq!(stream.read_buf.len(), 2);
            assert_eq!(stream.read_buf.capacity(), 4);

            assert_eq!(stream.read(&mut b).await.unwrap(), 2);
            assert_eq!(&b[..2], b"34");
            assert_eq!(stream.read_buf.len(), 1);
            // Drain does not shrink the capacity
            assert_eq!(stream.read_buf.capacity(), 4);

            let flags = Flags::from(Flag::Syn);
            let frame = Frame::new_data(flags, 0, BytesMut::default());
            frame_sender.send(frame).await.unwrap();
            let flags = Flags::from(Flag::Syn);
            let frame = Frame::new_data(flags, 0, BytesMut::from("1234"));
            frame_sender.send(frame).await.unwrap();
            let mut c = [0; 5];

            // one read crosses the chunk boundary: all of "5678" plus "1"
            assert_eq!(stream.read(&mut c).await.unwrap(), 5);
            assert_eq!(&c[..5], b"56781");
            assert_eq!(stream.read_buf.len(), 1);
            assert_eq!(stream.read_buf.capacity(), 4);

            assert_eq!(stream.read(&mut b).await.unwrap(), 2);
            assert_eq!(&b[..2], b"23");
            assert_eq!(stream.read_buf.len(), 1);
            assert_eq!(stream.read_buf.capacity(), 4);
        });
    }
943}