use std::{task::Poll, time::Duration};
use futures::stream::StreamExt;
use tokio_util::codec::Framed;
use crate::{
constants::CURRENT_NETWORK_PROTOCOL_VERSION,
protocol::external::{AddrInVersion, Codec, Message},
types::PeerServices,
VersionMessage,
};
use super::super::*;
/// Runs the TCP variant of the anonymised version message check once for
/// every network yielded by `Network::iter()`.
///
/// Returns early, without checking anything, when
/// `zebra_test::net::zebra_skip_network_tests()` reports that network tests
/// should be skipped.
#[tokio::test]
async fn connect_isolated_sends_anonymised_version_message_tcp() {
    // Bound to a named variable (not `_`) so that whatever `init` returns is
    // held until the end of the test.
    let _test_init_guard = zebra_test::init();

    let skip_network_tests = zebra_test::net::zebra_skip_network_tests();
    if skip_network_tests {
        return;
    }

    for net in Network::iter() {
        connect_isolated_sends_anonymised_version_message_tcp_net(net).await;
    }
}
/// Checks the version message that `connect_isolated_tcp_direct` sends over a
/// loopback TCP connection for `network`.
///
/// The test listens on `127.0.0.1` with an OS-assigned port, spawns the
/// isolated outbound connection towards that listener, and validates the
/// first message received on the accepted connection with
/// [`check_version_message`].
///
/// # Panics
///
/// Panics if the listener cannot be bound, the connection cannot be accepted,
/// the received message fails validation, or the outbound task has panicked
/// or completed successfully one second after the version message arrived.
async fn connect_isolated_sends_anonymised_version_message_tcp_net(network: Network) {
    // Port 0 asks the OS for any free port; read the real address back so the
    // outbound side knows where to connect.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let listen_addr = listener.local_addr().unwrap();

    // The outbound connection runs in its own task; its outcome is inspected
    // through the join handle below.
    let mut outbound_join_handle = tokio::spawn(connect_isolated_tcp_direct(
        &network,
        listen_addr,
        "".to_string(),
    ));

    let (inbound_conn, _) = listener.accept().await.unwrap();

    let mut inbound_stream = Framed::new(
        inbound_conn,
        Codec::builder().for_network(&network).finish(),
    );

    check_version_message(network, &mut inbound_stream).await;

    // Let the spawned task run for a short time.
    tokio::time::sleep(Duration::from_secs(1)).await;

    // This test never replies on the inbound connection, so the outbound task
    // must either still be waiting or have finished with an error. A panic or
    // cancellation (`Ready(Err(_))`) or a successful result
    // (`Ready(Ok(Ok(_)))`) fails the test. This matches the acceptance
    // criteria of the in-memory variant of this test.
    let outbound_result = futures::poll!(&mut outbound_join_handle);
    assert!(matches!(
        outbound_result,
        Poll::Pending | Poll::Ready(Ok(Err(_)))
    ));

    outbound_join_handle.abort();
}
/// Runs the in-memory variant of the anonymised version message check once
/// for every network yielded by `Network::iter()`.
///
/// Unlike the TCP variant, this test does not consult
/// `zebra_test::net::zebra_skip_network_tests()`.
#[tokio::test]
async fn connect_isolated_sends_anonymised_version_message_mem() {
    // Bound to a named variable (not `_`) so that whatever `init` returns is
    // held until the end of the test.
    let _test_init_guard = zebra_test::init();

    for net in Network::iter() {
        connect_isolated_sends_anonymised_version_message_mem_net(net).await;
    }
}
/// Checks the version message that `connect_isolated` sends over an in-memory
/// duplex pipe for `network`.
///
/// One end of a `tokio::io::duplex` pipe is handed to a spawned
/// `connect_isolated` task; the other end is framed with the network's codec
/// and its first message is validated by [`check_version_message`].
///
/// # Panics
///
/// Panics if the received message fails validation, or if the outbound task
/// has panicked or completed successfully one second after the version
/// message arrived.
async fn connect_isolated_sends_anonymised_version_message_mem_net(network: Network) {
    // `1024` is the buffer size passed to `duplex`.
    let (peer_side, isolated_side) = tokio::io::duplex(1024);

    // Start the isolated connection on its end of the pipe; the join handle
    // is inspected once the version message has been checked.
    let mut outbound_join_handle =
        tokio::spawn(connect_isolated(&network, isolated_side, "".to_string()));

    // Decode what the isolated connection writes using this network's codec.
    let network_codec = Codec::builder().for_network(&network).finish();
    let mut peer_messages = Framed::new(peer_side, network_codec);

    check_version_message(network, &mut peer_messages).await;

    // Give the spawned task a moment to make progress before polling it.
    tokio::time::sleep(Duration::from_secs(1)).await;

    // Acceptable outcomes: the task is still pending, or it finished with an
    // error. A panic or cancellation (`Ready(Err(_))`) or a successful result
    // (`Ready(Ok(Ok(_)))`) fails the test.
    let outbound_result = futures::poll!(&mut outbound_join_handle);
    assert!(matches!(
        outbound_result,
        Poll::Pending | Poll::Ready(Ok(Err(_)))
    ));

    outbound_join_handle.abort();
}
/// Reads the next message from `inbound_stream` and asserts that it is a
/// version message with the expected fixed field values for `network`.
///
/// Every field of the version message is destructured by name, so adding a
/// field to `VersionMessage` forces this check to be revisited. The nonce is
/// the only field that is deliberately not checked.
///
/// # Panics
///
/// Panics if the stream ends, yields an error, yields a message that is not a
/// version message, or if any field differs from its expected value.
async fn check_version_message<PeerTransport>(
    network: Network,
    inbound_stream: &mut Framed<PeerTransport, Codec>,
) where
    PeerTransport: AsyncRead + Unpin,
{
    match inbound_stream
        .next()
        .await
        .expect("stream item")
        .expect("item is Ok(msg)")
    {
        Message::Version(VersionMessage {
            version,
            services,
            timestamp,
            address_recv,
            address_from,
            nonce: _,
            user_agent,
            start_height,
            relay,
        }) => {
            // Both address fields are expected to hold the unspecified IPv4
            // address combined with the network's default port.
            let mut fixed_isolated_addr: PeerSocketAddr = "0.0.0.0:0".parse().unwrap();
            fixed_isolated_addr.set_port(network.default_port());

            // The protocol version must match the current constant.
            assert_eq!(version, CURRENT_NETWORK_PROTOCOL_VERSION);
            // `timestamp.timestamp()` must be an exact multiple of 5 * 60
            // (a five-minute boundary, assuming the value is in seconds).
            assert_eq!(timestamp.timestamp() % (5 * 60), 0);

            // No services are advertised for the sender itself.
            assert_eq!(services, PeerServices::empty());
            // The receiving address carries `NODE_NETWORK` services.
            assert_eq!(
                address_recv,
                AddrInVersion::new(fixed_isolated_addr, PeerServices::NODE_NETWORK),
            );
            // The sending address carries no services.
            assert_eq!(
                address_from,
                AddrInVersion::new(fixed_isolated_addr, PeerServices::empty()),
            );

            // The remaining fields are empty, zero, or false.
            assert_eq!(user_agent, "");
            assert_eq!(start_height.0, 0);
            assert!(!relay);
        }
        _ => panic!("handshake did not send version message"),
    }
}