use crate::{
continuous_syncer::ContinuousSyncer,
driver::DriverConfiguration,
error::Error,
notification_handlers::ConsensusSyncRequest,
tests::{
mocks::{
create_mock_db_reader, create_mock_streaming_client, create_ready_storage_synchronizer,
MockStorageSynchronizer, MockStreamingClient,
},
utils::{
create_data_stream_listener, create_epoch_ending_ledger_info, create_epoch_state,
create_full_node_driver_configuration, create_transaction_info,
},
},
};
use aptos_config::config::ContinuousSyncingMode;
use aptos_infallible::Mutex;
use aptos_types::transaction::{TransactionOutputListWithProof, Version};
use claim::assert_matches;
use consensus_notifications::ConsensusSyncNotification;
use data_streaming_service::{
data_notification::{DataNotification, DataPayload},
streaming_client::NotificationFeedback,
};
use mockall::{predicate::eq, Sequence};
use std::sync::Arc;
use storage_service_types::Epoch;
#[tokio::test]
async fn test_critical_timeout() {
let current_synced_epoch = 54;
let current_synced_version = 904345;
let mut driver_configuration = create_full_node_driver_configuration();
driver_configuration.config.continuous_syncing_mode =
ContinuousSyncingMode::ApplyTransactionOutputs;
driver_configuration.config.max_stream_wait_time_ms = 1000;
let mut mock_streaming_client = create_mock_streaming_client();
let mut expectation_sequence = Sequence::new();
let (_notification_sender_1, data_stream_listener_1) = create_data_stream_listener();
let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener();
for data_stream_listener in [data_stream_listener_1, data_stream_listener_2] {
mock_streaming_client
.expect_continuously_stream_transaction_outputs()
.times(1)
.with(
eq(current_synced_version),
eq(current_synced_epoch),
eq(None),
)
.return_once(move |_, _, _| Ok(data_stream_listener))
.in_sequence(&mut expectation_sequence);
}
let mut continuous_syncer = create_continuous_syncer(
driver_configuration,
mock_streaming_client,
true,
current_synced_version,
current_synced_epoch,
);
let no_sync_request = Arc::new(Mutex::new(None));
continuous_syncer
.drive_progress(no_sync_request.clone())
.await
.unwrap();
for _ in 0..2 {
let error = continuous_syncer
.drive_progress(no_sync_request.clone())
.await
.unwrap_err();
assert_matches!(error, Error::DataStreamNotificationTimeout(_));
}
let error = continuous_syncer
.drive_progress(no_sync_request.clone())
.await
.unwrap_err();
assert_matches!(error, Error::CriticalDataStreamTimeout(_));
continuous_syncer
.drive_progress(no_sync_request.clone())
.await
.unwrap();
let error = continuous_syncer
.drive_progress(no_sync_request.clone())
.await
.unwrap_err();
assert_matches!(error, Error::DataStreamNotificationTimeout(_));
}
#[tokio::test]
async fn test_data_stream_transactions_with_target() {
let current_synced_epoch = 5;
let current_synced_version = 234;
let notification_id = 435345;
let target_ledger_info = create_epoch_ending_ledger_info();
let mut driver_configuration = create_full_node_driver_configuration();
driver_configuration.config.continuous_syncing_mode =
ContinuousSyncingMode::ExecuteTransactions;
let mut mock_streaming_client = create_mock_streaming_client();
let mut expectation_sequence = Sequence::new();
let (notification_sender_1, data_stream_listener_1) = create_data_stream_listener();
let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener();
for data_stream_listener in [data_stream_listener_1, data_stream_listener_2] {
mock_streaming_client
.expect_continuously_stream_transactions()
.times(1)
.with(
eq(current_synced_version),
eq(current_synced_epoch),
eq(false),
eq(Some(target_ledger_info.clone())),
)
.return_once(move |_, _, _, _| Ok(data_stream_listener))
.in_sequence(&mut expectation_sequence);
}
mock_streaming_client
.expect_terminate_stream_with_feedback()
.with(
eq(notification_id),
eq(NotificationFeedback::EmptyPayloadData),
)
.return_const(Ok(()));
let mut continuous_syncer = create_continuous_syncer(
driver_configuration,
mock_streaming_client,
true,
current_synced_version,
current_synced_epoch,
);
let (consensus_sync_notification, _) = ConsensusSyncNotification::new(target_ledger_info);
let sync_request = Arc::new(Mutex::new(Some(ConsensusSyncRequest::new(
consensus_sync_notification,
))));
continuous_syncer
.drive_progress(sync_request.clone())
.await
.unwrap();
let data_notification = DataNotification {
notification_id,
data_payload: DataPayload::ContinuousTransactionOutputsWithProof(
create_epoch_ending_ledger_info(),
TransactionOutputListWithProof::new_empty(),
),
};
notification_sender_1.push((), data_notification).unwrap();
let error = continuous_syncer
.drive_progress(sync_request.clone())
.await
.unwrap_err();
assert_matches!(error, Error::VerificationError(_));
continuous_syncer
.drive_progress(sync_request.clone())
.await
.unwrap();
}
#[tokio::test]
async fn test_data_stream_transaction_outputs() {
let current_synced_epoch = 100;
let current_synced_version = 5;
let notification_id = 1235;
let mut driver_configuration = create_full_node_driver_configuration();
driver_configuration.config.continuous_syncing_mode =
ContinuousSyncingMode::ApplyTransactionOutputs;
let mut mock_streaming_client = create_mock_streaming_client();
let mut expectation_sequence = Sequence::new();
let (notification_sender_1, data_stream_listener_1) = create_data_stream_listener();
let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener();
for data_stream_listener in [data_stream_listener_1, data_stream_listener_2] {
mock_streaming_client
.expect_continuously_stream_transaction_outputs()
.times(1)
.with(
eq(current_synced_version),
eq(current_synced_epoch),
eq(None),
)
.return_once(move |_, _, _| Ok(data_stream_listener))
.in_sequence(&mut expectation_sequence);
}
mock_streaming_client
.expect_terminate_stream_with_feedback()
.with(
eq(notification_id),
eq(NotificationFeedback::InvalidPayloadData),
)
.return_const(Ok(()));
let mut continuous_syncer = create_continuous_syncer(
driver_configuration,
mock_streaming_client,
true,
current_synced_version,
current_synced_epoch,
);
let no_sync_request = Arc::new(Mutex::new(None));
continuous_syncer
.drive_progress(no_sync_request.clone())
.await
.unwrap();
let mut transaction_output_with_proof = TransactionOutputListWithProof::new_empty();
transaction_output_with_proof.first_transaction_output_version =
Some(current_synced_version - 1);
let data_notification = DataNotification {
notification_id,
data_payload: DataPayload::ContinuousTransactionOutputsWithProof(
create_epoch_ending_ledger_info(),
transaction_output_with_proof,
),
};
notification_sender_1.push((), data_notification).unwrap();
let error = continuous_syncer
.drive_progress(no_sync_request.clone())
.await
.unwrap_err();
assert_matches!(error, Error::VerificationError(_));
continuous_syncer
.drive_progress(no_sync_request.clone())
.await
.unwrap();
}
/// Builds a `ContinuousSyncer` backed entirely by mocks.
///
/// * `driver_configuration` and `mock_streaming_client` are passed to
///   `ContinuousSyncer::new` unchanged.
/// * `expect_reset_executor` is forwarded to `create_ready_storage_synchronizer`.
/// * `synced_version` is the version returned by the mocked
///   `get_latest_transaction_info_option` call.
/// * `current_epoch` is passed to `create_epoch_state` to build the reply of
///   the mocked `get_latest_epoch_state` call.
///
/// The logger is initialized for testing as a side effect.
fn create_continuous_syncer(
    driver_configuration: DriverConfiguration,
    mock_streaming_client: MockStreamingClient,
    expect_reset_executor: bool,
    synced_version: Version,
    current_epoch: Epoch,
) -> ContinuousSyncer<MockStorageSynchronizer, MockStreamingClient> {
    // Set up logging for the test run
    aptos_logger::Logger::init_for_testing();

    // Mocked storage synchronizer
    let storage_synchronizer = create_ready_storage_synchronizer(expect_reset_executor);

    // Mocked database reader that reports the given epoch and version on
    // every call (no call count is enforced)
    let mut db_reader = create_mock_db_reader();
    db_reader.expect_get_latest_epoch_state().returning(move || {
        let epoch_state = create_epoch_state(current_epoch);
        Ok(epoch_state)
    });
    db_reader
        .expect_get_latest_transaction_info_option()
        .returning(move || {
            let latest_info = (synced_version, create_transaction_info());
            Ok(Some(latest_info))
        });

    // Assemble the syncer from the configuration and the mocks
    let shared_db_reader = Arc::new(db_reader);
    ContinuousSyncer::new(
        driver_configuration,
        mock_streaming_client,
        shared_db_reader,
        storage_synchronizer,
    )
}