use std::borrow::Cow;
use ckb_logger::debug;
use ckb_systemtime::unix_time_as_millis;
use ckb_types::{packed, prelude::*};
use p2p::{
multiaddr::{Multiaddr, Protocol},
service::{ServiceAsyncControl, TargetSession},
utils::{TransportType, extract_peer_id, find_type},
};
use crate::{
PeerId, PeerIndex,
protocols::{
SupportProtocols,
hole_punching::{
ADDRS_COUNT_LIMIT, HOLE_PUNCHING_INTERVAL, HolePunching, MAX_HOPS,
component::{forward_request, init_delivered},
status::{Status, StatusCode},
},
},
};
/// Owned, validated form of a `packed::ConnectionRequest` message.
///
/// Built through the `TryFrom<&packed::ConnectionRequestReader<'_>>` impl, which parses
/// every field and fails with a `Status` on the first one that cannot be decoded.
struct RequestContent {
/// Peer id parsed from the message's `from` field.
from: PeerId,
/// Peer id parsed from the message's `to` field.
to: PeerId,
/// Addresses parsed from the message's `listen_addrs` field. After conversion every
/// address carries a peer id equal to `from` (it is appended when the raw address had none).
listen_addrs: Vec<Multiaddr>,
/// Peer ids parsed from the message's `route` field.
/// NOTE(review): presumably the peers this request has already passed through — confirm.
route: Vec<PeerId>,
/// Value of the message's `max_hops` field.
max_hops: u8,
}
impl TryFrom<&packed::ConnectionRequestReader<'_>> for RequestContent {
    type Error = Status;

    /// Decodes a raw connection request into owned, validated fields.
    ///
    /// Fields are parsed in the order `from`, `to`, `listen_addrs`, `route`, `max_hops`.
    ///
    /// # Errors
    ///
    /// Returns the `Status` of the first field that fails to decode:
    /// `InvalidFromPeerId`, `InvalidToPeerId`, `InvalidListenAddrLen` (malformed address or
    /// an address whose embedded peer id differs from `from`) or `InvalidRoute`.
    fn try_from(value: &packed::ConnectionRequestReader<'_>) -> Result<Self, Self::Error> {
        let from = match PeerId::from_bytes(value.from().raw_data().to_vec()) {
            Ok(peer_id) => peer_id,
            Err(_) => {
                return Err(
                    StatusCode::InvalidFromPeerId.with_context("the from peer id is invalid")
                );
            }
        };
        let to = match PeerId::from_bytes(value.to().raw_data().to_vec()) {
            Ok(peer_id) => peer_id,
            Err(_) => {
                return Err(StatusCode::InvalidToPeerId.with_context("the to peer id is invalid"));
            }
        };

        let mut listen_addrs: Vec<Multiaddr> = Vec::new();
        for raw in value.listen_addrs().iter() {
            let mut addr = Multiaddr::try_from(raw.bytes().raw_data().to_vec()).map_err(|_| {
                StatusCode::InvalidListenAddrLen.with_context("the listen address is invalid")
            })?;
            match extract_peer_id(&addr) {
                // The address has no peer id of its own: tag it with the sender's.
                None => {
                    addr.push(Protocol::P2P(Cow::Borrowed(from.as_bytes())));
                }
                // An embedded peer id must be the sender's own.
                Some(embedded) => {
                    if embedded != from {
                        return Err(StatusCode::InvalidListenAddrLen
                            .with_context("peer id in listen address is invalid"));
                    }
                }
            }
            listen_addrs.push(addr);
        }

        let mut route: Vec<PeerId> = Vec::new();
        for raw in value.route().iter() {
            let hop = PeerId::from_bytes(raw.raw_data().to_vec()).map_err(|_| {
                StatusCode::InvalidRoute.with_context("the route peer id is invalid")
            })?;
            route.push(hop);
        }

        Ok(Self {
            from,
            to,
            listen_addrs,
            route,
            max_hops: value.max_hops().into(),
        })
    }
}
/// One-shot handler for a single incoming `ConnectionRequest` message.
///
/// Constructed with `new` and consumed by `execute`.
pub(crate) struct ConnectionRequestProcess<'a> {
/// The raw request as received; it is decoded in `execute` and reused when building the
/// reply or the forwarded copy.
message: packed::ConnectionRequestReader<'a>,
/// Hole-punching protocol state: this handler reads its `network_state`, checks its
/// `forward_rate_limiter` and updates its `pending_delivered` map.
protocol: &'a mut HolePunching,
/// Session the request arrived on. The reply is sent back to it, and it is excluded
/// when the request is broadcast.
peer: PeerIndex,
/// Handle used to send the reply or to forward/broadcast the request.
p2p_control: &'a ServiceAsyncControl,
/// Id of the message item; used as part of the rate-limiter key.
msg_item_id: u32,
}
impl<'a> ConnectionRequestProcess<'a> {
    /// Creates a handler for a single incoming `ConnectionRequest`.
    ///
    /// * `message` - the raw request as received.
    /// * `protocol` - hole-punching protocol state (rate limiter, pending replies,
    ///   network state).
    /// * `peer` - session the request arrived on.
    /// * `p2p_control` - handle used to send the reply or to forward the request.
    /// * `msg_item_id` - id of the message item; part of the rate-limiter key.
    pub(crate) fn new(
        message: packed::ConnectionRequestReader<'a>,
        protocol: &'a mut HolePunching,
        peer: PeerIndex,
        p2p_control: &'a ServiceAsyncControl,
        msg_item_id: u32,
    ) -> Self {
        Self {
            message,
            protocol,
            peer,
            p2p_control,
            msg_item_id,
        }
    }

