use crate::{
request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig},
types::ProtocolName,
MAX_RESPONSE_SIZE,
};
use cid::{Error as CidError, Version as CidVersion};
use futures::StreamExt;
use log::{debug, error, trace};
use prost::Message;
use sc_client_api::BlockBackend;
use sc_network_types::PeerId;
use schema::bitswap::{
message::{wantlist::WantType, Block as MessageBlock, BlockPresence, BlockPresenceType},
Message as BitswapMessage,
};
use sp_core::H256;
use sp_runtime::traits::Block as BlockT;
use std::{io, sync::Arc, time::Duration};
use unsigned_varint::encode as varint_encode;
mod client;
pub(crate) mod schema;
pub use cid::Cid;
pub use client::{
request_bitswap_blocks, request_bitswap_blocks_unverified, BitswapError, FetchOutcome,
BLAKE2B_256_MULTIHASH_CODE, KECCAK_256_MULTIHASH_CODE, SHA2_256_MULTIHASH_CODE,
};
pub(crate) use schema::bitswap::Message as BitswapProtoMessage;
/// Log target attached to the bitswap log statements.
pub(crate) const LOG_TARGET: &str = "sub-libp2p::bitswap";
/// Size limit used for both requests and responses of this protocol; equal to
/// `MAX_RESPONSE_SIZE`.
const MAX_PACKET_SIZE: u64 = MAX_RESPONSE_SIZE;
/// Capacity of the bounded channel that queues inbound requests for the handler.
const MAX_REQUEST_QUEUE: usize = 20;
/// Maximum number of wantlist entries accepted in a single request; longer wantlists are
/// rejected as a whole.
pub const MAX_WANTED_BLOCKS: usize = 16;
/// Name under which the request-response protocol is registered.
pub(crate) const PROTOCOL_NAME: &str = "/ipfs/bitswap/1.2.0";
/// CID codec value `0x55` (the "raw" codec, per the constant's name).
pub const RAW_CODEC: u64 = 0x55;
/// Tell whether `cid` can be served by this bitswap implementation.
///
/// A CID is supported when it is not version 0, its multihash digest is exactly 32 bytes
/// long, and its multihash code is accepted by [`is_supported_multihash_code`].
pub fn is_cid_supported(cid: &Cid) -> bool {
    if cid.version() == CidVersion::V0 {
        return false;
    }

    let multihash = cid.hash();
    multihash.size() == 32 && is_supported_multihash_code(multihash.code())
}
pub(crate) fn is_supported_multihash_code(code: u64) -> bool {
matches!(code, BLAKE2B_256_MULTIHASH_CODE | SHA2_256_MULTIHASH_CODE | KECCAK_256_MULTIHASH_CODE)
}
/// The parts of a CID other than the digest itself: version, codec, multihash code and
/// multihash length. Built from a `Cid` via `From<&Cid>` and serialised with `to_bytes`.
#[derive(PartialEq, Eq, Clone, Debug)]
pub(crate) struct Prefix {
/// CID version.
pub version: CidVersion,
/// CID codec.
pub codec: u64,
/// Code of the multihash used by the CID.
pub mh_type: u64,
/// Size of the multihash digest, as reported by the multihash.
pub mh_len: u8,
}
impl From<&Cid> for Prefix {
    /// Extract the prefix fields (version, codec, multihash code and size) from `cid`.
    fn from(cid: &Cid) -> Self {
        let multihash = cid.hash();
        let (mh_type, mh_len) = (multihash.code(), multihash.size());

        Self { version: cid.version(), codec: cid.codec(), mh_type, mh_len }
    }
}
impl Prefix {
    /// Serialise the prefix as four consecutive unsigned varints, in the order
    /// version, codec, multihash code, multihash length.
    pub(crate) fn to_bytes(&self) -> Vec<u8> {
        let fields: [u64; 4] =
            [self.version.into(), self.codec, self.mh_type, u64::from(self.mh_len)];

        let mut encoded = Vec::with_capacity(4);
        // One scratch buffer is enough: each call returns only the bytes it just wrote,
        // and those are copied out before the next field is encoded.
        let mut scratch = varint_encode::u64_buffer();
        for &value in &fields {
            encoded.extend_from_slice(varint_encode::u64(value, &mut scratch));
        }
        encoded
    }
}
/// Handler for inbound bitswap requests: takes requests from `request_receiver` and answers
/// them by looking up indexed transactions through `client`.
pub(crate) struct BitswapRequestHandler<B> {
/// Backend used to look up indexed transactions by hash.
client: Arc<dyn BlockBackend<B> + Send + Sync>,
/// Receiving end of the bounded queue of inbound requests.
request_receiver: async_channel::Receiver<IncomingRequest>,
}
impl<B: BlockT> BitswapRequestHandler<B> {
pub(crate) fn new(client: Arc<dyn BlockBackend<B> + Send + Sync>) -> (Self, ProtocolConfig) {
let (tx, request_receiver) = async_channel::bounded(MAX_REQUEST_QUEUE);
let config = ProtocolConfig {
name: ProtocolName::from(PROTOCOL_NAME),
fallback_names: vec![],
max_request_size: MAX_PACKET_SIZE,
max_response_size: MAX_PACKET_SIZE,
request_timeout: Duration::from_secs(15),
inbound_queue: Some(tx),
};
(Self { client, request_receiver }, config)
}
pub(crate) async fn run(mut self) {
while let Some(request) = self.request_receiver.next().await {
let IncomingRequest { peer, payload, pending_response } = request;
match self.handle_message(&peer, &payload) {
Ok(response) => {
let response = OutgoingResponse {
result: Ok(response),
reputation_changes: Vec::new(),
sent_feedback: None,
};
match pending_response.send(response) {
Ok(()) => {
trace!(target: LOG_TARGET, "Handled bitswap request from {peer}.",)
},
Err(_) => debug!(
target: LOG_TARGET,
"Failed to handle bitswap request from {peer}: {}",
RequestHandlerError::SendResponse,
),
}
},
Err(err) => {
error!(target: LOG_TARGET, "Failed to process request from {peer}: {err}");
let response = OutgoingResponse {
result: Err(()),
reputation_changes: vec![],
sent_feedback: None,
};
if pending_response.send(response).is_err() {
debug!(
target: LOG_TARGET,
"Failed to handle bitswap request from {peer}: {}",
RequestHandlerError::SendResponse,
);
}
},
}
}
}
fn handle_message(
&mut self,
peer: &PeerId,
payload: &[u8],
) -> Result<Vec<u8>, RequestHandlerError> {
let request = schema::bitswap::Message::decode(payload)?;
trace!(target: LOG_TARGET, "Received request: {:?} from {}", request, peer);
let mut response = BitswapMessage::default();
let wantlist = match request.wantlist {
Some(wantlist) => wantlist,
None => {
debug!(target: LOG_TARGET, "Unexpected bitswap message from {}", peer);
return Err(RequestHandlerError::InvalidWantList);
},
};
if wantlist.entries.len() > MAX_WANTED_BLOCKS {
trace!(target: LOG_TARGET, "Ignored request: too many entries");
return Err(RequestHandlerError::TooManyEntries);
}
for entry in wantlist.entries {
let cid = match Cid::read_bytes(entry.block.as_slice()) {
Ok(cid) => cid,
Err(e) => {
trace!(target: LOG_TARGET, "Bad CID {:?}: {:?}", entry.block, e);
continue;
},
};
if !is_cid_supported(&cid) {
trace!(target: LOG_TARGET, "Ignoring unsupported CID {}: {}", peer, cid);
continue;
}
let mut hash = H256::default();
hash.as_mut().copy_from_slice(&cid.hash().digest()[0..32]);
let transaction = match self.client.indexed_transaction(hash) {
Ok(ex) => ex,
Err(e) => {
error!(target: LOG_TARGET, "Error retrieving transaction {}: {}", hash, e);
None
},
};
match transaction {
Some(transaction) => {
trace!(target: LOG_TARGET, "Found CID {:?}, hash {:?}", cid, hash);
if entry.want_type == WantType::Block as i32 {
let prefix: Prefix = (&cid).into();
response
.payload
.push(MessageBlock { prefix: prefix.to_bytes(), data: transaction });
} else {
response.block_presences.push(BlockPresence {
r#type: BlockPresenceType::Have as i32,
cid: cid.to_bytes(),
});
}
},
None => {
trace!(target: LOG_TARGET, "Missing CID {:?}, hash {:?}", cid, hash);
if entry.send_dont_have {
response.block_presences.push(BlockPresence {
r#type: BlockPresenceType::DontHave as i32,
cid: cid.to_bytes(),
});
}
},
}
}
Ok(response.encode_to_vec())
}
}
/// Errors produced while handling a bitswap request.
///
/// In the handler code of this file only `DecodeProto` (via `?`), `InvalidWantList`,
/// `TooManyEntries` and `SendResponse` (for log formatting) are used; the remaining variants
/// exist as `From` conversion targets.
#[derive(Debug, thiserror::Error)]
enum RequestHandlerError {
/// The request payload is not a valid protobuf bitswap message.
#[error("Failed to decode request: {0}.")]
DecodeProto(#[from] prost::DecodeError),
/// The response could not be protobuf-encoded.
#[error("Failed to encode response: {0}.")]
EncodeProto(#[from] prost::EncodeError),
/// Error reported by the blockchain client.
#[error(transparent)]
Client(#[from] sp_blockchain::Error),
/// A CID could not be parsed.
#[error(transparent)]
BadCid(#[from] CidError),
/// I/O error.
#[error(transparent)]
Read(#[from] io::Error),
/// The response could not be handed back through the pending-response channel.
#[error("Failed to send response.")]
SendResponse,
/// The request carried no wantlist.
#[error("Invalid WANT list.")]
InvalidWantList,
/// The wantlist holds more than `MAX_WANTED_BLOCKS` entries.
#[error("Too many block entries in the request.")]
TooManyEntries,
}
#[cfg(test)]
mod tests {
    //! Tests for the bitswap request handler, driven either through the inbound queue and
    //! `run`, or by calling `handle_message` directly.

    use super::*;
    use futures::channel::oneshot;
    use litep2p::types::multihash::Code as LiteP2pCode;
    use sc_block_builder::BlockBuilderBuilder;
    use schema::bitswap::{
        message::{wantlist::Entry, Wantlist},
        Message as BitswapMessage,
    };
    use sp_consensus::BlockOrigin;
    use sp_runtime::codec::Encode;
    use substrate_test_runtime::ExtrinsicBuilder;
    use substrate_test_runtime_client::{self, prelude::*, TestClientBuilder};

    /// Build a CIDv1 with codec `0x70` that wraps `digest` as a Blake2b-256 multihash.
    fn blake2b_256_cid(digest: &[u8]) -> Cid {
        let multihash =
            cid::multihash::Multihash::wrap(u64::from(LiteP2pCode::Blake2b256), digest).unwrap();
        cid::Cid::new_v1(0x70, multihash)
    }

    /// Encode a bitswap message whose wantlist contains exactly `entry`.
    fn single_entry_request(entry: Entry) -> Vec<u8> {
        let wantlist = Wantlist { entries: vec![entry], full: false };
        BitswapMessage { wantlist: Some(wantlist), ..Default::default() }.encode_to_vec()
    }

    /// A payload that is not a valid protobuf message is answered with `Err(())`.
    #[tokio::test]
    async fn undecodable_message() {
        let client = substrate_test_runtime_client::new();
        let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
        tokio::spawn(async move { bitswap.run().await });

        let (tx, rx) = oneshot::channel();
        let request = IncomingRequest {
            peer: PeerId::random(),
            payload: vec![0x13, 0x37, 0x13, 0x38],
            pending_response: tx,
        };
        config.inbound_queue.unwrap().send(request).await.unwrap();

        match rx.await {
            Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) => {
                assert_eq!(result, Err(()));
                assert_eq!(reputation_changes, Vec::new());
                assert!(sent_feedback.is_none());
            },
            Err(_) => panic!("invalid event received"),
        }
    }

    /// A missing wantlist is an error; a present but empty wantlist yields an empty message.
    #[tokio::test]
    async fn empty_want_list() {
        let client = substrate_test_runtime_client::new();
        let (bitswap, mut config) = BitswapRequestHandler::new(Arc::new(client));
        tokio::spawn(async move { bitswap.run().await });

        // First request: no wantlist at all.
        let (tx, rx) = oneshot::channel();
        let without_wantlist = IncomingRequest {
            peer: PeerId::random(),
            payload: BitswapMessage { wantlist: None, ..Default::default() }.encode_to_vec(),
            pending_response: tx,
        };
        config.inbound_queue.as_mut().unwrap().send(without_wantlist).await.unwrap();

        match rx.await {
            Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) => {
                assert_eq!(result, Err(()));
                assert_eq!(reputation_changes, Vec::new());
                assert!(sent_feedback.is_none());
            },
            Err(_) => panic!("invalid event received"),
        }

        // Second request: a default (empty) wantlist.
        let (tx, rx) = oneshot::channel();
        let with_empty_wantlist = IncomingRequest {
            peer: PeerId::random(),
            payload: BitswapMessage { wantlist: Some(Default::default()), ..Default::default() }
                .encode_to_vec(),
            pending_response: tx,
        };
        config.inbound_queue.unwrap().send(with_empty_wantlist).await.unwrap();

        match rx.await {
            Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) => {
                assert_eq!(result, Ok(BitswapMessage::default().encode_to_vec()));
                assert_eq!(reputation_changes, Vec::new());
                assert!(sent_feedback.is_none());
            },
            Err(_) => panic!("invalid event received"),
        }
    }

    /// A wantlist with `MAX_WANTED_BLOCKS + 1` entries is rejected.
    #[tokio::test]
    async fn too_long_want_list() {
        let client = substrate_test_runtime_client::new();
        let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
        tokio::spawn(async move { bitswap.run().await });

        let oversized = Wantlist {
            entries: std::iter::repeat_with(Entry::default).take(MAX_WANTED_BLOCKS + 1).collect(),
            full: false,
        };
        let payload =
            BitswapMessage { wantlist: Some(oversized), ..Default::default() }.encode_to_vec();

        let (tx, rx) = oneshot::channel();
        let request = IncomingRequest { peer: PeerId::random(), payload, pending_response: tx };
        config.inbound_queue.unwrap().send(request).await.unwrap();

        match rx.await {
            Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) => {
                assert_eq!(result, Err(()));
                assert_eq!(reputation_changes, Vec::new());
                assert!(sent_feedback.is_none());
            },
            Err(_) => panic!("invalid event received"),
        }
    }

    /// An unknown CID without `send_dont_have` produces an empty response.
    #[tokio::test]
    async fn transaction_not_found() {
        let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
        let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
        tokio::spawn(async move { bitswap.run().await });

        let wanted =
            Entry { block: blake2b_256_cid(&[0u8; 32]).to_bytes(), ..Default::default() };

        let (tx, rx) = oneshot::channel();
        let request = IncomingRequest {
            peer: PeerId::random(),
            payload: single_entry_request(wanted),
            pending_response: tx,
        };
        config.inbound_queue.unwrap().send(request).await.unwrap();

        match rx.await {
            Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) => {
                assert_eq!(result, Ok(vec![]));
                assert_eq!(reputation_changes, Vec::new());
                assert!(sent_feedback.is_none());
            },
            Err(_) => panic!("invalid event received"),
        }
    }

