1use crate::error::{Result, SpeedtestError};
8use crate::http::HttpClient;
9use crate::models::*;
10use crate::utils::distance;
11use rayon::prelude::*;
12use std::collections::HashMap;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17pub struct Speedtest {
18 config: Option<Config>,
19 client: HttpClient,
20 servers: HashMap<u32, Vec<Server>>,
21 closest: Vec<Server>,
22 best: Option<Server>,
23 lat_lon: (f64, f64),
24 debug: bool,
25}
26
27impl Speedtest {
28 pub fn new(timeout: u64, secure: bool, source_address: Option<String>) -> Result<Self> {
29 let client = HttpClient::new(timeout, secure, source_address)?;
30
31 Ok(Self {
32 config: None,
33 client,
34 servers: HashMap::new(),
35 closest: Vec::new(),
36 best: None,
37 lat_lon: (0.0, 0.0),
38 debug: false,
39 })
40 }
41
42 pub fn set_debug(&mut self, debug: bool) {
43 self.debug = debug;
44 }
45
46 pub fn get_config(&mut self) -> Result<&Config> {
47 if self.config.is_some() {
48 return Ok(self.config.as_ref().unwrap());
49 }
50
51 let xml = self
52 .client
53 .get_text("://www.speedtest.net/speedtest-config.php")?;
54
55 use quick_xml::events::Event;
57 use quick_xml::Reader;
58 use std::collections::HashMap;
59
60 let mut reader = Reader::from_str(&xml);
61 reader.trim_text(true);
62
63 let mut client_attrs: HashMap<String, String> = HashMap::new();
64 let mut server_config_attrs: HashMap<String, String> = HashMap::new();
65 let mut download_attrs: HashMap<String, String> = HashMap::new();
66 let mut upload_attrs: HashMap<String, String> = HashMap::new();
67
68 let mut buf = Vec::new();
69 loop {
70 match reader.read_event_into(&mut buf) {
71 Ok(Event::Empty(e)) => {
72 let name = String::from_utf8_lossy(e.name().as_ref()).to_string();
73
74 let attrs: HashMap<String, String> = e
75 .attributes()
76 .filter_map(|a| a.ok())
77 .map(|a| {
78 (
79 String::from_utf8_lossy(a.key.as_ref()).to_string(),
80 String::from_utf8_lossy(&a.value).to_string(),
81 )
82 })
83 .collect();
84
85 match name.as_str() {
86 "client" => client_attrs = attrs,
87 "server-config" => server_config_attrs = attrs,
88 "download" => download_attrs = attrs,
89 "upload" => upload_attrs = attrs,
90 _ => {}
91 }
92 }
93 Ok(Event::Eof) => break,
94 Err(e) => {
95 return Err(SpeedtestError::ConfigRetrieval(format!(
96 "XML parse error at position {}: {:?}",
97 reader.buffer_position(),
98 e
99 )))
100 }
101 _ => {}
102 }
103 buf.clear();
104 }
105
106 let client = Client {
108 ip: client_attrs.get("ip").cloned().unwrap_or_default(),
109 lat: client_attrs.get("lat").cloned().unwrap_or_default(),
110 lon: client_attrs.get("lon").cloned().unwrap_or_default(),
111 isp: client_attrs.get("isp").cloned().unwrap_or_default(),
112 country: client_attrs.get("country").cloned().unwrap_or_default(),
113 isprating: client_attrs.get("isprating").cloned().unwrap_or_default(),
114 rating: client_attrs.get("rating").cloned().unwrap_or_default(),
115 ispdlavg: client_attrs.get("ispdlavg").cloned().unwrap_or_default(),
116 ispulavg: client_attrs.get("ispulavg").cloned().unwrap_or_default(),
117 loggedin: client_attrs.get("loggedin").cloned().unwrap_or_default(),
118 };
119
120 if client.ip.is_empty() {
122 return Err(SpeedtestError::ConfigRetrieval(
123 "Client IP address not provided by server".to_string()
124 ));
125 }
126
127 let ignore_servers: Vec<u32> = server_config_attrs
128 .get("ignoreids")
129 .unwrap_or(&String::new())
130 .split(',')
131 .filter_map(|s| s.trim().parse().ok())
132 .collect();
133
134 let ratio: usize = upload_attrs
135 .get("ratio")
136 .and_then(|s| s.parse().ok())
137 .unwrap_or(1);
138
139 let upload_max: usize = upload_attrs
140 .get("maxchunkcount")
141 .and_then(|s| s.parse().ok())
142 .unwrap_or(4);
143
144 let up_sizes = vec![32768, 65536, 131072, 262144, 524288, 1048576, 7340032];
145 let upload_sizes: Vec<usize> = if ratio > 0 && ratio <= up_sizes.len() {
146 up_sizes[(ratio - 1)..].to_vec()
147 } else {
148 up_sizes
149 };
150
151 let size_count = upload_sizes.len();
152 let upload_count = (upload_max as f64 / size_count as f64).ceil() as usize;
153
154 let lat: f64 = client.lat.parse()
155 .unwrap_or_else(|_| {
156 eprintln!("Warning: Could not parse latitude '{}', using default 0.0", client.lat);
157 0.0
158 });
159 let lon: f64 = client.lon.parse()
160 .unwrap_or_else(|_| {
161 eprintln!("Warning: Could not parse longitude '{}', using default 0.0", client.lon);
162 0.0
163 });
164
165 self.lat_lon = (lat, lon);
166
167 let config = Config {
168 client,
169 ignore_servers,
170 sizes: Sizes {
171 upload: upload_sizes,
172 download: vec![350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000],
173 },
174 counts: Counts {
175 upload: upload_count,
176 download: download_attrs
177 .get("threadsperurl")
178 .and_then(|s| s.parse().ok())
179 .unwrap_or(4),
180 },
181 threads: Threads {
182 upload: upload_attrs
183 .get("threads")
184 .and_then(|s| s.parse().ok())
185 .unwrap_or(8),
186 download: server_config_attrs
187 .get("threadcount")
188 .and_then(|s| s.parse::<usize>().ok())
189 .unwrap_or(4)
190 * 2,
191 },
192 length: Length {
193 upload: upload_attrs
194 .get("testlength")
195 .and_then(|s| s.parse().ok())
196 .unwrap_or(10),
197 download: download_attrs
198 .get("testlength")
199 .and_then(|s| s.parse().ok())
200 .unwrap_or(10),
201 },
202 upload_max: upload_count * size_count,
203 };
204
205 self.config = Some(config);
206 Ok(self.config.as_ref().unwrap())
207 }
208
209 pub fn get_servers(
210 &mut self,
211 server_ids: Option<&[u32]>,
212 exclude: Option<&[u32]>,
213 ) -> Result<&HashMap<u32, Vec<Server>>> {
214 self.servers.clear();
215
216 let urls = vec![
217 "://www.speedtest.net/speedtest-servers-static.php",
218 "http://c.speedtest.net/speedtest-servers-static.php",
219 "://www.speedtest.net/speedtest-servers.php",
220 "http://c.speedtest.net/speedtest-servers.php",
221 ];
222
223 for url in urls {
224 if self.debug {
225 eprintln!("Trying to fetch servers from: {}", url);
226 }
227 match self.fetch_servers(url, server_ids, exclude) {
228 Ok(_) if !self.servers.is_empty() => {
229 if self.debug {
230 eprintln!("Found {} unique servers", self.servers.len());
231 }
232 break;
233 }
234 Ok(_) => {
235 if self.debug {
236 eprintln!("No servers found from this URL");
237 }
238 }
239 Err(e) => {
240 if self.debug {
241 eprintln!("Failed to fetch: {}", e);
242 }
243 continue;
244 }
245 }
246 }
247
248 if (server_ids.is_some() || exclude.is_some()) && self.servers.is_empty() {
249 return Err(SpeedtestError::NoMatchedServers);
250 }
251
252 if self.debug {
253 eprintln!("Total servers available: {}", self.servers.len());
254 }
255 Ok(&self.servers)
256 }
257
258 fn fetch_servers(
259 &mut self,
260 url: &str,
261 server_ids: Option<&[u32]>,
262 exclude: Option<&[u32]>,
263 ) -> Result<()> {
264 let xml = self.client.get_text(url)?;
265
266 use quick_xml::events::Event;
268 use quick_xml::Reader;
269
270 let mut reader = Reader::from_str(&xml);
271 reader.trim_text(true);
272
273 let config = self.config.as_ref()
274 .ok_or_else(|| SpeedtestError::ConfigRetrieval("Config not loaded".to_string()))?;
275
276 let mut buf = Vec::new();
277 loop {
278 match reader.read_event_into(&mut buf) {
279 Ok(Event::Empty(e)) => {
280 let name = String::from_utf8_lossy(e.name().as_ref()).to_string();
281
282 if name == "server" {
283 let attrs: HashMap<String, String> = e
284 .attributes()
285 .filter_map(|a| a.ok())
286 .map(|a| {
287 (
288 String::from_utf8_lossy(a.key.as_ref()).to_string(),
289 String::from_utf8_lossy(&a.value).to_string(),
290 )
291 })
292 .collect();
293
294 let id: u32 = attrs.get("id")
295 .and_then(|s| s.parse().ok())
296 .unwrap_or(0);
297
298 if id == 0 {
299 continue;
300 }
301
302 if let Some(ids) = server_ids {
303 if !ids.contains(&id) {
304 continue;
305 }
306 }
307
308 if config.ignore_servers.contains(&id) {
309 continue;
310 }
311
312 if let Some(excl) = exclude {
313 if excl.contains(&id) {
314 continue;
315 }
316 }
317
318 let lat: f64 = attrs.get("lat")
319 .and_then(|s| s.parse().ok())
320 .unwrap_or(0.0);
321 let lon: f64 = attrs.get("lon")
322 .and_then(|s| s.parse().ok())
323 .unwrap_or(0.0);
324
325 let d = distance(self.lat_lon.0, self.lat_lon.1, lat, lon);
326
327 let server = Server {
328 id,
329 sponsor: attrs.get("sponsor").cloned().unwrap_or_default(),
330 name: attrs.get("name").cloned().unwrap_or_default(),
331 country: attrs.get("country").cloned().unwrap_or_default(),
332 lat,
333 lon,
334 url: attrs.get("url").cloned().unwrap_or_default(),
335 d,
336 latency: 0.0,
337 };
338
339 self.servers.entry(id).or_insert_with(Vec::new).push(server);
340 }
341 }
342 Ok(Event::Eof) => break,
343 Err(e) => {
344 return Err(SpeedtestError::ServersRetrieval(format!(
345 "XML parse error at position {}: {:?}",
346 reader.buffer_position(),
347 e
348 )))
349 }
350 _ => {}
351 }
352 buf.clear();
353 }
354
355 Ok(())
356 }
357
358 pub fn get_closest_servers(&mut self, limit: usize) -> Result<&[Server]> {
359 if self.servers.is_empty() {
360 self.get_servers(None, None)?;
361 }
362
363 let mut all_servers: Vec<Server> = self
364 .servers
365 .values()
366 .flatten()
367 .cloned()
368 .collect();
369
370 if self.debug {
371 eprintln!("Total servers before sorting: {}", all_servers.len());
372 }
373
374 all_servers.sort_by(|a, b| a.d.partial_cmp(&b.d).unwrap());
375
376 self.closest = all_servers.into_iter().take(limit).collect();
377
378 if self.debug {
379 eprintln!("Closest {} servers:", self.closest.len());
380 for (i, s) in self.closest.iter().enumerate() {
381 eprintln!(" {}. {} ({}) - {:.2} km", i+1, s.sponsor, s.name, s.d);
382 }
383 }
384
385 Ok(&self.closest)
386 }
387
388 pub fn get_best_server(&mut self, servers: Option<&[Server]>) -> Result<&Server> {
389 let servers_to_test = if let Some(s) = servers {
390 s.to_vec()
391 } else {
392 if self.closest.is_empty() {
393 self.get_closest_servers(5)?;
394 }
395 self.closest.clone()
396 };
397
398 let results: Vec<(f64, Server)> = servers_to_test
399 .par_iter()
400 .filter_map(|server| {
401 let latency = self.measure_latency(server).ok()?;
402 Some((latency, server.clone()))
403 })
404 .collect();
405
406 let best = results
407 .into_iter()
408 .min_by(|a, b| a.0.partial_cmp(&b.0).unwrap())
409 .ok_or_else(|| SpeedtestError::BestServerFailure(
410 "Unable to connect to servers to test latency".to_string()
411 ))?;
412
413 let mut best_server = best.1;
414 best_server.latency = best.0;
415 self.best = Some(best_server);
416
417 Ok(self.best.as_ref().unwrap())
418 }
419
420 fn measure_latency(&self, server: &Server) -> Result<f64> {
421 let url_parts: Vec<&str> = server.url.split('/').collect();
422 let base_url = url_parts[..url_parts.len() - 1].join("/");
423
424 if self.debug {
425 eprintln!("Testing latency for server: {} ({})", server.sponsor, server.name);
426 eprintln!(" Server URL: {}", server.url);
427 eprintln!(" Base URL: {}", base_url);
428 }
429
430 let mut latencies = Vec::new();
431
432 for i in 0..3 {
433 use std::time::{SystemTime, UNIX_EPOCH};
434 let timestamp = SystemTime::now()
435 .duration_since(UNIX_EPOCH)
436 .unwrap()
437 .as_millis();
438
439 let url = format!("{}/latency.txt?x={}.{}", base_url, timestamp, i);
440
441 if self.debug {
442 eprintln!(" Attempt {} - Testing URL: {}", i+1, url);
443 }
444
445 let start = Instant::now();
446 match self.client.get_text(&url) {
447 Ok(response) if response.trim() == "test=test" => {
448 let latency = start.elapsed().as_secs_f64() * 1000.0;
449 if self.debug {
450 eprintln!(" SUCCESS - Latency: {:.3} ms", latency);
451 }
452 latencies.push(latency);
453 }
454 Ok(response) => {
455 if self.debug {
456 eprintln!(" Unexpected response: '{}'", response.trim());
457 }
458 latencies.push(3600.0);
459 }
460 Err(e) => {
461 if self.debug {
462 eprintln!(" Error: {}", e);
463 }
464 latencies.push(3600.0);
465 }
466 }
467 }
468
469 if latencies.iter().all(|&l| l >= 3600.0) {
470 return Err(SpeedtestError::BestServerFailure(
471 format!("All latency tests failed for {}", server.sponsor)
472 ));
473 }
474
475 let avg = latencies.iter().sum::<f64>() / latencies.len() as f64;
476
477 if self.debug {
478 eprintln!(" Average latency: {:.3} ms", avg);
479 }
480
481 Ok((avg * 1000.0).round() / 1000.0)
482 }
483
484 pub fn download<F>(&self, _callback: F, threads: Option<usize>) -> Result<f64>
485 where
486 F: Fn(usize, usize, bool, bool) + Send + Sync,
487 {
488 let config = self.config.as_ref()
489 .ok_or_else(|| SpeedtestError::ConfigRetrieval("Config not loaded".to_string()))?;
490 let server = self.best.as_ref()
491 .ok_or(SpeedtestError::MissingBestServer)?;
492
493 let base_url = server.url.split('/').collect::<Vec<_>>();
494 let base_url = base_url[..base_url.len() - 1].join("/");
495
496 let mut urls = Vec::new();
497 for size in &config.sizes.download {
498 for _ in 0..config.counts.download {
499 urls.push(format!("{}/random{}x{}.jpg", base_url, size, size));
500 }
501 }
502
503 if self.debug {
504 eprintln!("Download test configuration:");
505 eprintln!(" Base URL: {}", base_url);
506 eprintln!(" Total URLs: {}", urls.len());
507 eprintln!(" Threads: {}", threads.unwrap_or(config.threads.download));
508 eprintln!(" Test duration: {} seconds", config.length.download);
509 }
510
511 let max_threads = threads.unwrap_or(config.threads.download);
512 let test_duration = Duration::from_secs(config.length.download);
513
514 let total_bytes = Arc::new(AtomicU64::new(0));
515 let start_time = Instant::now();
516 let stop_flag = Arc::new(AtomicBool::new(false));
517
518 let handles: Vec<_> = (0..max_threads)
520 .map(|_| {
521 let urls = urls.clone();
522 let total_bytes = Arc::clone(&total_bytes);
523 let stop_flag = Arc::clone(&stop_flag);
524 let client = HttpClient::new(10, false, None).unwrap();
525
526 std::thread::spawn(move || {
527 let mut url_index = 0;
528 while !stop_flag.load(Ordering::Relaxed) {
529 if url_index >= urls.len() {
531 url_index = 0;
532 }
533
534 if let Ok(data) = client.get_bytes(&urls[url_index]) {
535 total_bytes.fetch_add(data.len() as u64, Ordering::Relaxed);
536 }
537
538 url_index += 1;
539 }
540 })
541 })
542 .collect();
543
544 std::thread::sleep(test_duration);
546 stop_flag.store(true, Ordering::Relaxed);
547
548 for handle in handles {
549 let _ = handle.join();
550 }
551
552 let elapsed = start_time.elapsed().as_secs_f64();
553 let bytes = total_bytes.load(Ordering::Relaxed);
554 let speed = (bytes as f64 / elapsed) * 8.0;
555
556 if self.debug {
557 eprintln!("Download test results:");
558 eprintln!(" Bytes downloaded: {}", bytes);
559 eprintln!(" Time elapsed: {:.2} seconds", elapsed);
560 eprintln!(" Speed: {:.2} bits/s ({:.2} Mbit/s)", speed, speed / 1_000_000.0);
561 }
562
563 Ok(speed)
564 }
565
566 pub fn upload<F>(&self, _callback: F, threads: Option<usize>, _pre_allocate: bool) -> Result<f64>
567 where
568 F: Fn(usize, usize, bool, bool) + Send + Sync,
569 {
570 let config = self.config.as_ref()
571 .ok_or_else(|| SpeedtestError::ConfigRetrieval("Config not loaded".to_string()))?;
572 let server = self.best.as_ref()
573 .ok_or(SpeedtestError::MissingBestServer)?;
574
575 let mut sizes = Vec::new();
576 for size in &config.sizes.upload {
577 for _ in 0..config.counts.upload {
578 sizes.push(*size);
579 }
580 }
581
582 if self.debug {
583 eprintln!("Upload test configuration:");
584 eprintln!(" Server URL: {}", server.url);
585 eprintln!(" Total data chunks: {}", sizes.len());
586 eprintln!(" Threads: {}", threads.unwrap_or(config.threads.upload));
587 eprintln!(" Test duration: {} seconds", config.length.upload);
588 }
589
590 let max_threads = threads.unwrap_or(config.threads.upload);
591 let test_duration = Duration::from_secs(config.length.upload);
592
593 let total_bytes = Arc::new(AtomicU64::new(0));
594 let start_time = Instant::now();
595 let stop_flag = Arc::new(AtomicBool::new(false));
596
597 let chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
599 let upload_data: Vec<Vec<u8>> = sizes
600 .iter()
601 .map(|&size| {
602 let multiplier = (size as f64 / 36.0).round() as usize;
603 let content = chars.repeat(multiplier);
604 let data = format!("content1={}", &content[..size.min(content.len()) - 9]);
605 data.into_bytes()
606 })
607 .collect();
608
609 let handles: Vec<_> = (0..max_threads)
610 .map(|i| {
611 let data_chunk = upload_data.clone();
612 let url = server.url.clone();
613 let total_bytes = Arc::clone(&total_bytes);
614 let stop_flag = Arc::clone(&stop_flag);
615 let client = HttpClient::new(10, false, None).unwrap();
616
617 std::thread::spawn(move || {
618 let mut data_index = i;
619 while !stop_flag.load(Ordering::Relaxed) {
620 if data_index >= data_chunk.len() {
622 data_index = i; }
624
625 if let Ok(_) = client.post(&url, data_chunk[data_index].clone()) {
626 total_bytes.fetch_add(data_chunk[data_index].len() as u64, Ordering::Relaxed);
627 }
628
629 data_index += max_threads;
630 }
631 })
632 })
633 .collect();
634
635 std::thread::sleep(test_duration);
637 stop_flag.store(true, Ordering::Relaxed);
638
639 for handle in handles {
640 let _ = handle.join();
641 }
642
643 let elapsed = start_time.elapsed().as_secs_f64();
644 let bytes = total_bytes.load(Ordering::Relaxed);
645 let speed = (bytes as f64 / elapsed) * 8.0;
646
647 if self.debug {
648 eprintln!("Upload test results:");
649 eprintln!(" Bytes uploaded: {}", bytes);
650 eprintln!(" Time elapsed: {:.2} seconds", elapsed);
651 eprintln!(" Speed: {:.2} bits/s ({:.2} Mbit/s)", speed, speed / 1_000_000.0);
652 }
653
654 Ok(speed)
655 }
656
657 pub fn get_results(&self) -> Option<SpeedtestResults> {
658 let config = self.config.as_ref()?;
659 let server = self.best.as_ref()?;
660
661 Some(SpeedtestResults::new(
662 config.client.clone(),
663 server.clone(),
664 ))
665 }
666}
667
668use crate::utils::cache_buster;
669
#[cfg(test)]
mod tests {
    use super::*;

    /// Constructing a `Speedtest` with a 10-unit timeout, no TLS and no
    /// source address must succeed.
    #[test]
    fn test_speedtest_creation() {
        assert!(Speedtest::new(10, false, None).is_ok());
    }
}
679}