use crate::{
error::Error,
logging::{LogEntry, LogSchema},
metadata_storage::MetadataStorageInterface,
metrics,
notification_handlers::{
CommitNotification, CommittedTransactions, ErrorNotification, MempoolNotificationHandler,
},
utils,
};
use aptos_config::config::StateSyncDriverConfig;
use aptos_infallible::Mutex;
use aptos_logger::prelude::*;
use aptos_types::{
ledger_info::LedgerInfoWithSignatures,
state_store::state_value::StateValueChunkWithProof,
transaction::{
Transaction, TransactionListWithProof, TransactionOutput, TransactionOutputListWithProof,
},
};
use data_streaming_service::data_notification::NotificationId;
use event_notifications::EventSubscriptionService;
use executor_types::ChunkExecutorTrait;
use futures::{channel::mpsc, SinkExt, StreamExt};
use mempool_notifications::MempoolNotificationSender;
use std::{
future::Future,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use storage_interface::{DbReader, DbReaderWriter};
use tokio::{
runtime::{Handle, Runtime},
task::JoinHandle,
};
/// The interface through which the state sync driver hands data to storage.
///
/// In the implementation in this file ([`StorageSynchronizer`]), the methods that
/// accept data only queue it for a background task; the actual execution,
/// commit or state persistence happens asynchronously.
pub trait StorageSynchronizerInterface {
    /// Queues a chunk of transaction outputs (with proof) to be applied to storage.
    ///
    /// `target_ledger_info` and the optional `end_of_epoch_ledger_info` are passed
    /// along with the outputs. Returns an error if the chunk could not be queued.
    fn apply_transaction_outputs(
        &mut self,
        notification_id: NotificationId,
        output_list_with_proof: TransactionOutputListWithProof,
        target_ledger_info: LedgerInfoWithSignatures,
        end_of_epoch_ledger_info: Option<LedgerInfoWithSignatures>,
    ) -> Result<(), Error>;

    /// Queues a chunk of transactions (with proof) to be executed and committed.
    ///
    /// `target_ledger_info` and the optional `end_of_epoch_ledger_info` are passed
    /// along with the transactions. Returns an error if the chunk could not be queued.
    fn execute_transactions(
        &mut self,
        notification_id: NotificationId,
        transaction_list_with_proof: TransactionListWithProof,
        target_ledger_info: LedgerInfoWithSignatures,
        end_of_epoch_ledger_info: Option<LedgerInfoWithSignatures>,
    ) -> Result<(), Error>;

    /// Prepares state snapshot synchronization towards `target_ledger_info` and
    /// returns the join handle of the task that will receive state value chunks.
    ///
    /// In the implementation in this file, `epoch_change_proofs` are saved once
    /// all state values have been synced.
    fn initialize_state_synchronizer(
        &mut self,
        epoch_change_proofs: Vec<LedgerInfoWithSignatures>,
        target_ledger_info: LedgerInfoWithSignatures,
        target_output_with_proof: TransactionOutputListWithProof,
    ) -> Result<JoinHandle<()>, Error>;

    /// Returns true iff some queued data chunks have not yet been fully processed.
    fn pending_storage_data(&self) -> bool;

    /// Queues a chunk of state values (with proof) for the state snapshot receiver.
    ///
    /// In the implementation in this file, calling this before
    /// `initialize_state_synchronizer` panics.
    fn save_state_values(
        &mut self,
        notification_id: NotificationId,
        state_value_chunk_with_proof: StateValueChunkWithProof,
    ) -> Result<(), Error>;

    /// Resets the underlying chunk executor.
    fn reset_chunk_executor(&self) -> Result<(), Error>;

    /// Calls `finish` on the underlying chunk executor.
    fn finish_chunk_executor(&self);
}
/// Implementation of [`StorageSynchronizerInterface`] that hands data chunks off
/// over bounded channels to background tasks: an executor, a committer and (once
/// `initialize_state_synchronizer` is called) a state snapshot receiver.
pub struct StorageSynchronizer<ChunkExecutor, MetadataStorage> {
    /// Executes/applies and commits chunks; shared with the background tasks.
    chunk_executor: Arc<ChunkExecutor>,
    /// Handed to the state snapshot receiver, which sends the final commit notification on it.
    commit_notification_sender: mpsc::UnboundedSender<CommitNotification>,
    /// Driver configuration (`max_pending_data_chunks` sizes the internal channels).
    driver_config: StateSyncDriverConfig,
    /// Used by the background tasks to report errors.
    error_notification_sender: mpsc::UnboundedSender<ErrorNotification>,
    /// Sends transaction / transaction output chunks to the executor task.
    executor_notifier: mpsc::Sender<StorageDataChunk>,
    /// Handed to the state snapshot receiver to record the last persisted state value index.
    metadata_storage: MetadataStorage,
    /// Number of queued chunks not yet fully processed; shared with the background tasks.
    pending_data_chunks: Arc<AtomicU64>,
    /// Runtime to spawn tasks on; `None` means the ambient Tokio runtime.
    runtime: Option<Handle>,
    /// Sends state value chunks to the state snapshot receiver; `None` until
    /// `initialize_state_synchronizer` has been called.
    state_snapshot_notifier: Option<mpsc::Sender<StorageDataChunk>>,
    /// Storage reader/writer.
    storage: DbReaderWriter,
}
impl<
ChunkExecutor: ChunkExecutorTrait + 'static,
MetadataStorage: MetadataStorageInterface + Clone,
> Clone for StorageSynchronizer<ChunkExecutor, MetadataStorage>
{
fn clone(&self) -> Self {
Self {
chunk_executor: self.chunk_executor.clone(),
commit_notification_sender: self.commit_notification_sender.clone(),
driver_config: self.driver_config,
error_notification_sender: self.error_notification_sender.clone(),
executor_notifier: self.executor_notifier.clone(),
pending_data_chunks: self.pending_data_chunks.clone(),
metadata_storage: self.metadata_storage.clone(),
runtime: self.runtime.clone(),
state_snapshot_notifier: self.state_snapshot_notifier.clone(),
storage: self.storage.clone(),
}
}
}
impl<
ChunkExecutor: ChunkExecutorTrait + 'static,
MetadataStorage: MetadataStorageInterface + Clone,
> StorageSynchronizer<ChunkExecutor, MetadataStorage>
{
pub fn new<MempoolNotifier: MempoolNotificationSender>(
driver_config: StateSyncDriverConfig,
chunk_executor: Arc<ChunkExecutor>,
commit_notification_sender: mpsc::UnboundedSender<CommitNotification>,
error_notification_sender: mpsc::UnboundedSender<ErrorNotification>,
event_subscription_service: Arc<Mutex<EventSubscriptionService>>,
mempool_notification_handler: MempoolNotificationHandler<MempoolNotifier>,
metadata_storage: MetadataStorage,
storage: DbReaderWriter,
runtime: Option<&Runtime>,
) -> (Self, JoinHandle<()>, JoinHandle<()>) {
let max_pending_data_chunks = driver_config.max_pending_data_chunks as usize;
let (executor_notifier, executor_listener) = mpsc::channel(max_pending_data_chunks);
let (committer_notifier, committer_listener) = mpsc::channel(max_pending_data_chunks);
let pending_transaction_chunks = Arc::new(AtomicU64::new(0));
let runtime = runtime.map(|runtime| runtime.handle().clone());
let executor_handle = spawn_executor(
chunk_executor.clone(),
error_notification_sender.clone(),
executor_listener,
committer_notifier,
pending_transaction_chunks.clone(),
runtime.clone(),
);
let committer_handle = spawn_committer(
chunk_executor.clone(),
committer_listener,
error_notification_sender.clone(),
event_subscription_service,
mempool_notification_handler,
pending_transaction_chunks.clone(),
runtime.clone(),
storage.reader.clone(),
);
utils::initialize_sync_gauges(storage.reader.clone())
.expect("Failed to initialize the metric gauges!");
let storage_synchronizer = Self {
chunk_executor,
commit_notification_sender,
driver_config,
error_notification_sender,
executor_notifier,
pending_data_chunks: pending_transaction_chunks,
metadata_storage,
runtime,
state_snapshot_notifier: None,
storage,
};
(storage_synchronizer, executor_handle, committer_handle)
}
fn notify_executor(&mut self, storage_data_chunk: StorageDataChunk) -> Result<(), Error> {
if let Err(error) = self.executor_notifier.try_send(storage_data_chunk) {
Err(Error::UnexpectedError(format!(
"Failed to send storage data chunk to executor: {:?}",
error
)))
} else {
increment_pending_data_chunks(self.pending_data_chunks.clone());
Ok(())
}
}
}
impl<
        ChunkExecutor: ChunkExecutorTrait + 'static,
        MetadataStorage: MetadataStorageInterface + Clone + Send + Sync + 'static,
    > StorageSynchronizerInterface for StorageSynchronizer<ChunkExecutor, MetadataStorage>
{
    /// Wraps the outputs in a `StorageDataChunk::TransactionOutputs` and queues it
    /// for the executor task.
    fn apply_transaction_outputs(
        &mut self,
        notification_id: NotificationId,
        output_list_with_proof: TransactionOutputListWithProof,
        target_ledger_info: LedgerInfoWithSignatures,
        end_of_epoch_ledger_info: Option<LedgerInfoWithSignatures>,
    ) -> Result<(), Error> {
        let storage_data_chunk = StorageDataChunk::TransactionOutputs(
            notification_id,
            output_list_with_proof,
            target_ledger_info,
            end_of_epoch_ledger_info,
        );
        self.notify_executor(storage_data_chunk)
    }

    /// Wraps the transactions in a `StorageDataChunk::Transactions` and queues it
    /// for the executor task.
    fn execute_transactions(
        &mut self,
        notification_id: NotificationId,
        transaction_list_with_proof: TransactionListWithProof,
        target_ledger_info: LedgerInfoWithSignatures,
        end_of_epoch_ledger_info: Option<LedgerInfoWithSignatures>,
    ) -> Result<(), Error> {
        let storage_data_chunk = StorageDataChunk::Transactions(
            notification_id,
            transaction_list_with_proof,
            target_ledger_info,
            end_of_epoch_ledger_info,
        );
        self.notify_executor(storage_data_chunk)
    }

    /// Creates the state value channel (buffer: `max_pending_data_chunks`) and
    /// spawns the state snapshot receiver. It then stores the sending end so that
    /// `save_state_values` can use it, and returns the receiver's join handle.
    fn initialize_state_synchronizer(
        &mut self,
        epoch_change_proofs: Vec<LedgerInfoWithSignatures>,
        target_ledger_info: LedgerInfoWithSignatures,
        target_output_with_proof: TransactionOutputListWithProof,
    ) -> Result<JoinHandle<()>, Error> {
        let max_pending_data_chunks = self.driver_config.max_pending_data_chunks as usize;
        let (state_snapshot_notifier, state_snapshot_listener) =
            mpsc::channel(max_pending_data_chunks);

        let receiver_handle = spawn_state_snapshot_receiver(
            self.chunk_executor.clone(),
            state_snapshot_listener,
            self.commit_notification_sender.clone(),
            self.error_notification_sender.clone(),
            self.pending_data_chunks.clone(),
            self.metadata_storage.clone(),
            self.storage.clone(),
            epoch_change_proofs,
            target_ledger_info,
            target_output_with_proof,
            self.runtime.clone(),
        );
        self.state_snapshot_notifier = Some(state_snapshot_notifier);
        Ok(receiver_handle)
    }

    /// Returns true iff the shared pending-chunk counter is non-zero.
    fn pending_storage_data(&self) -> bool {
        load_pending_data_chunks(self.pending_data_chunks.clone()) > 0
    }

    /// Queues a chunk of state values for the state snapshot receiver without blocking.
    ///
    /// The pending-chunk counter is incremented before the chunk becomes visible to
    /// the receiver task, which decrements it once the chunk is processed. The
    /// increment is rolled back if the hand-off fails.
    ///
    /// # Panics
    /// Panics if `initialize_state_synchronizer` has not been called yet.
    fn save_state_values(
        &mut self,
        notification_id: NotificationId,
        state_value_chunk_with_proof: StateValueChunkWithProof,
    ) -> Result<(), Error> {
        let state_snapshot_notifier = self
            .state_snapshot_notifier
            .as_mut()
            .expect("The state snapshot receiver has not been initialized!");
        let storage_data_chunk =
            StorageDataChunk::States(notification_id, state_value_chunk_with_proof);

        increment_pending_data_chunks(self.pending_data_chunks.clone());
        if let Err(error) = state_snapshot_notifier.try_send(storage_data_chunk) {
            // The chunk never reached the receiver, so it is not pending
            decrement_pending_data_chunks(self.pending_data_chunks.clone());
            Err(Error::UnexpectedError(format!(
                "Failed to send storage data chunk to state snapshot listener: {:?}",
                error
            )))
        } else {
            Ok(())
        }
    }

    /// Resets the chunk executor, mapping any failure into `Error::UnexpectedError`.
    fn reset_chunk_executor(&self) -> Result<(), Error> {
        self.chunk_executor.reset().map_err(|error| {
            Error::UnexpectedError(format!(
                "Failed to reset the chunk executor! Error: {:?}",
                error
            ))
        })
    }

    /// Calls `finish` on the chunk executor.
    fn finish_chunk_executor(&self) {
        self.chunk_executor.finish()
    }
}
/// A unit of work queued for one of the storage synchronizer's background tasks.
///
/// `States` chunks are consumed by the state snapshot receiver. `Transactions` and
/// `TransactionOutputs` chunks are consumed by the executor. Every variant carries
/// the `NotificationId` of the data notification it came from, which is used when
/// reporting errors for that chunk.
// NOTE(review): the lint is suppressed rather than boxing the large payloads —
// presumably to avoid an extra allocation per chunk; confirm before changing.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum StorageDataChunk {
    /// A chunk of state values (with proof) for state snapshot syncing.
    States(NotificationId, StateValueChunkWithProof),
    /// Transactions to execute, with the target ledger info and the optional
    /// end-of-epoch ledger info passed to `execute_chunk`.
    Transactions(
        NotificationId,
        TransactionListWithProof,
        LedgerInfoWithSignatures,
        Option<LedgerInfoWithSignatures>,
    ),
    /// Transaction outputs to apply, with the target ledger info and the optional
    /// end-of-epoch ledger info passed to `apply_chunk`.
    TransactionOutputs(
        NotificationId,
        TransactionOutputListWithProof,
        LedgerInfoWithSignatures,
        Option<LedgerInfoWithSignatures>,
    ),
}
/// Spawns the executor task.
///
/// The task pulls chunks from `executor_listener` and processes each one through
/// `chunk_executor`: `Transactions` chunks are executed and `TransactionOutputs`
/// chunks are applied. The notification ID of every successfully processed chunk
/// is then forwarded to the committer via `committer_notifier`.
///
/// Pending-chunk accounting: a chunk that fails here (execute/apply error, or the
/// committer can no longer be reached) is reported via `error_notification_sender`
/// and removed from `pending_transaction_chunks`. Successfully forwarded chunks are
/// removed later by the committer.
///
/// # Panics
/// The spawned task panics if it receives a `StorageDataChunk::States` chunk;
/// those belong to the state snapshot receiver.
fn spawn_executor<ChunkExecutor: ChunkExecutorTrait + 'static>(
    chunk_executor: Arc<ChunkExecutor>,
    error_notification_sender: mpsc::UnboundedSender<ErrorNotification>,
    mut executor_listener: mpsc::Receiver<StorageDataChunk>,
    mut committer_notifier: mpsc::Sender<NotificationId>,
    pending_transaction_chunks: Arc<AtomicU64>,
    runtime: Option<Handle>,
) -> JoinHandle<()> {
    let executor = async move {
        while let Some(storage_data_chunk) = executor_listener.next().await {
            // Execute or apply the chunk
            let (notification_id, result) = match storage_data_chunk {
                StorageDataChunk::Transactions(
                    notification_id,
                    transactions_with_proof,
                    target_ledger_info,
                    end_of_epoch_ledger_info,
                ) => {
                    let num_transactions = transactions_with_proof.transactions.len();
                    let result = chunk_executor.execute_chunk(
                        transactions_with_proof,
                        &target_ledger_info,
                        end_of_epoch_ledger_info.as_ref(),
                    );
                    if result.is_ok() {
                        info!(
                            LogSchema::new(LogEntry::StorageSynchronizer).message(&format!(
                                "Executed a new transaction chunk! Transaction total: {:?}.",
                                num_transactions
                            ))
                        );
                        metrics::increment_gauge(
                            &metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
                            metrics::StorageSynchronizerOperations::ExecutedTransactions
                                .get_label(),
                            num_transactions as u64,
                        );
                    }
                    (notification_id, result)
                }
                StorageDataChunk::TransactionOutputs(
                    notification_id,
                    outputs_with_proof,
                    target_ledger_info,
                    end_of_epoch_ledger_info,
                ) => {
                    let num_outputs = outputs_with_proof.transactions_and_outputs.len();
                    let result = chunk_executor.apply_chunk(
                        outputs_with_proof,
                        &target_ledger_info,
                        end_of_epoch_ledger_info.as_ref(),
                    );
                    if result.is_ok() {
                        info!(
                            LogSchema::new(LogEntry::StorageSynchronizer).message(&format!(
                                "Applied a new transaction output chunk! Transaction total: {:?}.",
                                num_outputs
                            ))
                        );
                        metrics::increment_gauge(
                            &metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
                            metrics::StorageSynchronizerOperations::AppliedTransactionOutputs
                                .get_label(),
                            num_outputs as u64,
                        );
                    }
                    (notification_id, result)
                }
                storage_data_chunk @ StorageDataChunk::States(..) => {
                    panic!(
                        "Invalid storage data chunk sent to executor: {:?}",
                        storage_data_chunk
                    );
                }
            };

            // Hand successfully processed chunks to the committer
            match result {
                Ok(()) => {
                    // Wait for room in the committer's queue instead of using
                    // `try_send`. The chunk has already been executed/applied, so
                    // rejecting it just because the committer is momentarily behind
                    // would turn ordinary backpressure into a spurious error. `send`
                    // only fails if the committer has gone away.
                    if let Err(error) = committer_notifier.send(notification_id).await {
                        let error = format!("Failed to notify the committer! Error: {:?}", error);
                        send_storage_synchronizer_error(
                            error_notification_sender.clone(),
                            notification_id,
                            error,
                        )
                        .await;
                        decrement_pending_data_chunks(pending_transaction_chunks.clone());
                    }
                }
                Err(error) => {
                    let error = format!(
                        "Failed to execute/apply the storage data chunk! Error: {:?}",
                        error
                    );
                    send_storage_synchronizer_error(
                        error_notification_sender.clone(),
                        notification_id,
                        error,
                    )
                    .await;
                    decrement_pending_data_chunks(pending_transaction_chunks.clone());
                }
            }
        }
    };
    spawn(runtime, executor)
}
/// Spawns the committer task.
///
/// For every notification ID received on `committer_listener`, the task commits
/// the next executed chunk via `chunk_executor`, updates the sync metrics and
/// passes the committed transactions and events to
/// `utils::handle_committed_transactions`, along with the mempool notification
/// handler and event subscription service. Commit failures are reported through
/// `error_notification_sender`.
///
/// Each received notification removes exactly one entry from
/// `pending_transaction_chunks`, whether or not the commit succeeded.
fn spawn_committer<
    ChunkExecutor: ChunkExecutorTrait + 'static,
    MempoolNotifier: MempoolNotificationSender,
