use crate::{
error::Error,
logging::{LogEntry, LogSchema},
metrics,
notification_handlers::{
CommitNotification, CommittedTransactions, MempoolNotificationHandler,
},
};
use aptos_infallible::Mutex;
use aptos_logger::prelude::*;
use aptos_types::{
epoch_change::Verifier, epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures,
transaction::Version,
};
use data_streaming_service::{
data_notification::{DataNotification, DataPayload, NotificationId},
data_stream::DataStreamListener,
streaming_client::{DataStreamingClient, NotificationFeedback},
};
use event_notifications::EventSubscriptionService;
use futures::StreamExt;
use mempool_notifications::MempoolNotificationSender;
use std::{sync::Arc, time::Duration};
use storage_interface::DbReader;
use tokio::time::timeout;
/// The number of consecutive data stream notification timeouts at which
/// `get_data_notification` reports a critical timeout instead of a regular one.
const MAX_NUM_DATA_STREAM_TIMEOUTS: u64 = 3;
/// NOTE(review): not referenced in this part of the file. Judging by the name,
/// this is a logging interval (in seconds) for pending data -- confirm at the
/// use sites.
pub const PENDING_DATA_LOG_FREQ_SECS: u64 = 3;
/// State tracked for a data stream: the epoch state used to verify ledger
/// infos, an optional proof ledger info, and the most recently synced version.
pub struct SpeculativeStreamState {
// The epoch state used to verify ledger infos (replaced on epoch change)
epoch_state: EpochState,
// The proof ledger info, if any (`get_proof_ledger_info` panics when absent)
proof_ledger_info: Option<LedgerInfoWithSignatures>,
// The version synced so far (the expected next version is this plus one)
synced_version: Version,
}
impl SpeculativeStreamState {
pub fn new(
epoch_state: EpochState,
proof_ledger_info: Option<LedgerInfoWithSignatures>,
synced_version: Version,
) -> Self {
Self {
epoch_state,
proof_ledger_info,
synced_version,
}
}
pub fn expected_next_version(&self) -> Result<Version, Error> {
self.synced_version.checked_add(1).ok_or_else(|| {
Error::IntegerOverflow("The expected next version has overflown!".into())
})
}
pub fn get_proof_ledger_info(&self) -> LedgerInfoWithSignatures {
self.proof_ledger_info
.as_ref()
.expect("Proof ledger info is missing!")
.clone()
}
pub fn update_synced_version(&mut self, synced_version: Version) {
self.synced_version = synced_version;
}
pub fn maybe_update_epoch_state(
&mut self,
ledger_info_with_signatures: LedgerInfoWithSignatures,
) {
if let Some(epoch_state) = ledger_info_with_signatures.ledger_info().next_epoch_state() {
if ledger_info_with_signatures.ledger_info().version() == self.synced_version {
self.epoch_state = epoch_state.clone();
}
}
}
pub fn verify_ledger_info_with_signatures(
&mut self,
ledger_info_with_signatures: &LedgerInfoWithSignatures,
) -> Result<(), Error> {
self.epoch_state
.verify(ledger_info_with_signatures)
.map_err(|error| {
Error::VerificationError(format!("Ledger info failed verification: {:?}", error))
})
}
}
pub async fn get_data_notification(
max_stream_wait_time_ms: u64,
active_data_stream: Option<&mut DataStreamListener>,
) -> Result<DataNotification, Error> {
let active_data_stream = active_data_stream.expect("The active data stream should exist!");
let timeout_ms = Duration::from_millis(max_stream_wait_time_ms);
if let Ok(data_notification) = timeout(timeout_ms, active_data_stream.select_next_some()).await
{
active_data_stream.num_consecutive_timeouts = 0;
Ok(data_notification)
} else {
active_data_stream.num_consecutive_timeouts += 1;
if active_data_stream.num_consecutive_timeouts >= MAX_NUM_DATA_STREAM_TIMEOUTS {
Err(Error::CriticalDataStreamTimeout(format!(
"{:?}",
MAX_NUM_DATA_STREAM_TIMEOUTS
)))
} else {
Err(Error::DataStreamNotificationTimeout(format!(
"{:?}",
timeout_ms
)))
}
}
}
/// Logs the termination and then asks the streaming client to terminate the
/// stream identified by `notification_id`, passing along the given feedback.
/// Any error from the streaming client is converted into an `Error`.
pub async fn terminate_stream_with_feedback<StreamingClient: DataStreamingClient + Clone>(
    streaming_client: &mut StreamingClient,
    notification_id: NotificationId,
    notification_feedback: NotificationFeedback,
) -> Result<(), Error> {
    info!(LogSchema::new(LogEntry::Driver).message(&format!(
        "Terminating the current stream! Feedback: {:?}, notification ID: {:?}",
        notification_feedback, notification_id
    )));

    let termination_result = streaming_client
        .terminate_stream_with_feedback(notification_id, notification_feedback)
        .await;
    termination_result.map_err(Into::into)
}
pub async fn handle_end_of_stream_or_invalid_payload<
StreamingClient: DataStreamingClient + Clone,
>(
streaming_client: &mut StreamingClient,
data_notification: DataNotification,
) -> Result<(), Error> {
let notification_feedback = match data_notification.data_payload {
DataPayload::EndOfStream => NotificationFeedback::EndOfStream,
_ => NotificationFeedback::PayloadTypeIsIncorrect,
};
terminate_stream_with_feedback(
streaming_client,
data_notification.notification_id,
notification_feedback,
)
.await?;
match data_notification.data_payload {
DataPayload::EndOfStream => Ok(()),
_ => Err(Error::InvalidPayload("Unexpected payload type!".into())),
}
}
pub fn fetch_latest_epoch_state(storage: Arc<dyn DbReader>) -> Result<EpochState, Error> {
storage.get_latest_epoch_state().map_err(|error| {
Error::StorageError(format!(
"Failed to get the latest epoch state from storage: {:?}",
error
))
})
}
pub fn fetch_latest_synced_ledger_info(
storage: Arc<dyn DbReader>,
) -> Result<LedgerInfoWithSignatures, Error> {
storage.get_latest_ledger_info().map_err(|error| {
Error::StorageError(format!(
"Failed to get the latest ledger info from storage: {:?}",
error
))
})
}
pub fn fetch_latest_synced_version(storage: Arc<dyn DbReader>) -> Result<Version, Error> {
let latest_transaction_info =
storage
.get_latest_transaction_info_option()
.map_err(|error| {
Error::StorageError(format!(
"Failed to get the latest transaction info from storage: {:?}",
error
))
})?;
latest_transaction_info
.ok_or_else(|| Error::StorageError("Latest transaction info is missing!".into()))
.map(|(latest_synced_version, _)| latest_synced_version)
}
/// Seeds the storage synchronizer gauges from storage.
///
/// The applied-outputs, executed-transactions and synced gauges are set to
/// the latest synced version, and the synced-epoch gauge is then set to the
/// latest epoch. A storage error aborts the function at the point it occurs
/// (so the version gauges may already be set when the epoch read fails).
pub fn initialize_sync_gauges(storage: Arc<dyn DbReader>) -> Result<(), Error> {
    // Seed the version-based gauges
    let latest_version = fetch_latest_synced_version(storage.clone())?;
    for operation in [
        metrics::StorageSynchronizerOperations::AppliedTransactionOutputs,
        metrics::StorageSynchronizerOperations::ExecutedTransactions,
        metrics::StorageSynchronizerOperations::Synced,
    ] {
        metrics::set_gauge(
            &metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
            operation.get_label(),
            latest_version,
        );
    }

    // Seed the epoch gauge
    let latest_epoch_state = fetch_latest_epoch_state(storage)?;
    metrics::set_gauge(
        &metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
        metrics::StorageSynchronizerOperations::SyncedEpoch.get_label(),
        latest_epoch_state.epoch,
    );

    Ok(())
}
/// Handles a batch of committed transactions by forwarding it (together with
/// the latest synced version and ledger info read from storage) to
/// `CommitNotification::handle_transaction_notification`.
///
/// This function never returns an error: failures to read from storage or to
/// handle the notification are logged, and the function simply returns.
pub async fn handle_committed_transactions<M: MempoolNotificationSender>(
    committed_transactions: CommittedTransactions,
    storage: Arc<dyn DbReader>,
    mempool_notification_handler: MempoolNotificationHandler<M>,
    event_subscription_service: Arc<Mutex<EventSubscriptionService>>,
) {
    // Fetch the latest synced version (bail out on failure)
    let latest_synced_version = match fetch_latest_synced_version(storage.clone()) {
        Ok(synced_version) => synced_version,
        Err(error) => {
            error!(LogSchema::new(LogEntry::SynchronizerNotification)
                .error(&error)
                .message("Failed to fetch latest synced version!"));
            return;
        }
    };

    // Fetch the latest synced ledger info (bail out on failure)
    let latest_synced_ledger_info = match fetch_latest_synced_ledger_info(storage.clone()) {
        Ok(synced_ledger_info) => synced_ledger_info,
        Err(error) => {
            error!(LogSchema::new(LogEntry::SynchronizerNotification)
                .error(&error)
                .message("Failed to fetch latest synced ledger info!"));
            return;
        }
    };

    // Forward the commit notification
    let notification_result = CommitNotification::handle_transaction_notification(
        committed_transactions.events,
        committed_transactions.transactions,
        latest_synced_version,
        latest_synced_ledger_info,
        mempool_notification_handler,
        event_subscription_service,
    )
    .await;
    if let Err(error) = notification_result {
        error!(LogSchema::new(LogEntry::SynchronizerNotification)
            .error(&error)
            .message("Failed to handle a transaction commit notification!"));
    }
}
/// Updates the metrics for a new epoch.
///
/// The synced-epoch gauge is incremented by one. The latest epoch state is
/// then read from storage and, for every validator address in its verifier,
/// an epoch state gauge is set using the epoch value read back from the
/// synced-epoch gauge, the validator address and the validator's voting power
/// (0 if the voting power cannot be found). If storage cannot be read, the
/// error is logged and no epoch state gauges are set.
pub fn update_new_epoch_metrics(storage: Arc<dyn DbReader>) {
    // Bump the epoch gauge
    metrics::increment_gauge(
        &metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
        metrics::StorageSynchronizerOperations::SyncedEpoch.get_label(),
        1,
    );

    // Load the latest epoch state (nothing more to do if that fails)
    let latest_epoch_state = match fetch_latest_epoch_state(storage) {
        Ok(epoch_state) => epoch_state,
        Err(error) => {
            error!(LogSchema::new(LogEntry::Driver).message(&format!(
                "Failed to get the latest epoch state from storage! Error: {:?}",
                error
            )));
            return;
        }
    };

    // The epoch label comes from the gauge that was just incremented
    let epoch_label = metrics::read_gauge(
        &metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
        metrics::StorageSynchronizerOperations::SyncedEpoch.get_label(),
    )
    .to_string();

    // Publish one gauge entry per validator
    let verifier = latest_epoch_state.verifier;
    for address in verifier.get_ordered_account_addresses_iter() {
        let voting_power = verifier.get_voting_power(&address).unwrap_or(0);
        metrics::set_epoch_state_gauge(
            &epoch_label,
            &address.to_string(),
            &voting_power.to_string(),
        );
    }
}