use crate::{
chunk_request::{GetChunkRequest, TargetType},
chunk_response::{GetChunkResponse, ResponseLedgerInfo},
client::CoordinatorMessage,
counters,
error::Error,
executor_proxy::ExecutorProxyTrait,
logging::{LogEntry, LogEvent, LogSchema},
network::{StateSyncEvents, StateSyncMessage, StateSyncSender},
request_manager::RequestManager,
shared_components::SyncState,
};
use aptos_config::{
config::{NodeConfig, RoleType, StateSyncConfig},
network_id::{NetworkId, PeerNetworkId},
};
use aptos_logger::prelude::*;
use aptos_types::{
contract_event::ContractEvent,
ledger_info::LedgerInfoWithSignatures,
transaction::{Transaction, TransactionListWithProof, Version},
waypoint::Waypoint,
PeerId,
};
use consensus_notifications::{
ConsensusCommitNotification, ConsensusNotification, ConsensusNotificationListener,
ConsensusSyncNotification,
};
use fail::fail_point;
use futures::{
channel::{mpsc, oneshot},
executor::block_on,
stream::select_all,
StreamExt,
};
use mempool_notifications::MempoolNotificationSender;
use network::{protocols::network::Event, transport::ConnectionMetadata};
use short_hex_str::AsShortHexStr;
use std::{
cmp,
collections::HashMap,
time::{Duration, SystemTime},
};
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
/// Bookkeeping for a long-poll ("highest available") chunk request that
/// cannot be served yet because local storage has not advanced past the
/// requester's known version. Parked in `subscriptions` until it can be
/// fulfilled or it expires.
#[derive(Clone, Debug, PartialEq, Eq)]
struct PendingRequestInfo {
    // Wall-clock time after which the pending request should be dropped.
    expiration_time: SystemTime,
    // The known version supplied in the original chunk request.
    known_version: u64,
    // The epoch supplied in the original chunk request.
    request_epoch: u64,
    // Optional target ledger info carried by the original request.
    target_li: Option<LedgerInfoWithSignatures>,
    // Maximum number of transactions to return (already capped by config).
    chunk_limit: u64,
}
/// An in-flight request from consensus asking state sync to catch the node
/// up to a specific target ledger info.
pub struct SyncRequest {
    // Timestamp refreshed on every commit processed while this request is
    // active (used to detect stalled sync requests).
    pub last_commit_timestamp: SystemTime,
    // The original notification from consensus, containing the sync target.
    pub consensus_sync_notification: ConsensusSyncNotification,
}
/// The state sync coordinator: owns the main event loop (see `start`) that
/// services consensus notifications, client messages, chunk messages from
/// the network, and a periodic progress tick.
pub(crate) struct StateSyncCoordinator<T, M> {
    // Messages from the state sync client (sync-state reads, init waiters).
    client_events: mpsc::UnboundedReceiver<CoordinatorMessage>,
    // Used to notify mempool of committed transactions.
    mempool_notifier: M,
    // Receives sync/commit notifications from consensus and ACKs them.
    consensus_listener: ConsensusNotificationListener,
    // Cached view of local storage state (committed/synced versions,
    // trusted epoch); refreshed via the executor proxy.
    local_state: SyncState,
    // The state sync section of the node config.
    config: StateSyncConfig,
    // Validator or fullnode; fullnodes reject consensus sync requests.
    role: RoleType,
    // Waypoint the node must reach before it is considered initialized.
    waypoint: Waypoint,
    // Tracks peers and outstanding chunk requests.
    request_manager: RequestManager,
    // In-flight consensus sync request, if any.
    sync_request: Option<SyncRequest>,
    // Ledger info currently being synced towards outside of a consensus
    // sync request, if any.
    target_ledger_info: Option<LedgerInfoWithSignatures>,
    // Callback to fire once the waypoint has been reached.
    initialization_listener: Option<oneshot::Sender<Result<(), Error>>>,
    // Long-poll chunk requests parked until they can be served or expire.
    subscriptions: HashMap<PeerNetworkId, PendingRequestInfo>,
    // Abstraction over storage/execution used to read state, fetch chunks
    // and publish event notifications.
    executor_proxy: T,
    // When true, the coordinator refuses sync requests, initialization and
    // chunk responses.
    read_only_mode: bool,
}
impl<T: ExecutorProxyTrait, M: MempoolNotificationSender> StateSyncCoordinator<T, M> {
/// Builds a new coordinator from its channels, config and initial state.
///
/// Returns an error if computing the chunk-request retry timeout from the
/// config overflows a `u64`.
pub fn new(
    client_events: mpsc::UnboundedReceiver<CoordinatorMessage>,
    mempool_notifier: M,
    consensus_listener: ConsensusNotificationListener,
    network_senders: HashMap<NetworkId, StateSyncSender>,
    node_config: &NodeConfig,
    waypoint: Waypoint,
    executor_proxy: T,
    initial_state: SyncState,
    read_only_mode: bool,
) -> Result<Self, Error> {
    info!(LogSchema::event_log(LogEntry::Waypoint, LogEvent::Initialize).waypoint(waypoint));

    let role = node_config.base.role;
    let tick_ms = node_config.state_sync.tick_interval_ms;

    // Fullnodes retry after a tick plus the long-poll window; validators
    // retry after two ticks.
    let retry_ms = match role {
        RoleType::FullNode => tick_ms
            .checked_add(node_config.state_sync.long_poll_timeout_ms)
            .ok_or_else(|| {
                Error::IntegerOverflow("Fullnode retry timeout has overflown!".into())
            })?,
        RoleType::Validator => tick_ms.checked_mul(2).ok_or_else(|| {
            Error::IntegerOverflow("Validator retry timeout has overflown!".into())
        })?,
    };

    Ok(Self {
        client_events,
        mempool_notifier,
        consensus_listener,
        local_state: initial_state,
        config: node_config.state_sync.clone(),
        role,
        waypoint,
        request_manager: RequestManager::new(
            Duration::from_millis(retry_ms),
            Duration::from_millis(node_config.state_sync.multicast_timeout_ms),
            network_senders,
        ),
        subscriptions: HashMap::new(),
        sync_request: None,
        target_ledger_info: None,
        initialization_listener: None,
        executor_proxy,
        read_only_mode,
    })
}
/// Runs the coordinator's main event loop; never returns. Multiplexes
/// (via `select!`) over four event sources:
///   1. consensus notifications (sync-to-target and commit),
///   2. client/coordinator messages (sync-state reads, init waiters),
///   3. state sync network events, merged across all given networks,
///   4. a periodic tick that drives the progress check.
pub async fn start(
    mut self,
    network_handles: Vec<(NetworkId, StateSyncSender, StateSyncEvents)>,
) {
    info!(LogSchema::new(LogEntry::RuntimeStart));
    // Periodic tick stream used to trigger check_progress().
    let mut interval = IntervalStream::new(interval(Duration::from_millis(
        self.config.tick_interval_ms,
    )))
    .fuse();
    // Tag each event stream with its network id, then merge them all into
    // a single fused stream.
    let events: Vec<_> = network_handles
        .into_iter()
        .map(|(network_id, _sender, events)| events.map(move |e| (network_id, e)))
        .collect();
    let mut network_events = select_all(events).fuse();
    loop {
        let _timer = counters::MAIN_LOOP.start_timer();
        ::futures::select! {
            msg = self.consensus_listener.select_next_some() => {
                match msg {
                    ConsensusNotification::SyncToTarget(sync_notification) => {
                        let _timer = counters::PROCESS_COORDINATOR_MSG_LATENCY
                            .with_label_values(&[counters::SYNC_MSG_LABEL])
                            .start_timer();
                        // Errors are logged and counted; the loop continues.
                        if let Err(e) = self.process_sync_request(sync_notification).await {
                            error!(LogSchema::new(LogEntry::SyncRequest).error(&e));
                            counters::SYNC_REQUEST_RESULT.with_label_values(&[counters::FAIL_LABEL]).inc();
                        }
                    },
                    ConsensusNotification::NotifyCommit(commit_notification) => {
                        let _timer = counters::PROCESS_COORDINATOR_MSG_LATENCY
                            .with_label_values(&[counters::COMMIT_MSG_LABEL])
                            .start_timer();
                        // Transactions/events are cloned out first because the
                        // notification itself is moved into the call (for the ACK).
                        if let Err(e) = self.process_commit_notification(commit_notification.transactions.clone(), commit_notification.reconfiguration_events.clone(), Some(commit_notification), None).await {
                            counters::CONSENSUS_COMMIT_FAIL_COUNT.inc();
                            error!(LogSchema::event_log(LogEntry::ConsensusCommit, LogEvent::PostCommitFail).error(&e));
                        }
                    }
                }
            }
            msg = self.client_events.select_next_some() => {
                match msg {
                    CoordinatorMessage::GetSyncState(callback) => {
                        // Failure to deliver the sync state is deliberately ignored.
                        let _ = self.get_sync_state(callback);
                    }
                    CoordinatorMessage::WaitForInitialization(cb_sender) => {
                        if let Err(e) = self.wait_for_initialization(cb_sender) {
                            error!(LogSchema::new(LogEntry::Waypoint).error(&e));
                        }
                    }
                };
            },
            (network_id, event) = network_events.select_next_some() => {
                match event {
                    Event::NewPeer(metadata) => {
                        if let Err(e) = self.process_new_peer(network_id, metadata) {
                            error!(LogSchema::new(LogEntry::NewPeer).error(&e));
                        }
                    }
                    Event::LostPeer(metadata) => {
                        if let Err(e) = self.process_lost_peer(network_id, metadata.remote_peer_id) {
                            error!(LogSchema::new(LogEntry::LostPeer).error(&e));
                        }
                    }
                    Event::Message(peer_id, message) => {
                        if let Err(e) = self.process_chunk_message(network_id, peer_id, message).await {
                            error!(LogSchema::new(LogEntry::ProcessChunkMessage).error(&e));
                        }
                    }
                    // Any other event kind is unexpected here; count and warn.
                    unexpected_event => {
                        counters::NETWORK_ERROR_COUNT.inc();
                        warn!(LogSchema::new(LogEntry::NetworkError),
                            "received unexpected network event: {:?}", unexpected_event);
                    },
                }
            },
            _ = interval.select_next_some() => {
                if let Err(e) = self.check_progress() {
                    error!(LogSchema::event_log(LogEntry::ProgressCheck, LogEvent::Fail).error(&e));
                }
            }
        }
    }
}
/// Registers a newly-connected peer with the request manager, then
/// immediately re-runs the progress check so syncing can use the peer.
fn process_new_peer(
    &mut self,
    network_id: NetworkId,
    metadata: ConnectionMetadata,
) -> Result<(), Error> {
    let peer_network_id = PeerNetworkId::new(network_id, metadata.remote_peer_id);
    self.request_manager
        .enable_peer(peer_network_id, metadata)?;
    self.check_progress()
}
/// Removes a disconnected peer from the request manager.
fn process_lost_peer(&mut self, network_id: NetworkId, peer_id: PeerId) -> Result<(), Error> {
    self.request_manager
        .disable_peer(&PeerNetworkId::new(network_id, peer_id))
}
/// Dispatches a state sync network message from `peer_id` on `network_id`:
/// chunk requests are served, chunk responses are applied. Per-message
/// latency and success/failure counters are recorded in both cases.
pub(crate) async fn process_chunk_message(
    &mut self,
    network_id: NetworkId,
    peer_id: PeerId,
    msg: StateSyncMessage,
) -> Result<(), Error> {
    let peer = PeerNetworkId::new(network_id, peer_id);
    match msg {
        StateSyncMessage::GetChunkRequest(request) => {
            let _timer = counters::PROCESS_MSG_LATENCY
                .with_label_values(&[
                    peer.network_id().as_str(),
                    peer.peer_id().short_str().as_str(),
                    counters::CHUNK_REQUEST_MSG_LABEL,
                ])
                .start_timer();
            // The request is cloned so the original can still be attached
            // to the error log below.
            let process_result = self.process_chunk_request(peer, *request.clone());
            if let Err(ref error) = process_result {
                error!(
                    LogSchema::event_log(LogEntry::ProcessChunkRequest, LogEvent::Fail)
                        .peer(&peer)
                        .error(&error.clone())
                        .local_li_version(self.local_state.committed_version())
                        .chunk_request(*request)
                );
                counters::PROCESS_CHUNK_REQUEST_COUNT
                    .with_label_values(&[
                        peer.network_id().as_str(),
                        peer.peer_id().short_str().as_str(),
                        counters::FAIL_LABEL,
                    ])
                    .inc();
            } else {
                counters::PROCESS_CHUNK_REQUEST_COUNT
                    .with_label_values(&[
                        peer.network_id().as_str(),
                        peer.peer_id().short_str().as_str(),
                        counters::SUCCESS_LABEL,
                    ])
                    .inc();
            }
            process_result
        }
        StateSyncMessage::GetChunkResponse(response) => {
            let _timer = counters::PROCESS_MSG_LATENCY
                .with_label_values(&[
                    peer.network_id().as_str(),
                    peer.peer_id().short_str().as_str(),
                    counters::CHUNK_RESPONSE_MSG_LABEL,
                ])
                .start_timer();
            self.process_chunk_response(&peer, *response).await
        }
    }
}
/// Refreshes the cached `local_state` from storage via the executor
/// proxy. Bumps the storage-read failure counter when the read fails and
/// logs an epoch-change event whenever the trusted epoch advances.
fn sync_state_with_local_storage(&mut self) -> Result<(), Error> {
    let fetched_state = self
        .executor_proxy
        .get_local_storage_state()
        .map_err(|error| {
            counters::STORAGE_READ_FAIL_COUNT.inc();
            error
        })?;

    let previous_epoch = self.local_state.trusted_epoch();
    let current_epoch = fetched_state.trusted_epoch();
    if current_epoch > previous_epoch {
        info!(LogSchema::new(LogEntry::EpochChange)
            .old_epoch(previous_epoch)
            .new_epoch(current_epoch));
    }

    self.local_state = fetched_state;
    Ok(())
}
/// The node is initialized once its committed version has reached (or
/// passed) the configured waypoint version.
fn is_initialized(&self) -> bool {
    self.local_state.committed_version() >= self.waypoint.version()
}
fn wait_for_initialization(
&mut self,
cb_sender: oneshot::Sender<Result<(), Error>>,
) -> Result<(), Error> {
if self.read_only_mode {
let read_only_error =
Error::ReadOnlyMode("Unable to initialize in read-only mode!".into());
return match cb_sender.send(Err(read_only_error.clone())) {
Err(error) => Err(Error::CallbackSendFailed(format!(
"Waypoint initialization callback error: {:?}",
error
))),
_ => Err(read_only_error),
};
} else if self.is_initialized() {
Self::send_initialization_callback(cb_sender)?;
} else {
self.initialization_listener = Some(cb_sender);
}
Ok(())
}
/// Processes a sync-to-target request from consensus: validates that this
/// node can serve it (not read-only, not a fullnode, initialized), and
/// either answers immediately (already at or past the target) or records
/// the request and issues the first chunk request towards the target.
async fn process_sync_request(
    &mut self,
    sync_notification: ConsensusSyncNotification,
) -> Result<(), Error> {
    // Read-only nodes never drive consensus-initiated syncs.
    if self.read_only_mode {
        return Err(Error::ReadOnlyMode(
            "Unable to process a sync request from consensus!".into(),
        ));
    }
    fail_point!("state_sync_v1::process_sync_request_message", |_| {
        Err(crate::error::Error::UnexpectedError(
            "Injected error in process_sync_request_message".into(),
        ))
    });
    // Wrap the notification with a timestamp so progress checks can
    // detect a stalled sync request later.
    let request = SyncRequest {
        last_commit_timestamp: SystemTime::now(),
        consensus_sync_notification: sync_notification,
    };
    // Sync requests are only valid for validators.
    if self.role == RoleType::FullNode {
        return Err(Error::FullNodeSyncRequest);
    }
    let local_li_version = self.local_state.committed_version();
    let target_version = request
        .consensus_sync_notification
        .target
        .ledger_info()
        .version();
    info!(
        LogSchema::event_log(LogEntry::SyncRequest, LogEvent::Received)
            .target_version(target_version)
            .local_li_version(local_li_version)
    );
    // Refresh the cached state before comparing versions.
    self.sync_state_with_local_storage()?;
    if !self.is_initialized() {
        return Err(Error::UninitializedError(
            "Unable to process sync request message!".into(),
        ));
    }
    // Already at the target: ack consensus immediately.
    if target_version == local_li_version {
        return self.send_sync_req_callback(request, Ok(())).await;
    }
    // Target is behind our committed version: ack with an error and fail.
    if target_version < local_li_version {
        self.send_sync_req_callback(
            request,
            Err(Error::UnexpectedError("Sync request to old version".into())),
        )
        .await?;
        return Err(Error::OldSyncRequestVersion(
            target_version,
            local_li_version,
        ));
    }
    // Store the request and kick off the first chunk request towards it.
    self.sync_request = Some(request);
    let known_version = self.local_state.synced_version();
    self.send_chunk_request_with_target(
        known_version,
        self.local_state.trusted_epoch(),
        self.create_sync_request_chunk_target(known_version)?,
    )
}
async fn notify_consensus_of_commit_response(
&mut self,
commit_notification: ConsensusCommitNotification,
) -> Result<(), Error> {
if let Err(error) = self
.consensus_listener
.respond_to_commit_notification(commit_notification, Ok(()))
.await
{
counters::COMMIT_FLOW_FAIL
.with_label_values(&[counters::CONSENSUS_LABEL])
.inc();
return Err(Error::CallbackSendFailed(format!(
"Failed to send commit ACK to consensus!: {:?}",
error
)));
};
Ok(())
}
/// Handles a batch of transactions committed to storage (either by
/// consensus or by an applied state sync chunk): refreshes local state
/// and metrics, notifies mempool, ACKs consensus when requested, serves
/// pending subscriptions, prunes satisfied chunk requests, checks for
/// initialization/sync-request completion, and publishes any
/// reconfiguration events.
///
/// `commit_notification` is `Some` when the commit came from consensus
/// and needs an ACK; `chunk_sender` is `Some` when the commit resulted
/// from a chunk received from that peer.
async fn process_commit_notification(
    &mut self,
    committed_transactions: Vec<Transaction>,
    reconfiguration_events: Vec<ContractEvent>,
    commit_notification: Option<ConsensusCommitNotification>,
    chunk_sender: Option<&PeerNetworkId>,
) -> Result<(), Error> {
    // Refresh the cached view of storage and publish metrics/logs.
    self.sync_state_with_local_storage()?;
    self.update_sync_state_metrics_and_logs()?;

    // Notify mempool of the committed transactions.
    self.notify_mempool_of_committed_transactions(committed_transactions)
        .await;

    // ACK consensus if this commit originated there; an ACK failure is
    // logged but does not abort the rest of the post-commit work.
    if let Some(commit_notification) = commit_notification {
        if let Err(error) = self
            .notify_consensus_of_commit_response(commit_notification)
            .await
        {
            error!(LogSchema::new(LogEntry::CommitFlow).error(&error));
        }
    }

    // Serve any long-poll subscriptions that can now be fulfilled.
    self.check_subscriptions();

    // Drop in-flight chunk requests that this commit has satisfied and
    // credit the peer whose chunk produced it, if any.
    let synced_version = self.local_state.synced_version();
    self.request_manager.remove_requests(synced_version);
    if let Some(peer) = chunk_sender {
        self.request_manager.process_success_response(peer);
    }

    // Fixed: the binding does not need `mut` — we mutate through the
    // `&mut SyncRequest` returned by `as_mut()`.
    if let Some(req) = self.sync_request.as_mut() {
        req.last_commit_timestamp = SystemTime::now();
    }

    self.check_initialized_or_sync_request_completed(synced_version)
        .await?;

    // Publish reconfiguration events; failures are counted and logged but
    // intentionally do not fail the commit notification itself.
    if let Err(error) = self
        .executor_proxy
        .publish_event_notifications(reconfiguration_events)
    {
        counters::RECONFIG_PUBLISH_COUNT
            .with_label_values(&[counters::FAIL_LABEL])
            .inc();
        error!(LogSchema::event_log(LogEntry::Reconfig, LogEvent::Fail).error(&error));
    }
    Ok(())
}
/// After a commit, fires the initialization callback if the waypoint has
/// now been reached, and completes (or fails) any in-flight consensus
/// sync request based on the newly synced version.
async fn check_initialized_or_sync_request_completed(
    &mut self,
    synced_version: u64,
) -> Result<(), Error> {
    let committed_version = self.local_state.committed_version();
    let local_epoch = self.local_state.trusted_epoch();
    // Waypoint reached: notify a waiting initialization listener, if any.
    if self.is_initialized() {
        if let Some(initialization_listener) = self.initialization_listener.take() {
            info!(LogSchema::event_log(LogEntry::Waypoint, LogEvent::Complete)
                .local_li_version(committed_version)
                .local_synced_version(synced_version)
                .local_epoch(local_epoch));
            Self::send_initialization_callback(initialization_listener)?;
        }
    }
    if let Some(sync_request) = self.sync_request.as_ref() {
        let sync_target_version = sync_request
            .consensus_sync_notification
            .target
            .ledger_info()
            .version();
        // Overshooting the consensus target is an error.
        if synced_version > sync_target_version {
            return Err(Error::SyncedBeyondTarget(
                synced_version,
                sync_target_version,
            ));
        }
        // Exactly at the target: ack consensus and clear the request.
        if synced_version == sync_target_version {
            info!(
                LogSchema::event_log(LogEntry::SyncRequest, LogEvent::Complete)
                    .local_li_version(committed_version)
                    .local_synced_version(synced_version)
                    .local_epoch(local_epoch)
            );
            counters::SYNC_REQUEST_RESULT
                .with_label_values(&[counters::COMPLETE_LABEL])
                .inc();
            if let Some(sync_request) = self.sync_request.take() {
                self.send_sync_req_callback(sync_request, Ok(())).await?;
            }
        }
    }
    Ok(())
}
/// Tells mempool which transactions were just committed (tagged with the
/// committed block's timestamp). Notification failures are counted and
/// logged but never propagated.
async fn notify_mempool_of_committed_transactions(
    &mut self,
    committed_transactions: Vec<Transaction>,
) {
    let block_timestamp_usecs = self
        .local_state
        .committed_ledger_info()
        .ledger_info()
        .timestamp_usecs();

    // Clone the notifier so the notification future doesn't borrow self.
    let notification_result = self
        .mempool_notifier
        .clone()
        .notify_new_commit(
            committed_transactions,
            block_timestamp_usecs,
            self.config.mempool_commit_timeout_ms,
        )
        .await;

    if let Err(error) = notification_result {
        counters::COMMIT_FLOW_FAIL
            .with_label_values(&[counters::MEMPOOL_LABEL])
            .inc();
        error!(LogSchema::new(LogEntry::CommitFlow).error(&error.into()));
    }
}
/// Publishes the current local state (versions, epoch and their block
/// timestamps, plus the current wall-clock time) to the metric gauges and
/// emits a debug log line.
fn update_sync_state_metrics_and_logs(&mut self) -> Result<(), Error> {
    let synced_version = self.local_state.synced_version();
    let committed_version = self.local_state.committed_version();
    let local_epoch = self.local_state.trusted_epoch();

    // Version and epoch gauges.
    counters::set_version(counters::VersionType::Synced, synced_version);
    counters::set_version(counters::VersionType::Committed, committed_version);
    counters::EPOCH.set(local_epoch as i64);

    // Block timestamps for the synced and committed versions.
    let synced_timestamp = self.executor_proxy.get_version_timestamp(synced_version)?;
    counters::set_timestamp(counters::TimestampType::Synced, synced_timestamp);
    let committed_timestamp = self
        .executor_proxy
        .get_version_timestamp(committed_version)?;
    counters::set_timestamp(counters::TimestampType::Committed, committed_timestamp);

    // Current wall-clock time in microseconds.
    counters::set_timestamp(
        counters::TimestampType::Real,
        aptos_infallible::duration_since_epoch().as_micros() as u64,
    );

    debug!(LogSchema::new(LogEntry::LocalState)
        .local_li_version(committed_version)
        .local_synced_version(synced_version)
        .local_epoch(local_epoch));
    Ok(())
}
fn get_sync_state(&mut self, callback: oneshot::Sender<SyncState>) -> Result<(), Error> {
self.sync_state_with_local_storage()?;
match callback.send(self.local_state.clone()) {
Err(error) => Err(Error::CallbackSendFailed(format!(
"Failed to get sync state! Error: {:?}",
error
))),
_ => Ok(()),
}
}
/// Handles a chunk request from a peer: validates it, then dispatches on
/// the request's target type (specific ledger info, highest available, or
/// waypoint).
fn process_chunk_request(
    &mut self,
    peer: PeerNetworkId,
    request: GetChunkRequest,
) -> Result<(), Error> {
    debug!(
        LogSchema::event_log(LogEntry::ProcessChunkRequest, LogEvent::Received)
            .peer(&peer)
            .chunk_request(request.clone())
            .local_li_version(self.local_state.committed_version())
    );
    fail_point!("state_sync_v1::process_chunk_request", |_| {
        Err(crate::error::Error::UnexpectedError(
            "Injected error in process_chunk_request".into(),
        ))
    });
    // Refresh local state so the version checks below use fresh data.
    self.sync_state_with_local_storage()?;
    // Tell the request manager about invalid requests before bailing out.
    if let Err(error) = self.verify_chunk_request_is_valid(&request) {
        self.request_manager.process_invalid_chunk_request(&peer);
        return Err(error);
    }
    match request.target.clone() {
        TargetType::TargetLedgerInfo(li) => {
            self.process_request_for_target_and_highest(peer, request, Some(li), None)
        }
        TargetType::HighestAvailable {
            target_li,
            timeout_ms,
        } => self.process_request_for_target_and_highest(
            peer,
            request,
            target_li,
            Some(timeout_ms),
        ),
        TargetType::Waypoint(waypoint_version) => {
            self.process_request_for_waypoint(peer, request, waypoint_version)
        }
    }
}
/// Sanity-checks an incoming chunk request: rejects targets behind the
/// requester's own version/epoch, zero-sized chunks, and long-polls with
/// a zero timeout.
fn verify_chunk_request_is_valid(&mut self, request: &GetChunkRequest) -> Result<(), Error> {
    // A target version behind the requester's known version is invalid.
    if matches!(request.target.version(), Some(version) if version < request.known_version) {
        return Err(Error::InvalidChunkRequest(
            "Target version is less than known version! Discarding request.".into(),
        ));
    }

    // Likewise for a target epoch behind the requester's current epoch.
    if matches!(request.target.epoch(), Some(epoch) if epoch < request.current_epoch) {
        return Err(Error::InvalidChunkRequest(
            "Target epoch is less than current epoch! Discarding request.".into(),
        ));
    }

    // A zero-limit request could never return any transactions.
    if request.limit == 0 {
        return Err(Error::InvalidChunkRequest(
            "Chunk request limit is 0. Discarding request.".into(),
        ));
    }

    // A zero long-poll timeout would expire immediately.
    if let TargetType::HighestAvailable { timeout_ms, .. } = &request.target {
        if *timeout_ms == 0 {
            return Err(Error::InvalidChunkRequest(
                "Long poll timeout is 0. Discarding request.".into(),
            ));
        }
    }

    Ok(())
}
/// Serves a chunk request targeting a specific ledger info and/or the
/// highest available state. If the local committed version has not passed
/// the requester's known version, the request is parked as a long-poll
/// subscription instead of being answered now.
fn process_request_for_target_and_highest(
    &mut self,
    peer: PeerNetworkId,
    request: GetChunkRequest,
    target_li: Option<LedgerInfoWithSignatures>,
    timeout_ms: Option<u64>,
) -> Result<(), Error> {
    // Cap the chunk size and long-poll timeout at the configured maximums.
    let chunk_limit = std::cmp::min(request.limit, self.config.max_chunk_limit);
    let timeout = if let Some(timeout_ms) = timeout_ms {
        std::cmp::min(timeout_ms, self.config.max_timeout_ms)
    } else {
        self.config.max_timeout_ms
    };
    let local_version = self.local_state.committed_version();
    if local_version <= request.known_version {
        // Nothing new to serve yet: park the request until data arrives
        // or the timeout elapses. If computing the expiration time
        // overflows, the request is silently dropped.
        let expiration_time = SystemTime::now().checked_add(Duration::from_millis(timeout));
        if let Some(time) = expiration_time {
            let request_info = PendingRequestInfo {
                expiration_time: time,
                known_version: request.known_version,
                request_epoch: request.current_epoch,
                target_li,
                chunk_limit,
            };
            self.subscriptions.insert(peer, request_info);
        }
        return Ok(());
    }
    let (target_li, highest_li) =
        self.calculate_target_and_highest_li(request.current_epoch, target_li, local_version)?;
    self.deliver_chunk(
        peer,
        request.known_version,
        ResponseLedgerInfo::ProgressiveLedgerInfo {
            target_li,
            highest_li,
        },
        chunk_limit,
    )
}
/// Resolves the target ledger info to respond with (via
/// `choose_response_li`), and attaches the local highest ledger info only
/// when the target is strictly behind the local committed version and in
/// the locally trusted epoch.
fn calculate_target_and_highest_li(
    &mut self,
    request_epoch: u64,
    request_target_li: Option<LedgerInfoWithSignatures>,
    local_version: u64,
) -> Result<(LedgerInfoWithSignatures, Option<LedgerInfoWithSignatures>), Error> {
    let target_li = self.choose_response_li(request_epoch, request_target_li)?;

    let target_behind_local = target_li.ledger_info().version() < local_version;
    let target_in_local_epoch =
        target_li.ledger_info().epoch() == self.local_state.trusted_epoch();
    let highest_li = if target_behind_local && target_in_local_epoch {
        Some(self.local_state.committed_ledger_info())
    } else {
        None
    };

    Ok((target_li, highest_li))
}
/// Serves a chunk request whose target is a waypoint version: validates
/// that this node has the waypoint and that the requester is behind it,
/// then delivers a chunk capped so it never crosses the requester's
/// epoch boundary.
fn process_request_for_waypoint(
    &mut self,
    peer: PeerNetworkId,
    request: GetChunkRequest,
    waypoint_version: Version,
) -> Result<(), Error> {
    // Cap the chunk size at the configured maximum.
    let mut limit = std::cmp::min(request.limit, self.config.max_chunk_limit);
    // We can only serve waypoint chunks we have already committed.
    if self.local_state.committed_version() < waypoint_version {
        return Err(Error::UnexpectedError(format!(
            "Local version {} < requested waypoint version {}.",
            self.local_state.committed_version(),
            waypoint_version
        )));
    }
    // The requester must still be behind the waypoint.
    if request.known_version >= waypoint_version {
        return Err(Error::UnexpectedError(format!(
            "Waypoint request version {} is not smaller than waypoint {}",
            request.known_version, waypoint_version
        )));
    }
    let waypoint_li = self
        .executor_proxy
        .get_epoch_ending_ledger_info(waypoint_version)?;
    // If the requester's epoch ends before the waypoint's epoch, include
    // the ledger info ending its epoch and shrink the chunk so it stops
    // exactly at that epoch boundary.
    let end_of_epoch_li = if waypoint_li.ledger_info().epoch() > request.current_epoch {
        let end_of_epoch_li = self
            .executor_proxy
            .get_epoch_change_ledger_info(request.current_epoch)?;
        if end_of_epoch_li.ledger_info().version() < request.known_version {
            return Err(Error::UnexpectedError(format!("Waypoint request's current_epoch (epoch {}, version {}) < waypoint request's known_version {}",
                end_of_epoch_li.ledger_info().epoch(),
                end_of_epoch_li.ledger_info().version(),
                request.known_version,)));
        }
        let num_txns_until_end_of_epoch =
            end_of_epoch_li.ledger_info().version() - request.known_version;
        limit = std::cmp::min(limit, num_txns_until_end_of_epoch);
        Some(end_of_epoch_li)
    } else {
        None
    };
    self.deliver_chunk(
        peer,
        request.known_version,
        ResponseLedgerInfo::LedgerInfoForWaypoint {
            waypoint_li,
            end_of_epoch_li,
        },
        limit,
    )
}
/// Fetches up to `limit` transactions starting after `known_version` and
/// sends them to `peer` with the given response ledger info, recording a
/// send success/failure counter either way.
fn deliver_chunk(
    &mut self,
    peer: PeerNetworkId,
    known_version: u64,
    response_li: ResponseLedgerInfo,
    limit: u64,
) -> Result<(), Error> {
    let txns = self
        .executor_proxy
        .get_chunk(known_version, limit, response_li.version())?;
    let chunk_response = GetChunkResponse::new(response_li, txns);
    let log = LogSchema::event_log(LogEntry::ProcessChunkRequest, LogEvent::DeliverChunk)
        .chunk_response(chunk_response.clone())
        .peer(&peer);
    let msg = StateSyncMessage::GetChunkResponse(Box::new(chunk_response));
    let send_result = self.request_manager.send_chunk_response(&peer, msg);
    // Debug-log only on success; the failure path logs below with the
    // error attached.
    let send_result_label = if send_result.is_err() {
        counters::SEND_FAIL_LABEL
    } else {
        debug!(log);
        counters::SEND_SUCCESS_LABEL
    };
    counters::RESPONSES_SENT
        .with_label_values(&[
            peer.network_id().as_str(),
            peer.peer_id().short_str().as_str(),
            send_result_label,
        ])
        .inc();
    send_result.map_err(|e| {
        error!(log.error(&e));
        Error::UnexpectedError(format!(
            "Network error in sending chunk response to {}",
            peer
        ))
    })
}
/// Picks the ledger info to put in a chunk response: the requested target
/// (or our committed ledger info when none was given), unless the
/// requester is in an older epoch — in that case the ledger info that
/// ends the requester's epoch is returned so it can verify the epoch
/// change first.
fn choose_response_li(
    &self,
    request_epoch: u64,
    target: Option<LedgerInfoWithSignatures>,
) -> Result<LedgerInfoWithSignatures, Error> {
    let candidate = target.unwrap_or_else(|| self.local_state.committed_ledger_info());
    let candidate_epoch = candidate.ledger_info().epoch();
    if candidate_epoch <= request_epoch {
        return Ok(candidate);
    }

    let end_of_epoch_li = self
        .executor_proxy
        .get_epoch_change_ledger_info(request_epoch)?;
    debug!(LogSchema::event_log(
        LogEntry::ProcessChunkRequest,
        LogEvent::PastEpochRequested
    )
    .old_epoch(request_epoch)
    .new_epoch(candidate_epoch));
    Ok(end_of_epoch_li)
}
/// Validates and applies a chunk response to storage, dispatching on the
/// kind of ledger info it carries. Invalid chunks penalize the sending
/// peer via the request manager. On success, records the chunk size and
/// the sync-progress duration since the first matching request.
fn apply_chunk(
    &mut self,
    peer: &PeerNetworkId,
    response: GetChunkResponse,
) -> Result<(), Error> {
    debug!(
        LogSchema::event_log(LogEntry::ProcessChunkResponse, LogEvent::Received)
            .chunk_response(response.clone())
            .peer(peer)
    );
    fail_point!("state_sync_v1::apply_chunk", |_| {
        Err(crate::error::Error::UnexpectedError(
            "Injected error in apply_chunk".into(),
        ))
    });
    let txn_list_with_proof = response.txn_list_with_proof.clone();
    let chunk_size = response.txn_list_with_proof.transactions.len() as u64;
    let known_version = self.local_state.synced_version();
    // Dispatch on the ledger info type; any failure is reported to the
    // request manager and normalized into ProcessInvalidChunk.
    match response.response_li {
        ResponseLedgerInfo::VerifiableLedgerInfo(li) => {
            self.process_response_with_target_and_highest(txn_list_with_proof, li, None)
        }
        ResponseLedgerInfo::ProgressiveLedgerInfo {
            target_li,
            highest_li,
        } => {
            // A missing highest_li defaults to the target itself.
            let highest_li = highest_li.unwrap_or_else(|| target_li.clone());
            self.process_response_with_target_and_highest(
                txn_list_with_proof,
                target_li,
                Some(highest_li),
            )
        }
        ResponseLedgerInfo::LedgerInfoForWaypoint {
            waypoint_li,
            end_of_epoch_li,
        } => self.process_response_with_waypoint_li(
            txn_list_with_proof,
            waypoint_li,
            end_of_epoch_li,
        ),
    }
    .map_err(|error| {
        self.request_manager.process_invalid_chunk(peer);
        Error::ProcessInvalidChunk(error.to_string())
    })?;
    counters::STATE_SYNC_CHUNK_SIZE
        .with_label_values(&[
            peer.network_id().as_str(),
            peer.peer_id().short_str().as_str(),
        ])
        .observe(chunk_size as f64);
    let new_version = known_version
        .checked_add(chunk_size)
        .ok_or_else(|| Error::IntegerOverflow("New version has overflown!".into()))?;
    debug!(
        LogSchema::event_log(LogEntry::ProcessChunkResponse, LogEvent::ApplyChunkSuccess),
        "Applied chunk of size {}. Previous version: {}, new version {}",
        chunk_size,
        known_version,
        new_version
    );
    // Track how long it took from the first request at this version to a
    // successfully applied chunk; a chunk with no matching request is
    // merely logged.
    match self.request_manager.get_first_request_time(known_version) {
        None => {
            info!(
                LogSchema::event_log(LogEntry::ProcessChunkResponse, LogEvent::ReceivedChunkWithoutRequest),
                "Received a chunk of size {}, without making a request! Previous version: {}, new version {}",
                chunk_size,
                known_version,
                new_version
            );
        }
        Some(first_request_time) => {
            if let Ok(duration) = SystemTime::now().duration_since(first_request_time) {
                counters::SYNC_PROGRESS_DURATION.observe_duration(duration);
            }
        }
    }
    Ok(())
}
/// Handles a chunk response from a peer: rejects it while consensus is
/// executing or in read-only mode, verifies it, applies it to storage
/// (updating apply counters), and finally runs the post-commit work.
async fn process_chunk_response(
    &mut self,
    peer: &PeerNetworkId,
    response: GetChunkResponse,
) -> Result<(), Error> {
    // Chunk responses are only valid while this node is actively syncing.
    let error = if self.is_consensus_executing() {
        Some(Error::ConsensusIsExecuting)
    } else if self.read_only_mode {
        Some(Error::ReadOnlyMode(
            "Received an unrequested chunk response!".into(),
        ))
    } else {
        None
    };
    if let Some(error) = error {
        error!(LogSchema::new(LogEntry::ProcessChunkResponse,)
            .peer(peer)
            .error(&error));
        return Err(error);
    }
    self.verify_chunk_response_is_valid(peer, &response)?;
    // Apply the chunk, recording a success/failure counter either way.
    match self.apply_chunk(peer, response.clone()) {
        Ok(()) => {
            counters::APPLY_CHUNK_COUNT
                .with_label_values(&[
                    peer.network_id().as_str(),
                    peer.peer_id().short_str().as_str(),
                    counters::SUCCESS_LABEL,
                ])
                .inc();
        }
        Err(error) => {
            error!(LogSchema::event_log(
                LogEntry::ProcessChunkResponse,
                LogEvent::ApplyChunkFail
            )
            .peer(peer)
            .error(&error));
            counters::APPLY_CHUNK_COUNT
                .with_label_values(&[
                    peer.network_id().as_str(),
                    peer.peer_id().short_str().as_str(),
                    counters::FAIL_LABEL,
                ])
                .inc();
            return Err(error);
        }
    }
    // Run the post-commit pipeline for the applied transactions. No
    // consensus notification here (the commit came from state sync).
    self.process_commit_notification(
        response.txn_list_with_proof.transactions.clone(),
        vec![],
        None,
        Some(peer),
    )
    .await
    .map_err(|error| {
        error!(
            LogSchema::event_log(LogEntry::ProcessChunkResponse, LogEvent::PostCommitFail)
                .peer(peer)
                .error(&error)
        );
        error
    })
}
/// Pre-validates a chunk response before applying it: the sender must be
/// a known upstream state sync peer, the chunk must be non-empty and
/// start exactly at our next version, and the attached ledger info must
/// match the node's current phase (waypoint vs. target/highest).
fn verify_chunk_response_is_valid(
    &mut self,
    peer: &PeerNetworkId,
    response: &GetChunkResponse,
) -> Result<(), Error> {
    // Chunks from unknown (downstream) peers are counted and rejected.
    if !self.request_manager.is_known_state_sync_peer(peer) {
        counters::RESPONSE_FROM_DOWNSTREAM_COUNT
            .with_label_values(&[
                peer.network_id().as_str(),
                peer.peer_id().short_str().as_str(),
            ])
            .inc();
        self.request_manager.process_chunk_from_downstream(peer);
        return Err(Error::ReceivedChunkFromDownstream(peer.to_string()));
    }
    // The chunk must start at exactly synced_version + 1; version
    // mismatches and empty chunks are reported to the request manager.
    if let Some(first_chunk_version) = response.txn_list_with_proof.first_transaction_version {
        let known_version = self.local_state.synced_version();
        let expected_version = known_version
            .checked_add(1)
            .ok_or_else(|| Error::IntegerOverflow("Expected version has overflown!".into()))?;
        if first_chunk_version != expected_version {
            self.request_manager.process_chunk_version_mismatch(
                peer,
                first_chunk_version,
                known_version,
            )?;
        }
    } else {
        self.request_manager.process_empty_chunk(peer);
        return Err(Error::ReceivedEmptyChunk(peer.to_string()));
    }
    // Validate the attached ledger info against the node's current phase.
    match &response.response_li {
        ResponseLedgerInfo::LedgerInfoForWaypoint {
            waypoint_li,
            end_of_epoch_li,
        } => self.verify_response_with_waypoint_li(waypoint_li, end_of_epoch_li),
        ResponseLedgerInfo::VerifiableLedgerInfo(response_li) => {
            self.verify_response_with_target_and_highest(response_li, &None)
        }
        ResponseLedgerInfo::ProgressiveLedgerInfo {
            target_li,
            highest_li,
        } => self.verify_response_with_target_and_highest(target_li, highest_li),
    }
}
/// Validates a target/highest (progressive or verifiable) ledger info:
/// only accepted after initialization, never beyond an in-flight
/// consensus sync target, and with target <= highest when both are given.
fn verify_response_with_target_and_highest(
    &mut self,
    target_li: &LedgerInfoWithSignatures,
    highest_li: &Option<LedgerInfoWithSignatures>,
) -> Result<(), Error> {
    if !self.is_initialized() {
        return Err(Error::ReceivedWrongChunkType(
            "Received a progressive ledger info, but we're not initialized!".into(),
        ));
    }

    // While serving a consensus sync request, a response must not point
    // past the requested target.
    if let Some(sync_request) = self.sync_request.as_ref() {
        let requested_version = sync_request
            .consensus_sync_notification
            .target
            .ledger_info()
            .version();
        let received_version = target_li.ledger_info().version();
        if received_version > requested_version {
            let error_message = format!("Verifiable ledger info version is higher than the sync target. Received: {}, requested: {}.",
                received_version,
                requested_version);
            return Err(Error::ProcessInvalidChunk(error_message));
        }
    }

    // A progressive response must keep target <= highest.
    if let Some(highest_li) = highest_li {
        let target_version = target_li.ledger_info().version();
        let highest_version = highest_li.ledger_info().version();
        if highest_version < target_version {
            let error_message = format!("Progressive ledger info has target version > highest version. Target: {}, highest: {}.",
                target_version,
                highest_version);
            return Err(Error::ProcessInvalidChunk(error_message));
        }
    }

    Ok(())
}
/// Validates a waypoint ledger info: only accepted while the node is
/// still uninitialized and not serving a consensus sync request, and any
/// accompanying end-of-epoch ledger info must not be past the waypoint.
fn verify_response_with_waypoint_li(
    &mut self,
    waypoint_li: &LedgerInfoWithSignatures,
    end_of_epoch_li: &Option<LedgerInfoWithSignatures>,
) -> Result<(), Error> {
    if self.is_initialized() || self.sync_request.is_some() {
        return Err(Error::ReceivedWrongChunkType(
            "Received a waypoint ledger info, but we're already initialized!".into(),
        ));
    }

    if let Some(end_of_epoch_li) = end_of_epoch_li {
        let end_of_epoch_version = end_of_epoch_li.ledger_info().version();
        let waypoint_version = waypoint_li.ledger_info().version();
        if waypoint_version < end_of_epoch_version {
            let error_message = format!("Waypoint ledger info version is less than the end_of_epoch_li version. Waypoint: {}, end_of_epoch_li: {}.",
                waypoint_version,
                end_of_epoch_version);
            return Err(Error::ProcessInvalidChunk(error_message));
        }
    }

    Ok(())
}
/// Updates the "highest seen version" gauge, keeping its maximum. The
/// candidate depends on the node's phase: the waypoint version before
/// initialization, the consensus sync target while a request is active,
/// then any newly observed highest ledger info, then the stored target
/// ledger info, and finally the local synced version.
fn log_highest_seen_version(&self, new_highest_li: Option<LedgerInfoWithSignatures>) {
    let current_highest_version = if !self.is_initialized() {
        self.waypoint.version()
    } else if let Some(sync_request) = self.sync_request.as_ref() {
        sync_request
            .consensus_sync_notification
            .target
            .ledger_info()
            .version()
    } else if let Some(highest_li) = new_highest_li.as_ref() {
        highest_li.ledger_info().version()
    } else if let Some(target_li) = self.target_ledger_info.as_ref() {
        target_li.ledger_info().version()
    } else {
        self.local_state.synced_version()
    };

    let previous_highest = counters::get_version(counters::VersionType::Highest);
    counters::set_version(
        counters::VersionType::Highest,
        cmp::max(current_highest_version, previous_highest),
    );
}
/// Computes the (version, epoch) the node would reach after applying the
/// given chunk: the synced version advanced by the chunk length, with the
/// epoch bumped only when the accompanying ledger info lands exactly on
/// the new version and ends the current epoch. Errors on u64 overflow.
fn calculate_new_known_version_and_epoch(
    &mut self,
    txn_list_with_proof: TransactionListWithProof,
    ledger_info: Option<LedgerInfoWithSignatures>,
) -> Result<(u64, u64), Error> {
    let chunk_length = txn_list_with_proof.transactions.len() as u64;
    let new_version = self
        .local_state
        .synced_version()
        .checked_add(chunk_length)
        .ok_or_else(|| {
            Error::IntegerOverflow("Potential state sync version has overflown".into())
        })?;

    let current_epoch = self.local_state.trusted_epoch();
    let crosses_epoch_boundary = ledger_info.map_or(false, |li| {
        li.ledger_info().version() == new_version && li.ledger_info().ends_epoch()
    });
    let new_epoch = if crosses_epoch_boundary {
        current_epoch.checked_add(1).ok_or_else(|| {
            Error::IntegerOverflow("Potential state sync epoch has overflown".into())
        })?
    } else {
        current_epoch
    };

    Ok((new_version, new_epoch))
}
/// Builds a "highest available" chunk target using the configured
/// long-poll timeout, optionally pinned to a specific target ledger info.
fn create_highest_available_chunk_target(
    &self,
    target_ledger_info: Option<LedgerInfoWithSignatures>,
) -> TargetType {
    let timeout_ms = self.config.long_poll_timeout_ms;
    TargetType::HighestAvailable {
        target_li: target_ledger_info,
        timeout_ms,
    }
}
fn create_sync_request_chunk_target(&self, known_version: u64) -> Result<TargetType, Error> {
if let Some(sync_request) = &self.sync_request {
let target_version = sync_request
.consensus_sync_notification
.target
.ledger_info()
.version();
if target_version <= known_version {
Err(Error::SyncedBeyondTarget(known_version, target_version))
} else {
let chunk_target = self.create_highest_available_chunk_target(Some(
sync_request.consensus_sync_notification.target.clone(),
));
Ok(chunk_target)
}
} else {
Err(Error::NoSyncRequestFound(
"Unable to create a sync request chunk target".into(),
))
}
}
fn create_waypoint_chunk_target(&self) -> TargetType {
let waypoint_version = self.waypoint.version();
TargetType::Waypoint(waypoint_version)
}
    /// Processes a chunk response that carries a verifiable target ledger info
    /// (`response_li`) and, optionally, a separate highest-available ledger
    /// info (`new_highest_li`).
    ///
    /// NOTE: the next chunk request is sent *before* the current chunk is
    /// verified and stored, so the request pipeline stays full while the chunk
    /// is executed.
    fn process_response_with_target_and_highest(
        &mut self,
        txn_list_with_proof: TransactionListWithProof,
        response_li: LedgerInfoWithSignatures,
        new_highest_li: Option<LedgerInfoWithSignatures>,
    ) -> Result<(), Error> {
        // The version/epoch the node will be at once this chunk is applied.
        let (known_version, known_epoch) = self.calculate_new_known_version_and_epoch(
            txn_list_with_proof.clone(),
            Some(response_li.clone()),
        )?;
        if self.sync_request.is_some() {
            // A consensus sync request is active: request the next chunk bounded
            // by the sync target. Failure to create the target (e.g., we've now
            // synced past it) is logged but doesn't fail chunk processing.
            match self.create_sync_request_chunk_target(known_version) {
                Ok(chunk_target) => {
                    // Send errors are already logged by the callee; ignore here.
                    let _ = self.send_chunk_request_and_log_error(
                        known_version,
                        known_epoch,
                        chunk_target,
                        LogEntry::ProcessChunkResponse,
                    );
                }
                Err(error) => {
                    error!(LogSchema::new(LogEntry::SendChunkRequest).error(&error));
                }
            }
        } else {
            // No sync request: long-poll for the highest available chunk. Keep
            // the stored target ledger info only while we're still behind it.
            let mut new_target_ledger_info = None;
            if let Some(target_ledger_info) = self.target_ledger_info.clone() {
                if known_version < target_ledger_info.ledger_info().version() {
                    new_target_ledger_info = Some(target_ledger_info);
                }
            }
            let _ = self.send_chunk_request_and_log_error(
                known_version,
                known_epoch,
                self.create_highest_available_chunk_target(new_target_ledger_info),
                LogEntry::ProcessChunkResponse,
            );
        }
        // Verify the response ledger info (and the highest ledger info, when it
        // differs) against local trusted state before applying the chunk.
        self.local_state.verify_ledger_info(&response_li)?;
        if let Some(new_highest_li) = new_highest_li.clone() {
            if new_highest_li != response_li {
                self.local_state.verify_ledger_info(&new_highest_li)?;
            }
        }
        self.log_highest_seen_version(new_highest_li.clone());
        self.validate_and_store_chunk(txn_list_with_proof, response_li, None)?;
        self.sync_state_with_local_storage()?;
        // Drop the stored target once we've synced up to (or past) it.
        let synced_version = self.local_state.synced_version();
        if let Some(target_ledger_info) = &self.target_ledger_info {
            if synced_version >= target_ledger_info.ledger_info().version() {
                self.target_ledger_info = None;
            }
        }
        // Adopt the new highest ledger info as the target if we're still behind it.
        if self.target_ledger_info.is_none() {
            if let Some(new_highest_li) = new_highest_li {
                if synced_version < new_highest_li.ledger_info().version() {
                    self.target_ledger_info = Some(new_highest_li);
                }
            }
        }
        Ok(())
    }
    /// Processes a chunk response received while syncing to a waypoint. The
    /// chunk is anchored by `waypoint_li` (which must verify against the
    /// configured waypoint); `end_of_epoch_li` may carry an epoch change that
    /// occurs before the waypoint version.
    fn process_response_with_waypoint_li(
        &mut self,
        txn_list_with_proof: TransactionListWithProof,
        waypoint_li: LedgerInfoWithSignatures,
        end_of_epoch_li: Option<LedgerInfoWithSignatures>,
    ) -> Result<(), Error> {
        // The version/epoch the node will be at once this chunk is applied.
        let (known_version, known_epoch) = self.calculate_new_known_version_and_epoch(
            txn_list_with_proof.clone(),
            end_of_epoch_li.clone(),
        )?;
        // Still behind the waypoint: request the next waypoint chunk up front
        // (pipelining; send errors are logged by the callee).
        if known_version < self.waypoint.version() {
            let _ = self.send_chunk_request_and_log_error(
                known_version,
                known_epoch,
                self.create_waypoint_chunk_target(),
                LogEntry::ProcessChunkResponse,
            );
        }
        // The end-of-epoch ledger info is committed alongside the chunk only
        // when it verifies, actually ends the epoch, and lands exactly on the
        // chunk's final version; otherwise it is ignored for this commit.
        let end_of_epoch_li_to_commit = if let Some(end_of_epoch_li) = end_of_epoch_li {
            self.local_state.verify_ledger_info(&end_of_epoch_li)?;
            let ledger_info = end_of_epoch_li.ledger_info();
            if !ledger_info.ends_epoch() {
                return Err(Error::ProcessInvalidChunk(
                    "Received waypoint ledger info with an end_of_epoch_li that does not end the epoch!".into(),
                ));
            }
            if ledger_info.version() == known_version {
                Some(end_of_epoch_li)
            } else {
                None
            }
        } else {
            None
        };
        // The waypoint ledger info must match the configured waypoint.
        self.waypoint
            .verify(waypoint_li.ledger_info())
            .map_err(|error| {
                Error::UnexpectedError(format!("Waypoint verification failed: {}", error))
            })?;
        self.validate_and_store_chunk(txn_list_with_proof, waypoint_li, end_of_epoch_li_to_commit)?;
        self.log_highest_seen_version(None);
        Ok(())
    }
fn validate_and_store_chunk(
&mut self,
txn_list_with_proof: TransactionListWithProof,
target: LedgerInfoWithSignatures,
intermediate_end_of_epoch_li: Option<LedgerInfoWithSignatures>,
) -> Result<(), Error> {
let target_epoch = target.ledger_info().epoch();
let target_version = target.ledger_info().version();
let local_epoch = self.local_state.committed_epoch();
let local_version = self.local_state.committed_version();
if (target_epoch, target_version) <= (local_epoch, local_version) {
warn!(
LogSchema::event_log(LogEntry::ProcessChunkResponse, LogEvent::OldResponseLI)
.local_li_version(local_version)
.local_epoch(local_epoch),
response_li_version = target_version,
response_li_epoch = target_epoch
);
return Ok(());
}
self.executor_proxy
.execute_chunk(txn_list_with_proof, target, intermediate_end_of_epoch_li)
}
fn is_consensus_executing(&self) -> bool {
self.is_initialized() && self.role == RoleType::Validator && self.sync_request.is_none()
}
    /// Periodic progress check. Fails any consensus sync request whose commit
    /// deadline has passed, and re-sends a chunk request if the outstanding one
    /// has timed out.
    fn check_progress(&mut self) -> Result<(), Error> {
        // Nothing to drive while consensus is executing, or in read-only mode.
        if self.is_consensus_executing() || self.read_only_mode {
            return Ok(());
        }
        // Time out the active sync request if no commit has happened within the
        // configured window; consensus is notified via the stored callback.
        if let Some(sync_request) = self.sync_request.as_ref() {
            let timeout_between_commits =
                Duration::from_millis(self.config.sync_request_timeout_ms);
            let commit_deadline = sync_request
                .last_commit_timestamp
                .checked_add(timeout_between_commits)
                .ok_or_else(|| {
                    Error::IntegerOverflow("The commit deadline timestamp has overflown!".into())
                })?;
            // duration_since succeeds iff now >= deadline (i.e., we timed out).
            if SystemTime::now().duration_since(commit_deadline).is_ok() {
                counters::SYNC_REQUEST_RESULT
                    .with_label_values(&[counters::TIMEOUT_LABEL])
                    .inc();
                warn!(LogSchema::event_log(
                    LogEntry::SyncRequest,
                    LogEvent::Timeout
                ));
                // Remove the request and deliver the timeout error to consensus.
                if let Some(sync_request) = self.sync_request.take() {
                    if let Err(e) = block_on(self.send_sync_req_callback(
                        sync_request,
                        Err(Error::UnexpectedError("Sync request timed out!".into())),
                    )) {
                        error!(
                            LogSchema::event_log(LogEntry::SyncRequest, LogEvent::CallbackFail)
                                .error(&e)
                        );
                    }
                }
            }
        }
        // If the outstanding chunk request has timed out, re-send it with a
        // target appropriate to the current mode (waypoint sync, consensus sync
        // request, or highest-available).
        let known_version = self.local_state.synced_version();
        if self.request_manager.has_request_timed_out(known_version)? {
            counters::TIMEOUT.inc();
            warn!(LogSchema::new(LogEntry::Timeout).version(known_version));
            let trusted_epoch = self.local_state.trusted_epoch();
            let chunk_target = if !self.is_initialized() {
                self.create_waypoint_chunk_target()
            } else if self.sync_request.is_some() {
                self.create_sync_request_chunk_target(known_version)?
            } else {
                self.create_highest_available_chunk_target(self.target_ledger_info.clone())
            };
            self.send_chunk_request_and_log_error(
                known_version,
                trusted_epoch,
                chunk_target,
                LogEntry::Timeout,
            )
        } else {
            Ok(())
        }
    }
fn send_chunk_request_and_log_error(
&mut self,
known_version: u64,
known_epoch: u64,
chunk_target: TargetType,
log_entry: LogEntry,
) -> Result<(), Error> {
if let Err(error) =
self.send_chunk_request_with_target(known_version, known_epoch, chunk_target)
{
error!(
LogSchema::event_log(log_entry, LogEvent::SendChunkRequestFail)
.version(known_version)
.local_epoch(known_epoch)
.error(&error)
);
Err(error)
} else {
Ok(())
}
}
fn send_chunk_request_with_target(
&mut self,
known_version: u64,
known_epoch: u64,
target: TargetType,
) -> Result<(), Error> {
if self.request_manager.no_available_peers() {
warn!(LogSchema::event_log(
LogEntry::SendChunkRequest,
LogEvent::MissingPeers
));
return Err(Error::NoAvailablePeers(
"No peers to send chunk request to!".into(),
));
}
let target_version = target
.version()
.unwrap_or_else(|| known_version.wrapping_add(1));
counters::set_version(counters::VersionType::Target, target_version);
let req = GetChunkRequest::new(known_version, known_epoch, self.config.chunk_limit, target);
self.request_manager.send_chunk_request(req)
}
fn deliver_subscription(
&mut self,
peer: PeerNetworkId,
request_info: PendingRequestInfo,
local_version: u64,
) -> Result<(), Error> {
let (target_li, highest_li) = self.calculate_target_and_highest_li(
request_info.request_epoch,
request_info.target_li,
local_version,
)?;
self.deliver_chunk(
peer,
request_info.known_version,
ResponseLedgerInfo::ProgressiveLedgerInfo {
target_li,
highest_li,
},
request_info.chunk_limit,
)
}
fn check_subscriptions(&mut self) {
let highest_li_version = self.local_state.committed_version();
let mut ready = vec![];
self.subscriptions.retain(|peer, request_info| {
if SystemTime::now()
.duration_since(request_info.expiration_time)
.is_ok()
{
return false;
}
if request_info.known_version < highest_li_version {
ready.push((*peer, request_info.clone()));
false
} else {
true
}
});
ready.into_iter().for_each(|(peer, request_info)| {
let result_label = if let Err(err) =
self.deliver_subscription(peer, request_info, highest_li_version)
{
error!(LogSchema::new(LogEntry::SubscriptionDeliveryFail)
.peer(&peer)
.error(&err));
counters::FAIL_LABEL
} else {
counters::SUCCESS_LABEL
};
counters::SUBSCRIPTION_DELIVERY_COUNT
.with_label_values(&[
peer.network_id().as_str(),
peer.peer_id().short_str().as_str(),
result_label,
])
.inc();
});
}
async fn send_sync_req_callback(
&mut self,
sync_req: SyncRequest,
msg: Result<(), Error>,
) -> Result<(), Error> {
let msg = msg.map_err(|error| {
consensus_notifications::Error::UnexpectedErrorEncountered(format!("{:?}", error))
});
self.consensus_listener
.respond_to_sync_notification(sync_req.consensus_sync_notification, msg)
.await
.map_err(|error| {
counters::FAILED_CHANNEL_SEND
.with_label_values(&[counters::CONSENSUS_SYNC_REQ_CALLBACK])
.inc();
Error::UnexpectedError(format!(
"Consensus sync request callback error: {:?}",
error
))
})
}
fn send_initialization_callback(
callback: oneshot::Sender<Result<(), Error>>,
) -> Result<(), Error> {
match callback.send(Ok(())) {
Err(error) => {
counters::FAILED_CHANNEL_SEND
.with_label_values(&[counters::WAYPOINT_INIT_CALLBACK])
.inc();
Err(Error::CallbackSendFailed(format!(
"Waypoint initialization callback error - failed to send following msg: {:?}",
error
)))
}
_ => Ok(()),
}
}
}
#[cfg(test)]
mod tests {
    use crate::{
        chunk_request::{GetChunkRequest, TargetType},
        chunk_response::{GetChunkResponse, ResponseLedgerInfo},
        coordinator::StateSyncCoordinator,
        error::Error,
        executor_proxy::ExecutorProxy,
        network::StateSyncMessage,
        shared_components::{test_utils, test_utils::create_coordinator_with_config_and_waypoint},
    };
    use aptos_config::{
        config::{NodeConfig, PeerRole, RoleType},
        network_id::{NetworkId, PeerNetworkId},
    };
    use aptos_crypto::{
        ed25519::{Ed25519PrivateKey, Ed25519Signature},
        HashValue, PrivateKey, Uniform,
    };
    use aptos_types::{
        account_address::AccountAddress,
        block_info::BlockInfo,
        chain_id::ChainId,
        contract_event::ContractEvent,
        ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
        proof::TransactionInfoListWithProof,
        transaction::{
            RawTransaction, Script, SignedTransaction, Transaction, TransactionListWithProof,
            TransactionPayload, Version,
        },
        waypoint::Waypoint,
        PeerId,
    };
    use claim::{assert_err, assert_matches, assert_ok};
    use consensus_notifications::{
        ConsensusCommitNotification, ConsensusNotificationResponse, ConsensusSyncNotification,
    };
    use executor_types::ChunkExecutorTrait;
    use futures::{channel::oneshot, executor::block_on};
    use mempool_notifications::MempoolNotifier;
    use netcore::transport::ConnectionOrigin;
    use network::transport::ConnectionMetadata;
    use std::collections::BTreeMap;

