async_proto/impls/
bytes.rs1use {
2 std::{
3 future::Future,
4 io::prelude::*,
5 pin::Pin,
6 },
7 bytes::Bytes,
8 fallible_collections::FallibleVec as _,
9 tokio::io::{
10 AsyncRead,
11 AsyncReadExt as _,
12 AsyncWrite,
13 AsyncWriteExt as _,
14 },
15 crate::{
16 ErrorContext,
17 Protocol,
18 ReadError,
19 WriteError,
20 },
21};
22
23#[cfg_attr(docsrs, doc(cfg(feature = "bytes")))]
27impl Protocol for Bytes {
28 fn read<'a, R: AsyncRead + Unpin + Send + 'a>(stream: &'a mut R) -> Pin<Box<dyn Future<Output = Result<Self, ReadError>> + Send + 'a>> {
29 Box::pin(async move {
30 let len = u64::read(stream).await?;
31 let mut buf = Vec::default();
32 buf.try_resize(usize::try_from(len).map_err(|e| ReadError {
33 context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
34 kind: e.into(),
35 })?, 0).map_err(|e| ReadError {
36 context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
37 kind: e.into(),
38 })?;
39 stream.read_exact(&mut buf).await.map_err(|e| ReadError {
40 context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
41 kind: e.into(),
42 })?;
43 Ok(buf.into())
44 })
45 }
46
47 fn write<'a, W: AsyncWrite + Unpin + Send + 'a>(&'a self, sink: &'a mut W) -> Pin<Box<dyn Future<Output = Result<(), WriteError>> + Send + 'a>> {
48 Box::pin(async move {
49 u64::try_from(self.len()).map_err(|e| WriteError {
50 context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
51 kind: e.into(),
52 })?.write(sink).await?;
53 sink.write_all(self).await.map_err(|e| WriteError {
54 context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
55 kind: e.into(),
56 })?;
57 Ok(())
58 })
59 }
60
61 fn read_sync(stream: &mut impl Read) -> Result<Self, ReadError> {
62 let len = u64::read_sync(stream)?;
63 let mut buf = Vec::default();
64 buf.try_resize(usize::try_from(len).map_err(|e| ReadError {
65 context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
66 kind: e.into(),
67 })?, 0).map_err(|e| ReadError {
68 context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
69 kind: e.into(),
70 })?;
71 stream.read_exact(&mut buf).map_err(|e| ReadError {
72 context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
73 kind: e.into(),
74 })?;
75 Ok(buf.into())
76 }
77
78 fn write_sync(&self, sink: &mut impl Write) -> Result<(), WriteError> {
79 u64::try_from(self.len()).map_err(|e| WriteError {
80 context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
81 kind: e.into(),
82 })?.write_sync(sink)?;
83 sink.write_all(self).map_err(|e| WriteError {
84 context: ErrorContext::BuiltIn { for_type: "bytes::Bytes" },
85 kind: e.into(),
86 })?;
87 Ok(())
88 }
89}