1use bytes::Bytes;
4use futures_util::StreamExt;
5use indicatif::HumanBytes;
6use rand::RngCore;
7use reqwest::Client;
8use std::sync::Arc;
9use std::sync::OnceLock;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::time::{Duration, Instant};
12use tokio::sync::Barrier;
13use tokio_util::sync::CancellationToken;
14
15use crate::models::{AppConfig, PingStats};
16use crate::theme;
17use crate::utils::{WARMUP_SECS, calculate_mbps, create_spinner, with_retry};
18
19pub async fn test_ping_stats(
20 client: &Client,
21 base_url: &str,
22 count: u32,
23 config: Arc<AppConfig>,
24) -> anyhow::Result<PingStats> {
25 let pb = create_spinner(
26 "Measuring latency & jitter...",
27 &config,
28 "{spinner:.cyan} {msg}",
29 );
30
31 let url = format!("{}/cdn-cgi/trace", base_url);
32 let mut samples: Vec<u128> = Vec::with_capacity(count as usize);
33 let mut lost: u32 = 0;
34
35 for _ in 0..count {
36 let start = Instant::now();
37 match tokio::time::timeout(Duration::from_secs(2), client.head(&url).send()).await {
38 Ok(Ok(_)) => samples.push(start.elapsed().as_millis()),
39 _ => lost += 1,
40 }
41 tokio::time::sleep(Duration::from_millis(50)).await;
42 }
43
44 pb.finish_and_clear();
45
46 if samples.is_empty() {
47 anyhow::bail!("All ping attempts failed — server unreachable");
48 }
49
50 let min_ms = *samples.iter().min().unwrap();
51 let max_ms = *samples.iter().max().unwrap();
52 let avg_ms = samples.iter().sum::<u128>() as f64 / samples.len() as f64;
53
54 let jitter_ms = if samples.len() > 1 {
55 let diffs: Vec<f64> = samples
56 .windows(2)
57 .map(|w| (w[1] as f64 - w[0] as f64).abs())
58 .collect();
59 diffs.iter().sum::<f64>() / diffs.len() as f64
60 } else {
61 0.0
62 };
63
64 let packet_loss_pct = (lost as f64 / count as f64) * 100.0;
65
66 if !config.quiet {
67 println!(
68 "📡 Ping: {} avg | Jitter: {} | Loss: {}\n",
69 theme::color_ping(avg_ms, &config),
70 theme::color_jitter(jitter_ms, &config),
71 theme::color_loss(packet_loss_pct, &config)
72 );
73 }
74
75 Ok(PingStats {
76 min_ms,
77 max_ms,
78 avg_ms,
79 jitter_ms,
80 packet_loss_pct,
81 })
82}
83
84pub async fn test_download(
85 client: &Client,
86 base_url: &str,
87 duration_secs: u64,
88 num_connections: usize,
89 config: Arc<AppConfig>,
90) -> anyhow::Result<f64> {
91 let chunk_size_bytes = 50 * 1024 * 1024;
92 let total_downloaded = Arc::new(AtomicU64::new(0));
93
94 let pb = create_spinner(
95 "Downloading...",
96 &config,
97 "{spinner:.green} [{elapsed_precise}] {msg}",
98 );
99
100 let token = CancellationToken::new();
101 let barrier = Arc::new(Barrier::new(num_connections + 1)); let shared_start: Arc<OnceLock<Instant>> = Arc::new(OnceLock::new());
103 let mut tasks = vec![];
104
105 for _ in 0..num_connections {
107 let client = client.clone();
108 let pb = pb.clone();
109 let total_downloaded = total_downloaded.clone();
110 let url = format!("{}/__down?bytes={}", base_url, chunk_size_bytes);
111 let barrier = barrier.clone();
112 let shared_start = shared_start.clone();
113 let token = token.clone();
114
115 let task = tokio::spawn(async move {
116 barrier.wait().await;
117 let start = *shared_start.get_or_init(Instant::now);
118
119 'request: loop {
120 if token.is_cancelled() {
121 break;
122 }
123
124 let res = match with_retry(3, || async {
125 let r = client.get(&url).send().await?;
126 if !r.status().is_success() {
127 anyhow::bail!("Download failed with status: {}", r.status());
128 }
129 Ok(r)
130 })
131 .await
132 {
133 Ok(r) => r,
134 Err(e) => return Err(e),
135 };
136
137 let mut stream = res.bytes_stream();
138
139 loop {
140 tokio::select! {
141 biased;
142 _ = token.cancelled() => break 'request,
143 item = stream.next() => {
144 match item {
145 Some(Ok(chunk)) => {
146 let len = chunk.len() as u64;
147 pb.inc(len);
148 if start.elapsed().as_secs_f64() >= WARMUP_SECS {
149 total_downloaded.fetch_add(len, Ordering::Relaxed);
150 }
151 }
152 Some(Err(e)) => return Err(e.into()),
153 None => break,
154 }
155 }
156 }
157 }
158 }
159
160 Ok::<(), anyhow::Error>(())
161 });
162
163 tasks.push(task);
164 }
165
166 let display_task = {
168 let pb = pb.clone();
169 let total_downloaded = total_downloaded.clone();
170 let token = token.clone();
171 let config = config.clone();
172 let barrier = barrier.clone();
173
174 tokio::spawn(async move {
175 barrier.wait().await;
176 let mut prev_bytes = 0;
177 let mut prev_instant = Instant::now();
178
179 loop {
180 tokio::select! {
181 _ = token.cancelled() => break,
182 _ = tokio::time::sleep(Duration::from_millis(250)) => {
183 let now_bytes = total_downloaded.load(Ordering::Relaxed);
184 let delta = now_bytes.saturating_sub(prev_bytes);
185 let elapsed = prev_instant.elapsed().as_secs_f64();
186 let speed = calculate_mbps(delta, elapsed);
187
188 let speed_str = if speed == 0.0 && now_bytes == 0 {
189 "↓ --.- Mbps".to_string()
190 } else {
191 format!("↓ {}", theme::color_speed(speed, &config))
192 };
193
194 pb.set_message(format!(
195 "{} {} total",
196 speed_str,
197 HumanBytes(now_bytes)
198 ));
199
200 prev_bytes = now_bytes;
201 prev_instant = Instant::now();
202 }
203 }
204 }
205 })
206 };
207
208 tokio::time::sleep(Duration::from_secs(duration_secs)).await;
209 token.cancel();
210
211 for task in tasks {
212 task.await??;
213 }
214 display_task.await?;
215
216 pb.finish_and_clear();
217
218 let start = shared_start.get().copied().unwrap_or_else(Instant::now);
219 let effective_duration = (start.elapsed().as_secs_f64() - WARMUP_SECS).max(0.0);
220 Ok(calculate_mbps(
221 total_downloaded.load(Ordering::Relaxed),
222 effective_duration,
223 ))
224}
225
226pub async fn test_upload(
227 client: &Client,
228 base_url: &str,
229 duration_secs: u64,
230 num_connections: usize,
231 config: Arc<AppConfig>,
232) -> anyhow::Result<f64> {
233 let chunk_size = 2 * 1024 * 1024;
234 let total_uploaded = Arc::new(AtomicU64::new(0));
235
236 let pb = create_spinner(
237 "Uploading...",
238 &config,
239 "{spinner:.red} [{elapsed_precise}] {msg}",
240 );
241
242 let token = CancellationToken::new();
243 let barrier = Arc::new(Barrier::new(num_connections + 1));
244 let shared_start: Arc<OnceLock<Instant>> = Arc::new(OnceLock::new());
245 let mut tasks = vec![];
246
247 for _ in 0..num_connections {
249 let client = client.clone();
250 let pb = pb.clone();
251 let total_uploaded = total_uploaded.clone();
252 let url = format!("{}/__up", base_url);
253 let barrier = barrier.clone();
254 let shared_start = shared_start.clone();
255 let token = token.clone();
256
257 let task = tokio::spawn(async move {
258 barrier.wait().await;
259 let start = *shared_start.get_or_init(Instant::now);
260
261 let mut raw_payload = vec![0u8; chunk_size];
262 rand::rng().fill_bytes(&mut raw_payload);
263 let payload = Bytes::from(raw_payload);
264
265 loop {
266 if token.is_cancelled() {
267 break;
268 }
269
270 match with_retry(3, || async {
271 let r = client
272 .post(url.clone())
273 .body(payload.clone())
274 .send()
275 .await?;
276 if !r.status().is_success() {
277 anyhow::bail!("Upload failed with status: {}", r.status());
278 }
279 Ok(r)
280 })
281 .await
282 {
283 Ok(_) => {
284 let len = payload.len() as u64;
285 pb.inc(len);
286 if start.elapsed().as_secs_f64() >= WARMUP_SECS {
287 total_uploaded.fetch_add(len, Ordering::Relaxed);
288 }
289 }
290 Err(e) => return Err(e),
291 }
292 }
293
294 Ok::<(), anyhow::Error>(())
295 });
296
297 tasks.push(task);
298 }
299
300 let display_task = {
302 let pb = pb.clone();
303 let total_uploaded = total_uploaded.clone();
304 let token = token.clone();
305 let config = config.clone();
306 let barrier = barrier.clone();
307
308 tokio::spawn(async move {
309 barrier.wait().await;
310 let mut prev_bytes = 0;
311 let mut prev_instant = Instant::now();
312
313 loop {
314 tokio::select! {
315 _ = token.cancelled() => break,
316 _ = tokio::time::sleep(Duration::from_millis(250)) => {
317 let now_bytes = total_uploaded.load(Ordering::Relaxed);
318 let delta = now_bytes.saturating_sub(prev_bytes);
319 let elapsed = prev_instant.elapsed().as_secs_f64();
320 let speed = calculate_mbps(delta, elapsed);
321
322 let speed_str = if speed == 0.0 && now_bytes == 0 {
323 "↑ --.- Mbps".to_string()
324 } else {
325 format!("↑ {}", theme::color_speed(speed, &config))
326 };
327
328 pb.set_message(format!(
329 "{} {} total",
330 speed_str,
331 HumanBytes(now_bytes)
332 ));
333
334 prev_bytes = now_bytes;
335 prev_instant = Instant::now();
336 }
337 }
338 }
339 })
340 };
341
342 tokio::time::sleep(Duration::from_secs(duration_secs)).await;
343 token.cancel();
344
345 for task in tasks {
346 task.await??;
347 }
348 display_task.await?;
349
350 pb.finish_and_clear();
351
352 let start = shared_start.get().copied().unwrap_or_else(Instant::now);
353 let effective_duration = (start.elapsed().as_secs_f64() - WARMUP_SECS).max(0.0);
354 Ok(calculate_mbps(
355 total_uploaded.load(Ordering::Relaxed),
356 effective_duration,
357 ))
358}