#![cfg(not(target_arch = "wasm32"))]
use std::time::Duration;
use celestia_types::consts::HASH_SIZE;
use celestia_types::fraud_proof::BadEncodingFraudProof;
use celestia_types::hash::Hash;
use celestia_types::test_utils::{ExtendedHeaderGenerator, corrupt_eds, generate_dummy_eds};
use futures::StreamExt;
use libp2p::swarm::NetworkBehaviour;
use libp2p::{Multiaddr, SwarmBuilder, gossipsub, noise, ping, tcp, yamux};
use lumina_node::store::{InMemoryStore, Store};
use lumina_node::test_utils::{
ExtendedHeaderGeneratorExt, gen_filled_store, listening_test_node_builder, test_node_builder,
};
use rand::Rng;
use tendermint_proto::Protobuf;
use tokio::{select, spawn, sync::mpsc, time::sleep};
use crate::utils::{fetch_bridge_info, new_connected_node};
mod utils;
/// Checks that a node obtained from `new_connected_node` reports at least one
/// peer in its network info.
///
/// NOTE(review): the helper presumably connects to the Go bridge node (as the
/// test name suggests) — confirm in `utils`.
#[tokio::test]
async fn connects_to_the_go_bridge_node() {
    // The second tuple element is not needed here and is dropped right away.
    let (node, _) = new_connected_node().await;

    let peer_count = node.network_info().await.unwrap().num_peers();
    assert!(peer_count >= 1);
}
/// Exercises the header getters of a node started on top of a pre-filled store.
///
/// The store is created with `gen_filled_store(100)`; the test relies on it
/// holding headers for heights 1 through 100. It verifies that:
/// - the local head equals the header at height 100,
/// - lookups by height and by hash return the same header,
/// - range requests succeed only while they stay within the stored heights,
/// - lookups of heights above the head and of random hashes fail.
#[tokio::test]
async fn header_store_access() {
    let (store, _) = gen_filled_store(100).await;
    let node = test_node_builder().store(store).start().await.unwrap();

    // The local head must be the highest stored header.
    let head = node.get_local_head_header().await.unwrap();
    let expected_head = node.get_header_by_height(100).await.unwrap();
    assert_eq!(head, expected_head);

    for height in 1..100 {
        // Height-based and hash-based lookups must agree.
        let header_by_height = node.get_header_by_height(height).await.unwrap();
        let header_by_hash = node
            .get_header_by_hash(&header_by_height.hash())
            .await
            .unwrap();
        assert_eq!(header_by_height, header_by_hash);

        // Request a random-length range starting right after `height`.
        let start = height + 1;
        let amount = rand::thread_rng().gen_range(1..50);
        let res = node.get_headers(start..start + amount).await;

        // The last requested height is `start + amount - 1 == height + amount`;
        // anything past 100 is not in the store, so the request must fail.
        if height + amount > 100 {
            res.unwrap_err();
        } else {
            assert!(
                res.unwrap()
                    .into_iter()
                    .zip(start..start + amount)
                    .all(|(header, height)| header.height() == height)
            );
        }
    }

    for _ in 0..100 {
        // Height 100 is the stored head, so the "unknown" range has to start
        // at 101; starting at 100 could (rarely) pick an existing header and
        // make `unwrap_err` panic.
        let height = rand::thread_rng().gen_range(101..u64::MAX);
        node.get_header_by_height(height).await.unwrap_err();

        // A random hash is not expected to match any stored header.
        let mut hash = [0u8; HASH_SIZE];
        rand::thread_rng().fill(&mut hash);
        node.get_header_by_hash(&Hash::Sha256(hash))
            .await
            .unwrap_err();
    }
}
/// Checks that three nodes end up connected to the bridge and to each other.
///
/// `node1` is bootstrapped from the bridge address, while `node2` and `node3`
/// are bootstrapped only from `node1`'s listener addresses. After a grace
/// period every node is expected to be connected to the bridge and to the two
/// other nodes, and to report exactly one connected trusted peer.
///
/// NOTE(review): the trusted peer is presumably the configured bootnode —
/// confirm against the peer tracker implementation.
#[tokio::test]
async fn peer_discovery() {
    let (bridge_peer_id, bridge_ma) = fetch_bridge_info().await;

    // node1 bootstraps directly from the bridge.
    let node1 = listening_test_node_builder()
        .bootnodes([bridge_ma])
        .start()
        .await
        .unwrap();
    node1.wait_connected().await.unwrap();

    let node1_addrs = node1.listeners().await.unwrap();

    // node2 and node3 only know about node1.
    let node2 = listening_test_node_builder()
        .bootnodes(node1_addrs.clone())
        .start()
        .await
        .unwrap();
    node2.wait_connected().await.unwrap();

    let node3 = listening_test_node_builder()
        .bootnodes(node1_addrs)
        .start()
        .await
        .unwrap();
    node3.wait_connected().await.unwrap();

    // Give the nodes some time to find and connect to each other.
    sleep(Duration::from_millis(2000)).await;

    let node1_peer_id = node1.local_peer_id();
    let node2_peer_id = node2.local_peer_id();
    let node3_peer_id = node3.local_peer_id();

    // node1 sees the bridge, node2 and node3.
    let connected_peers = node1.connected_peers().await.unwrap();
    let tracker_info = node1.peer_tracker_info();
    assert!(connected_peers.contains(&bridge_peer_id));
    assert!(connected_peers.contains(node2_peer_id));
    assert!(connected_peers.contains(node3_peer_id));
    assert!(tracker_info.num_connected_peers >= 3);
    assert_eq!(tracker_info.num_connected_trusted_peers, 1);

    // node2 sees the bridge, node1 and node3.
    let connected_peers = node2.connected_peers().await.unwrap();
    let tracker_info = node2.peer_tracker_info();
    assert!(connected_peers.contains(&bridge_peer_id));
    assert!(connected_peers.contains(node1_peer_id));
    assert!(connected_peers.contains(node3_peer_id));
    assert!(tracker_info.num_connected_peers >= 3);
    assert_eq!(tracker_info.num_connected_trusted_peers, 1);

    // node3 sees the bridge, node1 and node2. The tracker info must come from
    // node3 itself, otherwise node2's tracker would simply be checked twice.
    let connected_peers = node3.connected_peers().await.unwrap();
    let tracker_info = node3.peer_tracker_info();
    assert!(connected_peers.contains(&bridge_peer_id));
    assert!(connected_peers.contains(node1_peer_id));
    assert!(connected_peers.contains(node2_peer_id));
    assert!(tracker_info.num_connected_peers >= 3);
    assert_eq!(tracker_info.num_connected_trusted_peers, 1);
}
/// Checks that `syncer_info` starts failing once a bad encoding fraud proof
/// has been announced to the node.
///
/// The store is seeded with 64 generated headers followed by the header
/// produced by `corrupt_eds`. A helper swarm then publishes the matching
/// fraud proof; `syncer_info` must succeed before the announcement and return
/// an error after it.
#[tokio::test]
async fn stops_services_when_network_is_compromised() {
    // Fixed pause used between the steps below to let the node react.
    let pause = Duration::from_millis(300);

    let mut header_gen = ExtendedHeaderGenerator::new();
    let store = InMemoryStore::new();

    // Headers that precede the corrupted one.
    let preceding_headers = header_gen.next_many_verified(64);
    store.insert(preceding_headers).await.unwrap();

    // Header for a corrupted square, together with the proof of the corruption.
    let mut square = generate_dummy_eds(8);
    let (bad_header, fraud_proof) = corrupt_eds(&mut header_gen, &mut square);
    store.insert(bad_header).await.unwrap();

    let node = listening_test_node_builder()
        .store(store)
        .start()
        .await
        .unwrap();

    sleep(pause).await;

    // Connect the announcer to the node's first listener address.
    let listeners = node.listeners().await.unwrap();
    let listener_addr = listeners[0].clone();
    let befp_announce_tx = spawn_befp_announcer(listener_addr);

    sleep(pause).await;

    // Before the proof is announced the syncer still answers.
    assert!(node.syncer_info().await.is_ok());

    befp_announce_tx.send(fraud_proof).await.unwrap();

    sleep(pause).await;

    // After the proof is announced the syncer request must fail.
    assert!(node.syncer_info().await.is_err());
}
/// Spawns a background libp2p swarm that publishes bad encoding fraud proofs.
///
/// The swarm runs ping and gossipsub, dials `connect_to`, and subscribes to
/// the `/badencoding/fraud-sub/private/v0.0.1` topic. Every proof sent through
/// the returned channel is encoded with `encode_vec` and published on that
/// topic.
///
/// # Panics
///
/// Panics if building the swarm, dialing, or subscribing fails. The spawned
/// task panics if publishing a proof fails.
fn spawn_befp_announcer(connect_to: Multiaddr) -> mpsc::Sender<BadEncodingFraudProof> {
    #[derive(NetworkBehaviour)]
    struct Behaviour {
        ping: ping::Behaviour,
        gossipsub: gossipsub::Behaviour,
    }

    let (proof_tx, mut proof_rx) = mpsc::channel::<BadEncodingFraudProof>(8);
    let befp_topic = gossipsub::IdentTopic::new("/badencoding/fraud-sub/private/v0.0.1");

    let mut swarm = SwarmBuilder::with_new_identity()
        .with_tokio()
        .with_tcp(
            tcp::Config::default(),
            noise::Config::new,
            yamux::Config::default,
        )
        .unwrap()
        .with_behaviour(|keypair| {
            let ping = ping::Behaviour::new(ping::Config::default());

            // Messages are signed with the swarm's own identity key.
            let gossip_config = gossipsub::ConfigBuilder::default().build().unwrap();
            let authenticity = gossipsub::MessageAuthenticity::Signed(keypair.clone());
            let gossipsub: gossipsub::Behaviour =
                gossipsub::Behaviour::new(authenticity, gossip_config).unwrap();

            Ok(Behaviour { ping, gossipsub })
        })
        .unwrap()
        .build();

    swarm.dial(connect_to).unwrap();
    swarm
        .behaviour_mut()
        .gossipsub
        .subscribe(&befp_topic)
        .unwrap();

    spawn(async move {
        loop {
            select! {
                // Keep polling the swarm so it makes progress; events are ignored.
                _ = swarm.select_next_some() => {}
                // Publish every proof handed over by the test.
                Some(proof) = proof_rx.recv() => {
                    let encoded = proof.encode_vec();
                    swarm
                        .behaviour_mut()
                        .gossipsub
                        .publish(befp_topic.hash(), encoded)
                        .unwrap();
                }
            }
        }
    });

    proof_tx
}