    #[test]
    fn test_process_sync_request() {
        // Sync requests are rejected while in read-only mode.
        let mut read_only_coordinator = test_utils::create_read_only_coordinator();
        let (sync_request, _) = create_sync_notification_at_version(0);
        let process_result = block_on(read_only_coordinator.process_sync_request(sync_request));
        assert_matches!(process_result, Err(Error::ReadOnlyMode(_)));

        // Full nodes cannot service consensus sync requests.
        let mut full_node_coordinator = test_utils::create_full_node_coordinator();
        let (sync_request, _) = create_sync_notification_at_version(0);
        let process_result = block_on(full_node_coordinator.process_sync_request(sync_request));
        assert_matches!(process_result, Err(Error::FullNodeSyncRequest));

        // A validator at the requested version (0) should immediately ack the request.
        let mut validator_coordinator = test_utils::create_validator_coordinator();
        let (sync_request, mut callback_receiver) = create_sync_notification_at_version(0);
        assert_ok!(block_on(
            validator_coordinator.process_sync_request(sync_request)
        ));
        match callback_receiver.try_recv() {
            Ok(Some(notification_result)) => assert_ok!(notification_result.result),
            result => panic!("Expected okay but got: {:?}", result),
        };

        // A validator that hasn't yet reached its waypoint is uninitialized and
        // must reject sync requests (no callback is delivered).
        let waypoint_version = 10;
        let waypoint_ledger_info = create_ledger_info_at_version(waypoint_version);
        let waypoint = Waypoint::new_any(waypoint_ledger_info.ledger_info());
        let mut validator_coordinator =
            create_coordinator_with_config_and_waypoint(NodeConfig::default(), waypoint);
        let (sync_request, mut callback_receiver) = create_sync_notification_at_version(10);
        let process_result = block_on(validator_coordinator.process_sync_request(sync_request));
        assert_matches!(process_result, Err(Error::UninitializedError(_)));
        let callback_result = callback_receiver.try_recv();
        assert_err!(callback_result);
    }

