async_proto/impls/
bytes.rs

1use {
2    std::{
3        future::Future,
4        io::prelude::*,
5        pin::Pin,
6    },
7    bytes::Bytes,
8    fallible_collections::FallibleVec as _,
9    tokio::io::{
10        AsyncRead,
11        AsyncReadExt as _,
12        AsyncWrite,
13        AsyncWriteExt as _,
14    },
15    crate::{
16        ErrorContext,
17        LengthPrefixed,
18        Protocol,
19        ReadError,
20        WriteError,
21    },
22};
23
24/// A [`Bytes`] is prefixed with the length as a [`u64`].
25///
26/// Using [`Bytes`] is recommended for sending large amounts of data, since the [`Protocol`] implementation for `Vec<u8>` reads and writes each byte individually.
27#[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
28impl Protocol for Bytes {
29    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
30        Self::read_length_prefixed(stream, u64::MAX)
31    }
32
33    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
34        self.write_length_prefixed(sink, u64::MAX)
35    }
36
37    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
38        Self::read_length_prefixed_sync(stream, u64::MAX)
39    }
40
41    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
42        self.write_length_prefixed_sync(sink, u64::MAX)
43    }
44}
45
46/// Using [`Bytes`] is recommended for sending large amounts of data, since the [`Protocol`] implementation for `Vec<u8>` reads and writes each byte individually.
47#[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
48impl LengthPrefixed for Bytes {
49    fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
50        Box::pin(async move {
51            let len = super::read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "bytes::Bytes" }).await?;
52            let mut buf = Vec::default();
53            buf.try_resize(len, 0).map_err(|e| ReadError {
54                context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
55                kind: e.into(),
56            })?;
57            stream.read_exact(&mut buf).await.map_err(|e| ReadError {
58                context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
59                kind: e.into(),
60            })?;
61            Ok(buf.into())
62        })
63    }
64
65    fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
66        Box::pin(async move {
67            super::write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "bytes::Bytes" }).await?;
68            sink.write_all(self).await.map_err(|e| WriteError {
69                context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
70                kind: e.into(),
71            })?;
72            Ok(())
73        })
74    }
75
76    fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
77        let len = super::read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "bytes::Bytes" })?;
78        let mut buf = Vec::default();
79        buf.try_resize(len, 0).map_err(|e| ReadError {
80            context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
81            kind: e.into(),
82        })?;
83        stream.read_exact(&mut buf).map_err(|e| ReadError {
84            context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
85            kind: e.into(),
86        })?;
87        Ok(buf.into())
88    }
89
90    fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
91        super::write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "bytes::Bytes" })?;
92        sink.write_all(self).map_err(|e| WriteError {
93            context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
94            kind: e.into(),
95        })?;
96        Ok(())
97    }
98}