async_proto/impls/
bytes.rs1use {
2 std::{
3 future::Future,
4 io::prelude::*,
5 pin::Pin,
6 },
7 bytes::Bytes,
8 fallible_collections::FallibleVec as _,
9 tokio::io::{
10 AsyncRead,
11 AsyncReadExt as _,
12 AsyncWrite,
13 AsyncWriteExt as _,
14 },
15 crate::{
16 ErrorContext,
17 LengthPrefixed,
18 Protocol,
19 ReadError,
20 WriteError,
21 },
22};
23
24#[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
28impl Protocol for Bytes {
29 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
30 Self::read_length_prefixed(stream, u64::MAX)
31 }
32
33 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
34 self.write_length_prefixed(sink, u64::MAX)
35 }
36
37 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
38 Self::read_length_prefixed_sync(stream, u64::MAX)
39 }
40
41 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
42 self.write_length_prefixed_sync(sink, u64::MAX)
43 }
44}
45
46#[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
48impl LengthPrefixed for Bytes {
49 fn read_length_prefixed<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R, max_len: u64) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
50 Box::pin(async move {
51 let len = super::read_len(stream, max_len, || ErrorContext::BuiltIn { for_type: "bytes::Bytes" }).await?;
52 let mut buf = Vec::default();
53 buf.try_resize(len, 0).map_err(|e| ReadError {
54 context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
55 kind: e.into(),
56 })?;
57 stream.read_exact(&mut buf).await.map_err(|e| ReadError {
58 context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
59 kind: e.into(),
60 })?;
61 Ok(buf.into())
62 })
63 }
64
65 fn write_length_prefixed<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W, max_len: u64) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
66 Box::pin(async move {
67 super::write_len(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "bytes::Bytes" }).await?;
68 sink.write_all(self).await.map_err(|e| WriteError {
69 context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
70 kind: e.into(),
71 })?;
72 Ok(())
73 })
74 }
75
76 fn read_length_prefixed_sync(stream: &mut impl Read, max_len: u64) -> Result<Self, ReadError> {
77 let len = super::read_len_sync(stream, max_len, || ErrorContext::BuiltIn { for_type: "bytes::Bytes" })?;
78 let mut buf = Vec::default();
79 buf.try_resize(len, 0).map_err(|e| ReadError {
80 context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
81 kind: e.into(),
82 })?;
83 stream.read_exact(&mut buf).map_err(|e| ReadError {
84 context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
85 kind: e.into(),
86 })?;
87 Ok(buf.into())
88 }
89
90 fn write_length_prefixed_sync(&self, sink: &mut impl Write, max_len: u64) -> Result<(), WriteError> {
91 super::write_len_sync(sink, self.len(), max_len, || ErrorContext::BuiltIn { for_type: "bytes::Bytes" })?;
92 sink.write_all(self).map_err(|e| WriteError {
93 context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
94 kind: e.into(),
95 })?;
96 Ok(())
97 }
98}