    #[test]
    fn test_get_sync_state() {
        // A fresh validator should report a committed version of 0.
        let mut validator_coordinator = test_utils::create_validator_coordinator();
        let (callback_sender, mut callback_receiver) = oneshot::channel();
        assert_ok!(validator_coordinator.get_sync_state(callback_sender));
        match callback_receiver.try_recv() {
            Ok(Some(sync_state)) => {
                assert_eq!(sync_state.committed_version(), 0);
            }
            result => panic!("Expected okay but got: {:?}", result),
        };

        // A dropped receiver should surface as a callback send failure.
        let (callback_sender, _) = oneshot::channel();
        let sync_state_result = validator_coordinator.get_sync_state(callback_sender);
        assert_matches!(sync_state_result, Err(Error::CallbackSendFailed(_)));
    }

    #[test]
    fn test_wait_for_initialization() {
        // In read-only mode the initialization listener is immediately failed.
        let mut read_only_coordinator = test_utils::create_read_only_coordinator();
        let (callback_sender, mut callback_receiver) = oneshot::channel();
        assert_matches!(
            read_only_coordinator.wait_for_initialization(callback_sender),
            Err(Error::ReadOnlyMode(_))
        );
        match callback_receiver.try_recv() {
            Ok(Some(result)) => assert_matches!(result, Err(Error::ReadOnlyMode(_))),
            result => panic!("Expected read-only error but got: {:?}", result),
        };

        // An already-initialized validator answers the callback immediately.
        let mut validator_coordinator = test_utils::create_validator_coordinator();
        let (callback_sender, mut callback_receiver) = oneshot::channel();
        assert_ok!(validator_coordinator.wait_for_initialization(callback_sender));
        match callback_receiver.try_recv() {
            Ok(Some(result)) => assert_ok!(result),
            result => panic!("Expected okay but got: {:?}", result),
        };

        // A dropped receiver should surface as a callback send failure.
        let (callback_sender, _) = oneshot::channel();
        let initialization_result = validator_coordinator.wait_for_initialization(callback_sender);
        assert_matches!(initialization_result, Err(Error::CallbackSendFailed(_)));

        // A validator behind its waypoint stores the listener without answering.
        let waypoint_version = 10;
        let waypoint_ledger_info = create_ledger_info_at_version(waypoint_version);
        let waypoint = Waypoint::new_any(waypoint_ledger_info.ledger_info());
        let mut validator_coordinator =
            create_coordinator_with_config_and_waypoint(NodeConfig::default(), waypoint);
        let (callback_sender, mut callback_receiver) = oneshot::channel();
        assert_ok!(validator_coordinator.wait_for_initialization(callback_sender));
        let callback_result = callback_receiver.try_recv();
        assert_matches!(callback_result, Ok(None));
    }

