freenet_test_network/
network.rs1use crate::{peer::TestPeer, Error, Result};
2use freenet_stdlib::client_api::{ClientRequest, HostResponse, NodeQuery, QueryResponse, WebApi};
3use serde::{Deserialize, Serialize};
4use std::path::PathBuf;
5use std::time::Duration;
6
7pub struct TestNetwork {
9 pub(crate) gateways: Vec<TestPeer>,
10 pub(crate) peers: Vec<TestPeer>,
11 pub(crate) min_connectivity: f64,
12 pub(crate) run_root: PathBuf,
13}
14
15impl TestNetwork {
16 pub fn builder() -> crate::builder::NetworkBuilder {
18 crate::builder::NetworkBuilder::new()
19 }
20
21 pub fn gateway(&self, index: usize) -> &TestPeer {
23 &self.gateways[index]
24 }
25
26 pub fn peer(&self, index: usize) -> &TestPeer {
28 &self.peers[index]
29 }
30
31 pub fn gateway_ws_urls(&self) -> Vec<String> {
33 self.gateways.iter().map(|p| p.ws_url()).collect()
34 }
35
36 pub fn peer_ws_urls(&self) -> Vec<String> {
38 self.peers.iter().map(|p| p.ws_url()).collect()
39 }
40
41 pub async fn wait_until_ready(&self) -> Result<()> {
46 self.wait_until_ready_with_timeout(Duration::from_secs(30))
47 .await
48 }
49
50 pub async fn wait_until_ready_with_timeout(&self, timeout: Duration) -> Result<()> {
52 let start = std::time::Instant::now();
53
54 tracing::info!(
55 "Waiting for network connectivity (timeout: {}s, required: {}%)",
56 timeout.as_secs(),
57 (self.min_connectivity * 100.0) as u8
58 );
59
60 loop {
61 if start.elapsed() > timeout {
62 return Err(Error::ConnectivityFailed(format!(
63 "Network did not reach {}% connectivity within {}s",
64 (self.min_connectivity * 100.0) as u8,
65 timeout.as_secs()
66 )));
67 }
68
69 match self.check_connectivity().await {
71 Ok(ratio) if ratio >= self.min_connectivity => {
72 tracing::info!("Network ready: {:.1}% connectivity", ratio * 100.0);
73 return Ok(());
74 }
75 Ok(ratio) => {
76 tracing::debug!("Network connectivity: {:.1}%", ratio * 100.0);
77 }
78 Err(e) => {
79 tracing::debug!("Connectivity check failed: {}", e);
80 }
81 }
82
83 tokio::time::sleep(Duration::from_millis(500)).await;
84 }
85 }
86
87 async fn check_connectivity(&self) -> Result<f64> {
89 let all_peers: Vec<_> = self.gateways.iter().chain(self.peers.iter()).collect();
90 let total = all_peers.len();
91
92 if total == 0 {
93 return Ok(1.0);
94 }
95
96 let mut connected_count = 0;
97
98 for peer in &all_peers {
99 match self.query_peer_connections(peer).await {
100 Ok(0) => {
101 tracing::trace!("{} has no connections (isolated)", peer.id());
102 }
103 Ok(connections) => {
104 connected_count += 1;
105 tracing::trace!("{} has {} connections", peer.id(), connections);
106 }
107 Err(e) => {
108 tracing::debug!("Failed to query {}: {}", peer.id(), e);
109 }
110 }
111 }
112
113 let ratio = connected_count as f64 / total as f64;
114 Ok(ratio)
115 }
116
117 async fn query_peer_connections(&self, peer: &TestPeer) -> Result<usize> {
119 use tokio_tungstenite::connect_async;
120
121 let url = format!("{}?encodingProtocol=native", peer.ws_url());
122 let (ws_stream, _) =
123 tokio::time::timeout(std::time::Duration::from_secs(5), connect_async(&url))
124 .await
125 .map_err(|_| Error::ConnectivityFailed(format!("Timeout connecting to {}", url)))?
126 .map_err(|e| {
127 Error::ConnectivityFailed(format!("Failed to connect to {}: {}", url, e))
128 })?;
129
130 let mut client = WebApi::start(ws_stream);
131
132 client
133 .send(ClientRequest::NodeQueries(NodeQuery::ConnectedPeers))
134 .await
135 .map_err(|e| Error::ConnectivityFailed(format!("Failed to send query: {}", e)))?;
136
137 let response = tokio::time::timeout(std::time::Duration::from_secs(5), client.recv())
138 .await
139 .map_err(|_| Error::ConnectivityFailed("Timeout waiting for response".into()))?;
140
141 let result = match response {
142 Ok(HostResponse::QueryResponse(QueryResponse::ConnectedPeers { peers })) => {
143 Ok(peers.len())
144 }
145 Ok(other) => Err(Error::ConnectivityFailed(format!(
146 "Unexpected response: {:?}",
147 other
148 ))),
149 Err(e) => Err(Error::ConnectivityFailed(format!("Query failed: {}", e))),
150 };
151
152 client.disconnect("connectivity probe").await;
153
154 result
155 }
156
157 pub async fn topology(&self) -> Result<NetworkTopology> {
159 Ok(NetworkTopology {
161 peers: vec![],
162 connections: vec![],
163 })
164 }
165
166 pub fn export_for_viz(&self) -> String {
168 let peers: Vec<_> = self
169 .gateways
170 .iter()
171 .chain(self.peers.iter())
172 .map(|p| {
173 serde_json::json!({
174 "id": p.id(),
175 "is_gateway": p.is_gateway(),
176 "ws_port": p.ws_port,
177 "network_port": p.network_port,
178 })
179 })
180 .collect();
181
182 serde_json::to_string_pretty(&serde_json::json!({
183 "peers": peers
184 }))
185 .unwrap_or_default()
186 }
187}
188
189impl TestNetwork {
190 pub(crate) fn new(
191 gateways: Vec<TestPeer>,
192 peers: Vec<TestPeer>,
193 min_connectivity: f64,
194 run_root: PathBuf,
195 ) -> Self {
196 Self {
197 gateways,
198 peers,
199 min_connectivity,
200 run_root,
201 }
202 }
203
204 pub fn run_root(&self) -> &std::path::Path {
206 &self.run_root
207 }
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct NetworkTopology {
213 pub peers: Vec<PeerInfo>,
214 pub connections: Vec<Connection>,
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize)]
219pub struct PeerInfo {
220 pub id: String,
221 pub is_gateway: bool,
222 pub ws_port: u16,
223}
224
225#[derive(Debug, Clone, Serialize, Deserialize)]
227pub struct Connection {
228 pub from: String,
229 pub to: String,
230}