use crate::{
service::AllowedKeys,
service_handle::NetworkServiceHandle,
test_utils::{
TestNode, create_whitelisted_nodes, setup_log, wait_for_all_handshakes,
wait_for_handshake_completion,
},
types::MessageRouting,
};
use blueprint_core::info;
use blueprint_crypto::k256::K256Ecdsa;
use std::{collections::HashSet, time::Duration};
use tokio::time::timeout;
/// Upper bound applied to each wait in these tests (handshake completion and message delivery).
const TEST_TIMEOUT: Duration = Duration::from_secs(20);
/// Network name passed to `create_whitelisted_nodes`.
const NETWORK_NAME: &str = "gossip";
/// Instance name passed to `create_whitelisted_nodes`.
const INSTANCE_NAME: &str = "1.0.0";
/// Protocol identifier that received messages are matched against.
// NOTE(review): looks like `/{NETWORK_NAME}/{INSTANCE_NAME}` — confirm how the service derives it.
const PROTOCOL_NAME: &str = "/gossip/1.0.0";
/// Checks that a gossip message sent by one whitelisted node reaches the other.
///
/// Two whitelisted nodes are started and the test waits for their handshake to complete.
/// The first node then sends a message with no recipient set, and the second node is
/// polled every 100 ms (bounded by `TEST_TIMEOUT`) until a message on `PROTOCOL_NAME`
/// shows up. The payload, protocol and routing fields of that message are then asserted.
#[tokio::test]
#[serial_test::serial]
#[ignore = "CI-flaky: libp2p peer discovery + gossip convergence hangs until TEST_TIMEOUT on shared GH runners — protocol never settles in that environment. Tracked separately; test still runs locally via `cargo test -p blueprint-networking -- --ignored`. See discussion in PR #1366."]
async fn test_gossip_between_verified_peers() {
    setup_log();
    info!("Starting gossip test between verified peers");

    // `pop` takes from the back, so the receiving node is removed first.
    let mut nodes = create_whitelisted_nodes::<K256Ecdsa>(2, NETWORK_NAME, INSTANCE_NAME, false);
    let mut receiver_node = nodes.pop().unwrap();
    let mut sender_node = nodes.pop().unwrap();

    info!("Starting nodes");
    let sender_handle = sender_node.start().await.expect("Failed to start node1");
    let mut receiver_handle = receiver_node
        .start()
        .await
        .expect("Failed to start node2");

    info!("Waiting for handshake completion");
    wait_for_handshake_completion(&sender_handle, &receiver_handle, TEST_TIMEOUT).await;

    info!("Sending gossip message from node1");
    let expected_payload = b"Hello, gossip network!".to_vec();
    sender_handle
        .send(
            MessageRouting {
                message_id: 1,
                round_id: 0,
                sender: sender_handle.local_peer_id,
                recipient: None,
            },
            expected_payload.clone(),
        )
        .expect("Failed to send gossip message");

    info!("Waiting for node2 to receive the message");
    let next_gossip = async {
        loop {
            // Messages on any other protocol are dropped and polling continues.
            if let Some(msg) = receiver_handle
                .next_protocol_message()
                .filter(|candidate| candidate.protocol == PROTOCOL_NAME)
            {
                break msg;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    };
    let delivered = timeout(TEST_TIMEOUT, next_gossip)
        .await
        .expect("Timeout waiting for gossip message");

    // The delivered message must match what was sent, field by field.
    assert_eq!(delivered.payload, expected_payload);
    assert_eq!(delivered.protocol, PROTOCOL_NAME);
    assert_eq!(delivered.routing.message_id, 1);
    assert_eq!(delivered.routing.round_id, 0);
    assert_eq!(delivered.routing.sender, sender_node.peer_id);
    assert!(delivered.routing.recipient.is_none());

    info!("Gossip test completed successfully");
}
/// Checks that a gossip message sent by node 0 reaches every other node in a 3-node network.
///
/// Three whitelisted nodes are started and the test waits for all handshakes to complete.
/// Node 0 sends a message with no recipient set; each remaining node is then polled in
/// turn (every 100 ms) until it yields a message on `PROTOCOL_NAME`. A single
/// `TEST_TIMEOUT` bounds the whole receive phase, not each node individually.
#[tokio::test]
#[serial_test::serial]
#[ignore = "CI-flaky: same libp2p convergence issue as test_gossip_between_verified_peers. Runs locally via `cargo test -p blueprint-networking -- --ignored`. See PR #1366."]
async fn test_multi_node_gossip() {
    setup_log();
    info!("Starting multi-node gossip test");
    let mut nodes = create_whitelisted_nodes::<K256Ecdsa>(3, NETWORK_NAME, INSTANCE_NAME, false);

    info!("Starting all nodes");
    let mut handles = Vec::new();
    for node in &mut nodes {
        let started = node.start().await.expect("Failed to start node");
        handles.push(started);
    }

    info!("Waiting for all handshakes to complete");
    {
        // Scoped so the mutable borrows of `handles` end before the handles are used again.
        let handle_refs: Vec<&mut NetworkServiceHandle<K256Ecdsa>> =
            handles.iter_mut().collect();
        wait_for_all_handshakes(&handle_refs, TEST_TIMEOUT).await;
    }

    let origin_peer_id = handles[0].local_peer_id;
    let expected_payload = b"Multi-node gossip test".to_vec();

    info!("Sending gossip message from node 0");
    handles[0]
        .send(
            MessageRouting {
                message_id: 1,
                round_id: 0,
                sender: origin_peer_id,
                recipient: None,
            },
            expected_payload.clone(),
        )
        .expect("Failed to send gossip message");

    info!("Waiting for all nodes to receive the message");
    let all_received = async {
        // Node 0 is the sender, so only nodes 1.. are expected to receive.
        for (i, handle) in (1usize..).zip(handles.iter_mut().skip(1)) {
            let received = loop {
                // Messages on any other protocol are dropped and polling continues.
                if let Some(msg) = handle
                    .next_protocol_message()
                    .filter(|candidate| candidate.protocol == PROTOCOL_NAME)
                {
                    break msg;
                }
                tokio::time::sleep(Duration::from_millis(100)).await;
            };

            assert_eq!(
                received.payload, expected_payload,
                "Node {} received wrong payload",
                i
            );
            assert_eq!(received.protocol, PROTOCOL_NAME);
            assert_eq!(received.routing.sender, origin_peer_id);
            info!("Node {} received the gossip message correctly", i);
        }
    };
    timeout(TEST_TIMEOUT, all_received)
        .await
        .expect("Timeout waiting for gossip messages");

    info!("Multi-node gossip test completed successfully");
}
/// Checks that a gossip message is not delivered between nodes whose allowed-key sets are empty.
///
/// Both nodes are built with an empty `AllowedKeys::InstancePublicKeys` set. Node 1 sends a
/// message with no recipient set; after a fixed grace period node 2 must have no protocol
/// message available.
///
/// NOTE(review): nothing here waits for the two nodes to connect, and the `vec![]`
/// argument is presumably an empty bootstrap-peer list. If discovery takes longer than
/// the grace period, this test may pass without exercising the rejection path — confirm.
#[tokio::test]
#[serial_test::serial]
async fn test_unverified_peer_gossip() {
    /// How long a (wrongly) delivered message is given to show up before the check.
    const DELIVERY_GRACE_PERIOD: Duration = Duration::from_secs(2);

    setup_log();
    info!("Starting unverified peer gossip test");

    // Both nodes use the exact same configuration, so build them from one place.
    let new_unlisted_node = || {
        TestNode::<K256Ecdsa>::new(
            "test-net",
            "gossip-test",
            AllowedKeys::InstancePublicKeys(HashSet::new()),
            vec![],
            false,
        )
    };
    let mut node1 = new_unlisted_node();
    let mut node2 = new_unlisted_node();

    info!("Starting nodes");
    let handle1 = node1.start().await.expect("Failed to start node1");
    let mut handle2 = node2.start().await.expect("Failed to start node2");

    let test_payload = b"This message should not be received".to_vec();
    let routing = MessageRouting {
        message_id: 1,
        round_id: 0,
        sender: handle1.local_peer_id,
        recipient: None,
    };

    info!("Attempting to send gossip message from unverified node");
    // The payload is not inspected afterwards, so it is moved rather than cloned.
    handle1
        .send(routing, test_payload)
        .expect("Failed to send gossip message");

    tokio::time::sleep(DELIVERY_GRACE_PERIOD).await;

    assert!(
        handle2.next_protocol_message().is_none(),
        "node2 received a protocol message from an unverified peer"
    );

    info!("Unverified peer gossip test completed successfully");
}