bim_core/clients/
http.rs

1use std::error::Error;
2use std::net::{SocketAddr, ToSocketAddrs};
3use std::sync::{Arc, Barrier, RwLock};
4use std::thread;
5use std::time::{Duration, Instant};
6
7#[cfg(debug_assertions)]
8use log::debug;
9
10use url::Url;
11
12use crate::clients::base::{make_connection, Client};
13use crate::utils::SpeedTestResult;
14
15use std::io::{Read, Write};
16use std::net::TcpStream;
17use std::time::SystemTime;
18
19pub struct HTTPClient {
20    download_url: Url,
21    upload_url: Url,
22    multi_thread: bool,
23
24    address: SocketAddr,
25
26    upload: f64,
27    upload_status: String,
28    download: f64,
29    download_status: String,
30    latency: f64,
31    jitter: f64,
32}
33
34impl HTTPClient {
35    pub fn build(
36        download_url: String,
37        upload_url: String,
38        ipv6: bool,
39        multi_thread: bool,
40    ) -> Option<Self> {
41        let download_url = match Url::parse(&download_url) {
42            Ok(u) => u,
43            Err(_) => return None,
44        };
45        let upload_url = match Url::parse(&upload_url) {
46            Ok(u) => u,
47            Err(_) => return None,
48        };
49
50        let host = match download_url.host_str() {
51            Some(h) => h,
52            None => return None,
53        };
54        let port = match download_url.port_or_known_default() {
55            Some(p) => p,
56            None => return None,
57        };
58
59        let host_port = format!("{host}:{port}");
60        let addresses = match host_port.to_socket_addrs() {
61            Ok(addrs) => addrs,
62            Err(_) => return None,
63        };
64
65        let mut address = None;
66        for addr in addresses {
67            if (addr.is_ipv6() && ipv6) || (addr.is_ipv4() && !ipv6) {
68                address = Some(addr);
69            }
70        }
71
72        let address = match address {
73            Some(addr) => addr,
74            None => return None,
75        };
76
77        #[cfg(debug_assertions)]
78        debug!("IP address {address}");
79
80        let r = String::from("取消");
81        Some(Self {
82            download_url,
83            upload_url,
84            multi_thread,
85            address,
86            upload: 0.0,
87            upload_status: r.clone(),
88            download: 0.0,
89            download_status: r.clone(),
90            latency: 0.0,
91            jitter: 0.0,
92        })
93    }
94
95    fn run_load(&mut self, load: u8) -> Result<bool, Box<dyn Error>> {
96        let url = match load {
97            0 => self.upload_url.clone(),
98            _ => self.download_url.clone(),
99        };
100        let threads = if self.multi_thread { 8 } else { 1 };
101        let mut counters = vec![];
102
103        let start = Arc::new(Barrier::new(threads + 1));
104        let stop = Arc::new(RwLock::new(false));
105        let end = Arc::new(Barrier::new(threads + 1));
106
107        for _ in 0..threads {
108            let a = self.address.clone();
109            let u = url.clone();
110            let ct = Arc::new(RwLock::new(0u128));
111            counters.push(ct.clone());
112
113            let s = start.clone();
114            let f = stop.clone();
115            let e = end.clone();
116
117            thread::spawn(move || {
118                let _ = match load {
119                    0 => request_http_upload(a, u, ct, s, f, e),
120                    _ => request_http_download(a, u, ct, s, f, e),
121                };
122            });
123            thread::sleep(Duration::from_millis(250));
124        }
125
126        let mut last = 0;
127        let mut last_time = 0;
128        let mut time_passed = 0;
129        let mut results = [0.0; 28];
130        let mut index = 0;
131        let mut wait = 6;
132
133        start.wait();
134        let now = Instant::now();
135        while time_passed < 14_000_000 {
136            thread::sleep(Duration::from_millis(500));
137            time_passed = now.elapsed().as_micros();
138            let time_used = time_passed - last_time;
139
140            let current = {
141                let mut count = 0;
142                for ct in counters.iter() {
143                    let num = ct.read().unwrap();
144                    count += *num
145                }
146                count
147            };
148
149            if last == current {
150                wait -= 1;
151            }
152            let speed = ((current - last) * 8) as f64 / time_used as f64;
153
154            #[cfg(debug_assertions)]
155            debug!("Transfered {current} bytes in {time_passed} us, speed {speed}");
156
157            results[index] = speed;
158            index += 1;
159            last = current;
160            last_time = time_passed;
161        }
162
163        {
164            let mut f = stop.write().unwrap();
165            *f = true;
166        }
167        end.wait();
168
169        let mut all = 0.0;
170        for i in index - 20..index {
171            all += results[i];
172        }
173        let final_speed = all / 20.0;
174
175        let status = if wait <= 0 {
176            if last < 200 {
177                "失败"
178            } else {
179                "断流"
180            }
181        } else {
182            "正常"
183        }
184        .to_string();
185
186        match load {
187            0 => {
188                self.upload = final_speed;
189                self.upload_status = status;
190            }
191            _ => {
192                self.download = final_speed;
193                self.download_status = status;
194            }
195        }
196
197        Ok(true)
198    }
199}
200
201impl Client for HTTPClient {
202    fn ping(&mut self) -> bool {
203        let mut count = 0;
204        let mut pings = [0u128; 6];
205        let mut ping_min = 10000000;
206
207        while count < 6 {
208            let ping = request_tcp_ping(&self.address);
209            if ping > 0 {
210                if ping < ping_min {
211                    ping_min = ping
212                }
213                pings[count] = ping;
214            }
215            thread::sleep(Duration::from_millis(1000));
216            count += 1;
217        }
218
219        if pings == [0, 0, 0, 0, 0, 0] {
220            self.latency = 0.0;
221            self.jitter = 0.0;
222            return false;
223        }
224
225        let mut jitter_all = 0;
226        for p in pings {
227            if p > 0 {
228                jitter_all += p - ping_min;
229            }
230        }
231
232        self.latency = ping_min as f64 / 1_000.0;
233        self.jitter = jitter_all as f64 / 5_000.0;
234
235        #[cfg(debug_assertions)]
236        debug!("Ping {} ms", self.latency);
237
238        #[cfg(debug_assertions)]
239        debug!("Jitter {} ms", self.jitter);
240
241        true
242    }
243
244    fn download(&mut self) -> bool {
245        match self.run_load(1) {
246            Ok(_) => true,
247            Err(_) => false,
248        }
249    }
250
251    fn upload(&mut self) -> bool {
252        match self.run_load(0) {
253            Ok(_) => true,
254            Err(_) => false,
255        }
256    }
257
258    fn result(&self) -> SpeedTestResult {
259        SpeedTestResult::build(
260            self.upload,
261            self.upload_status.clone(),
262            self.download,
263            self.download_status.clone(),
264            self.latency,
265            self.jitter,
266        )
267    }
268}
269
270fn request_tcp_ping(address: &SocketAddr) -> u128 {
271    let now = Instant::now();
272    let r = TcpStream::connect_timeout(&address, Duration::from_micros(1_000_000));
273    let used = now.elapsed().as_micros();
274    match r {
275        Ok(_) => used,
276        Err(_e) => {
277            #[cfg(debug_assertions)]
278            debug!("Ping {_e}");
279
280            0
281        }
282    }
283}
284
285fn request_http_download(
286    address: SocketAddr,
287    url: Url,
288    counter: Arc<RwLock<u128>>,
289    start: Arc<Barrier>,
290    stop: Arc<RwLock<bool>>,
291    end: Arc<Barrier>,
292) {
293    let chunk_count = 50;
294    let data_size = chunk_count * 1024 * 1024 as u128;
295    let mut data_counter;
296    let mut buffer = [0; 65536];
297
298    let host_port = format!(
299        "{}:{}",
300        url.host_str().unwrap(),
301        url.port_or_known_default().unwrap()
302    );
303    let path_str = url.path();
304
305    let mut stream = match make_connection(&address, &url) {
306        Ok(s) => s,
307        Err(_) => {
308            start.wait();
309            end.wait();
310            return;
311        }
312    };
313
314    start.wait();
315    'request: while !*(stop.read().unwrap()) {
316        let now = SystemTime::now()
317            .duration_since(SystemTime::UNIX_EPOCH)
318            .unwrap()
319            .as_millis();
320        let path_query = format!(
321            "{}?cors=true&r={}&ckSize={}&size={}",
322            path_str, now, chunk_count, data_size
323        );
324
325        #[cfg(debug_assertions)]
326        debug!("Download {path_query}");
327
328        let request_head = format!(
329            "GET {} HTTP/1.1\r\nHost: {}\r\nUser-Agent: bim/1.0\r\n\r\n",
330            path_query, host_port,
331        )
332        .into_bytes();
333
334        match stream.write_all(&request_head) {
335            Ok(_) => {
336                if let Ok(size) = stream.read(&mut buffer) {
337                    #[cfg(debug_assertions)]
338                    debug!("Download Status: {size}");
339
340                    if size > 0 {
341                        data_counter = size as u128;
342
343                        let mut ct = counter.write().unwrap();
344                        *ct += size as u128;
345                    } else {
346                        break 'request;
347                    }
348                } else {
349                    break 'request;
350                }
351            }
352            Err(_e) => {
353                #[cfg(debug_assertions)]
354                debug!("Download Error: {}", _e);
355
356                break 'request;
357            }
358        }
359
360        while data_counter < data_size && !*(stop.read().unwrap()) {
361            match stream.read(&mut buffer) {
362                Ok(size) => {
363                    data_counter += size as u128;
364
365                    let mut ct = counter.write().unwrap();
366                    *ct += size as u128;
367                }
368                Err(_e) => {
369                    #[cfg(debug_assertions)]
370                    debug!("Download Error: {}", _e);
371
372                    break 'request;
373                }
374            }
375        }
376    }
377    end.wait();
378}
379
380fn request_http_upload(
381    address: SocketAddr,
382    url: Url,
383    counter: Arc<RwLock<u128>>,
384    start: Arc<Barrier>,
385    stop: Arc<RwLock<bool>>,
386    end: Arc<Barrier>,
387) {
388    let chunk_count = 50;
389    let data_size = chunk_count * 1024 * 1024 as u128;
390    let mut data_counter;
391
392    let host_port = format!(
393        "{}:{}",
394        url.host_str().unwrap(),
395        url.port_or_known_default().unwrap()
396    );
397    let url_path = url.path();
398    let request_chunk = "0123456789AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz-="
399        .repeat(1024)
400        .into_bytes();
401
402    let mut stream = match make_connection(&address, &url) {
403        Ok(s) => s,
404        Err(_) => {
405            start.wait();
406            end.wait();
407            return;
408        }
409    };
410
411    start.wait();
412    'request: while !*(stop.read().unwrap()) {
413        let now = SystemTime::now()
414            .duration_since(SystemTime::UNIX_EPOCH)
415            .unwrap()
416            .as_millis();
417        let path_query = format!("{}?r={}", url_path, now);
418
419        #[cfg(debug_assertions)]
420        debug!("Upload {path_query} size {data_size}");
421
422        let request_head = format!(
423            "POST {} HTTP/1.1\r\nHost: {}\r\nUser-Agent: bim/1.0\r\nContent-Length: {}\r\n\r\n",
424            path_query, host_port, data_size
425        )
426        .into_bytes();
427
428        match stream.write_all(&request_head) {
429            Ok(_) => {
430                let length = request_head.len() as u128;
431                data_counter = length;
432
433                let mut ct = counter.write().unwrap();
434                *ct += length;
435            }
436            Err(_e) => {
437                #[cfg(debug_assertions)]
438                debug!("Upload Error: {}", _e);
439
440                break 'request;
441            }
442        }
443
444        while data_counter < data_size && !*(stop.read().unwrap()) {
445            match stream.write(&request_chunk) {
446                Ok(size) => {
447                    data_counter += size as u128;
448
449                    let mut ct = counter.write().unwrap();
450                    *ct += size as u128;
451                }
452                Err(_e) => {
453                    #[cfg(debug_assertions)]
454                    debug!("Upload Error: {}", _e);
455
456                    break 'request;
457                }
458            }
459        }
460    }
461    end.wait();
462}