>(
    chunk_executor: Arc<ChunkExecutor>,
    mut committer_listener: mpsc::Receiver<NotificationId>,
    error_notification_sender: mpsc::UnboundedSender<ErrorNotification>,
    event_subscription_service: Arc<Mutex<EventSubscriptionService>>,
    mempool_notification_handler: MempoolNotificationHandler<MempoolNotifier>,
    pending_transaction_chunks: Arc<AtomicU64>,
    runtime: Option<Handle>,
    storage: Arc<dyn DbReader>,
) -> JoinHandle<()> {
    let committer = async move {
        while let Some(notification_id) = committer_listener.next().await {
            match chunk_executor.commit_chunk() {
                Err(commit_error) => {
                    let message =
                        format!("Failed to commit executed chunk! Error: {:?}", commit_error);
                    send_storage_synchronizer_error(
                        error_notification_sender.clone(),
                        notification_id,
                        message,
                    )
                    .await;
                }
                Ok(notification) => {
                    let num_transactions = notification.committed_transactions.len();
                    let num_events = notification.committed_events.len();
                    info!(
                        LogSchema::new(LogEntry::StorageSynchronizer).message(&format!(
                            "Committed a new transaction chunk! \
                             Transaction total: {:?}, event total: {:?}",
                            num_transactions, num_events
                        ))
                    );
                    metrics::increment_gauge(
                        &metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
                        metrics::StorageSynchronizerOperations::Synced.get_label(),
                        num_transactions as u64,
                    );

                    // A reconfiguration means the epoch changed: refresh the epoch metrics
                    if notification.reconfiguration_occurred {
                        utils::update_new_epoch_metrics(storage.clone());
                    }

                    utils::handle_committed_transactions(
                        CommittedTransactions {
                            events: notification.committed_events,
                            transactions: notification.committed_transactions,
                        },
                        storage.clone(),
                        mempool_notification_handler.clone(),
                        event_subscription_service.clone(),
                    )
                    .await;
                }
            }

            // One notification == one pending chunk, regardless of the commit outcome
            decrement_pending_data_chunks(pending_transaction_chunks.clone());
        }
    };
    spawn(runtime, committer)
}
/// Spawns the state snapshot receiver task.
///
/// The task persists state value chunks for the snapshot at `target_ledger_info`'s
/// version. For every `StorageDataChunk::States` chunk received on
/// `state_snapshot_listener`:
/// - the chunk is added to the storage-level state snapshot receiver;
/// - if it is not the last chunk, the last persisted state value index is
///   recorded in `metadata_storage` and the task waits for the next chunk;
/// - if it is the last chunk, the task runs the following steps in order:
///   1. finishes the snapshot receiver and finalizes the snapshot;
///   2. saves `epoch_change_proofs` and deletes the genesis transaction;
///   3. updates the metadata storage and resets the chunk executor;
///   4. sends a commit notification on `commit_notification_sender`;
///   5. re-initializes the sync gauges.
///
///   Each step runs only if the previous ones succeeded. The first failure (if
///   any) is reported, and the task then exits.
///
/// Errors are reported through `error_notification_sender`. Every received chunk
/// removes exactly one entry from `pending_transaction_chunks`.
///
/// # Panics
/// The spawned task panics if:
/// - the target transaction info is missing or not at a state checkpoint;
/// - the storage-level snapshot receiver cannot be created;
/// - a non-`States` chunk is received.
fn spawn_state_snapshot_receiver<
    ChunkExecutor: ChunkExecutorTrait + 'static,
    MetadataStorage: MetadataStorageInterface + Clone + Send + Sync + 'static,
