// xwt_tests/tests/tokio_io_read_small_buf.rs
use xwt_core::prelude::*;
2
3#[derive(Debug, thiserror::Error)]
4pub enum Error<Endpoint>
5where
6 Endpoint: xwt_core::endpoint::Connect + std::fmt::Debug,
7 Endpoint::Connecting: std::fmt::Debug,
8 ConnectSessionFor<Endpoint>: xwt_core::session::stream::OpenBi + std::fmt::Debug,
9{
10 #[error("connect: {0}")]
11 Connect(#[source] xwt_error::Connect<Endpoint>),
12 #[error("open: {0}")]
13 Open(#[source] xwt_error::OpenBi<ConnectSessionFor<Endpoint>>),
14 #[error("write: {0}")]
15 Write(#[source] std::io::Error),
16 #[error("read: {0}")]
17 Read(#[source] std::io::Error),
18 #[error("no response")]
19 NoResponse,
20 #[error("bad data")]
21 BadData(Vec<u8>),
22}
23
24pub async fn run<Endpoint>(endpoint: Endpoint, url: &str) -> Result<(), Error<Endpoint>>
25where
26 Endpoint: xwt_core::endpoint::Connect + std::fmt::Debug,
27 Endpoint::Connecting: std::fmt::Debug,
28 ConnectSessionFor<Endpoint>: xwt_core::session::stream::OpenBi + std::fmt::Debug,
29 SendStreamFor<ConnectSessionFor<Endpoint>>: tokio::io::AsyncWrite,
30 RecvStreamFor<ConnectSessionFor<Endpoint>>: tokio::io::AsyncRead,
31{
32 let session = crate::utils::connect(&endpoint, url)
33 .await
34 .map_err(Error::Connect)?;
35
36 let (send_stream, recv_stream) = crate::utils::open_bi(&session).await.map_err(Error::Open)?;
37
38 tokio::pin!(send_stream);
39
40 let mut to_write = &b"hello"[..];
41 loop {
42 let written = tokio::io::AsyncWriteExt::write(&mut send_stream, to_write)
43 .await
44 .map_err(Error::Write)?;
45 to_write = &to_write[written..];
46 if to_write.is_empty() {
47 break;
48 }
49 }
50
51 tokio::pin!(recv_stream);
52
53 let mut read_buf = vec![0u8; 2];
54
55 let read = tokio::io::AsyncReadExt::read(&mut recv_stream, &mut read_buf[..])
56 .await
57 .map_err(Error::Read)?;
58 if read == 0 {
59 return Err(Error::NoResponse);
60 };
61 read_buf.truncate(read);
62
63 if read_buf != b"he" {
64 return Err(Error::BadData(read_buf));
65 }
66
67 let read = tokio::io::AsyncReadExt::read(&mut recv_stream, &mut read_buf[..])
68 .await
69 .map_err(Error::Read)?;
70 if read == 0 {
71 return Err(Error::NoResponse);
72 };
73 read_buf.truncate(read);
74
75 if read_buf != b"ll" {
76 return Err(Error::BadData(read_buf));
77 }
78
79 let read = tokio::io::AsyncReadExt::read(&mut recv_stream, &mut read_buf[..])
80 .await
81 .map_err(Error::Read)?;
82 if read == 0 {
83 return Err(Error::NoResponse);
84 };
85 read_buf.truncate(read);
86
87 if read_buf != b"o" {
88 return Err(Error::BadData(read_buf));
89 }
90
91 Ok(())
92}