use crate::bootstrapper::GENESIS_TRANSACTION_VERSION;
use crate::{
bootstrapper::Bootstrapper,
driver::DriverConfiguration,
error::Error,
tests::{
mocks::{
create_mock_db_reader, create_mock_streaming_client, create_ready_storage_synchronizer,
MockMetadataStorage, MockStorageSynchronizer, MockStreamingClient,
},
utils::{
create_data_stream_listener, create_empty_epoch_state, create_epoch_ending_ledger_info,
create_full_node_driver_configuration, create_global_summary,
create_output_list_with_proof, create_random_epoch_ending_ledger_info,
create_transaction_info, create_transaction_list_with_proof,
},
},
};
use aptos_config::config::BootstrappingMode;
use aptos_data_client::GlobalDataSummary;
use aptos_types::{
transaction::{TransactionOutputListWithProof, Version},
waypoint::Waypoint,
};
use claim::{assert_matches, assert_none, assert_ok};
use data_streaming_service::{
data_notification::{DataNotification, DataPayload},
streaming_client::NotificationFeedback,
};
use futures::{channel::oneshot, FutureExt};
use mockall::{predicate::eq, Sequence};
use std::sync::Arc;
#[tokio::test]
async fn test_bootstrap_genesis_waypoint() {
let driver_configuration = create_full_node_driver_configuration();
let mock_streaming_client = create_mock_streaming_client();
let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true);
assert!(!bootstrapper.is_bootstrapped());
let (bootstrap_notification_sender, bootstrap_notification_receiver) = oneshot::channel();
bootstrapper
.subscribe_to_bootstrap_notifications(bootstrap_notification_sender)
.unwrap();
let global_data_summary = create_global_summary(0);
drive_progress(&mut bootstrapper, &global_data_summary, true)
.await
.unwrap();
assert!(bootstrapper.is_bootstrapped());
verify_bootstrap_notification(bootstrap_notification_receiver);
let error = drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap_err();
assert_matches!(error, Error::AlreadyBootstrapped(_));
}
#[tokio::test]
async fn test_bootstrap_immediate_notification() {
let driver_configuration = create_full_node_driver_configuration();
let mock_streaming_client = create_mock_streaming_client();
let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true);
let global_data_summary = create_global_summary(0);
drive_progress(&mut bootstrapper, &global_data_summary, true)
.await
.unwrap();
assert!(bootstrapper.is_bootstrapped());
let (bootstrap_notification_sender, bootstrap_notification_receiver) = oneshot::channel();
bootstrapper
.subscribe_to_bootstrap_notifications(bootstrap_notification_sender)
.unwrap();
verify_bootstrap_notification(bootstrap_notification_receiver);
}
#[tokio::test]
async fn test_bootstrap_no_notification() {
let driver_configuration = create_full_node_driver_configuration();
let mut mock_streaming_client = create_mock_streaming_client();
let (_notification_sender, data_stream_listener) = create_data_stream_listener();
mock_streaming_client
.expect_get_all_epoch_ending_ledger_infos()
.with(eq(1))
.return_once(move |_| Ok(data_stream_listener));
let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true);
let global_data_summary = create_global_summary(1);
let (bootstrap_notification_sender, bootstrap_notification_receiver) = oneshot::channel();
bootstrapper
.subscribe_to_bootstrap_notifications(bootstrap_notification_sender)
.unwrap();
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
assert_none!(bootstrap_notification_receiver.now_or_never());
}
#[tokio::test]
async fn test_critical_timeout() {
let mut driver_configuration = create_full_node_driver_configuration();
driver_configuration.config.max_stream_wait_time_ms = 1000;
let mut mock_streaming_client = create_mock_streaming_client();
let mut expectation_sequence = Sequence::new();
let (_notification_sender_1, data_stream_listener_1) = create_data_stream_listener();
let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener();
for data_stream_listener in [data_stream_listener_1, data_stream_listener_2] {
mock_streaming_client
.expect_get_all_epoch_ending_ledger_infos()
.times(1)
.with(eq(1))
.return_once(move |_| Ok(data_stream_listener))
.in_sequence(&mut expectation_sequence);
}
let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true);
let global_data_summary = create_global_summary(1);
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
for _ in 0..2 {
let error = drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap_err();
assert_matches!(error, Error::DataStreamNotificationTimeout(_));
}
let error = drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap_err();
assert_matches!(error, Error::CriticalDataStreamTimeout(_));
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
let error = drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap_err();
assert_matches!(error, Error::DataStreamNotificationTimeout(_));
}
#[tokio::test]
async fn test_data_stream_state_values() {
let notification_id = 50043;
let highest_version = 10000;
let highest_ledger_info = create_random_epoch_ending_ledger_info(highest_version, 1);
let mut driver_configuration = create_full_node_driver_configuration();
driver_configuration.config.bootstrapping_mode = BootstrappingMode::DownloadLatestStates;
let mut mock_streaming_client = create_mock_streaming_client();
let mut expectation_sequence = Sequence::new();
let (notification_sender_1, data_stream_listener_1) = create_data_stream_listener();
let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener();
for data_stream_listener in [data_stream_listener_1, data_stream_listener_2] {
mock_streaming_client
.expect_get_all_transaction_outputs()
.times(1)
.with(
eq(highest_version),
eq(highest_version),
eq(highest_version),
)
.return_once(move |_, _, _| Ok(data_stream_listener))
.in_sequence(&mut expectation_sequence);
}
mock_streaming_client
.expect_terminate_stream_with_feedback()
.with(
eq(notification_id),
eq(NotificationFeedback::InvalidPayloadData),
)
.return_const(Ok(()));
let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true);
manipulate_verified_epoch_states(&mut bootstrapper, true, true, Some(highest_version));
let mut global_data_summary = create_global_summary(1);
global_data_summary.advertised_data.synced_ledger_infos = vec![highest_ledger_info.clone()];
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
let data_notification = DataNotification {
notification_id,
data_payload: DataPayload::TransactionOutputsWithProof(create_output_list_with_proof()),
};
notification_sender_1.push((), data_notification).unwrap();
let error = drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap_err();
assert_matches!(error, Error::VerificationError(_));
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
}
#[tokio::test]
async fn test_data_stream_transactions() {
let notification_id = 0;
let highest_version = 9998765;
let highest_ledger_info = create_random_epoch_ending_ledger_info(highest_version, 1);
let mut driver_configuration = create_full_node_driver_configuration();
driver_configuration.config.bootstrapping_mode =
BootstrappingMode::ExecuteTransactionsFromGenesis;
let mut mock_streaming_client = create_mock_streaming_client();
let mut expectation_sequence = Sequence::new();
let (notification_sender_1, data_stream_listener_1) = create_data_stream_listener();
let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener();
for data_stream_listener in [data_stream_listener_1, data_stream_listener_2] {
mock_streaming_client
.expect_get_all_transactions()
.times(1)
.with(eq(1), eq(highest_version), eq(highest_version), eq(false))
.return_once(move |_, _, _, _| Ok(data_stream_listener))
.in_sequence(&mut expectation_sequence);
}
mock_streaming_client
.expect_terminate_stream_with_feedback()
.with(
eq(notification_id),
eq(NotificationFeedback::InvalidPayloadData),
)
.return_const(Ok(()));
let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true);
manipulate_verified_epoch_states(&mut bootstrapper, true, true, Some(highest_version));
let mut global_data_summary = create_global_summary(1);
global_data_summary.advertised_data.synced_ledger_infos = vec![highest_ledger_info.clone()];
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
let data_notification = DataNotification {
notification_id,
data_payload: DataPayload::TransactionsWithProof(create_transaction_list_with_proof()),
};
notification_sender_1.push((), data_notification).unwrap();
let error = drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap_err();
assert_matches!(error, Error::VerificationError(_));
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
}
#[tokio::test]
async fn test_data_stream_transaction_outputs() {
let notification_id = 1235;
let highest_version = 45;
let highest_ledger_info = create_random_epoch_ending_ledger_info(highest_version, 1);
let mut driver_configuration = create_full_node_driver_configuration();
driver_configuration.config.bootstrapping_mode =
BootstrappingMode::ApplyTransactionOutputsFromGenesis;
let mut mock_streaming_client = create_mock_streaming_client();
let mut expectation_sequence = Sequence::new();
let (notification_sender_1, data_stream_listener_1) = create_data_stream_listener();
let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener();
for data_stream_listener in [data_stream_listener_1, data_stream_listener_2] {
mock_streaming_client
.expect_get_all_transaction_outputs()
.times(1)
.with(eq(1), eq(highest_version), eq(highest_version))
.return_once(move |_, _, _| Ok(data_stream_listener))
.in_sequence(&mut expectation_sequence);
}
mock_streaming_client
.expect_terminate_stream_with_feedback()
.with(
eq(notification_id),
eq(NotificationFeedback::EmptyPayloadData),
)
.return_const(Ok(()));
let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true);
manipulate_verified_epoch_states(&mut bootstrapper, true, true, Some(highest_version));
let mut global_data_summary = create_global_summary(1);
global_data_summary.advertised_data.synced_ledger_infos = vec![highest_ledger_info.clone()];
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
let data_notification = DataNotification {
notification_id,
data_payload: DataPayload::TransactionOutputsWithProof(
TransactionOutputListWithProof::new_empty(),
),
};
notification_sender_1.push((), data_notification).unwrap();
let error = drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap_err();
assert_matches!(error, Error::VerificationError(_));
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
}
#[tokio::test]
async fn test_fetch_epoch_ending_ledger_infos() {
let mut driver_configuration = create_full_node_driver_configuration();
driver_configuration.config.max_stream_wait_time_ms = 1000;
let mut mock_streaming_client = create_mock_streaming_client();
let (_notification_sender, data_stream_listener) = create_data_stream_listener();
mock_streaming_client
.expect_get_all_epoch_ending_ledger_infos()
.with(eq(1))
.return_once(move |_| Ok(data_stream_listener));
let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true);
manipulate_verified_epoch_states(&mut bootstrapper, false, true, None);
let global_data_summary = create_global_summary(1);
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
let error = drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap_err();
assert_matches!(error, Error::DataStreamNotificationTimeout(_));
}
#[tokio::test]
async fn test_snapshot_sync_epoch_change() {
let synced_version = GENESIS_TRANSACTION_VERSION; let target_version = 1000;
let highest_version = 5000;
let last_persisted_index = 1030405;
let target_ledger_info = create_random_epoch_ending_ledger_info(target_version, 1);
let highest_ledger_info = create_random_epoch_ending_ledger_info(highest_version, 2);
let mut driver_configuration = create_full_node_driver_configuration();
driver_configuration.config.bootstrapping_mode = BootstrappingMode::DownloadLatestStates;
let mut mock_streaming_client = create_mock_streaming_client();
let (_notification_sender_1, data_stream_listener_1) = create_data_stream_listener();
mock_streaming_client
.expect_get_all_state_values()
.times(1)
.with(eq(target_version), eq(Some(last_persisted_index)))
.return_once(move |_, _| Ok(data_stream_listener_1));
let mut metadata_storage = MockMetadataStorage::new();
let target_ledger_info_clone = target_ledger_info.clone();
let last_persisted_index_clone = last_persisted_index;
metadata_storage
.expect_previous_snapshot_sync_target()
.returning(move || Ok(Some(target_ledger_info_clone.clone())));
metadata_storage
.expect_is_snapshot_sync_complete()
.returning(|_| Ok(false));
metadata_storage
.expect_get_last_persisted_state_value_index()
.returning(move |_| Ok(last_persisted_index_clone));
let mut bootstrapper = create_bootstrapper_with_storage(
driver_configuration,
mock_streaming_client,
metadata_storage,
synced_version,
true,
);
manipulate_verified_epoch_states(&mut bootstrapper, true, true, Some(highest_version));
bootstrapper
.get_state_value_syncer()
.set_transaction_output_to_sync(create_output_list_with_proof());
let mut global_data_summary = create_global_summary(1);
global_data_summary.advertised_data.synced_ledger_infos = vec![highest_ledger_info.clone()];
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
}
#[tokio::test]
async fn test_snapshot_sync_existing_state() {
let synced_version = GENESIS_TRANSACTION_VERSION; let highest_version = 1000000;
let highest_ledger_info = create_random_epoch_ending_ledger_info(highest_version, 1);
let last_persisted_index = 4567;
let mut driver_configuration = create_full_node_driver_configuration();
driver_configuration.config.bootstrapping_mode = BootstrappingMode::DownloadLatestStates;
let mut mock_streaming_client = create_mock_streaming_client();
let mut expectation_sequence = Sequence::new();
let (notification_sender_1, data_stream_listener_1) = create_data_stream_listener();
let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener();
mock_streaming_client
.expect_get_all_state_values()
.times(1)
.with(eq(highest_version), eq(Some(last_persisted_index)))
.return_once(move |_, _| Ok(data_stream_listener_1))
.in_sequence(&mut expectation_sequence);
let notification_id = 100;
mock_streaming_client
.expect_terminate_stream_with_feedback()
.times(1)
.with(
eq(notification_id),
eq(NotificationFeedback::InvalidPayloadData),
)
.return_const(Ok(()))
.in_sequence(&mut expectation_sequence);
mock_streaming_client
.expect_get_all_state_values()
.times(1)
.with(eq(highest_version), eq(Some(last_persisted_index)))
.return_once(move |_, _| Ok(data_stream_listener_2))
.in_sequence(&mut expectation_sequence);
let mut metadata_storage = MockMetadataStorage::new();
let highest_ledger_info_clone = highest_ledger_info.clone();
let last_persisted_index_clone = last_persisted_index;
metadata_storage
.expect_previous_snapshot_sync_target()
.returning(move || Ok(Some(highest_ledger_info_clone.clone())));
metadata_storage
.expect_is_snapshot_sync_complete()
.returning(|_| Ok(false));
metadata_storage
.expect_get_last_persisted_state_value_index()
.returning(move |_| Ok(last_persisted_index_clone));
let mut bootstrapper = create_bootstrapper_with_storage(
driver_configuration,
mock_streaming_client,
metadata_storage,
synced_version,
true,
);
manipulate_verified_epoch_states(&mut bootstrapper, true, true, Some(highest_version));
bootstrapper
.get_state_value_syncer()
.set_transaction_output_to_sync(create_output_list_with_proof());
let mut global_data_summary = create_global_summary(1);
global_data_summary.advertised_data.synced_ledger_infos = vec![highest_ledger_info.clone()];
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
let data_notification = DataNotification {
notification_id,
data_payload: DataPayload::TransactionOutputsWithProof(create_output_list_with_proof()),
};
notification_sender_1.push((), data_notification).unwrap();
let error = drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap_err();
assert_matches!(error, Error::InvalidPayload(_));
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
}
#[tokio::test]
async fn test_snapshot_sync_fresh_state() {
let synced_version = GENESIS_TRANSACTION_VERSION; let highest_version = 1000;
let highest_ledger_info = create_random_epoch_ending_ledger_info(highest_version, 1);
let mut driver_configuration = create_full_node_driver_configuration();
driver_configuration.config.bootstrapping_mode = BootstrappingMode::DownloadLatestStates;
let mut mock_streaming_client = create_mock_streaming_client();
let (_notification_sender_1, data_stream_listener_1) = create_data_stream_listener();
mock_streaming_client
.expect_get_all_state_values()
.times(1)
.with(eq(highest_version), eq(Some(0)))
.return_once(move |_, _| Ok(data_stream_listener_1));
let mut metadata_storage = MockMetadataStorage::new();
metadata_storage
.expect_previous_snapshot_sync_target()
.returning(move || Ok(None));
let mut bootstrapper = create_bootstrapper_with_storage(
driver_configuration,
mock_streaming_client,
metadata_storage,
synced_version,
true,
);
manipulate_verified_epoch_states(&mut bootstrapper, true, true, Some(highest_version));
bootstrapper
.get_state_value_syncer()
.set_transaction_output_to_sync(create_output_list_with_proof());
let mut global_data_summary = create_global_summary(1);
global_data_summary.advertised_data.synced_ledger_infos = vec![highest_ledger_info.clone()];
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
}
#[tokio::test]
#[should_panic(
expected = "The snapshot sync for the target was marked as complete but the highest synced version is genesis!"
)]
async fn test_snapshot_sync_invalid_state() {
let synced_version = GENESIS_TRANSACTION_VERSION; let highest_version = 1000000;
let highest_ledger_info = create_random_epoch_ending_ledger_info(highest_version, 1);
let mut driver_configuration = create_full_node_driver_configuration();
driver_configuration.config.bootstrapping_mode = BootstrappingMode::DownloadLatestStates;
let mock_streaming_client = create_mock_streaming_client();
let mut metadata_storage = MockMetadataStorage::new();
let highest_ledger_info_clone = highest_ledger_info.clone();
metadata_storage
.expect_previous_snapshot_sync_target()
.return_once(move || Ok(Some(highest_ledger_info_clone)));
metadata_storage
.expect_is_snapshot_sync_complete()
.returning(|_| Ok(true));
let mut bootstrapper = create_bootstrapper_with_storage(
driver_configuration,
mock_streaming_client,
metadata_storage,
synced_version,
true,
);
manipulate_verified_epoch_states(&mut bootstrapper, true, true, Some(highest_version));
let mut global_data_summary = create_global_summary(1);
global_data_summary.advertised_data.synced_ledger_infos = vec![highest_ledger_info.clone()];
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
}
#[tokio::test]
async fn test_snapshot_sync_lag() {
let num_versions_behind = 1000;
let highest_version = 1000000;
let synced_version = highest_version - num_versions_behind;
let highest_ledger_info = create_random_epoch_ending_ledger_info(highest_version, 1);
let mut driver_configuration = create_full_node_driver_configuration();
driver_configuration.config.bootstrapping_mode = BootstrappingMode::DownloadLatestStates;
driver_configuration
.config
.num_versions_to_skip_snapshot_sync = num_versions_behind + 1;
let mock_streaming_client = create_mock_streaming_client();
let mut metadata_storage = MockMetadataStorage::new();
metadata_storage
.expect_previous_snapshot_sync_target()
.returning(|| Ok(None));
let mut bootstrapper = create_bootstrapper_with_storage(
driver_configuration,
mock_streaming_client,
metadata_storage,
synced_version,
true,
);
manipulate_verified_epoch_states(&mut bootstrapper, true, true, Some(highest_version));
let mut global_data_summary = create_global_summary(1);
global_data_summary.advertised_data.synced_ledger_infos = vec![highest_ledger_info.clone()];
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
assert!(bootstrapper.is_bootstrapped());
}
#[tokio::test]
#[should_panic(
expected = "Snapshot syncing is currently unsupported for nodes with existing state!"
)]
async fn test_snapshot_sync_lag_panic() {
let num_versions_behind = 10000;
let highest_version = 1000000;
let synced_version = highest_version - num_versions_behind;
let highest_ledger_info = create_random_epoch_ending_ledger_info(highest_version, 1);
let mut driver_configuration = create_full_node_driver_configuration();
driver_configuration.config.bootstrapping_mode = BootstrappingMode::DownloadLatestStates;
driver_configuration
.config
.num_versions_to_skip_snapshot_sync = num_versions_behind;
let mock_streaming_client = create_mock_streaming_client();
let mut metadata_storage = MockMetadataStorage::new();
metadata_storage
.expect_previous_snapshot_sync_target()
.returning(|| Ok(None));
let mut bootstrapper = create_bootstrapper_with_storage(
driver_configuration,
mock_streaming_client,
metadata_storage,
synced_version,
true,
);
manipulate_verified_epoch_states(&mut bootstrapper, true, true, Some(highest_version));
let mut global_data_summary = create_global_summary(1);
global_data_summary.advertised_data.synced_ledger_infos = vec![highest_ledger_info.clone()];
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
}
#[tokio::test]
async fn test_waypoint_mismatch() {
let waypoint_version = 1;
let waypoint_epoch = 1;
let waypoint = create_random_epoch_ending_ledger_info(waypoint_version, waypoint_epoch);
let mut driver_configuration = create_full_node_driver_configuration();
driver_configuration.waypoint = Waypoint::new_any(waypoint.ledger_info());
let mut mock_streaming_client = create_mock_streaming_client();
let (notification_sender, data_stream_listener) = create_data_stream_listener();
mock_streaming_client
.expect_get_all_epoch_ending_ledger_infos()
.with(eq(1))
.return_once(move |_| Ok(data_stream_listener));
let notification_id = 100;
mock_streaming_client
.expect_terminate_stream_with_feedback()
.with(
eq(notification_id),
eq(NotificationFeedback::PayloadProofFailed),
)
.return_const(Ok(()));
let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true);
let mut global_data_summary = create_global_summary(waypoint_epoch);
global_data_summary.advertised_data.synced_ledger_infos = vec![waypoint.clone()];
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
let invalid_ledger_info = vec![create_random_epoch_ending_ledger_info(
waypoint_version,
waypoint_epoch,
)];
let data_notification = DataNotification {
notification_id,
data_payload: DataPayload::EpochEndingLedgerInfos(invalid_ledger_info),
};
notification_sender.push((), data_notification).unwrap();
let error = drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap_err();
assert_matches!(error, Error::VerificationError(_));
}
#[tokio::test]
async fn test_waypoint_must_be_verified() {
let mut driver_configuration = create_full_node_driver_configuration();
driver_configuration.config.max_stream_wait_time_ms = 1000;
let mut mock_streaming_client = create_mock_streaming_client();
let (_notification_sender, data_stream_listener) = create_data_stream_listener();
mock_streaming_client
.expect_get_all_epoch_ending_ledger_infos()
.with(eq(1))
.return_once(move |_| Ok(data_stream_listener));
let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true);
manipulate_verified_epoch_states(&mut bootstrapper, true, false, None);
let global_data_summary = create_global_summary(1);
drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap();
let error = drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap_err();
assert_matches!(error, Error::DataStreamNotificationTimeout(_));
}
#[tokio::test]
async fn test_waypoint_satisfiable() {
let mut driver_configuration = create_full_node_driver_configuration();
let waypoint = create_random_epoch_ending_ledger_info(10, 1);
driver_configuration.waypoint = Waypoint::new_any(waypoint.ledger_info());
let mock_streaming_client = create_mock_streaming_client();
let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true);
let mut global_data_summary = GlobalDataSummary::empty();
let error = drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap_err();
assert_matches!(error, Error::AdvertisedDataError(_));
global_data_summary.advertised_data.synced_ledger_infos =
vec![create_random_epoch_ending_ledger_info(9, 5)];
let error = drive_progress(&mut bootstrapper, &global_data_summary, false)
.await
.unwrap_err();
assert_matches!(error, Error::AdvertisedDataError(_));
}
fn create_bootstrapper(
driver_configuration: DriverConfiguration,
mock_streaming_client: MockStreamingClient,
expect_reset_executor: bool,
) -> Bootstrapper<MockMetadataStorage, MockStorageSynchronizer, MockStreamingClient> {
aptos_logger::Logger::init_for_testing();
let mock_storage_synchronizer = create_ready_storage_synchronizer(expect_reset_executor);
let mut metadata_storage = MockMetadataStorage::new();
metadata_storage
.expect_previous_snapshot_sync_target()
.returning(|| Ok(None));
let mut mock_database_reader = create_mock_db_reader();
mock_database_reader
.expect_get_latest_epoch_state()
.returning(|| Ok(create_empty_epoch_state()));
mock_database_reader
.expect_get_latest_ledger_info()
.returning(|| Ok(create_epoch_ending_ledger_info()));
mock_database_reader
.expect_get_latest_transaction_info_option()
.returning(|| Ok(Some((0, create_transaction_info()))));
Bootstrapper::new(
driver_configuration,
metadata_storage,
mock_streaming_client,
Arc::new(mock_database_reader),
mock_storage_synchronizer,
)
}
fn create_bootstrapper_with_storage(
driver_configuration: DriverConfiguration,
mock_streaming_client: MockStreamingClient,
mock_metadata_storage: MockMetadataStorage,
latest_synced_version: Version,
expect_reset_executor: bool,
) -> Bootstrapper<MockMetadataStorage, MockStorageSynchronizer, MockStreamingClient> {
aptos_logger::Logger::init_for_testing();
let mock_storage_synchronizer = create_ready_storage_synchronizer(expect_reset_executor);
let mut mock_database_reader = create_mock_db_reader();
mock_database_reader
.expect_get_latest_epoch_state()
.returning(|| Ok(create_empty_epoch_state()));
mock_database_reader
.expect_get_latest_ledger_info()
.returning(|| Ok(create_epoch_ending_ledger_info()));
mock_database_reader
.expect_get_latest_transaction_info_option()
.returning(move || Ok(Some((latest_synced_version, create_transaction_info()))));
Bootstrapper::new(
driver_configuration,
mock_metadata_storage,
mock_streaming_client,
Arc::new(mock_database_reader),
mock_storage_synchronizer,
)
}
async fn drive_progress(
bootstrapper: &mut Bootstrapper<
MockMetadataStorage,
MockStorageSynchronizer,
MockStreamingClient,
>,
global_data_summary: &GlobalDataSummary,
until_bootstrapped: bool,
) -> Result<(), Error> {
loop {
bootstrapper.drive_progress(global_data_summary).await?;
if !until_bootstrapped || bootstrapper.is_bootstrapped() {
return Ok(());
}
}
}
fn manipulate_verified_epoch_states(
bootstrapper: &mut Bootstrapper<
MockMetadataStorage,
MockStorageSynchronizer,
MockStreamingClient,
>,
fetched_epochs: bool,
verified_waypoint: bool,
highest_version_to_insert: Option<Version>,
) {
let verified_epoch_states = bootstrapper.get_verified_epoch_states();
if fetched_epochs {
verified_epoch_states.set_fetched_epoch_ending_ledger_infos();
}
if verified_waypoint {
verified_epoch_states.set_verified_waypoint();
}
if let Some(highest_version) = highest_version_to_insert {
let epoch_ending_ledger_info = create_random_epoch_ending_ledger_info(highest_version, 0);
let waypoint_ledger_info = create_random_epoch_ending_ledger_info(0, 1);
verified_epoch_states
.verify_epoch_ending_ledger_info(
&epoch_ending_ledger_info,
&Waypoint::new_any(waypoint_ledger_info.ledger_info()),
)
.unwrap();
}
}
fn verify_bootstrap_notification(notification_receiver: oneshot::Receiver<Result<(), Error>>) {
assert_ok!(notification_receiver.now_or_never().unwrap().unwrap());
}