Skip to main content

nd_300/speedtest/
mod.rs

1pub(crate) mod adaptive;
2pub mod applenq;
3pub mod cloudflare;
4pub mod display;
5pub mod fastcom;
6pub mod librespeed;
7pub mod msak;
8pub mod ndt7;
9pub mod statistics;
10
11use serde::Serialize;
12use std::sync::Arc;
13use std::time::Instant;
14
15/// Test duration configuration
16#[derive(Debug, Clone)]
17pub enum TestDuration {
18    /// Fixed duration per direction in seconds (e.g., 30 = 30s download + 30s upload)
19    Seconds(u64),
20    /// Let providers use their natural duration
21    Auto,
22}
23
24/// Which providers to run
25#[derive(Debug, Clone, Copy, PartialEq)]
26pub enum ProviderSet {
27    /// All 6 providers: Cloudflare, NDT7, LibreSpeed, fast.com, M-Lab MSAK,
28    /// Apple networkQuality (speedqx default)
29    All,
30    /// Diagnostic subset: Cloudflare + NDT7 only (nd300 default)
31    Diagnostic,
32}
33
34/// Configuration for the speed test orchestrator
35#[derive(Debug, Clone)]
36pub struct SpeedTestConfig {
37    /// Duration per direction for CF, NDT7, LibreSpeed, MSAK, Apple (default: 30s)
38    pub duration: TestDuration,
39    /// Duration per direction for fast.com (default: Auto)
40    pub fastcom_duration: TestDuration,
41    /// Number of latency probes
42    pub latency_probes: u32,
43    /// Which providers to run
44    pub provider_set: ProviderSet,
45    /// Run the M-Lab MSAK multi-stream provider (All mode only)
46    pub msak_enabled: bool,
47    /// Run the Apple networkQuality provider (All mode only)
48    pub apple_enabled: bool,
49    /// Enable colored output
50    pub use_colors: bool,
51}
52
53impl Default for SpeedTestConfig {
54    fn default() -> Self {
55        Self {
56            duration: TestDuration::Seconds(30),
57            fastcom_duration: TestDuration::Auto,
58            latency_probes: 20,
59            provider_set: ProviderSet::All,
60            msak_enabled: true,
61            apple_enabled: true,
62            use_colors: true,
63        }
64    }
65}
66
67/// Phase indicator for progress callbacks
68#[derive(Debug, Clone, Copy, PartialEq)]
69pub enum Phase {
70    CfLatency,
71    CfDownload,
72    CfUpload,
73    Ndt7Discovery,
74    Ndt7Download,
75    Ndt7Upload,
76    LsDiscovery,
77    LsDownload,
78    LsUpload,
79    FcDiscovery,
80    FcDownload,
81    FcUpload,
82    MsakDiscovery,
83    MsakDownload,
84    MsakUpload,
85    AnqDiscovery,
86    AnqDownload,
87    AnqUpload,
88    Computing,
89}
90
91/// Raw per-request Mbps samples for statistical post-processing.
92#[derive(Debug, Clone, Default, Serialize)]
93pub struct BandwidthSamples {
94    pub download: Vec<f64>,
95    pub upload: Vec<f64>,
96}
97
98/// Connection stability metrics (coefficient of variation).
99#[derive(Debug, Clone, Serialize)]
100pub struct StabilityMetrics {
101    pub download_cv: f64,
102    pub upload_cv: f64,
103    pub download_stable: bool,
104    pub upload_stable: bool,
105}
106
107/// Provider divergence detection.
108#[derive(Debug, Clone, Serialize)]
109pub struct ProviderDivergence {
110    pub download: f64,
111    pub upload: f64,
112    pub significant: bool,
113}
114
115/// A provider direction excluded from the headline merge for insufficient
116/// samples (additive JSON field, v3.4.0+).
117#[derive(Debug, Clone, Serialize)]
118pub struct MergeExclusion {
119    pub provider: String,
120    /// "download" or "upload".
121    pub direction: &'static str,
122    pub samples: usize,
123}
124
125/// Bootstrap confidence intervals on the pooled per-direction sample sets
126/// (additive JSON field, v3.4.0+). The pooled-pipeline CI is an honest proxy
127/// for the headline number's uncertainty: it naturally widens when providers
128/// diverge.
129#[derive(Debug, Clone, Serialize)]
130pub struct ConfidenceIntervals {
131    #[serde(skip_serializing_if = "Option::is_none")]
132    pub download: Option<statistics::BootstrapCI>,
133    #[serde(skip_serializing_if = "Option::is_none")]
134    pub upload: Option<statistics::BootstrapCI>,
135    pub confidence_level: f64,
136}
137
138/// Per-provider speed test result
139#[derive(Debug, Clone, Serialize)]
140pub struct ProviderResult {
141    pub provider: String,
142    pub server: String,
143    #[serde(skip_serializing_if = "Option::is_none")]
144    pub location: Option<String>,
145    #[serde(skip_serializing_if = "Option::is_none")]
146    pub ping_ms: Option<f64>,
147    #[serde(skip_serializing_if = "Option::is_none")]
148    pub jitter_ms: Option<f64>,
149    #[serde(skip_serializing_if = "Option::is_none")]
150    pub download_mbps: Option<f64>,
151    #[serde(skip_serializing_if = "Option::is_none")]
152    pub upload_mbps: Option<f64>,
153    pub download_bytes: u64,
154    pub upload_bytes: u64,
155    pub download_duration_s: f64,
156    pub upload_duration_s: f64,
157    #[serde(skip_serializing_if = "Option::is_none")]
158    pub packet_loss_pct: Option<f64>,
159    #[serde(skip_serializing_if = "Option::is_none")]
160    pub error: Option<String>,
161    #[serde(skip_serializing_if = "Option::is_none")]
162    pub bandwidth_samples: Option<BandwidthSamples>,
163}
164
165/// Aggregated speed test result (used by both speedqx and nd300)
166#[derive(Debug, Clone, Serialize)]
167pub struct SpeedTestResult {
168    #[serde(skip_serializing_if = "Option::is_none")]
169    pub ping_ms: Option<f64>,
170    #[serde(skip_serializing_if = "Option::is_none")]
171    pub jitter_ms: Option<f64>,
172    pub download_mbps: f64,
173    pub upload_mbps: f64,
174    #[serde(skip_serializing_if = "Option::is_none")]
175    pub packet_loss_pct: Option<f64>,
176    pub providers: Vec<ProviderResult>,
177    pub duration_s: f64,
178    #[serde(skip_serializing_if = "Option::is_none")]
179    pub stability: Option<StabilityMetrics>,
180    #[serde(skip_serializing_if = "Option::is_none")]
181    pub provider_divergence: Option<ProviderDivergence>,
182    #[serde(skip_serializing_if = "Option::is_none")]
183    pub confidence_intervals: Option<ConfidenceIntervals>,
184    #[serde(skip_serializing_if = "Vec::is_empty")]
185    pub merge_exclusions: Vec<MergeExclusion>,
186}
187
188/// Latency weight for Cloudflare (NDT7 gets 1 - this).
189/// NDT7's MinRTT from TCP kernel is structurally superior, not just lower-variance.
190const CF_LATENCY_WEIGHT: f64 = 0.4;
191
192/// Divergence threshold: flag when providers differ by more than this fraction.
193const DIVERGENCE_THRESHOLD: f64 = 0.3;
194
195fn divergence_ratio(a: f64, b: f64) -> f64 {
196    if a <= 0.0 || b <= 0.0 {
197        return 0.0;
198    }
199    (a - b).abs() / a.max(b)
200}
201
202fn divergence_spread(values: &[(f64, f64)]) -> f64 {
203    let mut min = f64::INFINITY;
204    let mut max = f64::NEG_INFINITY;
205
206    for (value, _) in values {
207        if *value <= 0.0 {
208            continue;
209        }
210        min = min.min(*value);
211        max = max.max(*value);
212    }
213
214    if !min.is_finite() || !max.is_finite() || max <= 0.0 || min == max {
215        0.0
216    } else {
217        divergence_ratio(min, max)
218    }
219}
220
221fn inverse_variance_merge_many(values: &[(f64, f64)]) -> f64 {
222    let positive: Vec<(f64, f64)> = values
223        .iter()
224        .copied()
225        .filter(|(value, _)| value.is_finite() && *value > 0.0)
226        .collect();
227
228    if positive.is_empty() {
229        return 0.0;
230    }
231    if positive.len() == 1 {
232        return positive[0].0;
233    }
234
235    // A variance that is zero, negative, or non-finite means UNKNOWN
236    // precision (a 1-sample provider reports variance 0.0; a fallback value
237    // has no samples at all). Unknown precision must be treated as the LEAST
238    // trusted entry — assign it the maximum known variance. The old rule
239    // clamped unknowns to the variance floor, which handed a degenerate
240    // single-sample provider the highest possible weight.
241    let known = |v: f64| v.is_finite() && v > 0.0;
242
243    let max_known_variance = positive
244        .iter()
245        .filter_map(|(_, variance)| known(*variance).then_some(*variance))
246        .max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
247
248    let Some(max_variance) = max_known_variance else {
249        // No provider has a known variance — plain average.
250        return positive.iter().map(|(value, _)| value).sum::<f64>() / positive.len() as f64;
251    };
252
253    let variance_floor = positive
254        .iter()
255        .filter_map(|(_, variance)| known(*variance).then_some(*variance))
256        .min_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
257        .unwrap_or(max_variance)
258        .max(0.000_001);
259
260    let raw_weights: Vec<f64> = positive
261        .iter()
262        .map(|(_, variance)| {
263            let effective = if known(*variance) {
264                *variance
265            } else {
266                max_variance
267            };
268            1.0 / effective.max(variance_floor)
269        })
270        .collect();
271    let raw_total = raw_weights.iter().sum::<f64>();
272    if raw_total <= 0.0 {
273        return positive.iter().map(|(value, _)| value).sum::<f64>() / positive.len() as f64;
274    }
275
276    let weights = capped_inverse_variance_weights(&raw_weights, raw_total, 0.70);
277
278    positive
279        .iter()
280        .zip(weights.iter())
281        .map(|((value, _), weight)| value * weight)
282        .sum()
283}
284
285fn capped_inverse_variance_weights(raw_weights: &[f64], raw_total: f64, cap: f64) -> Vec<f64> {
286    if raw_weights.is_empty() {
287        return Vec::new();
288    }
289    if raw_weights.len() == 1 {
290        return vec![1.0];
291    }
292    if raw_total <= 0.0 {
293        let equal = 1.0 / raw_weights.len() as f64;
294        return vec![equal; raw_weights.len()];
295    }
296
297    let cap = cap.max(1.0 / raw_weights.len() as f64);
298    let mut weights = vec![0.0; raw_weights.len()];
299    let mut remaining: Vec<usize> = (0..raw_weights.len()).collect();
300    let mut remaining_mass = 1.0;
301
302    loop {
303        if remaining.is_empty() {
304            break;
305        }
306
307        let remaining_raw_total = remaining.iter().map(|idx| raw_weights[*idx]).sum::<f64>();
308        if remaining_raw_total <= 0.0 {
309            let equal = remaining_mass / remaining.len() as f64;
310            for idx in remaining {
311                weights[idx] = equal;
312            }
313            break;
314        }
315
316        let mut capped = Vec::new();
317        for idx in &remaining {
318            let candidate = remaining_mass * raw_weights[*idx] / remaining_raw_total;
319            if candidate > cap {
320                weights[*idx] = cap;
321                remaining_mass = (remaining_mass - cap).max(0.0);
322                capped.push(*idx);
323            }
324        }
325
326        if capped.is_empty() {
327            for idx in remaining {
328                weights[idx] = remaining_mass * raw_weights[idx] / remaining_raw_total;
329            }
330            break;
331        }
332
333        remaining.retain(|idx| !capped.contains(idx));
334    }
335
336    weights
337}
338
339/// Aggregation result including new metrics.
340struct AggregateResult {
341    ping: Option<f64>,
342    jitter: Option<f64>,
343    download: f64,
344    upload: f64,
345    packet_loss: Option<f64>,
346    stability: Option<StabilityMetrics>,
347    divergence: Option<ProviderDivergence>,
348    confidence: Option<ConfidenceIntervals>,
349    exclusions: Vec<MergeExclusion>,
350}
351
352/// Minimum sanitized samples a provider needs in a direction before it joins
353/// the headline inverse-variance merge. Matches the statistics pipeline's own
354/// internal thresholds (slow-start discard, IQR filter, and winsorize all
355/// bail below 4) — a trimean over fewer samples is noise, and its degenerate
356/// variance used to let it dominate the merge.
357const MIN_MERGE_SAMPLES: usize = 4;
358
359/// Pooled samples needed before a bootstrap CI is computed. Below 8 the
360/// percentile method's bounds are too coarse to be honest (and below 4 they
361/// collapse to a misleading "±0").
362const MIN_CI_SAMPLES: usize = 8;
363
364/// One provider direction proposed for the merge.
365struct MergeCandidate {
366    provider: String,
367    value: f64,
368    variance: f64,
369    samples: usize,
370}
371
372/// Apply the minimum-sample floor: providers with enough samples make the
373/// merge; the rest are recorded as exclusions. Degraded fallback: when NO
374/// provider reaches the floor, every positive candidate is kept (the merge
375/// must never return 0.0 while data exists).
376fn select_for_merge(
377    candidates: Vec<MergeCandidate>,
378    direction: &'static str,
379    exclusions: &mut Vec<MergeExclusion>,
380) -> Vec<(f64, f64)> {
381    let any_qualified = candidates.iter().any(|c| c.samples >= MIN_MERGE_SAMPLES);
382    if !any_qualified {
383        return candidates.iter().map(|c| (c.value, c.variance)).collect();
384    }
385
386    let mut kept = Vec::new();
387    for c in candidates {
388        if c.samples >= MIN_MERGE_SAMPLES {
389            kept.push((c.value, c.variance));
390        } else {
391            exclusions.push(MergeExclusion {
392                provider: c.provider,
393                direction,
394                samples: c.samples,
395            });
396        }
397    }
398    kept
399}
400
401/// Inverse-variance weighted aggregation across providers.
402/// Uses accurate bandwidth pipeline on raw samples, fixed latency weights,
403/// stability metrics, and divergence detection.
404fn aggregate(providers: &[ProviderResult]) -> AggregateResult {
405    let successful: Vec<&ProviderResult> = providers.iter().filter(|p| p.error.is_none()).collect();
406
407    if successful.is_empty() {
408        return AggregateResult {
409            ping: None,
410            jitter: None,
411            download: 0.0,
412            upload: 0.0,
413            packet_loss: None,
414            stability: None,
415            divergence: None,
416            confidence: None,
417            exclusions: Vec::new(),
418        };
419    }
420
421    // ── Compute accurate bandwidth per provider from sanitized samples ──
422    let mut dl_candidates: Vec<MergeCandidate> = Vec::new();
423    let mut ul_candidates: Vec<MergeCandidate> = Vec::new();
424    let mut all_dl_samples: Vec<f64> = Vec::new();
425    let mut all_ul_samples: Vec<f64> = Vec::new();
426
427    for p in &successful {
428        if let Some(ref samples) = p.bandwidth_samples {
429            let download = statistics::sanitize(&samples.download);
430            if !download.is_empty() {
431                let acc = statistics::accurate_bandwidth(&download);
432                let var = statistics::variance(&download);
433                if acc > 0.0 {
434                    dl_candidates.push(MergeCandidate {
435                        provider: p.provider.clone(),
436                        value: acc,
437                        variance: var,
438                        samples: download.len(),
439                    });
440                }
441                all_dl_samples.extend_from_slice(&download);
442            }
443            let upload = statistics::sanitize(&samples.upload);
444            if !upload.is_empty() {
445                let acc = statistics::accurate_upload_bandwidth(&upload);
446                let var = statistics::variance(&upload);
447                if acc > 0.0 {
448                    ul_candidates.push(MergeCandidate {
449                        provider: p.provider.clone(),
450                        value: acc,
451                        variance: var,
452                        samples: upload.len(),
453                    });
454                }
455                all_ul_samples.extend_from_slice(&upload);
456            }
457        }
458        // Fallback: use the provider-reported value if no raw samples. It
459        // carries zero samples and an unknown (0.0) variance, so it is
460        // excluded whenever a sampled provider qualifies, and least-trusted
461        // when it does participate.
462        if p.bandwidth_samples
463            .as_ref()
464            .is_none_or(|s| s.download.is_empty())
465        {
466            if let Some(dl) = p.download_mbps {
467                if dl.is_finite() && dl > 0.0 {
468                    dl_candidates.push(MergeCandidate {
469                        provider: p.provider.clone(),
470                        value: dl,
471                        variance: 0.0,
472                        samples: 0,
473                    });
474                }
475            }
476        }
477        if p.bandwidth_samples
478            .as_ref()
479            .is_none_or(|s| s.upload.is_empty())
480        {
481            if let Some(ul) = p.upload_mbps {
482                if ul.is_finite() && ul > 0.0 {
483                    ul_candidates.push(MergeCandidate {
484                        provider: p.provider.clone(),
485                        value: ul,
486                        variance: 0.0,
487                        samples: 0,
488                    });
489                }
490            }
491        }
492    }
493
494    // ── Minimum-sample floor, then inverse-variance merge ──────────
495    let mut exclusions: Vec<MergeExclusion> = Vec::new();
496    let provider_dl = select_for_merge(dl_candidates, "download", &mut exclusions);
497    let provider_ul = select_for_merge(ul_candidates, "upload", &mut exclusions);
498
499    let download = inverse_variance_merge_many(&provider_dl);
500
501    let upload = inverse_variance_merge_many(&provider_ul);
502
503    // ── Bootstrap confidence intervals on the pooled sample sets ───
504    let download_ci = (all_dl_samples.len() >= MIN_CI_SAMPLES).then(|| {
505        statistics::bootstrap_ci(&all_dl_samples, statistics::accurate_bandwidth, 1000, 0.05)
506    });
507    let upload_ci = (all_ul_samples.len() >= MIN_CI_SAMPLES).then(|| {
508        statistics::bootstrap_ci(
509            &all_ul_samples,
510            statistics::accurate_upload_bandwidth,
511            1000,
512            0.05,
513        )
514    });
515    let confidence = if download_ci.is_some() || upload_ci.is_some() {
516        Some(ConfidenceIntervals {
517            download: download_ci,
518            upload: upload_ci,
519            confidence_level: 0.95,
520        })
521    } else {
522        None
523    };
524
525    // ── Latency: confidence-weighted merge (CF 0.4 / NDT7 0.6) ─────
526    let cf_ping = successful
527        .iter()
528        .find(|p| p.provider == "Cloudflare")
529        .and_then(|p| p.ping_ms);
530    let ndt_ping = successful
531        .iter()
532        .find(|p| p.provider == "M-Lab NDT7")
533        .and_then(|p| p.ping_ms);
534    let cf_jitter = successful
535        .iter()
536        .find(|p| p.provider == "Cloudflare")
537        .and_then(|p| p.jitter_ms);
538    let ndt_jitter = successful
539        .iter()
540        .find(|p| p.provider == "M-Lab NDT7")
541        .and_then(|p| p.jitter_ms);
542
543    let ping = match (cf_ping, ndt_ping) {
544        (Some(cf), Some(ndt)) => Some(statistics::weighted_merge(cf, ndt, CF_LATENCY_WEIGHT)),
545        (Some(cf), None) => Some(cf),
546        (None, Some(ndt)) => Some(ndt),
547        (None, None) => {
548            // Fallback: minimum across providers, but prefer those that also
549            // report jitter — jitter presence means the ping came from a
550            // multi-probe measurement, not a single noisy sample that could
551            // otherwise become the global minimum.
552            let min_of = |require_jitter: bool| {
553                successful
554                    .iter()
555                    .filter(|p| !require_jitter || p.jitter_ms.is_some())
556                    .filter_map(|p| p.ping_ms)
557                    .filter(|p| *p > 0.0)
558                    .min_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
559            };
560            min_of(true).or_else(|| min_of(false))
561        }
562    };
563
564    let jitter = match (cf_jitter, ndt_jitter) {
565        (Some(cf), Some(ndt)) => Some(statistics::weighted_merge(cf, ndt, CF_LATENCY_WEIGHT)),
566        (Some(cf), None) => Some(cf),
567        (None, Some(ndt)) => Some(ndt),
568        (None, None) => {
569            let jitters: Vec<f64> = successful
570                .iter()
571                .filter_map(|p| p.jitter_ms)
572                .filter(|j| *j > 0.0)
573                .collect();
574            if jitters.is_empty() {
575                None
576            } else {
577                Some(statistics::mean(&jitters))
578            }
579        }
580    };
581
582    // Packet loss from Cloudflare (only provider that measures it)
583    let packet_loss = successful
584        .iter()
585        .find(|p| p.provider == "Cloudflare")
586        .and_then(|p| p.packet_loss_pct);
587
588    // ── Stability metrics ──────────────────────────────────────────
589    let stability = if all_dl_samples.len() > 2 || all_ul_samples.len() > 2 {
590        let dl_cv = statistics::coefficient_of_variation(&all_dl_samples);
591        let ul_cv = statistics::coefficient_of_variation(&all_ul_samples);
592        Some(StabilityMetrics {
593            download_cv: dl_cv,
594            upload_cv: ul_cv,
595            download_stable: dl_cv < 0.15,
596            upload_stable: ul_cv < 0.15,
597        })
598    } else {
599        None
600    };
601
602    // ── Provider divergence ────────────────────────────────────────
603    let divergence = if provider_dl.len() >= 2 || provider_ul.len() >= 2 {
604        let dl_div = divergence_spread(&provider_dl);
605        let ul_div = divergence_spread(&provider_ul);
606        Some(ProviderDivergence {
607            download: dl_div,
608            upload: ul_div,
609            significant: dl_div > DIVERGENCE_THRESHOLD || ul_div > DIVERGENCE_THRESHOLD,
610        })
611    } else {
612        None
613    };
614
615    AggregateResult {
616        ping,
617        jitter,
618        download,
619        upload,
620        packet_loss,
621        stability,
622        divergence,
623        confidence,
624        exclusions,
625    }
626}
627
628/// Callback type for provider completion notifications.
629pub type ProviderCompleteCallback = Arc<dyn Fn(&ProviderResult) + Send + Sync>;
630
631/// Run the speed test with the given configuration and progress callback.
632/// The `on_provider_complete` callback is called after each provider finishes,
633/// allowing the UI to show per-provider summaries.
634pub async fn run<F>(
635    config: SpeedTestConfig,
636    progress: F,
637    on_provider_complete: Option<ProviderCompleteCallback>,
638) -> SpeedTestResult
639where
640    F: Fn(Phase, f64) + Send + Sync + 'static,
641{
642    let start = Instant::now();
643    let mut providers = Vec::new();
644    let progress = Arc::new(progress);
645
646    // Cloudflare (always runs)
647    {
648        let pg = progress.clone();
649        let cf_result = cloudflare::run(&config, move |phase, p| pg(phase, p)).await;
650        if let Some(ref cb) = on_provider_complete {
651            cb(&cf_result);
652        }
653        providers.push(cf_result);
654    }
655
656    // M-Lab NDT7 (always runs)
657    {
658        let pg = progress.clone();
659        let ndt_result = ndt7::run(&config, move |phase, p| pg(phase, p)).await;
660        if let Some(ref cb) = on_provider_complete {
661            cb(&ndt_result);
662        }
663        providers.push(ndt_result);
664    }
665
666    // LibreSpeed + fast.com + MSAK + Apple networkQuality (only in All mode)
667    if config.provider_set == ProviderSet::All {
668        {
669            let pg = progress.clone();
670            let ls_result = librespeed::run(&config, move |phase, p| pg(phase, p)).await;
671            if let Some(ref cb) = on_provider_complete {
672                cb(&ls_result);
673            }
674            providers.push(ls_result);
675        }
676
677        {
678            let pg = progress.clone();
679            let fc_result = fastcom::run(&config, move |phase, p| pg(phase, p)).await;
680            if let Some(ref cb) = on_provider_complete {
681                cb(&fc_result);
682            }
683            providers.push(fc_result);
684        }
685
686        if config.msak_enabled {
687            let pg = progress.clone();
688            let msak_result = msak::run(&config, move |phase, p| pg(phase, p)).await;
689            if let Some(ref cb) = on_provider_complete {
690                cb(&msak_result);
691            }
692            providers.push(msak_result);
693        }
694
695        if config.apple_enabled {
696            let pg = progress.clone();
697            let anq_result = applenq::run(&config, move |phase, p| pg(phase, p)).await;
698            if let Some(ref cb) = on_provider_complete {
699                cb(&anq_result);
700            }
701            providers.push(anq_result);
702        }
703    }
704
705    progress(Phase::Computing, 1.0);
706
707    let agg = aggregate(&providers);
708    let duration = start.elapsed().as_secs_f64();
709
710    SpeedTestResult {
711        ping_ms: agg.ping,
712        jitter_ms: agg.jitter,
713        download_mbps: agg.download,
714        upload_mbps: agg.upload,
715        packet_loss_pct: agg.packet_loss,
716        providers,
717        duration_s: duration,
718        stability: agg.stability,
719        provider_divergence: agg.divergence,
720        confidence_intervals: agg.confidence,
721        merge_exclusions: agg.exclusions,
722    }
723}
724
725/// Format Mbps value for display
726pub fn format_mbps(mbps: f64) -> String {
727    if mbps >= 1000.0 {
728        format!("{:.1} Gbps", mbps / 1000.0)
729    } else if mbps >= 100.0 {
730        format!("{:.0} Mbps", mbps)
731    } else if mbps >= 10.0 {
732        format!("{:.1} Mbps", mbps)
733    } else {
734        format!("{:.2} Mbps", mbps)
735    }
736}
737
738/// Format bytes for display
739pub fn format_bytes(bytes: u64) -> String {
740    const KB: u64 = 1024;
741    const MB: u64 = 1024 * KB;
742    const GB: u64 = 1024 * MB;
743
744    if bytes >= GB {
745        format!("{:.2} GB", bytes as f64 / GB as f64)
746    } else if bytes >= MB {
747        format!("{:.1} MB", bytes as f64 / MB as f64)
748    } else if bytes >= KB {
749        format!("{:.1} KB", bytes as f64 / KB as f64)
750    } else {
751        format!("{} B", bytes)
752    }
753}
754
755#[cfg(test)]
756mod tests {
757    use super::*;
758
759    fn provider(name: &str, download: f64, upload: f64, variance: f64) -> ProviderResult {
760        let delta = variance.sqrt();
761        ProviderResult {
762            provider: name.to_string(),
763            server: "test".to_string(),
764            location: None,
765            ping_ms: None,
766            jitter_ms: None,
767            download_mbps: Some(download),
768            upload_mbps: Some(upload),
769            download_bytes: 1,
770            upload_bytes: 1,
771            download_duration_s: 1.0,
772            upload_duration_s: 1.0,
773            packet_loss_pct: None,
774            error: None,
775            bandwidth_samples: Some(BandwidthSamples {
776                download: vec![download - delta, download, download + delta, download],
777                upload: vec![upload - delta, upload, upload + delta, upload],
778            }),
779        }
780    }
781
782    #[test]
783    fn aggregate_uses_more_than_first_two_providers() {
784        let first_two = vec![
785            provider("Cloudflare", 100.0, 20.0, 4.0),
786            provider("M-Lab NDT7", 100.0, 20.0, 4.0),
787        ];
788        let with_four = vec![
789            provider("Cloudflare", 100.0, 20.0, 4.0),
790            provider("M-Lab NDT7", 100.0, 20.0, 4.0),
791            provider("LibreSpeed", 900.0, 180.0, 4.0),
792            provider("fast.com", 900.0, 180.0, 4.0),
793        ];
794
795        let two = aggregate(&first_two);
796        let four = aggregate(&with_four);
797
798        assert!(
799            four.download > two.download + 100.0,
800            "third/fourth providers should materially influence aggregate: two={}, four={}",
801            two.download,
802            four.download
803        );
804        assert!(
805            four.upload > two.upload + 20.0,
806            "third/fourth providers should materially influence upload aggregate: two={}, four={}",
807            two.upload,
808            four.upload
809        );
810    }
811
812    #[test]
813    fn divergence_uses_full_provider_spread() {
814        let providers = vec![
815            provider("Cloudflare", 100.0, 20.0, 4.0),
816            provider("M-Lab NDT7", 105.0, 22.0, 4.0),
817            provider("LibreSpeed", 450.0, 90.0, 4.0),
818        ];
819
820        let agg = aggregate(&providers);
821        let div = agg.divergence.expect("divergence should be reported");
822
823        assert!(div.significant);
824        assert!(
825            div.download > 0.70,
826            "expected divergence to use 100 vs 450 spread, got {}",
827            div.download
828        );
829        assert!(
830            div.upload > 0.70,
831            "expected divergence to use 20 vs 90 spread, got {}",
832            div.upload
833        );
834    }
835
836    #[test]
837    fn inverse_variance_merge_caps_single_provider_dominance() {
838        let merged = inverse_variance_merge_many(&[(1000.0, 0.000_001), (1.0, 1000.0)]);
839
840        assert!(
841            merged < 701.0,
842            "dominant provider should be capped near 70%, got {}",
843            merged
844        );
845    }
846
847    /// A provider whose degenerate sample set yields an unknown (0.0)
848    /// variance must adopt the WORST known variance in the merge (least
849    /// trusted), not the best. With known variances {4, 25}, the unknown
850    /// 1000 Mbps entry gets 25; the old floor rule handed it 4 — tied for
851    /// the highest weight — which pulled the merge above 500.
852    #[test]
853    fn nan_variance_provider_does_not_dominate_merge() {
854        let merged = inverse_variance_merge_many(&[(100.0, 4.0), (102.0, 25.0), (1000.0, 0.0)]);
855        assert!(
856            merged < 300.0,
857            "unknown-variance provider must be least-trusted; got {}",
858            merged
859        );
860
861        // Non-finite variance is treated the same as unknown.
862        let merged_nan =
863            inverse_variance_merge_many(&[(100.0, 4.0), (102.0, 25.0), (1000.0, f64::NAN)]);
864        assert!(
865            merged_nan < 300.0,
866            "NaN-variance provider must be least-trusted; got {}",
867            merged_nan
868        );
869    }
870
871    #[test]
872    fn non_finite_values_filtered_from_merge() {
873        let merged = inverse_variance_merge_many(&[(f64::NAN, 4.0), (100.0, 4.0)]);
874        assert_eq!(merged, 100.0);
875        let merged_inf = inverse_variance_merge_many(&[(f64::INFINITY, 4.0), (100.0, 4.0)]);
876        assert_eq!(merged_inf, 100.0);
877    }
878
879    fn provider_with_samples(name: &str, download: Vec<f64>, upload: Vec<f64>) -> ProviderResult {
880        ProviderResult {
881            provider: name.to_string(),
882            server: "test".to_string(),
883            location: None,
884            ping_ms: None,
885            jitter_ms: None,
886            download_mbps: download.last().copied(),
887            upload_mbps: upload.last().copied(),
888            download_bytes: 1,
889            upload_bytes: 1,
890            download_duration_s: 1.0,
891            upload_duration_s: 1.0,
892            packet_loss_pct: None,
893            error: None,
894            bandwidth_samples: Some(BandwidthSamples { download, upload }),
895        }
896    }
897
898    /// A 1-sample provider at a wild value is excluded from the merge when a
899    /// well-sampled provider exists, and the exclusion is reported.
900    #[test]
901    fn single_sample_provider_excluded_from_merge() {
902        let clean: Vec<f64> = (0..10).map(|i| 98.0 + (i % 3) as f64).collect();
903        let providers = vec![
904            provider_with_samples("Cloudflare", clean.clone(), clean.clone()),
905            provider_with_samples("LibreSpeed", vec![1000.0], vec![1000.0]),
906        ];
907
908        let agg = aggregate(&providers);
909
910        assert!(
911            (agg.download - 99.0).abs() < 5.0,
912            "merge should track the well-sampled provider, got {}",
913            agg.download
914        );
915        assert!(
916            agg.exclusions
917                .iter()
918                .any(|e| e.provider == "LibreSpeed" && e.direction == "download" && e.samples == 1),
919            "LibreSpeed download exclusion should be recorded: {:?}",
920            agg.exclusions
921                .iter()
922                .map(|e| (&e.provider, e.direction, e.samples))
923                .collect::<Vec<_>>()
924        );
925    }
926
927    /// When NO provider reaches the floor, all positive candidates still
928    /// merge — never 0.0 while data exists.
929    #[test]
930    fn all_sparse_providers_still_merge() {
931        let providers = vec![
932            provider_with_samples("Cloudflare", vec![100.0, 102.0], vec![20.0, 21.0]),
933            provider_with_samples("LibreSpeed", vec![110.0, 108.0], vec![22.0, 23.0]),
934        ];
935
936        let agg = aggregate(&providers);
937
938        assert!(agg.download > 0.0, "degraded merge must not return 0.0");
939        assert!(agg.exclusions.is_empty(), "no exclusions in degraded mode");
940    }
941
942    /// A no-sample fallback value participates only with least-trust
943    /// weighting (and is excluded entirely when a sampled provider exists).
944    #[test]
945    fn fallback_value_provider_excluded_when_sampled_provider_exists() {
946        let clean: Vec<f64> = (0..10).map(|i| 98.0 + (i % 3) as f64).collect();
947        let mut fallback_only = provider_with_samples("fast.com", vec![], vec![]);
948        fallback_only.download_mbps = Some(1000.0);
949        fallback_only.upload_mbps = Some(1000.0);
950
951        let providers = vec![
952            provider_with_samples("Cloudflare", clean.clone(), clean.clone()),
953            fallback_only,
954        ];
955
956        let agg = aggregate(&providers);
957
958        assert!(
959            (agg.download - 99.0).abs() < 5.0,
960            "fallback-only provider must not skew the merge, got {}",
961            agg.download
962        );
963        assert!(agg
964            .exclusions
965            .iter()
966            .any(|e| e.provider == "fast.com" && e.samples == 0));
967    }
968
969    /// In the no-CF/no-NDT7 latency fallback, a single-probe (jitterless)
970    /// provider must not become the global minimum when a multi-probe
971    /// provider is available.
972    #[test]
973    fn latency_fallback_ignores_jitterless_single_probe() {
974        let mut multi = provider_with_samples("LibreSpeed", vec![100.0; 5], vec![20.0; 5]);
975        multi.ping_ms = Some(15.0);
976        multi.jitter_ms = Some(1.2);
977        let mut single = provider_with_samples("fast.com", vec![100.0; 5], vec![20.0; 5]);
978        single.ping_ms = Some(2.0); // suspiciously low single sample
979        single.jitter_ms = None;
980
981        let agg = aggregate(&[multi, single]);
982
983        assert_eq!(
984            agg.ping,
985            Some(15.0),
986            "jitter-bearing provider should win the fallback"
987        );
988    }
989
990    #[test]
991    fn confidence_intervals_suppressed_below_threshold() {
992        let providers = vec![provider_with_samples(
993            "Cloudflare",
994            vec![100.0, 101.0, 99.0],
995            vec![20.0, 21.0],
996        )];
997        let agg = aggregate(&providers);
998        assert!(agg.confidence.is_none());
999    }
1000
1001    #[test]
1002    fn confidence_intervals_present_with_sufficient_samples() {
1003        let samples: Vec<f64> = (0..12).map(|i| 95.0 + (i % 5) as f64 * 2.0).collect();
1004        let providers = vec![provider_with_samples(
1005            "Cloudflare",
1006            samples.clone(),
1007            samples.clone(),
1008        )];
1009        let agg = aggregate(&providers);
1010        let ci = agg.confidence.expect("CI should be computed");
1011        let dl = ci.download.expect("download CI present");
1012        assert!(dl.lower <= dl.estimate && dl.estimate <= dl.upper);
1013        assert_eq!(ci.confidence_level, 0.95);
1014    }
1015}