1use crate::{
2 binary::FreenetBinary,
3 network::TestNetwork,
4 peer::{get_free_port, TestPeer},
5 process::{self, PeerProcess},
6 remote::{PeerLocation, RemoteMachine},
7 Error, Result,
8};
9use chrono::Utc;
10use std::collections::HashMap;
11use std::fs;
12use std::net::Ipv4Addr;
13use std::path::{Path, PathBuf};
14use std::process::Command;
15use std::time::{Duration, SystemTime};
16
17struct GatewayInfo {
18 address: String,
19 public_key_path: PathBuf,
20}
21
22pub struct NetworkBuilder {
24 gateways: usize,
25 peers: usize,
26 binary: FreenetBinary,
27 min_connectivity: f64,
28 connectivity_timeout: Duration,
29 preserve_data_on_failure: bool,
30 preserve_data_on_success: bool,
31 peer_locations: HashMap<usize, PeerLocation>,
32 default_location: PeerLocation,
33 min_connections: Option<usize>,
34 max_connections: Option<usize>,
35}
36
37impl Default for NetworkBuilder {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl NetworkBuilder {
44 pub fn new() -> Self {
45 Self {
46 gateways: 1,
47 peers: 3,
48 binary: FreenetBinary::default(),
49 min_connectivity: 1.0, connectivity_timeout: Duration::from_secs(30),
51 preserve_data_on_failure: false,
52 preserve_data_on_success: false,
53 peer_locations: HashMap::new(),
54 default_location: PeerLocation::Local,
55 min_connections: None,
56 max_connections: None,
57 }
58 }
59
60 pub fn gateways(mut self, n: usize) -> Self {
62 self.gateways = n;
63 self
64 }
65
66 pub fn peers(mut self, n: usize) -> Self {
68 self.peers = n;
69 self
70 }
71
72 pub fn binary(mut self, binary: FreenetBinary) -> Self {
74 self.binary = binary;
75 self
76 }
77
78 pub fn require_connectivity(mut self, ratio: f64) -> Self {
80 self.min_connectivity = ratio;
81 self
82 }
83
84 pub fn connectivity_timeout(mut self, timeout: Duration) -> Self {
86 self.connectivity_timeout = timeout;
87 self
88 }
89
90 pub fn min_connections(mut self, min: usize) -> Self {
92 self.min_connections = Some(min);
93 self
94 }
95
96 pub fn max_connections(mut self, max: usize) -> Self {
98 self.max_connections = Some(max);
99 self
100 }
101
102 pub fn preserve_temp_dirs_on_failure(mut self, preserve: bool) -> Self {
104 self.preserve_data_on_failure = preserve;
105 self
106 }
107
108 pub fn preserve_temp_dirs_on_success(mut self, preserve: bool) -> Self {
110 self.preserve_data_on_success = preserve;
111 self
112 }
113
114 pub fn peer_location(mut self, index: usize, location: PeerLocation) -> Self {
117 self.peer_locations.insert(index, location);
118 self
119 }
120
121 pub fn default_location(mut self, location: PeerLocation) -> Self {
123 self.default_location = location;
124 self
125 }
126
127 pub fn distribute_across_remotes(mut self, machines: Vec<RemoteMachine>) -> Self {
130 let total_peers = self.gateways + self.peers;
131 for (idx, machine) in (0..total_peers).zip(machines.iter().cycle()) {
132 self.peer_locations
133 .insert(idx, PeerLocation::Remote(machine.clone()));
134 }
135 self
136 }
137
138 pub async fn build(self) -> Result<TestNetwork> {
140 let binary_path = self.binary.resolve()?;
141
142 tracing::info!(
143 "Starting test network: {} gateways, {} peers",
144 self.gateways,
145 self.peers
146 );
147
148 let base_dir = resolve_base_dir();
149 fs::create_dir_all(&base_dir)?;
150 cleanup_old_runs(&base_dir, 5)?;
151 let run_root = create_run_directory(&base_dir)?;
152
153 let mut run_status = RunStatusGuard::new(&run_root);
154
155 let mut gateways = Vec::new();
157 for i in 0..self.gateways {
158 let peer = match self.start_peer(&binary_path, i, true, &run_root).await {
159 Ok(peer) => peer,
160 Err(err) => {
161 let detail = format!("failed to start gateway {i}: {err}");
162 run_status.mark("failure", Some(&detail));
163 return Err(err);
164 }
165 };
166 gateways.push(peer);
167 }
168
169 let gateway_info: Vec<_> = gateways
171 .iter()
172 .map(|gw| GatewayInfo {
173 address: format!("{}:{}", gw.network_address, gw.network_port),
174 public_key_path: gw
175 .public_key_path
176 .clone()
177 .expect("Gateway must have public key"),
178 })
179 .collect();
180
181 let mut peers = Vec::new();
183 for i in 0..self.peers {
184 let peer = match self
185 .start_peer_with_gateways(
186 &binary_path,
187 i + self.gateways,
188 false,
189 &gateway_info,
190 &run_root,
191 )
192 .await
193 {
194 Ok(peer) => peer,
195 Err(err) => {
196 let detail = format!("failed to start peer {}: {}", i + self.gateways, err);
197 run_status.mark("failure", Some(&detail));
198 return Err(err);
199 }
200 };
201 peers.push(peer);
202 if i + 1 < self.peers {
203 tokio::time::sleep(Duration::from_millis(500)).await;
204 }
205 }
206
207 let network = TestNetwork::new(gateways, peers, self.min_connectivity, run_root.clone());
208
209 match network
211 .wait_until_ready_with_timeout(self.connectivity_timeout)
212 .await
213 {
214 Ok(()) => {
215 if self.preserve_data_on_success {
216 match preserve_network_state(&network) {
217 Ok(path) => {
218 println!("Network data directories preserved at {}", path.display());
219 }
220 Err(err) => {
221 eprintln!(
222 "Failed to preserve network data directories after success: {}",
223 err
224 );
225 }
226 }
227 }
228 let detail = format!("success: gateways={}, peers={}", self.gateways, self.peers);
229 run_status.mark("success", Some(&detail));
230 Ok(network)
231 }
232 Err(err) => {
233 if let Err(log_err) = dump_recent_logs(&network) {
234 eprintln!("Failed to dump logs after connectivity error: {}", log_err);
235 }
236 if self.preserve_data_on_failure {
237 match preserve_network_state(&network) {
238 Ok(path) => {
239 eprintln!("Network data directories preserved at {}", path.display());
240 }
241 Err(copy_err) => {
242 eprintln!("Failed to preserve network data directories: {}", copy_err);
243 }
244 }
245 }
246 let detail = err.to_string();
247 run_status.mark("failure", Some(&detail));
248 Err(err)
249 }
250 }
251 }
252
253 pub fn build_sync(self) -> Result<TestNetwork> {
255 tokio::runtime::Runtime::new()?.block_on(self.build())
256 }
257
258 async fn start_peer(
259 &self,
260 binary_path: &PathBuf,
261 index: usize,
262 is_gateway: bool,
263 run_root: &Path,
264 ) -> Result<TestPeer> {
265 self.start_peer_with_gateways(binary_path, index, is_gateway, &[], run_root)
266 .await
267 }
268
269 async fn start_peer_with_gateways(
270 &self,
271 binary_path: &PathBuf,
272 index: usize,
273 is_gateway: bool,
274 gateway_info: &[GatewayInfo],
275 run_root: &Path,
276 ) -> Result<TestPeer> {
277 let location = self
279 .peer_locations
280 .get(&index)
281 .cloned()
282 .unwrap_or_else(|| self.default_location.clone());
283
284 let id = if is_gateway {
285 format!("gw{}", index)
286 } else {
287 format!("peer{}", index)
288 };
289
290 let network_address = match &location {
292 PeerLocation::Local => {
293 let addr_index = index as u32;
294 let second_octet = ((addr_index / 256) % 254 + 1) as u8;
295 let third_octet = (addr_index % 256) as u8;
296 Ipv4Addr::new(127, second_octet, third_octet, 1).to_string()
297 }
298 PeerLocation::Remote(remote) => {
299 remote.discover_public_address()?
301 }
302 };
303
304 let (ws_port, network_port) = match &location {
307 PeerLocation::Local => (get_free_port()?, get_free_port()?),
308 PeerLocation::Remote(_) => (0, 0), };
310
311 let data_dir = create_peer_dir(run_root, &id)?;
312
313 tracing::debug!(
314 "Starting {} {} - ws:{} net:{}",
315 if is_gateway { "gateway" } else { "peer" },
316 id,
317 ws_port,
318 network_port
319 );
320
321 let (keypair_path, public_key_path) = if is_gateway {
323 let keypair = data_dir.join("keypair.pem");
324 let pubkey = data_dir.join("public_key.pem");
325 generate_keypair(&keypair, &pubkey)?;
326 (Some(keypair), Some(pubkey))
327 } else {
328 (None, None)
329 };
330
331 if let PeerLocation::Remote(remote) = &location {
334 let remote_data_dir = remote.remote_work_dir().join(&id);
335
336 if is_gateway {
337 if let Some(keypair) = &keypair_path {
339 let remote_keypair = remote_data_dir.join("keypair.pem");
340 let remote_pubkey = remote_data_dir.join("public_key.pem");
341 remote.scp_upload(keypair, remote_keypair.to_str().unwrap())?;
342 if let Some(pubkey) = &public_key_path {
343 remote.scp_upload(pubkey, remote_pubkey.to_str().unwrap())?;
344 }
345 }
346 }
347
348 if !is_gateway {
350 for gw in gateway_info {
351 let gw_pubkey_name = gw.public_key_path.file_name().ok_or_else(|| {
352 Error::PeerStartupFailed("Invalid gateway pubkey path".to_string())
353 })?;
354 let remote_gw_pubkey = remote_data_dir.join(gw_pubkey_name);
355 remote.scp_upload(&gw.public_key_path, remote_gw_pubkey.to_str().unwrap())?;
356 }
357 }
358 }
359
360 let mut args = vec![
362 "network".to_string(),
363 "--data-dir".to_string(),
364 match &location {
365 PeerLocation::Local => data_dir.to_string_lossy().to_string(),
366 PeerLocation::Remote(remote) => remote
367 .remote_work_dir()
368 .join(&id)
369 .to_string_lossy()
370 .to_string(),
371 },
372 "--config-dir".to_string(),
373 match &location {
374 PeerLocation::Local => data_dir.to_string_lossy().to_string(),
375 PeerLocation::Remote(remote) => remote
376 .remote_work_dir()
377 .join(&id)
378 .to_string_lossy()
379 .to_string(),
380 },
381 "--ws-api-port".to_string(),
382 ws_port.to_string(),
383 "--network-address".to_string(),
384 network_address.clone(),
385 "--network-port".to_string(),
386 network_port.to_string(),
387 "--public-network-address".to_string(),
388 network_address.clone(),
389 "--public-network-port".to_string(),
390 network_port.to_string(),
391 "--skip-load-from-network".to_string(),
392 ];
393
394 if is_gateway {
395 args.push("--is-gateway".to_string());
396 if keypair_path.is_some() {
397 args.push("--transport-keypair".to_string());
398 let keypair_arg = match &location {
399 PeerLocation::Local => {
400 data_dir.join("keypair.pem").to_string_lossy().to_string()
401 }
402 PeerLocation::Remote(remote) => remote
403 .remote_work_dir()
404 .join(&id)
405 .join("keypair.pem")
406 .to_string_lossy()
407 .to_string(),
408 };
409 args.push(keypair_arg);
410 }
411 }
412
413 if !is_gateway && !gateway_info.is_empty() {
415 let gateways_toml = data_dir.join("gateways.toml");
416 let mut content = String::new();
417 for gw in gateway_info {
418 let gw_pubkey_path = match &location {
419 PeerLocation::Local => gw.public_key_path.clone(),
420 PeerLocation::Remote(remote) => {
421 let gw_pubkey_name = gw.public_key_path.file_name().ok_or_else(|| {
422 Error::PeerStartupFailed("Invalid gateway pubkey path".to_string())
423 })?;
424 remote.remote_work_dir().join(&id).join(gw_pubkey_name)
425 }
426 };
427 content.push_str(&format!(
428 "[[gateways]]\n\
429 address = {{ hostname = \"{}\" }}\n\
430 public_key = \"{}\"\n\n",
431 gw.address,
432 gw_pubkey_path.display()
433 ));
434 }
435 std::fs::write(&gateways_toml, content)?;
436
437 if let PeerLocation::Remote(remote) = &location {
439 let remote_gateways_toml = remote.remote_work_dir().join(&id).join("gateways.toml");
440 remote.scp_upload(&gateways_toml, remote_gateways_toml.to_str().unwrap())?;
441 }
442 }
443
444 let env_vars = vec![
446 ("NETWORK_ADDRESS".to_string(), network_address.clone()),
447 (
448 "PUBLIC_NETWORK_ADDRESS".to_string(),
449 network_address.clone(),
450 ),
451 ("PUBLIC_NETWORK_PORT".to_string(), network_port.to_string()),
452 ];
453
454 if let Some(min_conn) = self.min_connections {
455 args.push("--min-number-of-connections".to_string());
456 args.push(min_conn.to_string());
457 }
458 if let Some(max_conn) = self.max_connections {
459 args.push("--max-number-of-connections".to_string());
460 args.push(max_conn.to_string());
461 }
462
463 let process: Box<dyn PeerProcess + Send> = match &location {
465 PeerLocation::Local => Box::new(process::spawn_local_peer(
466 binary_path,
467 &args,
468 &data_dir,
469 &env_vars,
470 )?),
471 PeerLocation::Remote(remote) => {
472 let remote_data_dir = remote.remote_work_dir().join(&id);
473 let local_cache_dir = run_root.join(format!("{}-cache", id));
474 std::fs::create_dir_all(&local_cache_dir)?;
475
476 Box::new(
477 process::spawn_remote_peer(
478 binary_path,
479 &args,
480 remote,
481 &remote_data_dir,
482 &local_cache_dir,
483 &env_vars,
484 )
485 .await?,
486 )
487 }
488 };
489
490 tokio::time::sleep(Duration::from_millis(100)).await;
492
493 Ok(TestPeer {
494 id,
495 is_gateway,
496 ws_port,
497 network_port,
498 network_address,
499 data_dir,
500 process,
501 public_key_path,
502 location,
503 })
504 }
505}
506
507fn resolve_base_dir() -> PathBuf {
508 if let Some(path) = std::env::var_os("FREENET_TEST_NETWORK_BASE_DIR") {
509 PathBuf::from(path)
510 } else if let Ok(home) = std::env::var("HOME") {
511 PathBuf::from(home).join("code/tmp/freenet-test-networks")
512 } else {
513 std::env::temp_dir().join("freenet-test-networks")
514 }
515}
516
517fn cleanup_old_runs(base_dir: &Path, max_runs: usize) -> Result<()> {
518 let mut runs: Vec<(PathBuf, SystemTime)> = fs::read_dir(base_dir)?
519 .filter_map(|entry| {
520 let entry = entry.ok()?;
521 let file_type = entry.file_type().ok()?;
522 if !file_type.is_dir() {
523 return None;
524 }
525 let metadata = entry.metadata().ok()?;
526 let modified = metadata.modified().unwrap_or(SystemTime::UNIX_EPOCH);
527 Some((entry.path(), modified))
528 })
529 .collect();
530
531 if runs.len() <= max_runs {
532 return Ok(());
533 }
534
535 runs.sort_by_key(|(_, modified)| *modified);
536 let remove_count = runs.len() - max_runs;
537 for (path, _) in runs.into_iter().take(remove_count) {
538 if let Err(err) = fs::remove_dir_all(&path) {
539 tracing::warn!(
540 ?err,
541 path = %path.display(),
542 "Failed to remove old freenet test network run directory"
543 );
544 }
545 }
546
547 Ok(())
548}
549
550fn create_run_directory(base_dir: &Path) -> Result<PathBuf> {
551 let timestamp = Utc::now().format("%Y%m%d-%H%M%S").to_string();
552 for attempt in 0..100 {
553 let candidate = if attempt == 0 {
554 base_dir.join(×tamp)
555 } else {
556 base_dir.join(format!("{}-{}", ×tamp, attempt))
557 };
558 if !candidate.exists() {
559 fs::create_dir_all(&candidate)?;
560 return Ok(candidate);
561 }
562 }
563
564 Err(Error::Other(anyhow::anyhow!(
565 "Unable to allocate run directory after repeated attempts"
566 )))
567}
568
569fn create_peer_dir(run_root: &Path, id: &str) -> Result<PathBuf> {
570 let dir = run_root.join(id);
571 fs::create_dir_all(&dir)?;
572 Ok(dir)
573}
574
575struct RunStatusGuard {
576 status_path: PathBuf,
577}
578
579impl RunStatusGuard {
580 fn new(run_root: &Path) -> Self {
581 let status_path = run_root.join("run_status.txt");
582 let _ = fs::write(&status_path, b"status=initializing\n");
583 Self { status_path }
584 }
585
586 fn mark(&mut self, status: &str, detail: Option<&str>) {
587 let mut content = format!("status={}", status);
588 if let Some(detail) = detail {
589 content.push('\n');
590 content.push_str("detail=");
591 content.push_str(detail);
592 }
593 content.push('\n');
594 if let Err(err) = fs::write(&self.status_path, content) {
595 tracing::warn!(
596 ?err,
597 path = %self.status_path.display(),
598 "Failed to write run status"
599 );
600 }
601 }
602}
603
604fn generate_keypair(
605 private_key_path: &std::path::Path,
606 public_key_path: &std::path::Path,
607) -> Result<()> {
608 let output = Command::new("openssl")
610 .args([
611 "genpkey",
612 "-algorithm",
613 "RSA",
614 "-out",
615 private_key_path.to_str().unwrap(),
616 "-pkeyopt",
617 "rsa_keygen_bits:2048",
618 ])
619 .output()?;
620
621 if !output.status.success() {
622 return Err(Error::Other(anyhow::anyhow!(
623 "Failed to generate private key: {}",
624 String::from_utf8_lossy(&output.stderr)
625 )));
626 }
627
628 let output = Command::new("openssl")
630 .args([
631 "rsa",
632 "-pubout",
633 "-in",
634 private_key_path.to_str().unwrap(),
635 "-out",
636 public_key_path.to_str().unwrap(),
637 ])
638 .output()?;
639
640 if !output.status.success() {
641 return Err(Error::Other(anyhow::anyhow!(
642 "Failed to extract public key: {}",
643 String::from_utf8_lossy(&output.stderr)
644 )));
645 }
646
647 Ok(())
648}
649
650fn dump_recent_logs(network: &TestNetwork) -> Result<()> {
651 const MAX_LOG_LINES: usize = 200;
652
653 let mut logs = network.read_logs()?;
654 let total = logs.len();
655 if total > MAX_LOG_LINES {
656 logs.drain(0..(total - MAX_LOG_LINES));
657 }
658
659 eprintln!(
660 "\n--- Network connectivity check failed; showing {} of {} log entries ---",
661 logs.len(),
662 total
663 );
664
665 for entry in logs {
666 let level = entry.level.as_deref().unwrap_or("INFO");
667 let ts_display = entry
668 .timestamp_raw
669 .clone()
670 .or_else(|| entry.timestamp.map(|ts| ts.to_rfc3339()));
671
672 if let Some(ts) = ts_display {
673 eprintln!("[{}] [{}] {}: {}", entry.peer_id, ts, level, entry.message);
674 } else {
675 eprintln!("[{}] {}: {}", entry.peer_id, level, entry.message);
676 }
677 }
678
679 eprintln!("--- End of network logs ---\n");
680
681 Ok(())
682}
683
684fn preserve_network_state(network: &TestNetwork) -> Result<PathBuf> {
685 Ok(network.run_root().to_path_buf())
686}