use crate::util::protobuf;
use alloc::vec::Vec;
/// Maximum size, in bytes, of a Bitswap message (4 MiB).
///
/// NOTE(review): not referenced by the encoding/decoding code visible here; presumably
/// enforced by whichever code reads messages from the network — confirm.
pub const MAX_BITSWAP_MESSAGE_SIZE: usize = 4 * 1024 * 1024;
/// Bound passed as `max` for the repeated wantlist `entries` field in
/// `decode_bitswap_message`.
pub const MAX_WANTED_BLOCKS: usize = 1024;
/// Bound passed as `max` for both the repeated `blocks_legacy` field and the repeated
/// `payload` field in `decode_bitswap_message`.
pub const MAX_RESPONSE_BLOCKS: usize = 1024;
/// Bound passed as `max` for the repeated `block_presences` field in
/// `decode_bitswap_message`.
pub const MAX_BLOCK_PRESENCES: usize = 1024;
/// Kind of request carried by a wantlist entry.
///
/// The discriminants are the numeric values written to, and read from, the `want_type`
/// protobuf enum field (field 4 of a wantlist entry).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WantType {
    /// Wire value `0`. Also the value assumed by the decoder when the field is absent.
    Block = 0,
    /// Wire value `1`.
    Have = 1,
}

impl WantType {
    /// Converts a raw protobuf enum value into a [`WantType`].
    ///
    /// Returns `None` if `val` does not correspond to any variant.
    fn from_u64(val: u64) -> Option<Self> {
        // Look the value up among the known variants by comparing discriminants.
        [WantType::Block, WantType::Have]
            .iter()
            .copied()
            .find(|variant| *variant as u64 == val)
    }
}
/// Answer carried by a block presence.
///
/// The discriminants are the numeric values written to, and read from, the `presence_type`
/// protobuf enum field (field 2 of a block presence).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockPresenceType {
    /// Wire value `0`. Also the value assumed by the decoder when the field is absent.
    Have = 0,
    /// Wire value `1`.
    DontHave = 1,
}

