Skip to main content

forest/libp2p_bitswap/internals/
codec.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use std::io;
5
6use async_trait::async_trait;
7use asynchronous_codec::{FramedRead, FramedWrite};
8use futures::{
9    SinkExt, StreamExt,
10    io::{AsyncRead, AsyncWrite},
11};
12use libp2p::request_response;
13
14use crate::libp2p_bitswap::{bitswap_pb::mod_Message::BlockPresenceType, prefix::Prefix, *};
15
/// Size limit, in bytes, handed to the protobuf codec built by `codec()`.
///
/// 2 MiB, following the block size given in the Bitswap specification at
/// <https://github.com/ipfs/specs/blob/main/BITSWAP.md>.
const MAX_BUF_SIZE: usize = 2 * 1024 * 1024;
18
19fn codec() -> quick_protobuf_codec::Codec<bitswap_pb::Message> {
20    quick_protobuf_codec::Codec::<bitswap_pb::Message>::new(MAX_BUF_SIZE)
21}
22
/// Unit type that implements `request_response::Codec` for Bitswap messages.
///
/// It has no fields, so every instance is interchangeable.
#[derive(Debug, Clone, Default)]
pub struct BitswapRequestResponseCodec;
25
26#[async_trait]
27impl request_response::Codec for BitswapRequestResponseCodec {
28    type Protocol = &'static str;
29    type Request = Vec<BitswapMessage>;
30    type Response = ();
31
32    async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> IOResult<Self::Request>
33    where
34        T: AsyncRead + Send + Unpin,
35    {
36        let pb_msg: bitswap_pb::Message = FramedRead::new(io, codec())
37            .next()
38            .await
39            .ok_or(std::io::ErrorKind::UnexpectedEof)??;
40
41        metrics::inbound_stream_count().inc();
42
43        let mut parts = vec![];
44        for entry in pb_msg.wantlist.unwrap_or_default().entries {
45            let cid = Cid::try_from(entry.block).map_err(io::Error::other)?;
46            parts.push(BitswapMessage::Request(BitswapRequest {
47                ty: entry.wantType.into(),
48                cid,
49                send_dont_have: entry.sendDontHave,
50                cancel: entry.cancel,
51            }));
52        }
53
54        for payload in pb_msg.payload {
55            let prefix = Prefix::new(&payload.prefix).map_err(io::Error::other)?;
56            let cid = prefix.to_cid(&payload.data).map_err(io::Error::other)?;
57            parts.push(BitswapMessage::Response(
58                cid,
59                BitswapResponse::Block(payload.data.to_vec()),
60            ));
61        }
62
63        for presence in pb_msg.blockPresences {
64            let cid = Cid::try_from(presence.cid).map_err(io::Error::other)?;
65            let have = presence.type_pb == BlockPresenceType::Have;
66            parts.push(BitswapMessage::Response(cid, BitswapResponse::Have(have)));
67        }
68
69        Ok(parts)
70    }
71
72    /// Just close the outbound stream,
73    /// the actual responses will come from new inbound stream
74    /// and be received in `read_request`
75    async fn read_response<T>(&mut self, _: &Self::Protocol, _: &mut T) -> IOResult<Self::Response>
76    where
77        T: AsyncRead + Send + Unpin,
78    {
79        Ok(())
80    }
81
82    /// Sending both `bitswap` requests and responses
83    async fn write_request<T>(
84        &mut self,
85        _: &Self::Protocol,
86        io: &mut T,
87        mut messages: Self::Request,
88    ) -> IOResult<()>
89    where
90        T: AsyncWrite + Send + Unpin,
91    {
92        assert_eq!(
93            messages.len(),
94            1,
95            "It's only supported to send a single message" // libp2p-bitswap doesn't support batch sending
96        );
97
98        let data = messages.swap_remove(0).into_proto()?;
99        let mut framed = FramedWrite::new(io, codec());
100        framed.send(data).await?;
101        framed.close().await?;
102
103        metrics::outbound_stream_count().inc();
104
105        Ok(())
106    }
107
108    // Sending `FIN` header and close the stream
109    async fn write_response<T>(
110        &mut self,
111        _: &Self::Protocol,
112        _: &mut T,
113        _: Self::Response,
114    ) -> IOResult<()>
115    where
116        T: AsyncWrite + Send + Unpin,
117    {
118        Ok(())
119    }
120}