Skip to main content

wtx/
stream.rs

1//! Abstractions over different types of data streams.
2
3macro_rules! _local_write_all {
4  ($bytes:expr, $write:expr) => {{
5    while !$bytes.is_empty() {
6      match $write {
7        Err(e) => return Err(e.into()),
8        Ok(0) => return { Err(crate::Error::UnexpectedStreamWriteEOF) },
9        Ok(n) => $bytes = $bytes.get(n..).unwrap_or_default(),
10      }
11    }
12  }};
13}
14
15macro_rules! _local_write_all_vectored {
16  ($bytes:expr, $this:ident, |$io_slices:ident| $write_many:expr) => {{
17    match $bytes {
18      [] => return Ok(()),
19      [single] => {
20        <Self as crate::stream::StreamWriter>::write_all($this, single).await?;
21      }
22      _ => {
23        let mut buffer = [std::io::IoSlice::new(&[]); _];
24        let mut $io_slices = crate::stream::convert_to_io_slices(&mut buffer, $bytes)?;
25        while !$io_slices.is_empty() {
26          match $write_many {
27            Err(e) => return Err(e.into()),
28            Ok(0) => return Err(crate::Error::UnexpectedStreamWriteEOF),
29            Ok(n) => std::io::IoSlice::advance_slices(&mut $io_slices, n),
30          }
31        }
32      }
33    }
34  }};
35}
36
37#[cfg(feature = "async-net")]
38mod async_net;
39mod bytes_stream;
40#[cfg(feature = "embassy-net")]
41mod embassy_net;
42#[cfg(feature = "std")]
43mod std;
44mod stream_reader;
45mod stream_with_tls;
46mod stream_writer;
47#[cfg(feature = "tokio")]
48mod tokio;
49#[cfg(feature = "tokio-rustls")]
50mod tokio_rustls;
51
52pub use bytes_stream::BytesStream;
53pub use stream_reader::StreamReader;
54pub use stream_with_tls::StreamWithTls;
55pub use stream_writer::StreamWriter;
56
57/// A stream of values produced asynchronously.
58pub trait Stream: StreamReader + StreamWriter {}
59
60impl<T> Stream for T where T: StreamReader + StreamWriter {}
61
62#[cfg(feature = "std")]
63fn convert_to_io_slices<'buffer, 'bytes>(
64  buffer: &'buffer mut [::std::io::IoSlice<'bytes>; 8],
65  elems: &[&'bytes [u8]],
66) -> crate::Result<&'buffer mut [::std::io::IoSlice<'bytes>]> {
67  if elems.len() > 8 {
68    return crate::misc::unlikely_elem(Err(crate::Error::VectoredWriteOverflow));
69  }
70  for (elem, io_slice) in elems.iter().zip(&mut *buffer) {
71    *io_slice = ::std::io::IoSlice::new(elem);
72  }
73  Ok(buffer.get_mut(..elems.len()).unwrap_or_default())
74}