    /// An indexed transaction is returned as a block payload when requested by its CID.
    #[tokio::test]
    async fn transaction_found() {
        let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
        let mut block_builder = BlockBuilderBuilder::new(&client)
            .on_parent_block(client.chain_info().genesis_hash)
            .with_parent_block_number(0)
            .build()
            .unwrap();

        // The CID is derived from the last four bytes of the encoded extrinsic.
        let ext = ExtrinsicBuilder::new_indexed_call(vec![0x13, 0x37, 0x13, 0x38]).build();
        let pattern_index = ext.encoded_size() - 4;
        let indexed_hash = sp_crypto_hashing::blake2_256(&ext.encode()[pattern_index..]);

        block_builder.push(ext).unwrap();
        let block = block_builder.build().unwrap().block;
        client.import(BlockOrigin::File, block).await.unwrap();

        let (bitswap, config) = BitswapRequestHandler::new(Arc::new(client));
        tokio::spawn(async move { bitswap.run().await });

        let wanted =
            Entry { block: blake2b_256_cid(&indexed_hash).to_bytes(), ..Default::default() };

        let (tx, rx) = oneshot::channel();
        let request = IncomingRequest {
            peer: PeerId::random(),
            payload: single_entry_request(wanted),
            pending_response: tx,
        };
        config.inbound_queue.unwrap().send(request).await.unwrap();

        match rx.await {
            Ok(OutgoingResponse { result, reputation_changes, sent_feedback }) => {
                assert_eq!(reputation_changes, Vec::new());
                assert!(sent_feedback.is_none());

                let raw = result.expect("fetch to succeed");
                let response = schema::bitswap::Message::decode(&raw[..]).unwrap();
                assert_eq!(response.payload[0].data, vec![0x13, 0x37, 0x13, 0x38]);
            },
            Err(_) => panic!("invalid event received"),
        }
    }

