async_proto/impls/
bytes.rs

1use {
2    std::{
3        future::Future,
4        io::prelude::*,
5        pin::Pin,
6    },
7    bytes::Bytes,
8    fallible_collections::FallibleVec as _,
9    tokio::io::{
10        AsyncRead,
11        AsyncReadExt as _,
12        AsyncWrite,
13        AsyncWriteExt as _,
14    },
15    crate::{
16        ErrorContext,
17        Protocol,
18        ReadError,
19        WriteError,
20    },
21};
22
23/// A [`Bytes`] is prefixed with the length as a [`u64`].
24///
25/// Using [`Bytes`] is recommended for sending large amounts of data, since the [`Protocol`] implementation for `Vec<u8>` reads and writes each byte individually.
26#[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
27impl Protocol for Bytes {
28    fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
29        Box::pin(async move {
30            let len = u64::read(stream).await?;
31            let mut buf = Vec::default();
32            buf.try_resize(usize::try_from(len).map_err(|e| ReadError {
33                context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
34                kind: e.into(),
35            })?, 0).map_err(|e| ReadError {
36                context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
37                kind: e.into(),
38            })?;
39            stream.read_exact(&mut buf).await.map_err(|e| ReadError {
40                context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
41                kind: e.into(),
42            })?;
43            Ok(buf.into())
44        })
45    }
46
47    fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
48        Box::pin(async move {
49            u64::try_from(self.len()).map_err(|e| WriteError {
50                context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
51                kind: e.into(),
52            })?.write(sink).await?;
53            sink.write_all(self).await.map_err(|e| WriteError {
54                context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
55                kind: e.into(),
56            })?;
57            Ok(())
58        })
59    }
60
61    fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
62        let len = u64::read_sync(stream)?;
63        let mut buf = Vec::default();
64        buf.try_resize(usize::try_from(len).map_err(|e| ReadError {
65            context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
66            kind: e.into(),
67        })?, 0).map_err(|e| ReadError {
68            context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
69            kind: e.into(),
70        })?;
71        stream.read_exact(&mut buf).map_err(|e| ReadError {
72            context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
73            kind: e.into(),
74        })?;
75        Ok(buf.into())
76    }
77
78    fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
79        u64::try_from(self.len()).map_err(|e| WriteError {
80            context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
81            kind: e.into(),
82        })?.write_sync(sink)?;
83        sink.write_all(self).map_err(|e| WriteError {
84            context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
85            kind: e.into(),
86        })?;
87        Ok(())
88    }
89}