use crate::{
chunk_request::GetChunkRequest,
counters,
error::Error,
logging::{LogEntry, LogEvent, LogSchema},
network::{StateSyncMessage, StateSyncSender},
};
use aptos_config::{
config::PeerRole,
network_id::{NetworkId, PeerNetworkId},
};
use aptos_logger::prelude::*;
use itertools::Itertools;
use netcore::transport::ConnectionOrigin;
use network::{protocols::network::ApplicationNetworkSender, transport::ConnectionMetadata};
use rand::{
distributions::{Distribution, WeightedIndex},
thread_rng,
};
use short_hex_str::AsShortHexStr;
use std::{
cmp::Ordering,
collections::{
hash_map::Entry::{Occupied, Vacant},
BTreeMap, HashMap,
},
time::{Duration, SystemTime, UNIX_EPOCH},
};
// Peer score bounds and initial values. Scores are used as sampling weights
// when choosing which peer to send a chunk request to.

/// Upper bound a peer's score is clamped to after a successful response.
const MAX_SCORE: f64 = 100.0;
/// Lower bound a peer's score is clamped to after any penalty.
const MIN_SCORE: f64 = 1.0;
/// Initial score for a newly enabled peer whose role is not `PreferredUpstream`.
const STARTING_SCORE: f64 = 50.0;
/// Initial score for a newly enabled peer whose role is `PreferredUpstream`.
const STARTING_SCORE_PREFERRED: f64 = 100.0;
/// Bookkeeping for the chunk request(s) sent for a single version.
#[derive(Clone, Debug)]
pub struct ChunkRequestInfo {
// Time the entry was created; `add_request` never updates it afterwards.
first_request_time: SystemTime,
// Time of the most recent `add_request` call for this version.
last_request_time: SystemTime,
// Multicast network level that was in effect when the request was last recorded.
multicast_level: NetworkId,
// Time `multicast_level` was last set (entry creation, or a later level change).
multicast_start_time: SystemTime,
// Peers the most recent request for this version was sent to.
last_request_peers: Vec<PeerNetworkId>,
}
impl ChunkRequestInfo {
    /// Creates the record for a brand-new request sent to `peers` while the
    /// manager is at `multicast_level`.
    ///
    /// All three timestamps are initialised to the same "now" reading.
    pub fn new(peers: Vec<PeerNetworkId>, multicast_level: NetworkId) -> Self {
        let created_at = SystemTime::now();
        Self {
            last_request_peers: peers,
            multicast_level,
            multicast_start_time: created_at,
            first_request_time: created_at,
            last_request_time: created_at,
        }
    }
}
/// Reason a peer's score is being adjusted; `RequestManager::update_score`
/// maps each variant to a concrete score change.
#[derive(Clone, Debug, Eq, PartialEq)]
enum PeerScoreUpdateType {
// Adds 1.0 to the score, capped at `MAX_SCORE`.
Success,
// Multiplies the score by 0.95, floored at `MIN_SCORE`.
EmptyChunk,
// Multiplies the score by 0.8, floored at `MIN_SCORE`.
ChunkVersionCannotBeApplied,
// Multiplies the score by 0.8, floored at `MIN_SCORE`.
InvalidChunk,
// Multiplies the score by 0.95, floored at `MIN_SCORE`.
InvalidChunkRequest,
// Multiplies the score by 0.95, floored at `MIN_SCORE`.
TimeOut,
}
/// Tracks known state-sync peers and their scores, records outstanding chunk
/// requests, and sends chunk requests/responses over the per-network senders.
pub struct RequestManager {
// Score per enabled peer; used as the sampling weight when picking peers.
peer_scores: HashMap<PeerNetworkId, f64>,
// Outstanding chunk requests keyed by the version passed to `add_request`.
requests: BTreeMap<u64, ChunkRequestInfo>,
// How long after the last send a request counts as timed out.
request_timeout: Duration,
// How long a request may stay at one multicast level before
// `has_request_timed_out` moves the manager to the next level.
multicast_timeout: Duration,
// Current multicast level; initialised to `NetworkId::Validator` in `new`.
multicast_network_level: NetworkId,
// Sender per network; `get_network_sender` panics if a peer's network is missing.
network_senders: HashMap<NetworkId, StateSyncSender>,
}
impl RequestManager {
/// Builds a manager with no known peers and no outstanding requests.
///
/// The multicast level starts at `NetworkId::Validator`, and the multicast
/// level counter is updated to reflect that before the manager is returned.
pub fn new(
    request_timeout: Duration,
    multicast_timeout: Duration,
    network_senders: HashMap<NetworkId, StateSyncSender>,
) -> Self {
    let initial_level = NetworkId::Validator;
    update_multicast_network_counter(initial_level);

    Self {
        network_senders,
        multicast_network_level: initial_level,
        multicast_timeout,
        request_timeout,
        requests: BTreeMap::new(),
        peer_scores: HashMap::new(),
    }
}
pub fn enable_peer(
&mut self,
peer: PeerNetworkId,
metadata: ConnectionMetadata,
) -> Result<(), Error> {
if !self.is_valid_state_sync_peer(&peer, metadata.origin) {
return Err(Error::InvalidStateSyncPeer(
peer.to_string(),
metadata.origin.to_string(),
));
}
info!(LogSchema::new(LogEntry::NewPeer)
.peer(&peer)
.is_valid_peer(true));
counters::ACTIVE_UPSTREAM_PEERS
.with_label_values(&[peer.network_id().as_str()])
.inc();
match self.peer_scores.entry(peer) {
Occupied(occupied_entry) => {
warn!(LogSchema::new(LogEntry::NewPeerAlreadyExists).peer(occupied_entry.key()));
}
Vacant(vacant_entry) => {
let peer_score = if metadata.role == PeerRole::PreferredUpstream {
STARTING_SCORE_PREFERRED
} else {
STARTING_SCORE
};
vacant_entry.insert(peer_score);
}
}
Ok(())
}
/// Stops tracking `peer`.
///
/// If the peer was tracked, its score entry is dropped and the
/// `ACTIVE_UPSTREAM_PEERS` gauge for its network is decremented; otherwise a
/// warning is logged. This function always returns `Ok(())`.
pub fn disable_peer(&mut self, peer: &PeerNetworkId) -> Result<(), Error> {
    info!(LogSchema::new(LogEntry::LostPeer).peer(peer));

    let was_tracked = self.peer_scores.remove(peer).is_some();
    if was_tracked {
        counters::ACTIVE_UPSTREAM_PEERS
            .with_label_values(&[peer.network_id().as_str()])
            .dec();
    } else {
        warn!(LogSchema::new(LogEntry::LostPeerNotKnown).peer(peer));
    }

    Ok(())
}
/// Returns `true` when no peer is currently tracked in `peer_scores`.
pub fn no_available_peers(&self) -> bool {
self.peer_scores.is_empty()
}
/// Adjusts the score of `peer` according to `update_type`.
///
/// * `Success` adds 1.0, capped at `MAX_SCORE`.
/// * `InvalidChunk` / `ChunkVersionCannotBeApplied` multiply by 0.8.
/// * `TimeOut` / `EmptyChunk` / `InvalidChunkRequest` multiply by 0.95.
///
/// Penalised scores never drop below `MIN_SCORE`. Peers that are not tracked
/// are silently ignored.
fn update_score(&mut self, peer: &PeerNetworkId, update_type: PeerScoreUpdateType) {
    if let Some(score) = self.peer_scores.get_mut(peer) {
        let current = *score;
        *score = match update_type {
            PeerScoreUpdateType::Success => (current + 1.0).min(MAX_SCORE),
            PeerScoreUpdateType::InvalidChunk
            | PeerScoreUpdateType::ChunkVersionCannotBeApplied => {
                (current * 0.8).max(MIN_SCORE)
            }
            PeerScoreUpdateType::TimeOut
            | PeerScoreUpdateType::EmptyChunk
            | PeerScoreUpdateType::InvalidChunkRequest => (current * 0.95).max(MIN_SCORE),
        };
    }
}
/// Groups the tracked peers by network and builds a score-weighted sampler
/// for each group.
///
/// For every network the result holds the peers on that network and, at the
/// same positions, a `WeightedIndex` over their scores. If the weighted index
/// cannot be built the error is logged and `None` is stored instead.
fn calculate_weighted_peers_per_network(
    &mut self,
) -> BTreeMap<NetworkId, (Vec<PeerNetworkId>, Option<WeightedIndex<f64>>)> {
    let scored_peers_by_network = self
        .peer_scores
        .iter()
        .map(|(peer, score)| (peer.network_id(), (*peer, *score)))
        .into_group_map();

    let mut weighted_peers_per_network = BTreeMap::new();
    for (network_level, scored_peers) in scored_peers_by_network {
        // Split into parallel vectors so index `i` of the sampler maps to peer `i`.
        let (eligible_peers, weights): (Vec<PeerNetworkId>, Vec<f64>) =
            scored_peers.into_iter().unzip();

        let weighted_index = match WeightedIndex::new(weights) {
            Ok(index) => Some(index),
            Err(error) => {
                error!(
                    "Failed to compute weighted index for eligible peers: {:?}",
                    error
                );
                None
            }
        };

        weighted_peers_per_network.insert(network_level, (eligible_peers, weighted_index));
    }
    weighted_peers_per_network
}
fn pick_peers(&mut self) -> Vec<PeerNetworkId> {
let weighted_peers_per_network = self.calculate_weighted_peers_per_network();
let mut chosen_peers = vec![];
let mut new_multicast_network_level = None;
for (network_level, (peers, weighted_index)) in &weighted_peers_per_network {
if let Some(peer) = pick_peer(peers, weighted_index) {
chosen_peers.push(peer)
}
if !chosen_peers.is_empty() && *network_level >= self.multicast_network_level {
new_multicast_network_level = Some(network_level);
break;
}
}
if let Some(network_level) = new_multicast_network_level {
self.update_multicast_network_level(*network_level, None);
}
chosen_peers
}
pub fn send_chunk_request(&mut self, req: GetChunkRequest) -> Result<(), Error> {
let log = LogSchema::new(LogEntry::SendChunkRequest).chunk_request(req.clone());
let peers = self.pick_peers();
if peers.is_empty() {
warn!(log.event(LogEvent::MissingPeers));
return Err(Error::NoAvailablePeers(
"No peers to send chunk request to".into(),
));
}
let req_info = self.add_request(req.known_version, peers.clone());
debug!(log
.clone()
.event(LogEvent::ChunkRequestInfo)
.chunk_req_info(&req_info));
let msg = StateSyncMessage::GetChunkRequest(Box::new(req));
let mut failed_peer_sends = vec![];
for peer in peers {
let sender = self.get_network_sender(&peer);
let peer_id = peer.peer_id();
let send_result = sender.send_to(peer_id, msg.clone());
let curr_log = log.clone().peer(&peer);
let result_label = if let Err(e) = send_result {
failed_peer_sends.push(peer);
error!(curr_log.event(LogEvent::NetworkSendError).error(&e.into()));
counters::SEND_FAIL_LABEL
} else {
debug!(curr_log.event(LogEvent::Success));
counters::SEND_SUCCESS_LABEL
};
counters::REQUESTS_SENT
.with_label_values(&[
peer.network_id().as_str(),
peer_id.short_str().as_str(),
result_label,
])
.inc();
}
if failed_peer_sends.is_empty() {
Ok(())
} else {
Err(Error::UnexpectedError(format!(
"Failed to send chunk request to: {:?}",
failed_peer_sends
)))
}
}
/// Returns a clone of the sender registered for the network `peer` is on.
///
/// # Panics
///
/// Panics if no sender is registered for that network.
fn get_network_sender(&mut self, peer: &PeerNetworkId) -> StateSyncSender {
    let network_id = peer.network_id();
    match self.network_senders.get_mut(&network_id) {
        Some(sender) => sender.clone(),
        None => panic!("Missing network sender for network: {:?}", network_id),
    }
}
/// Sends `message` to `peer` over the sender for the peer's network.
///
/// # Errors
///
/// Any send failure is converted into this crate's `Error`.
///
/// # Panics
///
/// Panics (via `get_network_sender`) if the peer's network has no sender.
pub fn send_chunk_response(
    &mut self,
    peer: &PeerNetworkId,
    message: StateSyncMessage,
) -> Result<(), Error> {
    let recipient = peer.peer_id();
    self.get_network_sender(peer)
        .send_to(recipient, message)
        .map_err(Into::into)
}
/// Records that a chunk request for `version` was just sent to `peers` and
/// returns a snapshot of the resulting bookkeeping entry.
///
/// For a version that already has an entry, the last-request time and peers
/// are refreshed; the multicast level and its start time are reset only if
/// the manager's multicast level changed since the entry was last updated.
/// The first-request time is left untouched. Otherwise a new entry is created.
pub fn add_request(&mut self, version: u64, peers: Vec<PeerNetworkId>) -> ChunkRequestInfo {
    let current_level = self.multicast_network_level;
    match self.requests.get_mut(&version) {
        Some(existing) => {
            let now = SystemTime::now();
            if existing.multicast_level != current_level {
                existing.multicast_level = current_level;
                existing.multicast_start_time = now;
            }
            existing.last_request_time = now;
            existing.last_request_peers = peers;
            existing.clone()
        }
        None => {
            let fresh_info = ChunkRequestInfo::new(peers, current_level);
            self.requests.insert(version, fresh_info.clone());
            fresh_info
        }
    }
}
/// Penalizes `peer` for sending a chunk while being a downstream peer.
/// Applies the same score update as an invalid chunk (`InvalidChunk`).
pub fn process_chunk_from_downstream(&mut self, peer: &PeerNetworkId) {
self.update_score(peer, PeerScoreUpdateType::InvalidChunk);
}
/// Penalizes `peer` with an `EmptyChunk` score update.
pub fn process_empty_chunk(&mut self, peer: &PeerNetworkId) {
self.update_score(peer, PeerScoreUpdateType::EmptyChunk);
}
/// Penalizes `peer` with an `InvalidChunk` score update.
pub fn process_invalid_chunk(&mut self, peer: &PeerNetworkId) {
self.update_score(peer, PeerScoreUpdateType::InvalidChunk);
}
/// Penalizes `peer` with an `InvalidChunkRequest` score update.
pub fn process_invalid_chunk_request(&mut self, peer: &PeerNetworkId) {
self.update_score(peer, PeerScoreUpdateType::InvalidChunkRequest);
}
/// Rewards `peer` for a successful response.
///
/// If the peer's network orders below the current multicast level, the
/// multicast level is first moved back to the peer's network.
pub fn process_success_response(&mut self, peer: &PeerNetworkId) {
    let responder_level = peer.network_id();
    if self.multicast_network_level > responder_level {
        self.update_multicast_network_level(responder_level, None);
    }

    self.update_score(peer, PeerScoreUpdateType::Success);
}
pub fn process_chunk_version_mismatch(
&mut self,
peer: &PeerNetworkId,
chunk_version: u64,
synced_version: u64,
) -> Result<(), Error> {
if self.is_multicast_response(chunk_version, peer) {
Err(Error::ReceivedChunkForOutdatedRequest(
peer.to_string(),
synced_version.to_string(),
chunk_version.to_string(),
))
} else {
self.update_score(peer, PeerScoreUpdateType::ChunkVersionCannotBeApplied);
Err(Error::ReceivedNonSequentialChunk(
peer.to_string(),
synced_version.to_string(),
chunk_version.to_string(),
))
}
}
/// Returns `true` when the latest request recorded for `version` went to
/// more than one peer and `peer` was one of them.
fn is_multicast_response(&self, version: u64, peer: &PeerNetworkId) -> bool {
    match self.requests.get(&version) {
        Some(request) => {
            let recipients = &request.last_request_peers;
            recipients.contains(peer) && recipients.len() > 1
        }
        None => false,
    }
}
/// Returns when the request for `version` was last sent, or `None` if no
/// request is recorded for that version.
pub fn get_last_request_time(&self, version: u64) -> Option<SystemTime> {
    let request = self.requests.get(&version)?;
    Some(request.last_request_time)
}
/// Returns when the request for `version` was first recorded, or `None` if
/// no request is recorded for that version.
pub fn get_first_request_time(&self, version: u64) -> Option<SystemTime> {
    let request = self.requests.get(&version)?;
    Some(request.first_request_time)
}
/// Returns when the request for `version` started at its current multicast
/// level, or `None` if no request is recorded for that version.
fn get_multicast_start_time(&self, version: u64) -> Option<SystemTime> {
    let request = self.requests.get(&version)?;
    Some(request.multicast_start_time)
}
/// Drops recorded requests for versions strictly below `version` whose last
/// send has already timed out. Requests that have not timed out are kept.
pub fn remove_requests(&mut self, version: u64) {
    let request_timeout = self.request_timeout;

    // Collect first: the map cannot be mutated while it is being iterated.
    let expired_versions: Vec<u64> = self
        .requests
        .range(..version)
        .filter(|(_, request)| is_timeout(request.last_request_time, request_timeout))
        .map(|(stale_version, _)| *stale_version)
        .collect();

    for stale_version in expired_versions {
        self.requests.remove(&stale_version);
    }
}
/// Reports whether the request for `version` has timed out and, if so,
/// applies the consequences.
///
/// A version with no recorded request is treated as last sent at
/// `UNIX_EPOCH`. When a recorded request has timed out, every peer of its
/// latest send receives a `TimeOut` penalty, and if the request has also been
/// at its multicast level for longer than `multicast_timeout`, the multicast
/// level moves one step on (Validator -> Vfn, anything else -> Public).
///
/// This function always returns `Ok`.
pub fn has_request_timed_out(&mut self, version: u64) -> Result<bool, Error> {
    let last_sent = self.get_last_request_time(version).unwrap_or(UNIX_EPOCH);
    if !is_timeout(last_sent, self.request_timeout) {
        return Ok(false);
    }

    // Nothing to penalise when the request was never recorded.
    let (timed_out_peers, multicast_started) = match self.requests.get(&version) {
        Some(request) => (
            request.last_request_peers.clone(),
            request.multicast_start_time,
        ),
        None => return Ok(true),
    };

    for peer in &timed_out_peers {
        self.update_score(peer, PeerScoreUpdateType::TimeOut);
    }

    if is_timeout(multicast_started, self.multicast_timeout) {
        let next_level = match self.multicast_network_level {
            NetworkId::Validator => NetworkId::Vfn,
            _ => NetworkId::Public,
        };
        self.update_multicast_network_level(next_level, Some(version));
    }

    Ok(true)
}
/// Decides whether `peer` may be used as a state-sync peer.
///
/// Peers on the validator or VFN network are always accepted; peers on any
/// other network are accepted only for outbound connections.
fn is_valid_state_sync_peer(&self, peer: &PeerNetworkId, origin: ConnectionOrigin) -> bool {
    let network_id = peer.network_id();
    if network_id.is_validator_network() || network_id.is_vfn_network() {
        return true;
    }
    origin == ConnectionOrigin::Outbound
}
/// Returns `true` when `peer` currently has an entry in `peer_scores`,
/// i.e. it was enabled and has not been disabled since.
pub fn is_known_state_sync_peer(&self, peer: &PeerNetworkId) -> bool {
self.peer_scores.contains_key(peer)
}
/// Switches the multicast level to `new_level`, updating the multicast
/// counter and logging the transition.
///
/// Nothing happens when `new_level` equals the current level. A move to a
/// greater level is logged as `LogEvent::Failover`, a move to a lesser level
/// as `LogEvent::Recover`. When `request_version` is provided it is attached
/// to the log entry.
fn update_multicast_network_level(
    &mut self,
    new_level: NetworkId,
    request_version: Option<u64>,
) {
    let current_level = self.multicast_network_level;
    // `cmp` takes its argument by reference.
    let log_event = match new_level.cmp(&current_level) {
        Ordering::Equal => return,
        Ordering::Greater => LogEvent::Failover,
        Ordering::Less => LogEvent::Recover,
    };

    self.multicast_network_level = new_level;
    update_multicast_network_counter(self.multicast_network_level);

    let mut log_event = LogSchema::event_log(LogEntry::Multicast, log_event)
        .old_multicast_level(current_level)
        .new_multicast_level(new_level);
    if let Some(version) = request_version {
        log_event = log_event.request_version(version);
    }
    info!(log_event);
}
}
/// Returns `true` once the current system time has reached
/// `timeout_start + timeout_duration`.
///
/// If the deadline cannot be represented (the addition overflows), the
/// timeout is treated as never reached and `false` is returned.
fn is_timeout(timeout_start: SystemTime, timeout_duration: Duration) -> bool {
    match timeout_start.checked_add(timeout_duration) {
        Some(deadline) => SystemTime::now() >= deadline,
        None => false,
    }
}
/// Samples one peer from `peers` using `weighted_index`.
///
/// Returns `None` when there is no weighted index, or when the sampled
/// position is out of bounds for `peers`.
fn pick_peer(
    peers: &[PeerNetworkId],
    weighted_index: &Option<WeightedIndex<f64>>,
) -> Option<PeerNetworkId> {
    let sampler = weighted_index.as_ref()?;
    let position = sampler.sample(&mut thread_rng());
    peers.get(position).copied()
}
/// Publishes the multicast level to the `MULTICAST_LEVEL` counter:
/// 0 for the validator network, 1 for the VFN network, 2 for anything else.
fn update_multicast_network_counter(multicast_network_level: NetworkId) {
    let network_counter_value = match multicast_network_level {
        level if level.is_validator_network() => 0,
        level if level.is_vfn_network() => 1,
        _ => 2,
    };
    counters::MULTICAST_LEVEL.set(network_counter_value);
}
#[cfg(test)]
mod tests {
use super::*;
/// How many times each test repeats a score-affecting event for a peer.
const NUM_CHUNKS_TO_PROCESS: u64 = 50;
/// How many peer picks are sampled when comparing pick frequencies.
const NUM_PICKS_TO_MAKE: u64 = 1000;
/// A peer is known while enabled, unknown once disabled, and known again
/// after being re-enabled.
#[test]
fn test_disable_peer() {
    let (mut manager, peers) = generate_request_manager_and_validators(0, 1);
    let only_validator = peers[0];

    // Enabled by the generator.
    assert!(manager.is_known_state_sync_peer(&only_validator));
    assert!(!manager.no_available_peers());

    // Disabling removes the only peer.
    manager.disable_peer(&only_validator).unwrap();
    assert!(!manager.is_known_state_sync_peer(&only_validator));
    assert!(manager.no_available_peers());

    // Re-enabling brings it back.
    add_validator_to_request_manager(&mut manager, &only_validator, PeerRole::Validator);
    assert!(manager.is_known_state_sync_peer(&only_validator));
    assert!(!manager.no_available_peers());
}
/// After all peers are penalised equally, the one that then receives
/// successful responses is picked most often.
#[test]
fn test_score_chunk_success() {
    let num_validators = 4;
    let (mut manager, peers) = generate_request_manager_and_validators(0, num_validators);

    // Lower every validator's score by the same amount.
    for _ in 0..NUM_CHUNKS_TO_PROCESS {
        for validator in &peers {
            manager.process_empty_chunk(validator);
        }
    }

    // Raise only validator 0's score.
    let rewarded = peers[0];
    for _ in 0..NUM_CHUNKS_TO_PROCESS {
        manager.process_success_response(&rewarded);
    }

    verify_validator_picked_most_often(&mut manager, &peers, 0);
}
/// A peer whose requests keep timing out is picked least often.
/// The manager uses a zero request timeout, so every request times out.
#[test]
fn test_score_chunk_timeout() {
    let (mut manager, peers) = generate_request_manager_and_validators(0, 4);
    let slow_validator = peers[0];

    for _ in 0..NUM_CHUNKS_TO_PROCESS {
        manager.add_request(1, vec![slow_validator]);
        let timed_out = manager.has_request_timed_out(1).unwrap();
        assert!(timed_out);
    }

    verify_validator_picked_least_often(&mut manager, &peers, 0);
}
/// A version mismatch on a single-peer request penalises that peer, so it
/// ends up picked least often.
#[test]
fn test_score_chunk_version_mismatch() {
    let (mut manager, peers) = generate_request_manager_and_validators(0, 4);
    let mismatching_validator = peers[0];

    for _ in 0..NUM_CHUNKS_TO_PROCESS {
        manager.add_request(100, vec![mismatching_validator]);
        manager
            .process_chunk_version_mismatch(&mismatching_validator, 100, 0)
            .unwrap_err();
    }

    verify_validator_picked_least_often(&mut manager, &peers, 0);
}
/// A version mismatch on a multicast request (more than one recipient) does
/// not penalise the responder, so validator 0 stays the most-picked peer
/// after the others were penalised.
#[test]
fn test_score_chunk_version_mismatch_multicast() {
    let num_validators = 4;
    let (mut manager, peers) = generate_request_manager_and_validators(0, num_validators);
    let responder = peers[0];
    let other_recipient = peers[1];

    // Penalise every validator except validator 0.
    for _ in 0..NUM_CHUNKS_TO_PROCESS {
        for validator in peers.iter().skip(1) {
            manager.process_empty_chunk(validator);
        }
    }

    // Mismatches on multicast requests must leave validator 0's score alone.
    for _ in 0..NUM_CHUNKS_TO_PROCESS {
        manager.add_request(100, vec![responder, other_recipient]);
        manager
            .process_chunk_version_mismatch(&responder, 100, 0)
            .unwrap_err();
    }

    verify_validator_picked_most_often(&mut manager, &peers, 0);
}
/// Repeated empty chunks make a peer the least-picked one.
#[test]
fn test_score_empty_chunk() {
    let (mut manager, peers) = generate_request_manager_and_validators(10, 4);
    let penalized = peers[0];

    for _ in 0..NUM_CHUNKS_TO_PROCESS {
        manager.process_empty_chunk(&penalized);
    }

    verify_validator_picked_least_often(&mut manager, &peers, 0);
}
/// Repeated invalid chunks make a peer the least-picked one.
#[test]
fn test_score_invalid_chunk() {
    let (mut manager, peers) = generate_request_manager_and_validators(10, 4);
    let penalized = peers[0];

    for _ in 0..NUM_CHUNKS_TO_PROCESS {
        manager.process_invalid_chunk(&penalized);
    }

    verify_validator_picked_least_often(&mut manager, &peers, 0);
}
/// Repeated invalid chunk requests make a peer the least-picked one.
#[test]
fn test_score_invalid_chunk_request() {
    let (mut manager, peers) = generate_request_manager_and_validators(10, 4);
    let penalized = peers[0];

    for _ in 0..NUM_CHUNKS_TO_PROCESS {
        manager.process_invalid_chunk_request(&penalized);
    }

    verify_validator_picked_least_often(&mut manager, &peers, 0);
}
/// Repeated chunks from a downstream peer make it the least-picked one.
#[test]
fn test_score_chunk_from_downstream() {
    let (mut manager, peers) = generate_request_manager_and_validators(10, 4);
    let penalized = peers[0];

    for _ in 0..NUM_CHUNKS_TO_PROCESS {
        manager.process_chunk_from_downstream(&penalized);
    }

    verify_validator_picked_least_often(&mut manager, &peers, 0);
}
/// A peer enabled with the `PreferredUpstream` role starts with a higher
/// score and is therefore picked most often.
#[test]
fn test_score_preferred() {
    let num_validators = 4;
    let mut manager = generate_request_manager(0);

    // Validator 0 is the preferred upstream; the rest are plain validators.
    let mut peers = Vec::new();
    for index in 0..num_validators {
        let role = match index {
            0 => PeerRole::PreferredUpstream,
            _ => PeerRole::Validator,
        };
        let validator = PeerNetworkId::random_validator();
        add_validator_to_request_manager(&mut manager, &validator, role);
        peers.push(validator);
    }

    verify_validator_picked_most_often(&mut manager, &peers, 0);
}
/// `remove_requests(5)` drops the timed-out requests for versions below 5
/// and keeps versions 5 and above. The zero request timeout makes every
/// request count as timed out.
#[test]
fn test_remove_requests() {
    let (mut manager, peers) = generate_request_manager_and_validators(0, 2);
    let to_first = vec![peers[0]];
    let to_second = vec![peers[1]];

    let requests: Vec<(u64, &Vec<PeerNetworkId>)> = vec![
        (1, &to_first),
        (3, &to_second),
        (5, &to_first),
        (10, &to_first),
        (12, &to_second),
    ];
    for (version, recipients) in requests {
        manager.add_request(version, recipients.clone());
    }

    manager.remove_requests(5);

    for removed in [1u64, 3].iter() {
        assert!(manager.get_last_request_time(*removed).is_none());
    }
    for kept in [5u64, 10, 12].iter() {
        assert!(manager.get_last_request_time(*kept).is_some());
    }
}
/// The first-request time is absent before any request is recorded, and is
/// earlier than the last-request time after the same version is requested
/// twice.
#[test]
fn test_request_times() {
    let (mut manager, peers) = generate_request_manager_and_validators(0, 2);

    // No request recorded yet.
    assert!(manager.get_first_request_time(1).is_none());

    // Record the same version twice, to different peers.
    manager.add_request(1, vec![peers[0]]);
    manager.add_request(1, vec![peers[1]]);

    let first_sent = manager.get_first_request_time(1).unwrap();
    let last_sent = manager.get_last_request_time(1).unwrap();
    assert!(first_sent < last_sent);
}
/// Asserts that `validators[validator_index]` is picked strictly more often
/// than every other validator.
fn verify_validator_picked_most_often(
    request_manager: &mut RequestManager,
    validators: &[PeerNetworkId],
    validator_index: usize,
) {
    let expect_highest = true;
    verify_validator_pick_frequency(request_manager, validators, validator_index, expect_highest)
}
/// Asserts that `validators[validator_index]` is picked strictly less often
/// than every other validator.
fn verify_validator_picked_least_often(
    request_manager: &mut RequestManager,
    validators: &[PeerNetworkId],
    validator_index: usize,
) {
    let expect_highest = false;
    verify_validator_pick_frequency(request_manager, validators, validator_index, expect_highest)
}
/// Samples `NUM_PICKS_TO_MAKE` picks and compares the pick count of
/// `validators[validator_index]` with every other validator's count.
///
/// With `check_highest_frequency` set the target must be picked strictly
/// more often than each other validator, otherwise strictly less often.
/// A target that was never picked counts as zero; any other validator that
/// was never picked makes the helper panic.
fn verify_validator_pick_frequency(
    request_manager: &mut RequestManager,
    validators: &[PeerNetworkId],
    validator_index: usize,
    check_highest_frequency: bool,
) {
    let pick_counts = calculate_pick_counts_for_validators(request_manager, NUM_PICKS_TO_MAKE);
    let target_count = pick_counts
        .get(&validators[validator_index])
        .copied()
        .unwrap_or(0);

    for (index, validator) in validators.iter().enumerate() {
        if index == validator_index {
            continue;
        }
        let other_count = *pick_counts.get(validator).unwrap();
        if check_highest_frequency {
            assert!(target_count > other_count);
        } else {
            assert!(target_count < other_count);
        }
    }
}
/// Runs `pick_peers` `number_of_picks_to_execute` times and tallies how often
/// each peer was chosen. Every pick is asserted to return exactly one peer.
fn calculate_pick_counts_for_validators(
    request_manager: &mut RequestManager,
    number_of_picks_to_execute: u64,
) -> HashMap<PeerNetworkId, u64> {
    let mut tally: HashMap<PeerNetworkId, u64> = HashMap::new();
    for _ in 0..number_of_picks_to_execute {
        let picked = request_manager.pick_peers();
        assert_eq!(1, picked.len());
        *tally.entry(picked[0]).or_insert(0) += 1;
    }
    tally
}
/// Builds a `RequestManager` with the given request timeout (in seconds),
/// a 30 second multicast timeout, and no network senders.
fn generate_request_manager(request_timeout: u64) -> RequestManager {
    let request_timeout = Duration::from_secs(request_timeout);
    let multicast_timeout = Duration::from_secs(30);
    let network_senders = HashMap::new();
    RequestManager::new(request_timeout, multicast_timeout, network_senders)
}
/// Builds a `RequestManager` and enables `num_validators` random validator
/// peers on it, returning the manager together with those peers in the order
/// they were enabled.
fn generate_request_manager_and_validators(
    request_timeout: u64,
    num_validators: u64,
) -> (RequestManager, Vec<PeerNetworkId>) {
    let mut manager = generate_request_manager(request_timeout);

    let validators: Vec<PeerNetworkId> = (0..num_validators)
        .map(|_| {
            let validator = PeerNetworkId::random_validator();
            add_validator_to_request_manager(&mut manager, &validator, PeerRole::Validator);
            validator
        })
        .collect();

    (manager, validators)
}
/// Enables `validator` on `request_manager` with the given role, using mock
/// connection metadata for an inbound connection. Panics if enabling fails.
fn add_validator_to_request_manager(
    request_manager: &mut RequestManager,
    validator: &PeerNetworkId,
    peer_role: PeerRole,
) {
    let peer_id = validator.peer_id();
    let metadata =
        ConnectionMetadata::mock_with_role_and_origin(peer_id, peer_role, ConnectionOrigin::Inbound);
    request_manager.enable_peer(*validator, metadata).unwrap();
}
}