// forest/libp2p/rpc/decoder.rs

// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT
3
4use std::{io, marker::PhantomData, pin::Pin, task::Poll};
5
6use bytes::BytesMut;
7use futures::prelude::*;
8use pin_project_lite::pin_project;
9use tracing::warn;
10
11pin_project! {
12    #[derive(Debug)]
13    pub(super) struct DagCborDecodingReader<B, T> {
14        #[pin]
15        io: B,
16        max_bytes_allowed: usize,
17        bytes: BytesMut,
18        bytes_read: usize,
19        _pd: PhantomData<T>,
20    }
21}
22
23impl<B, T> DagCborDecodingReader<B, T> {
24    /// `max_bytes_allowed == 0` means unlimited
25    pub(super) fn new(io: B, max_bytes_allowed: usize) -> Self {
26        Self {
27            io,
28            max_bytes_allowed,
29            bytes: BytesMut::new(),
30            bytes_read: 0,
31            _pd: Default::default(),
32        }
33    }
34}
35
36impl<B, T> Future for DagCborDecodingReader<B, T>
37where
38    B: AsyncRead,
39    T: serde::de::DeserializeOwned,
40{
41    type Output = io::Result<T>;
42
43    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
44        // https://github.com/mxinden/asynchronous-codec/blob/master/src/framed_read.rs#L161
45        let mut buf = [0u8; 8 * 1024];
46        loop {
47            let this = self.as_mut().project();
48            let n = std::task::ready!(this.io.poll_read(cx, &mut buf))?;
49            // Terminated
50            if n == 0 {
51                let item =
52                    serde_ipld_dagcbor::de::from_reader(&self.bytes[..]).map_err(io::Error::other);
53                return Poll::Ready(item);
54            }
55            *this.bytes_read += n;
56            if *this.max_bytes_allowed > 0 && *this.bytes_read > *this.max_bytes_allowed {
57                let err = io::Error::other(format!(
58                    "Buffer size exceeds the maximum allowed {}B",
59                    *this.max_bytes_allowed,
60                ));
61                warn!("{err}");
62                return Poll::Ready(Err(err));
63            }
64            #[allow(clippy::indexing_slicing)]
65            this.bytes.extend_from_slice(&buf[..n.min(buf.len())]);
66            // This is what `FramedRead` does internally
67            // Assuming io will be re-used to send new messages.
68            //
69            // Note: `from_reader` ensures no trailing data left in `bytes`
70            if let Ok(r) = serde_ipld_dagcbor::de::from_reader(&this.bytes[..]) {
71                return Poll::Ready(Ok(r));
72            }
73        }
74    }
75}