xwt_tests/tests/
tokio_io_read_small_buf.rs

1use xwt_core::prelude::*;
2
3#[derive(Debug, thiserror::Error)]
4pub enum Error<Endpoint>
5where
6    Endpoint: xwt_core::endpoint::Connect + std::fmt::Debug,
7    Endpoint::Connecting: std::fmt::Debug,
8    ConnectSessionFor<Endpoint>: xwt_core::session::stream::OpenBi + std::fmt::Debug,
9{
10    #[error("connect: {0}")]
11    Connect(#[source] xwt_error::Connect<Endpoint>),
12    #[error("open: {0}")]
13    Open(#[source] xwt_error::OpenBi<ConnectSessionFor<Endpoint>>),
14    #[error("write: {0}")]
15    Write(#[source] std::io::Error),
16    #[error("read: {0}")]
17    Read(#[source] std::io::Error),
18    #[error("no response")]
19    NoResponse,
20    #[error("bad data")]
21    BadData(Vec<u8>),
22}
23
24pub async fn run<Endpoint>(endpoint: Endpoint, url: &str) -> Result<(), Error<Endpoint>>
25where
26    Endpoint: xwt_core::endpoint::Connect + std::fmt::Debug,
27    Endpoint::Connecting: std::fmt::Debug,
28    ConnectSessionFor<Endpoint>: xwt_core::session::stream::OpenBi + std::fmt::Debug,
29    SendStreamFor<ConnectSessionFor<Endpoint>>: tokio::io::AsyncWrite,
30    RecvStreamFor<ConnectSessionFor<Endpoint>>: tokio::io::AsyncRead,
31{
32    let session = crate::utils::connect(&endpoint, url)
33        .await
34        .map_err(Error::Connect)?;
35
36    let (send_stream, recv_stream) = crate::utils::open_bi(&session).await.map_err(Error::Open)?;
37
38    tokio::pin!(send_stream);
39
40    let mut to_write = &b"hello"[..];
41    loop {
42        let written = tokio::io::AsyncWriteExt::write(&mut send_stream, to_write)
43            .await
44            .map_err(Error::Write)?;
45        to_write = &to_write[written..];
46        if to_write.is_empty() {
47            break;
48        }
49    }
50
51    tokio::pin!(recv_stream);
52
53    let mut read_buf = vec![0u8; 2];
54
55    let read = tokio::io::AsyncReadExt::read(&mut recv_stream, &mut read_buf[..])
56        .await
57        .map_err(Error::Read)?;
58    if read == 0 {
59        return Err(Error::NoResponse);
60    };
61    read_buf.truncate(read);
62
63    if read_buf != b"he" {
64        return Err(Error::BadData(read_buf));
65    }
66
67    let read = tokio::io::AsyncReadExt::read(&mut recv_stream, &mut read_buf[..])
68        .await
69        .map_err(Error::Read)?;
70    if read == 0 {
71        return Err(Error::NoResponse);
72    };
73    read_buf.truncate(read);
74
75    if read_buf != b"ll" {
76        return Err(Error::BadData(read_buf));
77    }
78
79    let read = tokio::io::AsyncReadExt::read(&mut recv_stream, &mut read_buf[..])
80        .await
81        .map_err(Error::Read)?;
82    if read == 0 {
83        return Err(Error::NoResponse);
84    };
85    read_buf.truncate(read);
86
87    if read_buf != b"o" {
88        return Err(Error::BadData(read_buf));
89    }
90
91    Ok(())
92}