Skip to main content

netspeed_cli/
upload.rs

//! Multi-stream upload bandwidth measurement.
//!
//! This module handles uploading test data to speedtest.net servers
//! to measure upload bandwidth. It supports:
//! - Multi-stream concurrent uploads (4 streams by default, 1 with `--single`)
//! - Fixed-size payload chunks (200 KB) sent in repeated rounds per stream
//! - Real-time progress tracking with speed calculation
//! - Peak speed detection through periodic sampling
9
10#![allow(
11    clippy::cast_precision_loss,
12    clippy::cast_possible_truncation,
13    clippy::cast_sign_loss
14)]
15
16use crate::common;
17use crate::error::SpeedtestError;
18use crate::progress::SpeedProgress;
19use crate::types::Server;
20use reqwest::Client;
21use std::sync::Arc;
22use std::sync::Mutex;
23use std::sync::atomic::{AtomicU64, Ordering};
24use std::time::Instant;
25
/// Build the upload endpoint URL for a server.
///
/// Appends `/upload` to `server_url`. Trailing `/` characters on
/// `server_url` are stripped first, so a base such as `http://host/`
/// produces `http://host/upload` rather than `http://host//upload`.
#[must_use]
pub fn build_upload_url(server_url: &str) -> String {
    let base = server_url.trim_end_matches('/');
    format!("{base}/upload")
}
31
/// Produce `size` bytes of deterministic filler for upload requests.
///
/// The buffer repeats the sequence `0, 1, …, 255`, so the byte at index `i`
/// equals `i % 256`.
fn generate_upload_data(size: usize) -> Vec<u8> {
    (0..=u8::MAX).cycle().take(size).collect()
}
39
/// Minimum gap, in milliseconds, between two recorded speed samples.
/// Spacing samples out caps the sampling work at 20 samples per second.
const SAMPLE_INTERVAL_MS: u64 = 50;