    #[tokio::test]
    async fn test_process_commit_notification() {
        // An empty commit with no notification should succeed trivially.
        let mut validator_coordinator = test_utils::create_validator_coordinator();
        assert_ok!(block_on(validator_coordinator.process_commit_notification(
            vec![],
            vec![],
            None,
            None
        )));

        // A commit notification should be acked back to consensus.
        let (commit_notification, mut callback_receiver) =
            create_commit_notification(vec![], vec![]);
        assert_ok!(block_on(validator_coordinator.process_commit_notification(
            vec![],
            vec![],
            Some(commit_notification),
            None,
        )));
        match callback_receiver.try_recv() {
            Ok(Some(notification_result)) => {
                assert_ok!(notification_result.result);
            }
            callback_result => panic!("Expected an okay result but got: {:?}", callback_result),
        };

        // A commit carrying actual transactions should also be processed cleanly.
        let committed_transactions = vec![create_test_transaction()];
        let (commit_notification, _callback_receiver) =
            create_commit_notification(committed_transactions.clone(), vec![]);
        assert_ok!(block_on(validator_coordinator.process_commit_notification(
            committed_transactions,
            vec![],
            Some(commit_notification),
            None,
        )));
    }

    #[test]
    fn test_check_progress() {
        // Progress checks are no-ops in read-only mode.
        let mut read_only_coordinator = test_utils::create_read_only_coordinator();
        assert_ok!(read_only_coordinator.check_progress());

        // With consensus executing, a progress check is also a no-op.
        let mut validator_coordinator = test_utils::create_validator_coordinator();
        assert_ok!(validator_coordinator.check_progress());

        // With a pending sync request and no peers, the check fails on send.
        let (sync_request, _) = create_sync_notification_at_version(1);
        let _ = block_on(validator_coordinator.process_sync_request(sync_request));
        let progress_result = validator_coordinator.check_progress();
        assert_matches!(progress_result, Err(Error::NoAvailablePeers(_)));

        // With a zero sync-request timeout, the request times out immediately
        // and consensus is notified of the failure.
        let mut node_config = NodeConfig::default();
        node_config.base.role = RoleType::Validator;
        node_config.state_sync.sync_request_timeout_ms = 0;
        let mut validator_coordinator =
            create_coordinator_with_config_and_waypoint(node_config, Waypoint::default());
        let (sync_request, mut callback_receiver) = create_sync_notification_at_version(1);
        let _ = block_on(validator_coordinator.process_sync_request(sync_request));
        assert_err!(validator_coordinator.check_progress());
        match callback_receiver.try_recv() {
            Ok(Some(notification_result)) => {
                assert_err!(notification_result.result);
            }
            callback_result => panic!("Expected an error result but got: {:?}", callback_result),
        };
    }