    /// Validates the request and then either answers it or passes it on.
    ///
    /// Checks, in order:
    /// 1. the message decodes into a `RequestContent`;
    /// 2. the listen address count is in `1..=ADDRS_COUNT_LIMIT`;
    /// 3. `max_hops` and the route length do not exceed `MAX_HOPS`;
    /// 4. the local peer id is not already in the route;
    /// 5. the `(from, to, msg_item_id)` key passes the forward rate limiter.
    ///
    /// If the local node is the target, a reply is sent back (`respond_delivered`).
    /// Otherwise the request is forwarded (`forward_message`), unless `max_hops` is zero,
    /// in which case `ReachedMaxHops` is returned.
    pub(crate) async fn execute(mut self) -> Status {
        let content = match RequestContent::try_from(&self.message) {
            Ok(content) => content,
            Err(status) => return status,
        };

        if content.listen_addrs.len() > ADDRS_COUNT_LIMIT || content.listen_addrs.is_empty() {
            return StatusCode::InvalidListenAddrLen
                .with_context("the listen address count is too large or empty");
        }
        if content.max_hops > MAX_HOPS {
            return StatusCode::InvalidMaxTTL.into();
        }
        if content.route.len() > MAX_HOPS as usize {
            return StatusCode::InvalidRoute.with_context("the route length is too long");
        }

        let self_peer_id = self.protocol.network_state.local_peer_id();
        // Finding ourselves in the route means this request already went through this node.
        if content.route.contains(self_peer_id) {
            return StatusCode::Ignore.with_context("the message is passed, ignore it");
        }

        if self
            .protocol
            .forward_rate_limiter
            .check_key(&(content.from.clone(), content.to.clone(), self.msg_item_id))
            .is_err()
        {
            debug!(
                "from: {}, to {}, item_name: {}, rate limit is reached",
                content.from, content.to, "ConnectionRequest",
            );
            return StatusCode::TooManyRequests.with_context("ConnectionRequest");
        }

        if self_peer_id == &content.to {
            self.respond_delivered(content.from, &content.to, content.listen_addrs)
                .await
        } else if content.max_hops == 0u8 {
            StatusCode::ReachedMaxHops.into()
        } else {
            self.forward_message(self_peer_id, &content.to).await
        }
    }

