lumina_node/
test_utils.rs1use std::time::Duration;
4
5use celestia_proto::p2p::pb::{header_request::Data, HeaderRequest};
6use celestia_types::hash::Hash;
7use celestia_types::test_utils::ExtendedHeaderGenerator;
8use celestia_types::ExtendedHeader;
9use cid::Cid;
10use lumina_utils::time::timeout;
11use tokio::sync::{mpsc, watch};
12
13use crate::{
14 block_ranges::{BlockRange, BlockRanges},
15 blockstore::InMemoryBlockstore,
16 network::Network,
17 p2p::{P2pCmd, P2pError},
18 peer_tracker::PeerTrackerInfo,
19 store::{InMemoryStore, VerifiedExtendedHeaders},
20 utils::OneshotResultSender,
21 NodeBuilder,
22};
23
24pub async fn gen_filled_store(amount: u64) -> (InMemoryStore, ExtendedHeaderGenerator) {
26 let s = InMemoryStore::new();
27 let mut gen = ExtendedHeaderGenerator::new();
28
29 s.insert(gen.next_many_verified(amount))
30 .await
31 .expect("inserting test data failed");
32
33 (s, gen)
34}
35
36pub fn new_block_ranges<const N: usize>(ranges: [BlockRange; N]) -> BlockRanges {
38 BlockRanges::from_vec(ranges.into_iter().collect()).expect("invalid BlockRanges")
39}
40
41pub fn test_node_builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
43 NodeBuilder::new().network(Network::custom("private").unwrap())
44}
45
46pub fn listening_test_node_builder() -> NodeBuilder<InMemoryBlockstore, InMemoryStore> {
48 test_node_builder().listen(["/ip4/0.0.0.0/tcp/0".parse().unwrap()])
49}
50
51pub trait ExtendedHeaderGeneratorExt {
53 fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders;
55}
56
57impl ExtendedHeaderGeneratorExt for ExtendedHeaderGenerator {
58 fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders {
59 unsafe { VerifiedExtendedHeaders::new_unchecked(self.next_many(amount)) }
60 }
61}
62
63pub struct MockP2pHandle {
67 #[allow(dead_code)]
68 pub(crate) cmd_tx: mpsc::Sender<P2pCmd>,
69 pub(crate) cmd_rx: mpsc::Receiver<P2pCmd>,
70 pub(crate) header_sub_tx: Option<mpsc::Sender<ExtendedHeader>>,
71 pub(crate) peer_tracker_tx: watch::Sender<PeerTrackerInfo>,
72}
73
74impl MockP2pHandle {
75 pub fn announce_peer_connected(&self) {
77 self.peer_tracker_tx.send_modify(|info| {
78 info.num_connected_peers += 1;
79 });
80 }
81
82 pub fn announce_trusted_peer_connected(&self) {
84 self.peer_tracker_tx.send_modify(|info| {
85 info.num_connected_peers += 1;
86 info.num_connected_trusted_peers += 1;
87 });
88 }
89
90 pub fn announce_all_peers_disconnected(&self) {
92 self.peer_tracker_tx.send_modify(|info| {
93 info.num_connected_peers = 0;
94 info.num_connected_trusted_peers = 0;
95 });
96 }
97
98 pub fn announce_new_head(&self, header: ExtendedHeader) {
100 if let Some(ref tx) = self.header_sub_tx {
101 let _ = tx.try_send(header);
102 }
103 }
104
105 pub(crate) async fn expect_cmd(&mut self) -> P2pCmd {
109 self.try_recv_cmd()
110 .await
111 .expect("Expecting P2pCmd, but timed-out")
112 }
113
114 pub async fn expect_no_cmd(&mut self) {
118 if let Some(cmd) = self.try_recv_cmd().await {
119 panic!("Expecting no P2pCmd, but received: {cmd:?}");
120 }
121 }
122
123 pub(crate) async fn try_recv_cmd(&mut self) -> Option<P2pCmd> {
124 timeout(Duration::from_millis(300), async move {
125 self.cmd_rx.recv().await.expect("P2p dropped")
126 })
127 .await
128 .ok()
129 }
130
131 pub async fn expect_header_request_cmd(
135 &mut self,
136 ) -> (
137 HeaderRequest,
138 OneshotResultSender<Vec<ExtendedHeader>, P2pError>,
139 ) {
140 match self.expect_cmd().await {
141 P2pCmd::HeaderExRequest {
142 request,
143 respond_to,
144 } => (request, respond_to),
145 cmd => panic!("Expecting HeaderExRequest, but received: {cmd:?}"),
146 }
147 }
148
149 pub async fn expect_header_request_for_height_cmd(
153 &mut self,
154 ) -> (u64, u64, OneshotResultSender<Vec<ExtendedHeader>, P2pError>) {
155 let (req, respond_to) = self.expect_header_request_cmd().await;
156
157 match req.data {
158 Some(Data::Origin(height)) if req.amount > 0 => (height, req.amount, respond_to),
159 _ => panic!("Expecting HeaderExRequest for height, but received: {req:?}"),
160 }
161 }
162
163 pub async fn expect_header_request_for_hash_cmd(
167 &mut self,
168 ) -> (Hash, OneshotResultSender<Vec<ExtendedHeader>, P2pError>) {
169 let (req, respond_to) = self.expect_header_request_cmd().await;
170
171 match req.data {
172 Some(Data::Hash(bytes)) if req.amount == 1 => {
173 let array = bytes.try_into().expect("Invalid hash");
174 let hash = Hash::Sha256(array);
175 (hash, respond_to)
176 }
177 _ => panic!("Expecting HeaderExRequest for hash, but received: {req:?}"),
178 }
179 }
180
181 pub async fn expect_init_header_sub(&mut self) -> ExtendedHeader {
185 match self.expect_cmd().await {
186 P2pCmd::InitHeaderSub { head, channel } => {
187 self.header_sub_tx = Some(channel);
188 *head
189 }
190 cmd => panic!("Expecting InitHeaderSub, but received: {cmd:?}"),
191 }
192 }
193
194 pub async fn expect_get_shwap_cid(&mut self) -> (Cid, OneshotResultSender<Vec<u8>, P2pError>) {
198 match self.expect_cmd().await {
199 P2pCmd::GetShwapCid { cid, respond_to } => (cid, respond_to),
200 cmd => panic!("Expecting GetShwapCid, but received: {cmd:?}"),
201 }
202 }
203}