use crate::counters;
use anyhow::anyhow;
use aptos_config::network_id::{NetworkId, PeerNetworkId};
use aptos_logger::prelude::*;
use aptos_types::{epoch_change::EpochChangeProof, PeerId};
use async_trait::async_trait;
use channel::{aptos_channel, message_queues::QueueStyle};
use consensus_types::{
block_retrieval::{BlockRetrievalRequest, BlockRetrievalResponse},
epoch_retrieval::EpochRetrievalRequest,
experimental::{commit_decision::CommitDecision, commit_vote::CommitVote},
proposal_msg::ProposalMsg,
sync_info::SyncInfo,
vote_msg::VoteMsg,
};
use network::{
application::storage::PeerMetadataStorage,
constants::NETWORK_CHANNEL_SIZE,
error::NetworkError,
peer_manager::{ConnectionRequestSender, PeerManagerRequestSender},
protocols::{
network::{
AppConfig, ApplicationNetworkSender, NetworkEvents, NetworkSender, NewNetworkSender,
},
rpc::error::RpcError,
wire::handshake::v1::ProtocolIdSet,
},
ProtocolId,
};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, sync::Arc, time::Duration};
/// Network messages sent and received by consensus.
///
/// Each variant boxes its payload. The enum derives `Serialize` and
/// `Deserialize`, so it is the unit that gets encoded for the network.
// NOTE(review): for index-based encodings the variant order is presumably part
// of the wire format — confirm before reordering or inserting variants.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum ConsensusMsg {
/// Carries a [`BlockRetrievalRequest`].
BlockRetrievalRequest(Box<BlockRetrievalRequest>),
/// Carries a [`BlockRetrievalResponse`].
BlockRetrievalResponse(Box<BlockRetrievalResponse>),
/// Carries an [`EpochRetrievalRequest`].
EpochRetrievalRequest(Box<EpochRetrievalRequest>),
/// Carries a [`ProposalMsg`].
ProposalMsg(Box<ProposalMsg>),
/// Carries a [`SyncInfo`].
SyncInfo(Box<SyncInfo>),
/// Carries an [`EpochChangeProof`].
EpochChangeProof(Box<EpochChangeProof>),
/// Carries a [`VoteMsg`].
VoteMsg(Box<VoteMsg>),
/// Carries a [`CommitVote`] (from the `experimental` module).
CommitVoteMsg(Box<CommitVote>),
/// Carries a [`CommitDecision`] (from the `experimental` module).
CommitDecisionMsg(Box<CommitDecision>),
}
/// [`NetworkEvents`] specialised to [`ConsensusMsg`] payloads.
pub type ConsensusNetworkEvents = NetworkEvents<ConsensusMsg>;
/// Consensus-specific wrapper around [`NetworkSender`] that chooses a wire
/// protocol per peer before sending.
#[derive(Clone)]
pub struct ConsensusNetworkSender {
// Underlying sender that every outgoing message is delegated to.
network_sender: NetworkSender<ConsensusMsg>,
// `None` after `new`; set by `initialize`. Protocol lookups fail with
// "ConsensusNetworkSender not initialized" while this is `None`.
peer_metadata_storage: Option<Arc<PeerMetadataStorage>>,
}
/// RPC protocols supported by consensus.
///
/// Order matters: when selecting a protocol for a peer, the first entry that
/// the peer also supports is used, so earlier entries take priority.
pub const RPC: &[ProtocolId] = &[
ProtocolId::ConsensusRpcCompressed,
ProtocolId::ConsensusRpcBcs,
ProtocolId::ConsensusRpcJson,
];
/// Direct-send protocols supported by consensus.
///
/// Order matters: when selecting a protocol for a peer, the first entry that
/// the peer also supports is used, so earlier entries take priority.
pub const DIRECT_SEND: &[ProtocolId] = &[
ProtocolId::ConsensusDirectSendCompressed,
ProtocolId::ConsensusDirectSendBcs,
ProtocolId::ConsensusDirectSendJson,
];
/// Builds the [`AppConfig`] used to register consensus with the network layer.
///
/// The config advertises every protocol in [`RPC`] followed by every protocol
/// in [`DIRECT_SEND`], and uses a LIFO channel of `NETWORK_CHANNEL_SIZE`
/// instrumented with the `PENDING_CONSENSUS_NETWORK_EVENTS` counters.
pub fn network_endpoint_config() -> AppConfig {
    let channel_config = aptos_channel::Config::new(NETWORK_CHANNEL_SIZE)
        .queue_style(QueueStyle::LIFO)
        .counters(&counters::PENDING_CONSENSUS_NETWORK_EVENTS);
    let all_protocols = RPC.iter().chain(DIRECT_SEND.iter()).copied();

    AppConfig::p2p(all_protocols, channel_config)
}
impl NewNetworkSender for ConsensusNetworkSender {
    /// Creates a sender backed by the given peer-manager and connection
    /// request channels.
    ///
    /// The peer metadata storage starts out unset; call
    /// `ConsensusNetworkSender::initialize` before sending, otherwise protocol
    /// selection fails.
    fn new(
        peer_mgr_reqs_tx: PeerManagerRequestSender,
        connection_reqs_tx: ConnectionRequestSender,
    ) -> Self {
        let network_sender = NetworkSender::new(peer_mgr_reqs_tx, connection_reqs_tx);
        Self {
            network_sender,
            peer_metadata_storage: None,
        }
    }
}
impl ConsensusNetworkSender {
    /// Installs the peer metadata storage used to look up which protocols a
    /// peer supports.
    pub fn initialize(&mut self, peer_metadata_storage: Arc<PeerMetadataStorage>) {
        self.peer_metadata_storage = Some(peer_metadata_storage);
    }

