// netspeed-cli 0.5.1
//
// Command-line interface for testing internet bandwidth using speedtest.net.
//! Multi-stream upload bandwidth measurement.
//!
//! This module handles uploading test data to speedtest.net servers
//! to measure upload bandwidth. It supports:
//! - Multi-stream concurrent uploads (4 streams by default, 1 with `--single`)
//! - Progressive payload sizing for accurate measurement
//! - Real-time progress tracking with speed calculation
//! - Peak speed detection through periodic sampling

#![allow(
    clippy::cast_precision_loss,
    clippy::cast_possible_truncation,
    clippy::cast_sign_loss
)]

use crate::common;
use crate::error::SpeedtestError;
use crate::progress::SpeedProgress;
use crate::types::Server;
use reqwest::Client;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

/// Build the full upload endpoint URL for a server.
///
/// Appends the `/upload` path segment to the server's base URL.
#[must_use]
pub fn build_upload_url(server_url: &str) -> String {
    // Pre-size the buffer so the two appends never reallocate.
    let mut url = String::with_capacity(server_url.len() + "/upload".len());
    url.push_str(server_url);
    url.push_str("/upload");
    url
}

/// Generate `size` bytes of deterministic test payload.
///
/// The payload is the byte ramp `0, 1, …, 255` repeating, so the data is
/// reproducible across runs (test_generate_upload_data_pattern relies on
/// this exact sequence).
fn generate_upload_data(size: usize) -> Vec<u8> {
    // Iterator form collects directly into a correctly-sized Vec in one
    // pass, instead of zero-filling and then rewriting every byte.
    (0..size).map(|i| (i % 256) as u8).collect()
}

/// Interval between speed samples in milliseconds.
///
/// All expensive per-upload work (bandwidth calculation, peak tracking,
/// sample recording, progress updates) is gated on this interval, capping
/// the sampling rate at 20 Hz across all upload streams combined.
const SAMPLE_INTERVAL_MS: u64 = 50; // 50ms between samples (20 Hz max)

/// Number of upload rounds per stream (each round uploads a chunk of test data).
const UPLOAD_TEST_ROUNDS: usize = 4;

/// Run upload bandwidth test against the given server.
///
/// Spawns one task per stream (4 by default, 1 with `single`); each task
/// POSTs [`UPLOAD_TEST_ROUNDS`] chunks of generated data to the server's
/// `/upload` endpoint. Total bytes, peak speed, and periodic speed samples
/// are shared across streams via atomics and a mutex, with sampling
/// throttled to one sample per [`SAMPLE_INTERVAL_MS`].
///
/// Returns `(avg_speed_bps, peak_speed_bps, total_bytes_uploaded, speed_samples)`.
///
/// # Errors
///
/// NOTE(review): as written this function always returns `Ok` — failed
/// requests are simply not counted toward the total, and task panics are
/// only logged. The `Result` signature is kept for API symmetry with the
/// other test stages; confirm whether callers expect a
/// [`SpeedtestError::NetworkError`] when every stream fails.
pub async fn upload_test(
    client: &Client,
    server: &Server,
    single: bool,
    progress: Arc<SpeedProgress>,
) -> Result<(f64, f64, u64, Vec<f64>), SpeedtestError> {
    let concurrent_uploads = common::determine_stream_count(single);
    let total_bytes = Arc::new(AtomicU64::new(0));
    let peak_bps = Arc::new(AtomicU64::new(0));
    let speed_samples = Arc::new(Mutex::new(Vec::new()));
    let start = Instant::now();

    let upload_data = generate_upload_data(200_000); // 200KB chunks
    let estimated_total: u64 = 4_000_000; // 4 MB estimate for the progress bar

    // Throttle gate: last sample timestamp in ms since `start`.
    // Initialized to 0, which means "never sampled", so the first
    // successful upload always records a sample.
    let last_sample_ms = Arc::new(AtomicU64::new(0));

    let mut handles = Vec::new();

    for _ in 0..concurrent_uploads {
        let client = client.clone();
        let server_url = server.url.clone();
        let data = upload_data.clone();
        let total_ref = Arc::clone(&total_bytes);
        let peak_ref = Arc::clone(&peak_bps);
        let samples_ref = Arc::clone(&speed_samples);
        let start_ref = start;
        let prog = Arc::clone(&progress);
        let throttle_ref = Arc::clone(&last_sample_ms);

        let handle = tokio::spawn(async move {
            let mut uploaded_bytes = 0u64;

            for _ in 0..UPLOAD_TEST_ROUNDS {
                let upload_url = build_upload_url(&server_url);

                if client
                    .post(&upload_url)
                    .body(data.clone())
                    .send()
                    .await
                    .is_ok()
                {
                    let chunk = data.len() as u64;
                    uploaded_bytes += chunk;
                    // Cheap atomic add — runs on every upload.
                    total_ref.fetch_add(chunk, Ordering::Relaxed);

                    // Throttle gate: only run expensive ops every SAMPLE_INTERVAL_MS.
                    // First sample always fires (last_sample_ms == 0 means "never sampled").
                    let elapsed_ms = start_ref.elapsed().as_millis() as u64;
                    let last_ms = throttle_ref.load(Ordering::Relaxed);
                    let due = last_ms == 0
                        || elapsed_ms.saturating_sub(last_ms) >= SAMPLE_INTERVAL_MS;
                    // compare_exchange claims the sampling slot atomically: if two
                    // streams pass the gate at once, only one wins the exchange and
                    // samples (the previous load-then-store version double-sampled).
                    if due
                        && throttle_ref
                            .compare_exchange(
                                last_ms,
                                elapsed_ms,
                                Ordering::Relaxed,
                                Ordering::Relaxed,
                            )
                            .is_ok()
                    {
                        // Expensive ops below run at most once per interval.
                        // Relaxed is sufficient here: fetch_add is atomic, and we
                        // only need *some* recent value of a monotonic counter.
                        let total_so_far = total_ref.load(Ordering::Relaxed);
                        let elapsed = start_ref.elapsed().as_secs_f64();
                        let speed = common::calculate_bandwidth(total_so_far, elapsed);

                        // Peak tracking: fetch_max is a single atomic RMW, so a
                        // concurrent lower value can no longer overwrite a higher
                        // peak (the earlier load/compare/store had that race).
                        peak_ref.fetch_max(speed as u64, Ordering::Relaxed);

                        // Record the sample even if another task panicked while
                        // holding the lock — the Vec itself is still valid.
                        samples_ref
                            .lock()
                            .unwrap_or_else(std::sync::PoisonError::into_inner)
                            .push(speed);

                        let pct = (total_so_far as f64 / estimated_total as f64).min(1.0);
                        prog.update(speed / 1_000_000.0, pct, total_so_far);
                    }
                }
            }

            uploaded_bytes
        });

        handles.push(handle);
    }

    // Collect results — log any task panics so failures aren't silently swallowed.
    // Bytes are already counted via atomic counters, so the return values are unused.
    for (i, handle) in handles.into_iter().enumerate() {
        if let Err(e) = handle.await {
            eprintln!("Warning: upload task {i} failed: {e}");
        }
    }

    let final_total_bytes = total_bytes.load(Ordering::Relaxed);
    let final_peak_speed = peak_bps.load(Ordering::Relaxed) as f64;
    let elapsed = start.elapsed().as_secs_f64();
    // Poison-safe read: a panicked sampler task shouldn't discard the
    // samples that were collected successfully (the old `.unwrap()` did).
    let samples = speed_samples
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .clone();
    Ok((
        common::calculate_bandwidth(final_total_bytes, elapsed),
        final_peak_speed,
        final_total_bytes,
        samples,
    ))
}

