1use std::time::Duration;
4
5use celestia_proto::p2p::pb::{HeaderRequest, header_request::Data};
6use celestia_types::ExtendedHeader;
7use celestia_types::hash::Hash;
8use celestia_types::test_utils::ExtendedHeaderGenerator;
9use cid::Cid;
10use lumina_utils::time::timeout;
11use tokio::sync::{mpsc, oneshot, watch};
12
13use crate::{
14 NodeBuilder,
15 block_ranges::{BlockRange, BlockRanges},
16 blockstore::InMemoryBlockstore,
17 daser::DaserCmd,
18 network::Network,
19 p2p::{P2pCmd, P2pError},
20 peer_tracker::PeerTrackerInfo,
21 store::{InMemoryStore, VerifiedExtendedHeaders},
22 utils::OneshotResultSender,
23};
24
25pub async fn gen_filled_store(amount: u64) -> (InMemoryStore, ExtendedHeaderGenerator) {
27 let s = InMemoryStore::new();
28 let mut generator = ExtendedHeaderGenerator::new();
29
30 s.insert(generator.next_many_verified(amount))
31 .await
32 .expect("inserting test data failed");
33
34 (s, generator)
35}
36
/// Produces a fresh IndexedDB database name for a test store.
///
/// Each call takes the next value of a process-wide counter and embeds it in the name, so
/// successive calls within one process yield distinct names. Before the name is handed
/// back, `rexie::Rexie::delete` is invoked on it (presumably to clear a database left over
/// from an earlier run — confirm against the rexie docs).
///
/// # Panics
///
/// Panics if `rexie::Rexie::delete` returns an error.
#[cfg(all(test, target_arch = "wasm32"))]
pub(crate) async fn new_indexed_db_store_name() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};

    // Shared by every caller in the process; only uniqueness of the value matters here.
    static COUNTER: AtomicU32 = AtomicU32::new(0);

    let id = COUNTER.fetch_add(1, Ordering::Relaxed);
    let name = format!("indexeddb-lumina-node-store-test-{id}");

    rexie::Rexie::delete(&name).await.unwrap();

    name
}
50
51pub fn new_block_ranges<const N: usize>(ranges: [BlockRange; N]) -> BlockRanges {
53 BlockRanges::from_vec(ranges.into_iter().collect()).expect("invalid BlockRanges")
54}
55
56pub fn test_node_builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
58 NodeBuilder::new().network(Network::custom("private").unwrap())
59}
60
61pub fn listening_test_node_builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
63 test_node_builder().listen(["/ip4/0.0.0.0/tcp/0".parse().unwrap()])
64}
65
66pub trait ExtendedHeaderGeneratorExt {
68 fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders;
70 fn next_many_empty_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders;
72}
73
74impl ExtendedHeaderGeneratorExt for ExtendedHeaderGenerator {
75 fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders {
76 unsafe { VerifiedExtendedHeaders::new_unchecked(self.next_many(amount)) }
77 }
78 fn next_many_empty_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders {
79 unsafe { VerifiedExtendedHeaders::new_unchecked(self.next_many_empty(amount)) }
80 }
81}
82
83pub struct MockP2pHandle {
87 #[allow(dead_code)]
88 pub(crate) cmd_tx: mpsc::Sender<P2pCmd>,
89 pub(crate) cmd_rx: mpsc::Receiver<P2pCmd>,
90 pub(crate) header_sub_tx: Option<mpsc::Sender<ExtendedHeader>>,
91 pub(crate) peer_tracker_tx: watch::Sender<PeerTrackerInfo>,
92}
93
94impl MockP2pHandle {
95 pub fn announce_peer_connected(&self) {
97 self.peer_tracker_tx.send_modify(|info| {
98 info.num_connected_peers += 1;
99 });
100 }
101
102 pub fn announce_trusted_peer_connected(&self) {
104 self.peer_tracker_tx.send_modify(|info| {
105 info.num_connected_peers += 1;
106 info.num_connected_trusted_peers += 1;
107 });
108 }
109
110 pub fn announce_all_peers_disconnected(&self) {
112 self.peer_tracker_tx.send_modify(|info| {
113 info.num_connected_peers = 0;
114 info.num_connected_trusted_peers = 0;
115 });
116 }
117
118 pub fn announce_new_head(&self, header: ExtendedHeader) {
120 if let Some(ref tx) = self.header_sub_tx {
121 let _ = tx.try_send(header);
122 }
123 }
124
125 pub(crate) async fn expect_cmd(&mut self) -> P2pCmd {
129 self.try_recv_cmd()
130 .await
131 .expect("Expecting P2pCmd, but timed-out")
132 }
133
134 pub async fn expect_no_cmd(&mut self) {
138 if let Some(cmd) = self.try_recv_cmd().await {
139 panic!("Expecting no P2pCmd, but received: {cmd:?}");
140 }
141 }
142
143 pub(crate) async fn try_recv_cmd(&mut self) -> Option<P2pCmd> {
144 timeout(Duration::from_millis(300), async move {
145 self.cmd_rx.recv().await.expect("P2p dropped")
146 })
147 .await
148 .ok()
149 }
150
151 pub async fn expect_header_request_cmd(
155 &mut self,
156 ) -> (
157 HeaderRequest,
158 OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
159 ) {
160 match self.expect_cmd().await {
161 P2pCmd::HeaderExRequest {
162 request,
163 respond_to,
164 } => (request, respond_to),
165 cmd => panic!("Expecting HeaderExRequest, but received: {cmd:?}"),
166 }
167 }
168
169 pub async fn expect_header_request_for_height_cmd(
173 &mut self,
174 ) -> (u64, u64, OneshotResultSender<Vec<ExtendedHeader>, P2pError>) {
175 let (req, respond_to) = self.expect_header_request_cmd().await;
176
177 match req.data {
178 Some(Data::Origin(height)) if req.amount > 0 => (height, req.amount, respond_to),
179 _ => panic!("Expecting HeaderExRequest for height, but received: {req:?}"),
180 }
181 }
182
183 pub async fn expect_header_request_for_hash_cmd(
187 &mut self,
188 ) -> (Hash, OneshotResultSender<Vec<ExtendedHeader>, P2pError>) {
189 let (req, respond_to) = self.expect_header_request_cmd().await;
190
191 match req.data {
192 Some(Data::Hash(bytes)) if req.amount == 1 => {
193 let array = bytes.try_into().expect("Invalid hash");
194 let hash = Hash::Sha256(array);
195 (hash, respond_to)
196 }
197 _ => panic!("Expecting HeaderExRequest for hash, but received: {req:?}"),
198 }
199 }
200
201 pub async fn expect_init_header_sub(&mut self) -> ExtendedHeader {
205 match self.expect_cmd().await {
206 P2pCmd::InitHeaderSub { head, channel } => {
207 self.header_sub_tx = Some(channel);
208 *head
209 }
210 cmd => panic!("Expecting InitHeaderSub, but received: {cmd:?}"),
211 }
212 }
213
214 pub async fn expect_get_shwap_cid(&mut self) -> (Cid, OneshotResultSender<Vec<u8>, P2pError>) {
218 match self.expect_cmd().await {
219 P2pCmd::GetShwapCid { cid, respond_to } => (cid, respond_to),
220 cmd => panic!("Expecting GetShwapCid, but received: {cmd:?}"),
221 }
222 }
223}
224
225pub struct MockDaserHandle {
227 pub(crate) cmd_rx: mpsc::Receiver<DaserCmd>,
228}
229
230impl MockDaserHandle {
231 pub(crate) async fn try_recv_cmd(&mut self) -> Option<DaserCmd> {
232 timeout(Duration::from_millis(300), async move {
233 self.cmd_rx.recv().await.expect("Daser dropped")
234 })
235 .await
236 .ok()
237 }
238
239 pub(crate) async fn expect_cmd(&mut self) -> DaserCmd {
240 self.try_recv_cmd()
241 .await
242 .expect("Expecting DaserCmd, but timed-out")
243 }
244
245 pub async fn expect_no_cmd(&mut self) {
247 if let Some(cmd) = self.try_recv_cmd().await {
248 panic!("Expecting no DaserCmd, but received: {cmd:?}");
249 }
250 }
251
252 pub async fn expect_want_to_prune(&mut self) -> (u64, oneshot::Sender<bool>) {
254 match self.expect_cmd().await {
255 DaserCmd::WantToPrune { height, respond_to } => (height, respond_to),
256 cmd => panic!("Expecting WantToPrune, but received: {cmd:?}"),
257 }
258 }
259
260 pub async fn expect_update_highest_prunable_block(&mut self) -> u64 {
262 match self.expect_cmd().await {
263 DaserCmd::UpdateHighestPrunableHeight { value } => value,
264 cmd => panic!("Expecting UpdateHighestPrunableHeight, but received: {cmd:?}"),
265 }
266 }
267
268 pub async fn expect_update_number_of_prunable_blocks(&mut self) -> u64 {
270 match self.expect_cmd().await {
271 DaserCmd::UpdateNumberOfPrunableBlocks { value } => value,
272 cmd => panic!("Expecting UpdateNumberOfPrunableBlocks, but received: {cmd:?}"),
273 }
274 }
275}