forest/libp2p/rpc/
mod.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4mod decoder;
5use std::{io, marker::PhantomData, time::Duration};
6
7use async_trait::async_trait;
8use decoder::DagCborDecodingReader;
9use futures::prelude::*;
10use libp2p::request_response::{self, OutboundFailure};
11use serde::{Serialize, de::DeserializeOwned};
12
/// Generic `Cbor` `RequestResponse` type. This is just needed to satisfy
/// [`request_response::Codec`] for Hello and `ChainExchange` protocols without
/// duplication.
///
/// The struct stores nothing but [`PhantomData`] markers for the protocol,
/// request and response types.
///
/// `Clone` is implemented by hand instead of being derived: a derived impl
/// would require `P: Clone`, `RQ: Clone` and `RS: Clone`, although no value of
/// those types is ever held.
pub struct CborRequestResponse<P, RQ, RS> {
    protocol: PhantomData<P>,
    request: PhantomData<RQ>,
    response: PhantomData<RS>,
}

impl<P, RQ, RS> Clone for CborRequestResponse<P, RQ, RS> {
    /// Returns a fresh marker value; there is no state to copy.
    fn clone(&self) -> Self {
        Self {
            protocol: PhantomData,
            request: PhantomData,
            response: PhantomData,
        }
    }
}
22
23impl<P, RQ, RS> Default for CborRequestResponse<P, RQ, RS> {
24    fn default() -> Self {
25        Self {
26            protocol: PhantomData::<P>,
27            request: PhantomData::<RQ>,
28            response: PhantomData::<RS>,
29        }
30    }
31}
32
33/// Libp2p request response outbound error type. This indicates a failure
34/// sending a request to a peer. This is different from a failure response from
35/// a node, as this is an error that prevented a response.
36///
37/// This type mirrors the internal libp2p type, but this avoids having to expose
38/// that internal type.
39#[derive(Debug, thiserror::Error)]
40pub enum RequestResponseError {
41    /// The request could not be sent because a dialing attempt failed.
42    #[error("DialFailure")]
43    DialFailure,
44    /// The request timed out before a response was received.
45    ///
46    /// It is not known whether the request may have been
47    /// received (and processed) by the remote peer.
48    #[error("Timeout")]
49    Timeout,
50    /// The connection closed before a response was received.
51    ///
52    /// It is not known whether the request may have been
53    /// received (and processed) by the remote peer.
54    #[error("ConnectionClosed")]
55    ConnectionClosed,
56    /// The remote supports none of the requested protocols.
57    #[error("UnsupportedProtocols")]
58    UnsupportedProtocols,
59    /// An IO failure happened on an outbound stream.
60    #[error("{0}")]
61    Io(io::Error),
62}
63
64impl From<OutboundFailure> for RequestResponseError {
65    fn from(err: OutboundFailure) -> Self {
66        match err {
67            OutboundFailure::DialFailure => Self::DialFailure,
68            OutboundFailure::Timeout => Self::Timeout,
69            OutboundFailure::ConnectionClosed => Self::ConnectionClosed,
70            OutboundFailure::UnsupportedProtocols => Self::UnsupportedProtocols,
71            OutboundFailure::Io(e) => Self::Io(e),
72        }
73    }
74}
75
76#[async_trait]
77impl<P, RQ, RS> request_response::Codec for CborRequestResponse<P, RQ, RS>
78where
79    P: AsRef<str> + Send + Clone,
80    RQ: Serialize + DeserializeOwned + Send + Sync,
81    RS: Serialize + DeserializeOwned + Send + Sync,
82{
83    type Protocol = P;
84    type Request = RQ;
85    type Response = RS;
86
87    async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Self::Request>
88    where
89        T: AsyncRead + Unpin + Send,
90    {
91        read_request_and_decode(io).await
92    }
93
94    async fn read_response<T>(
95        &mut self,
96        _: &Self::Protocol,
97        io: &mut T,
98    ) -> io::Result<Self::Response>
99    where
100        T: AsyncRead + Unpin + Send,
101    {
102        let mut bytes = vec![];
103        io.read_to_end(&mut bytes).await?;
104        serde_ipld_dagcbor::de::from_reader(bytes.as_slice()).map_err(io::Error::other)
105    }
106
107    async fn write_request<T>(
108        &mut self,
109        _: &Self::Protocol,
110        io: &mut T,
111        req: Self::Request,
112    ) -> io::Result<()>
113    where
114        T: AsyncWrite + Unpin + Send,
115    {
116        encode_and_write(io, req).await
117    }
118
119    async fn write_response<T>(
120        &mut self,
121        _: &Self::Protocol,
122        io: &mut T,
123        res: Self::Response,
124    ) -> io::Result<()>
125    where
126        T: AsyncWrite + Unpin + Send,
127    {
128        encode_and_write(io, res).await
129    }
130}
131
132// Because of how lotus implements the protocol, it will deadlock when calling
133// `io.ReadToEnd` on requests.
134//
135// for sending requests, the flow in lotus is
136// 1. write encoded request bytes
137// 2. wait for response
138// 3. close request stream, which sends `FIN` header over `yamux` protocol
139// if we call `io.ReadToEnd` before `FIN` is sent, it will deadlock
140//
141// but for sending responses, the flow in lotus is
142// 1. receive request
143// 2. write encode response bytes
144// 3. close response stream, which sends `FIN` header over `yamux` protocol
145// and we call `io.ReadToEnd` after `FIN` is sent, it will not deadlock
146//
147// Note: `FIN` - Performs a half-close of a stream. May be sent with a data
148// message or window update. See <https://github.com/libp2p/go-yamux/blob/master/spec.md#flag-field>
149//
150// `io` is essentially [yamux::Stream](https://docs.rs/yamux/0.11.0/yamux/struct.Stream.html)
151//
152async fn read_request_and_decode<IO, T>(io: &mut IO) -> io::Result<T>
153where
154    IO: AsyncRead + Unpin,
155    T: serde::de::DeserializeOwned,
156{
157    const MAX_BYTES_ALLOWED: usize = 2 * 1024 * 1024; // messages over 2MB are likely malicious
158    const TIMEOUT: Duration = Duration::from_secs(30);
159
160    // Currently the protocol does not send length encoded message,
161    // and we use `decode-success-with-no-trailing-data` to detect end of frame
162    // just like what `FramedRead` does, so it's possible to cause deadlock at
163    // `io.poll_ready` Adding timeout here to mitigate the issue
164    match tokio::time::timeout(TIMEOUT, DagCborDecodingReader::new(io, MAX_BYTES_ALLOWED)).await {
165        Ok(r) => r,
166        Err(_) => {
167            let err = io::Error::other("read_and_decode timeout");
168            tracing::warn!("{err}");
169            Err(err)
170        }
171    }
172}
173
174async fn encode_and_write<IO, T>(io: &mut IO, data: T) -> io::Result<()>
175where
176    IO: AsyncWrite + Unpin,
177    T: serde::Serialize,
178{
179    let bytes = fvm_ipld_encoding::to_vec(&data).map_err(io::Error::other)?;
180    io.write_all(&bytes).await?;
181    io.close().await?;
182    Ok(())
183}