use crate::{
behavior::{
FuelBehaviour,
FuelBehaviourEvent,
},
codecs::NetworkCodec,
config::{
build_transport,
P2PConfig,
},
discovery::DiscoveryEvent,
gossipsub::{
messages::{
GossipsubBroadcastRequest,
GossipsubMessage as FuelGossipsubMessage,
},
topics::GossipsubTopics,
},
peer_info::{
PeerInfo,
PeerInfoEvent,
},
request_response::messages::{
IntermediateResponse,
OutboundResponse,
RequestError,
RequestMessage,
ResponseChannelItem,
ResponseError,
ResponseMessage,
},
};
use fuel_metrics::p2p_metrics::P2P_METRICS;
use futures::prelude::*;
use libp2p::{
gossipsub::{
error::PublishError,
GossipsubEvent,
MessageAcceptance,
MessageId,
Topic,
TopicHash,
},
multiaddr::Protocol,
request_response::{
RequestId,
RequestResponseEvent,
RequestResponseMessage,
ResponseChannel,
},
swarm::{
AddressScore,
SwarmBuilder,
SwarmEvent,
},
Multiaddr,
PeerId,
Swarm,
};
use rand::Rng;
use std::collections::HashMap;
use tracing::{
debug,
info,
warn,
};
/// P2P service that owns a libp2p `Swarm` running `FuelBehaviour` and keeps the
/// bookkeeping needed to pair request/response messages with their channels.
pub struct FuelP2PService<Codec: NetworkCodec> {
/// Peer id of this node, derived from the keypair in the config passed to `new`.
pub local_peer_id: PeerId,
/// The swarm driving `FuelBehaviour`; polled by `next_event`.
swarm: Swarm<FuelBehaviour<Codec>>,
/// Requests sent by this node that are still awaiting a response.
/// Entries are inserted in `send_request_msg` and removed when a response
/// arrives or an outbound failure is reported.
outbound_requests_table: HashMap<RequestId, ResponseChannelItem>,
/// Requests received from peers that have not been answered yet.
/// Entries are inserted when a request arrives and removed in `send_response_msg`.
inbound_requests_table: HashMap<RequestId, ResponseChannel<IntermediateResponse>>,
/// Codec used to encode/decode gossipsub payloads and to convert between
/// intermediate and final request/response representations.
network_codec: Codec,
/// Static network information (currently the gossipsub topics).
network_metadata: NetworkMetadata,
/// When `true`, the `unique_peers` metric is incremented on `PeerIdentified`.
metrics: bool,
}
/// Network-wide data the service needs while running.
#[derive(Debug)]
struct NetworkMetadata {
/// Topics used to pick the topic for outgoing messages and to resolve the
/// tag of incoming ones.
gossipsub_topics: GossipsubTopics,
}
/// Events surfaced to the caller of `FuelP2PService::next_event`.
#[derive(Debug, Clone)]
// NOTE(review): presumably allowed because the message-carrying variants are
// much larger than the `PeerId`-only ones — confirm before boxing anything.
#[allow(clippy::large_enum_variant)]
pub enum FuelP2PEvent {
/// A gossipsub message was received and successfully decoded.
GossipsubMessage {
/// Peer that propagated the message to this node.
peer_id: PeerId,
/// Id of the received message.
message_id: MessageId,
/// Hash of the topic the message arrived on.
topic_hash: TopicHash,
/// The decoded message.
message: FuelGossipsubMessage,
},
/// A peer sent a request; answer it with `send_response_msg` using `request_id`.
RequestMessage {
/// Id under which the response channel is stored.
request_id: RequestId,
/// The received request.
request_message: RequestMessage,
},
/// Discovery reported a connection to the peer.
PeerConnected(PeerId),
/// Discovery reported a disconnection from the peer.
PeerDisconnected(PeerId),
/// The stored information about the peer changed.
PeerInfoUpdated(PeerId),
}
impl<Codec: NetworkCodec> FuelP2PService<Codec> {
/// Builds a P2P service from `config`, using `codec` for all message encoding.
///
/// The constructor:
/// - derives the local `PeerId` from `config.keypair`,
/// - builds the transport and `FuelBehaviour` and wires them into a
///   tokio-executed `Swarm`,
/// - subscribes to every topic in `config.topics`, named
///   `"{topic}/{network_name}"`,
/// - starts listening on `config.address` with `config.tcp_port`,
/// - registers `config.public_address` (when set) as an external address.
///
/// # Errors
///
/// Returns an error when subscribing to a gossipsub topic fails or when the
/// swarm cannot start listening on the configured address.
pub fn new(config: P2PConfig, codec: Codec) -> anyhow::Result<Self> {
    let local_peer_id = PeerId::from(config.keypair.public());

    let transport = build_transport(&config);
    let behaviour = FuelBehaviour::new(&config, codec.clone());
    let mut swarm =
        SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build();

    let listen_multiaddr = {
        let mut m = Multiaddr::from(config.address);
        m.push(Protocol::Tcp(config.tcp_port));
        m
    };

    for topic in config.topics {
        let topic_name = format!("{}/{}", topic, config.network_name);
        let t = Topic::new(topic_name.clone());
        // A failed subscription is a recoverable setup error: report it to
        // the caller instead of panicking inside a fallible constructor.
        swarm
            .behaviour_mut()
            .subscribe_to_topic(&t)
            .map_err(|e| {
                anyhow::anyhow!(
                    "Failed to subscribe to gossipsub topic {}: {:?}",
                    topic_name,
                    e
                )
            })?;
    }

    swarm.listen_on(listen_multiaddr)?;

    let gossipsub_topics = GossipsubTopics::new(&config.network_name);
    let network_metadata = NetworkMetadata { gossipsub_topics };
    let metrics = config.metrics;

    if let Some(public_address) = config.public_address {
        // The returned value is intentionally ignored.
        let _ = swarm.add_external_address(public_address, AddressScore::Infinite);
    }

    Ok(Self {
        local_peer_id,
        swarm,
        network_codec: codec,
        outbound_requests_table: HashMap::default(),
        inbound_requests_table: HashMap::default(),
        network_metadata,
        metrics,
    })
}
/// Returns the peer table kept by the behaviour, keyed by `PeerId`.
pub fn get_peers_info(&self) -> &HashMap<PeerId, PeerInfo> {
    let behaviour = self.swarm.behaviour();
    behaviour.get_peers()
}
/// Returns the ids of all peers currently present in the peer table.
///
/// The order of the returned ids follows the `HashMap` iteration order and
/// is therefore unspecified.
pub fn get_peers_ids(&self) -> Vec<PeerId> {
    self.get_peers_info().keys().copied().collect()
}
/// Encodes `message` with the network codec and publishes it on the
/// gossipsub topic that corresponds to the message variant.
///
/// # Errors
///
/// Returns `PublishError::TransformFailed` when encoding fails, otherwise
/// whatever error the behaviour reports while publishing.
pub fn publish_message(
    &mut self,
    message: GossipsubBroadcastRequest,
) -> Result<MessageId, PublishError> {
    // The topic must be resolved before `message` is consumed by the codec.
    let topic = self
        .network_metadata
        .gossipsub_topics
        .get_gossipsub_topic(&message);

    let encoded_data = self
        .network_codec
        .encode(message)
        .map_err(PublishError::TransformFailed)?;

    self.swarm
        .behaviour_mut()
        .publish_message(topic, encoded_data)
}
/// Sends `message_request` to `peer_id`, or to a randomly chosen peer from
/// the peer table when `peer_id` is `None`.
///
/// `channel_item` is stored under the returned `RequestId` so that the
/// response can be forwarded through it once it arrives.
///
/// # Errors
///
/// Returns `RequestError::NoPeersConnected` when no peer was given and the
/// peer table is empty.
pub fn send_request_msg(
    &mut self,
    peer_id: Option<PeerId>,
    message_request: RequestMessage,
    channel_item: ResponseChannelItem,
) -> Result<RequestId, RequestError> {
    let target_peer = if let Some(explicit_peer) = peer_id {
        explicit_peer
    } else {
        let known_peers = self.get_peers_info();
        let peer_count = known_peers.len();
        if peer_count == 0 {
            return Err(RequestError::NoPeersConnected)
        }
        // `picked` is always < `peer_count`, so `nth` cannot return `None`.
        let picked = rand::thread_rng().gen_range(0..peer_count);
        *known_peers.keys().nth(picked).unwrap()
    };

    let request_id = self
        .swarm
        .behaviour_mut()
        .send_request_msg(message_request, target_peer);
    self.outbound_requests_table
        .insert(request_id, channel_item);

    Ok(request_id)
}
/// Answers the inbound request identified by `request_id`.
///
/// With `message == None` the stored response channel is simply removed
/// (and thereby dropped) without sending anything, and `Ok(())` is returned.
/// Otherwise the message is converted to its intermediate form and sent
/// through the stored channel. The channel is removed from the table in
/// every case, including when the conversion fails.
///
/// # Errors
///
/// - `ResponseError::ConversionToIntermediateFailed` when the codec cannot
///   convert the message,
/// - `ResponseError::ResponseChannelDoesNotExist` when no channel is stored
///   for `request_id`,
/// - `ResponseError::SendingResponseFailed` when the behaviour fails to send.
pub fn send_response_msg(
    &mut self,
    request_id: RequestId,
    message: Option<OutboundResponse>,
) -> Result<(), ResponseError> {
    let outbound_response = match message {
        Some(outbound_response) => outbound_response,
        None => {
            self.inbound_requests_table.remove(&request_id);
            return Ok(())
        }
    };

    // Both steps are always performed (conversion first) so the channel
    // never lingers in the table after a response attempt.
    let intermediate = self
        .network_codec
        .convert_to_intermediate(&outbound_response);
    let channel = self.inbound_requests_table.remove(&request_id);

    match (intermediate, channel) {
        (Ok(message), Some(channel)) => {
            if self
                .swarm
                .behaviour_mut()
                .send_response_msg(channel, message)
                .is_err()
            {
                debug!("Failed to send ResponseMessage for {:?}", request_id);
                return Err(ResponseError::SendingResponseFailed)
            }
            Ok(())
        }
        (Ok(_), None) => {
            debug!("ResponseChannel for {:?} does not exist!", request_id);
            Err(ResponseError::ResponseChannelDoesNotExist)
        }
        (Err(e), _) => {
            debug!("Failed to convert to IntermediateResponse with {:?}", e);
            Err(ResponseError::ConversionToIntermediateFailed)
        }
    }
}
/// Forwards the validation verdict (`acceptance`) for the gossipsub message
/// `msg_id`, received from `propagation_source`, to the behaviour and
/// returns its result unchanged.
pub fn report_message_validation_result(
    &mut self,
    msg_id: &MessageId,
    propagation_source: &PeerId,
    acceptance: MessageAcceptance,
) -> Result<bool, PublishError> {
    let behaviour = self.swarm.behaviour_mut();
    behaviour.report_message_validation_result(msg_id, propagation_source, acceptance)
}
/// Waits for the next swarm event and translates it into a `FuelP2PEvent`.
///
/// Returns `None` both for swarm events that are not behaviour events and
/// for behaviour events that do not map to a `FuelP2PEvent`; `None` does
/// not signal that the event stream has ended.
pub async fn next_event(&mut self) -> Option<FuelP2PEvent> {
    match self.swarm.select_next_some().await {
        SwarmEvent::Behaviour(behaviour_event) => {
            self.handle_behaviour_event(behaviour_event)
        }
        _ => None,
    }
}
fn handle_behaviour_event(
&mut self,
event: FuelBehaviourEvent,
) -> Option<FuelP2PEvent> {
match event {
FuelBehaviourEvent::Discovery(discovery_event) => match discovery_event {
DiscoveryEvent::Connected(peer_id, addresses) => {
self.swarm
.behaviour_mut()
.add_addresses_to_peer_info(&peer_id, addresses);
return Some(FuelP2PEvent::PeerConnected(peer_id))
}
DiscoveryEvent::Disconnected(peer_id) => {
return Some(FuelP2PEvent::PeerDisconnected(peer_id))
}
_ => {}
},
FuelBehaviourEvent::Gossipsub(gossipsub_event) => {
if let GossipsubEvent::Message {
propagation_source,
message,
message_id,
} = gossipsub_event
{
if let Some(correct_topic) = self
.network_metadata
.gossipsub_topics
.get_gossipsub_tag(&message.topic)
{
match self.network_codec.decode(&message.data, correct_topic) {
Ok(decoded_message) => {
return Some(FuelP2PEvent::GossipsubMessage {
peer_id: propagation_source,
message_id,
topic_hash: message.topic,
message: decoded_message,
})
}
Err(err) => {
warn!(target: "fuel-libp2p", "Failed to decode a message. ID: {}, Message: {:?} with error: {:?}", message_id, &message.data, err);
match self.report_message_validation_result(
&message_id,
&propagation_source,
MessageAcceptance::Reject,
) {
Ok(false) => {
warn!(target: "fuel-libp2p", "Message was not found in the cache, peer with PeerId: {} has been reported.", propagation_source);
}
Ok(true) => {
info!(target: "fuel-libp2p", "Message found in the cache, peer with PeerId: {} has been reported.", propagation_source);
}
Err(e) => {
warn!(target: "fuel-libp2p", "Failed to publish the message with following error: {:?}.", e);
}
}
}
}
} else {
warn!(target: "fuel-libp2p", "GossipTopicTag does not exist for {:?}", &message.topic);
}
}
}
FuelBehaviourEvent::PeerInfo(peer_info_event) => match peer_info_event {
PeerInfoEvent::PeerIdentified { peer_id, addresses } => {
if self.metrics {
P2P_METRICS.unique_peers.inc();
}
self.swarm
.behaviour_mut()
.add_addresses_to_discovery(&peer_id, addresses);
}
PeerInfoEvent::PeerInfoUpdated { peer_id } => {
return Some(FuelP2PEvent::PeerInfoUpdated(peer_id))
}
},
FuelBehaviourEvent::RequestResponse(req_res_event) => match req_res_event {
RequestResponseEvent::Message { message, .. } => match message {
RequestResponseMessage::Request {
request,
channel,
request_id,
} => {
self.inbound_requests_table.insert(request_id, channel);
return Some(FuelP2PEvent::RequestMessage {
request_id,
request_message: request,
})
}
RequestResponseMessage::Response {
request_id,
response,
} => {
match (
self.outbound_requests_table.remove(&request_id),
self.network_codec.convert_to_response(&response),
) {
(
Some(ResponseChannelItem::ResponseBlock(channel)),
Ok(ResponseMessage::ResponseBlock(block)),
) => {
if channel.send(block).is_err() {
debug!(
"Failed to send through the channel for {:?}",
request_id
);
}
}
(Some(_), Err(e)) => {
debug!("Failed to convert IntermediateResponse into a ResponseMessage {:?} with {:?}", response, e);
}
(None, Ok(_)) => {
debug!("Send channel not found for {:?}", request_id);
}
_ => {}
}
}
},
RequestResponseEvent::InboundFailure {
peer,
error,
request_id,
} => {
debug!("RequestResponse inbound error for peer: {:?} with id: {:?} and error: {:?}", peer, request_id, error);
}
RequestResponseEvent::OutboundFailure {
peer,
error,
request_id,
} => {
debug!("RequestResponse outbound error for peer: {:?} with id: {:?} and error: {:?}", peer, request_id, error);
let _ = self.outbound_requests_table.remove(&request_id);
}
_ => {}
},
}
None
}
}
#[cfg(test)]
mod tests {
use super::FuelP2PService;
use crate::{
codecs::bincode::BincodeCodec,
config::P2PConfig,
gossipsub::{
messages::{
GossipsubBroadcastRequest,
GossipsubMessage,
},
topics::{
GossipTopic,
CON_VOTE_GOSSIP_TOPIC,
NEW_BLOCK_GOSSIP_TOPIC,
NEW_TX_GOSSIP_TOPIC,
},
},
gossipsub_config::default_gossipsub_builder,
peer_info::PeerInfo,
request_response::messages::{
OutboundResponse,
RequestMessage,
ResponseChannelItem,
},
service::FuelP2PEvent,
};
use ctor::ctor;
use fuel_core_interfaces::{
common::fuel_tx::Transaction,
model::{
ConsensusVote,
FuelBlock,
FuelBlockConsensus,
PartialFuelBlockHeader,
},
};
use futures::StreamExt;
use libp2p::{
gossipsub::{
error::PublishError,
Topic,
},
identity::Keypair,
multiaddr::Protocol,
swarm::SwarmEvent,
Multiaddr,
PeerId,
};
use std::{
collections::HashSet,
net::{
IpAddr,
Ipv4Addr,
SocketAddrV4,
TcpListener,
},
sync::Arc,
time::Duration,
};
use tokio::sync::{
mpsc,
oneshot,
};
use tracing_attributes::instrument;
use tracing_subscriber::{
fmt,
layer::SubscriberExt,
EnvFilter,
};
/// Installs a global tracing subscriber for the test binary, but only when
/// the `RUST_LOG` environment variable is set. Output goes to stderr and to
/// a daily rolling log file under `./logs`.
#[ctor]
fn initialize_tracing() {
    if std::env::var_os("RUST_LOG").is_none() {
        return
    }

    let file_writer = tracing_appender::rolling::daily("./logs", "p2p_logfile");

    let stderr_layer = fmt::Layer::new().with_writer(std::io::stderr);
    let file_layer = fmt::Layer::new()
        .compact()
        .pretty()
        // No ANSI colour codes in the log file.
        .with_ansi(false)
        .with_writer(file_writer);

    let subscriber = tracing_subscriber::registry()
        .with(EnvFilter::from_default_env())
        .with(stderr_layer)
        .with(file_layer);

    tracing::subscriber::set_global_default(subscriber)
        .expect("Unable to set a global subscriber");
}
/// Creates a service from `p2p_config` with a freshly generated secp256k1
/// keypair, so every call yields a node with a new identity.
fn build_service_from_config(
    mut p2p_config: P2PConfig,
) -> FuelP2PService<BincodeCodec> {
    p2p_config.keypair = Keypair::generate_secp256k1();
    let codec = BincodeCodec::new(p2p_config.max_block_size);
    FuelP2PService::new(p2p_config, codec).unwrap()
}
/// Asks the OS for a currently free TCP port by binding to port 0 on the
/// unspecified IPv4 address and reading back the assigned port.
///
/// The listener is dropped before returning, so the port is only free at
/// the time of the call.
fn get_unused_port() -> u16 {
    let any_port = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0);
    let bound_addr =
        TcpListener::bind(any_port).and_then(|listener| listener.local_addr());
    bound_addr
        .map(|addr| addr.port())
        .expect("A free tcp port exists")
}
/// Identity and address of a test node, generated before the service is
/// created so other nodes can reference its multiaddr in their configs.
#[derive(Debug, Clone)]
struct NodeData {
/// Keypair the node's service is created with.
keypair: Keypair,
/// TCP port the node's service is configured with.
tcp_port: u16,
/// Address of the form `/ip4/127.0.0.1/tcp/{tcp_port}/p2p/{peer_id}`.
multiaddr: Multiaddr,
}
impl NodeData {
    /// Generates a node identity with a new keypair, a free TCP port and the
    /// matching localhost multiaddr (including the `/p2p/{peer_id}` suffix).
    fn random() -> Self {
        let keypair = Keypair::generate_secp256k1();
        let tcp_port = get_unused_port();

        let peer_id = PeerId::from_public_key(&keypair.public());
        let mut base_addr = Multiaddr::from(IpAddr::V4(Ipv4Addr::LOCALHOST));
        base_addr.push(Protocol::Tcp(tcp_port));
        let multiaddr = format!("{}/p2p/{}", base_addr, peer_id).parse().unwrap();

        Self {
            keypair,
            tcp_port,
            multiaddr,
        }
    }

