bim_core/clients/
tcp_speedtest_net.rs

1use std::error::Error;
2use std::net::{SocketAddr, ToSocketAddrs};
3use std::sync::{Arc, Barrier, RwLock};
4use std::thread;
5use std::time::{Duration, Instant};
6
7#[cfg(debug_assertions)]
8use log::debug;
9
10use url::Url;
11
12use crate::clients::base::{make_connection, Client};
13use crate::utils::SpeedTestResult;
14
15use std::io::{Read, Write};
16use std::net::TcpStream;
17
/// TCP speed-test client bound to a single resolved server address.
///
/// Holds the test configuration together with the most recent measurement
/// results; the results start out zeroed and are overwritten by each test run.
pub struct SpeedtestNetTcpClient {
    /// Resolved socket address every test connection is opened to.
    address: SocketAddr,
    /// When `true` the load tests use several parallel connections instead of
    /// a single one.
    multi_thread: bool,

    /// Measured download throughput (bits per microsecond, i.e. Mbit/s).
    download: f64,
    /// Human-readable outcome of the download test.
    download_status: String,
    /// Measured upload throughput (bits per microsecond, i.e. Mbit/s).
    upload: f64,
    /// Human-readable outcome of the upload test.
    upload_status: String,

    /// Lowest observed TCP connect time, in milliseconds.
    latency: f64,
    /// Jitter in milliseconds, derived from how far the connect times lie
    /// above the minimum.
    jitter: f64,
}
30
31impl SpeedtestNetTcpClient {
32    pub fn build(url: String, ipv6: bool, multi_thread: bool) -> Option<Self> {
33        let url = match Url::parse(&url) {
34            Ok(u) => u,
35            Err(_) => return None,
36        };
37
38        let host = match url.host_str() {
39            Some(h) => h,
40            None => return None,
41        };
42        let port = match url.port_or_known_default() {
43            Some(p) => p,
44            None => return None,
45        };
46
47        let host_port = format!("{host}:{port}");
48        let addresses = match host_port.to_socket_addrs() {
49            Ok(addrs) => addrs,
50            Err(_) => return None,
51        };
52
53        let mut address = None;
54        for addr in addresses {
55            if (addr.is_ipv6() && ipv6) || (addr.is_ipv4() && !ipv6) {
56                address = Some(addr);
57            }
58        }
59
60        let address = match address {
61            Some(addr) => addr,
62            None => return None,
63        };
64
65        #[cfg(debug_assertions)]
66        debug!("IP address {address}");
67
68        let r = String::from("取消");
69        Some(Self {
70            multi_thread,
71            address,
72            upload: 0.0,
73            upload_status: r.clone(),
74            download: 0.0,
75            download_status: r.clone(),
76            latency: 0.0,
77            jitter: 0.0,
78        })
79    }
80
81    fn run_load(&mut self, load: u8) -> Result<bool, Box<dyn Error>> {
82        let threads = if self.multi_thread { 8 } else { 1 };
83        let mut counters = vec![];
84
85        let start = Arc::new(Barrier::new(threads + 1));
86        let stop = Arc::new(RwLock::new(false));
87        let end = Arc::new(Barrier::new(threads + 1));
88
89        for _ in 0..threads {
90            let a = self.address.clone();
91            let ct = Arc::new(RwLock::new(0u128));
92            counters.push(ct.clone());
93
94            let s = start.clone();
95            let f = stop.clone();
96            let e = end.clone();
97
98            thread::spawn(move || {
99                let _ = match load {
100                    0 => request_tcp_upload(a, ct, s, f, e),
101                    _ => request_tcp_download(a, ct, s, f, e),
102                };
103            });
104            thread::sleep(Duration::from_millis(250));
105        }
106
107        let mut last = 0;
108        let mut last_time = 0;
109        let mut time_passed = 0;
110        let mut results = [0.0; 28];
111        let mut index = 0;
112        let mut wait = 6;
113
114        start.wait();
115        let now = Instant::now();
116        while time_passed < 14_000_000 {
117            thread::sleep(Duration::from_millis(500));
118            time_passed = now.elapsed().as_micros();
119            let time_used = time_passed - last_time;
120
121            let current = {
122                let mut count = 0;
123                for ct in counters.iter() {
124                    let num = ct.read().unwrap();
125                    count += *num
126                }
127                count
128            };
129
130            if last == current {
131                wait -= 1;
132            }
133            let speed = ((current - last) * 8) as f64 / time_used as f64;
134
135            #[cfg(debug_assertions)]
136            debug!("Transfered {current} bytes in {time_passed} us, speed {speed}");
137
138            results[index] = speed;
139            index += 1;
140            last = current;
141            last_time = time_passed;
142        }
143
144        {
145            let mut f = stop.write().unwrap();
146            *f = true;
147        }
148        end.wait();
149
150        let mut all = 0.0;
151        for i in index - 20..index {
152            all += results[i];
153        }
154        let final_speed = all / 20.0;
155
156        let status = if wait <= 0 {
157            if last < 200 {
158                "失败"
159            } else {
160                "断流"
161            }
162        } else {
163            "正常"
164        }
165        .to_string();
166
167        match load {
168            0 => {
169                self.upload = final_speed;
170                self.upload_status = status;
171            }
172            _ => {
173                self.download = final_speed;
174                self.download_status = status;
175            }
176        }
177
178        Ok(true)
179    }
180}
181
182impl Client for SpeedtestNetTcpClient {
183    fn ping(&mut self) -> bool {
184        let mut count = 0;
185        let mut pings = [0u128; 6];
186        let mut ping_min = 10000000;
187
188        while count < 6 {
189            let ping = request_tcp_ping(&self.address);
190            if ping > 0 {
191                if ping < ping_min {
192                    ping_min = ping
193                }
194                pings[count] = ping;
195            }
196            thread::sleep(Duration::from_millis(1000));
197            count += 1;
198        }
199
200        if pings == [0, 0, 0, 0, 0, 0] {
201            self.latency = 0.0;
202            self.jitter = 0.0;
203            return false;
204        }
205
206        let mut jitter_all = 0;
207        for p in pings {
208            if p > 0 {
209                jitter_all += p - ping_min;
210            }
211        }
212
213        self.latency = ping_min as f64 / 1_000.0;
214        self.jitter = jitter_all as f64 / 5_000.0;
215
216        #[cfg(debug_assertions)]
217        debug!("Ping {} ms", self.latency);
218
219        #[cfg(debug_assertions)]
220        debug!("Jitter {} ms", self.jitter);
221
222        true
223    }
224
225    fn download(&mut self) -> bool {
226        match self.run_load(1) {
227            Ok(_) => true,
228            Err(_) => false,
229        }
230    }
231
232    fn upload(&mut self) -> bool {
233        match self.run_load(0) {
234            Ok(_) => true,
235            Err(_) => false,
236        }
237    }
238
239    fn result(&self) -> SpeedTestResult {
240        SpeedTestResult::build(
241            self.upload,
242            self.upload_status.clone(),
243            self.download,
244            self.download_status.clone(),
245            self.latency,
246            self.jitter,
247        )
248    }
249}
250
251fn request_tcp_ping(address: &SocketAddr) -> u128 {
252    let now = Instant::now();
253    let r = TcpStream::connect_timeout(&address, Duration::from_micros(1_000_000));
254    let used = now.elapsed().as_micros();
255    match r {
256        Ok(_) => used,
257        Err(_e) => {
258            #[cfg(debug_assertions)]
259            debug!("Ping {_e}");
260
261            0
262        }
263    }
264}
265
266fn request_tcp_download(
267    address: SocketAddr,
268    counter: Arc<RwLock<u128>>,
269    start: Arc<Barrier>,
270    stop: Arc<RwLock<bool>>,
271    end: Arc<Barrier>,
272) {
273    let data_size = 15 * 1024 * 1024 * 1024 as u128;
274    let mut buffer = [0; 65536];
275
276    let url = Url::parse("http://bench.im").unwrap();
277    let mut stream = match make_connection(&address, &url) {
278        Ok(s) => s,
279        Err(_) => {
280            start.wait();
281            end.wait();
282            return;
283        }
284    };
285
286    start.wait();
287
288    #[cfg(debug_assertions)]
289    debug!("Download Start");
290
291    let request = format!("DOWNLOAD {data_size}\n").into_bytes();
292    match stream.write_all(&request) {
293        Ok(_) => {
294            if let Ok(size) = stream.read(&mut buffer) {
295                #[cfg(debug_assertions)]
296                debug!("Download Status: {size}");
297
298                if size == 0 {
299                    #[cfg(debug_assertions)]
300                    debug!("Download Error: Start failed");
301
302                    end.wait();
303                    return;
304                }
305
306                let mut ct = counter.write().unwrap();
307                *ct += size as u128;
308            } else {
309                end.wait();
310                return;
311            }
312        }
313        Err(_e) => {
314            #[cfg(debug_assertions)]
315            debug!("Download Error: {}", _e);
316
317            end.wait();
318            return;
319        }
320    }
321
322    while !*(stop.read().unwrap()) {
323        match stream.read(&mut buffer) {
324            Ok(size) => {
325                let mut ct = counter.write().unwrap();
326                *ct += size as u128;
327            }
328            Err(_e) => {
329                #[cfg(debug_assertions)]
330                debug!("Download Error: {}", _e);
331
332                end.wait();
333                return;
334            }
335        }
336    }
337    end.wait();
338}
339
340fn request_tcp_upload(
341    address: SocketAddr,
342    counter: Arc<RwLock<u128>>,
343    start: Arc<Barrier>,
344    stop: Arc<RwLock<bool>>,
345    end: Arc<Barrier>,
346) {
347    let data_size = 15 * 1024 * 1024 * 1024 as u128;
348    let request_chunk = "0123456789AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz-="
349        .repeat(1024)
350        .into_bytes();
351
352    let url = Url::parse("http://bench.im").unwrap();
353
354    let mut stream = match make_connection(&address, &url) {
355        Ok(s) => s,
356        Err(_) => {
357            start.wait();
358            end.wait();
359            return;
360        }
361    };
362
363    start.wait();
364    #[cfg(debug_assertions)]
365    debug!("Upload Start");
366
367    let request = format!("UPLOAD {data_size} 0\n").into_bytes();
368    match stream.write_all(&request) {
369        Ok(_) => {
370            let mut ct = counter.write().unwrap();
371            *ct += request.len() as u128;
372        }
373        Err(_e) => {
374            #[cfg(debug_assertions)]
375            debug!("Upload Error: {}", _e);
376
377            end.wait();
378            return;
379        }
380    }
381
382    while !*(stop.read().unwrap()) {
383        match stream.write(&request_chunk) {
384            Ok(size) => {
385                let mut ct = counter.write().unwrap();
386                *ct += size as u128;
387            }
388            Err(_e) => {
389                #[cfg(debug_assertions)]
390                debug!("Upload Error: {}", _e);
391
392                end.wait();
393                return;
394            }
395        }
396    }
397    end.wait();
398}