    #[test]
    fn test_new_and_lost_peers() {
        let mut validator_coordinator = test_utils::create_validator_coordinator();

        // An inbound validator peer on the public network is invalid for a validator.
        let network_id = NetworkId::Public;
        let peer_id = PeerId::random();
        let connection_metadata = ConnectionMetadata::mock_with_role_and_origin(
            peer_id,
            PeerRole::Validator,
            ConnectionOrigin::Inbound,
        );
        let new_peer_result =
            validator_coordinator.process_new_peer(network_id, connection_metadata.clone());
        assert_matches!(new_peer_result, Err(Error::InvalidStateSyncPeer(..)));

        // The same peer on the validator network can be added and then removed.
        let network_id = NetworkId::Validator;
        assert_ok!(validator_coordinator.process_new_peer(network_id, connection_metadata));
        assert_ok!(validator_coordinator.process_lost_peer(network_id, peer_id));
    }

    #[test]
    fn test_invalid_chunk_request_messages() {
        let mut validator_coordinator = test_utils::create_validator_coordinator();
        let peer_network_id = PeerNetworkId::random();

        // Requests whose known version exceeds the target version are invalid.
        let current_epoch = 0;
        let chunk_limit = 250;
        let timeout_ms = 1000;
        let known_version = 100;
        let target_version = 10;
        let chunk_requests = create_chunk_requests(
            known_version,
            current_epoch,
            chunk_limit,
            target_version,
            timeout_ms,
        );
        verify_all_chunk_requests_are_invalid(
            &mut validator_coordinator,
            &peer_network_id,
            &chunk_requests,
        );

        // With an epoch far ahead of the target version, only the target
        // request (index 1 of the created vec) is expected to be invalid.
        let known_version = 0;
        let current_epoch = 100;
        let chunk_requests = create_chunk_requests(
            known_version,
            current_epoch,
            chunk_limit,
            target_version,
            timeout_ms,
        );
        verify_all_chunk_requests_are_invalid(
            &mut validator_coordinator,
            &peer_network_id,
            &chunk_requests[1..2],
        );

        // Requests with a zero chunk limit are invalid.
        let chunk_limit = 0;
        let chunk_requests = create_chunk_requests(
            known_version,
            current_epoch,
            chunk_limit,
            target_version,
            timeout_ms,
        );
        verify_all_chunk_requests_are_invalid(
            &mut validator_coordinator,
            &peer_network_id,
            &chunk_requests,
        );

        // Requests with a zero timeout are invalid. (Previously this case
        // re-declared `chunk_limit = 0`, duplicating the case above.)
        let timeout_ms = 0;
        let chunk_requests = create_chunk_requests(
            known_version,
            current_epoch,
            chunk_limit,
            target_version,
            timeout_ms,
        );
        verify_all_chunk_requests_are_invalid(
            &mut validator_coordinator,
            &peer_network_id,
            &chunk_requests,
        );
    }