/// How many payload uploads each stream performs during a test run.
const UPLOAD_TEST_ROUNDS: usize = 4;
46
47/// Run upload bandwidth test against the given server.
48///
49/// Returns `(avg_speed_bps, peak_speed_bps, total_bytes_uploaded, speed_samples)`.
50///
51/// # Errors
52///
53/// Returns [`SpeedtestError::NetworkError`] if all upload streams fail.
54pub async fn upload_test(
55    client: &Client,
56    server: &Server,
57    single: bool,
58    progress: Arc<SpeedProgress>,
59) -> Result<(f64, f64, u64, Vec<f64>), SpeedtestError> {
60    let concurrent_uploads = common::determine_stream_count(single);
61    let total_bytes = Arc::new(AtomicU64::new(0));
62    let peak_bps = Arc::new(AtomicU64::new(0));
63    let speed_samples = Arc::new(Mutex::new(Vec::new()));
64    let start = Instant::now();
65
66    let upload_data = generate_upload_data(200_000); // 200KB chunks
67    let estimated_total: u64 = 4_000_000; // 4 MB estimate
68
69    // Throttle gate: tracks last sample time in millis to limit all expensive ops to 20 Hz.
70    // Initialized to 0 so the first upload always triggers a sample (any elapsed > 0 fires).
71    let last_sample_ms = Arc::new(AtomicU64::new(0));
72
73    let mut handles = Vec::new();
74
75    for _ in 0..concurrent_uploads {
76        let client = client.clone();
77        let server_url = server.url.clone();
78        let data = upload_data.clone();
79        let total_ref = Arc::clone(&total_bytes);
80        let peak_ref = Arc::clone(&peak_bps);
81        let samples_ref = Arc::clone(&speed_samples);
82        let start_ref = start;
83        let prog = Arc::clone(&progress);
84        let throttle_ref = Arc::clone(&last_sample_ms);
85
86        let handle = tokio::spawn(async move {
87            let mut uploaded_bytes = 0u64;
88
89            for _ in 0..UPLOAD_TEST_ROUNDS {
90                let upload_url = build_upload_url(&server_url);
91
92                if client
93                    .post(&upload_url)
94                    .body(data.clone())
95                    .send()
96                    .await
97                    .is_ok()
98                {
99                    let chunk = data.len() as u64;
100                    uploaded_bytes += chunk;
101                    // Cheap atomic add — runs on every upload
102                    total_ref.fetch_add(chunk, Ordering::Relaxed);
103
104                    // Throttle gate: only run expensive ops every 50ms.
105                    // First sample always fires (last_sample_ms == 0 means "never sampled").
106                    let elapsed_ms = start_ref.elapsed().as_millis() as u64;
107                    let last_ms = throttle_ref.load(Ordering::Relaxed);
108                    let should_sample =
109                        last_ms == 0 || elapsed_ms.saturating_sub(last_ms) >= SAMPLE_INTERVAL_MS;
110                    if should_sample {
111                        // Update throttle timestamp
112                        throttle_ref.store(elapsed_ms, Ordering::Relaxed);
113
114                        // All expensive ops now run at most every 50ms:
115                        // Acquire ensures we see the latest fetch_add results on ARM64.
116                        let total_so_far = total_ref.load(Ordering::Acquire);
117                        let elapsed = start_ref.elapsed().as_secs_f64();
118                        let speed = common::calculate_bandwidth(total_so_far, elapsed);
119
120                        // Peak tracking
121                        let current_peak = peak_ref.load(Ordering::Relaxed);
122                        if speed > current_peak as f64 {
123                            peak_ref.store(speed as u64, Ordering::Relaxed);
124                        }
125
126                        // Record speed sample (throttled)
127                        if let Ok(mut samples) = samples_ref.lock() {
128                            samples.push(speed);
129                        }
130
131                        let pct = (total_so_far as f64 / estimated_total as f64).min(1.0);
132                        prog.update(speed / 1_000_000.0, pct, total_so_far);
133                    }
134                }
135            }
136
137            uploaded_bytes
138        });
139
140        handles.push(handle);
141    }
142
143    // Collect results — log any task panics so failures aren't silently swallowed.
144    // Bytes are already counted via atomic counters, so we don't need the return values.
145    for (i, handle) in handles.into_iter().enumerate() {
146        if let Err(e) = handle.await {
147            eprintln!("Warning: upload task {i} failed: {e}");
148        }
149    }
150
151    let final_total_bytes = total_bytes.load(Ordering::Relaxed);
152    let final_peak_speed = peak_bps.load(Ordering::Relaxed) as f64;
153    let elapsed = start.elapsed().as_secs_f64();
154    let samples = speed_samples.lock().unwrap().to_vec();
155    Ok((
156        common::calculate_bandwidth(final_total_bytes, elapsed),
157        final_peak_speed,
158        final_total_bytes,
159        samples,
160    ))
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common;

    /// 1,000,000 bytes over 2.0 seconds must be reported as 4,000,000.0.
    #[test]
    fn test_upload_bandwidth_calculation() {
        let bps = common::calculate_bandwidth(1_000_000, 2.0);
        assert_eq!(bps, 4_000_000.0);
    }

    /// A zero elapsed time must be reported as 0.0.
    #[test]
    fn test_upload_bandwidth_zero_elapsed() {
        let bps = common::calculate_bandwidth(1_000_000, 0.0);
        assert_eq!(bps, 0.0);
    }

    /// Single mode uses exactly one stream.
    #[test]
    fn test_upload_concurrent_count_single() {
        let streams = common::determine_stream_count(true);
        assert_eq!(streams, 1);
    }

    /// Default mode uses four streams.
    #[test]
    fn test_upload_concurrent_count_multiple() {
        let streams = common::determine_stream_count(false);
        assert_eq!(streams, 4);
    }

    /// The generated URL always ends in the upload path segment.
    #[test]
    fn test_upload_url_generation() {
        let generated = build_upload_url("http://server.example.com");
        assert!(generated.ends_with("/upload"));
    }

    /// A base URL that already has a path keeps it in front of `/upload`.
    #[test]
    fn test_upload_url_generation_full_path() {
        let generated = build_upload_url("http://server.example.com/speedtest");
        assert_eq!(generated, "http://server.example.com/speedtest/upload");
    }

    /// The buffer has exactly the requested length.
    #[test]
    fn test_generate_upload_data_size() {
        assert_eq!(generate_upload_data(1000).len(), 1000);
    }

    /// Every byte equals its index modulo 256.
    #[test]
    fn test_generate_upload_data_pattern() {
        let expected: Vec<u8> = (0..300usize).map(|index| (index % 256) as u8).collect();
        assert_eq!(generate_upload_data(300), expected);
    }

    /// The pattern restarts at 0 after reaching 255.
    #[test]
    fn test_generate_upload_data_wraps_at_256() {
        let buffer = generate_upload_data(512);
        let probes = [buffer[0], buffer[255], buffer[256], buffer[511]];
        assert_eq!(probes, [0u8, 255u8, 0u8, 255u8]);
    }

    /// Requesting zero bytes yields an empty buffer.
    #[test]
    fn test_generate_upload_data_empty() {
        assert!(generate_upload_data(0).is_empty());
    }

    /// A 200 KB request (the size used by `upload_test`) yields 200,000 bytes.
    #[test]
    fn test_upload_data_size_constant() {
        let buffer = generate_upload_data(200_000);
        assert_eq!(buffer.len(), 200_000);
    }
}