>(
    chunk_executor: Arc<ChunkExecutor>,
    mut state_snapshot_listener: mpsc::Receiver<StorageDataChunk>,
    mut commit_notification_sender: mpsc::UnboundedSender<CommitNotification>,
    error_notification_sender: mpsc::UnboundedSender<ErrorNotification>,
    pending_transaction_chunks: Arc<AtomicU64>,
    metadata_storage: MetadataStorage,
    storage: DbReaderWriter,
    epoch_change_proofs: Vec<LedgerInfoWithSignatures>,
    target_ledger_info: LedgerInfoWithSignatures,
    target_output_with_proof: TransactionOutputListWithProof,
    runtime: Option<Handle>,
) -> JoinHandle<()> {
    let receiver = async move {
        // The snapshot is taken at the target version. Its expected root hash is the
        // state checkpoint hash of the first transaction info in the target proof.
        let version = target_ledger_info.ledger_info().version();
        let expected_root_hash = target_output_with_proof
            .proof
            .transaction_infos
            .first()
            .expect("Target transaction info should exist!")
            .ensure_state_checkpoint_hash()
            .expect("Must be at state checkpoint.");
        let mut state_snapshot_receiver = storage
            .writer
            .get_state_snapshot_receiver(version, expected_root_hash)
            .expect("Failed to initialize the state snapshot receiver!");
        // Reborrow so the ledger info can be passed by reference on every iteration
        let target_ledger_info = &target_ledger_info;
        while let Some(storage_data_chunk) = state_snapshot_listener.next().await {
            match storage_data_chunk {
                StorageDataChunk::States(notification_id, states_with_proof) => {
                    let all_states_synced = states_with_proof.is_last_chunk();
                    let last_committed_state_index = states_with_proof.last_index;
                    let num_state_values = states_with_proof.raw_values.len();
                    let commit_result = state_snapshot_receiver.add_chunk(
                        states_with_proof.raw_values,
                        states_with_proof.proof.clone(),
                    );
                    match commit_result {
                        Ok(()) => {
                            info!(
                                LogSchema::new(LogEntry::StorageSynchronizer).message(&format!(
                                    "Committed a new state value chunk! Chunk size: {:?}, last persisted index: {:?}",
                                    num_state_values,
                                    last_committed_state_index
                                ))
                            );
                            metrics::set_gauge(
                                &metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
                                metrics::StorageSynchronizerOperations::SyncedStates.get_label(),
                                last_committed_state_index as u64,
                            );
                            if !all_states_synced {
                                // More chunks are expected: record progress so far
                                if let Err(error) = metadata_storage
                                    .clone()
                                    .update_last_persisted_state_value_index(
                                        target_ledger_info,
                                        last_committed_state_index,
                                        all_states_synced,
                                    )
                                {
                                    let error = format!("Failed to update the last persisted state index at version: {:?}! Error: {:?}", version, error);
                                    send_storage_synchronizer_error(
                                        error_notification_sender.clone(),
                                        notification_id,
                                        error,
                                    )
                                    .await;
                                }
                                decrement_pending_data_chunks(pending_transaction_chunks.clone());
                                // Wait for the next chunk
                                continue;
                            }

                            // This was the last chunk: finalize the snapshot. Each step
                            // below runs only if every previous step succeeded; the
                            // first failure is reported after the chain.
                            let commit_notification = create_commit_notification(
                                &target_output_with_proof,
                                last_committed_state_index,
                                version,
                            );
                            let finalized_result = if let Err(error) =
                                state_snapshot_receiver.finish_box()
                            {
                                Err(format!(
                                    "Failed to finish the state value synchronization! Error: {:?}",
                                    error
                                ))
                            } else if let Err(error) = storage
                                .writer
                                .finalize_state_snapshot(version, target_output_with_proof)
                            {
                                Err(format!(
                                    "Failed to finalize the state snapshot! Error: {:?}",
                                    error
                                ))
                            } else if let Err(error) =
                                storage.writer.save_ledger_infos(&epoch_change_proofs)
                            {
                                Err(format!(
                                    "Failed to save all epoch ending ledger infos! Error: {:?}",
                                    error
                                ))
                            } else if let Err(error) = storage.writer.delete_genesis() {
                                Err(format!(
                                    "Failed to delete the genesis transaction! Error: {:?}",
                                    error
                                ))
                            } else if let Err(error) = metadata_storage
                                .clone()
                                .update_last_persisted_state_value_index(
                                    target_ledger_info,
                                    last_committed_state_index,
                                    all_states_synced,
                                )
                            {
                                Err(format!("All states have synced, but failed to update the metadata storage at version {:?}! Error: {:?}", version, error))
                            } else if let Err(error) = chunk_executor.reset() {
                                Err(format!("Failed to reset the chunk executor after state snapshot synchronization! Error: {:?}", error))
                            } else if let Err(error) =
                                commit_notification_sender.send(commit_notification).await
                            {
                                Err(format!("Failed to send the final state commit notification! Error: {:?}", error))
                            } else if let Err(error) = utils::initialize_sync_gauges(storage.reader)
                            {
                                Err(format!("Failed to initialize the state sync version gauges! Error: {:?}", error))
                            } else {
                                Ok(())
                            };
                            if let Err(error) = finalized_result {
                                send_storage_synchronizer_error(
                                    error_notification_sender.clone(),
                                    notification_id,
                                    error,
                                )
                                .await;
                            }
                            decrement_pending_data_chunks(pending_transaction_chunks.clone());
                            // Nothing left to do: the snapshot is complete (or failed)
                            return;
                        }
                        Err(error) => {
                            let error =
                                format!("Failed to commit state value chunk! Error: {:?}", error);
                            send_storage_synchronizer_error(
                                error_notification_sender.clone(),
                                notification_id,
                                error,
                            )
                            .await;
                        }
                    }
                }
                storage_data_chunk => {
                    panic!(
                        "Invalid storage data chunk sent to state snapshot receiver: {:?}",
                        storage_data_chunk
                    );
                }
            }
            // Reached only when adding the chunk failed (the success paths above
            // either `continue` or `return` after decrementing themselves)
            decrement_pending_data_chunks(pending_transaction_chunks.clone());
        }
    };
    spawn(runtime, receiver)
}
/// Builds the commit notification sent once a state snapshot has been fully synced.
///
/// The notification carries:
/// - the transactions in `target_output_with_proof`;
/// - every event emitted by their outputs, in order;
/// - `last_committed_state_index` and the snapshot `version`.
fn create_commit_notification(
    target_output_with_proof: &TransactionOutputListWithProof,
    last_committed_state_index: u64,
    version: u64,
) -> CommitNotification {
    let transactions_and_outputs: &[(Transaction, TransactionOutput)] =
        &target_output_with_proof.transactions_and_outputs;

    // Clone only what the notification needs (transactions and events) rather than
    // cloning every (transaction, output) pair, write sets included, and then
    // throwing the outputs away.
    let transactions = transactions_and_outputs
        .iter()
        .map(|(transaction, _)| transaction.clone())
        .collect::<Vec<_>>();
    let events = transactions_and_outputs
        .iter()
        .flat_map(|(_, output)| output.events().iter().cloned())
        .collect::<Vec<_>>();

    CommitNotification::new_committed_state_snapshot(
        events,
        transactions,
        last_committed_state_index,
        version,
    )
}
/// Spawns `future` on the given runtime handle. When no handle is provided, it
/// falls back to `tokio::spawn` (the ambient runtime).
fn spawn(
    runtime: Option<Handle>,
    future: impl Future<Output = ()> + Send + 'static,
) -> JoinHandle<()> {
    match runtime {
        Some(handle) => handle.spawn(future),
        None => tokio::spawn(future),
    }
}
/// Returns the current number of pending data chunks (relaxed load).
fn load_pending_data_chunks(pending_data_chunks: Arc<AtomicU64>) -> u64 {
    let counter: &AtomicU64 = &pending_data_chunks;
    counter.load(Ordering::Relaxed)
}
/// Records one more pending data chunk in both the shared counter and the
/// `STORAGE_SYNCHRONIZER_PENDING_DATA` gauge.
fn increment_pending_data_chunks(pending_data_chunks: Arc<AtomicU64>) {
    const ONE_CHUNK: u64 = 1;
    pending_data_chunks.fetch_add(ONE_CHUNK, Ordering::Relaxed);
    metrics::increment_gauge(
        &metrics::STORAGE_SYNCHRONIZER_GAUGES,
        metrics::STORAGE_SYNCHRONIZER_PENDING_DATA,
        ONE_CHUNK,
    );
}
/// Records that one pending data chunk has been fully processed, in both the
/// shared counter and the `STORAGE_SYNCHRONIZER_PENDING_DATA` gauge.
fn decrement_pending_data_chunks(pending_data_chunks: Arc<AtomicU64>) {
    const ONE_CHUNK: u64 = 1;
    pending_data_chunks.fetch_sub(ONE_CHUNK, Ordering::Relaxed);
    metrics::decrement_gauge(
        &metrics::STORAGE_SYNCHRONIZER_GAUGES,
        metrics::STORAGE_SYNCHRONIZER_PENDING_DATA,
        ONE_CHUNK,
    );
}
async fn send_storage_synchronizer_error(
mut error_notification_sender: mpsc::UnboundedSender<ErrorNotification>,
notification_id: NotificationId,
error_message: String,
) {
let error_message = format!("Storage synchronizer error: {:?}", error_message);
error!(LogSchema::new(LogEntry::StorageSynchronizer).message(&error_message));
let error = Error::UnexpectedError(error_message);
let error_notification = ErrorNotification {
error: error.clone(),
notification_id,
};
if let Err(error) = error_notification_sender.send(error_notification).await {
panic!("Failed to send error notification! Error: {:?}", error);
}
metrics::increment_counter(&metrics::STORAGE_SYNCHRONIZER_ERRORS, error.get_label());
}