    #[test]
    fn test_process_chunk_response_read_only() {
        // Force the coordinator into read-only mode after accepting a sync request.
        let mut read_only_coordinator = test_utils::create_validator_coordinator();
        let (sync_request, _) = create_sync_notification_at_version(10);
        let _ = block_on(read_only_coordinator.process_sync_request(sync_request));
        read_only_coordinator.read_only_mode = true;

        // All chunk responses must now be rejected with a read-only error.
        let peer_network_id = PeerNetworkId::random_validator();
        let empty_chunk_responses = create_empty_chunk_responses(10);
        for chunk_response in &empty_chunk_responses {
            let result = block_on(read_only_coordinator.process_chunk_message(
                peer_network_id.network_id(),
                peer_network_id.peer_id(),
                chunk_response.clone(),
            ));
            assert_matches!(result, Err(Error::ReadOnlyMode(_)));
        }
    }

    #[test]
    fn test_process_chunk_response_messages() {
        // With consensus executing, chunk responses are rejected outright.
        let mut validator_coordinator = test_utils::create_validator_coordinator();
        let peer_network_id = PeerNetworkId::random_validator();
        let empty_chunk_responses = create_empty_chunk_responses(10);
        for chunk_response in &empty_chunk_responses {
            let result = block_on(validator_coordinator.process_chunk_message(
                peer_network_id.network_id(),
                peer_network_id.peer_id(),
                chunk_response.clone(),
            ));
            assert_matches!(result, Err(Error::ConsensusIsExecuting));
        }

        // With a sync request active but the peer unknown, responses look like
        // they came from a downstream peer and are rejected.
        let (sync_request, _) = create_sync_notification_at_version(10);
        let _ = block_on(validator_coordinator.process_sync_request(sync_request));
        for chunk_response in &empty_chunk_responses {
            let result = block_on(validator_coordinator.process_chunk_message(
                peer_network_id.network_id(),
                peer_network_id.peer_id(),
                chunk_response.clone(),
            ));
            assert_matches!(result, Err(Error::ReceivedChunkFromDownstream(_)));
        }

        // Once the peer is known, empty chunks are rejected as empty.
        process_new_peer_event(&mut validator_coordinator, &peer_network_id);
        for chunk_response in &empty_chunk_responses {
            let result = block_on(validator_coordinator.process_chunk_message(
                peer_network_id.network_id(),
                peer_network_id.peer_id(),
                chunk_response.clone(),
            ));
            assert_matches!(result, Err(Error::ReceivedEmptyChunk(_)));
        }

        // Non-empty chunks starting at the wrong version are non-sequential.
        let chunk_responses = create_non_empty_chunk_responses(10);
        for chunk_response in &chunk_responses {
            let result = block_on(validator_coordinator.process_chunk_message(
                peer_network_id.network_id(),
                peer_network_id.peer_id(),
                chunk_response.clone(),
            ));
            assert_matches!(result, Err(Error::ReceivedNonSequentialChunk(..)));
        }
    }