    /// With `send_dont_have` set, an unknown CID is answered with a `DontHave` presence.
    #[tokio::test]
    async fn transaction_not_found_sends_dont_have_when_requested() {
        let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
        let (mut bitswap, _config) = BitswapRequestHandler::new(Arc::new(client));

        let cid = blake2b_256_cid(&[0u8; 32]);
        let request = single_entry_request(Entry {
            block: cid.to_bytes(),
            send_dont_have: true,
            ..Default::default()
        });

        let raw_response = bitswap.handle_message(&PeerId::random(), &request).unwrap();
        let response = BitswapMessage::decode(raw_response.as_slice()).unwrap();

        assert!(response.payload.is_empty());
        assert_eq!(response.block_presences.len(), 1);
        assert_eq!(response.block_presences[0].cid, cid.to_bytes());
        assert_eq!(response.block_presences[0].r#type, BlockPresenceType::DontHave as i32);
    }

    /// A `Have` want for a stored transaction is answered with a `Have` presence, not the data.
    #[tokio::test]
    async fn transaction_found_sends_have_for_want_have() {
        let client = TestClientBuilder::with_tx_storage(u32::MAX).build();
        let mut block_builder = BlockBuilderBuilder::new(&client)
            .on_parent_block(client.chain_info().genesis_hash)
            .with_parent_block_number(0)
            .build()
            .unwrap();

        let ext = ExtrinsicBuilder::new_indexed_call(vec![0x13, 0x37, 0x13, 0x38]).build();
        let pattern_index = ext.encoded_size() - 4;
        let indexed_hash = sp_crypto_hashing::blake2_256(&ext.encode()[pattern_index..]);
        let cid = blake2b_256_cid(&indexed_hash);

        block_builder.push(ext).unwrap();
        let block = block_builder.build().unwrap().block;
        client.import(BlockOrigin::File, block).await.unwrap();

        let (mut bitswap, _config) = BitswapRequestHandler::new(Arc::new(client));
        let request = single_entry_request(Entry {
            block: cid.to_bytes(),
            want_type: WantType::Have as i32,
            ..Default::default()
        });

        let raw_response = bitswap.handle_message(&PeerId::random(), &request).unwrap();
        let response = BitswapMessage::decode(raw_response.as_slice()).unwrap();

        assert!(response.payload.is_empty());
        assert_eq!(response.block_presences.len(), 1);
        assert_eq!(response.block_presences[0].cid, cid.to_bytes());
        assert_eq!(response.block_presences[0].r#type, BlockPresenceType::Have as i32);
    }

    /// Each of the three accepted multihash codes yields a supported CID.
    #[test]
    fn is_cid_supported_accepts_all_three_supported_hashings() {
        use cid::multihash::Multihash;

        let supported_codes =
            [BLAKE2B_256_MULTIHASH_CODE, SHA2_256_MULTIHASH_CODE, KECCAK_256_MULTIHASH_CODE];

        for multihash_code in supported_codes {
            let digest = [9u8; 32];
            let mh = Multihash::<64>::wrap(multihash_code, &digest).unwrap();
            let cid = Cid::new_v1(RAW_CODEC, mh);
            assert!(is_cid_supported(&cid), "{multihash_code} CID should be supported");
        }
    }

    /// A multihash code outside the accepted set makes the CID unsupported.
    #[test]
    fn is_cid_supported_rejects_unknown_multihash_code() {
        use cid::multihash::Multihash;

        let digest = [9u8; 32];
        let mh = Multihash::<64>::wrap(0x99, &digest).unwrap();
        let cid = Cid::new_v1(RAW_CODEC, mh);

        assert!(!is_cid_supported(&cid));
    }
}