Skip to main content

forest/libp2p/rpc/
mod.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4mod decoder;
5use std::{io, marker::PhantomData, time::Duration};
6
7use async_trait::async_trait;
8use decoder::DagCborDecodingReader;
9use futures::prelude::*;
10use libp2p::request_response::{self, OutboundFailure};
11use serde::{Serialize, de::DeserializeOwned};
12
/// Compile-time limits applied by the codec, chosen per protocol.
///
/// Fixed-shape protocols (Hello) are expected to pick tight values, while
/// bulk-transfer protocols (`ChainExchange`) pick generous ones.
pub trait CodecConfig {
    /// Byte limit handed to the decoder when reading a request.
    const MAX_REQUEST_BYTES: usize;
    /// Byte limit handed to the decoder when reading a response.
    const MAX_RESPONSE_BYTES: usize;
    /// Window within which the peer must finish writing; once it elapses the
    /// read is aborted.
    const DECODE_TIMEOUT: Duration;
}
21
/// Generic `Cbor` `RequestResponse` codec type.
///
/// It exists only so that [`request_response::Codec`] can be implemented once
/// and shared by the Hello and `ChainExchange` protocols instead of being
/// duplicated. The type carries no data: every field is a [`PhantomData`]
/// marker that pins one of the generic parameters.
pub struct CborRequestResponse<P, RQ, RS, C> {
    /// Marker for the protocol identifier type.
    protocol: PhantomData<P>,
    /// Marker for the request message type.
    request: PhantomData<RQ>,
    /// Marker for the response message type.
    response: PhantomData<RS>,
    /// Marker for the per-protocol configuration type.
    config: PhantomData<C>,
}
31
32// Manual impls so we don't pin `C: Copy + Clone` (auto-derive would).
33// All fields are `PhantomData`, so the type is unconditionally `Copy`.
34impl<P, RQ, RS, C> Copy for CborRequestResponse<P, RQ, RS, C> {}
35impl<P, RQ, RS, C> Clone for CborRequestResponse<P, RQ, RS, C> {
36    fn clone(&self) -> Self {
37        *self
38    }
39}
40
41impl<P, RQ, RS, C> Default for CborRequestResponse<P, RQ, RS, C> {
42    fn default() -> Self {
43        Self {
44            protocol: PhantomData,
45            request: PhantomData,
46            response: PhantomData,
47            config: PhantomData,
48        }
49    }
50}
51
52/// Libp2p request response outbound error type. This indicates a failure
53/// sending a request to a peer. This is different from a failure response from
54/// a node, as this is an error that prevented a response.
55///
56/// This type mirrors the internal libp2p type, but this avoids having to expose
57/// that internal type.
58#[derive(Debug, thiserror::Error)]
59pub enum RequestResponseError {
60    /// The request could not be sent because a dialing attempt failed.
61    #[error("DialFailure")]
62    DialFailure,
63    /// The request timed out before a response was received.
64    ///
65    /// It is not known whether the request may have been
66    /// received (and processed) by the remote peer.
67    #[error("Timeout")]
68    Timeout,
69    /// The connection closed before a response was received.
70    ///
71    /// It is not known whether the request may have been
72    /// received (and processed) by the remote peer.
73    #[error("ConnectionClosed")]
74    ConnectionClosed,
75    /// The remote supports none of the requested protocols.
76    #[error("UnsupportedProtocols")]
77    UnsupportedProtocols,
78    /// An IO failure happened on an outbound stream.
79    #[error("{0}")]
80    Io(io::Error),
81}
82
83impl From<OutboundFailure> for RequestResponseError {
84    fn from(err: OutboundFailure) -> Self {
85        match err {
86            OutboundFailure::DialFailure => Self::DialFailure,
87            OutboundFailure::Timeout => Self::Timeout,
88            OutboundFailure::ConnectionClosed => Self::ConnectionClosed,
89            OutboundFailure::UnsupportedProtocols => Self::UnsupportedProtocols,
90            OutboundFailure::Io(e) => Self::Io(e),
91        }
92    }
93}
94
95#[async_trait]
96impl<P, RQ, RS, C> request_response::Codec for CborRequestResponse<P, RQ, RS, C>
97where
98    P: AsRef<str> + Send + Clone,
99    RQ: Serialize + DeserializeOwned + Send + Sync,
100    RS: Serialize + DeserializeOwned + Send + Sync,
101    C: CodecConfig + Send + Sync,
102{
103    type Protocol = P;
104    type Request = RQ;
105    type Response = RS;
106
107    async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result<Self::Request>
108    where
109        T: AsyncRead + Unpin + Send,
110    {
111        timed_decode(io, C::MAX_REQUEST_BYTES, C::DECODE_TIMEOUT).await
112    }
113
114    async fn read_response<T>(
115        &mut self,
116        _: &Self::Protocol,
117        io: &mut T,
118    ) -> io::Result<Self::Response>
119    where
120        T: AsyncRead + Unpin + Send,
121    {
122        timed_decode(io, C::MAX_RESPONSE_BYTES, C::DECODE_TIMEOUT).await
123    }
124
125    async fn write_request<T>(
126        &mut self,
127        _: &Self::Protocol,
128        io: &mut T,
129        req: Self::Request,
130    ) -> io::Result<()>
131    where
132        T: AsyncWrite + Unpin + Send,
133    {
134        encode_and_write(io, req).await
135    }
136
137    async fn write_response<T>(
138        &mut self,
139        _: &Self::Protocol,
140        io: &mut T,
141        res: Self::Response,
142    ) -> io::Result<()>
143    where
144        T: AsyncWrite + Unpin + Send,
145    {
146        encode_and_write(io, res).await
147    }
148}
149
// Because of how lotus implements the protocol, calling `io.ReadToEnd` on
// requests will deadlock.
//
// For sending requests, the flow in lotus is:
// 1. write encoded request bytes
// 2. wait for response
// 3. close request stream, which sends `FIN` header over `yamux` protocol
// If we call `io.ReadToEnd` before `FIN` is sent, it will deadlock.
//
// For sending responses, the flow in lotus is:
// 1. receive request
// 2. write encoded response bytes
// 3. close response stream, which sends `FIN` header over `yamux` protocol
// Since we call `io.ReadToEnd` after `FIN` is sent, it will not deadlock.
//
// Note: `FIN` - Performs a half-close of a stream. May be sent with a data
// message or window update. See <https://github.com/libp2p/go-yamux/blob/master/spec.md#flag-field>
//
// `io` is essentially [yamux::Stream](https://docs.rs/yamux/0.11.0/yamux/struct.Stream.html)
//
170/// Decodes a CBOR value from `io` with a timeout. Used by both `read_request`
171/// and `read_response` to prevent hanging on a peer that fails to send `FIN`
172/// in a timely manner.
173async fn timed_decode<IO, T>(io: &mut IO, max_bytes: usize, timeout: Duration) -> io::Result<T>
174where
175    IO: AsyncRead + Unpin,
176    T: serde::de::DeserializeOwned,
177{
178    match tokio::time::timeout(timeout, DagCborDecodingReader::new(io, max_bytes)).await {
179        Ok(r) => r,
180        Err(_) => {
181            let err = io::Error::from(io::ErrorKind::TimedOut);
182            tracing::debug!("{err}");
183            Err(err)
184        }
185    }
186}
187
188async fn encode_and_write<IO, T>(io: &mut IO, data: T) -> io::Result<()>
189where
190    IO: AsyncWrite + Unpin,
191    T: serde::Serialize,
192{
193    let bytes = fvm_ipld_encoding::to_vec(&data).map_err(io::Error::other)?;
194    io.write_all(&bytes).await?;
195    io.close().await?;
196    Ok(())
197}