forest/libp2p_bitswap/internals/
codec.rs1use std::io;
5
6use async_trait::async_trait;
7use asynchronous_codec::{FramedRead, FramedWrite};
8use futures::{
9 SinkExt, StreamExt,
10 io::{AsyncRead, AsyncWrite},
11};
12use libp2p::request_response;
13
14use crate::libp2p_bitswap::{bitswap_pb::mod_Message::BlockPresenceType, prefix::Prefix, *};
15
/// Size limit, in bytes, passed to the protobuf codec built by `codec()` (2 MiB).
const MAX_BUF_SIZE: usize = 2 * 1024 * 1024;
18
19fn codec() -> quick_protobuf_codec::Codec<bitswap_pb::Message> {
20 quick_protobuf_codec::Codec::<bitswap_pb::Message>::new(MAX_BUF_SIZE)
21}
22
/// Stateless unit type that carries the `request_response::Codec`
/// implementation for bitswap messages.
#[derive(Clone, Debug, Default)]
pub struct BitswapRequestResponseCodec;
25
26#[async_trait]
27impl request_response::Codec for BitswapRequestResponseCodec {
28 type Protocol = &'static str;
29 type Request = Vec<BitswapMessage>;
30 type Response = ();
31
32 async fn read_request<T>(&mut self, _: &Self::Protocol, io: &mut T) -> IOResult<Self::Request>
33 where
34 T: AsyncRead + Send + Unpin,
35 {
36 let pb_msg: bitswap_pb::Message = FramedRead::new(io, codec())
37 .next()
38 .await
39 .ok_or(std::io::ErrorKind::UnexpectedEof)??;
40
41 metrics::inbound_stream_count().inc();
42
43 let mut parts = vec![];
44 for entry in pb_msg.wantlist.unwrap_or_default().entries {
45 let cid = Cid::try_from(entry.block).map_err(io::Error::other)?;
46 parts.push(BitswapMessage::Request(BitswapRequest {
47 ty: entry.wantType.into(),
48 cid,
49 send_dont_have: entry.sendDontHave,
50 cancel: entry.cancel,
51 }));
52 }
53
54 for payload in pb_msg.payload {
55 let prefix = Prefix::new(&payload.prefix).map_err(io::Error::other)?;
56 let cid = prefix.to_cid(&payload.data).map_err(io::Error::other)?;
57 parts.push(BitswapMessage::Response(
58 cid,
59 BitswapResponse::Block(payload.data.to_vec()),
60 ));
61 }
62
63 for presence in pb_msg.blockPresences {
64 let cid = Cid::try_from(presence.cid).map_err(io::Error::other)?;
65 let have = presence.type_pb == BlockPresenceType::Have;
66 parts.push(BitswapMessage::Response(cid, BitswapResponse::Have(have)));
67 }
68
69 Ok(parts)
70 }
71
72 async fn read_response<T>(&mut self, _: &Self::Protocol, _: &mut T) -> IOResult<Self::Response>
76 where
77 T: AsyncRead + Send + Unpin,
78 {
79 Ok(())
80 }
81
82 async fn write_request<T>(
84 &mut self,
85 _: &Self::Protocol,
86 io: &mut T,
87 mut messages: Self::Request,
88 ) -> IOResult<()>
89 where
90 T: AsyncWrite + Send + Unpin,
91 {
92 assert_eq!(
93 messages.len(),
94 1,
95 "It's only supported to send a single message" );
97
98 let data = messages.swap_remove(0).into_proto()?;
99 let mut framed = FramedWrite::new(io, codec());
100 framed.send(data).await?;
101 framed.close().await?;
102
103 metrics::outbound_stream_count().inc();
104
105 Ok(())
106 }
107
108 async fn write_response<T>(
110 &mut self,
111 _: &Self::Protocol,
112 _: &mut T,
113 _: Self::Response,
114 ) -> IOResult<()>
115 where
116 T: AsyncWrite + Send + Unpin,
117 {
118 Ok(())
119 }
120}