use aptos_types::{
epoch_change::Verifier, epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures,
};
use storage_interface::ExecutedTrees;
use crate::error::Error;
#[derive(Clone, Debug)]
pub struct SyncState {
committed_ledger_info: LedgerInfoWithSignatures,
synced_trees: ExecutedTrees,
trusted_epoch_state: EpochState,
}
impl SyncState {
pub fn new(
committed_ledger_info: LedgerInfoWithSignatures,
synced_trees: ExecutedTrees,
current_epoch_state: EpochState,
) -> Self {
let trusted_epoch_state = committed_ledger_info
.ledger_info()
.next_epoch_state()
.cloned()
.unwrap_or(current_epoch_state);
SyncState {
committed_ledger_info,
synced_trees,
trusted_epoch_state,
}
}
pub fn committed_epoch(&self) -> u64 {
self.committed_ledger_info.ledger_info().epoch()
}
pub fn committed_ledger_info(&self) -> LedgerInfoWithSignatures {
self.committed_ledger_info.clone()
}
pub fn committed_version(&self) -> u64 {
self.committed_ledger_info.ledger_info().version()
}
pub fn synced_version(&self) -> u64 {
self.synced_trees.version().unwrap_or(0)
}
pub fn trusted_epoch(&self) -> u64 {
self.trusted_epoch_state.epoch
}
pub fn verify_ledger_info(&self, ledger_info: &LedgerInfoWithSignatures) -> Result<(), Error> {
self.trusted_epoch_state
.verify(ledger_info)
.map_err(|error| Error::UnexpectedError(error.to_string()))
}
}
#[cfg(any(feature = "fuzzing", test))]
pub(crate) mod test_utils {
use std::{collections::HashMap, sync::Arc};
use futures::channel::mpsc;
use aptos_config::{
config::{NodeConfig, RoleType},
network_id::NetworkId,
};
use aptos_infallible::RwLock;
use aptos_types::{
move_resource::MoveStorage,
on_chain_config::ON_CHAIN_CONFIG_REGISTRY,
transaction::{Transaction, WriteSetPayload},
waypoint::Waypoint,
};
use aptos_vm::AptosVM;
use aptosdb::AptosDB;
use channel::{aptos_channel, message_queues::QueueStyle};
use event_notifications::{EventNotificationSender, EventSubscriptionService};
use executor::chunk_executor::ChunkExecutor;
use executor_test_helpers::bootstrap_genesis;
use mempool_notifications::MempoolNotifier;
use network::{
peer_manager::{ConnectionRequestSender, PeerManagerRequestSender},
protocols::network::NewNetworkSender,
};
use storage_interface::{DbReader, DbReaderWriter};
use crate::{
coordinator::StateSyncCoordinator,
executor_proxy::{ExecutorProxy, ExecutorProxyTrait},
network::StateSyncSender,
};
#[cfg(test)]
pub(crate) fn create_coordinator_with_config_and_waypoint(
node_config: NodeConfig,
waypoint: Waypoint,
) -> StateSyncCoordinator<ExecutorProxy<ChunkExecutor<AptosVM>>, MempoolNotifier> {
create_state_sync_coordinator_for_tests(node_config, waypoint, false)
}
pub(crate) fn create_validator_coordinator(
) -> StateSyncCoordinator<ExecutorProxy<ChunkExecutor<AptosVM>>, MempoolNotifier> {
let mut node_config = NodeConfig::default();
node_config.base.role = RoleType::Validator;
create_state_sync_coordinator_for_tests(node_config, Waypoint::default(), false)
}
#[cfg(test)]
pub(crate) fn create_full_node_coordinator(
) -> StateSyncCoordinator<ExecutorProxy<ChunkExecutor<AptosVM>>, MempoolNotifier> {
let mut node_config = NodeConfig::default();
node_config.base.role = RoleType::FullNode;
create_state_sync_coordinator_for_tests(node_config, Waypoint::default(), false)
}
#[cfg(test)]
pub(crate) fn create_read_only_coordinator(
) -> StateSyncCoordinator<ExecutorProxy<ChunkExecutor<AptosVM>>, MempoolNotifier> {
let mut node_config = NodeConfig::default();
node_config.base.role = RoleType::Validator;
create_state_sync_coordinator_for_tests(node_config, Waypoint::default(), true)
}
fn create_state_sync_coordinator_for_tests(
node_config: NodeConfig,
waypoint: Waypoint,
read_only_mode: bool,
) -> StateSyncCoordinator<ExecutorProxy<ChunkExecutor<AptosVM>>, MempoolNotifier> {
let (genesis, _) = vm_genesis::test_genesis_change_set_and_validators(Some(1));
let db_path = aptos_temppath::TempPath::new();
db_path.create_as_dir().unwrap();
let (db, db_rw) = DbReaderWriter::wrap(AptosDB::new_for_test(db_path.path()));
let genesis_txn = Transaction::GenesisTransaction(WriteSetPayload::Direct(genesis));
bootstrap_genesis::<AptosVM>(&db_rw, &genesis_txn).unwrap();
let storage: Arc<dyn DbReader> = db.clone();
let synced_version = (&*storage).fetch_latest_state_checkpoint_version().unwrap();
let mut event_subscription_service = EventSubscriptionService::new(
ON_CHAIN_CONFIG_REGISTRY,
Arc::new(RwLock::new(db_rw.clone())),
);
event_subscription_service
.notify_initial_configs(synced_version)
.unwrap();
let chunk_executor = Arc::new(ChunkExecutor::<AptosVM>::new(db_rw).unwrap());
let executor_proxy = ExecutorProxy::new(db, chunk_executor, event_subscription_service);
let initial_state = executor_proxy.get_local_storage_state().unwrap();
let (network_reqs_tx, _network_reqs_rx) = aptos_channel::new(QueueStyle::FIFO, 8, None);
let (connection_reqs_tx, _) = aptos_channel::new(QueueStyle::FIFO, 8, None);
let network_sender = StateSyncSender::new(
PeerManagerRequestSender::new(network_reqs_tx),
ConnectionRequestSender::new(connection_reqs_tx),
);
let network_id = NetworkId::Validator;
let network_senders = vec![(network_id, network_sender)]
.into_iter()
.collect::<HashMap<_, _>>();
let (_coordinator_sender, coordinator_receiver) = mpsc::unbounded();
let (mempool_notifier, _) = mempool_notifications::new_mempool_notifier_listener_pair();
let (_, consensus_listener) =
consensus_notifications::new_consensus_notifier_listener_pair(1000);
StateSyncCoordinator::new(
coordinator_receiver,
mempool_notifier,
consensus_listener,
network_senders,
&node_config,
waypoint,
executor_proxy,
initial_state,
read_only_mode,
)
.unwrap()
}
}