#![allow(clippy::arithmetic_side_effects)]
use std::collections::BTreeSet;
use itertools::Itertools;
use crate::{
components::consensus::{
highway_core::{
highway::{tests::test_validators, ValidVertex},
highway_testing::TEST_INSTANCE_ID,
state::{tests::*, State},
},
BlockContext,
},
types::NodeId,
};
use super::*;
#[test]
fn purge_vertices() {
let params = test_params(0);
let mut state = State::new(WEIGHTS, params.clone(), vec![], vec![]);
let c0 = add_unit!(state, CAROL, 0x00, 0u8, 0xA; N, N, N).unwrap();
let c1 = add_unit!(state, CAROL, 0x0A, 0u8, None; N, N, c0).unwrap();
let c2 = add_unit!(state, CAROL, 0x1A, 0u8, None; N, N, c1).unwrap();
let b0 = add_unit!(state, BOB, 0x2A, 0u8, None; N, N, c0).unwrap();
let b1 = add_unit!(state, BOB, 0x3A, 0u8, None; N, b0, c0).unwrap();
let util_highway =
Highway::<TestContext>::new(TEST_INSTANCE_ID, test_validators(), params.clone(), None);
let unit = |hash: u64| Vertex::Unit(state.wire_unit(&hash, TEST_INSTANCE_ID).unwrap());
let pvv = |hash: u64| util_highway.pre_validate_vertex(unit(hash)).unwrap();
let peer0 = NodeId::from([0; 64]);
let max_requests_for_vertex = 5;
let mut sync = Synchronizer::<TestContext>::new(WEIGHTS.len(), TEST_INSTANCE_ID);
let mut highway =
Highway::<TestContext>::new(TEST_INSTANCE_ID, test_validators(), params, None);
let now = 0x20.into();
assert!(matches!(
*sync.schedule_add_vertex(peer0, pvv(c2), now),
[ProtocolOutcome::QueueAction(ACTION_ID_VERTEX)]
));
sync.store_vertex_for_addition_later(unit(b1).timestamp().unwrap(), now, peer0, pvv(b1));
sync.store_vertex_for_addition_later(unit(b0).timestamp().unwrap(), now, peer0, pvv(b0));
let now = 0x21.into();
assert!(sync.schedule_add_vertex(peer0, pvv(c1), now).is_empty());
let (maybe_pv, outcomes) =
sync.pop_vertex_to_add(&highway, &Default::default(), max_requests_for_vertex);
assert!(maybe_pv.is_none());
assert_targeted_message(&unwrap_single(outcomes), &peer0, Dependency::Unit(c0));
let now = 0x23.into();
let outcomes = sync.schedule_add_vertex(peer0, pvv(c0), now);
assert!(
matches!(*outcomes, [ProtocolOutcome::QueueAction(ACTION_ID_VERTEX)]),
"unexpected outcomes: {:?}",
outcomes
);
let (maybe_pv, outcomes) =
sync.pop_vertex_to_add(&highway, &Default::default(), max_requests_for_vertex);
assert_eq!(Dependency::Unit(c0), maybe_pv.unwrap().vertex().id());
assert!(outcomes.is_empty());
let vv_c0 = highway.validate_vertex(pvv(c0)).expect("c0 is valid");
highway.add_valid_vertex(vv_c0, now);
let outcomes = sync.remove_satisfied_deps(&highway);
assert!(
matches!(*outcomes, [ProtocolOutcome::QueueAction(ACTION_ID_VERTEX)]),
"unexpected outcomes: {:?}",
outcomes
);
let now = 0x2A.into();
assert!(sync.add_past_due_stored_vertices(now).is_empty());
let purge_vertex_timeout = 0x20;
#[allow(clippy::arithmetic_side_effects)]
sync.purge_vertices((0x41 - purge_vertex_timeout).into());
let (maybe_pv, outcomes) =
sync.pop_vertex_to_add(&highway, &Default::default(), max_requests_for_vertex);
assert_eq!(Dependency::Unit(c1), maybe_pv.unwrap().vertex().id());
assert!(outcomes.is_empty());
assert!(sync.is_empty());
}
#[test]
fn do_not_download_synchronized_dependencies() {
let params = test_params(0);
let mut state = State::new(WEIGHTS, params.clone(), vec![], vec![]);
let util_highway =
Highway::<TestContext>::new(TEST_INSTANCE_ID, test_validators(), params.clone(), None);
let c0 = add_unit!(state, CAROL, 0x00, 0u8, 0xA; N, N, N).unwrap();
let c1 = add_unit!(state, CAROL, 0x0A, 0u8, None; N, N, c0).unwrap();
let c2 = add_unit!(state, CAROL, 0x1A, 0u8, None; N, N, c1).unwrap();
let b0 = add_unit!(state, BOB, 0x2A, 0u8, None; N, N, c1).unwrap();
let unit = |hash: u64| Vertex::Unit(state.wire_unit(&hash, TEST_INSTANCE_ID).unwrap());
let pvv = |hash: u64| util_highway.pre_validate_vertex(unit(hash)).unwrap();
let peer0 = NodeId::from([0; 64]);
let peer1 = NodeId::from([1; 64]);
let max_requests_for_vertex = 5;
let mut sync = Synchronizer::<TestContext>::new(WEIGHTS.len(), TEST_INSTANCE_ID);
let mut highway =
Highway::<TestContext>::new(TEST_INSTANCE_ID, test_validators(), params, None);
let now = 0x20.into();
assert!(matches!(
*sync.schedule_add_vertex(peer0, pvv(c2), now),
[ProtocolOutcome::QueueAction(ACTION_ID_VERTEX)]
));
let (pv, outcomes) =
sync.pop_vertex_to_add(&highway, &Default::default(), max_requests_for_vertex);
assert!(pv.is_none());
assert_targeted_message(&unwrap_single(outcomes), &peer0, Dependency::Unit(c1));
let c1_outcomes = sync.schedule_add_vertex(peer0, pvv(c1), now);
assert!(matches!(
*c1_outcomes,
[ProtocolOutcome::QueueAction(ACTION_ID_VERTEX)]
));
let (pv, outcomes) =
sync.pop_vertex_to_add(&highway, &Default::default(), max_requests_for_vertex);
assert!(pv.is_none());
assert_targeted_message(&unwrap_single(outcomes), &peer0, Dependency::Unit(c0));
assert!(matches!(
*sync.schedule_add_vertex(peer1, pvv(b0), now),
[ProtocolOutcome::QueueAction(ACTION_ID_VERTEX)]
));
let (pv, outcomes) =
sync.pop_vertex_to_add(&highway, &Default::default(), max_requests_for_vertex);
assert!(pv.is_none());
assert_targeted_message(&unwrap_single(outcomes), &peer1, Dependency::Unit(c0));
let _ = sync.schedule_add_vertex(peer0, pvv(c0), now);
let mut units: BTreeSet<Dependency<TestContext>> = vec![c0, c1, b0, c2]
.into_iter()
.map(Dependency::Unit)
.collect();
while let (Some(pv), outcomes) =
sync.pop_vertex_to_add(&highway, &Default::default(), max_requests_for_vertex)
{
assert!(
!outcomes
.iter()
.any(|outcome| matches!(outcome, ProtocolOutcome::CreatedTargetedMessage(_, _))),
"unexpected dependency request {:?}",
outcomes
);
let pv_dep = pv.vertex().id();
assert!(units.remove(&pv_dep), "unexpected dependency");
match pv_dep {
Dependency::Unit(hash) => {
let vv = highway
.validate_vertex(pvv(hash))
.unwrap_or_else(|_| panic!("{:?} unit is valid", hash));
highway.add_valid_vertex(vv, now);
let _ = sync.remove_satisfied_deps(&highway);
}
_ => panic!("expected unit"),
}
}
assert!(sync.is_empty());
}
#[test]
fn transitive_proposal_dependency() {
let params = test_params(0);
let mut state = State::new(WEIGHTS, params.clone(), vec![], vec![]);
let util_highway =
Highway::<TestContext>::new(TEST_INSTANCE_ID, test_validators(), params.clone(), None);
let a0 = add_unit!(state, ALICE, 0xA; N, N, N).unwrap();
let c0 = add_unit!(state, CAROL, 0xC; N, N, N).unwrap();
let a1 = add_unit!(state, ALICE, None; a0, N, c0).unwrap();
let b0 = add_unit!(state, BOB, None; a1, N, c0).unwrap();
let unit = |hash: u64| Vertex::Unit(state.wire_unit(&hash, TEST_INSTANCE_ID).unwrap());
let pvv = |hash: u64| util_highway.pre_validate_vertex(unit(hash)).unwrap();
let peer0 = NodeId::from([0; 64]);
let peer1 = NodeId::from([1; 64]);
let max_requests_for_vertex = 5;
let mut sync = Synchronizer::<TestContext>::new(WEIGHTS.len(), TEST_INSTANCE_ID);
let mut highway =
Highway::<TestContext>::new(TEST_INSTANCE_ID, test_validators(), params, None);
let now = 0x100.into();
assert!(matches!(
*sync.schedule_add_vertex(peer0, pvv(a1), now),
[ProtocolOutcome::QueueAction(ACTION_ID_VERTEX)]
));
let (pv, outcomes) =
sync.pop_vertex_to_add(&highway, &Default::default(), max_requests_for_vertex);
assert!(pv.is_none());
assert_targeted_message(&unwrap_single(outcomes), &peer0, Dependency::Unit(a0));
let a0_outcomes = sync.schedule_add_vertex(peer0, pvv(a0), now);
assert!(matches!(
*a0_outcomes,
[ProtocolOutcome::QueueAction(ACTION_ID_VERTEX)]
));
let (maybe_pv, outcomes) =
sync.pop_vertex_to_add(&highway, &Default::default(), max_requests_for_vertex);
let pv = maybe_pv.expect("expected a0 vertex");
assert_eq!(pv.vertex(), &unit(a0));
assert!(outcomes.is_empty());
assert!(matches!(
*sync.schedule_add_vertex(peer1, pvv(b0), now),
[ProtocolOutcome::QueueAction(ACTION_ID_VERTEX)]
));
let a0_pending_values = {
let mut tmp = HashMap::new();
let vv = ValidVertex(unit(a0));
let proposed_block = ProposedBlock::new(1u32, BlockContext::new(now, Vec::new()));
let mut set = HashSet::new();
set.insert((vv, peer0));
tmp.insert(proposed_block, set);
tmp
};
let (maybe_pv, outcomes) =
sync.pop_vertex_to_add(&highway, &a0_pending_values, max_requests_for_vertex);
let pv = maybe_pv.unwrap();
assert!(pv.sender() == &peer1);
assert!(pv.vertex() == &unit(a0));
assert!(outcomes.is_empty());
let vv = highway.validate_vertex(pvv(a0)).expect("a0 is valid");
highway.add_valid_vertex(vv, now);
assert!(matches!(
*sync.remove_satisfied_deps(&highway),
[ProtocolOutcome::QueueAction(ACTION_ID_VERTEX)]
));
let (maybe_pv, outcomes) =
sync.pop_vertex_to_add(&highway, &Default::default(), max_requests_for_vertex);
assert!(maybe_pv.is_none());
match &*outcomes {
[ProtocolOutcome::CreatedTargetedMessage(msg0_serialized, p0), ProtocolOutcome::CreatedTargetedMessage(msg1_serialized, p1)] =>
{
let msg0: HighwayMessage<TestContext> = msg0_serialized.deserialize_expect();
let msg1: HighwayMessage<TestContext> = msg1_serialized.deserialize_expect();
assert_eq!(
vec![&peer0, &peer1],
vec![p0, p1].into_iter().sorted().collect_vec(),
"expected to request dependency from exactly two different peers",
);
match (msg0, msg1) {
(
HighwayMessage::RequestDependency(_, dep0),
HighwayMessage::RequestDependency(_, dep1),
) => {
assert_eq!(
dep0,
Dependency::Unit(c0),
"unexpected dependency requested"
);
assert_eq!(
dep0, dep1,
"we should have requested the same dependency from two different peers"
);
}
other => panic!("unexpected HighwayMessage variant {:?}", other),
}
}
outcomes => panic!("unexpected outcomes: {:?}", outcomes),
}
}
fn unwrap_single<T: Debug>(vec: Vec<T>) -> T {
assert_eq!(
vec.len(),
1,
"expected single element in the vector {:?}",
vec
);
vec.into_iter().next().unwrap()
}
fn assert_targeted_message(
outcome: &ProtocolOutcome<TestContext>,
peer: &NodeId,
expected: Dependency<TestContext>,
) {
match outcome {
ProtocolOutcome::CreatedTargetedMessage(raw_msg, peer0) => {
assert_eq!(peer, peer0);
let msg = raw_msg.deserialize_expect();
match msg {
HighwayMessage::RequestDependency(_, got) => assert_eq!(got, expected),
other => panic!("unexpected variant: {:?}", other),
}
}
_ => panic!("unexpected outcome: {:?}", outcome),
}
}