1use crate::measurements::format_bytes;
2use crate::measurements::log_measurements;
3use crate::measurements::LatencyMeasurement;
4use crate::measurements::Measurement;
5use crate::measurements::PayloadAttemptStats;
6use crate::progress::print_progress;
7use crate::OutputFormat;
8use crate::SpeedTestCLIOptions;
9use jiff::Zoned;
10use log;
11use regex::Regex;
12use reqwest::{blocking::Client, header::RETRY_AFTER, StatusCode};
13use serde::Serialize;
14use std::{
15 fmt::Display,
16 io::Write,
17 sync::atomic::{AtomicBool, Ordering},
18 thread,
19 time::{Duration, Instant},
20};
21
22const BASE_URL: &str = "https://speed.cloudflare.com";
23const DOWNLOAD_URL: &str = "__down?bytes=";
24const UPLOAD_URL: &str = "__up";
25static WARNED_NEGATIVE_LATENCY: AtomicBool = AtomicBool::new(false);
26const TIME_THRESHOLD: Duration = Duration::from_secs(5);
27const MAX_ATTEMPT_FACTOR: u32 = 4;
28const RETRY_BASE_BACKOFF: Duration = Duration::from_millis(250);
29const RETRY_MAX_BACKOFF: Duration = Duration::from_secs(3);
30
31#[derive(Clone, Copy, Debug)]
32struct RetryRunOptions {
33 nr_tests: u32,
34 output_format: OutputFormat,
35 disable_dynamic_max_payload_size: bool,
36}
37
38#[derive(Clone, Copy, Debug, Hash, Serialize, Eq, PartialEq)]
39pub enum TestType {
40 Download,
41 Upload,
42}
43
44#[derive(Clone, Debug, PartialEq, Eq)]
45pub enum PayloadSize {
46 K100 = 100_000,
47 M1 = 1_000_000,
48 M10 = 10_000_000,
49 M25 = 25_000_000,
50 M100 = 100_000_000,
51}
52
53impl Display for PayloadSize {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 write!(f, "{}", format_bytes(self.clone() as usize))
56 }
57}
58
59impl PayloadSize {
60 pub fn from(payload_string: String) -> Result<Self, String> {
61 match payload_string.to_lowercase().as_str() {
62 "100_000" | "100000" | "100k" | "100kb" => Ok(Self::K100),
63 "1_000_000" | "1000000" | "1m" | "1mb" => Ok(Self::M1),
64 "10_000_000" | "10000000" | "10m" | "10mb" => Ok(Self::M10),
65 "25_000_000" | "25000000" | "25m" | "25mb" => Ok(Self::M25),
66 "100_000_000" | "100000000" | "100m" | "100mb" => Ok(Self::M100),
67 _ => Err("Value needs to be one of 100k, 1m, 10m, 25m or 100m".to_string()),
68 }
69 }
70
71 pub fn sizes_from_max(max_payload_size: PayloadSize) -> Vec<usize> {
72 log::debug!("getting payload iterations for max_payload_size {max_payload_size:?}");
73 let payload_bytes: Vec<usize> =
74 vec![100_000, 1_000_000, 10_000_000, 25_000_000, 100_000_000];
75 match max_payload_size {
76 PayloadSize::K100 => payload_bytes[0..1].to_vec(),
77 PayloadSize::M1 => payload_bytes[0..2].to_vec(),
78 PayloadSize::M10 => payload_bytes[0..3].to_vec(),
79 PayloadSize::M25 => payload_bytes[0..4].to_vec(),
80 PayloadSize::M100 => payload_bytes[0..5].to_vec(),
81 }
82 }
83}
84
85#[derive(Clone, Debug, Serialize)]
86pub struct Metadata {
87 pub country: String,
88 pub ip: String,
89 pub colo: String,
90}
91
92impl Display for Metadata {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 write!(
95 f,
96 "Country: {}\nIp: {}\nColo: {}",
97 self.country, self.ip, self.colo
98 )
99 }
100}
101
102pub fn speed_test(client: Client, options: SpeedTestCLIOptions) -> Vec<Measurement> {
103 let metadata = match fetch_metadata(&client) {
104 Ok(metadata) => metadata,
105 Err(e) => {
106 eprintln!("Error fetching metadata: {e}");
107 std::process::exit(1);
108 }
109 };
110 if options.output_format == OutputFormat::StdOut {
111 println!("{metadata}");
112 }
113 let (latency_measurements, avg_latency) =
114 run_latency_test(&client, options.nr_latency_tests, options.output_format);
115 let latency_measurement = if !latency_measurements.is_empty() {
116 Some(LatencyMeasurement {
117 avg_latency_ms: avg_latency,
118 min_latency_ms: latency_measurements
119 .iter()
120 .copied()
121 .fold(f64::INFINITY, f64::min),
122 max_latency_ms: latency_measurements
123 .iter()
124 .copied()
125 .fold(f64::NEG_INFINITY, f64::max),
126 latency_measurements,
127 })
128 } else {
129 None
130 };
131
132 let payload_sizes = PayloadSize::sizes_from_max(options.max_payload_size.clone());
133 let mut measurements = Vec::new();
134 let mut payload_attempt_stats = Vec::new();
135
136 if options.should_download() {
137 let (download_measurements, download_attempt_stats) = run_tests_with_retries(
138 &client,
139 TestType::Download,
140 payload_sizes.clone(),
141 options.nr_tests,
142 options.output_format,
143 options.disable_dynamic_max_payload_size,
144 );
145 measurements.extend(download_measurements);
146 payload_attempt_stats.extend(download_attempt_stats);
147 }
148
149 if options.should_upload() {
150 let (upload_measurements, upload_attempt_stats) = run_tests_with_retries(
151 &client,
152 TestType::Upload,
153 payload_sizes.clone(),
154 options.nr_tests,
155 options.output_format,
156 options.disable_dynamic_max_payload_size,
157 );
158 measurements.extend(upload_measurements);
159 payload_attempt_stats.extend(upload_attempt_stats);
160 }
161
162 log_measurements(
163 &measurements,
164 &payload_attempt_stats,
165 latency_measurement.as_ref(),
166 payload_sizes,
167 options.verbose,
168 options.output_format,
169 Some(&metadata),
170 );
171 measurements
172}
173
174pub fn run_latency_test(
175 client: &Client,
176 nr_latency_tests: u32,
177 output_format: OutputFormat,
178) -> (Vec<f64>, f64) {
179 let mut measurements: Vec<f64> = Vec::new();
180 for i in 0..nr_latency_tests {
181 if output_format == OutputFormat::StdOut {
182 print_progress("latency test", i + 1, nr_latency_tests);
183 }
184 let latency = test_latency(client);
185 measurements.push(latency);
186 }
187 let avg_latency = measurements.iter().sum::<f64>() / measurements.len() as f64;
188
189 if output_format == OutputFormat::StdOut {
190 println!(
191 "\nAvg GET request latency {avg_latency:.2} ms (RTT excluding server processing time)\n"
192 );
193 }
194 (measurements, avg_latency)
195}
196
197pub fn test_latency(client: &Client) -> f64 {
198 let url = &format!("{}/{}{}", BASE_URL, DOWNLOAD_URL, 0);
199 let req_builder = client.get(url);
200
201 let start = Instant::now();
202 let mut response = req_builder.send().expect("failed to get response");
203 let _status_code = response.status();
204 let _ = std::io::copy(&mut response, &mut std::io::sink());
206 let total_ms = start.elapsed().as_secs_f64() * 1_000.0;
207
208 let re = Regex::new(r"cfRequestDuration;dur=([\d.]+)").unwrap();
209 let server_timing = response
210 .headers()
211 .get("Server-Timing")
212 .expect("No Server-Timing in response header")
213 .to_str()
214 .unwrap();
215 let cf_req_duration: f64 = re
216 .captures(server_timing)
217 .unwrap()
218 .get(1)
219 .unwrap()
220 .as_str()
221 .parse()
222 .unwrap();
223 let mut req_latency = total_ms - cf_req_duration;
224 log::debug!(
225 "latency debug: total_ms={total_ms:.3} cf_req_duration_ms={cf_req_duration:.3} req_latency_total={req_latency:.3} server_timing={server_timing}"
226 );
227 if req_latency < 0.0 {
228 if !WARNED_NEGATIVE_LATENCY.swap(true, Ordering::Relaxed) {
229 log::warn!(
230 "negative latency after server timing subtraction; clamping to 0.0 (total_ms={total_ms:.3} cf_req_duration_ms={cf_req_duration:.3})"
231 );
232 }
233 req_latency = 0.0
234 }
235 req_latency
236}
237
238#[derive(Debug)]
239enum SampleOutcome {
240 Success {
241 mbits: f64,
242 duration: Duration,
243 status_code: StatusCode,
244 },
245 RetryableFailure {
246 duration: Duration,
247 status_code: Option<StatusCode>,
248 retry_after: Option<Duration>,
249 reason: String,
250 },
251 Failed {
252 duration: Duration,
253 status_code: Option<StatusCode>,
254 reason: String,
255 },
256}
257
258pub fn run_tests(
259 client: &Client,
260 test_fn: fn(&Client, usize, OutputFormat) -> f64,
261 test_type: TestType,
262 payload_sizes: Vec<usize>,
263 nr_tests: u32,
264 output_format: OutputFormat,
265 disable_dynamic_max_payload_size: bool,
266) -> Vec<Measurement> {
267 let mut measurements: Vec<Measurement> = Vec::new();
268 for payload_size in payload_sizes {
269 log::debug!("running compatibility test loop for payload_size {payload_size}");
270 let start = Instant::now();
271 for i in 0..nr_tests {
272 if output_format == OutputFormat::StdOut {
273 print_progress(
274 &format!("{:?} {:<5}", test_type, format_bytes(payload_size)),
275 i,
276 nr_tests,
277 );
278 }
279 let mbit = test_fn(client, payload_size, output_format);
280 if mbit.is_finite() {
281 measurements.push(Measurement {
282 test_type,
283 payload_size,
284 mbit,
285 });
286 }
287 }
288 if output_format == OutputFormat::StdOut {
289 print_progress(
290 &format!("{:?} {:<5}", test_type, format_bytes(payload_size)),
291 nr_tests,
292 nr_tests,
293 );
294 println!();
295 }
296 if !disable_dynamic_max_payload_size && start.elapsed() > TIME_THRESHOLD {
297 log::info!("Exceeded threshold");
298 break;
299 }
300 }
301 measurements
302}
303
304pub fn run_tests_with_retries(
305 client: &Client,
306 test_type: TestType,
307 payload_sizes: Vec<usize>,
308 nr_tests: u32,
309 output_format: OutputFormat,
310 disable_dynamic_max_payload_size: bool,
311) -> (Vec<Measurement>, Vec<PayloadAttemptStats>) {
312 let options = RetryRunOptions {
313 nr_tests,
314 output_format,
315 disable_dynamic_max_payload_size,
316 };
317 run_tests_with_sleep(
318 client,
319 test_type,
320 payload_sizes,
321 options,
322 BASE_URL,
323 thread::sleep,
324 )
325}
326
327fn run_tests_with_sleep<S>(
328 client: &Client,
329 test_type: TestType,
330 payload_sizes: Vec<usize>,
331 options: RetryRunOptions,
332 base_url: &str,
333 sleep_fn: S,
334) -> (Vec<Measurement>, Vec<PayloadAttemptStats>)
335where
336 S: Fn(Duration),
337{
338 let mut measurements: Vec<Measurement> = Vec::new();
339 let mut payload_attempt_stats = Vec::new();
340
341 for payload_size in payload_sizes {
342 let label = format!("{:?} {:<5}", test_type, format_bytes(payload_size));
343 log::debug!("running tests for payload_size {payload_size}");
344 let start = Instant::now();
345
346 let mut attempts = 0;
347 let mut successes = 0;
348 let mut skipped = 0;
349 let mut retry_streak = 0;
350 let max_attempts = options
351 .nr_tests
352 .saturating_mul(MAX_ATTEMPT_FACTOR)
353 .max(options.nr_tests);
354
355 while successes < options.nr_tests && attempts < max_attempts {
356 if options.output_format == OutputFormat::StdOut {
357 print_progress(&label, successes, options.nr_tests);
358 }
359
360 attempts += 1;
361 let sample_outcome = match test_type {
362 TestType::Download => test_download_with_base_url(
363 client,
364 payload_size,
365 options.output_format,
366 base_url,
367 ),
368 TestType::Upload => {
369 test_upload_with_base_url(client, payload_size, options.output_format, base_url)
370 }
371 };
372
373 match sample_outcome {
374 SampleOutcome::Success {
375 mbits,
376 duration,
377 status_code,
378 } => {
379 log::debug!(
380 "{test_type:?} {} success: status={} duration={}ms throughput={mbits:.2} mbit/s",
381 format_bytes(payload_size),
382 status_code,
383 duration.as_millis(),
384 );
385 successes += 1;
386 retry_streak = 0;
387 measurements.push(Measurement {
388 test_type,
389 payload_size,
390 mbit: mbits,
391 });
392 }
393 SampleOutcome::RetryableFailure {
394 duration,
395 status_code,
396 retry_after,
397 reason,
398 } => {
399 skipped += 1;
400 retry_streak += 1;
401 if attempts < max_attempts {
402 let delay = compute_retry_delay(retry_streak, retry_after);
403 let status = status_code
404 .map(|code| code.to_string())
405 .unwrap_or_else(|| "transport error".to_string());
406 log::warn!(
407 "{test_type:?} {} failed ({status}) after {}ms: {reason}. retrying in {}ms ({attempts}/{max_attempts})",
408 format_bytes(payload_size),
409 duration.as_millis(),
410 delay.as_millis(),
411 );
412 if options.output_format == OutputFormat::StdOut {
413 print_retry_notice(delay, attempts, max_attempts);
414 }
415 sleep_fn(delay);
416 }
417 }
418 SampleOutcome::Failed {
419 duration,
420 status_code,
421 reason,
422 } => {
423 skipped += 1;
424 let status = status_code
425 .map(|code| code.to_string())
426 .unwrap_or_else(|| "transport error".to_string());
427 log::warn!(
428 "{test_type:?} {} failed ({status}) after {}ms: {reason}. aborting this payload",
429 format_bytes(payload_size),
430 duration.as_millis(),
431 );
432 break;
433 }
434 }
435 }
436
437 if options.output_format == OutputFormat::StdOut {
438 print_progress(&label, successes, options.nr_tests);
439 println!();
440 }
441
442 payload_attempt_stats.push(PayloadAttemptStats {
443 test_type,
444 payload_size,
445 attempts,
446 successes,
447 skipped,
448 target_successes: options.nr_tests,
449 });
450
451 if successes < options.nr_tests {
452 log::warn!(
453 "{test_type:?} {} collected {successes}/{} successful samples after {attempts} attempts",
454 format_bytes(payload_size),
455 options.nr_tests,
456 );
457 }
458
459 let duration = start.elapsed();
460 if !options.disable_dynamic_max_payload_size && duration > TIME_THRESHOLD {
461 log::info!("Exceeded threshold");
462 break;
463 }
464 }
465
466 (measurements, payload_attempt_stats)
467}
468
469pub fn test_upload(client: &Client, payload_size_bytes: usize, output_format: OutputFormat) -> f64 {
470 match test_upload_with_base_url(client, payload_size_bytes, output_format, BASE_URL) {
471 SampleOutcome::Success { mbits, .. } => mbits,
472 SampleOutcome::RetryableFailure { .. } | SampleOutcome::Failed { .. } => f64::NAN,
473 }
474}
475
476pub fn test_download(
477 client: &Client,
478 payload_size_bytes: usize,
479 output_format: OutputFormat,
480) -> f64 {
481 match test_download_with_base_url(client, payload_size_bytes, output_format, BASE_URL) {
482 SampleOutcome::Success { mbits, .. } => mbits,
483 SampleOutcome::RetryableFailure { .. } | SampleOutcome::Failed { .. } => f64::NAN,
484 }
485}
486
487fn test_upload_with_base_url(
488 client: &Client,
489 payload_size_bytes: usize,
490 output_format: OutputFormat,
491 base_url: &str,
492) -> SampleOutcome {
493 let url = format!("{base_url}/{UPLOAD_URL}");
494 let payload: Vec<u8> = vec![1; payload_size_bytes];
495 let req_builder = client.post(&url).body(payload);
496
497 let start = Instant::now();
498 let mut response = match req_builder.send() {
499 Ok(response) => response,
500 Err(error) => {
501 let duration = start.elapsed();
502 if output_format == OutputFormat::StdOut {
503 print_transport_failure(duration, payload_size_bytes, &error);
504 }
505 if error.is_timeout() {
506 return SampleOutcome::RetryableFailure {
507 duration,
508 status_code: None,
509 retry_after: None,
510 reason: error.to_string(),
511 };
512 }
513 return SampleOutcome::Failed {
514 duration,
515 status_code: None,
516 reason: error.to_string(),
517 };
518 }
519 };
520
521 let status_code = response.status();
522 let retry_after = parse_retry_after(response.headers().get(RETRY_AFTER));
523 let duration = start.elapsed();
525 let _ = std::io::copy(&mut response, &mut std::io::sink());
527 if !status_code.is_success() {
528 if output_format == OutputFormat::StdOut {
529 print_skipped_sample(duration, status_code, payload_size_bytes);
530 }
531 return if is_retryable_status(status_code) {
532 SampleOutcome::RetryableFailure {
533 duration,
534 status_code: Some(status_code),
535 retry_after,
536 reason: "retryable HTTP status".to_string(),
537 }
538 } else {
539 SampleOutcome::Failed {
540 duration,
541 status_code: Some(status_code),
542 reason: "non-retryable HTTP status".to_string(),
543 }
544 };
545 }
546
547 let mbits = (payload_size_bytes as f64 * 8.0 / 1_000_000.0) / duration.as_secs_f64();
548 if output_format == OutputFormat::StdOut {
549 print_current_speed(mbits, duration, payload_size_bytes);
550 }
551 SampleOutcome::Success {
552 mbits,
553 duration,
554 status_code,
555 }
556}
557
558fn test_download_with_base_url(
559 client: &Client,
560 payload_size_bytes: usize,
561 output_format: OutputFormat,
562 base_url: &str,
563) -> SampleOutcome {
564 let url = format!("{base_url}/{DOWNLOAD_URL}{payload_size_bytes}");
565 let req_builder = client.get(&url);
566
567 let start = Instant::now();
568 let mut response = match req_builder.send() {
569 Ok(response) => response,
570 Err(error) => {
571 let duration = start.elapsed();
572 if output_format == OutputFormat::StdOut {
573 print_transport_failure(duration, payload_size_bytes, &error);
574 }
575 if error.is_timeout() {
576 return SampleOutcome::RetryableFailure {
577 duration,
578 status_code: None,
579 retry_after: None,
580 reason: error.to_string(),
581 };
582 }
583 return SampleOutcome::Failed {
584 duration,
585 status_code: None,
586 reason: error.to_string(),
587 };
588 }
589 };
590
591 let status_code = response.status();
592 let _ = std::io::copy(&mut response, &mut std::io::sink());
594 let duration = start.elapsed();
595 if !status_code.is_success() {
596 if output_format == OutputFormat::StdOut {
597 print_skipped_sample(duration, status_code, payload_size_bytes);
598 }
599 let retry_after = parse_retry_after(response.headers().get(RETRY_AFTER));
600 return if is_retryable_status(status_code) {
601 SampleOutcome::RetryableFailure {
602 duration,
603 status_code: Some(status_code),
604 retry_after,
605 reason: "retryable HTTP status".to_string(),
606 }
607 } else {
608 SampleOutcome::Failed {
609 duration,
610 status_code: Some(status_code),
611 reason: "non-retryable HTTP status".to_string(),
612 }
613 };
614 }
615
616 let mbits = (payload_size_bytes as f64 * 8.0 / 1_000_000.0) / duration.as_secs_f64();
617 if output_format == OutputFormat::StdOut {
618 print_current_speed(mbits, duration, payload_size_bytes);
619 }
620 SampleOutcome::Success {
621 mbits,
622 duration,
623 status_code,
624 }
625}
626
627fn is_retryable_status(status_code: StatusCode) -> bool {
628 matches!(
629 status_code.as_u16(),
630 408 | 425 | 429 | 500 | 502 | 503 | 504
631 )
632}
633
634fn parse_retry_after(retry_after: Option<&reqwest::header::HeaderValue>) -> Option<Duration> {
635 retry_after
636 .and_then(|header| header.to_str().ok())
637 .and_then(|value| value.trim().parse::<u64>().ok())
638 .map(Duration::from_secs)
639}
640
641fn compute_retry_delay(retry_count: u32, retry_after: Option<Duration>) -> Duration {
642 if let Some(delay) = retry_after {
643 return delay;
644 }
645
646 let exponent = retry_count.saturating_sub(1).min(4);
647 let base_delay_ms = RETRY_BASE_BACKOFF.as_millis() as u64;
648 let capped_delay_ms = RETRY_MAX_BACKOFF.as_millis() as u64;
649 let delay_ms = base_delay_ms
650 .saturating_mul(1_u64 << exponent)
651 .min(capped_delay_ms);
652
653 let jitter = delay_ms / 5;
654 let jittered_delay = if retry_count.is_multiple_of(2) {
655 delay_ms.saturating_add(jitter).min(capped_delay_ms)
656 } else {
657 delay_ms.saturating_sub(jitter)
658 };
659
660 Duration::from_millis(jittered_delay)
661}
662
663fn print_current_speed(mbits: f64, duration: Duration, payload_size_bytes: usize) {
664 print!(
665 " {:>6.2} mbit/s | {:>5} in {:>4}ms ",
666 mbits,
667 format_bytes(payload_size_bytes),
668 duration.as_millis(),
669 );
670 flush_stdout();
671}
672
673fn print_skipped_sample(duration: Duration, status_code: StatusCode, payload_size_bytes: usize) {
674 print!(
675 " {:>6} mbit/s | {:>5} in {:>4}ms -> status: {} ",
676 "N/A",
677 format_bytes(payload_size_bytes),
678 duration.as_millis(),
679 status_code
680 );
681 flush_stdout();
682}
683
684fn print_retry_notice(delay: Duration, attempt: u32, max_attempts: u32) {
685 let delay_display = format_retry_delay(delay);
686 let eta_display = format_retry_eta(delay);
687 print!(
688 " retrying in {}{} ({}/{}) ",
689 delay_display, eta_display, attempt, max_attempts
690 );
691 flush_stdout();
692}
693
694fn print_transport_failure(duration: Duration, payload_size_bytes: usize, error: &reqwest::Error) {
695 print!(
696 " {:>6} mbit/s | {:>5} in {:>4}ms -> error: {} ",
697 "N/A",
698 format_bytes(payload_size_bytes),
699 duration.as_millis(),
700 error
701 );
702 flush_stdout();
703}
704
705fn format_retry_delay(delay: Duration) -> String {
706 let total_seconds = delay.as_secs();
707 if total_seconds == 0 {
708 return format!("{}ms", delay.as_millis());
709 }
710 if total_seconds < 60 {
711 return format!("{total_seconds}s");
712 }
713 if total_seconds < 3600 {
714 return format!("{}m {:02}s", total_seconds / 60, total_seconds % 60);
715 }
716 let hours = total_seconds / 3600;
717 let minutes = (total_seconds % 3600) / 60;
718 format!("{hours}h {minutes:02}m")
719}
720
721fn format_retry_eta(delay: Duration) -> String {
722 if delay.as_secs() < 60 {
723 return String::new();
724 }
725 let eta = Zoned::now().saturating_add(delay);
726 format!(" (until {})", eta.strftime("%H:%M:%S %Z"))
727}
728
729fn flush_stdout() {
730 let _ = std::io::stdout().flush();
731}
732
733pub fn fetch_metadata(client: &Client) -> Result<Metadata, reqwest::Error> {
734 const TRACE_URL: &str = "https://speed.cloudflare.com/cdn-cgi/trace";
735
736 let response = client.get(TRACE_URL).send()?;
737 let body = response.text()?;
738
739 let trace_data = parse_trace_response(&body);
741
742 Ok(Metadata {
743 country: trace_data
744 .get("loc")
745 .unwrap_or(&"N/A".to_string())
746 .to_owned(),
747 ip: trace_data
748 .get("ip")
749 .unwrap_or(&"N/A".to_string())
750 .to_owned(),
751 colo: trace_data
752 .get("colo")
753 .unwrap_or(&"N/A".to_string())
754 .to_owned(),
755 })
756}
757
758fn parse_trace_response(body: &str) -> std::collections::HashMap<String, String> {
766 body.lines()
767 .filter_map(|line| {
768 let parts: Vec<&str> = line.splitn(2, '=').collect();
769 if parts.len() == 2 {
770 Some((parts[0].trim().to_string(), parts[1].trim().to_string()))
771 } else {
772 log::debug!("Skipping malformed trace line: {}", line);
773 None
774 }
775 })
776 .collect()
777}
778
779#[cfg(test)]
780mod tests {
781 use super::*;
782 use std::io::{Read, Write};
783 use std::net::TcpListener;
784 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
785 use std::sync::{Arc, Mutex};
786 use std::thread;
787 use std::time::Duration;
788
789 #[derive(Clone)]
790 struct MockHttpResponse {
791 status_code: u16,
792 reason: &'static str,
793 headers: Vec<(&'static str, &'static str)>,
794 body: &'static str,
795 }
796
797 fn spawn_mock_http_server(
798 responses: Vec<MockHttpResponse>,
799 ) -> (String, Arc<AtomicUsize>, thread::JoinHandle<()>) {
800 let listener = TcpListener::bind("127.0.0.1:0").expect("failed to bind mock HTTP server");
801 let addr = listener
802 .local_addr()
803 .expect("failed to read mock HTTP server addr");
804 listener
805 .set_nonblocking(true)
806 .expect("failed to set nonblocking mode");
807 let served = Arc::new(AtomicUsize::new(0));
808 let served_counter = Arc::clone(&served);
809 let handle = thread::spawn(move || {
810 let mut idx = 0usize;
811 let mut idle_since = Instant::now();
812 while idx < responses.len() {
813 match listener.accept() {
814 Ok((mut stream, _)) => {
815 let mut buf = [0_u8; 1024];
816 let _ = stream.read(&mut buf);
817
818 let response = &responses[idx];
819 let mut response_head = format!(
820 "HTTP/1.1 {} {}\r\nContent-Length: {}\r\nConnection: close\r\n",
821 response.status_code,
822 response.reason,
823 response.body.len(),
824 );
825 let mut delay_before_body = Duration::ZERO;
826 for (header, value) in &response.headers {
827 if header.eq_ignore_ascii_case("X-Test-Delay-Ms") {
828 if let Ok(ms) = value.parse::<u64>() {
829 delay_before_body = Duration::from_millis(ms);
830 }
831 continue;
832 }
833 response_head.push_str(&format!("{header}: {value}\r\n"));
834 }
835 response_head.push_str("\r\n");
836
837 stream
838 .write_all(response_head.as_bytes())
839 .expect("failed to write mock response head");
840 if !delay_before_body.is_zero() {
841 thread::sleep(delay_before_body);
842 }
843 if !response.body.is_empty() {
844 stream
845 .write_all(response.body.as_bytes())
846 .expect("failed to write mock response body");
847 }
848 idx += 1;
849 served_counter.store(idx, AtomicOrdering::SeqCst);
850 idle_since = Instant::now();
851 }
852 Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
853 if idle_since.elapsed() > Duration::from_secs(2) {
854 break;
855 }
856 thread::sleep(Duration::from_millis(10));
857 }
858 Err(_) => break,
859 }
860 }
861 });
862
863 (format!("http://{}", addr), served, handle)
864 }
865
866 #[test]
867 fn test_payload_size_from_valid_inputs() {
868 assert_eq!(PayloadSize::from("100k".to_string()), Ok(PayloadSize::K100));
870 assert_eq!(PayloadSize::from("100K".to_string()), Ok(PayloadSize::K100));
871 assert_eq!(
872 PayloadSize::from("100kb".to_string()),
873 Ok(PayloadSize::K100)
874 );
875 assert_eq!(
876 PayloadSize::from("100KB".to_string()),
877 Ok(PayloadSize::K100)
878 );
879 assert_eq!(
880 PayloadSize::from("100000".to_string()),
881 Ok(PayloadSize::K100)
882 );
883 assert_eq!(
884 PayloadSize::from("100_000".to_string()),
885 Ok(PayloadSize::K100)
886 );
887
888 assert_eq!(PayloadSize::from("1m".to_string()), Ok(PayloadSize::M1));
890 assert_eq!(PayloadSize::from("1M".to_string()), Ok(PayloadSize::M1));
891 assert_eq!(PayloadSize::from("1mb".to_string()), Ok(PayloadSize::M1));
892 assert_eq!(PayloadSize::from("1MB".to_string()), Ok(PayloadSize::M1));
893 assert_eq!(
894 PayloadSize::from("1000000".to_string()),
895 Ok(PayloadSize::M1)
896 );
897 assert_eq!(
898 PayloadSize::from("1_000_000".to_string()),
899 Ok(PayloadSize::M1)
900 );
901
902 assert_eq!(PayloadSize::from("10m".to_string()), Ok(PayloadSize::M10));
904 assert_eq!(PayloadSize::from("10M".to_string()), Ok(PayloadSize::M10));
905 assert_eq!(PayloadSize::from("10mb".to_string()), Ok(PayloadSize::M10));
906 assert_eq!(PayloadSize::from("10MB".to_string()), Ok(PayloadSize::M10));
907 assert_eq!(
908 PayloadSize::from("10000000".to_string()),
909 Ok(PayloadSize::M10)
910 );
911 assert_eq!(
912 PayloadSize::from("10_000_000".to_string()),
913 Ok(PayloadSize::M10)
914 );
915
916 assert_eq!(PayloadSize::from("25m".to_string()), Ok(PayloadSize::M25));
918 assert_eq!(PayloadSize::from("25M".to_string()), Ok(PayloadSize::M25));
919 assert_eq!(PayloadSize::from("25mb".to_string()), Ok(PayloadSize::M25));
920 assert_eq!(PayloadSize::from("25MB".to_string()), Ok(PayloadSize::M25));
921 assert_eq!(
922 PayloadSize::from("25000000".to_string()),
923 Ok(PayloadSize::M25)
924 );
925 assert_eq!(
926 PayloadSize::from("25_000_000".to_string()),
927 Ok(PayloadSize::M25)
928 );
929
930 assert_eq!(PayloadSize::from("100m".to_string()), Ok(PayloadSize::M100));
932 assert_eq!(PayloadSize::from("100M".to_string()), Ok(PayloadSize::M100));
933 assert_eq!(
934 PayloadSize::from("100mb".to_string()),
935 Ok(PayloadSize::M100)
936 );
937 assert_eq!(
938 PayloadSize::from("100MB".to_string()),
939 Ok(PayloadSize::M100)
940 );
941 assert_eq!(
942 PayloadSize::from("100000000".to_string()),
943 Ok(PayloadSize::M100)
944 );
945 assert_eq!(
946 PayloadSize::from("100_000_000".to_string()),
947 Ok(PayloadSize::M100)
948 );
949 }
950
951 #[test]
952 fn test_payload_size_from_invalid_inputs() {
953 assert!(PayloadSize::from("invalid".to_string()).is_err());
954 assert!(PayloadSize::from("50m".to_string()).is_err());
955 assert!(PayloadSize::from("200k".to_string()).is_err());
956 assert!(PayloadSize::from("".to_string()).is_err());
957 assert!(PayloadSize::from("1g".to_string()).is_err());
958
959 let error_msg = PayloadSize::from("invalid".to_string()).unwrap_err();
960 assert_eq!(
961 error_msg,
962 "Value needs to be one of 100k, 1m, 10m, 25m or 100m"
963 );
964 }
965
966 #[test]
967 fn test_payload_size_values() {
968 assert_eq!(PayloadSize::K100 as usize, 100_000);
969 assert_eq!(PayloadSize::M1 as usize, 1_000_000);
970 assert_eq!(PayloadSize::M10 as usize, 10_000_000);
971 assert_eq!(PayloadSize::M25 as usize, 25_000_000);
972 assert_eq!(PayloadSize::M100 as usize, 100_000_000);
973 }
974
975 #[test]
976 fn test_payload_size_sizes_from_max() {
977 assert_eq!(
978 PayloadSize::sizes_from_max(PayloadSize::K100),
979 vec![100_000]
980 );
981 assert_eq!(
982 PayloadSize::sizes_from_max(PayloadSize::M1),
983 vec![100_000, 1_000_000]
984 );
985 assert_eq!(
986 PayloadSize::sizes_from_max(PayloadSize::M10),
987 vec![100_000, 1_000_000, 10_000_000]
988 );
989 assert_eq!(
990 PayloadSize::sizes_from_max(PayloadSize::M25),
991 vec![100_000, 1_000_000, 10_000_000, 25_000_000]
992 );
993 assert_eq!(
994 PayloadSize::sizes_from_max(PayloadSize::M100),
995 vec![100_000, 1_000_000, 10_000_000, 25_000_000, 100_000_000]
996 );
997 }
998
999 #[test]
1000 fn test_payload_size_display() {
1001 let size = PayloadSize::K100;
1002 let display_str = format!("{size}");
1003 assert!(!display_str.is_empty());
1004 }
1005
1006 #[test]
1007 fn test_fetch_metadata_ipv6_timeout_error() {
1008 use std::time::Duration;
1009
1010 let client = reqwest::blocking::Client::builder()
1011 .local_address("::".parse::<std::net::IpAddr>().unwrap())
1012 .timeout(Duration::from_millis(100))
1013 .build()
1014 .unwrap();
1015
1016 let result = fetch_metadata(&client);
1017 assert!(result.is_err());
1018 }
1019
1020 #[test]
1021 fn test_parse_trace_response_valid() {
1022 let body = "ip=178.197.211.5\ncolo=ZRH\nloc=CH\nts=1768250090.213\n";
1023 let parsed = parse_trace_response(body);
1024
1025 assert_eq!(parsed.get("ip"), Some(&"178.197.211.5".to_string()));
1026 assert_eq!(parsed.get("colo"), Some(&"ZRH".to_string()));
1027 assert_eq!(parsed.get("loc"), Some(&"CH".to_string()));
1028 assert_eq!(parsed.get("ts"), Some(&"1768250090.213".to_string()));
1029 }
1030
1031 #[test]
1032 fn test_parse_trace_response_empty() {
1033 let body = "";
1034 let parsed = parse_trace_response(body);
1035 assert!(parsed.is_empty());
1036 }
1037
1038 #[test]
1039 fn test_parse_trace_response_malformed_lines() {
1040 let body = "ip=178.197.211.5\nmalformed_line\ncolo=ZRH\n";
1041 let parsed = parse_trace_response(body);
1042
1043 assert_eq!(parsed.get("ip"), Some(&"178.197.211.5".to_string()));
1044 assert_eq!(parsed.get("colo"), Some(&"ZRH".to_string()));
1045 assert_eq!(parsed.len(), 2); }
1047
1048 #[test]
1049 fn test_parse_trace_response_with_equals_in_value() {
1050 let body = "key1=value1\nkey2=value=with=equals\n";
1051 let parsed = parse_trace_response(body);
1052
1053 assert_eq!(parsed.get("key1"), Some(&"value1".to_string()));
1054 assert_eq!(parsed.get("key2"), Some(&"value=with=equals".to_string()));
1055 }
1056
1057 #[test]
1058 fn test_run_tests_retries_429_and_records_success() {
1059 let responses = vec![
1060 MockHttpResponse {
1061 status_code: 429,
1062 reason: "Too Many Requests",
1063 headers: vec![("Retry-After", "0")],
1064 body: "",
1065 },
1066 MockHttpResponse {
1067 status_code: 200,
1068 reason: "OK",
1069 headers: vec![],
1070 body: "ok",
1071 },
1072 ];
1073 let (base_url, served_counter, handle) = spawn_mock_http_server(responses);
1074 let client = reqwest::blocking::Client::builder()
1075 .timeout(Duration::from_secs(2))
1076 .build()
1077 .expect("failed to build test client");
1078
1079 let (measurements, payload_stats) = run_tests_with_sleep(
1080 &client,
1081 TestType::Download,
1082 vec![100_000],
1083 RetryRunOptions {
1084 nr_tests: 1,
1085 output_format: OutputFormat::None,
1086 disable_dynamic_max_payload_size: true,
1087 },
1088 &base_url,
1089 |_| {},
1090 );
1091
1092 assert_eq!(measurements.len(), 1);
1093 assert_eq!(payload_stats.len(), 1);
1094 assert_eq!(payload_stats[0].attempts, 2);
1095 assert_eq!(payload_stats[0].successes, 1);
1096 assert_eq!(payload_stats[0].skipped, 1);
1097
1098 handle.join().expect("mock server thread panicked");
1099 assert_eq!(served_counter.load(AtomicOrdering::SeqCst), 2);
1100 }
1101
1102 #[test]
1103 fn test_run_tests_retry_delay_uses_retry_streak_not_total_attempts() {
1104 let mut responses = (0..5)
1105 .map(|_| MockHttpResponse {
1106 status_code: 200,
1107 reason: "OK",
1108 headers: vec![],
1109 body: "ok",
1110 })
1111 .collect::<Vec<_>>();
1112 responses.push(MockHttpResponse {
1113 status_code: 429,
1114 reason: "Too Many Requests",
1115 headers: vec![],
1116 body: "",
1117 });
1118 responses.push(MockHttpResponse {
1119 status_code: 200,
1120 reason: "OK",
1121 headers: vec![],
1122 body: "ok",
1123 });
1124
1125 let (base_url, served_counter, handle) = spawn_mock_http_server(responses);
1126 let client = reqwest::blocking::Client::builder()
1127 .timeout(Duration::from_secs(2))
1128 .build()
1129 .expect("failed to build test client");
1130 let observed_delays = Arc::new(Mutex::new(Vec::<Duration>::new()));
1131 let delay_sink = Arc::clone(&observed_delays);
1132
1133 let (measurements, payload_stats) = run_tests_with_sleep(
1134 &client,
1135 TestType::Download,
1136 vec![100_000],
1137 RetryRunOptions {
1138 nr_tests: 6,
1139 output_format: OutputFormat::None,
1140 disable_dynamic_max_payload_size: true,
1141 },
1142 &base_url,
1143 move |delay| {
1144 delay_sink
1145 .lock()
1146 .expect("failed to lock delay sink")
1147 .push(delay);
1148 },
1149 );
1150
1151 assert_eq!(measurements.len(), 6);
1152 assert_eq!(payload_stats.len(), 1);
1153 assert_eq!(payload_stats[0].attempts, 7);
1154 assert_eq!(payload_stats[0].successes, 6);
1155 assert_eq!(payload_stats[0].skipped, 1);
1156 assert_eq!(
1157 *observed_delays
1158 .lock()
1159 .expect("failed to read observed delays"),
1160 vec![compute_retry_delay(1, None)]
1161 );
1162
1163 handle.join().expect("mock server thread panicked");
1164 assert_eq!(served_counter.load(AtomicOrdering::SeqCst), 7);
1165 }
1166
1167 #[test]
1168 fn test_run_tests_retry_delay_resets_after_success() {
1169 let responses = vec![
1170 MockHttpResponse {
1171 status_code: 429,
1172 reason: "Too Many Requests",
1173 headers: vec![],
1174 body: "",
1175 },
1176 MockHttpResponse {
1177 status_code: 200,
1178 reason: "OK",
1179 headers: vec![],
1180 body: "ok",
1181 },
1182 MockHttpResponse {
1183 status_code: 429,
1184 reason: "Too Many Requests",
1185 headers: vec![],
1186 body: "",
1187 },
1188 MockHttpResponse {
1189 status_code: 200,
1190 reason: "OK",
1191 headers: vec![],
1192 body: "ok",
1193 },
1194 ];
1195
1196 let (base_url, served_counter, handle) = spawn_mock_http_server(responses);
1197 let client = reqwest::blocking::Client::builder()
1198 .timeout(Duration::from_secs(2))
1199 .build()
1200 .expect("failed to build test client");
1201 let observed_delays = Arc::new(Mutex::new(Vec::<Duration>::new()));
1202 let delay_sink = Arc::clone(&observed_delays);
1203
1204 let (measurements, payload_stats) = run_tests_with_sleep(
1205 &client,
1206 TestType::Download,
1207 vec![100_000],
1208 RetryRunOptions {
1209 nr_tests: 2,
1210 output_format: OutputFormat::None,
1211 disable_dynamic_max_payload_size: true,
1212 },
1213 &base_url,
1214 move |delay| {
1215 delay_sink
1216 .lock()
1217 .expect("failed to lock delay sink")
1218 .push(delay);
1219 },
1220 );
1221
1222 assert_eq!(measurements.len(), 2);
1223 assert_eq!(payload_stats.len(), 1);
1224 assert_eq!(payload_stats[0].attempts, 4);
1225 assert_eq!(payload_stats[0].successes, 2);
1226 assert_eq!(payload_stats[0].skipped, 2);
1227 assert_eq!(
1228 *observed_delays
1229 .lock()
1230 .expect("failed to read observed delays"),
1231 vec![compute_retry_delay(1, None), compute_retry_delay(1, None)]
1232 );
1233
1234 handle.join().expect("mock server thread panicked");
1235 assert_eq!(served_counter.load(AtomicOrdering::SeqCst), 4);
1236 }
1237
1238 #[test]
1239 fn test_run_tests_stops_after_max_attempts_on_retryable_failures() {
1240 let responses = (0..8)
1241 .map(|_| MockHttpResponse {
1242 status_code: 429,
1243 reason: "Too Many Requests",
1244 headers: vec![("Retry-After", "0")],
1245 body: "",
1246 })
1247 .collect::<Vec<_>>();
1248 let (base_url, served_counter, handle) = spawn_mock_http_server(responses);
1249 let client = reqwest::blocking::Client::builder()
1250 .timeout(Duration::from_secs(2))
1251 .build()
1252 .expect("failed to build test client");
1253
1254 let (measurements, payload_stats) = run_tests_with_sleep(
1255 &client,
1256 TestType::Download,
1257 vec![100_000],
1258 RetryRunOptions {
1259 nr_tests: 2,
1260 output_format: OutputFormat::None,
1261 disable_dynamic_max_payload_size: true,
1262 },
1263 &base_url,
1264 |_| {},
1265 );
1266
1267 assert!(measurements.is_empty());
1268 assert_eq!(payload_stats.len(), 1);
1269 assert_eq!(payload_stats[0].attempts, 8);
1270 assert_eq!(payload_stats[0].successes, 0);
1271 assert_eq!(payload_stats[0].skipped, 8);
1272
1273 handle.join().expect("mock server thread panicked");
1274 assert_eq!(served_counter.load(AtomicOrdering::SeqCst), 8);
1275 }
1276
1277 #[test]
1278 fn test_run_tests_does_not_retry_non_retryable_4xx() {
1279 let responses = vec![MockHttpResponse {
1280 status_code: 404,
1281 reason: "Not Found",
1282 headers: vec![],
1283 body: "",
1284 }];
1285 let (base_url, served_counter, handle) = spawn_mock_http_server(responses);
1286 let client = reqwest::blocking::Client::builder()
1287 .timeout(Duration::from_secs(2))
1288 .build()
1289 .expect("failed to build test client");
1290
1291 let (measurements, payload_stats) = run_tests_with_sleep(
1292 &client,
1293 TestType::Download,
1294 vec![100_000],
1295 RetryRunOptions {
1296 nr_tests: 2,
1297 output_format: OutputFormat::None,
1298 disable_dynamic_max_payload_size: true,
1299 },
1300 &base_url,
1301 |_| {},
1302 );
1303
1304 assert!(measurements.is_empty());
1305 assert_eq!(payload_stats.len(), 1);
1306 assert_eq!(payload_stats[0].attempts, 1);
1307 assert_eq!(payload_stats[0].successes, 0);
1308 assert_eq!(payload_stats[0].skipped, 1);
1309
1310 handle.join().expect("mock server thread panicked");
1311 assert_eq!(served_counter.load(AtomicOrdering::SeqCst), 1);
1312 }
1313
1314 #[test]
1315 fn test_upload_duration_excludes_delayed_response_body() {
1316 let responses = vec![MockHttpResponse {
1317 status_code: 200,
1318 reason: "OK",
1319 headers: vec![("X-Test-Delay-Ms", "300")],
1320 body: "ok",
1321 }];
1322 let (base_url, served_counter, handle) = spawn_mock_http_server(responses);
1323 let client = reqwest::blocking::Client::builder()
1324 .timeout(Duration::from_secs(2))
1325 .build()
1326 .expect("failed to build test client");
1327
1328 let wall_start = Instant::now();
1329 let outcome = test_upload_with_base_url(&client, 100_000, OutputFormat::None, &base_url);
1330 let wall_elapsed = wall_start.elapsed();
1331
1332 handle.join().expect("mock server thread panicked");
1333 assert_eq!(served_counter.load(AtomicOrdering::SeqCst), 1);
1334
1335 match outcome {
1336 SampleOutcome::Success { duration, .. } => {
1337 assert!(
1338 duration < Duration::from_millis(200),
1339 "upload duration should stop before delayed response drain: {duration:?}"
1340 );
1341 assert!(
1342 wall_elapsed >= Duration::from_millis(250),
1343 "overall call should include delayed body drain: {wall_elapsed:?}"
1344 );
1345 }
1346 other => panic!("expected upload success, got {other:?}"),
1347 }
1348 }
1349
1350 #[test]
1351 fn test_upload_retryable_failure_parses_retry_after_without_drain_skew() {
1352 let responses = vec![MockHttpResponse {
1353 status_code: 429,
1354 reason: "Too Many Requests",
1355 headers: vec![("Retry-After", "1"), ("X-Test-Delay-Ms", "300")],
1356 body: "retry later",
1357 }];
1358 let (base_url, served_counter, handle) = spawn_mock_http_server(responses);
1359 let client = reqwest::blocking::Client::builder()
1360 .timeout(Duration::from_secs(2))
1361 .build()
1362 .expect("failed to build test client");
1363
1364 let wall_start = Instant::now();
1365 let outcome = test_upload_with_base_url(&client, 100_000, OutputFormat::None, &base_url);
1366 let wall_elapsed = wall_start.elapsed();
1367
1368 handle.join().expect("mock server thread panicked");
1369 assert_eq!(served_counter.load(AtomicOrdering::SeqCst), 1);
1370
1371 match outcome {
1372 SampleOutcome::RetryableFailure {
1373 duration,
1374 status_code,
1375 retry_after,
1376 ..
1377 } => {
1378 assert!(
1379 duration < Duration::from_millis(200),
1380 "retryable failure duration should stop before delayed body drain: {duration:?}"
1381 );
1382 assert_eq!(status_code, Some(StatusCode::TOO_MANY_REQUESTS));
1383 assert_eq!(retry_after, Some(Duration::from_secs(1)));
1384 assert!(
1385 wall_elapsed >= Duration::from_millis(250),
1386 "overall call should include delayed body drain: {wall_elapsed:?}"
1387 );
1388 }
1389 other => panic!("expected retryable upload failure, got {other:?}"),
1390 }
1391 }
1392
1393 #[test]
1394 fn test_fetch_metadata_integration() {
1395 let client = reqwest::blocking::Client::builder()
1398 .timeout(std::time::Duration::from_secs(10))
1399 .build()
1400 .expect("Failed to create HTTP client");
1401
1402 let result = fetch_metadata(&client);
1403
1404 assert!(
1405 result.is_ok(),
1406 "Failed to fetch metadata: {:?}",
1407 result.err()
1408 );
1409 let metadata = result.unwrap();
1410
1411 assert_ne!(metadata.ip, "N/A", "IP field should be populated");
1413 assert_ne!(
1414 metadata.colo, "N/A",
1415 "Colo field should be populated (CRITICAL: Cloudflare API may have changed)"
1416 );
1417 assert_ne!(
1418 metadata.country, "N/A",
1419 "Country field should be populated (CRITICAL: Cloudflare API may have changed)"
1420 );
1421
1422 assert!(
1424 metadata.ip.contains('.') || metadata.ip.contains(':'),
1425 "IP should be in valid format (IPv4 or IPv6): {}",
1426 metadata.ip
1427 );
1428
1429 assert_eq!(
1431 metadata.colo.len(),
1432 3,
1433 "Colo should be 3-letter IATA code: {}",
1434 metadata.colo
1435 );
1436 assert!(
1437 metadata.colo.chars().all(|c| c.is_ascii_uppercase()),
1438 "Colo should be uppercase letters: {}",
1439 metadata.colo
1440 );
1441
1442 assert_eq!(
1444 metadata.country.len(),
1445 2,
1446 "Country should be 2-letter ISO code: {}",
1447 metadata.country
1448 );
1449 assert!(
1450 metadata.country.chars().all(|c| c.is_ascii_uppercase()),
1451 "Country should be uppercase letters: {}",
1452 metadata.country
1453 );
1454
1455 eprintln!(
1456 "✓ Metadata integration test passed: ip={}, colo={}, country={}",
1457 metadata.ip, metadata.colo, metadata.country
1458 );
1459 }
1460}