Skip to main content

cli_speedtest/
client.rs

1// src/client.rs
2
3use bytes::Bytes;
4use futures_util::StreamExt;
5use indicatif::HumanBytes;
6use rand::RngCore;
7use reqwest::Client;
8use std::sync::Arc;
9use std::sync::OnceLock;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::time::{Duration, Instant};
12use tokio::sync::Barrier;
13use tokio_util::sync::CancellationToken;
14
15use crate::models::{AppConfig, PingStats};
16use crate::theme;
17use crate::utils::{WARMUP_SECS, calculate_mbps, create_spinner, with_retry};
18
19pub async fn test_ping_stats(
20    client: &Client,
21    base_url: &str,
22    count: u32,
23    config: Arc<AppConfig>,
24) -> anyhow::Result<PingStats> {
25    let pb = create_spinner(
26        "Measuring latency & jitter...",
27        &config,
28        "{spinner:.cyan} {msg}",
29    );
30
31    let url = format!("{}/cdn-cgi/trace", base_url);
32    let mut samples: Vec<u128> = Vec::with_capacity(count as usize);
33    let mut lost: u32 = 0;
34
35    for _ in 0..count {
36        let start = Instant::now();
37        match tokio::time::timeout(Duration::from_secs(2), client.head(&url).send()).await {
38            Ok(Ok(_)) => samples.push(start.elapsed().as_millis()),
39            _ => lost += 1,
40        }
41        tokio::time::sleep(Duration::from_millis(50)).await;
42    }
43
44    pb.finish_and_clear();
45
46    if samples.is_empty() {
47        anyhow::bail!("All ping attempts failed — server unreachable");
48    }
49
50    let min_ms = *samples.iter().min().unwrap();
51    let max_ms = *samples.iter().max().unwrap();
52    let avg_ms = samples.iter().sum::<u128>() as f64 / samples.len() as f64;
53
54    let jitter_ms = if samples.len() > 1 {
55        let diffs: Vec<f64> = samples
56            .windows(2)
57            .map(|w| (w[1] as f64 - w[0] as f64).abs())
58            .collect();
59        diffs.iter().sum::<f64>() / diffs.len() as f64
60    } else {
61        0.0
62    };
63
64    let packet_loss_pct = (lost as f64 / count as f64) * 100.0;
65
66    if !config.quiet {
67        println!(
68            "📡 Ping: {} avg  |  Jitter: {}  |  Loss: {}\n",
69            theme::color_ping(avg_ms, &config),
70            theme::color_jitter(jitter_ms, &config),
71            theme::color_loss(packet_loss_pct, &config)
72        );
73    }
74
75    Ok(PingStats {
76        min_ms,
77        max_ms,
78        avg_ms,
79        jitter_ms,
80        packet_loss_pct,
81    })
82}
83
84pub async fn test_download(
85    client: &Client,
86    base_url: &str,
87    duration_secs: u64,
88    num_connections: usize,
89    config: Arc<AppConfig>,
90) -> anyhow::Result<f64> {
91    let chunk_size_bytes = 50 * 1024 * 1024;
92    let total_downloaded = Arc::new(AtomicU64::new(0));
93
94    let pb = create_spinner(
95        "Downloading...",
96        &config,
97        "{spinner:.green} [{elapsed_precise}] {msg}",
98    );
99
100    let token = CancellationToken::new();
101    let barrier = Arc::new(Barrier::new(num_connections + 1)); // +1 for the display task
102    let shared_start: Arc<OnceLock<Instant>> = Arc::new(OnceLock::new());
103    let mut tasks = vec![];
104
105    // Worker tasks
106    for _ in 0..num_connections {
107        let client = client.clone();
108        let pb = pb.clone();
109        let total_downloaded = total_downloaded.clone();
110        let url = format!("{}/__down?bytes={}", base_url, chunk_size_bytes);
111        let barrier = barrier.clone();
112        let shared_start = shared_start.clone();
113        let token = token.clone();
114
115        let task = tokio::spawn(async move {
116            barrier.wait().await;
117            let start = *shared_start.get_or_init(Instant::now);
118
119            'request: loop {
120                if token.is_cancelled() {
121                    break;
122                }
123
124                let res = match with_retry(3, || async {
125                    let r = client.get(&url).send().await?;
126                    if !r.status().is_success() {
127                        anyhow::bail!("Download failed with status: {}", r.status());
128                    }
129                    Ok(r)
130                })
131                .await
132                {
133                    Ok(r) => r,
134                    Err(e) => return Err(e),
135                };
136
137                let mut stream = res.bytes_stream();
138
139                loop {
140                    tokio::select! {
141                        biased;
142                        _ = token.cancelled() => break 'request,
143                        item = stream.next() => {
144                            match item {
145                                Some(Ok(chunk)) => {
146                                    let len = chunk.len() as u64;
147                                    pb.inc(len);
148                                    if start.elapsed().as_secs_f64() >= WARMUP_SECS {
149                                        total_downloaded.fetch_add(len, Ordering::Relaxed);
150                                    }
151                                }
152                                Some(Err(e)) => return Err(e.into()),
153                                None => break,
154                            }
155                        }
156                    }
157                }
158            }
159
160            Ok::<(), anyhow::Error>(())
161        });
162
163        tasks.push(task);
164    }
165
166    // Display task
167    let display_task = {
168        let pb = pb.clone();
169        let total_downloaded = total_downloaded.clone();
170        let token = token.clone();
171        let config = config.clone();
172        let barrier = barrier.clone();
173
174        tokio::spawn(async move {
175            barrier.wait().await;
176            let mut prev_bytes = 0;
177            let mut prev_instant = Instant::now();
178
179            loop {
180                tokio::select! {
181                    _ = token.cancelled() => break,
182                    _ = tokio::time::sleep(Duration::from_millis(250)) => {
183                        let now_bytes = total_downloaded.load(Ordering::Relaxed);
184                        let delta = now_bytes.saturating_sub(prev_bytes);
185                        let elapsed = prev_instant.elapsed().as_secs_f64();
186                        let speed = calculate_mbps(delta, elapsed);
187
188                        let speed_str = if speed == 0.0 && now_bytes == 0 {
189                            "↓  --.- Mbps".to_string()
190                        } else {
191                            format!("↓  {}", theme::color_speed(speed, &config))
192                        };
193
194                        pb.set_message(format!(
195                            "{}    {} total",
196                            speed_str,
197                            HumanBytes(now_bytes)
198                        ));
199
200                        prev_bytes = now_bytes;
201                        prev_instant = Instant::now();
202                    }
203                }
204            }
205        })
206    };
207
208    tokio::time::sleep(Duration::from_secs(duration_secs)).await;
209    token.cancel();
210
211    for task in tasks {
212        task.await??;
213    }
214    display_task.await?;
215
216    pb.finish_and_clear();
217
218    let start = shared_start.get().copied().unwrap_or_else(Instant::now);
219    let effective_duration = (start.elapsed().as_secs_f64() - WARMUP_SECS).max(0.0);
220    Ok(calculate_mbps(
221        total_downloaded.load(Ordering::Relaxed),
222        effective_duration,
223    ))
224}
225
226pub async fn test_upload(
227    client: &Client,
228    base_url: &str,
229    duration_secs: u64,
230    num_connections: usize,
231    config: Arc<AppConfig>,
232) -> anyhow::Result<f64> {
233    let chunk_size = 2 * 1024 * 1024;
234    let total_uploaded = Arc::new(AtomicU64::new(0));
235
236    let pb = create_spinner(
237        "Uploading...",
238        &config,
239        "{spinner:.red} [{elapsed_precise}] {msg}",
240    );
241
242    let token = CancellationToken::new();
243    let barrier = Arc::new(Barrier::new(num_connections + 1));
244    let shared_start: Arc<OnceLock<Instant>> = Arc::new(OnceLock::new());
245    let mut tasks = vec![];
246
247    // Worker tasks
248    for _ in 0..num_connections {
249        let client = client.clone();
250        let pb = pb.clone();
251        let total_uploaded = total_uploaded.clone();
252        let url = format!("{}/__up", base_url);
253        let barrier = barrier.clone();
254        let shared_start = shared_start.clone();
255        let token = token.clone();
256
257        let task = tokio::spawn(async move {
258            barrier.wait().await;
259            let start = *shared_start.get_or_init(Instant::now);
260
261            let mut raw_payload = vec![0u8; chunk_size];
262            rand::rng().fill_bytes(&mut raw_payload);
263            let payload = Bytes::from(raw_payload);
264
265            loop {
266                if token.is_cancelled() {
267                    break;
268                }
269
270                match with_retry(3, || async {
271                    let r = client
272                        .post(url.clone())
273                        .body(payload.clone())
274                        .send()
275                        .await?;
276                    if !r.status().is_success() {
277                        anyhow::bail!("Upload failed with status: {}", r.status());
278                    }
279                    Ok(r)
280                })
281                .await
282                {
283                    Ok(_) => {
284                        let len = payload.len() as u64;
285                        pb.inc(len);
286                        if start.elapsed().as_secs_f64() >= WARMUP_SECS {
287                            total_uploaded.fetch_add(len, Ordering::Relaxed);
288                        }
289                    }
290                    Err(e) => return Err(e),
291                }
292            }
293
294            Ok::<(), anyhow::Error>(())
295        });
296
297        tasks.push(task);
298    }
299
300    // Display task
301    let display_task = {
302        let pb = pb.clone();
303        let total_uploaded = total_uploaded.clone();
304        let token = token.clone();
305        let config = config.clone();
306        let barrier = barrier.clone();
307
308        tokio::spawn(async move {
309            barrier.wait().await;
310            let mut prev_bytes = 0;
311            let mut prev_instant = Instant::now();
312
313            loop {
314                tokio::select! {
315                    _ = token.cancelled() => break,
316                    _ = tokio::time::sleep(Duration::from_millis(250)) => {
317                        let now_bytes = total_uploaded.load(Ordering::Relaxed);
318                        let delta = now_bytes.saturating_sub(prev_bytes);
319                        let elapsed = prev_instant.elapsed().as_secs_f64();
320                        let speed = calculate_mbps(delta, elapsed);
321
322                        let speed_str = if speed == 0.0 && now_bytes == 0 {
323                            "↑  --.- Mbps".to_string()
324                        } else {
325                            format!("↑  {}", theme::color_speed(speed, &config))
326                        };
327
328                        pb.set_message(format!(
329                            "{}    {} total",
330                            speed_str,
331                            HumanBytes(now_bytes)
332                        ));
333
334                        prev_bytes = now_bytes;
335                        prev_instant = Instant::now();
336                    }
337                }
338            }
339        })
340    };
341
342    tokio::time::sleep(Duration::from_secs(duration_secs)).await;
343    token.cancel();
344
345    for task in tasks {
346        task.await??;
347    }
348    display_task.await?;
349
350    pb.finish_and_clear();
351
352    let start = shared_start.get().copied().unwrap_or_else(Instant::now);
353    let effective_duration = (start.elapsed().as_secs_f64() - WARMUP_SECS).max(0.0);
354    Ok(calculate_mbps(
355        total_uploaded.load(Ordering::Relaxed),
356        effective_duration,
357    ))
358}