1#![allow(
11 clippy::cast_precision_loss,
12 clippy::cast_possible_truncation,
13 clippy::cast_sign_loss
14)]
15
16use crate::common;
17use crate::error::SpeedtestError;
18use crate::progress::SpeedProgress;
19use crate::types::Server;
20use reqwest::Client;
21use std::sync::Arc;
22use std::sync::Mutex;
23use std::sync::atomic::{AtomicU64, Ordering};
24use std::time::Instant;
25
26#[must_use]
28pub fn build_upload_url(server_url: &str) -> String {
29 format!("{server_url}/upload")
30}
31
32fn generate_upload_data(size: usize) -> Vec<u8> {
33 let mut data = vec![0u8; size];
34 for (i, byte) in data.iter_mut().enumerate() {
35 *byte = (i % 256) as u8;
36 }
37 data
38}
39
40const SAMPLE_INTERVAL_MS: u64 = 50; const UPLOAD_TEST_ROUNDS: usize = 4;
46
47pub async fn upload_test(
55 client: &Client,
56 server: &Server,
57 single: bool,
58 progress: Arc<SpeedProgress>,
59) -> Result<(f64, f64, u64, Vec<f64>), SpeedtestError> {
60 let concurrent_uploads = common::determine_stream_count(single);
61 let total_bytes = Arc::new(AtomicU64::new(0));
62 let peak_bps = Arc::new(AtomicU64::new(0));
63 let speed_samples = Arc::new(Mutex::new(Vec::new()));
64 let start = Instant::now();
65
66 let upload_data = generate_upload_data(200_000); let estimated_total: u64 = 4_000_000; let last_sample_ms = Arc::new(AtomicU64::new(0));
72
73 let mut handles = Vec::new();
74
75 for _ in 0..concurrent_uploads {
76 let client = client.clone();
77 let server_url = server.url.clone();
78 let data = upload_data.clone();
79 let total_ref = Arc::clone(&total_bytes);
80 let peak_ref = Arc::clone(&peak_bps);
81 let samples_ref = Arc::clone(&speed_samples);
82 let start_ref = start;
83 let prog = Arc::clone(&progress);
84 let throttle_ref = Arc::clone(&last_sample_ms);
85
86 let handle = tokio::spawn(async move {
87 let mut uploaded_bytes = 0u64;
88
89 for _ in 0..UPLOAD_TEST_ROUNDS {
90 let upload_url = build_upload_url(&server_url);
91
92 if client
93 .post(&upload_url)
94 .body(data.clone())
95 .send()
96 .await
97 .is_ok()
98 {
99 let chunk = data.len() as u64;
100 uploaded_bytes += chunk;
101 total_ref.fetch_add(chunk, Ordering::Relaxed);
103
104 let elapsed_ms = start_ref.elapsed().as_millis() as u64;
107 let last_ms = throttle_ref.load(Ordering::Relaxed);
108 let should_sample =
109 last_ms == 0 || elapsed_ms.saturating_sub(last_ms) >= SAMPLE_INTERVAL_MS;
110 if should_sample {
111 throttle_ref.store(elapsed_ms, Ordering::Relaxed);
113
114 let total_so_far = total_ref.load(Ordering::Acquire);
117 let elapsed = start_ref.elapsed().as_secs_f64();
118 let speed = common::calculate_bandwidth(total_so_far, elapsed);
119
120 let current_peak = peak_ref.load(Ordering::Relaxed);
122 if speed > current_peak as f64 {
123 peak_ref.store(speed as u64, Ordering::Relaxed);
124 }
125
126 if let Ok(mut samples) = samples_ref.lock() {
128 samples.push(speed);
129 }
130
131 let pct = (total_so_far as f64 / estimated_total as f64).min(1.0);
132 prog.update(speed / 1_000_000.0, pct, total_so_far);
133 }
134 }
135 }
136
137 uploaded_bytes
138 });
139
140 handles.push(handle);
141 }
142
143 for (i, handle) in handles.into_iter().enumerate() {
146 if let Err(e) = handle.await {
147 eprintln!("Warning: upload task {i} failed: {e}");
148 }
149 }
150
151 let final_total_bytes = total_bytes.load(Ordering::Relaxed);
152 let final_peak_speed = peak_bps.load(Ordering::Relaxed) as f64;
153 let elapsed = start.elapsed().as_secs_f64();
154 let samples = speed_samples.lock().unwrap().to_vec();
155 Ok((
156 common::calculate_bandwidth(final_total_bytes, elapsed),
157 final_peak_speed,
158 final_total_bytes,
159 samples,
160 ))
161}
162
163#[cfg(test)]
164mod tests {
165 use crate::common;
166
167 use super::*;
168
169 #[test]
170 fn test_upload_bandwidth_calculation() {
171 let result = common::calculate_bandwidth(1_000_000, 2.0);
172 assert_eq!(result, 4_000_000.0);
173 }
174
175 #[test]
176 fn test_upload_bandwidth_zero_elapsed() {
177 let result = common::calculate_bandwidth(1_000_000, 0.0);
178 assert_eq!(result, 0.0);
179 }
180
181 #[test]
182 fn test_upload_concurrent_count_single() {
183 assert_eq!(common::determine_stream_count(true), 1);
184 }
185
186 #[test]
187 fn test_upload_concurrent_count_multiple() {
188 assert_eq!(common::determine_stream_count(false), 4);
189 }
190
191 #[test]
192 fn test_upload_url_generation() {
193 let url = build_upload_url("http://server.example.com");
194 assert!(url.ends_with("/upload"));
195 }
196
197 #[test]
198 fn test_upload_url_generation_full_path() {
199 let url = build_upload_url("http://server.example.com/speedtest");
200 assert_eq!(url, "http://server.example.com/speedtest/upload");
201 }
202
203 #[test]
204 fn test_generate_upload_data_size() {
205 let data = generate_upload_data(1000);
206 assert_eq!(data.len(), 1000);
207 }
208
209 #[test]
210 fn test_generate_upload_data_pattern() {
211 let data = generate_upload_data(300);
212 for (i, &byte) in data.iter().enumerate() {
213 assert_eq!(byte, (i % 256) as u8);
214 }
215 }
216
217 #[test]
218 fn test_generate_upload_data_wraps_at_256() {
219 let data = generate_upload_data(512);
220 assert_eq!(data[0], 0u8);
221 assert_eq!(data[255], 255u8);
222 assert_eq!(data[256], 0u8);
223 assert_eq!(data[511], 255u8);
224 }
225
226 #[test]
227 fn test_generate_upload_data_empty() {
228 let data = generate_upload_data(0);
229 assert!(data.is_empty());
230 }
231
232 #[test]
233 fn test_upload_data_size_constant() {
234 let data = generate_upload_data(200_000);
236 assert_eq!(data.len(), 200_000);
237 }
238}