forest/libp2p/rpc/
decoder.rs1use std::{io, marker::PhantomData, pin::Pin, task::Poll};
5
6use bytes::BytesMut;
7use futures::prelude::*;
8use pin_project_lite::pin_project;
9use tracing::warn;
10
11pin_project! {
12 #[derive(Debug)]
13 pub(super) struct DagCborDecodingReader<B, T> {
14 #[pin]
15 io: B,
16 max_bytes_allowed: usize,
17 bytes: BytesMut,
18 bytes_read: usize,
19 _pd: PhantomData<T>,
20 }
21}
22
23impl<B, T> DagCborDecodingReader<B, T> {
24 pub(super) fn new(io: B, max_bytes_allowed: usize) -> Self {
26 Self {
27 io,
28 max_bytes_allowed,
29 bytes: BytesMut::new(),
30 bytes_read: 0,
31 _pd: Default::default(),
32 }
33 }
34}
35
36impl<B, T> Future for DagCborDecodingReader<B, T>
37where
38 B: AsyncRead,
39 T: serde::de::DeserializeOwned,
40{
41 type Output = io::Result<T>;
42
43 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
44 let mut buf = [0u8; 8 * 1024];
46 loop {
47 let this = self.as_mut().project();
48 let n = std::task::ready!(this.io.poll_read(cx, &mut buf))?;
49 if n == 0 {
51 let item =
52 serde_ipld_dagcbor::de::from_reader(&self.bytes[..]).map_err(io::Error::other);
53 return Poll::Ready(item);
54 }
55 *this.bytes_read += n;
56 if *this.max_bytes_allowed > 0 && *this.bytes_read > *this.max_bytes_allowed {
57 let err = io::Error::other(format!(
58 "Buffer size exceeds the maximum allowed {}B",
59 *this.max_bytes_allowed,
60 ));
61 warn!("{err}");
62 return Poll::Ready(Err(err));
63 }
64 #[allow(clippy::indexing_slicing)]
65 this.bytes.extend_from_slice(&buf[..n.min(buf.len())]);
66 if let Ok(r) = serde_ipld_dagcbor::de::from_reader(&this.bytes[..]) {
71 return Poll::Ready(Ok(r));
72 }
73 }
74 }
75}