use crate::common::{
CurrentNetwork,
primary::new_test_committee,
utils::{sample_ledger, sample_worker},
};
use snarkos_node_bft::helpers::max_redundant_requests;
use snarkvm::{
ledger::narwhal::TransmissionID,
prelude::{Network, TestRng},
};
use std::net::SocketAddr;
/// Checks how the worker's pending queue tracks repeated fetch requests for one transmission.
///
/// The assertions below pin the following observations:
/// - the first fetch registers the transmission with one peer, one callback and one sent request;
/// - every further fetch through that same peer adds a callback, while the number of sent
///   requests stays at one;
/// - every fetch through a peer not used before raises the number of sent requests by one,
///   capped at the value returned by `max_redundant_requests`.
#[tokio::test]
#[rustfmt::skip]
async fn test_resend_transmission_request() {
    // The node count is the second field of the first `MAX_CERTIFICATES` entry.
    let committee_size: u16 = CurrentNetwork::MAX_CERTIFICATES.first().unwrap().1;

    let mut rng = TestRng::default();
    let (accounts, committee) = new_test_committee(committee_size, &mut rng);

    // The ledger is sampled on tokio's blocking pool; the closure takes ownership of
    // a copy of the accounts, the committee and the RNG.
    let ledger_accounts = accounts.clone();
    let ledger = tokio::task::spawn_blocking(move || sample_ledger(&ledger_accounts, &committee, &mut rng))
        .await
        .unwrap();
    let worker = sample_worker(0, accounts[0].clone(), ledger.clone());

    // Pin the redundancy limit so a formula change is noticed here first.
    let max_redundancy = max_redundant_requests(ledger.clone(), 0).unwrap();
    assert_eq!(max_redundancy, 6, "Update me if the formula changes");

    // Local peer addresses on consecutive ports; the last one serves as the initial peer.
    let num_test_requests = 11;
    let mut peer_ips: Vec<_> = (0..num_test_requests)
        .map(|offset| SocketAddr::from(([127, 0, 0, 1], 1234 + offset as u16)))
        .collect();
    let initial_peer_ip = peer_ips.pop().unwrap();

    // A dummy transmission ID built from default values.
    let transaction_id = <CurrentNetwork as Network>::TransactionID::default();
    let checksum = <CurrentNetwork as Network>::TransmissionChecksum::default();
    let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
    assert!(!worker.contains_transmission(transmission_id), "Transmission should not exist");

    // Starts a detached task that asks the worker to fetch the transmission via `peer_ip`.
    let spawn_fetch = |peer_ip: SocketAddr| {
        let fetcher = worker.clone();
        tokio::spawn(async move { fetcher.get_or_fetch_transmission(peer_ip, transmission_id).await });
    };
    // Pause applied after each spawn before the pending queue is inspected
    // (presumably long enough for the spawned task to register its request).
    let settle_time = std::time::Duration::from_millis(10);

    // First request, through the initial peer.
    spawn_fetch(initial_peer_ip);
    tokio::time::sleep(settle_time).await;

    let pending_queue = worker.pending();
    assert!(pending_queue.contains(transmission_id), "Missing a transmission in the pending queue");
    assert!(pending_queue.contains_peer(transmission_id, initial_peer_ip), "Missing a peer IP for transmission in the pending queue");
    assert_eq!(pending_queue.get_peers(transmission_id), Some(std::iter::once(initial_peer_ip).collect()), "Missing a peer IP for transmission in the pending queue");
    assert_eq!(pending_queue.num_callbacks(transmission_id), 1, "Incorrect number of callbacks for transmission");
    assert_eq!(pending_queue.num_sent_requests(transmission_id), 1, "Incorrect number of sent requests for transmission");

    // Repeat the request through the same peer: callbacks grow, sent requests do not.
    for repeat in 1..num_test_requests {
        spawn_fetch(initial_peer_ip);
        tokio::time::sleep(settle_time).await;

        assert!(pending_queue.contains(transmission_id), "Missing a transmission in the pending queue");
        assert!(pending_queue.contains_peer(transmission_id, initial_peer_ip), "Missing a peer IP for transmission in the pending queue");
        assert_eq!(pending_queue.get_peers(transmission_id), Some(std::iter::once(initial_peer_ip).collect()), "Missing a peer IP for transmission in the pending queue");
        assert_eq!(pending_queue.num_callbacks(transmission_id), 1 + repeat, "Incorrect number of callbacks for transmission");
        assert_eq!(pending_queue.num_sent_requests(transmission_id), 1, "Incorrect number of sent requests for transmission");
    }

    // Repeat the request through peers not used so far: sent requests grow by one per
    // new peer until they reach `max_redundancy`.
    for new_peer_count in 1..num_test_requests {
        let peer_ip = peer_ips.pop().unwrap();
        spawn_fetch(peer_ip);
        tokio::time::sleep(settle_time).await;

        let expected_sent_requests = (1 + new_peer_count).min(max_redundancy);
        assert!(pending_queue.contains(transmission_id), "Missing a transmission in the pending queue");
        assert!(pending_queue.contains_peer(transmission_id, peer_ip), "Missing a peer IP for transmission in the pending queue");
        assert_eq!(pending_queue.num_sent_requests(transmission_id), expected_sent_requests, "Incorrect number of sent requests for transmission");
    }
}
/// Checks that flooding the worker with fetch requests for one transmission does not raise
/// the number of sent requests above the value returned by `max_redundant_requests`.
///
/// The assertions below pin the following observations:
/// - after requests through `max_redundancy` distinct peers are spawned together, the pending
///   queue holds those peers, with `max_redundancy` callbacks and `max_redundancy` sent requests;
/// - requests through one additional peer add that peer and one callback per request, while
///   the number of sent requests stays at `max_redundancy`;
/// - repeating the request through each of the original peers adds one callback per request
///   and nothing else.
#[tokio::test]
#[rustfmt::skip]
async fn test_flood_transmission_requests() {
    // The node count is the second field of the first `MAX_CERTIFICATES` entry.
    let committee_size: u16 = CurrentNetwork::MAX_CERTIFICATES.first().unwrap().1;

    let mut rng = TestRng::default();
    let (accounts, committee) = new_test_committee(committee_size, &mut rng);

    // The ledger is sampled on tokio's blocking pool; the closure takes ownership of
    // a copy of the accounts, the committee and the RNG.
    let ledger_accounts = accounts.clone();
    let ledger = tokio::task::spawn_blocking(move || sample_ledger(&ledger_accounts, &committee, &mut rng))
        .await
        .unwrap();
    let worker = sample_worker(0, accounts[0].clone(), ledger.clone());

    // Pin the redundancy limit so a formula change is noticed here first.
    let max_redundancy = max_redundant_requests(ledger.clone(), 0).unwrap();
    assert_eq!(max_redundancy, 6, "Update me if the formula changes");

    // One more peer than the redundancy limit. The last address is held back as the
    // initial peer; the others are used for the opening flood.
    let all_peer_ips: Vec<_> = (0..max_redundancy + 1)
        .map(|offset| SocketAddr::from(([127, 0, 0, 1], 1234 + offset as u16)))
        .collect();
    let mut remaining_peer_ips = all_peer_ips.clone();
    let initial_peer_ip = remaining_peer_ips.pop().unwrap();

    // A dummy transmission ID built from default values.
    let transaction_id = <CurrentNetwork as Network>::TransactionID::default();
    let checksum = <CurrentNetwork as Network>::TransmissionChecksum::default();
    let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
    assert!(!worker.contains_transmission(transmission_id), "Transmission should not exist");

    // Starts a detached task that asks the worker to fetch the transmission via `peer_ip`.
    let spawn_fetch = |peer_ip: SocketAddr| {
        let fetcher = worker.clone();
        tokio::spawn(async move { fetcher.get_or_fetch_transmission(peer_ip, transmission_id).await });
    };
    // Pause applied after spawning before the pending queue is inspected
    // (presumably long enough for the spawned tasks to register their requests).
    let settle_time = std::time::Duration::from_millis(10);

    // Opening flood: spawn one request per remaining peer without waiting in between.
    for &peer_ip in &remaining_peer_ips {
        spawn_fetch(peer_ip);
    }
    tokio::time::sleep(settle_time).await;

    let pending_queue = worker.pending();
    assert!(pending_queue.contains(transmission_id), "Missing a transmission in the pending queue");
    assert_eq!(pending_queue.get_peers(transmission_id), Some(remaining_peer_ips.iter().copied().collect()), "Missing a peer IP for transmission in the pending queue");
    assert_eq!(pending_queue.num_callbacks(transmission_id), max_redundancy, "Incorrect number of callbacks for transmission");
    assert_eq!(pending_queue.num_sent_requests(transmission_id), max_redundancy, "Incorrect number of sent requests for transmission");

    // Number of follow-up requests per phase. The second phase pops one entry of
    // `remaining_peer_ips` per request, so this must not exceed that list's length.
    let num_rebroadcasts = 6;

    // Phase 1: repeated requests through the held-back peer.
    for repeat in 1..=num_rebroadcasts {
        spawn_fetch(initial_peer_ip);
        tokio::time::sleep(settle_time).await;

        assert!(pending_queue.contains(transmission_id), "Missing a transmission in the pending queue");
        assert!(pending_queue.contains_peer(transmission_id, initial_peer_ip), "Missing a peer IP for transmission in the pending queue");
        assert_eq!(pending_queue.get_peers(transmission_id), Some(all_peer_ips.iter().copied().collect()), "Missing a peer IP for transmission in the pending queue");
        assert_eq!(pending_queue.num_callbacks(transmission_id), max_redundancy + repeat, "Incorrect number of callbacks for transmission");
        assert_eq!(pending_queue.num_sent_requests(transmission_id), max_redundancy, "Incorrect number of sent requests for transmission");
    }

    // Phase 2: one more request through each peer from the opening flood.
    for repeat in 1..=num_rebroadcasts {
        let peer_ip = remaining_peer_ips.pop().unwrap();
        spawn_fetch(peer_ip);
        tokio::time::sleep(settle_time).await;

        assert!(pending_queue.contains(transmission_id), "Missing a transmission in the pending queue");
        assert!(pending_queue.contains_peer(transmission_id, peer_ip), "Missing a peer IP for transmission in the pending queue");
        assert_eq!(pending_queue.get_peers(transmission_id), Some(all_peer_ips.iter().copied().collect()), "Missing a peer IP for transmission in the pending queue");
        assert_eq!(pending_queue.num_callbacks(transmission_id), max_redundancy + num_rebroadcasts + repeat, "Incorrect number of callbacks for transmission");
        assert_eq!(pending_queue.num_sent_requests(transmission_id), max_redundancy, "Incorrect number of sent requests for transmission");
    }
}