blueprint_networking/test_utils/
mod.rs1use crate::{
2 NetworkConfig, NetworkService, service::AllowedKeys, service_handle::NetworkServiceHandle,
3};
4use blueprint_core::info;
5use blueprint_crypto::KeyType;
6use libp2p::{
7 Multiaddr, PeerId,
8 identity::{self, Keypair},
9};
10use std::{collections::HashSet, time::Duration};
11use tokio::time::timeout;
12
13pub fn setup_log() {
14 let _ = tracing_subscriber::fmt()
15 .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
16 .with_target(true)
17 .with_thread_ids(true)
18 .with_file(true)
19 .with_line_number(true)
20 .try_init();
21}
22
23pub struct TestNode<K: KeyType> {
25 pub service: Option<NetworkService<K>>,
26 pub peer_id: PeerId,
27 pub listen_addr: Option<Multiaddr>,
28 pub instance_key_pair: K::Secret,
29 pub local_key: Keypair,
30 pub using_evm_address_for_handshake_verification: bool,
31}
32
33impl<K: KeyType> TestNode<K> {
34 pub fn new(
36 network_name: &str,
37 instance_id: &str,
38 allowed_keys: AllowedKeys<K>,
39 bootstrap_peers: Vec<Multiaddr>,
40 using_evm_address_for_handshake_verification: bool,
41 ) -> Self {
42 Self::new_with_keys(
43 network_name,
44 instance_id,
45 allowed_keys,
46 bootstrap_peers,
47 None,
48 None,
49 using_evm_address_for_handshake_verification,
50 )
51 }
52
53 pub fn new_with_keys(
73 network_name: &str,
74 instance_id: &str,
75 allowed_keys: AllowedKeys<K>,
76 bootstrap_peers: Vec<Multiaddr>,
77 instance_key_pair: Option<K::Secret>,
78 local_key: Option<Keypair>,
79 using_evm_address_for_handshake_verification: bool,
80 ) -> Self {
81 let local_key = local_key.unwrap_or_else(identity::Keypair::generate_ed25519);
82 let peer_id = local_key.public().to_peer_id();
83
84 let listen_addr: Multiaddr = "/ip4/0.0.0.0/tcp/0".parse().unwrap();
86 info!("Creating test node {peer_id} with TCP address: {listen_addr}");
87
88 let instance_key_pair =
89 instance_key_pair.unwrap_or_else(|| K::generate_with_seed(None).unwrap());
90
91 let config = NetworkConfig {
92 network_name: network_name.to_string(),
93 instance_id: instance_id.to_string(),
94 instance_key_pair: instance_key_pair.clone(),
95 local_key: local_key.clone(),
96 listen_addr: listen_addr.clone(),
97 target_peer_count: 10,
98 bootstrap_peers,
99 enable_mdns: true,
100 enable_kademlia: true,
101 using_evm_address_for_handshake_verification,
102 };
103
104 let (_, allowed_keys_rx) = crossbeam_channel::unbounded();
105 let service = NetworkService::new(config, allowed_keys, allowed_keys_rx)
106 .expect("Failed to create network service");
107
108 Self {
109 service: Some(service),
110 peer_id,
111 listen_addr: None,
112 instance_key_pair,
113 local_key,
114 using_evm_address_for_handshake_verification,
115 }
116 }
117
118 pub async fn start(&mut self) -> Result<NetworkServiceHandle<K>, &'static str> {
124 let service = self.service.take().ok_or("Service already started")?;
126 let handle = service.start();
127
128 let timeout_duration = Duration::from_secs(10); match timeout(timeout_duration, async {
131 while self.listen_addr.is_none() {
133 if let Some(addr) = handle.get_listen_addr() {
134 info!("Node {} listening on {}", self.peer_id, addr);
135 self.listen_addr = Some(addr.clone());
136
137 let addr_str = addr.to_string();
139 let port = addr_str.split('/').nth(4).unwrap_or("0").to_string();
140
141 let localhost_addr = format!("127.0.0.1:{}", port);
143 match tokio::net::TcpStream::connect(&localhost_addr).await {
144 Ok(_) => {
145 info!("Successfully verified localhost port for {}", self.peer_id);
146 break;
147 }
148 Err(e) => {
149 info!("Localhost port not ready for {}: {}", self.peer_id, e);
150 let external_addr = format!("10.0.1.142:{}", port);
152 match tokio::net::TcpStream::connect(&external_addr).await {
153 Ok(_) => {
154 info!(
155 "Successfully verified external port for {}",
156 self.peer_id
157 );
158 break;
159 }
160 Err(e) => {
161 info!("External port not ready for {}: {}", self.peer_id, e);
162 tokio::time::sleep(Duration::from_millis(100)).await;
163 continue;
164 }
165 }
166 }
167 }
168 }
169 tokio::time::sleep(Duration::from_millis(100)).await;
170 }
171
172 tokio::time::sleep(Duration::from_millis(500)).await;
174
175 Ok::<(), &'static str>(())
176 })
177 .await
178 {
179 Ok(Ok(())) => {
180 info!("Node {} fully initialized", self.peer_id);
181 Ok(handle)
182 }
183 Ok(Err(e)) => Err(e),
184 Err(_) => Err("Timeout waiting for node to initialize"),
185 }
186 }
187
188 pub fn get_listen_addr(&self) -> Option<Multiaddr> {
190 self.listen_addr.clone()
191 }
192
193 pub fn insert_allowed_keys(&self, allowed_keys: AllowedKeys<K>) {
195 if let Some(service) = &self.service {
196 service.peer_manager.insert_whitelisted_keys(allowed_keys);
197 }
198 }
199}
200
201pub async fn wait_for_condition<F>(timeout: Duration, mut condition: F) -> Result<(), &'static str>
212where
213 F: FnMut() -> bool,
214{
215 let start = std::time::Instant::now();
216 while !condition() {
217 if start.elapsed() > timeout {
218 return Err("Timeout waiting for condition");
219 }
220 tokio::time::sleep(Duration::from_millis(100)).await;
221 }
222 Ok(())
223}
224
225pub async fn wait_for_peer_discovery<K: KeyType>(
236 handles: &[NetworkServiceHandle<K>],
237 timeout: Duration,
238) -> Result<(), &'static str> {
239 info!("Waiting for peer discovery...");
240
241 wait_for_condition(timeout, || {
242 for (i, handle1) in handles.iter().enumerate() {
243 for (j, handle2) in handles.iter().enumerate() {
244 if i != j && !handle1.peers().contains(&handle2.local_peer_id) {
245 return false;
246 }
247 }
248 }
249 true
250 })
251 .await
252}
253
254pub async fn wait_for_peer_info<K: KeyType>(
266 handle1: &NetworkServiceHandle<K>,
267 handle2: &NetworkServiceHandle<K>,
268 timeout: Duration,
269) {
270 info!("Waiting for identify info...");
271
272 match tokio::time::timeout(timeout, async {
273 loop {
274 let peer_info1 = handle1.peer_info(&handle2.local_peer_id);
275 let peer_info2 = handle2.peer_info(&handle1.local_peer_id);
276
277 if let Some(peer_info) = peer_info1 {
278 if peer_info.identify_info.is_some() {
279 if let Some(peer_info) = peer_info2 {
281 if peer_info.identify_info.is_some() {
282 info!("Identify info populated in both directions");
283 break;
284 }
285 }
286 }
287 }
288 tokio::time::sleep(Duration::from_millis(100)).await;
289 }
290 })
291 .await
292 {
293 Ok(()) => info!("Peer info updated successfully in both directions"),
294 Err(e) => panic!("Peer info update timed out: {e}"),
295 }
296}
297
298pub async fn wait_for_all_handshakes<K: KeyType>(
309 handles: &[&mut NetworkServiceHandle<K>],
310 timeout_length: Duration,
311) {
312 info!("Starting handshake wait for {} nodes", handles.len());
313 timeout(timeout_length, async {
314 loop {
315 let mut all_verified = true;
316 for (i, handle1) in handles.iter().enumerate() {
317 for (j, handle2) in handles.iter().enumerate() {
318 if i != j {
319 let verified = handle1
320 .peer_manager
321 .is_peer_verified(&handle2.local_peer_id);
322 if !verified {
323 info!("Node {} -> Node {}: handshake not verified yet", i, j);
324 all_verified = false;
325 break;
326 }
327 }
328 }
329 if !all_verified {
330 break;
331 }
332 }
333 if all_verified {
334 info!("All handshakes completed successfully");
335 break;
336 }
337 tokio::time::sleep(Duration::from_millis(100)).await;
338 }
339 })
340 .await
341 .expect("Handshake verification timed out");
342}
343
344pub async fn wait_for_handshake_completion<K: KeyType>(
356 handle1: &NetworkServiceHandle<K>,
357 handle2: &NetworkServiceHandle<K>,
358 timeout_length: Duration,
359) {
360 timeout(timeout_length, async {
361 loop {
362 if handle1
363 .peer_manager
364 .is_peer_verified(&handle2.local_peer_id)
365 && handle2
366 .peer_manager
367 .is_peer_verified(&handle1.local_peer_id)
368 {
369 break;
370 }
371 tokio::time::sleep(Duration::from_millis(100)).await;
372 }
373 })
374 .await
375 .expect("Handshake verification timed out");
376}
377
378pub fn create_node_with_keys<K: KeyType>(
392 network: &str,
393 instance: &str,
394 allowed_keys: AllowedKeys<K>,
395 key_pair: Option<K::Secret>,
396 using_evm_address_for_handshake_verification: bool,
397) -> TestNode<K> {
398 TestNode::new_with_keys(
399 network,
400 instance,
401 allowed_keys,
402 vec![],
403 key_pair,
404 None,
405 using_evm_address_for_handshake_verification,
406 )
407}
408
409#[must_use]
424pub fn create_whitelisted_nodes<K: KeyType>(
425 count: usize,
426 network_name: &str,
427 instance_name: &str,
428 using_evm_address_for_handshake_verification: bool,
429) -> Vec<TestNode<K>> {
430 let mut nodes = Vec::with_capacity(count);
431 let mut key_pairs = Vec::with_capacity(count);
432 let mut allowed_keys = HashSet::new();
433
434 for _ in 0..count {
436 let key_pair = K::generate_with_seed(None).unwrap();
437 key_pairs.push(key_pair.clone());
438 allowed_keys.insert(K::public_from_secret(&key_pair));
439 }
440
441 for key_pair in &key_pairs {
443 nodes.push(create_node_with_keys(
444 network_name,
445 instance_name,
446 AllowedKeys::InstancePublicKeys(allowed_keys.clone()),
447 Some(key_pair.clone()),
448 using_evm_address_for_handshake_verification,
449 ));
450 }
451
452 nodes
453}