use std::time::Duration;
use celestia_proto::p2p::pb::{header_request::Data, HeaderRequest};
use celestia_types::hash::Hash;
use celestia_types::test_utils::ExtendedHeaderGenerator;
use celestia_types::ExtendedHeader;
use cid::Cid;
use libp2p::identity::{self, Keypair};
use tokio::sync::{mpsc, watch};
use crate::{
block_ranges::{BlockRange, BlockRanges},
blockstore::InMemoryBlockstore,
executor::timeout,
node::NodeConfig,
p2p::{P2pCmd, P2pError},
peer_tracker::PeerTrackerInfo,
store::{InMemoryStore, VerifiedExtendedHeaders},
utils::OneshotResultSender,
};
// Test-only alias `async_test` for the async test attribute macro, selected by
// compilation target so test code can use a single attribute name.
//
// On every target except `wasm32` the alias points at `tokio::test`.
#[cfg(test)]
#[cfg(not(target_arch = "wasm32"))]
pub(crate) use tokio::test as async_test;
// On `wasm32` the alias points at `wasm_bindgen_test::wasm_bindgen_test`.
#[cfg(test)]
#[cfg(target_arch = "wasm32")]
pub(crate) use wasm_bindgen_test::wasm_bindgen_test as async_test;
/// Creates an [`InMemoryStore`] and fills it with `amount` generated headers.
///
/// The generator used to produce the headers is returned alongside the store
/// so the caller can keep generating headers from it.
///
/// # Panics
///
/// Panics with `inserting test data failed` if the store rejects the headers.
pub async fn gen_filled_store(amount: u64) -> (InMemoryStore, ExtendedHeaderGenerator) {
    let mut generator = ExtendedHeaderGenerator::new();
    let headers = generator.next_many_verified(amount);

    let store = InMemoryStore::new();
    store
        .insert(headers)
        .await
        .expect("inserting test data failed");

    (store, generator)
}
/// Builds a [`BlockRanges`] from a fixed-size array of [`BlockRange`]s.
///
/// # Panics
///
/// Panics with `invalid BlockRanges` if `BlockRanges::from_vec` rejects the
/// provided ranges.
pub fn new_block_ranges<const N: usize>(ranges: [BlockRange; N]) -> BlockRanges {
    // The concrete collection type is inferred from `from_vec`'s parameter.
    let collected = ranges.into_iter().collect();
    BlockRanges::from_vec(collected).expect("invalid BlockRanges")
}
pub fn test_node_config() -> NodeConfig<InMemoryBlockstore, InMemoryStore> {
let node_keypair = identity::Keypair::generate_ed25519();
NodeConfig {
network_id: "private".to_string(),
p2p_local_keypair: node_keypair,
p2p_bootnodes: vec![],
p2p_listen_on: vec![],
sync_batch_size: 512,
blockstore: InMemoryBlockstore::new(),
store: InMemoryStore::new(),
}
}
/// Returns [`test_node_config`] with a single listen address configured.
///
/// The address is `/ip4/0.0.0.0/tcp/0`, i.e. all IPv4 interfaces with TCP
/// port 0.
pub fn listening_test_node_config() -> NodeConfig<InMemoryBlockstore, InMemoryStore> {
    let mut config = test_node_config();
    config.p2p_listen_on = vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()];
    config
}
/// Returns [`test_node_config`] with its local keypair replaced by `keypair`.
pub fn test_node_config_with_keypair(
    keypair: Keypair,
) -> NodeConfig<InMemoryBlockstore, InMemoryStore> {
    let mut config = test_node_config();
    config.p2p_local_keypair = keypair;
    config
}
/// Extension trait adding a helper that yields generated headers already
/// wrapped in [`VerifiedExtendedHeaders`].
pub trait ExtendedHeaderGeneratorExt {
/// Generates `amount` headers and returns them as [`VerifiedExtendedHeaders`].
fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders;
}
impl ExtendedHeaderGeneratorExt for ExtendedHeaderGenerator {
    /// Generates `amount` headers with `next_many` and wraps them in
    /// [`VerifiedExtendedHeaders`] without running verification.
    fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders {
        let headers = self.next_many(amount);

        // SAFETY: the headers come straight from the generator's `next_many`.
        // NOTE(review): this presumes the generator output satisfies whatever
        // invariant `new_unchecked` requires — that contract is not visible
        // here, confirm against `VerifiedExtendedHeaders`.
        unsafe { VerifiedExtendedHeaders::new_unchecked(headers) }
    }
}
/// Test-side handle for a mocked P2P component.
///
/// It receives the [`P2pCmd`]s issued by the code under test and lets a test
/// push peer-tracker updates and header announcements back.
pub struct MockP2pHandle {
// Sender for `P2pCmd`s; not read by the methods in this file, hence the lint
// allowance. NOTE(review): presumably held so the command channel stays open
// while the handle is alive — confirm against the constructor.
#[allow(dead_code)]
pub(crate) cmd_tx: mpsc::Sender<P2pCmd>,
// Receiver from which `try_recv_cmd` reads the next `P2pCmd`.
pub(crate) cmd_rx: mpsc::Receiver<P2pCmd>,
// Sender for announced headers. Set by `expect_init_header_sub` from the
// `InitHeaderSub` command; `announce_new_head` does nothing while it is `None`.
pub(crate) header_sub_tx: Option<mpsc::Sender<ExtendedHeader>>,
// Watch sender whose `PeerTrackerInfo` counters are edited by the
// `announce_*peer*` methods.
pub(crate) peer_tracker_tx: watch::Sender<PeerTrackerInfo>,
}
impl MockP2pHandle {
pub fn announce_peer_connected(&self) {
self.peer_tracker_tx.send_modify(|info| {
info.num_connected_peers += 1;
});
}
pub fn announce_trusted_peer_connected(&self) {
self.peer_tracker_tx.send_modify(|info| {
info.num_connected_peers += 1;
info.num_connected_trusted_peers += 1;
});
}
pub fn announce_all_peers_disconnected(&self) {
self.peer_tracker_tx.send_modify(|info| {
info.num_connected_peers = 0;
info.num_connected_trusted_peers = 0;
});
}
pub fn announce_new_head(&self, header: ExtendedHeader) {
if let Some(ref tx) = self.header_sub_tx {
let _ = tx.try_send(header);
}
}
pub(crate) async fn expect_cmd(&mut self) -> P2pCmd {
self.try_recv_cmd()
.await
.expect("Expecting P2pCmd, but timed-out")
}
pub async fn expect_no_cmd(&mut self) {
if let Some(cmd) = self.try_recv_cmd().await {
panic!("Expecting no P2pCmd, but received: {cmd:?}");
}
}
pub(crate) async fn try_recv_cmd(&mut self) -> Option<P2pCmd> {
timeout(Duration::from_millis(300), async move {
self.cmd_rx.recv().await.expect("P2p dropped")
})
.await
.ok()
}
pub async fn expect_header_request_cmd(
&mut self,
) -> (
HeaderRequest,
OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
) {
match self.expect_cmd().await {
P2pCmd::HeaderExRequest {
request,
respond_to,
} => (request, respond_to),
cmd => panic!("Expecting HeaderExRequest, but received: {cmd:?}"),
}
}
pub async fn expect_header_request_for_height_cmd(
&mut self,
) -> (u64, u64, OneshotResultSender<Vec<ExtendedHeader>, P2pError>) {
let (req, respond_to) = self.expect_header_request_cmd().await;
match req.data {
Some(Data::Origin(height)) if req.amount > 0 => (height, req.amount, respond_to),
_ => panic!("Expecting HeaderExRequest for height, but received: {req:?}"),
}
}
pub async fn expect_header_request_for_hash_cmd(
&mut self,
) -> (Hash, OneshotResultSender<Vec<ExtendedHeader>, P2pError>) {
let (req, respond_to) = self.expect_header_request_cmd().await;
match req.data {
Some(Data::Hash(bytes)) if req.amount == 1 => {
let array = bytes.try_into().expect("Invalid hash");
let hash = Hash::Sha256(array);
(hash, respond_to)
}
_ => panic!("Expecting HeaderExRequest for hash, but received: {req:?}"),
}
}
pub async fn expect_init_header_sub(&mut self) -> ExtendedHeader {
match self.expect_cmd().await {
P2pCmd::InitHeaderSub { head, channel } => {
self.header_sub_tx = Some(channel);
*head
}
cmd => panic!("Expecting InitHeaderSub, but received: {cmd:?}"),
}
}
pub async fn expect_get_shwap_cid(&mut self) -> (Cid, OneshotResultSender<Vec<u8>, P2pError>) {
match self.expect_cmd().await {
P2pCmd::GetShwapCid { cid, respond_to } => (cid, respond_to),
cmd => panic!("Expecting GetShwapCid, but received: {cmd:?}"),
}
}
}