impl BlockPresenceType {
    /// Converts a raw protobuf enum value into a [`BlockPresenceType`].
    ///
    /// Returns `None` if `val` does not correspond to any variant.
    fn from_u64(val: u64) -> Option<Self> {
        // Look the value up among the known variants by comparing discriminants.
        [BlockPresenceType::Have, BlockPresenceType::DontHave]
            .iter()
            .copied()
            .find(|variant| *variant as u64 == val)
    }
}
/// One decoded entry of a wantlist. The CID is a borrowed byte slice with lifetime `'a`.
#[derive(Debug, Clone)]
pub struct WantlistEntry<'a> {
/// CID bytes, taken from protobuf field 1 of the entry (named `block` in the decoder).
/// Decoding fails with `MissingCid` if the field is absent.
pub cid: &'a [u8],
/// Priority, from protobuf field 2. The decoder substitutes `1` when the field is absent.
pub priority: u32,
/// Cancel flag, from protobuf field 3. `false` when the field is absent.
pub cancel: bool,
/// Kind of request, from protobuf field 4. `WantType::Block` when the field is absent.
pub want_type: WantType,
/// "Send don't-have" flag, from protobuf field 5. `false` when the field is absent.
pub send_dont_have: bool,
}
/// Decoded wantlist sub-message (protobuf field 1 of the top-level message).
#[derive(Debug, Clone)]
pub struct Wantlist<'a> {
/// Entries of the wantlist, from the repeated protobuf field 1, in decoding order.
pub entries: Vec<WantlistEntry<'a>>,
/// `full` flag, from protobuf field 2. `false` when the field is absent.
pub full: bool,
}
/// One decoded element of the `payload` field (protobuf field 3 of the top-level message).
///
/// Both fields are mandatory: decoding fails with `MissingPrefix` or `MissingData` if one
/// of them is absent.
#[derive(Debug, Clone)]
pub struct Block<'a> {
/// CID prefix bytes, from protobuf field 1 of the element.
pub prefix: &'a [u8],
/// Block data bytes, from protobuf field 2 of the element.
pub data: &'a [u8],
}
/// One decoded element of the `block_presences` field (protobuf field 4 of the top-level
/// message).
#[derive(Debug, Clone)]
pub struct BlockPresence<'a> {
/// CID bytes, from protobuf field 1 of the element. Decoding fails with `MissingCid` if
/// the field is absent.
pub cid: &'a [u8],
/// Presence answer, from protobuf field 2 of the element. `BlockPresenceType::Have` when
/// the field is absent.
pub presence_type: BlockPresenceType,
}
/// Decoded Bitswap message whose byte slices have lifetime `'a`.
///
/// Produced by `decode_bitswap_message`. The `Default` value has no wantlist, empty lists,
/// and `pending_bytes` equal to `0`.
#[derive(Debug, Clone, Default)]
pub struct BitswapMessageRef<'a> {
/// Wantlist, from protobuf field 1. `None` when the field is absent.
pub wantlist: Option<Wantlist<'a>>,
/// Raw byte strings from the repeated protobuf field 2.
pub blocks_legacy: Vec<&'a [u8]>,
/// Blocks (prefix + data) from the repeated protobuf field 3.
pub payload: Vec<Block<'a>>,
/// Block presences from the repeated protobuf field 4.
pub block_presences: Vec<BlockPresence<'a>>,
/// Value of protobuf field 5. `0` when the field is absent.
pub pending_bytes: u32,
}
/// Builds the protobuf encoding of a Bitswap message that contains only a wantlist.
///
/// One wantlist entry is emitted per item of `cids`, in iteration order. Every entry is
/// built with a priority of `1` and with the given `want_type` and `send_dont_have`.
/// The wantlist's `full` field (field 2) is only written when `full` is `true`.
///
/// The wantlist field (field 1 of the top-level message) is always emitted, including when
/// `cids` yields no item.
pub fn build_bitswap_message(
    cids: impl Iterator<Item = impl AsRef<[u8]>>,
    want_type: WantType,
    send_dont_have: bool,
    full: bool,
) -> Vec<u8> {
    // Body of the wantlist sub-message: the repeated entries (field 1) followed by the
    // optional `full` flag (field 2).
    let mut wantlist_body = Vec::new();

    // The iterator is consumed directly; buffering the CIDs in an intermediate `Vec` would
    // only add an allocation.
    for cid in cids {
        let entry = build_wantlist_entry(cid.as_ref(), 1, want_type, send_dont_have);
        for slice in protobuf::message_tag_encode(1, core::iter::once(entry.as_slice())) {
            wantlist_body.extend_from_slice(slice.as_ref());
        }
    }

    if full {
        for slice in protobuf::bool_tag_encode(2, true) {
            wantlist_body.extend_from_slice(slice.as_ref());
        }
    }

    // Wrap the wantlist body as field 1 of the top-level message. The extra 16 bytes leave
    // room for the tag and length prefix.
    let mut out = Vec::with_capacity(wantlist_body.len() + 16);
    for slice in protobuf::message_tag_encode(1, core::iter::once(wantlist_body.as_slice())) {
        out.extend_from_slice(slice.as_ref());
    }
    out
}
/// Encodes the body of a single wantlist entry (without the enclosing field tag).
///
/// Fields are written in this order:
///
/// - field 1: `cid`, as bytes;
/// - field 2: `priority`;
/// - field 4: `want_type`, only when it is not [`WantType::Block`];
/// - field 5: `true`, only when `send_dont_have` is set.
///
/// Field 3 (`cancel`) is never written.
fn build_wantlist_entry(
    cid: &[u8],
    priority: u32,
    want_type: WantType,
    send_dont_have: bool,
) -> Vec<u8> {
    let mut encoded = Vec::new();

    protobuf::bytes_tag_encode(1, cid).for_each(|chunk| encoded.extend_from_slice(chunk.as_ref()));
    protobuf::uint32_tag_encode(2, priority)
        .for_each(|chunk| encoded.extend_from_slice(chunk.as_ref()));

    // `WantType::Block` is what the decoder assumes for a missing field, so it is left out.
    if !matches!(want_type, WantType::Block) {
        protobuf::enum_tag_encode(4, want_type as u64)
            .for_each(|chunk| encoded.extend_from_slice(chunk.as_ref()));
    }

    // Likewise, a missing `send_dont_have` field is decoded as `false`.
    if send_dont_have {
        protobuf::bool_tag_encode(5, true)
            .for_each(|chunk| encoded.extend_from_slice(chunk.as_ref()));
    }

    encoded
}
/// Builds the protobuf encoding of a Bitswap message that contains only `payload` blocks.
///
/// Each item of `blocks` is a `(prefix, data)` pair. It is encoded as a sub-message with
/// the prefix in field 1 and the data in field 2, and that sub-message is written as one
/// occurrence of field 3 of the top-level message. Items are written in iteration order.
///
/// Returns an empty buffer if `blocks` yields no item.
pub fn build_bitswap_block_response(
    blocks: impl Iterator<Item = (impl AsRef<[u8]>, impl AsRef<[u8]>)>,
) -> Vec<u8> {
    let mut response = Vec::new();

    for (prefix, data) in blocks {
        // Body of the block sub-message.
        let mut body = Vec::new();
        protobuf::bytes_tag_encode(1, prefix.as_ref())
            .for_each(|chunk| body.extend_from_slice(chunk.as_ref()));
        protobuf::bytes_tag_encode(2, data.as_ref())
            .for_each(|chunk| body.extend_from_slice(chunk.as_ref()));

        // Append the sub-message to the top-level message as field 3.
        protobuf::message_tag_encode(3, core::iter::once(body.as_slice()))
            .for_each(|chunk| response.extend_from_slice(chunk.as_ref()));
    }

    response
}
/// Builds the protobuf encoding of a Bitswap message that contains only block presences.
///
/// Each item of `presences` is a `(cid, presence_type)` pair. It is encoded as a
/// sub-message with the CID in field 1 and the presence type in field 2, and that
/// sub-message is written as one occurrence of field 4 of the top-level message. Items are
/// written in iteration order.
///
/// Returns an empty buffer if `presences` yields no item.
pub fn build_bitswap_presence_response(
    presences: impl Iterator<Item = (impl AsRef<[u8]>, BlockPresenceType)>,
) -> Vec<u8> {
    let mut response = Vec::new();

    for (cid, presence_type) in presences {
        // Body of the block presence sub-message. The presence type is always written,
        // including when it is `Have` (value `0`).
        let mut body = Vec::new();
        protobuf::bytes_tag_encode(1, cid.as_ref())
            .for_each(|chunk| body.extend_from_slice(chunk.as_ref()));
        protobuf::enum_tag_encode(2, presence_type as u64)
            .for_each(|chunk| body.extend_from_slice(chunk.as_ref()));

        // Append the sub-message to the top-level message as field 4.
        protobuf::message_tag_encode(4, core::iter::once(body.as_slice()))
            .for_each(|chunk| response.extend_from_slice(chunk.as_ref()));
    }

    response
}
/// Decodes a protobuf-encoded Bitswap message.
///
/// The returned [`BitswapMessageRef`] has the lifetime of `bytes`. The whole of `bytes`
/// must be consumed by the parser for decoding to succeed.
///
/// Absent optional fields are replaced with defaults: priority `1`, `cancel`,
/// `send_dont_have` and `full` set to `false`, want type and presence type `0`, and
/// `pending_bytes` `0`.
///
/// # Errors
///
/// - [`DecodeBitswapMessageError::ProtobufDecode`] if the parser rejects `bytes`.
/// - [`DecodeBitswapMessageError::MissingCid`] if a wantlist entry or a block presence has
///   no CID.
/// - [`DecodeBitswapMessageError::InvalidWantType`] if a wantlist entry has an unknown
///   want type.
/// - [`DecodeBitswapMessageError::MissingPrefix`] or
///   [`DecodeBitswapMessageError::MissingData`] if a payload block lacks its prefix or its
///   data.
/// - [`DecodeBitswapMessageError::InvalidPresenceType`] if a block presence has an unknown
///   presence type.
///
/// The wantlist is validated first, then the payload, then the block presences. The first
/// problem encountered is the one reported.
pub fn decode_bitswap_message(
    bytes: &[u8],
) -> Result<BitswapMessageRef<'_>, DecodeBitswapMessageError> {
    let mut parser = nom::combinator::all_consuming::<_, nom::error::Error<&[u8]>, _>(
        nom::combinator::complete(protobuf::message_decode! {
            #[optional] wantlist = 1 => protobuf::message_tag_decode(protobuf::message_decode! {
                #[repeated(max = MAX_WANTED_BLOCKS)] entries = 1 => protobuf::message_tag_decode(protobuf::message_decode! {
                    #[optional] block = 1 => protobuf::bytes_tag_decode,
                    #[optional] priority = 2 => protobuf::uint32_tag_decode,
                    #[optional] cancel = 3 => protobuf::bool_tag_decode,
                    #[optional] want_type = 4 => protobuf::enum_tag_decode,
                    #[optional] send_dont_have = 5 => protobuf::bool_tag_decode,
                }),
                #[optional] full = 2 => protobuf::bool_tag_decode,
            }),
            #[repeated(max = MAX_RESPONSE_BLOCKS)] blocks_legacy = 2 => protobuf::bytes_tag_decode,
            #[repeated(max = MAX_RESPONSE_BLOCKS)] payload = 3 => protobuf::message_tag_decode(protobuf::message_decode! {
                #[optional] prefix = 1 => protobuf::bytes_tag_decode,
                #[optional] data = 2 => protobuf::bytes_tag_decode,
            }),
            #[repeated(max = MAX_BLOCK_PRESENCES)] block_presences = 4 => protobuf::message_tag_decode(protobuf::message_decode! {
                #[optional] cid = 1 => protobuf::bytes_tag_decode,
                #[optional] presence_type = 2 => protobuf::enum_tag_decode,
            }),
            #[optional] pending_bytes = 5 => protobuf::uint32_tag_decode,
        }),
    );

    // Any parser failure is reported as the same opaque error.
    let raw = nom::Finish::finish(nom::Parser::parse(&mut parser, bytes))
        .map(|(_, message)| message)
        .map_err(|_| DecodeBitswapMessageError::ProtobufDecode)?;

    // Every field is optional at the protobuf level. Mandatory fields and enum values are
    // checked here, and defaults are filled in.
    let wantlist = match raw.wantlist {
        None => None,
        Some(raw_wantlist) => {
            let mut entries = Vec::with_capacity(raw_wantlist.entries.len());
            for raw_entry in raw_wantlist.entries {
                entries.push(WantlistEntry {
                    cid: raw_entry
                        .block
                        .ok_or(DecodeBitswapMessageError::MissingCid)?,
                    priority: raw_entry.priority.unwrap_or(1),
                    cancel: raw_entry.cancel.unwrap_or(false),
                    want_type: WantType::from_u64(raw_entry.want_type.unwrap_or(0))
                        .ok_or(DecodeBitswapMessageError::InvalidWantType)?,
                    send_dont_have: raw_entry.send_dont_have.unwrap_or(false),
                });
            }
            Some(Wantlist {
                entries,
                full: raw_wantlist.full.unwrap_or(false),
            })
        }
    };

    let mut payload = Vec::with_capacity(raw.payload.len());
    for raw_block in raw.payload {
        payload.push(Block {
            prefix: raw_block
                .prefix
                .ok_or(DecodeBitswapMessageError::MissingPrefix)?,
            data: raw_block
                .data
                .ok_or(DecodeBitswapMessageError::MissingData)?,
        });
    }

    let mut block_presences = Vec::with_capacity(raw.block_presences.len());
    for raw_presence in raw.block_presences {
        block_presences.push(BlockPresence {
            cid: raw_presence
                .cid
                .ok_or(DecodeBitswapMessageError::MissingCid)?,
            presence_type: BlockPresenceType::from_u64(raw_presence.presence_type.unwrap_or(0))
                .ok_or(DecodeBitswapMessageError::InvalidPresenceType)?,
        });
    }

    Ok(BitswapMessageRef {
        wantlist,
        blocks_legacy: raw.blocks_legacy,
        payload,
        block_presences,
        pending_bytes: raw.pending_bytes.unwrap_or(0),
    })
}
/// Error potentially returned by [`decode_bitswap_message`].
#[derive(Debug, Clone, derive_more::Display, derive_more::Error)]
pub enum DecodeBitswapMessageError {
    /// The protobuf parser rejected the input.
    #[display("Protobuf decode error")]
    ProtobufDecode,
    /// A wantlist entry or a block presence doesn't contain a CID.
    // The message covers both cases because the decoder returns this variant for each.
    #[display("Missing CID in wantlist entry or block presence")]
    MissingCid,
    /// A wantlist entry contains a want type that doesn't match any [`WantType`].
    #[display("Invalid want type")]
    InvalidWantType,
    /// A block presence contains a type that doesn't match any [`BlockPresenceType`].
    #[display("Invalid block presence type")]
    InvalidPresenceType,
    /// A payload block doesn't contain a CID prefix.
    #[display("Missing CID prefix in payload")]
    MissingPrefix,
    /// A payload block doesn't contain any data.
    #[display("Missing block data in payload")]
    MissingData,
}
// Unit tests for the Bitswap message builders and decoder. Most tests either round-trip a
// message through a `build_*` function and `decode_bitswap_message`, or hand-assemble a
// message with the `protobuf` helpers to reach a specific decoder branch.
#[cfg(test)]
mod tests {
use super::*;
// A "want block" message with two CIDs round-trips, with `send_dont_have` set and `full`
// unset.
#[test]
fn encode_decode_want_message() {
let cids = vec![[1u8; 32], [2u8; 32]];
let encoded = build_bitswap_message(cids.iter(), WantType::Block, true, false);
let decoded = decode_bitswap_message(&encoded).unwrap();
let wantlist = decoded.wantlist.unwrap();
assert_eq!(wantlist.entries.len(), 2);
assert_eq!(wantlist.entries[0].cid, &[1u8; 32]);
assert_eq!(wantlist.entries[1].cid, &[2u8; 32]);
assert_eq!(wantlist.entries[0].want_type, WantType::Block);
assert!(wantlist.entries[0].send_dont_have);
assert!(!wantlist.full);
}
// A "want have" message round-trips, with `send_dont_have` unset and `full` set.
#[test]
fn encode_decode_want_have() {
let cids = vec![[0xABu8; 32]];
let encoded = build_bitswap_message(cids.iter(), WantType::Have, false, true);
let decoded = decode_bitswap_message(&encoded).unwrap();
let wantlist = decoded.wantlist.unwrap();
assert_eq!(wantlist.entries.len(), 1);
assert_eq!(wantlist.entries[0].want_type, WantType::Have);
assert!(!wantlist.entries[0].send_dont_have);
assert!(wantlist.full);
}
// Payload blocks keep their order and their prefix/data contents through a round trip.
#[test]
fn encode_decode_block_response() {
let blocks = vec![
([1u8, 2, 3, 4].as_slice(), [5u8, 6, 7, 8].as_slice()),
([9u8, 10].as_slice(), [11u8, 12, 13].as_slice()),
];
let encoded = build_bitswap_block_response(blocks.into_iter());
let decoded = decode_bitswap_message(&encoded).unwrap();
assert_eq!(decoded.payload.len(), 2);
assert_eq!(decoded.payload[0].prefix, &[1, 2, 3, 4]);
assert_eq!(decoded.payload[0].data, &[5, 6, 7, 8]);
assert_eq!(decoded.payload[1].prefix, &[9, 10]);
assert_eq!(decoded.payload[1].data, &[11, 12, 13]);
}
// Block presences keep their order, CID and presence type through a round trip.
#[test]
fn encode_decode_presence_response() {
let presences = vec![
([1u8; 32].as_slice(), BlockPresenceType::Have),
([2u8; 32].as_slice(), BlockPresenceType::DontHave),
];
let encoded = build_bitswap_presence_response(presences.into_iter());
let decoded = decode_bitswap_message(&encoded).unwrap();
assert_eq!(decoded.block_presences.len(), 2);
assert_eq!(decoded.block_presences[0].cid, &[1u8; 32]);
assert_eq!(
decoded.block_presences[0].presence_type,
BlockPresenceType::Have
);
assert_eq!(decoded.block_presences[1].cid, &[2u8; 32]);
assert_eq!(
decoded.block_presences[1].presence_type,
BlockPresenceType::DontHave
);
}
// An empty input decodes successfully to a message with no wantlist and empty lists.
#[test]
fn decode_empty_message() {
let decoded = decode_bitswap_message(&[]).unwrap();
assert!(decoded.wantlist.is_none());
assert!(decoded.payload.is_empty());
assert!(decoded.block_presences.is_empty());
}
// Bytes that are not a valid message are reported as `ProtobufDecode`.
#[test]
fn decode_garbage_bytes() {
assert!(matches!(
decode_bitswap_message(&[0xFF, 0xFE, 0xFD, 0xFC]),
Err(DecodeBitswapMessageError::ProtobufDecode)
));
}
// A wantlist entry that only has a priority (no CID) is rejected with `MissingCid`.
#[test]
fn decode_wantlist_entry_missing_cid() {
let mut entry = Vec::new();
for slice in protobuf::uint32_tag_encode(2, 1) {
entry.extend_from_slice(slice.as_ref());
}
let mut wantlist_inner = Vec::new();
for slice in protobuf::message_tag_encode(1, core::iter::once(entry.as_slice())) {
wantlist_inner.extend_from_slice(slice.as_ref());
}
let mut message = Vec::new();
for slice in protobuf::message_tag_encode(1, core::iter::once(wantlist_inner.as_slice())) {
message.extend_from_slice(slice.as_ref());
}
assert!(matches!(
decode_bitswap_message(&message),
Err(DecodeBitswapMessageError::MissingCid)
));
}
// A wantlist entry whose want type is 99 (not a `WantType` value) is rejected with
// `InvalidWantType`.
#[test]
fn decode_invalid_want_type() {
let mut entry = Vec::new();
for slice in protobuf::bytes_tag_encode(1, &[1u8; 32]) {
entry.extend_from_slice(slice.as_ref());
}
for slice in protobuf::enum_tag_encode(4, 99) {
entry.extend_from_slice(slice.as_ref());
}
let mut wantlist_inner = Vec::new();
for slice in protobuf::message_tag_encode(1, core::iter::once(entry.as_slice())) {
wantlist_inner.extend_from_slice(slice.as_ref());
}
let mut message = Vec::new();
for slice in protobuf::message_tag_encode(1, core::iter::once(wantlist_inner.as_slice())) {
message.extend_from_slice(slice.as_ref());
}
assert!(matches!(
decode_bitswap_message(&message),
Err(DecodeBitswapMessageError::InvalidWantType)
));
}
// A block presence whose type is 5 (not a `BlockPresenceType` value) is rejected with
// `InvalidPresenceType`.
#[test]
fn decode_invalid_presence_type() {
let mut presence = Vec::new();
for slice in protobuf::bytes_tag_encode(1, &[1u8; 32]) {
presence.extend_from_slice(slice.as_ref());
}
for slice in protobuf::enum_tag_encode(2, 5) {
presence.extend_from_slice(slice.as_ref());
}
let mut message = Vec::new();
for slice in protobuf::message_tag_encode(4, core::iter::once(presence.as_slice())) {
message.extend_from_slice(slice.as_ref());
}
assert!(matches!(
decode_bitswap_message(&message),
Err(DecodeBitswapMessageError::InvalidPresenceType)
));
}
// A block presence that only has a presence type (no CID) is rejected with `MissingCid`.
#[test]
fn decode_presence_missing_cid() {
let mut presence = Vec::new();
for slice in protobuf::enum_tag_encode(2, 0) {
presence.extend_from_slice(slice.as_ref());
}
let mut message = Vec::new();
for slice in protobuf::message_tag_encode(4, core::iter::once(presence.as_slice())) {
message.extend_from_slice(slice.as_ref());
}
assert!(matches!(
decode_bitswap_message(&message),
Err(DecodeBitswapMessageError::MissingCid)
));
}
// Fields the builder leaves out (want type `Block`, unset flags) decode to their defaults.
#[test]
fn encode_default_fields_roundtrip() {
let cids = vec![[0xAA_u8; 32]];
let encoded = build_bitswap_message(cids.iter(), WantType::Block, false, false);
let decoded = decode_bitswap_message(&encoded).unwrap();
let wantlist = decoded.wantlist.unwrap();
assert_eq!(wantlist.entries[0].want_type, WantType::Block);
assert!(!wantlist.entries[0].send_dont_have);
assert!(!wantlist.entries[0].cancel);
assert_eq!(wantlist.entries[0].priority, 1);
assert!(!wantlist.full);
}
// A message containing only field 5 decodes its value into `pending_bytes`.
#[test]
fn decode_message_with_pending_bytes() {
let mut message = Vec::new();
for slice in protobuf::uint32_tag_encode(5, 42) {
message.extend_from_slice(slice.as_ref());
}
let decoded = decode_bitswap_message(&message).unwrap();
assert_eq!(decoded.pending_bytes, 42);
assert!(decoded.wantlist.is_none());
}
// Building from an empty CID list yields a decodable message. The wantlist may or may not
// be present, but if it is, it must have no entries.
#[test]
fn encode_empty_cid_list() {
let cids: Vec<[u8; 32]> = vec![];
let encoded = build_bitswap_message(cids.iter(), WantType::Block, false, false);
let decoded = decode_bitswap_message(&encoded).unwrap();
if let Some(wantlist) = &decoded.wantlist {
assert!(wantlist.entries.is_empty());
}
}
// A single-CID "want have" message with both flags set round-trips.
#[test]
fn roundtrip_single_cid() {
let cids = vec![[0xBB_u8; 32]];
let encoded = build_bitswap_message(cids.iter(), WantType::Have, true, true);
let decoded = decode_bitswap_message(&encoded).unwrap();
let wantlist = decoded.wantlist.unwrap();
assert_eq!(wantlist.entries.len(), 1);
assert_eq!(wantlist.entries[0].cid, &[0xBB_u8; 32]);
assert_eq!(wantlist.entries[0].want_type, WantType::Have);
assert!(wantlist.entries[0].send_dont_have);
assert!(wantlist.full);
}
// Two occurrences of field 2 are decoded, in order, into `blocks_legacy`.
#[test]
fn decode_blocks_legacy_field() {
let mut message = Vec::new();
for slice in protobuf::bytes_tag_encode(2, &[1u8, 2, 3, 4]) {
message.extend_from_slice(slice.as_ref());
}
for slice in protobuf::bytes_tag_encode(2, &[5u8, 6]) {
message.extend_from_slice(slice.as_ref());
}
let decoded = decode_bitswap_message(&message).unwrap();
assert_eq!(decoded.blocks_legacy.len(), 2);
assert_eq!(decoded.blocks_legacy[0], &[1, 2, 3, 4]);
assert_eq!(decoded.blocks_legacy[1], &[5, 6]);
}
}