1use std::error::Error;
2use std::net::{SocketAddr, ToSocketAddrs};
3use std::sync::{Arc, Barrier, RwLock};
4use std::thread;
5use std::time::{Duration, Instant};
6
7#[cfg(debug_assertions)]
8use log::debug;
9
10use url::Url;
11
12use crate::clients::base::{make_connection, Client};
13use crate::utils::SpeedTestResult;
14
15use std::io::{Read, Write};
16use std::net::TcpStream;
17use std::time::SystemTime;
18
19pub struct HTTPClient {
20 download_url: Url,
21 upload_url: Url,
22 multi_thread: bool,
23
24 address: SocketAddr,
25
26 upload: f64,
27 upload_status: String,
28 download: f64,
29 download_status: String,
30 latency: f64,
31 jitter: f64,
32}
33
34impl HTTPClient {
35 pub fn build(
36 download_url: String,
37 upload_url: String,
38 ipv6: bool,
39 multi_thread: bool,
40 ) -> Option<Self> {
41 let download_url = match Url::parse(&download_url) {
42 Ok(u) => u,
43 Err(_) => return None,
44 };
45 let upload_url = match Url::parse(&upload_url) {
46 Ok(u) => u,
47 Err(_) => return None,
48 };
49
50 let host = match download_url.host_str() {
51 Some(h) => h,
52 None => return None,
53 };
54 let port = match download_url.port_or_known_default() {
55 Some(p) => p,
56 None => return None,
57 };
58
59 let host_port = format!("{host}:{port}");
60 let addresses = match host_port.to_socket_addrs() {
61 Ok(addrs) => addrs,
62 Err(_) => return None,
63 };
64
65 let mut address = None;
66 for addr in addresses {
67 if (addr.is_ipv6() && ipv6) || (addr.is_ipv4() && !ipv6) {
68 address = Some(addr);
69 }
70 }
71
72 let address = match address {
73 Some(addr) => addr,
74 None => return None,
75 };
76
77 #[cfg(debug_assertions)]
78 debug!("IP address {address}");
79
80 let r = String::from("取消");
81 Some(Self {
82 download_url,
83 upload_url,
84 multi_thread,
85 address,
86 upload: 0.0,
87 upload_status: r.clone(),
88 download: 0.0,
89 download_status: r.clone(),
90 latency: 0.0,
91 jitter: 0.0,
92 })
93 }
94
95 fn run_load(&mut self, load: u8) -> Result<bool, Box<dyn Error>> {
96 let url = match load {
97 0 => self.upload_url.clone(),
98 _ => self.download_url.clone(),
99 };
100 let threads = if self.multi_thread { 8 } else { 1 };
101 let mut counters = vec![];
102
103 let start = Arc::new(Barrier::new(threads + 1));
104 let stop = Arc::new(RwLock::new(false));
105 let end = Arc::new(Barrier::new(threads + 1));
106
107 for _ in 0..threads {
108 let a = self.address.clone();
109 let u = url.clone();
110 let ct = Arc::new(RwLock::new(0u128));
111 counters.push(ct.clone());
112
113 let s = start.clone();
114 let f = stop.clone();
115 let e = end.clone();
116
117 thread::spawn(move || {
118 let _ = match load {
119 0 => request_http_upload(a, u, ct, s, f, e),
120 _ => request_http_download(a, u, ct, s, f, e),
121 };
122 });
123 thread::sleep(Duration::from_millis(250));
124 }
125
126 let mut last = 0;
127 let mut last_time = 0;
128 let mut time_passed = 0;
129 let mut results = [0.0; 28];
130 let mut index = 0;
131 let mut wait = 6;
132
133 start.wait();
134 let now = Instant::now();
135 while time_passed < 14_000_000 {
136 thread::sleep(Duration::from_millis(500));
137 time_passed = now.elapsed().as_micros();
138 let time_used = time_passed - last_time;
139
140 let current = {
141 let mut count = 0;
142 for ct in counters.iter() {
143 let num = ct.read().unwrap();
144 count += *num
145 }
146 count
147 };
148
149 if last == current {
150 wait -= 1;
151 }
152 let speed = ((current - last) * 8) as f64 / time_used as f64;
153
154 #[cfg(debug_assertions)]
155 debug!("Transfered {current} bytes in {time_passed} us, speed {speed}");
156
157 results[index] = speed;
158 index += 1;
159 last = current;
160 last_time = time_passed;
161 }
162
163 {
164 let mut f = stop.write().unwrap();
165 *f = true;
166 }
167 end.wait();
168
169 let mut all = 0.0;
170 for i in index - 20..index {
171 all += results[i];
172 }
173 let final_speed = all / 20.0;
174
175 let status = if wait <= 0 {
176 if last < 200 {
177 "失败"
178 } else {
179 "断流"
180 }
181 } else {
182 "正常"
183 }
184 .to_string();
185
186 match load {
187 0 => {
188 self.upload = final_speed;
189 self.upload_status = status;
190 }
191 _ => {
192 self.download = final_speed;
193 self.download_status = status;
194 }
195 }
196
197 Ok(true)
198 }
199}
200
201impl Client for HTTPClient {
202 fn ping(&mut self) -> bool {
203 let mut count = 0;
204 let mut pings = [0u128; 6];
205 let mut ping_min = 10000000;
206
207 while count < 6 {
208 let ping = request_tcp_ping(&self.address);
209 if ping > 0 {
210 if ping < ping_min {
211 ping_min = ping
212 }
213 pings[count] = ping;
214 }
215 thread::sleep(Duration::from_millis(1000));
216 count += 1;
217 }
218
219 if pings == [0, 0, 0, 0, 0, 0] {
220 self.latency = 0.0;
221 self.jitter = 0.0;
222 return false;
223 }
224
225 let mut jitter_all = 0;
226 for p in pings {
227 if p > 0 {
228 jitter_all += p - ping_min;
229 }
230 }
231
232 self.latency = ping_min as f64 / 1_000.0;
233 self.jitter = jitter_all as f64 / 5_000.0;
234
235 #[cfg(debug_assertions)]
236 debug!("Ping {} ms", self.latency);
237
238 #[cfg(debug_assertions)]
239 debug!("Jitter {} ms", self.jitter);
240
241 true
242 }
243
244 fn download(&mut self) -> bool {
245 match self.run_load(1) {
246 Ok(_) => true,
247 Err(_) => false,
248 }
249 }
250
251 fn upload(&mut self) -> bool {
252 match self.run_load(0) {
253 Ok(_) => true,
254 Err(_) => false,
255 }
256 }
257
258 fn result(&self) -> SpeedTestResult {
259 SpeedTestResult::build(
260 self.upload,
261 self.upload_status.clone(),
262 self.download,
263 self.download_status.clone(),
264 self.latency,
265 self.jitter,
266 )
267 }
268}
269
270fn request_tcp_ping(address: &SocketAddr) -> u128 {
271 let now = Instant::now();
272 let r = TcpStream::connect_timeout(&address, Duration::from_micros(1_000_000));
273 let used = now.elapsed().as_micros();
274 match r {
275 Ok(_) => used,
276 Err(_e) => {
277 #[cfg(debug_assertions)]
278 debug!("Ping {_e}");
279
280 0
281 }
282 }
283}
284
285fn request_http_download(
286 address: SocketAddr,
287 url: Url,
288 counter: Arc<RwLock<u128>>,
289 start: Arc<Barrier>,
290 stop: Arc<RwLock<bool>>,
291 end: Arc<Barrier>,
292) {
293 let chunk_count = 50;
294 let data_size = chunk_count * 1024 * 1024 as u128;
295 let mut data_counter;
296 let mut buffer = [0; 65536];
297
298 let host_port = format!(
299 "{}:{}",
300 url.host_str().unwrap(),
301 url.port_or_known_default().unwrap()
302 );
303 let path_str = url.path();
304
305 let mut stream = match make_connection(&address, &url) {
306 Ok(s) => s,
307 Err(_) => {
308 start.wait();
309 end.wait();
310 return;
311 }
312 };
313
314 start.wait();
315 'request: while !*(stop.read().unwrap()) {
316 let now = SystemTime::now()
317 .duration_since(SystemTime::UNIX_EPOCH)
318 .unwrap()
319 .as_millis();
320 let path_query = format!(
321 "{}?cors=true&r={}&ckSize={}&size={}",
322 path_str, now, chunk_count, data_size
323 );
324
325 #[cfg(debug_assertions)]
326 debug!("Download {path_query}");
327
328 let request_head = format!(
329 "GET {} HTTP/1.1\r\nHost: {}\r\nUser-Agent: bim/1.0\r\n\r\n",
330 path_query, host_port,
331 )
332 .into_bytes();
333
334 match stream.write_all(&request_head) {
335 Ok(_) => {
336 if let Ok(size) = stream.read(&mut buffer) {
337 #[cfg(debug_assertions)]
338 debug!("Download Status: {size}");
339
340 if size > 0 {
341 data_counter = size as u128;
342
343 let mut ct = counter.write().unwrap();
344 *ct += size as u128;
345 } else {
346 break 'request;
347 }
348 } else {
349 break 'request;
350 }
351 }
352 Err(_e) => {
353 #[cfg(debug_assertions)]
354 debug!("Download Error: {}", _e);
355
356 break 'request;
357 }
358 }
359
360 while data_counter < data_size && !*(stop.read().unwrap()) {
361 match stream.read(&mut buffer) {
362 Ok(size) => {
363 data_counter += size as u128;
364
365 let mut ct = counter.write().unwrap();
366 *ct += size as u128;
367 }
368 Err(_e) => {
369 #[cfg(debug_assertions)]
370 debug!("Download Error: {}", _e);
371
372 break 'request;
373 }
374 }
375 }
376 }
377 end.wait();
378}
379
380fn request_http_upload(
381 address: SocketAddr,
382 url: Url,
383 counter: Arc<RwLock<u128>>,
384 start: Arc<Barrier>,
385 stop: Arc<RwLock<bool>>,
386 end: Arc<Barrier>,
387) {
388 let chunk_count = 50;
389 let data_size = chunk_count * 1024 * 1024 as u128;
390 let mut data_counter;
391
392 let host_port = format!(
393 "{}:{}",
394 url.host_str().unwrap(),
395 url.port_or_known_default().unwrap()
396 );
397 let url_path = url.path();
398 let request_chunk = "0123456789AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz-="
399 .repeat(1024)
400 .into_bytes();
401
402 let mut stream = match make_connection(&address, &url) {
403 Ok(s) => s,
404 Err(_) => {
405 start.wait();
406 end.wait();
407 return;
408 }
409 };
410
411 start.wait();
412 'request: while !*(stop.read().unwrap()) {
413 let now = SystemTime::now()
414 .duration_since(SystemTime::UNIX_EPOCH)
415 .unwrap()
416 .as_millis();
417 let path_query = format!("{}?r={}", url_path, now);
418
419 #[cfg(debug_assertions)]
420 debug!("Upload {path_query} size {data_size}");
421
422 let request_head = format!(
423 "POST {} HTTP/1.1\r\nHost: {}\r\nUser-Agent: bim/1.0\r\nContent-Length: {}\r\n\r\n",
424 path_query, host_port, data_size
425 )
426 .into_bytes();
427
428 match stream.write_all(&request_head) {
429 Ok(_) => {
430 let length = request_head.len() as u128;
431 data_counter = length;
432
433 let mut ct = counter.write().unwrap();
434 *ct += length;
435 }
436 Err(_e) => {
437 #[cfg(debug_assertions)]
438 debug!("Upload Error: {}", _e);
439
440 break 'request;
441 }
442 }
443
444 while data_counter < data_size && !*(stop.read().unwrap()) {
445 match stream.write(&request_chunk) {
446 Ok(size) => {
447 data_counter += size as u128;
448
449 let mut ct = counter.write().unwrap();
450 *ct += size as u128;
451 }
452 Err(_e) => {
453 #[cfg(debug_assertions)]
454 debug!("Upload Error: {}", _e);
455
456 break 'request;
457 }
458 }
459 }
460 }
461 end.wait();
462}