    #[test]
    fn test_process_chunk_response_highest() {
        let mut full_node_coordinator = test_utils::create_full_node_coordinator();
        let peer_network_id = PeerNetworkId::random_validator();
        process_new_peer_event(&mut full_node_coordinator, &peer_network_id);

        // A full node long-polls for highest chunks, so a waypoint response
        // (index 0 of the created vec) is the wrong chunk type.
        let chunk_responses = create_non_empty_chunk_responses(1);
        verify_all_chunk_responses_are_the_wrong_type(
            &mut full_node_coordinator,
            &peer_network_id,
            &chunk_responses[0..1],
        );

        // Progressive responses with unverifiable ledger infos must be rejected.
        let response_ledger_info = ResponseLedgerInfo::ProgressiveLedgerInfo {
            target_li: create_ledger_info_at_version(100),
            highest_li: Some(create_ledger_info_at_version(10)),
        };
        let highest_response = create_chunk_response_message(
            response_ledger_info,
            create_dummy_transaction_list_with_proof(1),
        );
        verify_all_chunk_responses_are_invalid(
            &mut full_node_coordinator,
            &peer_network_id,
            &[highest_response],
        );

        // The same applies when no explicit highest ledger info is provided.
        let response_ledger_info = ResponseLedgerInfo::ProgressiveLedgerInfo {
            target_li: create_ledger_info_at_version(100),
            highest_li: None,
        };
        let highest_response = create_chunk_response_message(
            response_ledger_info,
            create_dummy_transaction_list_with_proof(1),
        );
        verify_all_chunk_responses_are_invalid(
            &mut full_node_coordinator,
            &peer_network_id,
            &[highest_response],
        );
    }

