use std::iter;
use futures::poll;
use tokio::sync::broadcast;
use tower::ServiceExt;
use zebra_chain::block;
use zebra_test::service_extensions::IsReady;
use crate::{
peer::{client::MissingInventoryCollector, ClientTestHarness},
protocol::external::InventoryHash,
PeerError, Request, SharedPeerError,
};
/// Builds a client and harness, never polls the client for readiness, and checks
/// that the harness reports no error, still wants connection heartbeats, and
/// finds no outbound client request waiting.
#[tokio::test]
async fn client_service_ok_without_readiness_check() {
    let _init_guard = zebra_test::init();

    // The client is intentionally unused: it only has to stay alive until the end of the test.
    let (_client, mut harness) = ClientTestHarness::build().finish();

    let error_is_absent = harness.current_error().is_none();
    assert!(error_is_absent);

    let heartbeats_wanted = harness.wants_connection_heartbeats();
    assert!(heartbeats_wanted);

    let no_outbound_request = harness.try_to_receive_outbound_client_request().is_empty();
    assert!(no_outbound_request);
}
/// Checks that a freshly built client reports ready, and that afterwards the
/// harness still sees no error, still wants connection heartbeats, and finds no
/// outbound client request waiting.
#[tokio::test]
async fn client_service_ready_ok() {
    let _init_guard = zebra_test::init();

    let (mut client, mut harness) = ClientTestHarness::build().finish();

    let client_ready = client.is_ready().await;
    assert!(client_ready);

    let error_is_absent = harness.current_error().is_none();
    assert!(error_is_absent);

    let heartbeats_wanted = harness.wants_connection_heartbeats();
    assert!(heartbeats_wanted);

    let no_outbound_request = harness.try_to_receive_outbound_client_request().is_empty();
    assert!(no_outbound_request);
}
/// Creates a readiness future, drops it without awaiting it, and checks that the
/// client still reports ready and the harness still sees no error, wants
/// connection heartbeats, and finds no outbound client request waiting.
#[tokio::test]
async fn client_service_ready_drop_ok() {
    let _init_guard = zebra_test::init();

    let (mut client, mut harness) = ClientTestHarness::build().finish();

    // Build the readiness future and throw it away before it is ever awaited.
    let unpolled_ready = client.ready();
    // NOTE(review): presumably the future has no `Drop` impl, which is what
    // `clippy::drop_non_drop` flags; `unknown_lints` keeps toolchains that do not
    // know that lint quiet — confirm.
    #[allow(unknown_lints)]
    #[allow(clippy::drop_non_drop)]
    drop(unpolled_ready);

    let client_ready = client.is_ready().await;
    assert!(client_ready);

    let error_is_absent = harness.current_error().is_none();
    assert!(error_is_absent);

    let heartbeats_wanted = harness.wants_connection_heartbeats();
    assert!(heartbeats_wanted);

    let no_outbound_request = harness.try_to_receive_outbound_client_request().is_empty();
    assert!(no_outbound_request);
}
/// Checks that the client reports ready on two consecutive readiness checks, and
/// that afterwards the harness still sees no error, wants connection heartbeats,
/// and finds no outbound client request waiting.
#[tokio::test]
async fn client_service_ready_multiple_ok() {
    let _init_guard = zebra_test::init();

    let (mut client, mut harness) = ClientTestHarness::build().finish();

    // Two back-to-back readiness checks; both must succeed.
    for _ in 0..2 {
        let client_ready = client.is_ready().await;
        assert!(client_ready);
    }

    let error_is_absent = harness.current_error().is_none();
    assert!(error_is_absent);

    let heartbeats_wanted = harness.wants_connection_heartbeats();
    assert!(heartbeats_wanted);

    let no_outbound_request = harness.try_to_receive_outbound_client_request().is_empty();
    assert!(no_outbound_request);
}
#[tokio::test]
async fn client_service_ready_heartbeat_exit() {
let _init_guard = zebra_test::init();
let (mut client, mut harness) = ClientTestHarness::build().finish();
harness.set_error(PeerError::HeartbeatTaskExited("some error".to_string()));
harness.drop_heartbeat_shutdown_receiver();
assert!(client.is_failed().await);
assert!(harness.current_error().is_some());
assert!(harness.try_to_receive_outbound_client_request().is_closed());
}
/// Sets a `ConnectionDropped` error through the harness, drops the outbound
/// client request receiver, and checks that the client reports failure, the
/// harness has an error, and connection heartbeats are no longer wanted.
#[tokio::test]
async fn client_service_ready_request_drop() {
    let _init_guard = zebra_test::init();

    let (mut client, mut harness) = ClientTestHarness::build().finish();

    let dropped_error = PeerError::ConnectionDropped;
    harness.set_error(dropped_error);
    harness.drop_outbound_client_request_receiver();

    let client_failed = client.is_failed().await;
    assert!(client_failed);

    let error_is_present = harness.current_error().is_some();
    assert!(error_is_present);

    let heartbeats_wanted = harness.wants_connection_heartbeats();
    assert!(!heartbeats_wanted);
}
/// Sets a `ConnectionClosed` error through the harness, closes the outbound
/// client request receiver, and checks that the client reports failure, the
/// harness has an error, connection heartbeats are no longer wanted, and the
/// outbound client request channel is reported closed.
#[tokio::test]
async fn client_service_ready_request_close() {
    let _init_guard = zebra_test::init();

    let (mut client, mut harness) = ClientTestHarness::build().finish();

    let closed_error = PeerError::ConnectionClosed;
    harness.set_error(closed_error);
    harness.close_outbound_client_request_receiver();

    let client_failed = client.is_failed().await;
    assert!(client_failed);

    let error_is_present = harness.current_error().is_some();
    assert!(error_is_present);

    let heartbeats_wanted = harness.wants_connection_heartbeats();
    assert!(!heartbeats_wanted);

    let outbound_closed = harness.try_to_receive_outbound_client_request().is_closed();
    assert!(outbound_closed);
}
/// Sets only an `Overloaded` error through the harness (no channel is dropped or
/// closed by the test) and checks that the client reports failure, the harness
/// has an error, connection heartbeats are no longer wanted, and the outbound
/// client request channel is reported closed.
#[tokio::test]
async fn client_service_ready_error_in_slot() {
    let _init_guard = zebra_test::init();

    let (mut client, mut harness) = ClientTestHarness::build().finish();

    let overloaded_error = PeerError::Overloaded;
    harness.set_error(overloaded_error);

    let client_failed = client.is_failed().await;
    assert!(client_failed);

    let error_is_present = harness.current_error().is_some();
    assert!(error_is_present);

    let heartbeats_wanted = harness.wants_connection_heartbeats();
    assert!(!heartbeats_wanted);

    let outbound_closed = harness.try_to_receive_outbound_client_request().is_closed();
    assert!(outbound_closed);
}
/// Combines several failure signals at once: sets a `DuplicateHandshake` error,
/// drops the heartbeat shutdown receiver, and closes the outbound client request
/// receiver. Then checks that the client reports failure, the harness has an
/// error, and the outbound client request channel is reported closed.
#[tokio::test]
async fn client_service_ready_multiple_errors() {
    let _init_guard = zebra_test::init();

    let (mut client, mut harness) = ClientTestHarness::build().finish();

    let handshake_error = PeerError::DuplicateHandshake;
    harness.set_error(handshake_error);
    harness.drop_heartbeat_shutdown_receiver();
    harness.close_outbound_client_request_receiver();

    let client_failed = client.is_failed().await;
    assert!(client_failed);

    let error_is_present = harness.current_error().is_some();
    assert!(error_is_present);

    let outbound_closed = harness.try_to_receive_outbound_client_request().is_closed();
    assert!(outbound_closed);
}
/// Drops the client immediately after construction and checks that the harness
/// then has an error, no longer wants connection heartbeats, and reports the
/// outbound client request channel as closed.
#[tokio::test]
async fn client_service_drop_cleanup() {
    let _init_guard = zebra_test::init();

    let (client, mut harness) = ClientTestHarness::build().finish();

    // Dropping the client is the only action under test.
    drop(client);

    let error_is_present = harness.current_error().is_some();
    assert!(error_is_present);

    let heartbeats_wanted = harness.wants_connection_heartbeats();
    assert!(!heartbeats_wanted);

    let outbound_closed = harness.try_to_receive_outbound_client_request().is_closed();
    assert!(outbound_closed);
}
/// Stops the connection task through the harness and checks that the client
/// reports failure, the harness has an error, connection heartbeats are no
/// longer wanted, and the outbound client request channel is reported closed.
#[tokio::test]
async fn client_service_handles_exited_connection_task() {
    let _init_guard = zebra_test::init();

    let (mut client, mut harness) = ClientTestHarness::build().finish();

    harness.stop_connection_task().await;

    let client_failed = client.is_failed().await;
    assert!(client_failed);

    let error_is_present = harness.current_error().is_some();
    assert!(error_is_present);

    let heartbeats_wanted = harness.wants_connection_heartbeats();
    assert!(!heartbeats_wanted);

    let outbound_closed = harness.try_to_receive_outbound_client_request().is_closed();
    assert!(outbound_closed);
}
/// Builds the harness with a connection task that panics, yields once to the
/// runtime, then polls the client's readiness future a single time.
///
/// The test passes only if it panics (`#[should_panic]`).
#[tokio::test]
#[should_panic]
async fn client_service_propagates_panic_from_connection_task() {
    let _init_guard = zebra_test::init();

    // The harness is unused, but must stay alive until the end of the test.
    let (mut client, _harness) = ClientTestHarness::build()
        .with_connection_task(async move {
            panic!("connection task failure");
        })
        .finish();

    // Hand control back to the runtime once before polling the client.
    tokio::task::yield_now().await;

    // Only the side effect of the single poll matters; its result is not inspected.
    let _readiness = poll!(client.ready());
}
/// Stops the heartbeat task through the harness and checks that the client
/// reports failure, the harness has an error, connection heartbeats are no
/// longer wanted, and the outbound client request channel is reported closed.
#[tokio::test]
async fn client_service_handles_exited_heartbeat_task() {
    let _init_guard = zebra_test::init();

    let (mut client, mut harness) = ClientTestHarness::build().finish();

    harness.stop_heartbeat_task().await;

    let client_failed = client.is_failed().await;
    assert!(client_failed);

    let error_is_present = harness.current_error().is_some();
    assert!(error_is_present);

    let heartbeats_wanted = harness.wants_connection_heartbeats();
    assert!(!heartbeats_wanted);

    let outbound_closed = harness.try_to_receive_outbound_client_request().is_closed();
    assert!(outbound_closed);
}
/// Builds the harness with a heartbeat task that panics, yields once to the
/// runtime, then polls the client's readiness future a single time.
///
/// The test passes only if it panics (`#[should_panic]`).
#[tokio::test]
#[should_panic]
async fn client_service_propagates_panic_from_heartbeat_task() {
    let _init_guard = zebra_test::init();

    // The harness is unused, but must stay alive until the end of the test.
    let (mut client, _harness) = ClientTestHarness::build()
        .with_heartbeat_task(async move {
            panic!("heartbeat task failure");
        })
        .finish();

    // Hand control back to the runtime once before polling the client.
    tokio::task::yield_now().await;

    // Only the side effect of the single poll matters; its result is not inspected.
    let _readiness = poll!(client.ready());
}
/// Sends a `NotFoundRegistry` error response through a `MissingInventoryCollector`
/// built for a single-block `BlocksByHash` request, and checks that nothing was
/// delivered on the inventory broadcast channel (`try_recv` reports `Empty`).
#[test]
fn missing_inv_collector_ignores_local_registry_errors() {
    let _init_guard = zebra_test::init();

    // An all-zero hash serves as the requested block.
    let requested_hash = block::Hash([0; 32]);
    let request = Request::BlocksByHash(iter::once(requested_hash).collect());

    // Build the error response step by step: hashes -> peer error -> shared error.
    let missing_hashes = vec![InventoryHash::from(requested_hash)];
    let registry_error = PeerError::NotFoundRegistry(missing_hashes);
    let response = Err(SharedPeerError::from(registry_error));

    let (inv_collector, mut inv_receiver) = broadcast::channel(1);
    let transient_addr = "0.0.0.0:0".parse().unwrap();

    // NOTE(review): presumably this extra sender keeps the channel open after
    // `inv_collector` is moved below, so the receiver reports `Empty` rather than
    // a closed channel — confirm.
    let _inv_channel_guard = inv_collector.clone();

    let collector =
        MissingInventoryCollector::new(&request, Some(inv_collector), Some(transient_addr))
            .expect("unexpected invalid collector: arguments should be valid");
    collector.send(&response);

    assert_eq!(
        inv_receiver.try_recv(),
        Err(broadcast::error::TryRecvError::Empty)
    );
}