lumina_node/
test_utils.rs

1//! Utilities for writing tests.
2
3use std::time::Duration;
4
5use celestia_proto::p2p::pb::{header_request::Data, HeaderRequest};
6use celestia_types::hash::Hash;
7use celestia_types::test_utils::ExtendedHeaderGenerator;
8use celestia_types::ExtendedHeader;
9use cid::Cid;
10use lumina_utils::time::timeout;
11use tokio::sync::{mpsc, watch};
12
13use crate::{
14    block_ranges::{BlockRange, BlockRanges},
15    blockstore::InMemoryBlockstore,
16    network::Network,
17    p2p::{P2pCmd, P2pError},
18    peer_tracker::PeerTrackerInfo,
19    store::{InMemoryStore, VerifiedExtendedHeaders},
20    utils::OneshotResultSender,
21    NodeBuilder,
22};
23
24/// Generate a store pre-filled with headers.
25pub async fn gen_filled_store(amount: u64) -> (InMemoryStore, ExtendedHeaderGenerator) {
26    let s = InMemoryStore::new();
27    let mut gen = ExtendedHeaderGenerator::new();
28
29    s.insert(gen.next_many_verified(amount))
30        .await
31        .expect("inserting test data failed");
32
33    (s, gen)
34}
35
36/// Convenience function for creating a `BlockRange` out of a list of `N..=M` ranges
37pub fn new_block_ranges<const N: usize>(ranges: [BlockRange; N]) -> BlockRanges {
38    BlockRanges::from_vec(ranges.into_iter().collect()).expect("invalid BlockRanges")
39}
40
41/// [`NodeBuilder`] with default values for the usage in tests.
42pub fn test_node_builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
43    NodeBuilder::new().network(Network::custom("private").unwrap())
44}
45
46/// [`NodeBuilder`] with listen address and default values for the usage in tests.
47pub fn listening_test_node_builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
48    test_node_builder().listen(["/ip4/0.0.0.0/tcp/0".parse().unwrap()])
49}
50
51/// Extends test header generator for easier insertion into the store
52pub trait ExtendedHeaderGeneratorExt {
53    /// Generate next amount verified headers
54    fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders;
55}
56
57impl ExtendedHeaderGeneratorExt for ExtendedHeaderGenerator {
58    fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders {
59        unsafe { VerifiedExtendedHeaders::new_unchecked(self.next_many(amount)) }
60    }
61}
62
63/// A handle to the mocked [`P2p`] component.
64///
65/// [`P2p`]: crate::p2p::P2p
66pub struct MockP2pHandle {
67    #[allow(dead_code)]
68    pub(crate) cmd_tx: mpsc::Sender<P2pCmd>,
69    pub(crate) cmd_rx: mpsc::Receiver<P2pCmd>,
70    pub(crate) header_sub_tx: Option<mpsc::Sender<ExtendedHeader>>,
71    pub(crate) peer_tracker_tx: watch::Sender<PeerTrackerInfo>,
72}
73
74impl MockP2pHandle {
75    /// Simulate a new connected peer.
76    pub fn announce_peer_connected(&self) {
77        self.peer_tracker_tx.send_modify(|info| {
78            info.num_connected_peers += 1;
79        });
80    }
81
82    /// Simulate a new connected trusted peer.
83    pub fn announce_trusted_peer_connected(&self) {
84        self.peer_tracker_tx.send_modify(|info| {
85            info.num_connected_peers += 1;
86            info.num_connected_trusted_peers += 1;
87        });
88    }
89
90    /// Simulate a disconnect from all peers.
91    pub fn announce_all_peers_disconnected(&self) {
92        self.peer_tracker_tx.send_modify(|info| {
93            info.num_connected_peers = 0;
94            info.num_connected_trusted_peers = 0;
95        });
96    }
97
98    /// Simulate a new header announced in the network.
99    pub fn announce_new_head(&self, header: ExtendedHeader) {
100        if let Some(ref tx) = self.header_sub_tx {
101            let _ = tx.try_send(header);
102        }
103    }
104
105    /// Assert that a command was sent to the [`P2p`] worker.
106    ///
107    /// [`P2p`]: crate::p2p::P2p
108    pub(crate) async fn expect_cmd(&mut self) -> P2pCmd {
109        self.try_recv_cmd()
110            .await
111            .expect("Expecting P2pCmd, but timed-out")
112    }
113
114    /// Assert that no command was sent to the [`P2p`] worker.
115    ///
116    /// [`P2p`]: crate::p2p::P2p
117    pub async fn expect_no_cmd(&mut self) {
118        if let Some(cmd) = self.try_recv_cmd().await {
119            panic!("Expecting no P2pCmd, but received: {cmd:?}");
120        }
121    }
122
123    pub(crate) async fn try_recv_cmd(&mut self) -> Option<P2pCmd> {
124        timeout(Duration::from_millis(300), async move {
125            self.cmd_rx.recv().await.expect("P2p dropped")
126        })
127        .await
128        .ok()
129    }
130
131    /// Assert that a header request was sent to the [`P2p`] worker and obtain a response channel.
132    ///
133    /// [`P2p`]: crate::p2p::P2p
134    pub async fn expect_header_request_cmd(
135        &mut self,
136    ) -> (
137        HeaderRequest,
138        OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
139    ) {
140        match self.expect_cmd().await {
141            P2pCmd::HeaderExRequest {
142                request,
143                respond_to,
144            } => (request, respond_to),
145            cmd => panic!("Expecting HeaderExRequest, but received: {cmd:?}"),
146        }
147    }
148
149    /// Assert that a header request for height was sent to the [`P2p`] worker and obtain a response channel.
150    ///
151    /// [`P2p`]: crate::p2p::P2p
152    pub async fn expect_header_request_for_height_cmd(
153        &mut self,
154    ) -> (u64, u64, OneshotResultSender<Vec<ExtendedHeader>, P2pError>) {
155        let (req, respond_to) = self.expect_header_request_cmd().await;
156
157        match req.data {
158            Some(Data::Origin(height)) if req.amount > 0 => (height, req.amount, respond_to),
159            _ => panic!("Expecting HeaderExRequest for height, but received: {req:?}"),
160        }
161    }
162
163    /// Assert that a header request for hash was sent to the [`P2p`] worker and obtain a response channel.
164    ///
165    /// [`P2p`]: crate::p2p::P2p
166    pub async fn expect_header_request_for_hash_cmd(
167        &mut self,
168    ) -> (Hash, OneshotResultSender<Vec<ExtendedHeader>, P2pError>) {
169        let (req, respond_to) = self.expect_header_request_cmd().await;
170
171        match req.data {
172            Some(Data::Hash(bytes)) if req.amount == 1 => {
173                let array = bytes.try_into().expect("Invalid hash");
174                let hash = Hash::Sha256(array);
175                (hash, respond_to)
176            }
177            _ => panic!("Expecting HeaderExRequest for hash, but received: {req:?}"),
178        }
179    }
180
181    /// Assert that a header-sub initialization command was sent to the [`P2p`] worker.
182    ///
183    /// [`P2p`]: crate::p2p::P2p
184    pub async fn expect_init_header_sub(&mut self) -> ExtendedHeader {
185        match self.expect_cmd().await {
186            P2pCmd::InitHeaderSub { head, channel } => {
187                self.header_sub_tx = Some(channel);
188                *head
189            }
190            cmd => panic!("Expecting InitHeaderSub, but received: {cmd:?}"),
191        }
192    }
193
194    /// Assert that a CID request was sent to the [`P2p`] worker and obtain a response channel.
195    ///
196    /// [`P2p`]: crate::p2p::P2p
197    pub async fn expect_get_shwap_cid(&mut self) -> (Cid, OneshotResultSender<Vec<u8>, P2pError>) {
198        match self.expect_cmd().await {
199            P2pCmd::GetShwapCid { cid, respond_to } => (cid, respond_to),
200            cmd => panic!("Expecting GetShwapCid, but received: {cmd:?}"),
201        }
202    }
203}