    /// Creates a service from `p2p_config`, overriding its TCP port and
    /// keypair with the ones stored in this `NodeData`.
    fn create_service(
        &self,
        mut p2p_config: P2PConfig,
    ) -> FuelP2PService<BincodeCodec> {
        p2p_config.tcp_port = self.tcp_port;
        p2p_config.keypair = self.keypair.clone();

        let codec = BincodeCodec::new(p2p_config.max_block_size);
        FuelP2PService::new(p2p_config, codec).unwrap()
    }
}
#[tokio::test]
#[instrument]
async fn p2p_service_works() {
let mut fuel_p2p_service = build_service_from_config(
P2PConfig::default_initialized("p2p_service_works"),
);
loop {
match fuel_p2p_service.swarm.select_next_some().await {
SwarmEvent::NewListenAddr { .. } => {
break
}
other_event => {
tracing::error!("Unexpected event: {:?}", other_event);
panic!("Unexpected event")
}
}
}
}
/// Two "guarded" nodes run in reserved-nodes-only mode, each with its own
/// group of reserved ("sentry") nodes. The test passes once
/// - both guarded nodes have connected to every one of their sentry nodes
///   (and to nobody else), and
/// - one sentry node has connected to more peers than the size of a sentry
///   group.
#[tokio::test]
#[instrument]
async fn sentry_nodes_working() {
    let reserved_nodes_size = 4;
    let double_reserved_nodes_size = reserved_nodes_size * 2;

    let mut p2p_config = P2PConfig::default_initialized("sentry_nodes_working");
    p2p_config.enable_mdns = true;

    // Size the gossipsub mesh for twice the number of reserved nodes.
    let gossipsub_config = default_gossipsub_builder()
        .mesh_n(double_reserved_nodes_size)
        .mesh_n_low(double_reserved_nodes_size)
        .mesh_n_high(double_reserved_nodes_size)
        .build()
        .expect("valid gossipsub configuration");
    p2p_config.gossipsub_config = gossipsub_config;

    // Builds one guarded node together with its group of sentry nodes.
    let build_sentry_nodes = || {
        let guarded_node = NodeData::random();

        let reserved_nodes: Vec<NodeData> = (0..reserved_nodes_size)
            .map(|_| NodeData::random())
            .collect();

        // The guarded node only talks to its reserved nodes.
        let guarded_node_service = {
            let mut p2p_config = p2p_config.clone();
            p2p_config.reserved_nodes = reserved_nodes
                .iter()
                .map(|node| node.multiaddr.clone())
                .collect();
            p2p_config.reserved_nodes_only_mode = true;
            guarded_node.create_service(p2p_config)
        };

        // Each sentry node reserves the guarded node, but is not restricted
        // to reserved nodes only.
        let sentry_nodes: Vec<FuelP2PService<BincodeCodec>> = reserved_nodes
            .into_iter()
            .map(|node| {
                let mut p2p_config = p2p_config.clone();
                p2p_config.reserved_nodes = vec![guarded_node.multiaddr.clone()];
                node.create_service(p2p_config)
            })
            .collect();

        (guarded_node_service, sentry_nodes)
    };

    let (mut first_guarded_node, mut first_sentry_nodes) = build_sentry_nodes();
    let (mut second_guarded_node, mut second_sentry_nodes) = build_sentry_nodes();

    // Peer ids each guarded node is still expected to connect to.
    let mut first_sentry_set: HashSet<_> = first_sentry_nodes
        .iter()
        .map(|node| node.local_peer_id)
        .collect();
    let mut second_sentry_set: HashSet<_> = second_sentry_nodes
        .iter()
        .map(|node| node.local_peer_id)
        .collect();

    // One sentry node is observed separately to track whom it connects to.
    let mut single_sentry_node = first_sentry_nodes.pop().unwrap();
    let mut sentry_node_connections = HashSet::new();

    loop {
        tokio::select! {
            event_from_first_guarded = first_guarded_node.next_event() => {
                if let Some(FuelP2PEvent::PeerConnected(peer_id)) = event_from_first_guarded {
                    if !first_sentry_set.remove(&peer_id) {
                        panic!("The node should only connect to the specified reserved nodes!");
                    }
                }
                tracing::info!("Event from the first guarded node: {:?}", event_from_first_guarded);
            },
            event_from_second_guarded = second_guarded_node.next_event() => {
                if let Some(FuelP2PEvent::PeerConnected(peer_id)) = event_from_second_guarded {
                    if !second_sentry_set.remove(&peer_id) {
                        panic!("The node should only connect to the specified reserved nodes!");
                    }
                }
                tracing::info!("Event from the second guarded node: {:?}", event_from_second_guarded);
            },
            sentry_node_event = single_sentry_node.next_event() => {
                if let Some(FuelP2PEvent::PeerConnected(peer_id)) = sentry_node_event {
                    sentry_node_connections.insert(peer_id);
                }

                // Done once the sentry node has more connections than the
                // size of one sentry group and BOTH guarded nodes have
                // connected to all of their sentry nodes.
                if sentry_node_connections.len() > reserved_nodes_size
                    && first_sentry_set.is_empty()
                    && second_sentry_set.is_empty()
                {
                    break;
                }
            }
            // Keep polling the remaining sentry nodes so their swarms make progress.
            _ = async {
                for sentry_node in &mut first_sentry_nodes {
                    sentry_node.next_event().await;
                }
            } => {},
            _ = async {
                for sentry_node in &mut second_sentry_nodes {
                    sentry_node.next_event().await;
                }
            } => {}
        };
    }
}
/// Two nodes with mDNS enabled and no bootstrap nodes: the test finishes as
/// soon as node B reports a peer connection.
#[tokio::test]
#[instrument]
async fn nodes_connected_via_mdns() {
    let mut mdns_config = P2PConfig::default_initialized("nodes_connected_via_mdns");
    mdns_config.enable_mdns = true;

    let mut node_a = build_service_from_config(mdns_config.clone());
    let mut node_b = build_service_from_config(mdns_config);

    loop {
        tokio::select! {
            node_b_event = node_b.next_event() => {
                if matches!(node_b_event, Some(FuelP2PEvent::PeerConnected(_))) {
                    break
                }
                tracing::info!("Node B Event: {:?}", node_b_event);
            },
            // Node A only needs to be polled; its raw swarm events are logged.
            node_a_event = node_a.swarm.select_next_some() => {
                tracing::info!("Node A Event: {:?}", node_a_event);
            }
        };
    }
}
/// Two mDNS-enabled nodes configured with different checksums: node A must
/// observe an outgoing connection error whose last transport error is
/// `TransportError::Other`, and node B must never report a connection.
#[tokio::test]
#[instrument]
async fn nodes_cannot_connect_due_to_different_checksum() {
    use libp2p::{
        swarm::DialError,
        TransportError,
    };

    let mut p2p_config = P2PConfig::default_initialized(
        "nodes_cannot_connect_due_to_different_checksum",
    );
    p2p_config.enable_mdns = true;

    let mut node_a = build_service_from_config(p2p_config.clone());

    // Node B gets a checksum that differs from node A's.
    p2p_config.checksum = [1u8; 32].into();
    let mut node_b = build_service_from_config(p2p_config);

    loop {
        tokio::select! {
            node_a_event = node_a.swarm.select_next_some() => {
                tracing::info!("Node A Event: {:?}", node_a_event);
                if let SwarmEvent::OutgoingConnectionError { error: DialError::Transport(mut errors), .. } = node_a_event {
                    let (_, last_error) = errors.pop().unwrap();
                    if matches!(last_error, TransportError::Other(_)) {
                        break
                    }
                }
            },
            node_b_event = node_b.next_event() => {
                if matches!(node_b_event, Some(FuelP2PEvent::PeerConnected(_))) {
                    panic!("Node B should not connect to Node A!")
                }
                tracing::info!("Node B Event: {:?}", node_b_event);
            },
        };
    }
}
/// Nodes B and C only know node A as a bootstrap node; the test finishes
/// once node C reports a connection to node B.
#[tokio::test]
#[instrument]
async fn nodes_connected_via_identify() {
    let mut p2p_config =
        P2PConfig::default_initialized("nodes_connected_via_identify");

    // Node A is the bootstrap node for the other two.
    let bootstrap_data = NodeData::random();
    let mut node_a = bootstrap_data.create_service(p2p_config.clone());

    p2p_config.bootstrap_nodes = vec![bootstrap_data.multiaddr];
    let mut node_b = build_service_from_config(p2p_config.clone());
    let mut node_c = build_service_from_config(p2p_config);

    loop {
        tokio::select! {
            node_a_event = node_a.next_event() => {
                tracing::info!("Node A Event: {:?}", node_a_event);
            },
            node_b_event = node_b.next_event() => {
                tracing::info!("Node B Event: {:?}", node_b_event);
            },
            node_c_event = node_c.next_event() => {
                let connected_to_b = matches!(
                    node_c_event,
                    Some(FuelP2PEvent::PeerConnected(peer_id)) if peer_id == node_b.local_peer_id
                );
                if connected_to_b {
                    break
                }
                tracing::info!("Node C Event: {:?}", node_c_event);
            }
        };
    }
}
/// Node B bootstraps from node A; the test finishes once node A holds
/// addresses, a client version and a ping value for an updated peer.
#[tokio::test]
#[instrument]
async fn peer_info_updates_work() {
    let mut p2p_config = P2PConfig::default_initialized("peer_info_updates_work");

    let bootstrap_data = NodeData::random();
    let mut node_a = bootstrap_data.create_service(p2p_config.clone());

    p2p_config.bootstrap_nodes = vec![bootstrap_data.multiaddr];
    let mut node_b = build_service_from_config(p2p_config);

    loop {
        tokio::select! {
            node_a_event = node_a.next_event() => {
                if let Some(FuelP2PEvent::PeerInfoUpdated(peer_id)) = node_a_event {
                    let fully_populated = node_a
                        .swarm
                        .behaviour()
                        .get_peer_info(&peer_id)
                        .map_or(false, |info| {
                            !info.peer_addresses.is_empty()
                                && info.client_version.is_some()
                                && info.latest_ping.is_some()
                        });
                    if fully_populated {
                        break;
                    }
                }
                tracing::info!("Node A Event: {:?}", node_a_event);
            },
            node_b_event = node_b.next_event() => {
                tracing::info!("Node B Event: {:?}", node_b_event);
            }
        };
    }
}
/// Runs the gossipsub broadcast scenario with a default transaction.
#[tokio::test]
#[instrument]
async fn gossipsub_broadcast_tx() {
    let tx = Arc::new(Transaction::default());
    gossipsub_broadcast(GossipsubBroadcastRequest::NewTx(tx)).await;
}
/// Runs the gossipsub broadcast scenario with a default consensus vote.
#[tokio::test]
#[instrument]
async fn gossipsub_broadcast_vote() {
    let vote = Arc::new(ConsensusVote::default());
    gossipsub_broadcast(GossipsubBroadcastRequest::ConsensusVote(vote)).await;
}
/// Runs the gossipsub broadcast scenario with a default block.
#[tokio::test]
#[instrument]
async fn gossipsub_broadcast_block() {
    let block = Arc::new(FuelBlock::default());
    gossipsub_broadcast(GossipsubBroadcastRequest::NewBlock(block)).await;
}
/// Shared scenario for the gossipsub tests.
///
/// Node B bootstraps from node A. Once node A has addresses for an updated
/// peer it publishes `broadcast_request` exactly once. Node B must then
/// receive a message on the matching topic whose content equals the default
/// value that was sent, and re-publishing the same request from node B must
/// be rejected as a duplicate.
async fn gossipsub_broadcast(broadcast_request: GossipsubBroadcastRequest) {
    let mut p2p_config =
        P2PConfig::default_initialized("gossipsub_exchanges_messages");

    // Topic on which the message is expected to arrive.
    let selected_topic: GossipTopic = {
        let topic = match broadcast_request {
            GossipsubBroadcastRequest::ConsensusVote(_) => CON_VOTE_GOSSIP_TOPIC,
            GossipsubBroadcastRequest::NewBlock(_) => NEW_BLOCK_GOSSIP_TOPIC,
            GossipsubBroadcastRequest::NewTx(_) => NEW_TX_GOSSIP_TOPIC,
        };

        Topic::new(format!("{}/{}", topic, p2p_config.network_name))
    };

    let mut message_sent = false;

    let node_a_data = NodeData::random();
    let mut node_a = node_a_data.create_service(p2p_config.clone());

    p2p_config.bootstrap_nodes = vec![node_a_data.multiaddr];
    let mut node_b = build_service_from_config(p2p_config.clone());

    loop {
        tokio::select! {
            node_a_event = node_a.next_event() => {
                if let Some(FuelP2PEvent::PeerInfoUpdated(peer_id)) = node_a_event {
                    if let Some(PeerInfo { peer_addresses, .. }) = node_a.swarm.behaviour().get_peer_info(&peer_id) {
                        // Publish only once, as soon as the peer has known addresses.
                        if !peer_addresses.is_empty() && !message_sent {
                            message_sent = true;
                            let broadcast_request = broadcast_request.clone();
                            node_a.publish_message(broadcast_request).unwrap();
                        }
                    }
                }
                tracing::info!("Node A Event: {:?}", node_a_event);
            },
            node_b_event = node_b.next_event() => {
                if let Some(FuelP2PEvent::GossipsubMessage { topic_hash, message, .. }) = &node_b_event {
                    if *topic_hash != selected_topic.hash() {
                        tracing::error!("Wrong topic hash, expected: {} - actual: {}", selected_topic.hash(), topic_hash);
                        panic!("Wrong Topic");
                    }

                    // The received payload must equal what node A sent.
                    match message {
                        GossipsubMessage::NewTx(tx) => {
                            if tx != &Transaction::default() {
                                tracing::error!("Wrong p2p message {:?}", message);
                                panic!("Wrong GossipsubMessage")
                            }
                        }
                        GossipsubMessage::NewBlock(block) => {
                            if block.header().height() != FuelBlock::default().header().height() {
                                tracing::error!("Wrong p2p message {:?}", message);
                                panic!("Wrong GossipsubMessage")
                            }
                        }
                        GossipsubMessage::ConsensusVote(vote) => {
                            if vote != &ConsensusVote::default() {
                                tracing::error!("Wrong p2p message {:?}", message);
                                panic!("Wrong GossipsubMessage")
                            }
                        }
                    }

                    // Publishing the message that was just received must be
                    // rejected as a duplicate. Previously the `matches!`
                    // result was discarded, so nothing was actually checked.
                    let broadcast_request = broadcast_request.clone();
                    assert!(matches!(
                        node_b.publish_message(broadcast_request),
                        Err(PublishError::Duplicate)
                    ));

                    break
                }

                tracing::info!("Node B Event: {:?}", node_b_event);
            }
        };
    }
}
/// Node A requests block 0 from a random peer (node B is the only one);
/// node B answers with a sealed block and the test checks that the height of
/// the block delivered through the response channel is 0.
#[tokio::test]
#[instrument]
async fn request_response_works() {
    use fuel_core_interfaces::{
        common::fuel_tx::Transaction,
        model::{
            FuelBlock,
            FuelBlockPoAConsensus,
            SealedFuelBlock,
        },
    };

    let mut p2p_config = P2PConfig::default_initialized("request_response_works");

    let node_a_data = NodeData::random();
    let mut node_a = node_a_data.create_service(p2p_config.clone());

    p2p_config.bootstrap_nodes = vec![node_a_data.multiaddr];
    let mut node_b = build_service_from_config(p2p_config.clone());

    // Carries the verdict from the spawned task back to the test loop.
    let (tx_test_end, mut rx_test_end) = mpsc::channel(1);

    let mut request_sent = false;

    loop {
        tokio::select! {
            message_sent = rx_test_end.recv() => {
                assert_eq!(message_sent, Some(true), "Received wrong block height!");
                break;
            }
            node_a_event = node_a.next_event() => {
                if let Some(FuelP2PEvent::PeerInfoUpdated(peer_id)) = node_a_event {
                    let peer_has_addresses = node_a
                        .swarm
                        .behaviour()
                        .get_peer_info(&peer_id)
                        .map_or(false, |info| !info.peer_addresses.is_empty());

                    // Send the request only once, after the peer has known addresses.
                    if peer_has_addresses && !request_sent {
                        request_sent = true;

                        let (tx_orchestrator, rx_orchestrator) = oneshot::channel();
                        let requested_block_height = RequestMessage::RequestBlock(0_u64.into());
                        let send_result = node_a.send_request_msg(None, requested_block_height, ResponseChannelItem::ResponseBlock(tx_orchestrator));
                        assert!(send_result.is_ok());

                        // Wait for the response outside of the select loop.
                        let tx_test_end = tx_test_end.clone();
                        tokio::spawn(async move {
                            let response_message = rx_orchestrator.await;

                            if let Ok(sealed_block) = response_message {
                                let _ = tx_test_end.send(*sealed_block.block.header().height() == 0_u64.into()).await;
                            } else {
                                tracing::error!("Orchestrator failed to receive a message: {:?}", response_message);
                                panic!("Message not received successfully!")
                            }
                        });
                    }
                }

                tracing::info!("Node A Event: {:?}", node_a_event);
            },
            node_b_event = node_b.next_event() => {
                // Answer every incoming request with the same sealed block.
                if let Some(FuelP2PEvent::RequestMessage{ request_id, .. }) = node_b_event {
                    let transactions = vec![Transaction::default(), Transaction::default(), Transaction::default(), Transaction::default(), Transaction::default()];
                    let block = FuelBlock::new(PartialFuelBlockHeader::default(), transactions, &[]);

                    let sealed_block = SealedFuelBlock {
                        block,
                        consensus: FuelBlockConsensus::PoA(FuelBlockPoAConsensus::new(Default::default())),
                    };

                    let response = OutboundResponse::ResponseBlock(Arc::new(sealed_block));
                    let _ = node_b.send_response_msg(request_id, Some(response));
                }

                tracing::info!("Node B Event: {:?}", node_b_event);
            }
        };
    }
}
/// With a request timeout of zero seconds, node A's request must fail: the
/// response channel is dropped (the receiver errors) and the entry is
/// removed from `outbound_requests_table`.
#[tokio::test]
#[instrument]
async fn req_res_outbound_timeout_works() {
    let mut p2p_config =
        P2PConfig::default_initialized("req_res_outbound_timeout_works");

    // Zero timeout so the outbound request cannot succeed.
    p2p_config.set_request_timeout = Duration::from_secs(0);

    let node_a_data = NodeData::random();
    let mut node_a = node_a_data.create_service(p2p_config.clone());

    p2p_config.bootstrap_nodes = vec![node_a_data.multiaddr];
    let mut node_b = build_service_from_config(p2p_config.clone());

    // Signals the test loop that the response channel was dropped.
    let (tx_test_end, mut rx_test_end) = tokio::sync::mpsc::channel(1);

    let mut request_sent = false;

    loop {
        tokio::select! {
            node_a_event = node_a.next_event() => {
                if let Some(FuelP2PEvent::PeerInfoUpdated(peer_id)) = node_a_event {
                    let peer_has_addresses = node_a
                        .swarm
                        .behaviour()
                        .get_peer_info(&peer_id)
                        .map_or(false, |info| !info.peer_addresses.is_empty());

                    // Send the request only once, after the peer has known addresses.
                    if peer_has_addresses && !request_sent {
                        request_sent = true;

                        let (tx_orchestrator, rx_orchestrator) = oneshot::channel();

                        // The table must grow by exactly the one request sent here.
                        assert_eq!(node_a.outbound_requests_table.len(), 0);

                        let requested_block_height = RequestMessage::RequestBlock(0_u64.into());
                        let send_result = node_a.send_request_msg(None, requested_block_height, ResponseChannelItem::ResponseBlock(tx_orchestrator));
                        assert!(send_result.is_ok());

                        assert_eq!(node_a.outbound_requests_table.len(), 1);

                        let tx_test_end = tx_test_end.clone();
                        tokio::spawn(async move {
                            // An error here means the sender was dropped without a response.
                            if (rx_orchestrator.await).is_err() {
                                let _ = tx_test_end.send(()).await;
                            }
                        });
                    }
                }

                tracing::info!("Node A Event: {:?}", node_a_event);
            },
            _ = rx_test_end.recv() => {
                // The failed request must have been cleaned up.
                assert_eq!(node_a.outbound_requests_table.len(), 0);
                break;
            },
            node_b_event = node_b.next_event() => {
                tracing::info!("Node B Event: {:?}", node_b_event);
            }
        };
    }
}
}