lumina_node/
test_utils.rs

1//! Utilities for writing tests.
2
3use std::time::Duration;
4
5use celestia_proto::p2p::pb::{HeaderRequest, header_request::Data};
6use celestia_types::ExtendedHeader;
7use celestia_types::hash::Hash;
8use celestia_types::test_utils::ExtendedHeaderGenerator;
9use cid::Cid;
10use lumina_utils::time::timeout;
11use tokio::sync::{mpsc, oneshot, watch};
12
13use crate::{
14    NodeBuilder,
15    block_ranges::{BlockRange, BlockRanges},
16    blockstore::InMemoryBlockstore,
17    daser::DaserCmd,
18    network::Network,
19    p2p::{P2pCmd, P2pError},
20    peer_tracker::PeerTrackerInfo,
21    store::{InMemoryStore, VerifiedExtendedHeaders},
22    utils::OneshotResultSender,
23};
24
25/// Generate a store pre-filled with headers.
26pub async fn gen_filled_store(amount: u64) -> (InMemoryStore, ExtendedHeaderGenerator) {
27    let s = InMemoryStore::new();
28    let mut generator = ExtendedHeaderGenerator::new();
29
30    s.insert(generator.next_many_verified(amount))
31        .await
32        .expect("inserting test data failed");
33
34    (s, generator)
35}
36
37#[cfg(all(test, target_arch = "wasm32"))]
38pub(crate) async fn new_indexed_db_store_name() -> String {
39    use std::sync::atomic::{AtomicU32, Ordering};
40    static NEXT_ID: AtomicU32 = AtomicU32::new(0);
41
42    let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
43    let db_name = format!("indexeddb-lumina-node-store-test-{id}");
44
45    // DB can persist if test run within the browser
46    rexie::Rexie::delete(&db_name).await.unwrap();
47
48    db_name
49}
50
51/// Convenience function for creating a `BlockRange` out of a list of `N..=M` ranges
52pub fn new_block_ranges<const N: usize>(ranges: [BlockRange; N]) -> BlockRanges {
53    BlockRanges::from_vec(ranges.into_iter().collect()).expect("invalid BlockRanges")
54}
55
56/// [`NodeBuilder`] with default values for the usage in tests.
57pub fn test_node_builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
58    NodeBuilder::new().network(Network::custom("private").unwrap())
59}
60
61/// [`NodeBuilder`] with listen address and default values for the usage in tests.
62pub fn listening_test_node_builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
63    test_node_builder().listen(["/ip4/0.0.0.0/tcp/0".parse().unwrap()])
64}
65
66/// Extends test header generator for easier insertion into the store
67pub trait ExtendedHeaderGeneratorExt {
68    /// Generate next amount verified headers
69    fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders;
70}
71
72impl ExtendedHeaderGeneratorExt for ExtendedHeaderGenerator {
73    fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders {
74        unsafe { VerifiedExtendedHeaders::new_unchecked(self.next_many(amount)) }
75    }
76}
77
78/// A handle to the mocked [`P2p`] component.
79///
80/// [`P2p`]: crate::p2p::P2p
81pub struct MockP2pHandle {
82    #[allow(dead_code)]
83    pub(crate) cmd_tx: mpsc::Sender<P2pCmd>,
84    pub(crate) cmd_rx: mpsc::Receiver<P2pCmd>,
85    pub(crate) header_sub_tx: Option<mpsc::Sender<ExtendedHeader>>,
86    pub(crate) peer_tracker_tx: watch::Sender<PeerTrackerInfo>,
87}
88
89impl MockP2pHandle {
90    /// Simulate a new connected peer.
91    pub fn announce_peer_connected(&self) {
92        self.peer_tracker_tx.send_modify(|info| {
93            info.num_connected_peers += 1;
94        });
95    }
96
97    /// Simulate a new connected trusted peer.
98    pub fn announce_trusted_peer_connected(&self) {
99        self.peer_tracker_tx.send_modify(|info| {
100            info.num_connected_peers += 1;
101            info.num_connected_trusted_peers += 1;
102        });
103    }
104
105    /// Simulate a disconnect from all peers.
106    pub fn announce_all_peers_disconnected(&self) {
107        self.peer_tracker_tx.send_modify(|info| {
108            info.num_connected_peers = 0;
109            info.num_connected_trusted_peers = 0;
110        });
111    }
112
113    /// Simulate a new header announced in the network.
114    pub fn announce_new_head(&self, header: ExtendedHeader) {
115        if let Some(ref tx) = self.header_sub_tx {
116            let _ = tx.try_send(header);
117        }
118    }
119
120    /// Assert that a command was sent to the [`P2p`] worker.
121    ///
122    /// [`P2p`]: crate::p2p::P2p
123    pub(crate) async fn expect_cmd(&mut self) -> P2pCmd {
124        self.try_recv_cmd()
125            .await
126            .expect("Expecting P2pCmd, but timed-out")
127    }
128
129    /// Assert that no command was sent to the [`P2p`] worker.
130    ///
131    /// [`P2p`]: crate::p2p::P2p
132    pub async fn expect_no_cmd(&mut self) {
133        if let Some(cmd) = self.try_recv_cmd().await {
134            panic!("Expecting no P2pCmd, but received: {cmd:?}");
135        }
136    }
137
138    pub(crate) async fn try_recv_cmd(&mut self) -> Option<P2pCmd> {
139        timeout(Duration::from_millis(300), async move {
140            self.cmd_rx.recv().await.expect("P2p dropped")
141        })
142        .await
143        .ok()
144    }
145
146    /// Assert that a header request was sent to the [`P2p`] worker and obtain a response channel.
147    ///
148    /// [`P2p`]: crate::p2p::P2p
149    pub async fn expect_header_request_cmd(
150        &mut self,
151    ) -> (
152        HeaderRequest,
153        OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
154    ) {
155        match self.expect_cmd().await {
156            P2pCmd::HeaderExRequest {
157                request,
158                respond_to,
159            } => (request, respond_to),
160            cmd => panic!("Expecting HeaderExRequest, but received: {cmd:?}"),
161        }
162    }
163
164    /// Assert that a header request for height was sent to the [`P2p`] worker and obtain a response channel.
165    ///
166    /// [`P2p`]: crate::p2p::P2p
167    pub async fn expect_header_request_for_height_cmd(
168        &mut self,
169    ) -> (u64, u64, OneshotResultSender<Vec<ExtendedHeader>, P2pError>) {
170        let (req, respond_to) = self.expect_header_request_cmd().await;
171
172        match req.data {
173            Some(Data::Origin(height)) if req.amount > 0 => (height, req.amount, respond_to),
174            _ => panic!("Expecting HeaderExRequest for height, but received: {req:?}"),
175        }
176    }
177
178    /// Assert that a header request for hash was sent to the [`P2p`] worker and obtain a response channel.
179    ///
180    /// [`P2p`]: crate::p2p::P2p
181    pub async fn expect_header_request_for_hash_cmd(
182        &mut self,
183    ) -> (Hash, OneshotResultSender<Vec<ExtendedHeader>, P2pError>) {
184        let (req, respond_to) = self.expect_header_request_cmd().await;
185
186        match req.data {
187            Some(Data::Hash(bytes)) if req.amount == 1 => {
188                let array = bytes.try_into().expect("Invalid hash");
189                let hash = Hash::Sha256(array);
190                (hash, respond_to)
191            }
192            _ => panic!("Expecting HeaderExRequest for hash, but received: {req:?}"),
193        }
194    }
195
196    /// Assert that a header-sub initialization command was sent to the [`P2p`] worker.
197    ///
198    /// [`P2p`]: crate::p2p::P2p
199    pub async fn expect_init_header_sub(&mut self) -> ExtendedHeader {
200        match self.expect_cmd().await {
201            P2pCmd::InitHeaderSub { head, channel } => {
202                self.header_sub_tx = Some(channel);
203                *head
204            }
205            cmd => panic!("Expecting InitHeaderSub, but received: {cmd:?}"),
206        }
207    }
208
209    /// Assert that a CID request was sent to the [`P2p`] worker and obtain a response channel.
210    ///
211    /// [`P2p`]: crate::p2p::P2p
212    pub async fn expect_get_shwap_cid(&mut self) -> (Cid, OneshotResultSender<Vec<u8>, P2pError>) {
213        match self.expect_cmd().await {
214            P2pCmd::GetShwapCid { cid, respond_to } => (cid, respond_to),
215            cmd => panic!("Expecting GetShwapCid, but received: {cmd:?}"),
216        }
217    }
218}
219
220/// Mock handle for `Daser`.
221pub struct MockDaserHandle {
222    pub(crate) cmd_rx: mpsc::Receiver<DaserCmd>,
223}
224
225impl MockDaserHandle {
226    pub(crate) async fn try_recv_cmd(&mut self) -> Option<DaserCmd> {
227        timeout(Duration::from_millis(300), async move {
228            self.cmd_rx.recv().await.expect("Daser dropped")
229        })
230        .await
231        .ok()
232    }
233
234    pub(crate) async fn expect_cmd(&mut self) -> DaserCmd {
235        self.try_recv_cmd()
236            .await
237            .expect("Expecting DaserCmd, but timed-out")
238    }
239
240    /// Assert that no command was sent to the `Daser` worker.
241    pub async fn expect_no_cmd(&mut self) {
242        if let Some(cmd) = self.try_recv_cmd().await {
243            panic!("Expecting no DaserCmd, but received: {cmd:?}");
244        }
245    }
246
247    /// Assert that `DaserCmd::WantToPrune` was sent to `Daser` and obtain a response channel.
248    pub async fn expect_want_to_prune(&mut self) -> (u64, oneshot::Sender<bool>) {
249        match self.expect_cmd().await {
250            DaserCmd::WantToPrune { height, respond_to } => (height, respond_to),
251            cmd => panic!("Expecting WantToPrune, but received: {cmd:?}"),
252        }
253    }
254
255    /// Assert that `DaserCmd::UpdateHighestPrunableHeight` was sent to `Daser`.
256    pub async fn expect_update_highest_prunable_block(&mut self) -> u64 {
257        match self.expect_cmd().await {
258            DaserCmd::UpdateHighestPrunableHeight { value } => value,
259            cmd => panic!("Expecting UpdateHighestPrunableHeight, but received: {cmd:?}"),
260        }
261    }
262
263    /// Assert that `DaserCmd::UpdateNumberOfPrunableBlocks` was sent to `Daser`.
264    pub async fn expect_update_number_of_prunable_blocks(&mut self) -> u64 {
265        match self.expect_cmd().await {
266            DaserCmd::UpdateNumberOfPrunableBlocks { value } => value,
267            cmd => panic!("Expecting UpdateNumberOfPrunableBlocks, but received: {cmd:?}"),
268        }
269    }
270}