freenet_test_network/
builder.rs

1use crate::{
2    binary::FreenetBinary,
3    network::TestNetwork,
4    peer::{get_free_port, TestPeer},
5    process::{self, PeerProcess},
6    remote::{PeerLocation, RemoteMachine},
7    Error, Result,
8};
9use chrono::Utc;
10use std::collections::HashMap;
11use std::fs;
12use std::net::Ipv4Addr;
13use std::path::{Path, PathBuf};
14use std::process::Command;
15use std::time::{Duration, SystemTime};
16
/// Connection details of a started gateway, handed to regular peers so they
/// can list the gateway in their `gateways.toml`.
#[derive(Debug, Clone)]
struct GatewayInfo {
    /// Gateway endpoint rendered as `address:port`.
    address: String,
    /// Path (on the machine running the test harness) of the gateway's public key file.
    public_key_path: PathBuf,
}
21
22/// Builder for configuring and creating a test network
23pub struct NetworkBuilder {
24    gateways: usize,
25    peers: usize,
26    binary: FreenetBinary,
27    min_connectivity: f64,
28    connectivity_timeout: Duration,
29    preserve_data_on_failure: bool,
30    preserve_data_on_success: bool,
31    peer_locations: HashMap<usize, PeerLocation>,
32    default_location: PeerLocation,
33    min_connections: Option<usize>,
34    max_connections: Option<usize>,
35}
36
37impl Default for NetworkBuilder {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl NetworkBuilder {
44    pub fn new() -> Self {
45        Self {
46            gateways: 1,
47            peers: 3,
48            binary: FreenetBinary::default(),
49            min_connectivity: 1.0, // Default: require all peers connected
50            connectivity_timeout: Duration::from_secs(30),
51            preserve_data_on_failure: false,
52            preserve_data_on_success: false,
53            peer_locations: HashMap::new(),
54            default_location: PeerLocation::Local,
55            min_connections: None,
56            max_connections: None,
57        }
58    }
59
60    /// Set the number of gateway peers
61    pub fn gateways(mut self, n: usize) -> Self {
62        self.gateways = n;
63        self
64    }
65
66    /// Set the number of regular peers
67    pub fn peers(mut self, n: usize) -> Self {
68        self.peers = n;
69        self
70    }
71
72    /// Set which freenet binary to use
73    pub fn binary(mut self, binary: FreenetBinary) -> Self {
74        self.binary = binary;
75        self
76    }
77
78    /// Set minimum connectivity ratio required (0.0 to 1.0)
79    pub fn require_connectivity(mut self, ratio: f64) -> Self {
80        self.min_connectivity = ratio;
81        self
82    }
83
84    /// Set timeout for connectivity check
85    pub fn connectivity_timeout(mut self, timeout: Duration) -> Self {
86        self.connectivity_timeout = timeout;
87        self
88    }
89
90    /// Override min connections target for all peers.
91    pub fn min_connections(mut self, min: usize) -> Self {
92        self.min_connections = Some(min);
93        self
94    }
95
96    /// Override max connections target for all peers.
97    pub fn max_connections(mut self, max: usize) -> Self {
98        self.max_connections = Some(max);
99        self
100    }
101
102    /// Preserve peer data directories in `/tmp` when network startup fails
103    pub fn preserve_temp_dirs_on_failure(mut self, preserve: bool) -> Self {
104        self.preserve_data_on_failure = preserve;
105        self
106    }
107
108    /// Preserve peer data directories in `/tmp` even when the network boots successfully.
109    pub fn preserve_temp_dirs_on_success(mut self, preserve: bool) -> Self {
110        self.preserve_data_on_success = preserve;
111        self
112    }
113
114    /// Set the location for a specific peer (by index)
115    /// Index 0 is the first gateway, subsequent indices are regular peers
116    pub fn peer_location(mut self, index: usize, location: PeerLocation) -> Self {
117        self.peer_locations.insert(index, location);
118        self
119    }
120
121    /// Set the default location for all peers not explicitly configured
122    pub fn default_location(mut self, location: PeerLocation) -> Self {
123        self.default_location = location;
124        self
125    }
126
127    /// Convenience method to set locations for multiple remote machines
128    /// Distributes peers across the provided machines in round-robin fashion
129    pub fn distribute_across_remotes(mut self, machines: Vec<RemoteMachine>) -> Self {
130        let total_peers = self.gateways + self.peers;
131        for (idx, machine) in (0..total_peers).zip(machines.iter().cycle()) {
132            self.peer_locations
133                .insert(idx, PeerLocation::Remote(machine.clone()));
134        }
135        self
136    }
137
138    /// Build and start the network (async)
139    pub async fn build(self) -> Result<TestNetwork> {
140        let binary_path = self.binary.resolve()?;
141
142        tracing::info!(
143            "Starting test network: {} gateways, {} peers",
144            self.gateways,
145            self.peers
146        );
147
148        let base_dir = resolve_base_dir();
149        fs::create_dir_all(&base_dir)?;
150        cleanup_old_runs(&base_dir, 5)?;
151        let run_root = create_run_directory(&base_dir)?;
152
153        let mut run_status = RunStatusGuard::new(&run_root);
154
155        // Start gateways first
156        let mut gateways = Vec::new();
157        for i in 0..self.gateways {
158            let peer = match self.start_peer(&binary_path, i, true, &run_root).await {
159                Ok(peer) => peer,
160                Err(err) => {
161                    let detail = format!("failed to start gateway {i}: {err}");
162                    run_status.mark("failure", Some(&detail));
163                    return Err(err);
164                }
165            };
166            gateways.push(peer);
167        }
168
169        // Collect gateway info for peers to connect to
170        let gateway_info: Vec<_> = gateways
171            .iter()
172            .map(|gw| GatewayInfo {
173                address: format!("{}:{}", gw.network_address, gw.network_port),
174                public_key_path: gw
175                    .public_key_path
176                    .clone()
177                    .expect("Gateway must have public key"),
178            })
179            .collect();
180
181        // Start regular peers
182        let mut peers = Vec::new();
183        for i in 0..self.peers {
184            let peer = match self
185                .start_peer_with_gateways(
186                    &binary_path,
187                    i + self.gateways,
188                    false,
189                    &gateway_info,
190                    &run_root,
191                )
192                .await
193            {
194                Ok(peer) => peer,
195                Err(err) => {
196                    let detail = format!("failed to start peer {}: {}", i + self.gateways, err);
197                    run_status.mark("failure", Some(&detail));
198                    return Err(err);
199                }
200            };
201            peers.push(peer);
202            if i + 1 < self.peers {
203                tokio::time::sleep(Duration::from_millis(500)).await;
204            }
205        }
206
207        let network = TestNetwork::new(gateways, peers, self.min_connectivity, run_root.clone());
208
209        // Wait for network to be ready
210        match network
211            .wait_until_ready_with_timeout(self.connectivity_timeout)
212            .await
213        {
214            Ok(()) => {
215                if self.preserve_data_on_success {
216                    match preserve_network_state(&network) {
217                        Ok(path) => {
218                            println!("Network data directories preserved at {}", path.display());
219                        }
220                        Err(err) => {
221                            eprintln!(
222                                "Failed to preserve network data directories after success: {}",
223                                err
224                            );
225                        }
226                    }
227                }
228                let detail = format!("success: gateways={}, peers={}", self.gateways, self.peers);
229                run_status.mark("success", Some(&detail));
230                Ok(network)
231            }
232            Err(err) => {
233                if let Err(log_err) = dump_recent_logs(&network) {
234                    eprintln!("Failed to dump logs after connectivity error: {}", log_err);
235                }
236                if self.preserve_data_on_failure {
237                    match preserve_network_state(&network) {
238                        Ok(path) => {
239                            eprintln!("Network data directories preserved at {}", path.display());
240                        }
241                        Err(copy_err) => {
242                            eprintln!("Failed to preserve network data directories: {}", copy_err);
243                        }
244                    }
245                }
246                let detail = err.to_string();
247                run_status.mark("failure", Some(&detail));
248                Err(err)
249            }
250        }
251    }
252
253    /// Build the network synchronously (for use in LazyLock)
254    pub fn build_sync(self) -> Result<TestNetwork> {
255        tokio::runtime::Runtime::new()?.block_on(self.build())
256    }
257
258    async fn start_peer(
259        &self,
260        binary_path: &PathBuf,
261        index: usize,
262        is_gateway: bool,
263        run_root: &Path,
264    ) -> Result<TestPeer> {
265        self.start_peer_with_gateways(binary_path, index, is_gateway, &[], run_root)
266            .await
267    }
268
269    async fn start_peer_with_gateways(
270        &self,
271        binary_path: &PathBuf,
272        index: usize,
273        is_gateway: bool,
274        gateway_info: &[GatewayInfo],
275        run_root: &Path,
276    ) -> Result<TestPeer> {
277        // Get location for this peer
278        let location = self
279            .peer_locations
280            .get(&index)
281            .cloned()
282            .unwrap_or_else(|| self.default_location.clone());
283
284        let id = if is_gateway {
285            format!("gw{}", index)
286        } else {
287            format!("peer{}", index)
288        };
289
290        // Determine network address based on location
291        let network_address = match &location {
292            PeerLocation::Local => {
293                let addr_index = index as u32;
294                let second_octet = ((addr_index / 256) % 254 + 1) as u8;
295                let third_octet = (addr_index % 256) as u8;
296                Ipv4Addr::new(127, second_octet, third_octet, 1).to_string()
297            }
298            PeerLocation::Remote(remote) => {
299                // Discover the public IP address of the remote machine
300                remote.discover_public_address()?
301            }
302        };
303
304        // For local peers, allocate ports locally
305        // For remote peers, use port 0 (let remote OS allocate)
306        let (ws_port, network_port) = match &location {
307            PeerLocation::Local => (get_free_port()?, get_free_port()?),
308            PeerLocation::Remote(_) => (0, 0), // Will be allocated on remote
309        };
310
311        let data_dir = create_peer_dir(run_root, &id)?;
312
313        tracing::debug!(
314            "Starting {} {} - ws:{} net:{}",
315            if is_gateway { "gateway" } else { "peer" },
316            id,
317            ws_port,
318            network_port
319        );
320
321        // Generate keypair if gateway
322        let (keypair_path, public_key_path) = if is_gateway {
323            let keypair = data_dir.join("keypair.pem");
324            let pubkey = data_dir.join("public_key.pem");
325            generate_keypair(&keypair, &pubkey)?;
326            (Some(keypair), Some(pubkey))
327        } else {
328            (None, None)
329        };
330
331        // For remote gateways, we need to upload the keypair
332        // For remote regular peers, we need to upload the gateway public keys
333        if let PeerLocation::Remote(remote) = &location {
334            let remote_data_dir = remote.remote_work_dir().join(&id);
335
336            if is_gateway {
337                // Upload keypair to remote
338                if let Some(keypair) = &keypair_path {
339                    let remote_keypair = remote_data_dir.join("keypair.pem");
340                    let remote_pubkey = remote_data_dir.join("public_key.pem");
341                    remote.scp_upload(keypair, remote_keypair.to_str().unwrap())?;
342                    if let Some(pubkey) = &public_key_path {
343                        remote.scp_upload(pubkey, remote_pubkey.to_str().unwrap())?;
344                    }
345                }
346            }
347
348            // Upload gateway public keys for regular peers
349            if !is_gateway {
350                for gw in gateway_info {
351                    let gw_pubkey_name = gw.public_key_path.file_name().ok_or_else(|| {
352                        Error::PeerStartupFailed("Invalid gateway pubkey path".to_string())
353                    })?;
354                    let remote_gw_pubkey = remote_data_dir.join(gw_pubkey_name);
355                    remote.scp_upload(&gw.public_key_path, remote_gw_pubkey.to_str().unwrap())?;
356                }
357            }
358        }
359
360        // Build command arguments (same for local and remote)
361        let mut args = vec![
362            "network".to_string(),
363            "--data-dir".to_string(),
364            match &location {
365                PeerLocation::Local => data_dir.to_string_lossy().to_string(),
366                PeerLocation::Remote(remote) => remote
367                    .remote_work_dir()
368                    .join(&id)
369                    .to_string_lossy()
370                    .to_string(),
371            },
372            "--config-dir".to_string(),
373            match &location {
374                PeerLocation::Local => data_dir.to_string_lossy().to_string(),
375                PeerLocation::Remote(remote) => remote
376                    .remote_work_dir()
377                    .join(&id)
378                    .to_string_lossy()
379                    .to_string(),
380            },
381            "--ws-api-port".to_string(),
382            ws_port.to_string(),
383            "--network-address".to_string(),
384            network_address.clone(),
385            "--network-port".to_string(),
386            network_port.to_string(),
387            "--public-network-address".to_string(),
388            network_address.clone(),
389            "--public-network-port".to_string(),
390            network_port.to_string(),
391            "--skip-load-from-network".to_string(),
392        ];
393
394        if is_gateway {
395            args.push("--is-gateway".to_string());
396            if keypair_path.is_some() {
397                args.push("--transport-keypair".to_string());
398                let keypair_arg = match &location {
399                    PeerLocation::Local => {
400                        data_dir.join("keypair.pem").to_string_lossy().to_string()
401                    }
402                    PeerLocation::Remote(remote) => remote
403                        .remote_work_dir()
404                        .join(&id)
405                        .join("keypair.pem")
406                        .to_string_lossy()
407                        .to_string(),
408                };
409                args.push(keypair_arg);
410            }
411        }
412
413        // Add gateway addresses for regular peers
414        if !is_gateway && !gateway_info.is_empty() {
415            let gateways_toml = data_dir.join("gateways.toml");
416            let mut content = String::new();
417            for gw in gateway_info {
418                let gw_pubkey_path = match &location {
419                    PeerLocation::Local => gw.public_key_path.clone(),
420                    PeerLocation::Remote(remote) => {
421                        let gw_pubkey_name = gw.public_key_path.file_name().ok_or_else(|| {
422                            Error::PeerStartupFailed("Invalid gateway pubkey path".to_string())
423                        })?;
424                        remote.remote_work_dir().join(&id).join(gw_pubkey_name)
425                    }
426                };
427                content.push_str(&format!(
428                    "[[gateways]]\n\
429                     address = {{ hostname = \"{}\" }}\n\
430                     public_key = \"{}\"\n\n",
431                    gw.address,
432                    gw_pubkey_path.display()
433                ));
434            }
435            std::fs::write(&gateways_toml, content)?;
436
437            // Upload gateways.toml to remote if needed
438            if let PeerLocation::Remote(remote) = &location {
439                let remote_gateways_toml = remote.remote_work_dir().join(&id).join("gateways.toml");
440                remote.scp_upload(&gateways_toml, remote_gateways_toml.to_str().unwrap())?;
441            }
442        }
443
444        // Environment variables
445        let env_vars = vec![
446            ("NETWORK_ADDRESS".to_string(), network_address.clone()),
447            (
448                "PUBLIC_NETWORK_ADDRESS".to_string(),
449                network_address.clone(),
450            ),
451            ("PUBLIC_NETWORK_PORT".to_string(), network_port.to_string()),
452        ];
453
454        if let Some(min_conn) = self.min_connections {
455            args.push("--min-number-of-connections".to_string());
456            args.push(min_conn.to_string());
457        }
458        if let Some(max_conn) = self.max_connections {
459            args.push("--max-number-of-connections".to_string());
460            args.push(max_conn.to_string());
461        }
462
463        // Spawn process (local or remote)
464        let process: Box<dyn PeerProcess + Send> = match &location {
465            PeerLocation::Local => Box::new(process::spawn_local_peer(
466                binary_path,
467                &args,
468                &data_dir,
469                &env_vars,
470            )?),
471            PeerLocation::Remote(remote) => {
472                let remote_data_dir = remote.remote_work_dir().join(&id);
473                let local_cache_dir = run_root.join(format!("{}-cache", id));
474                std::fs::create_dir_all(&local_cache_dir)?;
475
476                Box::new(
477                    process::spawn_remote_peer(
478                        binary_path,
479                        &args,
480                        remote,
481                        &remote_data_dir,
482                        &local_cache_dir,
483                        &env_vars,
484                    )
485                    .await?,
486                )
487            }
488        };
489
490        // Give it a moment to start
491        tokio::time::sleep(Duration::from_millis(100)).await;
492
493        Ok(TestPeer {
494            id,
495            is_gateway,
496            ws_port,
497            network_port,
498            network_address,
499            data_dir,
500            process,
501            public_key_path,
502            location,
503        })
504    }
505}
506
/// Pick the directory under which per-run directories are created.
///
/// Precedence:
/// 1. `FREENET_TEST_NETWORK_BASE_DIR`, used verbatim;
/// 2. `$HOME/code/tmp/freenet-test-networks`;
/// 3. `<system temp dir>/freenet-test-networks`.
///
/// A variable that is set but empty is treated as unset, so it can never
/// yield an empty or accidentally relative base directory. Values are read
/// as OS strings, so a non-UTF-8 `HOME` is honoured as well.
fn resolve_base_dir() -> PathBuf {
    let non_empty = |name: &str| std::env::var_os(name).filter(|value| !value.is_empty());

    if let Some(path) = non_empty("FREENET_TEST_NETWORK_BASE_DIR") {
        PathBuf::from(path)
    } else if let Some(home) = non_empty("HOME") {
        PathBuf::from(home).join("code/tmp/freenet-test-networks")
    } else {
        std::env::temp_dir().join("freenet-test-networks")
    }
}
516
517fn cleanup_old_runs(base_dir: &Path, max_runs: usize) -> Result<()> {
518    let mut runs: Vec<(PathBuf, SystemTime)> = fs::read_dir(base_dir)?
519        .filter_map(|entry| {
520            let entry = entry.ok()?;
521            let file_type = entry.file_type().ok()?;
522            if !file_type.is_dir() {
523                return None;
524            }
525            let metadata = entry.metadata().ok()?;
526            let modified = metadata.modified().unwrap_or(SystemTime::UNIX_EPOCH);
527            Some((entry.path(), modified))
528        })
529        .collect();
530
531    if runs.len() <= max_runs {
532        return Ok(());
533    }
534
535    runs.sort_by_key(|(_, modified)| *modified);
536    let remove_count = runs.len() - max_runs;
537    for (path, _) in runs.into_iter().take(remove_count) {
538        if let Err(err) = fs::remove_dir_all(&path) {
539            tracing::warn!(
540                ?err,
541                path = %path.display(),
542                "Failed to remove old freenet test network run directory"
543            );
544        }
545    }
546
547    Ok(())
548}
549
550fn create_run_directory(base_dir: &Path) -> Result<PathBuf> {
551    let timestamp = Utc::now().format("%Y%m%d-%H%M%S").to_string();
552    for attempt in 0..100 {
553        let candidate = if attempt == 0 {
554            base_dir.join(&timestamp)
555        } else {
556            base_dir.join(format!("{}-{}", &timestamp, attempt))
557        };
558        if !candidate.exists() {
559            fs::create_dir_all(&candidate)?;
560            return Ok(candidate);
561        }
562    }
563
564    Err(Error::Other(anyhow::anyhow!(
565        "Unable to allocate run directory after repeated attempts"
566    )))
567}
568
569fn create_peer_dir(run_root: &Path, id: &str) -> Result<PathBuf> {
570    let dir = run_root.join(id);
571    fs::create_dir_all(&dir)?;
572    Ok(dir)
573}
574
575struct RunStatusGuard {
576    status_path: PathBuf,
577}
578
579impl RunStatusGuard {
580    fn new(run_root: &Path) -> Self {
581        let status_path = run_root.join("run_status.txt");
582        let _ = fs::write(&status_path, b"status=initializing\n");
583        Self { status_path }
584    }
585
586    fn mark(&mut self, status: &str, detail: Option<&str>) {
587        let mut content = format!("status={}", status);
588        if let Some(detail) = detail {
589            content.push('\n');
590            content.push_str("detail=");
591            content.push_str(detail);
592        }
593        content.push('\n');
594        if let Err(err) = fs::write(&self.status_path, content) {
595            tracing::warn!(
596                ?err,
597                path = %self.status_path.display(),
598                "Failed to write run status"
599            );
600        }
601    }
602}
603
604fn generate_keypair(
605    private_key_path: &std::path::Path,
606    public_key_path: &std::path::Path,
607) -> Result<()> {
608    // Generate private key
609    let output = Command::new("openssl")
610        .args([
611            "genpkey",
612            "-algorithm",
613            "RSA",
614            "-out",
615            private_key_path.to_str().unwrap(),
616            "-pkeyopt",
617            "rsa_keygen_bits:2048",
618        ])
619        .output()?;
620
621    if !output.status.success() {
622        return Err(Error::Other(anyhow::anyhow!(
623            "Failed to generate private key: {}",
624            String::from_utf8_lossy(&output.stderr)
625        )));
626    }
627
628    // Extract public key
629    let output = Command::new("openssl")
630        .args([
631            "rsa",
632            "-pubout",
633            "-in",
634            private_key_path.to_str().unwrap(),
635            "-out",
636            public_key_path.to_str().unwrap(),
637        ])
638        .output()?;
639
640    if !output.status.success() {
641        return Err(Error::Other(anyhow::anyhow!(
642            "Failed to extract public key: {}",
643            String::from_utf8_lossy(&output.stderr)
644        )));
645    }
646
647    Ok(())
648}
649
650fn dump_recent_logs(network: &TestNetwork) -> Result<()> {
651    const MAX_LOG_LINES: usize = 200;
652
653    let mut logs = network.read_logs()?;
654    let total = logs.len();
655    if total > MAX_LOG_LINES {
656        logs.drain(0..(total - MAX_LOG_LINES));
657    }
658
659    eprintln!(
660        "\n--- Network connectivity check failed; showing {} of {} log entries ---",
661        logs.len(),
662        total
663    );
664
665    for entry in logs {
666        let level = entry.level.as_deref().unwrap_or("INFO");
667        let ts_display = entry
668            .timestamp_raw
669            .clone()
670            .or_else(|| entry.timestamp.map(|ts| ts.to_rfc3339()));
671
672        if let Some(ts) = ts_display {
673            eprintln!("[{}] [{}] {}: {}", entry.peer_id, ts, level, entry.message);
674        } else {
675            eprintln!("[{}] {}: {}", entry.peer_id, level, entry.message);
676        }
677    }
678
679    eprintln!("--- End of network logs ---\n");
680
681    Ok(())
682}
683
684fn preserve_network_state(network: &TestNetwork) -> Result<PathBuf> {
685    Ok(network.run_root().to_path_buf())
686}