use std::time::Duration;
use celestia_proto::p2p::pb::{HeaderRequest, header_request::Data};
use celestia_types::ExtendedHeader;
use celestia_types::hash::Hash;
use celestia_types::test_utils::ExtendedHeaderGenerator;
use cid::Cid;
use lumina_utils::time::timeout;
use tokio::sync::{mpsc, oneshot, watch};
use crate::{
NodeBuilder,
block_ranges::{BlockRange, BlockRanges},
blockstore::InMemoryBlockstore,
daser::DaserCmd,
network::Network,
p2p::{P2pCmd, P2pError},
peer_tracker::PeerTrackerInfo,
store::{InMemoryStore, VerifiedExtendedHeaders},
utils::OneshotResultSender,
};
pub async fn gen_filled_store(amount: u64) -> (InMemoryStore, ExtendedHeaderGenerator) {
let s = InMemoryStore::new();
let mut generator = ExtendedHeaderGenerator::new();
s.insert(generator.next_many_verified(amount))
.await
.expect("inserting test data failed");
(s, generator)
}
#[cfg(all(test, target_arch = "wasm32"))]
pub(crate) async fn new_indexed_db_store_name() -> String {
use std::sync::atomic::{AtomicU32, Ordering};
static NEXT_ID: AtomicU32 = AtomicU32::new(0);
let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
let db_name = format!("indexeddb-lumina-node-store-test-{id}");
rexie::Rexie::delete(&db_name).await.unwrap();
db_name
}
pub fn new_block_ranges<const N: usize>(ranges: [BlockRange; N]) -> BlockRanges {
BlockRanges::from_vec(ranges.into_iter().collect()).expect("invalid BlockRanges")
}
pub fn test_node_builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
NodeBuilder::new().network(Network::custom("private").unwrap())
}
pub fn listening_test_node_builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
test_node_builder().listen(["/ip4/0.0.0.0/tcp/0".parse().unwrap()])
}
pub trait ExtendedHeaderGeneratorExt {
fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders;
fn next_many_empty_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders;
}
impl ExtendedHeaderGeneratorExt for ExtendedHeaderGenerator {
fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders {
unsafe { VerifiedExtendedHeaders::new_unchecked(self.next_many(amount)) }
}
fn next_many_empty_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders {
unsafe { VerifiedExtendedHeaders::new_unchecked(self.next_many_empty(amount)) }
}
}
pub struct MockP2pHandle {
#[allow(dead_code)]
pub(crate) cmd_tx: mpsc::Sender<P2pCmd>,
pub(crate) cmd_rx: mpsc::Receiver<P2pCmd>,
pub(crate) header_sub_tx: Option<mpsc::Sender<ExtendedHeader>>,
pub(crate) peer_tracker_tx: watch::Sender<PeerTrackerInfo>,
}
impl MockP2pHandle {
pub fn announce_peer_connected(&self) {
self.peer_tracker_tx.send_modify(|info| {
info.num_connected_peers += 1;
});
}
pub fn announce_trusted_peer_connected(&self) {
self.peer_tracker_tx.send_modify(|info| {
info.num_connected_peers += 1;
info.num_connected_trusted_peers += 1;
});
}
pub fn announce_all_peers_disconnected(&self) {
self.peer_tracker_tx.send_modify(|info| {
info.num_connected_peers = 0;
info.num_connected_trusted_peers = 0;
});
}
pub fn announce_new_head(&self, header: ExtendedHeader) {
if let Some(ref tx) = self.header_sub_tx {
let _ = tx.try_send(header);
}
}
pub(crate) async fn expect_cmd(&mut self) -> P2pCmd {
self.try_recv_cmd()
.await
.expect("Expecting P2pCmd, but timed-out")
}
pub async fn expect_no_cmd(&mut self) {
if let Some(cmd) = self.try_recv_cmd().await {
panic!("Expecting no P2pCmd, but received: {cmd:?}");
}
}
pub(crate) async fn try_recv_cmd(&mut self) -> Option<P2pCmd> {
timeout(Duration::from_millis(300), async move {
self.cmd_rx.recv().await.expect("P2p dropped")
})
.await
.ok()
}
pub async fn expect_header_request_cmd(
&mut self,
) -> (
HeaderRequest,
OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
) {
match self.expect_cmd().await {
P2pCmd::HeaderExRequest {
request,
respond_to,
} => (request, respond_to),
cmd => panic!("Expecting HeaderExRequest, but received: {cmd:?}"),
}
}
pub async fn expect_header_request_for_height_cmd(
&mut self,
) -> (u64, u64, OneshotResultSender<Vec<ExtendedHeader>, P2pError>) {
let (req, respond_to) = self.expect_header_request_cmd().await;
match req.data {
Some(Data::Origin(height)) if req.amount > 0 => (height, req.amount, respond_to),
_ => panic!("Expecting HeaderExRequest for height, but received: {req:?}"),
}
}
pub async fn expect_header_request_for_hash_cmd(
&mut self,
) -> (Hash, OneshotResultSender<Vec<ExtendedHeader>, P2pError>) {
let (req, respond_to) = self.expect_header_request_cmd().await;
match req.data {
Some(Data::Hash(bytes)) if req.amount == 1 => {
let array = bytes.try_into().expect("Invalid hash");
let hash = Hash::Sha256(array);
(hash, respond_to)
}
_ => panic!("Expecting HeaderExRequest for hash, but received: {req:?}"),
}
}
pub async fn expect_init_header_sub(&mut self) -> ExtendedHeader {
match self.expect_cmd().await {
P2pCmd::InitHeaderSub { head, channel } => {
self.header_sub_tx = Some(channel);
*head
}
cmd => panic!("Expecting InitHeaderSub, but received: {cmd:?}"),
}
}
pub async fn expect_get_shwap_cid(&mut self) -> (Cid, OneshotResultSender<Vec<u8>, P2pError>) {
match self.expect_cmd().await {
P2pCmd::GetShwapCid { cid, respond_to } => (cid, respond_to),
cmd => panic!("Expecting GetShwapCid, but received: {cmd:?}"),
}
}
}
pub struct MockDaserHandle {
pub(crate) cmd_rx: mpsc::Receiver<DaserCmd>,
}
impl MockDaserHandle {
pub(crate) async fn try_recv_cmd(&mut self) -> Option<DaserCmd> {
timeout(Duration::from_millis(300), async move {
self.cmd_rx.recv().await.expect("Daser dropped")
})
.await
.ok()
}
pub(crate) async fn expect_cmd(&mut self) -> DaserCmd {
self.try_recv_cmd()
.await
.expect("Expecting DaserCmd, but timed-out")
}
pub async fn expect_no_cmd(&mut self) {
if let Some(cmd) = self.try_recv_cmd().await {
panic!("Expecting no DaserCmd, but received: {cmd:?}");
}
}
pub async fn expect_want_to_prune(&mut self) -> (u64, oneshot::Sender<bool>) {
match self.expect_cmd().await {
DaserCmd::WantToPrune { height, respond_to } => (height, respond_to),
cmd => panic!("Expecting WantToPrune, but received: {cmd:?}"),
}
}
pub async fn expect_update_highest_prunable_block(&mut self) -> u64 {
match self.expect_cmd().await {
DaserCmd::UpdateHighestPrunableHeight { value } => value,
cmd => panic!("Expecting UpdateHighestPrunableHeight, but received: {cmd:?}"),
}
}
pub async fn expect_update_number_of_prunable_blocks(&mut self) -> u64 {
match self.expect_cmd().await {
DaserCmd::UpdateNumberOfPrunableBlocks { value } => value,
cmd => panic!("Expecting UpdateNumberOfPrunableBlocks, but received: {cmd:?}"),
}
}
}