    /// Returns the application protocols recorded for `peer`'s active
    /// connection on the validator network.
    ///
    /// # Errors
    ///
    /// Fails if `initialize` has not been called, or if the storage has no
    /// entry for the peer.
    fn supported_protocols(&self, peer: PeerId) -> anyhow::Result<ProtocolIdSet> {
        let storage = self
            .peer_metadata_storage
            .as_ref()
            .ok_or_else(|| anyhow!("ConsensusNetworkSender not initialized"))?;

        // Lookups are always made against the validator network.
        let validator_peer = PeerNetworkId::new(NetworkId::Validator, peer);
        match storage.read(validator_peer) {
            Some(peer_info) => Ok(peer_info.active_connection.application_protocols),
            None => Err(anyhow!("Peer not connected")),
        }
    }

    /// Picks the first protocol in `local_protocols` that `peer` also
    /// supports, so the slice order expresses local preference.
    ///
    /// # Errors
    ///
    /// Propagates failures from `supported_protocols`, and fails if the two
    /// sides share no protocol.
    fn preferred_protocol_for_peer(
        &self,
        peer: PeerId,
        local_protocols: &[ProtocolId],
    ) -> anyhow::Result<ProtocolId> {
        let remote_protocols = self.supported_protocols(peer)?;
        local_protocols
            .iter()
            .copied()
            .find(|candidate| remote_protocols.contains(*candidate))
            .ok_or_else(|| anyhow!("No available protocols for peer {}", peer))
    }
}
#[async_trait]
impl ApplicationNetworkSender<ConsensusMsg> for ConsensusNetworkSender {
fn send_to(&self, recipient: PeerId, message: ConsensusMsg) -> Result<(), NetworkError> {
let protocol = self.preferred_protocol_for_peer(recipient, DIRECT_SEND)?;
self.network_sender.send_to(recipient, protocol, message)
}
fn send_to_many(
&self,
recipients: impl Iterator<Item = PeerId>,
message: ConsensusMsg,
) -> Result<(), NetworkError> {
let mut peers_per_protocol = HashMap::new();
let mut not_available = vec![];
for peer in recipients {
match self.preferred_protocol_for_peer(peer, DIRECT_SEND) {
Ok(protocol) => peers_per_protocol
.entry(protocol)
.or_insert_with(Vec::new)
.push(peer),
Err(_) => not_available.push(peer),
}
}
sample!(
SampleRate::Duration(Duration::from_secs(10)),
error!("Unavailable peers: {:?}", not_available)
);
for (protocol, peers) in peers_per_protocol {
self.network_sender
.send_to_many(peers.into_iter(), protocol, message.clone())?;
}
Ok(())
}
async fn send_rpc(
&self,
recipient: PeerId,
message: ConsensusMsg,
timeout: Duration,
) -> Result<ConsensusMsg, RpcError> {
let protocol = self.preferred_protocol_for_peer(recipient, RPC)?;
self.network_sender
.send_rpc(recipient, protocol, message, timeout)
.await
}
}