#[cfg(test)]
mod tests {
    use crate::common;

    use super::*;

    #[test]
    fn test_upload_bandwidth_calculation() {
        // 1 MB over 2 s -> 4 Mbit/s.
        assert_eq!(common::calculate_bandwidth(1_000_000, 2.0), 4_000_000.0);
    }

    #[test]
    fn test_upload_bandwidth_zero_elapsed() {
        // Zero elapsed time must not divide by zero.
        assert_eq!(common::calculate_bandwidth(1_000_000, 0.0), 0.0);
    }

    #[test]
    fn test_upload_concurrent_count_single() {
        assert_eq!(common::determine_stream_count(true), 1);
    }

    #[test]
    fn test_upload_concurrent_count_multiple() {
        assert_eq!(common::determine_stream_count(false), 4);
    }

    #[test]
    fn test_upload_url_generation() {
        assert!(build_upload_url("http://server.example.com").ends_with("/upload"));
    }

    #[test]
    fn test_upload_url_generation_full_path() {
        assert_eq!(
            build_upload_url("http://server.example.com/speedtest"),
            "http://server.example.com/speedtest/upload"
        );
    }

    #[test]
    fn test_generate_upload_data_size() {
        assert_eq!(generate_upload_data(1000).len(), 1000);
    }

    #[test]
    fn test_generate_upload_data_pattern() {
        // Every byte follows the repeating 0..=255 ramp.
        let ok = generate_upload_data(300)
            .iter()
            .enumerate()
            .all(|(i, &byte)| byte == (i % 256) as u8);
        assert!(ok);
    }

    #[test]
    fn test_generate_upload_data_wraps_at_256() {
        let data = generate_upload_data(512);
        for (idx, expected) in [(0usize, 0u8), (255, 255), (256, 0), (511, 255)] {
            assert_eq!(data[idx], expected);
        }
    }

    #[test]
    fn test_generate_upload_data_empty() {
        assert!(generate_upload_data(0).is_empty());
    }

    #[test]
    fn test_upload_data_size_constant() {
        // Verify the upload data size used in upload_test (200KB).
        assert_eq!(generate_upload_data(200_000).len(), 200_000);
    }
}