lumina_node/
test_utils.rs1use std::time::Duration;
4
5use celestia_proto::p2p::pb::{HeaderRequest, header_request::Data};
6use celestia_types::ExtendedHeader;
7use celestia_types::hash::Hash;
8use celestia_types::test_utils::ExtendedHeaderGenerator;
9use cid::Cid;
10use lumina_utils::time::timeout;
11use tokio::sync::{mpsc, oneshot, watch};
12
13use crate::{
14 NodeBuilder,
15 block_ranges::{BlockRange, BlockRanges},
16 blockstore::InMemoryBlockstore,
17 daser::DaserCmd,
18 network::Network,
19 p2p::{P2pCmd, P2pError},
20 peer_tracker::PeerTrackerInfo,
21 store::{InMemoryStore, VerifiedExtendedHeaders},
22 utils::OneshotResultSender,
23};
24
25pub async fn gen_filled_store(amount: u64) -> (InMemoryStore, ExtendedHeaderGenerator) {
27 let s = InMemoryStore::new();
28 let mut generator = ExtendedHeaderGenerator::new();
29
30 s.insert(generator.next_many_verified(amount))
31 .await
32 .expect("inserting test data failed");
33
34 (s, generator)
35}
36
37#[cfg(all(test, target_arch = "wasm32"))]
38pub(crate) async fn new_indexed_db_store_name() -> String {
39 use std::sync::atomic::{AtomicU32, Ordering};
40 static NEXT_ID: AtomicU32 = AtomicU32::new(0);
41
42 let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
43 let db_name = format!("indexeddb-lumina-node-store-test-{id}");
44
45 rexie::Rexie::delete(&db_name).await.unwrap();
47
48 db_name
49}
50
51pub fn new_block_ranges<const N: usize>(ranges: [BlockRange; N]) -> BlockRanges {
53 BlockRanges::from_vec(ranges.into_iter().collect()).expect("invalid BlockRanges")
54}
55
56pub fn test_node_builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
58 NodeBuilder::new().network(Network::custom("private").unwrap())
59}
60
61pub fn listening_test_node_builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
63 test_node_builder().listen(["/ip4/0.0.0.0/tcp/0".parse().unwrap()])
64}
65
66pub trait ExtendedHeaderGeneratorExt {
68 fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders;
70}
71
72impl ExtendedHeaderGeneratorExt for ExtendedHeaderGenerator {
73 fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders {
74 unsafe { VerifiedExtendedHeaders::new_unchecked(self.next_many(amount)) }
75 }
76}
77
78pub struct MockP2pHandle {
82 #[allow(dead_code)]
83 pub(crate) cmd_tx: mpsc::Sender<P2pCmd>,
84 pub(crate) cmd_rx: mpsc::Receiver<P2pCmd>,
85 pub(crate) header_sub_tx: Option<mpsc::Sender<ExtendedHeader>>,
86 pub(crate) peer_tracker_tx: watch::Sender<PeerTrackerInfo>,
87}
88
89impl MockP2pHandle {
90 pub fn announce_peer_connected(&self) {
92 self.peer_tracker_tx.send_modify(|info| {
93 info.num_connected_peers += 1;
94 });
95 }
96
97 pub fn announce_trusted_peer_connected(&self) {
99 self.peer_tracker_tx.send_modify(|info| {
100 info.num_connected_peers += 1;
101 info.num_connected_trusted_peers += 1;
102 });
103 }
104
105 pub fn announce_all_peers_disconnected(&self) {
107 self.peer_tracker_tx.send_modify(|info| {
108 info.num_connected_peers = 0;
109 info.num_connected_trusted_peers = 0;
110 });
111 }
112
113 pub fn announce_new_head(&self, header: ExtendedHeader) {
115 if let Some(ref tx) = self.header_sub_tx {
116 let _ = tx.try_send(header);
117 }
118 }
119
120 pub(crate) async fn expect_cmd(&mut self) -> P2pCmd {
124 self.try_recv_cmd()
125 .await
126 .expect("Expecting P2pCmd, but timed-out")
127 }
128
129 pub async fn expect_no_cmd(&mut self) {
133 if let Some(cmd) = self.try_recv_cmd().await {
134 panic!("Expecting no P2pCmd, but received: {cmd:?}");
135 }
136 }
137
138 pub(crate) async fn try_recv_cmd(&mut self) -> Option<P2pCmd> {
139 timeout(Duration::from_millis(300), async move {
140 self.cmd_rx.recv().await.expect("P2p dropped")
141 })
142 .await
143 .ok()
144 }
145
146 pub async fn expect_header_request_cmd(
150 &mut self,
151 ) -> (
152 HeaderRequest,
153 OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
154 ) {
155 match self.expect_cmd().await {
156 P2pCmd::HeaderExRequest {
157 request,
158 respond_to,
159 } => (request, respond_to),
160 cmd => panic!("Expecting HeaderExRequest, but received: {cmd:?}"),
161 }
162 }
163
164 pub async fn expect_header_request_for_height_cmd(
168 &mut self,
169 ) -> (u64, u64, OneshotResultSender<Vec<ExtendedHeader>, P2pError>) {
170 let (req, respond_to) = self.expect_header_request_cmd().await;
171
172 match req.data {
173 Some(Data::Origin(height)) if req.amount > 0 => (height, req.amount, respond_to),
174 _ => panic!("Expecting HeaderExRequest for height, but received: {req:?}"),
175 }
176 }
177
178 pub async fn expect_header_request_for_hash_cmd(
182 &mut self,
183 ) -> (Hash, OneshotResultSender<Vec<ExtendedHeader>, P2pError>) {
184 let (req, respond_to) = self.expect_header_request_cmd().await;
185
186 match req.data {
187 Some(Data::Hash(bytes)) if req.amount == 1 => {
188 let array = bytes.try_into().expect("Invalid hash");
189 let hash = Hash::Sha256(array);
190 (hash, respond_to)
191 }
192 _ => panic!("Expecting HeaderExRequest for hash, but received: {req:?}"),
193 }
194 }
195
196 pub async fn expect_init_header_sub(&mut self) -> ExtendedHeader {
200 match self.expect_cmd().await {
201 P2pCmd::InitHeaderSub { head, channel } => {
202 self.header_sub_tx = Some(channel);
203 *head
204 }
205 cmd => panic!("Expecting InitHeaderSub, but received: {cmd:?}"),
206 }
207 }
208
209 pub async fn expect_get_shwap_cid(&mut self) -> (Cid, OneshotResultSender<Vec<u8>, P2pError>) {
213 match self.expect_cmd().await {
214 P2pCmd::GetShwapCid { cid, respond_to } => (cid, respond_to),
215 cmd => panic!("Expecting GetShwapCid, but received: {cmd:?}"),
216 }
217 }
218}
219
220pub struct MockDaserHandle {
222 pub(crate) cmd_rx: mpsc::Receiver<DaserCmd>,
223}
224
225impl MockDaserHandle {
226 pub(crate) async fn try_recv_cmd(&mut self) -> Option<DaserCmd> {
227 timeout(Duration::from_millis(300), async move {
228 self.cmd_rx.recv().await.expect("Daser dropped")
229 })
230 .await
231 .ok()
232 }
233
234 pub(crate) async fn expect_cmd(&mut self) -> DaserCmd {
235 self.try_recv_cmd()
236 .await
237 .expect("Expecting DaserCmd, but timed-out")
238 }
239
240 pub async fn expect_no_cmd(&mut self) {
242 if let Some(cmd) = self.try_recv_cmd().await {
243 panic!("Expecting no DaserCmd, but received: {cmd:?}");
244 }
245 }
246
247 pub async fn expect_want_to_prune(&mut self) -> (u64, oneshot::Sender<bool>) {
249 match self.expect_cmd().await {
250 DaserCmd::WantToPrune { height, respond_to } => (height, respond_to),
251 cmd => panic!("Expecting WantToPrune, but received: {cmd:?}"),
252 }
253 }
254
255 pub async fn expect_update_highest_prunable_block(&mut self) -> u64 {
257 match self.expect_cmd().await {
258 DaserCmd::UpdateHighestPrunableHeight { value } => value,
259 cmd => panic!("Expecting UpdateHighestPrunableHeight, but received: {cmd:?}"),
260 }
261 }
262
263 pub async fn expect_update_number_of_prunable_blocks(&mut self) -> u64 {
265 match self.expect_cmd().await {
266 DaserCmd::UpdateNumberOfPrunableBlocks { value } => value,
267 cmd => panic!("Expecting UpdateNumberOfPrunableBlocks, but received: {cmd:?}"),
268 }
269 }
270}