mod connection_request;
mod connection_request_delivered;
mod connection_sync;
pub(crate) use connection_request::ConnectionRequestProcess;
pub(crate) use connection_request_delivered::ConnectionRequestDeliveredProcess;
pub(crate) use connection_sync::ConnectionSyncProcess;
use std::{
net::{IpAddr, SocketAddr},
time::Duration,
};
use ckb_logger::debug;
use ckb_systemtime::Instant;
use ckb_types::{packed, prelude::*};
use p2p::{multiaddr::Multiaddr, runtime, utils::multiaddr_to_socketaddr};
use tokio::net::{TcpSocket, TcpStream};
use crate::{PeerId, protocols::hole_punching::MAX_HOPS};
/// Repeatedly tries to open a TCP connection to `addr` until it succeeds or
/// an overall 30 second budget is used up.
///
/// Each attempt creates a fresh socket via `create_socket` (optionally bound
/// to `bind_addr`), gives the connect call 200 ms to complete, and then waits
/// for 200 ms plus or minus a random jitter of less than 50 ms before the
/// next attempt.
///
/// # Errors
///
/// * `InvalidInput` if `addr` cannot be converted to a socket address.
/// * Any error returned by `create_socket` is propagated immediately.
/// * An `AddrNotAvailable` connect error is returned immediately.
/// * `TimedOut` once the overall budget has elapsed without a usable
///   connection.
pub(crate) async fn try_nat_traversal(
    bind_addr: Option<SocketAddr>,
    addr: Multiaddr,
) -> Result<(TcpStream, Multiaddr), std::io::Error> {
    let net_addr = match multiaddr_to_socketaddr(&addr) {
        Some(addr) => addr,
        None => {
            debug!("Failed to convert multiaddr to socketaddr");
            return Err(std::io::ErrorKind::InvalidInput.into());
        }
    };

    let base_retry_interval = Duration::from_millis(200);
    let connect_timeout = Duration::from_millis(200);
    let timeout_duration = Duration::from_secs(30);
    let start_time = Instant::now();
    let mut retry_count = 0u32;

    while start_time.elapsed() < timeout_duration {
        retry_count += 1;

        // Randomize the pause between attempts (base interval +/- jitter).
        let jitter = Duration::from_millis(rand::random::<u64>() % 50);
        let actual_interval = if rand::random::<bool>() {
            base_retry_interval + jitter
        } else {
            base_retry_interval.saturating_sub(jitter)
        };

        let socket = create_socket(bind_addr, net_addr)?;

        match runtime::timeout(connect_timeout, socket.connect(net_addr)).await {
            Ok(Ok(stream)) => match check_connection(&stream) {
                Ok(()) => return Ok((stream, addr)),
                // The connect call reported success but the socket carries a
                // pending error: do not hand a broken stream to the caller.
                // The stream is dropped here and another attempt is made
                // after the usual delay.
                Err(err) => debug!("Failed to connect to NAT(base check): {}", err),
            },
            Err(err) => {
                debug!("Failed to connect to NAT(timeout): {}", err);
            }
            Ok(Err(err)) => {
                // Not retried: this error is returned to the caller as-is.
                if err.kind() == std::io::ErrorKind::AddrNotAvailable {
                    return Err(err);
                }
                debug!(
                    "Failed to connect to NAT(other error): {}, {}",
                    err.kind(),
                    err
                );
            }
        }

        runtime::delay_for(actual_interval).await;
    }

    debug!("Failed to connect to NAT after {} retries", retry_count);
    Err(std::io::ErrorKind::TimedOut.into())
}
/// Creates a TCP socket whose address family matches `target_addr`.
///
/// When `bind_addr` is given and belongs to the same IP family as
/// `target_addr`, the socket has the reuse-address option enabled (plus
/// reuse-port on Unix targets other than Solaris/illumos) and is bound to
/// `bind_addr`. In every other case (no bind address, or mismatched
/// families) a plain unbound socket of the target's family is returned.
///
/// # Errors
///
/// Returns any I/O error raised while creating, configuring or binding the
/// socket.
fn create_socket(
    bind_addr: Option<SocketAddr>,
    target_addr: SocketAddr,
) -> Result<TcpSocket, std::io::Error> {
    // Every branch opens a socket of the target's family.
    let open_for_target = || match target_addr.ip() {
        IpAddr::V4(_) => TcpSocket::new_v4(),
        IpAddr::V6(_) => TcpSocket::new_v6(),
    };

    match bind_addr {
        // Only bind when the local and remote addresses share a family.
        Some(listen_addr) if listen_addr.is_ipv4() == target_addr.is_ipv4() => {
            let socket = open_for_target()?;
            socket.set_reuseaddr(true)?;
            #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
            socket.set_reuseport(true)?;
            socket.bind(listen_addr)?;
            Ok(socket)
        }
        _ => open_for_target(),
    }
}
/// Checks `stream` for a pending socket error.
///
/// Returns `Ok(())` when `take_error` reports no error; otherwise returns
/// either the pending error or the error raised by the query itself.
fn check_connection(stream: &TcpStream) -> Result<(), std::io::Error> {
    match stream.take_error()? {
        Some(pending) => Err(pending),
        None => Ok(()),
    }
}
/// Builds a fresh `ConnectionRequest` from peer `from` to peer `to`.
///
/// The hop budget is set to `MAX_HOPS`, `listen_addrs` is attached as given,
/// and the route is left at the builder's default.
pub(crate) fn init_request(
    from: &PeerId,
    to: &PeerId,
    listen_addrs: packed::AddressVec,
) -> packed::ConnectionRequest {
    let builder = packed::ConnectionRequest::new_builder()
        .listen_addrs(listen_addrs)
        .max_hops(MAX_HOPS)
        .to(to.as_bytes())
        .from(from.as_bytes());
    builder.build()
}
/// Produces the copy of `request` that should be relayed onwards.
///
/// `current_id` is appended to the route and the hop budget is decreased by
/// one, saturating at zero. All other fields are carried over unchanged.
pub(crate) fn forward_request(
    request: packed::ConnectionRequestReader<'_>,
    current_id: &PeerId,
) -> packed::ConnectionRequest {
    let entity = request.to_entity();
    let hops_left: u8 = request.max_hops().into();

    // Record this node as the latest hop.
    let extended_route = entity
        .route()
        .as_builder()
        .push(current_id.as_bytes())
        .build();

    let builder = entity.as_builder().route(extended_route);
    builder.max_hops(hops_left.saturating_sub(1)).build()
}
/// Builds the `ConnectionRequestDelivered` reply for a received `request`.
///
/// * `from` / `to` are copied from the request.
/// * `route` is the request route without its last entry.
/// * `sync_route` is the complete request route in reverse order.
/// * `listen_addrs` is attached as given.
pub(crate) fn init_delivered(
    request: packed::ConnectionRequestReader<'_>,
    listen_addrs: packed::AddressVec,
) -> packed::ConnectionRequestDelivered {
    let keep = request.route().len().saturating_sub(1);
    let entity = request.to_entity();

    // Everything but the final hop of the request route.
    let trimmed = entity.route().into_iter().take(keep);
    let new_route = packed::BytesVec::new_builder().extend(trimmed).build();

    // The full request route, reversed in place.
    let mut reversed: Vec<_> = entity.route().into_iter().collect();
    reversed.reverse();
    let sync_route = packed::BytesVec::new_builder().extend(reversed).build();

    packed::ConnectionRequestDelivered::new_builder()
        .from(entity.from())
        .to(entity.to())
        .route(new_route)
        .sync_route(sync_route)
        .listen_addrs(listen_addrs)
        .build()
}
/// Produces the copy of `delivered` that should be relayed onwards.
///
/// The last entry of `route` is removed (an empty route stays empty); every
/// other field is carried over unchanged.
pub(crate) fn forward_delivered(
    delivered: packed::ConnectionRequestDeliveredReader<'_>,
) -> packed::ConnectionRequestDelivered {
    // `saturating_sub` makes this 0 for an empty route, so nothing is taken.
    let keep = delivered.route().len().saturating_sub(1);
    let entity = delivered.to_entity();

    let trimmed = entity.route().into_iter().take(keep);
    let new_route = packed::BytesVec::new_builder().extend(trimmed).build();

    entity.as_builder().route(new_route).build()
}
/// Builds a `ConnectionSync` message from a received `delivered` message.
///
/// `from` / `to` are copied over, and the new route is `sync_route` without
/// its last entry.
pub(crate) fn init_sync(
    delivered: packed::ConnectionRequestDeliveredReader<'_>,
) -> packed::ConnectionSync {
    let keep = delivered.sync_route().len().saturating_sub(1);
    let entity = delivered.to_entity();

    // Everything but the final entry of the sync route.
    let trimmed = entity.sync_route().into_iter().take(keep);
    let new_route = packed::BytesVec::new_builder().extend(trimmed).build();

    packed::ConnectionSync::new_builder()
        .from(entity.from())
        .to(entity.to())
        .route(new_route)
        .build()
}
/// Produces the copy of `sync` that should be relayed onwards.
///
/// The last entry of `route` is removed (an empty route stays empty); every
/// other field is carried over unchanged.
pub(crate) fn forward_sync(sync: packed::ConnectionSyncReader<'_>) -> packed::ConnectionSync {
    // `saturating_sub` makes this 0 for an empty route, so nothing is taken.
    let keep = sync.route().len().saturating_sub(1);
    let entity = sync.to_entity();

    let trimmed = entity.route().into_iter().take(keep);
    let new_route = packed::BytesVec::new_builder().extend(trimmed).build();

    entity.as_builder().route(new_route).build()
}
#[cfg(test)]
mod test {
use super::*;
use crate::protocols::hole_punching::MAX_HOPS;
use ckb_types::packed;
// Walks one message through the whole helper chain with two forwarders
// (`forward_a`, then `forward_b`) and checks the route bookkeeping at each
// step: request -> delivered -> sync.
//
// Note: several locals below intentionally shadow the helper function of the
// same name, so each of those helpers can only be called once in this scope.
#[test]
fn test_route() {
let from = PeerId::random();
let to = PeerId::random();
let forward_a = PeerId::random();
let forward_b = PeerId::random();
let listen_addrs = packed::AddressVec::new_builder().build();
// Step 1: a fresh request has the full hop budget and an empty route.
let init_request = init_request(&from, &to, listen_addrs.clone());
assert_eq!(init_request.from(), from.as_bytes().into());
assert_eq!(init_request.to(), to.as_bytes().into());
assert_eq!(init_request.max_hops(), MAX_HOPS.into());
assert_eq!(
init_request.route().as_bytes(),
packed::BytesVec::new_builder().build().as_bytes()
);
// Step 2: forwarding through A appends A and uses up one hop.
let forward_request_a = forward_request(init_request.as_reader(), &forward_a);
assert_eq!(forward_request_a.from(), from.as_bytes().into());
assert_eq!(forward_request_a.to(), to.as_bytes().into());
assert_eq!(forward_request_a.max_hops(), (MAX_HOPS - 1).into());
assert_eq!(
forward_request_a.route().as_bytes(),
packed::BytesVec::new_builder()
.push(forward_a.as_bytes())
.build()
.as_bytes()
);
// Step 3: forwarding through B appends B after A and uses up another hop.
let forward_request_b = forward_request(forward_request_a.as_reader(), &forward_b);
assert_eq!(forward_request_b.from(), from.as_bytes().into());
assert_eq!(forward_request_b.to(), to.as_bytes().into());
assert_eq!(forward_request_b.max_hops(), (MAX_HOPS - 2).into());
assert_eq!(
forward_request_b.route().as_bytes(),
packed::BytesVec::new_builder()
.push(forward_a.as_bytes())
.push(forward_b.as_bytes())
.build()
.as_bytes()
);
// Step 4: the delivered message drops the last hop (B) from `route` and
// stores the full request route reversed ([B, A]) in `sync_route`.
let init_delivered = init_delivered(forward_request_b.as_reader(), listen_addrs);
assert_eq!(init_delivered.from(), from.as_bytes().into());
assert_eq!(init_delivered.to(), to.as_bytes().into());
assert_eq!(
init_delivered.route().as_bytes(),
packed::BytesVec::new_builder()
.push(forward_a.as_bytes())
.build()
.as_bytes()
);
assert_eq!(
init_delivered.sync_route().as_bytes(),
packed::BytesVec::new_builder()
.push(forward_b.as_bytes())
.push(forward_a.as_bytes())
.build()
.as_bytes()
);
// The last remaining route entry is A.
assert_eq!(
init_delivered
.as_reader()
.route()
.iter()
.last()
.unwrap()
.as_slice(),
Into::<packed::Bytes>::into(forward_a.as_bytes()).as_slice()
);
// Step 5: forwarding the delivered message pops the last route entry,
// leaving `route` empty while `sync_route` stays untouched.
let forward_delivered_b = forward_delivered(init_delivered.as_reader());
assert_eq!(forward_delivered_b.from(), from.as_bytes().into());
assert_eq!(forward_delivered_b.to(), to.as_bytes().into());
assert_eq!(
forward_delivered_b.route().as_bytes(),
packed::BytesVec::new_builder().build().as_bytes()
);
assert_eq!(
forward_delivered_b.sync_route().as_bytes(),
init_delivered.sync_route().as_bytes()
);
assert!(
forward_delivered_b
.as_reader()
.route()
.iter()
.last()
.is_none()
);
// Step 6: forwarding again with an already empty route keeps it empty.
let forward_delivered_a = forward_delivered(forward_delivered_b.as_reader());
assert_eq!(forward_delivered_a.from(), from.as_bytes().into());
assert_eq!(forward_delivered_a.to(), to.as_bytes().into());
assert_eq!(
forward_delivered_a.route().as_bytes(),
packed::BytesVec::new_builder().build().as_bytes()
);
assert_eq!(
forward_delivered_a.sync_route().as_bytes(),
init_delivered.sync_route().as_bytes()
);
assert!(
forward_delivered_a
.as_reader()
.route()
.iter()
.last()
.is_none()
);
// Step 7: the sync message's route is `sync_route` ([B, A]) without its
// last entry, i.e. [B].
let init_sync = init_sync(forward_delivered_a.as_reader());
assert_eq!(init_sync.from(), from.as_bytes().into());
assert_eq!(init_sync.to(), to.as_bytes().into());
assert_eq!(
init_sync.route().as_bytes(),
packed::BytesVec::new_builder()
.push(forward_b.as_bytes())
.build()
.as_bytes()
);
assert_eq!(
init_sync
.as_reader()
.route()
.iter()
.last()
.unwrap()
.as_slice(),
Into::<packed::Bytes>::into(forward_b.as_bytes()).as_slice()
);
// Step 8: forwarding the sync message pops the last route entry.
let forward_sync_a = forward_sync(init_sync.as_reader());
assert_eq!(forward_sync_a.from(), from.as_bytes().into());
assert_eq!(forward_sync_a.to(), to.as_bytes().into());
assert_eq!(
forward_sync_a.route().as_bytes(),
packed::BytesVec::new_builder().build().as_bytes()
);
assert!(forward_sync_a.as_reader().route().iter().last().is_none());
// Step 9: forwarding with an empty route keeps it empty.
let forward_sync_b = forward_sync(forward_sync_a.as_reader());
assert_eq!(forward_sync_b.from(), from.as_bytes().into());
assert_eq!(forward_sync_b.to(), to.as_bytes().into());
assert_eq!(
forward_sync_b.route().as_bytes(),
packed::BytesVec::new_builder().build().as_bytes()
);
assert!(forward_sync_b.as_reader().route().iter().last().is_none());
}
}