use crate::{
bootstrapper::Bootstrapper,
continuous_syncer::ContinuousSyncer,
driver_client::{ClientNotificationListener, DriverNotification},
error::Error,
logging::{LogEntry, LogSchema},
metadata_storage::MetadataStorageInterface,
metrics,
metrics::ExecutingComponent,
notification_handlers::{
CommitNotification, CommitNotificationListener, CommittedTransactions,
ConsensusNotificationHandler, ErrorNotification, ErrorNotificationListener,
MempoolNotificationHandler,
},
storage_synchronizer::StorageSynchronizerInterface,
utils,
utils::PENDING_DATA_LOG_FREQ_SECS,
};
use aptos_config::config::{RoleType, StateSyncDriverConfig};
use aptos_data_client::AptosDataClient;
use aptos_infallible::Mutex;
use aptos_logger::prelude::*;
use aptos_types::waypoint::Waypoint;
use consensus_notifications::{
ConsensusCommitNotification, ConsensusNotification, ConsensusSyncNotification,
};
use data_streaming_service::streaming_client::{DataStreamingClient, NotificationFeedback};
use event_notifications::EventSubscriptionService;
use futures::StreamExt;
use mempool_notifications::MempoolNotificationSender;
use std::{sync::Arc, time::SystemTime};
use storage_interface::DbReader;
use tokio::task::yield_now;
use tokio::time::{interval, Duration};
use tokio_stream::wrappers::IntervalStream;
/// The sampling period (in seconds) used to rate-limit the error logs emitted
/// when driving progress of the bootstrapper or the continuous syncer fails.
const DRIVER_ERROR_LOG_FREQ_SECS: u64 = 3;
/// The configuration bundle shared by the state sync driver and the
/// components it constructs (the bootstrapper and the continuous syncer each
/// receive a clone of it).
#[derive(Clone)]
pub struct DriverConfiguration {
/// The state sync driver config. The driver reads the progress check
/// interval and the maximum connection deadline from it.
pub config: StateSyncDriverConfig,
/// The role of this node. The driver compares it against
/// `RoleType::FullNode` and `RoleType::Validator`.
pub role: RoleType,
/// The waypoint of this node. The driver checks whether its version is 0
/// when deciding if auto-bootstrapping applies.
pub waypoint: Waypoint,
}
impl DriverConfiguration {
pub fn new(config: StateSyncDriverConfig, role: RoleType, waypoint: Waypoint) -> Self {
Self {
config,
role,
waypoint,
}
}
}
/// The state sync driver. It owns the bootstrapper and the continuous syncer,
/// listens for notifications (client, commit, consensus and error) and
/// periodically drives syncing progress.
pub struct StateSyncDriver<
DataClient,
MempoolNotifier,
MetadataStorage,
StorageSyncer,
StreamingClient,
> {
/// The component driven while the node has not finished bootstrapping
bootstrapper: Bootstrapper<MetadataStorage, StorageSyncer, StreamingClient>,
/// The stream of notifications sent by driver clients
client_notification_listener: ClientNotificationListener,
/// The stream of commit notifications (logged by the driver as coming
/// from the storage synchronizer)
commit_notification_listener: CommitNotificationListener,
/// The stream of consensus notifications; also used to respond to
/// consensus and to track the active sync request
consensus_notification_handler: ConsensusNotificationHandler,
/// The component driven once the node is bootstrapped
continuous_syncer: ContinuousSyncer<StorageSyncer, StreamingClient>,
/// The data client used to fetch the global data summary
aptos_data_client: DataClient,
/// The configuration of the driver
driver_configuration: DriverConfiguration,
/// The stream of error notifications (logged by the driver as coming
/// from the storage synchronizer)
error_notification_listener: ErrorNotificationListener,
/// The event subscription service handed to the committed transaction handler
event_subscription_service: Arc<Mutex<EventSubscriptionService>>,
/// The mempool notification handler handed to the committed transaction handler
mempool_notification_handler: MempoolNotificationHandler<MempoolNotifier>,
/// The time at which the driver was started (`None` until `start_driver`
/// runs); used to compute the auto-bootstrapping connection deadline
start_time: Option<SystemTime>,
/// The storage reader used to fetch the latest synced version and ledger info
storage: Arc<dyn DbReader>,
/// The storage synchronizer; queried for pending storage data and told to
/// finish the chunk executor once a sync request is no longer active
storage_synchronizer: StorageSyncer,
}
impl<
DataClient: AptosDataClient + Send + Clone + 'static,
MempoolNotifier: MempoolNotificationSender,
MetadataStorage: MetadataStorageInterface + Clone,
StorageSyncer: StorageSynchronizerInterface + Clone,
StreamingClient: DataStreamingClient + Clone,
>
StateSyncDriver<DataClient, MempoolNotifier, MetadataStorage, StorageSyncer, StreamingClient>
{
/// Creates a new state sync driver from the given components.
///
/// The bootstrapper and the continuous syncer are constructed here. Each one
/// receives its own clone of the driver configuration, the storage handle and
/// the storage synchronizer. The streaming client is cloned for the
/// bootstrapper and then moved into the continuous syncer, while the metadata
/// storage is moved into the bootstrapper. The driver start time stays unset
/// until `start_driver` is called.
pub fn new(
    client_notification_listener: ClientNotificationListener,
    commit_notification_listener: CommitNotificationListener,
    consensus_notification_handler: ConsensusNotificationHandler,
    driver_configuration: DriverConfiguration,
    error_notification_listener: ErrorNotificationListener,
    event_subscription_service: Arc<Mutex<EventSubscriptionService>>,
    mempool_notification_handler: MempoolNotificationHandler<MempoolNotifier>,
    metadata_storage: MetadataStorage,
    storage_synchronizer: StorageSyncer,
    aptos_data_client: DataClient,
    streaming_client: StreamingClient,
    storage: Arc<dyn DbReader>,
) -> Self {
    // Build the component used until bootstrapping completes
    let bootstrapper_component = Bootstrapper::new(
        driver_configuration.clone(),
        metadata_storage,
        streaming_client.clone(),
        Arc::clone(&storage),
        storage_synchronizer.clone(),
    );

    // Build the component used after bootstrapping completes
    let continuous_syncer_component = ContinuousSyncer::new(
        driver_configuration.clone(),
        streaming_client,
        Arc::clone(&storage),
        storage_synchronizer.clone(),
    );

    Self {
        bootstrapper: bootstrapper_component,
        client_notification_listener,
        commit_notification_listener,
        consensus_notification_handler,
        continuous_syncer: continuous_syncer_component,
        aptos_data_client,
        driver_configuration,
        error_notification_listener,
        event_subscription_service,
        mempool_notification_handler,
        start_time: None,
        storage,
        storage_synchronizer,
    }
}
/// Starts the state sync driver and runs its event loop.
///
/// The driver is consumed. The start time is recorded, and then the loop
/// (which has no exit condition) dispatches each incoming notification to its
/// handler and drives progress on every tick of the progress check interval.
pub async fn start_driver(mut self) {
    // Create the periodic ticker that triggers progress checks
    let progress_check_period =
        Duration::from_millis(self.driver_configuration.config.progress_check_interval_ms);
    let mut progress_check_ticker = IntervalStream::new(interval(progress_check_period)).fuse();

    // Record that the driver has started
    info!(LogSchema::new(LogEntry::Driver).message("Started the state sync v2 driver!"));
    self.start_time = Some(SystemTime::now());

    // Process notifications and ticks forever
    loop {
        ::futures::select! {
            client_notification = self.client_notification_listener.select_next_some() => {
                self.handle_client_notification(client_notification);
            },
            commit_notification = self.commit_notification_listener.select_next_some() => {
                self.handle_commit_notification(commit_notification).await;
            }
            consensus_notification = self.consensus_notification_handler.select_next_some() => {
                self.handle_consensus_notification(consensus_notification).await;
            }
            error_notification = self.error_notification_listener.select_next_some() => {
                self.handle_error_notification(error_notification).await;
            }
            _ = progress_check_ticker.select_next_some() => {
                self.drive_progress().await;
            }
        }
    }
}
async fn handle_consensus_notification(&mut self, notification: ConsensusNotification) {
let result = if self.driver_configuration.role == RoleType::FullNode {
Err(Error::FullNodeConsensusNotification(format!(
"Received consensus notification: {:?}",
notification
)))
} else if !self.bootstrapper.is_bootstrapped() {
Err(Error::BootstrapNotComplete(format!(
"Received consensus notification: {:?}",
notification
)))
} else {
Ok(())
};
if let Err(error) = result {
match notification {
ConsensusNotification::NotifyCommit(commit_notification) => {
let _ = self
.consensus_notification_handler
.respond_to_commit_notification(commit_notification, Err(error.clone()))
.await;
}
ConsensusNotification::SyncToTarget(sync_notification) => {
let _ = self
.consensus_notification_handler
.respond_to_sync_notification(sync_notification, Err(error.clone()))
.await;
}
}
error!(LogSchema::new(LogEntry::ConsensusNotification)
.error(&error)
.message("Error encountered when handling the consensus notification!"));
return;
}
let result = match notification {
ConsensusNotification::NotifyCommit(commit_notification) => {
self.handle_consensus_commit_notification(commit_notification)
.await
}
ConsensusNotification::SyncToTarget(sync_notification) => {
self.handle_consensus_sync_notification(sync_notification)
.await
}
};
if let Err(error) = result {
error!(LogSchema::new(LogEntry::ConsensusNotification)
.error(&error)
.message("Error encountered when handling the consensus notification!"));
}
}
async fn handle_consensus_commit_notification(
&mut self,
consensus_commit_notification: ConsensusCommitNotification,
) -> Result<(), Error> {
info!(
LogSchema::new(LogEntry::ConsensusNotification).message(&format!(
"Received a consensus commit notification! Total transactions: {:?}, events: {:?}",
consensus_commit_notification.transactions.len(),
consensus_commit_notification.reconfiguration_events.len()
))
);
self.update_consensus_commit_metrics(&consensus_commit_notification);
let committed_transactions = CommittedTransactions {
events: consensus_commit_notification.reconfiguration_events.clone(),
transactions: consensus_commit_notification.transactions.clone(),
};
utils::handle_committed_transactions(
committed_transactions,
self.storage.clone(),
self.mempool_notification_handler.clone(),
self.event_subscription_service.clone(),
)
.await;
self.consensus_notification_handler
.respond_to_commit_notification(consensus_commit_notification, Ok(()))
.await?;
self.check_sync_request_progress().await
}
fn update_consensus_commit_metrics(
&self,
consensus_commit_notification: &ConsensusCommitNotification,
) {
metrics::increment_counter(
&metrics::DRIVER_COUNTERS,
metrics::DRIVER_CONSENSUS_COMMIT_NOTIFICATION,
);
let operations = [
metrics::StorageSynchronizerOperations::ExecutedTransactions,
metrics::StorageSynchronizerOperations::Synced,
];
for operation in operations {
metrics::increment_gauge(
&metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
operation.get_label(),
consensus_commit_notification.transactions.len() as u64,
);
}
if !consensus_commit_notification
.reconfiguration_events
.is_empty()
{
utils::update_new_epoch_metrics(self.storage.clone());
}
}
async fn handle_consensus_sync_notification(
&mut self,
sync_notification: ConsensusSyncNotification,
) -> Result<(), Error> {
let latest_synced_version = utils::fetch_latest_synced_version(self.storage.clone())?;
info!(
LogSchema::new(LogEntry::ConsensusNotification).message(&format!(
"Received a consensus sync notification! Target version: {:?}. Latest synced version: {:?}",
sync_notification.target, latest_synced_version,
))
);
metrics::increment_counter(
&metrics::DRIVER_COUNTERS,
metrics::DRIVER_CONSENSUS_SYNC_NOTIFICATION,
);
let latest_synced_ledger_info =
utils::fetch_latest_synced_ledger_info(self.storage.clone())?;
self.consensus_notification_handler
.initialize_sync_request(sync_notification, latest_synced_ledger_info)
.await
}
/// Handles a notification sent by a driver client.
///
/// The driver client notification counter is incremented and the notifier
/// channel carried by the notification is subscribed to the bootstrapper's
/// bootstrap notifications. A subscription failure is logged.
fn handle_client_notification(&mut self, notification: DriverNotification) {
    debug!(LogSchema::new(LogEntry::ClientNotification)
        .message("Received a notify bootstrap notification from the client!"));
    metrics::increment_counter(
        &metrics::DRIVER_COUNTERS,
        metrics::DRIVER_CLIENT_NOTIFICATION,
    );

    // Register the client with the bootstrapper
    let subscription_result = match notification {
        DriverNotification::NotifyOnceBootstrapped(notifier_channel) => self
            .bootstrapper
            .subscribe_to_bootstrap_notifications(notifier_channel),
    };
    if let Err(error) = subscription_result {
        error!(LogSchema::new(LogEntry::ClientNotification)
            .error(&error)
            .message("Failed to subscribe to bootstrap notifications!"));
    }
}
async fn handle_commit_notification(&mut self, commit_notification: CommitNotification) {
let CommitNotification::CommittedStateSnapshot(committed_snapshot) = commit_notification;
info!(
LogSchema::new(LogEntry::SynchronizerNotification).message(&format!(
"Received a state snapshot commit notification from the storage synchronizer. \
Snapshot version: {:?}. Last committed index: {:?}.",
committed_snapshot.version, committed_snapshot.last_committed_state_index,
))
);
utils::handle_committed_transactions(
committed_snapshot.committed_transaction,
self.storage.clone(),
self.mempool_notification_handler.clone(),
self.event_subscription_service.clone(),
)
.await;
}
/// Handles an error notification.
///
/// The notification is logged and the active data stream is terminated with
/// `InvalidPayloadData` feedback. The stream of the continuous syncer is
/// terminated if the node is bootstrapped; otherwise the stream of the
/// bootstrapper is terminated.
///
/// # Panics
/// Panics if the active stream cannot be terminated.
async fn handle_error_notification(&mut self, error_notification: ErrorNotification) {
    error!(LogSchema::new(LogEntry::SynchronizerNotification)
        .error_notification(error_notification.clone())
        .message("Received an error notification from the storage synchronizer!"));

    let notification_id = error_notification.notification_id;
    let notification_feedback = NotificationFeedback::InvalidPayloadData;

    // Terminate the stream of whichever component is currently syncing
    if self.bootstrapper.is_bootstrapped() {
        let termination_result = self
            .continuous_syncer
            .terminate_active_stream(notification_id, notification_feedback)
            .await;
        if let Err(error) = termination_result {
            panic!(
                "Failed to terminate the active stream for the continuous syncer! Error: {:?}",
                error
            );
        }
    } else {
        let termination_result = self
            .bootstrapper
            .terminate_active_stream(notification_id, notification_feedback)
            .await;
        if let Err(error) = termination_result {
            panic!(
                "Failed to terminate the active stream for the bootstrapper! Error: {:?}",
                error
            );
        }
    }
}
async fn check_sync_request_progress(&mut self) -> Result<(), Error> {
if !self.active_sync_request() {
return Ok(());
}
while self.storage_synchronizer.pending_storage_data() {
sample!(
SampleRate::Duration(Duration::from_secs(PENDING_DATA_LOG_FREQ_SECS)),
info!("Waiting for the storage synchronizer to handle pending data!")
);
yield_now().await;
}
let latest_synced_ledger_info =
utils::fetch_latest_synced_ledger_info(self.storage.clone())?;
self.consensus_notification_handler
.check_sync_request_progress(latest_synced_ledger_info)
.await?;
if !self.active_sync_request() {
self.continuous_syncer.reset_active_stream();
self.storage_synchronizer.finish_chunk_executor(); }
Ok(())
}
/// Returns true iff the consensus notification handler reports an active
/// sync request.
fn active_sync_request(&self) -> bool {
    let consensus_handler = &self.consensus_notification_handler;
    consensus_handler.active_sync_request()
}
/// Returns true iff the configured role of this node is a validator.
fn is_validator(&self) -> bool {
    matches!(self.driver_configuration.role, RoleType::Validator)
}
/// Returns true iff this node is a bootstrapped validator with no active
/// sync request (the condition under which the driver treats consensus as
/// the executing component).
fn check_if_consensus_executing(&self) -> bool {
    let bootstrapped_validator = self.is_validator() && self.bootstrapper.is_bootstrapped();
    bootstrapped_validator && !self.active_sync_request()
}
/// Marks bootstrapping as complete once the connection deadline has passed.
///
/// This only applies to validators that are not yet bootstrapped and whose
/// waypoint is at version 0. The deadline is the driver start time plus the
/// configured maximum connection deadline. Nothing happens if the driver has
/// no recorded start time or the deadline has not yet passed. An error is
/// logged if the deadline computation overflows, or if the bootstrapper fails
/// to mark bootstrapping as complete.
fn check_auto_bootstrapping(&mut self) {
    // Only un-bootstrapped validators with a version 0 waypoint qualify
    let eligible_for_auto_bootstrapping = !self.bootstrapper.is_bootstrapped()
        && self.is_validator()
        && self.driver_configuration.waypoint.version() == 0;
    if !eligible_for_auto_bootstrapping {
        return;
    }

    // The deadline is measured from when the driver was started
    let start_time = match self.start_time {
        Some(start_time) => start_time,
        None => return,
    };
    let max_connection_deadline = Duration::from_secs(
        self.driver_configuration
            .config
            .max_connection_deadline_secs,
    );
    let connection_deadline = match start_time.checked_add(max_connection_deadline) {
        Some(connection_deadline) => connection_deadline,
        None => {
            error!(LogSchema::new(LogEntry::AutoBootstrapping)
                .message("The connection deadline overflowed! Unable to auto-bootstrap!"));
            return;
        }
    };

    // `duration_since` fails while the deadline is still in the future
    let deadline_passed = SystemTime::now()
        .duration_since(connection_deadline)
        .is_ok();
    if !deadline_passed {
        return;
    }

    info!(LogSchema::new(LogEntry::AutoBootstrapping)
        .message("Passed the connection deadline! Auto-bootstrapping the validator!"));
    if let Err(error) = self.bootstrapper.bootstrapping_complete() {
        error!(LogSchema::new(LogEntry::AutoBootstrapping)
            .error(&error)
            .message("Failed to mark bootstrapping as complete!"));
    }
}
/// Drives the progress of state sync. Called on every progress check tick.
///
/// Steps, in order:
/// 1. If the global data summary is empty, only the auto-bootstrapping check
///    is performed.
/// 2. The progress of any active sync request is checked (errors are logged).
/// 3. If consensus is executing, the executing component metric is updated
///    and nothing else is done.
/// 4. Otherwise, the continuous syncer (if bootstrapped) or the bootstrapper
///    (if not) is driven. Errors are logged at a sampled rate and counted in
///    the corresponding error metric.
async fn drive_progress(&mut self) {
    // Without a global data summary there is nothing to sync from
    let global_data_summary = self.aptos_data_client.get_global_data_summary();
    if global_data_summary.is_empty() {
        trace!(LogSchema::new(LogEntry::Driver).message(
            "The global data summary is empty! It's likely that we have no active peers."
        ));
        self.check_auto_bootstrapping();
        return;
    }

    // Check the progress of any active sync request
    let sync_request_check = self.check_sync_request_progress().await;
    if let Err(error) = sync_request_check {
        error!(LogSchema::new(LogEntry::Driver)
            .error(&error)
            .message("Error found when checking the sync request progress!"));
    }

    // Nothing to drive while consensus is executing
    if self.check_if_consensus_executing() {
        trace!(LogSchema::new(LogEntry::Driver)
            .message("Consensus is executing. There's nothing to do."));
        metrics::increment_counter(
            &metrics::EXECUTING_COMPONENT,
            ExecutingComponent::Consensus.get_label(),
        );
        return;
    }

    if !self.bootstrapper.is_bootstrapped() {
        // Still bootstrapping: drive the bootstrapper
        metrics::increment_counter(
            &metrics::EXECUTING_COMPONENT,
            ExecutingComponent::Bootstrapper.get_label(),
        );
        let bootstrapper_result = self.bootstrapper.drive_progress(&global_data_summary).await;
        if let Err(error) = bootstrapper_result {
            sample!(
                SampleRate::Duration(Duration::from_secs(DRIVER_ERROR_LOG_FREQ_SECS)),
                error!(LogSchema::new(LogEntry::Driver)
                    .error(&error)
                    .message("Error found when checking the bootstrapper progress!"));
            );
            metrics::increment_counter(&metrics::BOOTSTRAPPER_ERRORS, error.get_label());
        }
    } else {
        // Bootstrapped: drive the continuous syncer
        let consensus_sync_request = self
            .consensus_notification_handler
            .get_consensus_sync_request();
        metrics::increment_counter(
            &metrics::EXECUTING_COMPONENT,
            ExecutingComponent::ContinuousSyncer.get_label(),
        );
        let continuous_syncer_result = self
            .continuous_syncer
            .drive_progress(consensus_sync_request)
            .await;
        if let Err(error) = continuous_syncer_result {
            sample!(
                SampleRate::Duration(Duration::from_secs(DRIVER_ERROR_LOG_FREQ_SECS)),
                error!(LogSchema::new(LogEntry::Driver)
                    .error(&error)
                    .message("Error found when driving progress of the continuous syncer!"));
            );
            metrics::increment_counter(&metrics::CONTINUOUS_SYNCER_ERRORS, error.get_label());
        }
    }
}
}