// async_quic/stream.rs

1use std::{
2    io,
3    pin::Pin,
4    sync::Arc,
5    task::{Context, Poll},
6};
7
8use crate::ConnectionInner;
9use futures::prelude::*;
10
11pub struct QuicStream<const R: bool, const W: bool> {
12    conn: Arc<ConnectionInner>,
13    id: quinn_proto::StreamId,
14    read_err: Option<quinn_proto::VarInt>,
15}
16
17impl<const R: bool, const W: bool> QuicStream<R, W> {
18    pub(crate) fn new(conn: Arc<ConnectionInner>, id: quinn_proto::StreamId) -> Self {
19        Self {
20            conn,
21            id,
22            read_err: None,
23        }
24    }
25}
26
27impl<const W: bool> AsyncRead for QuicStream<true, W> {
28    fn poll_read(
29        self: Pin<&mut Self>,
30        cx: &mut Context<'_>,
31        buf: &mut [u8],
32    ) -> Poll<io::Result<usize>> {
33        if self.read_err.is_some() {
34            return Poll::Ready(Err(io::ErrorKind::ConnectionReset.into()));
35        }
36        let (n, err) = match self.conn.poll_read(self.id, cx, buf) {
37            Poll::Ready(ret) => ret,
38            Poll::Pending => return Poll::Pending,
39        };
40        if let Some(err) = err {
41            self.get_mut().read_err = Some(err);
42            if n == 0 {
43                return Poll::Ready(Err(io::ErrorKind::ConnectionReset.into()));
44            }
45        }
46        Poll::Ready(Ok(n))
47    }
48}
49
50impl<const R: bool> AsyncWrite for QuicStream<R, true> {
51    fn poll_write(
52        self: Pin<&mut Self>,
53        cx: &mut Context<'_>,
54        buf: &[u8],
55    ) -> Poll<io::Result<usize>> {
56        match self.conn.poll_write(self.id, cx, buf) {
57            Poll::Ready(Ok(n)) => Poll::Ready(Ok(n)),
58            Poll::Ready(Err(_)) => Poll::Ready(Err(io::ErrorKind::ConnectionReset.into())),
59            Poll::Pending => Poll::Pending,
60        }
61    }
62
63    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
64        // TODO: Check if all data has been acked?
65        Poll::Ready(Ok(()))
66    }
67
68    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
69        // TODO: Check if all data and finish have been acked?
70        match self.conn.close(self.id, cx) {
71            Ok(()) => Poll::Ready(Ok(())),
72            Err(_) => Poll::Ready(Err(io::ErrorKind::ConnectionReset.into())),
73        }
74    }
75}