    #[test]
    fn test_process_chunk_response_target() {
        let mut validator_coordinator = test_utils::create_validator_coordinator();
        let peer_network_id = PeerNetworkId::random_validator();
        process_new_peer_event(&mut validator_coordinator, &peer_network_id);

        // A validator with an active sync request expects target responses, so
        // a waypoint response (index 0 of the created vec) is the wrong type.
        let (sync_request, _) = create_sync_notification_at_version(10);
        let _ = block_on(validator_coordinator.process_sync_request(sync_request));
        let chunk_responses = create_non_empty_chunk_responses(1);
        verify_all_chunk_responses_are_the_wrong_type(
            &mut validator_coordinator,
            &peer_network_id,
            &chunk_responses[0..1],
        );

        // Target responses beyond the sync target are invalid.
        let ledger_info = create_ledger_info_at_version(100);
        let response_ledger_info = ResponseLedgerInfo::VerifiableLedgerInfo(ledger_info);
        let target_response = create_chunk_response_message(
            response_ledger_info,
            create_dummy_transaction_list_with_proof(1),
        );
        verify_all_chunk_responses_are_invalid(
            &mut validator_coordinator,
            &peer_network_id,
            &[target_response],
        );

        // Target responses below the sync target that fail verification are
        // also invalid.
        let ledger_info = create_ledger_info_at_version(5);
        let response_ledger_info = ResponseLedgerInfo::VerifiableLedgerInfo(ledger_info);
        let target_response = create_chunk_response_message(
            response_ledger_info,
            create_dummy_transaction_list_with_proof(1),
        );
        verify_all_chunk_responses_are_invalid(
            &mut validator_coordinator,
            &peer_network_id,
            &[target_response],
        );
    }

    #[test]
    fn test_process_chunk_response_waypoint() {
        // Create a coordinator syncing towards a waypoint at version 10.
        let waypoint_ledger_info = create_ledger_info_at_version(10);
        let waypoint = Waypoint::new_any(waypoint_ledger_info.ledger_info());
        let mut validator_coordinator =
            create_coordinator_with_config_and_waypoint(NodeConfig::default(), waypoint);
        let peer_network_id = PeerNetworkId::random_validator();
        process_new_peer_event(&mut validator_coordinator, &peer_network_id);

        // While waypoint syncing, target and highest responses (indices 1 and 2
        // of the created vec) are the wrong chunk type.
        let chunk_responses = create_non_empty_chunk_responses(1);
        verify_all_chunk_responses_are_the_wrong_type(
            &mut validator_coordinator,
            &peer_network_id,
            &chunk_responses[1..=2],
        );

        // A waypoint response with an unverifiable end-of-epoch ledger info is invalid.
        let response_ledger_info = ResponseLedgerInfo::LedgerInfoForWaypoint {
            waypoint_li: create_ledger_info_at_version(10),
            end_of_epoch_li: Some(create_ledger_info_at_version(100)),
        };
        let waypoint_response = create_chunk_response_message(
            response_ledger_info,
            create_dummy_transaction_list_with_proof(1),
        );
        verify_all_chunk_responses_are_invalid(
            &mut validator_coordinator,
            &peer_network_id,
            &[waypoint_response],
        );

        // Same when the end-of-epoch ledger info matches the waypoint version.
        let response_ledger_info = ResponseLedgerInfo::LedgerInfoForWaypoint {
            waypoint_li: create_ledger_info_at_version(10),
            end_of_epoch_li: Some(create_ledger_info_at_version(10)),
        };
        let waypoint_response = create_chunk_response_message(
            response_ledger_info,
            create_dummy_transaction_list_with_proof(1),
        );
        verify_all_chunk_responses_are_invalid(
            &mut validator_coordinator,
            &peer_network_id,
            &[waypoint_response],
        );
    }

    /// Creates a signed (but meaningless) user transaction for testing.
    fn create_test_transaction() -> Transaction {
        let private_key = Ed25519PrivateKey::generate_for_testing();
        let public_key = private_key.public_key();
        let transaction_payload = TransactionPayload::Script(Script::new(vec![], vec![], vec![]));
        let raw_transaction = RawTransaction::new(
            AccountAddress::random(),
            0,
            transaction_payload,
            0,
            0,
            0,
            ChainId::new(10),
        );
        let signed_transaction = SignedTransaction::new(
            raw_transaction,
            public_key,
            Ed25519Signature::dummy_signature(),
        );
        Transaction::UserTransaction(signed_transaction)
    }

    /// Creates an (unsigned) ledger info at the given version.
    fn create_ledger_info_at_version(version: Version) -> LedgerInfoWithSignatures {
        let block_info =
            BlockInfo::new(0, 0, HashValue::zero(), HashValue::zero(), version, 0, None);
        let ledger_info = LedgerInfo::new(block_info, HashValue::random());
        LedgerInfoWithSignatures::new(ledger_info, BTreeMap::new())
    }

    /// Creates a consensus commit notification and its callback receiver.
    fn create_commit_notification(
        transactions: Vec<Transaction>,
        reconfiguration_events: Vec<ContractEvent>,
    ) -> (
        ConsensusCommitNotification,
        oneshot::Receiver<ConsensusNotificationResponse>,
    ) {
        let (commit_notification, callback_receiver) =
            ConsensusCommitNotification::new(transactions, reconfiguration_events);
        (commit_notification, callback_receiver)
    }

    /// Creates a consensus sync notification targeting the given version.
    fn create_sync_notification_at_version(
        version: Version,
    ) -> (
        ConsensusSyncNotification,
        oneshot::Receiver<ConsensusNotificationResponse>,
    ) {
        let ledger_info = create_ledger_info_at_version(version);
        let (sync_notification, callback_receiver) = ConsensusSyncNotification::new(ledger_info);
        (sync_notification, callback_receiver)
    }

    /// Creates one chunk request of each target type, in the order:
    /// [waypoint, target, highest].
    fn create_chunk_requests(
        known_version: Version,
        current_epoch: u64,
        chunk_limit: u64,
        target_version: u64,
        timeout_ms: u64,
    ) -> Vec<StateSyncMessage> {
        let target = TargetType::Waypoint(target_version);
        let waypoint_request =
            create_chunk_request_message(known_version, current_epoch, chunk_limit, target);
        let target_li = Some(create_ledger_info_at_version(target_version));
        let target = TargetType::HighestAvailable {
            target_li,
            timeout_ms,
        };
        let highest_request =
            create_chunk_request_message(known_version, current_epoch, chunk_limit, target);
        let target_ledger_info = create_ledger_info_at_version(target_version);
        let target = TargetType::TargetLedgerInfo(target_ledger_info);
        let target_request =
            create_chunk_request_message(known_version, current_epoch, chunk_limit, target);
        vec![waypoint_request, target_request, highest_request]
    }

    /// Wraps a chunk request in a state sync network message.
    fn create_chunk_request_message(
        known_version: Version,
        current_epoch: u64,
        chunk_limit: u64,
        target: TargetType,
    ) -> StateSyncMessage {
        let chunk_request = GetChunkRequest::new(known_version, current_epoch, chunk_limit, target);
        StateSyncMessage::GetChunkRequest(Box::new(chunk_request))
    }

    /// Creates a single-transaction chunk starting at the given version, with
    /// an empty (unverifiable) proof.
    fn create_dummy_transaction_list_with_proof(version: Version) -> TransactionListWithProof {
        TransactionListWithProof::new(
            vec![create_test_transaction()],
            None,
            Some(version),
            TransactionInfoListWithProof::new_empty(),
        )
    }

    /// Wraps a chunk response in a state sync network message.
    fn create_chunk_response_message(
        response_ledger_info: ResponseLedgerInfo,
        transaction_list_with_proof: TransactionListWithProof,
    ) -> StateSyncMessage {
        let chunk_response =
            GetChunkResponse::new(response_ledger_info, transaction_list_with_proof);
        StateSyncMessage::GetChunkResponse(Box::new(chunk_response))
    }

    /// Creates one empty chunk response of each ledger info type.
    fn create_empty_chunk_responses(version: Version) -> Vec<StateSyncMessage> {
        create_chunk_responses(version, TransactionListWithProof::new_empty())
    }

    /// Creates one non-empty chunk response of each ledger info type.
    fn create_non_empty_chunk_responses(version: Version) -> Vec<StateSyncMessage> {
        let transaction_list_with_proof = create_dummy_transaction_list_with_proof(version);
        create_chunk_responses(version, transaction_list_with_proof)
    }

    /// Creates one chunk response of each ledger info type, in the order:
    /// [waypoint, target, highest].
    fn create_chunk_responses(
        version: Version,
        transaction_list_with_proof: TransactionListWithProof,
    ) -> Vec<StateSyncMessage> {
        let ledger_info_at_version = create_ledger_info_at_version(version);
        let response_ledger_info = ResponseLedgerInfo::LedgerInfoForWaypoint {
            waypoint_li: ledger_info_at_version.clone(),
            end_of_epoch_li: None,
        };
        let waypoint_response = create_chunk_response_message(
            response_ledger_info,
            transaction_list_with_proof.clone(),
        );
        let response_ledger_info = ResponseLedgerInfo::ProgressiveLedgerInfo {
            target_li: ledger_info_at_version.clone(),
            highest_li: None,
        };
        let highest_response = create_chunk_response_message(
            response_ledger_info,
            transaction_list_with_proof.clone(),
        );
        let response_ledger_info = ResponseLedgerInfo::VerifiableLedgerInfo(ledger_info_at_version);
        let target_response =
            create_chunk_response_message(response_ledger_info, transaction_list_with_proof);
        vec![waypoint_response, target_response, highest_response]
    }

    /// Asserts that every given request is rejected as an invalid chunk request.
    fn verify_all_chunk_requests_are_invalid<C: ChunkExecutorTrait>(
        coordinator: &mut StateSyncCoordinator<ExecutorProxy<C>, MempoolNotifier>,
        peer_network_id: &PeerNetworkId,
        requests: &[StateSyncMessage],
    ) {
        for request in requests {
            let result = block_on(coordinator.process_chunk_message(
                peer_network_id.network_id(),
                peer_network_id.peer_id(),
                request.clone(),
            ));
            assert_matches!(result, Err(Error::InvalidChunkRequest(_)));
        }
    }

    /// Asserts that every given response is rejected as an invalid chunk.
    fn verify_all_chunk_responses_are_invalid<C: ChunkExecutorTrait>(
        coordinator: &mut StateSyncCoordinator<ExecutorProxy<C>, MempoolNotifier>,
        peer_network_id: &PeerNetworkId,
        responses: &[StateSyncMessage],
    ) {
        for response in responses {
            let result = block_on(coordinator.process_chunk_message(
                peer_network_id.network_id(),
                peer_network_id.peer_id(),
                response.clone(),
            ));
            assert_matches!(result, Err(Error::ProcessInvalidChunk(_)));
        }
    }

    /// Asserts that every given response is rejected as the wrong chunk type.
    fn verify_all_chunk_responses_are_the_wrong_type<C: ChunkExecutorTrait>(
        coordinator: &mut StateSyncCoordinator<ExecutorProxy<C>, MempoolNotifier>,
        peer_network_id: &PeerNetworkId,
        responses: &[StateSyncMessage],
    ) {
        for response in responses {
            let result = block_on(coordinator.process_chunk_message(
                peer_network_id.network_id(),
                peer_network_id.peer_id(),
                response.clone(),
            ));
            assert_matches!(result, Err(Error::ReceivedWrongChunkType(_)));
        }
    }

    /// Registers `peer` with the coordinator as an outbound validator connection.
    fn process_new_peer_event<C: ChunkExecutorTrait>(
        coordinator: &mut StateSyncCoordinator<ExecutorProxy<C>, MempoolNotifier>,
        peer: &PeerNetworkId,
    ) {
        let connection_metadata = ConnectionMetadata::mock_with_role_and_origin(
            peer.peer_id(),
            PeerRole::Validator,
            ConnectionOrigin::Outbound,
        );
        let _ = coordinator.process_new_peer(peer.network_id(), connection_metadata);
    }
}