use std::time::{Duration, Instant};
use exocore_core::framing::FrameBuilder;
use itertools::Itertools;
use super::*;
use crate::{
chain::directory::DirectoryChainStore,
engine::{testing::*, SyncState},
operation::OperationBuilder,
};
#[test]
fn handle_sync_response_blocks() -> anyhow::Result<()> {
let mut cluster = EngineTestCluster::new(2);
cluster.chain_generate_dummy(0, 10, 1234);
cluster.chain_generate_dummy(1, 100, 1234);
let node0 = cluster.get_node(0);
let node1 = cluster.get_node(1);
cluster.sync_chain_node_to_node(0, 1)?;
cluster.tick_chain_synchronizer(0)?;
assert_eq!(cluster.chains_synchronizer[0].status, Status::Downloading);
assert!(cluster.chains_synchronizer[0].is_leader(node1.id()));
let blocks_iter = cluster.chains[1].blocks_iter(0);
let response = ChainSynchronizer::<DirectoryChainStore>::create_sync_response_for_blocks(
&cluster.chains_synchronizer[1].config,
10,
0,
blocks_iter,
)?;
let response_frame = response.as_owned_frame();
let mut sync_context = SyncContext::new(SyncState::default());
let result = cluster.chains_synchronizer[0].handle_sync_response(
&mut sync_context,
&node0,
&mut cluster.chains[0],
response_frame,
);
assert!(result.is_err());
assert!(sync_context.messages.is_empty());
let blocks_iter = cluster.chains[1].blocks_iter(0);
let response = ChainSynchronizer::<DirectoryChainStore>::create_sync_response_for_blocks(
&cluster.chains_synchronizer[1].config,
10,
0,
blocks_iter,
)?;
let response_frame = response.as_owned_frame();
let mut sync_context = SyncContext::new(SyncState::default());
let result = cluster.chains_synchronizer[0].handle_sync_response(
&mut sync_context,
&node1,
&mut cluster.chains[0],
response_frame,
);
assert!(result.is_err());
let blocks_iter = cluster.chains[1].blocks_iter(0).skip(10); let response = ChainSynchronizer::<DirectoryChainStore>::create_sync_response_for_blocks(
&cluster.chains_synchronizer[0].config,
10,
0,
blocks_iter,
)?;
let response_frame = response.as_owned_frame();
let mut sync_context = SyncContext::new(SyncState::default());
cluster.chains_synchronizer[0].handle_sync_response(
&mut sync_context,
&node1,
&mut cluster.chains[0],
response_frame,
)?;
Ok(())
}
#[test]
fn sync_empty_node1_to_full_node2() -> anyhow::Result<()> {
let mut cluster = EngineTestCluster::new(2);
cluster.chain_generate_dummy(1, 100, 3434);
let node1 = cluster.get_node(1);
cluster.sync_chain_node_to_node(0, 1)?;
{
let node1_node2_info = &cluster.chains_synchronizer[0].nodes_info[node1.id()];
assert_eq!(NodeStatus::Synchronized, node1_node2_info.status(),);
assert_eq!(
None,
node1_node2_info
.last_common_block
.as_ref()
.map(|b| b.height),
);
assert_eq!(
Some(99),
node1_node2_info.last_known_block.as_ref().map(|b| b.height),
);
}
cluster.sync_chain_node_to_node(0, 1)?;
assert_eq!(Status::Synchronized, cluster.chains_synchronizer[0].status);
assert!(cluster.chains_synchronizer[0].is_leader(node1.id()));
cluster.chains_synchronizer[0].status = Status::Downloading;
cluster.sync_chain_node_to_node(0, 1)?;
assert_eq!(Status::Synchronized, cluster.chains_synchronizer[0].status);
cluster.assert_node_chain_equals(0, 1);
Ok(())
}
#[test]
fn sync_full_node1_to_empty_node2() -> anyhow::Result<()> {
let mut cluster = EngineTestCluster::new(2);
cluster.chain_generate_dummy(0, 100, 3434);
let node1 = cluster.get_node(1);
for _i in 0..2 {
cluster.sync_chain_node_to_node(0, 1)?;
let node1_node2_info = &cluster.chains_synchronizer[0].nodes_info[node1.id()];
assert_eq!(node1_node2_info.status(), NodeStatus::Synchronized);
assert_eq!(
node1_node2_info
.last_common_block
.as_ref()
.map(|b| b.height),
None
);
assert_eq!(
node1_node2_info.last_known_block.as_ref().map(|b| b.height),
None
);
}
assert_eq!(cluster.chains_synchronizer[0].status, Status::Synchronized);
Ok(())
}
#[test]
fn sync_full_node1_to_half_node2() -> anyhow::Result<()> {
let mut cluster = EngineTestCluster::new(2);
cluster.chain_generate_dummy(0, 100, 3434);
cluster.chain_generate_dummy(1, 50, 3434);
let node0 = cluster.get_node(0);
let node1 = cluster.get_node(1);
for _i in 0..2 {
cluster.sync_chain_node_to_node(0, 1)?;
let node1_node2_info = &cluster.chains_synchronizer[0].nodes_info[node1.id()];
assert_eq!(node1_node2_info.status(), NodeStatus::Synchronized);
assert_eq!(
node1_node2_info
.last_common_block
.as_ref()
.map(|b| b.height),
Some(49)
);
assert_eq!(
node1_node2_info.last_known_block.as_ref().map(|b| b.height),
Some(49)
);
}
assert!(cluster.chains_synchronizer[0].is_leader(node0.id()));
assert_eq!(cluster.chains_synchronizer[0].status, Status::Synchronized);
Ok(())
}
#[test]
fn sync_half_node1_to_full_node2() -> anyhow::Result<()> {
let mut cluster = EngineTestCluster::new(2);
cluster.chain_generate_dummy(0, 50, 3434);
cluster.chain_generate_dummy(1, 100, 3434);
let node1 = cluster.get_node(1);
cluster.sync_chain_node_to_node(0, 1)?;
{
let node1_node2_info = &cluster.chains_synchronizer[0].nodes_info[node1.id()];
assert_eq!(node1_node2_info.status(), NodeStatus::Synchronized);
assert_eq!(
node1_node2_info
.last_common_block
.as_ref()
.map(|b| b.height),
Some(49)
);
assert_eq!(
node1_node2_info.last_known_block.as_ref().map(|b| b.height),
Some(99)
);
}
cluster.sync_chain_node_to_node(0, 1)?;
assert!(cluster.chains_synchronizer[0].is_leader(node1.id()));
assert_eq!(cluster.chains_synchronizer[0].status, Status::Synchronized);
cluster.assert_node_chain_equals(0, 1);
Ok(())
}
#[test]
fn sync_fully_divergent_node1_to_full_node2() -> anyhow::Result<()> {
let mut cluster = EngineTestCluster::new(2);
cluster.chain_generate_dummy(0, 100, 1234);
cluster.chain_generate_dummy(1, 100, 9876);
let node1 = cluster.get_node(1);
cluster.sync_chain_node_to_node(0, 1)?;
{
let node1_node2_info = &cluster.chains_synchronizer[0].nodes_info[node1.id()];
assert_eq!(node1_node2_info.status(), NodeStatus::Synchronized);
assert_eq!(
node1_node2_info
.last_common_block
.as_ref()
.map(|b| b.height),
None,
);
assert_eq!(
node1_node2_info.last_known_block.as_ref().map(|b| b.height),
Some(99),
);
}
match cluster.sync_chain_node_to_node(0, 1).err() {
Some(EngineError::ChainSync(ChainSyncError::Diverged(_))) => {}
other => panic!("Expected a diverged error, got {:?}", other),
}
assert_eq!(cluster.chains_synchronizer[0].status, Status::Unknown);
Ok(())
}
#[test]
fn sync_single_block_even_if_max_out_size() -> anyhow::Result<()> {
let mut cluster = EngineTestCluster::new(2);
let node_0 = cluster.get_local_node(0);
cluster.chain_add_genesis_block(0);
let operation_count = cluster.chains_synchronizer[0].config.blocks_max_send_size / (5 * 1024);
let operation_size =
cluster.chains_synchronizer[0].config.blocks_max_send_size / operation_count;
let operations = (0..operation_count)
.map(|_i| {
let op_id = cluster.consistent_timestamp(0).into();
let data = vec![0u8; operation_size + 1];
OperationBuilder::new_entry(op_id, node_0.id(), &data)
.sign_and_build(&node_0)
.unwrap()
.frame
})
.collect_vec();
cluster.chain_add_block_with_operations(0, operations.into_iter())?;
let node0_last_block = cluster.chains[0].get_last_block()?.unwrap();
let node0_last_block_size = node0_last_block.operations_data().len();
assert!(node0_last_block_size > cluster.chains_synchronizer[0].config.blocks_max_send_size);
cluster.chain_generate_dummy(1, 0, 1234);
cluster.sync_chain_node_to_node(1, 0)?;
cluster.sync_chain_node_to_node(1, 0)?;
let node1_last_block = cluster.chains[1].get_last_block()?.unwrap();
assert_eq!(
node0_last_block_size,
node1_last_block.operations_data().len()
);
Ok(())
}
#[test]
fn cannot_sync_all_divergent() -> anyhow::Result<()> {
let mut cluster = EngineTestCluster::new(4);
cluster.chain_generate_dummy(0, 100, 1234);
cluster.chain_generate_dummy(1, 100, 9876);
cluster.chain_generate_dummy(2, 100, 9876);
cluster.chain_generate_dummy(3, 100, 9876);
cluster.sync_chain_node_to_all(0)?;
match cluster.sync_chain_node_to_all(0).err() {
Some(EngineError::ChainSync(ChainSyncError::Diverged(_))) => {}
other => panic!("Expected a diverged error, got {:?}", other),
}
assert_eq!(cluster.chains_synchronizer[0].status, Status::Unknown);
Ok(())
}
#[test]
fn sync_half_divergent_node1_to_full_node2() -> anyhow::Result<()> {
let mut cluster = EngineTestCluster::new(2);
cluster.chain_generate_dummy(0, 100, 1234);
cluster.chain_generate_dummy(1, 50, 1234);
cluster.chain_append_dummy(1, 50, 1234);
let node1 = cluster.get_node(1);
cluster.sync_chain_node_to_node(0, 1)?;
{
let node1_node2_info = &cluster.chains_synchronizer[0].nodes_info[node1.id()];
assert_eq!(node1_node2_info.status(), NodeStatus::Synchronized);
assert_eq!(
node1_node2_info
.last_common_block
.as_ref()
.map(|b| b.height),
Some(49),
);
assert_eq!(
node1_node2_info.last_known_block.as_ref().map(|b| b.height),
Some(99),
);
}
match cluster.sync_chain_node_to_node(0, 1).err() {
Some(EngineError::ChainSync(ChainSyncError::Diverged(_))) => {}
other => panic!("Expected a diverged error, got {:?}", other),
}
assert_eq!(cluster.chains_synchronizer[0].status, Status::Unknown);
Ok(())
}
#[test]
fn sync_empty_node1_to_big_chain_node2() -> anyhow::Result<()> {
let mut cluster = EngineTestCluster::new(2);
cluster.chains_synchronizer[0].config.blocks_max_send_size = 1024;
cluster.chain_generate_dummy(1, 1024, 3434);
cluster.sync_chain_node_to_node(0, 1)?;
cluster.sync_chain_node_to_node(0, 1)?;
assert_eq!(cluster.chains_synchronizer[0].status, Status::Synchronized);
Ok(())
}
#[test]
fn leader_lost_metadata_out_of_date() -> anyhow::Result<()> {
let mut cluster = EngineTestCluster::new(4);
cluster.chain_generate_dummy(0, 50, 3434);
cluster.chain_generate_dummy(1, 100, 3434);
cluster.chain_generate_dummy(2, 90, 3434);
cluster.chain_generate_dummy(3, 90, 3434);
let node1 = cluster.get_node(1);
cluster.sync_chain_node_to_all(0)?;
cluster.sync_chain_node_to_all(0)?;
assert!(cluster.chains_synchronizer[0].is_leader(node1.id()));
{
let node_info = cluster.chains_synchronizer[0].get_or_create_node_info_mut(node1.id());
assert_eq!(node_info.status(), NodeStatus::Synchronized);
node_info.last_common_is_known = false;
node_info.last_known_block = None;
assert_eq!(node_info.status(), NodeStatus::Unknown);
}
cluster.tick_chain_synchronizer(0)?;
assert!(!cluster.chains_synchronizer[0].is_leader(node1.id()));
Ok(())
}
#[test]
fn leader_lost_chain_too_far() -> anyhow::Result<()> {
let mut cluster = EngineTestCluster::new(2);
cluster.chain_generate_dummy(0, 50, 3434);
cluster.chain_generate_dummy(1, 100, 3434);
cluster.clocks[0].set_fixed_instant(Instant::now());
let node1 = cluster.get_node(1);
cluster.sync_chain_node_to_node(0, 1)?;
cluster.sync_chain_node_to_node(0, 1)?;
assert!(cluster.chains_synchronizer[0].is_leader(node1.id()));
cluster.chain_append_dummy(1, 2, 3434);
cluster.clocks[0].add_fixed_instant_duration(Duration::from_secs(10));
cluster.sync_chain_node_to_node(0, 1)?;
assert_eq!(
Status::Synchronized,
cluster.chains_synchronizer[0].status(),
);
cluster.chain_append_dummy(1, 10, 3434);
cluster.clocks[0].add_fixed_instant_duration(Duration::from_secs(10));
cluster.sync_chain_node_to_node(0, 1)?;
cluster.tick_chain_synchronizer(0)?;
assert_eq!(Status::Downloading, cluster.chains_synchronizer[0].status(),);
Ok(())
}
#[test]
fn quorum_lost_and_regain() -> anyhow::Result<()> {
let mut cluster = EngineTestCluster::new(3);
cluster.chain_generate_dummy(0, 50, 3434);
cluster.chain_generate_dummy(1, 100, 3434);
cluster.chain_generate_dummy(2, 100, 3434);
cluster.sync_chain_node_to_all(0)?;
cluster.sync_chain_node_to_all(0)?;
assert_eq!(Status::Synchronized, cluster.chains_synchronizer[0].status);
for node_idx in 1..=2 {
let node = cluster.get_node(node_idx);
let node_info = cluster.chains_synchronizer[0].get_or_create_node_info_mut(node.id());
assert_eq!(NodeStatus::Synchronized, node_info.check_status());
node_info.request_tracker.set_response_failure_count(100);
assert_eq!(NodeStatus::Unknown, node_info.check_status());
}
cluster.tick_chain_synchronizer(0)?;
cluster.tick_chain_synchronizer(0)?;
cluster.tick_chain_synchronizer(0)?;
assert_eq!(Status::Unknown, cluster.chains_synchronizer[0].status);
for node_idx in 1..=2 {
let node = cluster.get_node(node_idx);
let node_info = cluster.chains_synchronizer[0].get_or_create_node_info_mut(node.id());
node_info.request_tracker.reset();
}
cluster.sync_chain_node_to_all(0)?;
cluster.sync_chain_node_to_all(0)?;
assert_eq!(Status::Synchronized, cluster.chains_synchronizer[0].status);
Ok(())
}