    /// Handles a request whose target is the local node: replies on the session the request
    /// arrived on and records the requester's usable addresses in `pending_delivered`.
    ///
    /// * `from_peer_id` - the requester; key of the `pending_delivered` entry.
    /// * `to_peer_id` - the target (the local node); only used for logging.
    /// * `remote_listens` - the requester's listen addresses, already validated.
    ///
    /// Returns `Ignore` when a reply to the same requester was recorded less than
    /// `HOLE_PUNCHING_INTERVAL` ago or when none of `remote_listens` is usable,
    /// `ForwardError` when sending the reply fails, and `Status::ok()` otherwise.
    async fn respond_delivered(
        &mut self,
        from_peer_id: PeerId,
        to_peer_id: &PeerId,
        remote_listens: Vec<Multiaddr>,
    ) -> Status {
        if let Some((_, t)) = self.protocol.pending_delivered.get(&from_peer_id) {
            let now = unix_time_as_millis();
            // `checked_sub` avoids an arithmetic underflow when the clock reads earlier than
            // the stored timestamp; in that case the entry is treated as expired.
            if now
                .checked_sub(*t)
                .is_some_and(|elapsed| elapsed < HOLE_PUNCHING_INTERVAL)
            {
                return StatusCode::Ignore
                    .with_context("a same message is already replied in a moment ago");
            }
        }

        // Keep only TCP addresses that contain an IPv4/IPv6 component; every other transport
        // type is dropped. This runs before the reply is built so that a request without any
        // usable address is rejected without doing that work.
        let remote_listens: Vec<Multiaddr> = remote_listens
            .into_iter()
            .filter(|addr| match find_type(addr) {
                TransportType::Memory
                | TransportType::Onion
                | TransportType::Ws
                | TransportType::Wss
                | TransportType::Tls => false,
                TransportType::Tcp => addr
                    .iter()
                    .any(|p| matches!(p, Protocol::Ip4(_) | Protocol::Ip6(_))),
            })
            .collect();
        if remote_listens.is_empty() {
            return StatusCode::Ignore.with_context("remote listen address is empty");
        }

        // Advertise up to `ADDRS_COUNT_LIMIT` local addresses: public ones first, topped up
        // with observed ones only when there are fewer public addresses than the limit.
        let listen_addrs = {
            let public_addrs = self.protocol.network_state.public_addrs(ADDRS_COUNT_LIMIT);
            let observed_addrs = (public_addrs.len() < ADDRS_COUNT_LIMIT).then(|| {
                self.protocol
                    .network_state
                    .observed_addrs(ADDRS_COUNT_LIMIT - public_addrs.len())
            });
            let addresses = public_addrs
                .iter()
                .chain(observed_addrs.iter().flatten())
                .map(Multiaddr::to_vec)
                .map(|v| packed::Address::new_builder().bytes(v).build());
            packed::AddressVec::new_builder().extend(addresses).build()
        };
        let content = init_delivered(self.message, listen_addrs);
        let new_message = packed::HolePunchingMessage::new_builder()
            .set(content)
            .build()
            .as_bytes();
        let proto_id = SupportProtocols::HolePunching.protocol_id();

        debug!(
            "current peer is the target peer {}, send a response back",
            to_peer_id
        );
        if let Err(error) = self
            .p2p_control
            .send_message_to(self.peer, proto_id, new_message)
            .await
        {
            return StatusCode::ForwardError.with_context(error);
        }

        // Record the reply only after it was handed to the service successfully.
        let now = unix_time_as_millis();
        self.protocol
            .pending_delivered
            .insert(from_peer_id, (remote_listens, now));
        Status::ok()
    }

    /// Passes the request on towards `to_peer_id`.
    ///
    /// If the peer registry has a session for `to_peer_id`, the request is sent to that
    /// session only. Otherwise it is broadcast to a limited subset of the other sessions.
    ///
    /// * `self_peer_id` - the local peer id, handed to `forward_request` when building the
    ///   forwarded copy.
    /// * `to_peer_id` - the target of the request.
    ///
    /// Returns `ForwardError` / `BroadcastError` when handing the message to the service
    /// fails, and `Status::ok()` otherwise.
    async fn forward_message(&self, self_peer_id: &PeerId, to_peer_id: &PeerId) -> Status {
        let content = forward_request(self.message, self_peer_id);
        let new_message = packed::HolePunchingMessage::new_builder()
            .set(content)
            .build()
            .as_bytes();
        let proto_id = SupportProtocols::HolePunching.protocol_id();

        // Kept as its own statement so the value returned by `read()` is dropped before any
        // `.await` below.
        let target_sid = self
            .protocol
            .network_state
            .peer_registry
            .read()
            .get_key_by_peer_id(to_peer_id);

        match target_sid {
            Some(to_peer) => {
                debug!(
                    "target peer {} is found, forward the request to it",
                    to_peer_id
                );
                if let Err(error) = self
                    .p2p_control
                    .send_message_to(to_peer, proto_id, new_message)
                    .await
                {
                    StatusCode::ForwardError.with_context(error)
                } else {
                    Status::ok()
                }
            }
            None => {
                debug!(
                    "target peer {} is not found, broadcast the request to more peers",
                    to_peer_id
                );
                let sid = self.peer;
                // Budget for the broadcast: integer square root of the number of peers.
                let mut total = self
                    .protocol
                    .network_state
                    .with_peer_registry(|p| p.peers().len())
                    .isqrt();
                if let Err(error) = self
                    .p2p_control
                    .filter_broadcast(
                        TargetSession::Filter(Box::new(move |id| {
                            // Never send the request back to the session it came from.
                            if id == &sid {
                                return false;
                            }
                            // The budget is decremented before it is tested, so at most
                            // `isqrt(n) - 1` sessions are accepted and none at all when
                            // `isqrt(n) <= 1`.
                            // NOTE(review): confirm this fan-out is intended.
                            total = total.saturating_sub(1);
                            total != 0
                        })),
                        proto_id,
                        new_message,
                    )
                    .await
                {
                    StatusCode::BroadcastError.with_context(error)
                } else {
                    Status::ok()
                }
            }
        }
    }
}