lumina_node/
test_utils.rs

1//! Utilities for writing tests.
2
3use std::time::Duration;
4
5use celestia_proto::p2p::pb::{HeaderRequest, header_request::Data};
6use celestia_types::ExtendedHeader;
7use celestia_types::hash::Hash;
8use celestia_types::test_utils::ExtendedHeaderGenerator;
9use cid::Cid;
10use lumina_utils::time::timeout;
11use tokio::sync::{mpsc, oneshot, watch};
12
13use crate::{
14    NodeBuilder,
15    block_ranges::{BlockRange, BlockRanges},
16    blockstore::InMemoryBlockstore,
17    daser::DaserCmd,
18    network::Network,
19    p2p::{P2pCmd, P2pError},
20    peer_tracker::PeerTrackerInfo,
21    store::{InMemoryStore, VerifiedExtendedHeaders},
22    utils::OneshotResultSender,
23};
24
25/// Generate a store pre-filled with headers.
26pub async fn gen_filled_store(amount: u64) -> (InMemoryStore, ExtendedHeaderGenerator) {
27    let s = InMemoryStore::new();
28    let mut generator = ExtendedHeaderGenerator::new();
29
30    s.insert(generator.next_many_verified(amount))
31        .await
32        .expect("inserting test data failed");
33
34    (s, generator)
35}
36
/// Produce a fresh IndexedDB database name for a store test.
///
/// Every call returns a different name, built from a process-wide counter.
/// Any database already stored under that name is deleted before the name is
/// returned.
///
/// # Panics
///
/// Panics if deleting the database fails.
#[cfg(all(test, target_arch = "wasm32"))]
pub(crate) async fn new_indexed_db_store_name() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};

    // Shared across all callers so that each one gets its own suffix.
    static DB_COUNTER: AtomicU32 = AtomicU32::new(0);

    let id = DB_COUNTER.fetch_add(1, Ordering::Relaxed);
    let name = format!("indexeddb-lumina-node-store-test-{id}");

    // When tests run inside a browser the database can outlive a previous run,
    // so remove whatever may be left under this name.
    rexie::Rexie::delete(&name).await.unwrap();

    name
}
50
51/// Convenience function for creating a `BlockRange` out of a list of `N..=M` ranges
52pub fn new_block_ranges<const N: usize>(ranges: [BlockRange; N]) -> BlockRanges {
53    BlockRanges::from_vec(ranges.into_iter().collect()).expect("invalid BlockRanges")
54}
55
56/// [`NodeBuilder`] with default values for the usage in tests.
57pub fn test_node_builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
58    NodeBuilder::new().network(Network::custom("private").unwrap())
59}
60
61/// [`NodeBuilder`] with listen address and default values for the usage in tests.
62pub fn listening_test_node_builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
63    test_node_builder().listen(["/ip4/0.0.0.0/tcp/0".parse().unwrap()])
64}
65
66/// Extends test header generator for easier insertion into the store
67pub trait ExtendedHeaderGeneratorExt {
68    /// Generate next amount verified headers
69    fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders;
70    /// Generate next amount empty verified headers
71    fn next_many_empty_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders;
72}
73
74impl ExtendedHeaderGeneratorExt for ExtendedHeaderGenerator {
75    fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders {
76        unsafe { VerifiedExtendedHeaders::new_unchecked(self.next_many(amount)) }
77    }
78    fn next_many_empty_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders {
79        unsafe { VerifiedExtendedHeaders::new_unchecked(self.next_many_empty(amount)) }
80    }
81}
82
83/// A handle to the mocked [`P2p`] component.
84///
85/// [`P2p`]: crate::p2p::P2p
86pub struct MockP2pHandle {
87    #[allow(dead_code)]
88    pub(crate) cmd_tx: mpsc::Sender<P2pCmd>,
89    pub(crate) cmd_rx: mpsc::Receiver<P2pCmd>,
90    pub(crate) header_sub_tx: Option<mpsc::Sender<ExtendedHeader>>,
91    pub(crate) peer_tracker_tx: watch::Sender<PeerTrackerInfo>,
92}
93
94impl MockP2pHandle {
95    /// Simulate a new connected peer.
96    pub fn announce_peer_connected(&self) {
97        self.peer_tracker_tx.send_modify(|info| {
98            info.num_connected_peers += 1;
99        });
100    }
101
102    /// Simulate a new connected trusted peer.
103    pub fn announce_trusted_peer_connected(&self) {
104        self.peer_tracker_tx.send_modify(|info| {
105            info.num_connected_peers += 1;
106            info.num_connected_trusted_peers += 1;
107        });
108    }
109
110    /// Simulate a disconnect from all peers.
111    pub fn announce_all_peers_disconnected(&self) {
112        self.peer_tracker_tx.send_modify(|info| {
113            info.num_connected_peers = 0;
114            info.num_connected_trusted_peers = 0;
115        });
116    }
117
118    /// Simulate a new header announced in the network.
119    pub fn announce_new_head(&self, header: ExtendedHeader) {
120        if let Some(ref tx) = self.header_sub_tx {
121            let _ = tx.try_send(header);
122        }
123    }
124
125    /// Assert that a command was sent to the [`P2p`] worker.
126    ///
127    /// [`P2p`]: crate::p2p::P2p
128    pub(crate) async fn expect_cmd(&mut self) -> P2pCmd {
129        self.try_recv_cmd()
130            .await
131            .expect("Expecting P2pCmd, but timed-out")
132    }
133
134    /// Assert that no command was sent to the [`P2p`] worker.
135    ///
136    /// [`P2p`]: crate::p2p::P2p
137    pub async fn expect_no_cmd(&mut self) {
138        if let Some(cmd) = self.try_recv_cmd().await {
139            panic!("Expecting no P2pCmd, but received: {cmd:?}");
140        }
141    }
142
143    pub(crate) async fn try_recv_cmd(&mut self) -> Option<P2pCmd> {
144        timeout(Duration::from_millis(300), async move {
145            self.cmd_rx.recv().await.expect("P2p dropped")
146        })
147        .await
148        .ok()
149    }
150
151    /// Assert that a header request was sent to the [`P2p`] worker and obtain a response channel.
152    ///
153    /// [`P2p`]: crate::p2p::P2p
154    pub async fn expect_header_request_cmd(
155        &mut self,
156    ) -> (
157        HeaderRequest,
158        OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
159    ) {
160        match self.expect_cmd().await {
161            P2pCmd::HeaderExRequest {
162                request,
163                respond_to,
164            } => (request, respond_to),
165            cmd => panic!("Expecting HeaderExRequest, but received: {cmd:?}"),
166        }
167    }
168
169    /// Assert that a header request for height was sent to the [`P2p`] worker and obtain a response channel.
170    ///
171    /// [`P2p`]: crate::p2p::P2p
172    pub async fn expect_header_request_for_height_cmd(
173        &mut self,
174    ) -> (u64, u64, OneshotResultSender<Vec<ExtendedHeader>, P2pError>) {
175        let (req, respond_to) = self.expect_header_request_cmd().await;
176
177        match req.data {
178            Some(Data::Origin(height)) if req.amount > 0 => (height, req.amount, respond_to),
179            _ => panic!("Expecting HeaderExRequest for height, but received: {req:?}"),
180        }
181    }
182
183    /// Assert that a header request for hash was sent to the [`P2p`] worker and obtain a response channel.
184    ///
185    /// [`P2p`]: crate::p2p::P2p
186    pub async fn expect_header_request_for_hash_cmd(
187        &mut self,
188    ) -> (Hash, OneshotResultSender<Vec<ExtendedHeader>, P2pError>) {
189        let (req, respond_to) = self.expect_header_request_cmd().await;
190
191        match req.data {
192            Some(Data::Hash(bytes)) if req.amount == 1 => {
193                let array = bytes.try_into().expect("Invalid hash");
194                let hash = Hash::Sha256(array);
195                (hash, respond_to)
196            }
197            _ => panic!("Expecting HeaderExRequest for hash, but received: {req:?}"),
198        }
199    }
200
201    /// Assert that a header-sub initialization command was sent to the [`P2p`] worker.
202    ///
203    /// [`P2p`]: crate::p2p::P2p
204    pub async fn expect_init_header_sub(&mut self) -> ExtendedHeader {
205        match self.expect_cmd().await {
206            P2pCmd::InitHeaderSub { head, channel } => {
207                self.header_sub_tx = Some(channel);
208                *head
209            }
210            cmd => panic!("Expecting InitHeaderSub, but received: {cmd:?}"),
211        }
212    }
213
214    /// Assert that a CID request was sent to the [`P2p`] worker and obtain a response channel.
215    ///
216    /// [`P2p`]: crate::p2p::P2p
217    pub async fn expect_get_shwap_cid(&mut self) -> (Cid, OneshotResultSender<Vec<u8>, P2pError>) {
218        match self.expect_cmd().await {
219            P2pCmd::GetShwapCid { cid, respond_to } => (cid, respond_to),
220            cmd => panic!("Expecting GetShwapCid, but received: {cmd:?}"),
221        }
222    }
223}
224
225/// Mock handle for `Daser`.
226pub struct MockDaserHandle {
227    pub(crate) cmd_rx: mpsc::Receiver<DaserCmd>,
228}
229
230impl MockDaserHandle {
231    pub(crate) async fn try_recv_cmd(&mut self) -> Option<DaserCmd> {
232        timeout(Duration::from_millis(300), async move {
233            self.cmd_rx.recv().await.expect("Daser dropped")
234        })
235        .await
236        .ok()
237    }
238
239    pub(crate) async fn expect_cmd(&mut self) -> DaserCmd {
240        self.try_recv_cmd()
241            .await
242            .expect("Expecting DaserCmd, but timed-out")
243    }
244
245    /// Assert that no command was sent to the `Daser` worker.
246    pub async fn expect_no_cmd(&mut self) {
247        if let Some(cmd) = self.try_recv_cmd().await {
248            panic!("Expecting no DaserCmd, but received: {cmd:?}");
249        }
250    }
251
252    /// Assert that `DaserCmd::WantToPrune` was sent to `Daser` and obtain a response channel.
253    pub async fn expect_want_to_prune(&mut self) -> (u64, oneshot::Sender<bool>) {
254        match self.expect_cmd().await {
255            DaserCmd::WantToPrune { height, respond_to } => (height, respond_to),
256            cmd => panic!("Expecting WantToPrune, but received: {cmd:?}"),
257        }
258    }
259
260    /// Assert that `DaserCmd::UpdateHighestPrunableHeight` was sent to `Daser`.
261    pub async fn expect_update_highest_prunable_block(&mut self) -> u64 {
262        match self.expect_cmd().await {
263            DaserCmd::UpdateHighestPrunableHeight { value } => value,
264            cmd => panic!("Expecting UpdateHighestPrunableHeight, but received: {cmd:?}"),
265        }
266    }
267
268    /// Assert that `DaserCmd::UpdateNumberOfPrunableBlocks` was sent to `Daser`.
269    pub async fn expect_update_number_of_prunable_blocks(&mut self) -> u64 {
270        match self.expect_cmd().await {
271            DaserCmd::UpdateNumberOfPrunableBlocks { value } => value,
272            cmd => panic!("Expecting UpdateNumberOfPrunableBlocks, but received: {cmd:?}"),
273        }
274    }
275}