bim_core/clients/
tcp_speedtest_net.rs1use std::error::Error;
2use std::net::{SocketAddr, ToSocketAddrs};
3use std::sync::{Arc, Barrier, RwLock};
4use std::thread;
5use std::time::{Duration, Instant};
6
7#[cfg(debug_assertions)]
8use log::debug;
9
10use url::Url;
11
12use crate::clients::base::{make_connection, Client};
13use crate::utils::SpeedTestResult;
14
15use std::io::{Read, Write};
16use std::net::TcpStream;
17
18pub struct SpeedtestNetTcpClient {
19 multi_thread: bool,
20
21 address: SocketAddr,
22
23 upload: f64,
24 upload_status: String,
25 download: f64,
26 download_status: String,
27 latency: f64,
28 jitter: f64,
29}
30
31impl SpeedtestNetTcpClient {
32 pub fn build(url: String, ipv6: bool, multi_thread: bool) -> Option<Self> {
33 let url = match Url::parse(&url) {
34 Ok(u) => u,
35 Err(_) => return None,
36 };
37
38 let host = match url.host_str() {
39 Some(h) => h,
40 None => return None,
41 };
42 let port = match url.port_or_known_default() {
43 Some(p) => p,
44 None => return None,
45 };
46
47 let host_port = format!("{host}:{port}");
48 let addresses = match host_port.to_socket_addrs() {
49 Ok(addrs) => addrs,
50 Err(_) => return None,
51 };
52
53 let mut address = None;
54 for addr in addresses {
55 if (addr.is_ipv6() && ipv6) || (addr.is_ipv4() && !ipv6) {
56 address = Some(addr);
57 }
58 }
59
60 let address = match address {
61 Some(addr) => addr,
62 None => return None,
63 };
64
65 #[cfg(debug_assertions)]
66 debug!("IP address {address}");
67
68 let r = String::from("取消");
69 Some(Self {
70 multi_thread,
71 address,
72 upload: 0.0,
73 upload_status: r.clone(),
74 download: 0.0,
75 download_status: r.clone(),
76 latency: 0.0,
77 jitter: 0.0,
78 })
79 }
80
81 fn run_load(&mut self, load: u8) -> Result<bool, Box<dyn Error>> {
82 let threads = if self.multi_thread { 8 } else { 1 };
83 let mut counters = vec![];
84
85 let start = Arc::new(Barrier::new(threads + 1));
86 let stop = Arc::new(RwLock::new(false));
87 let end = Arc::new(Barrier::new(threads + 1));
88
89 for _ in 0..threads {
90 let a = self.address.clone();
91 let ct = Arc::new(RwLock::new(0u128));
92 counters.push(ct.clone());
93
94 let s = start.clone();
95 let f = stop.clone();
96 let e = end.clone();
97
98 thread::spawn(move || {
99 let _ = match load {
100 0 => request_tcp_upload(a, ct, s, f, e),
101 _ => request_tcp_download(a, ct, s, f, e),
102 };
103 });
104 thread::sleep(Duration::from_millis(250));
105 }
106
107 let mut last = 0;
108 let mut last_time = 0;
109 let mut time_passed = 0;
110 let mut results = [0.0; 28];
111 let mut index = 0;
112 let mut wait = 6;
113
114 start.wait();
115 let now = Instant::now();
116 while time_passed < 14_000_000 {
117 thread::sleep(Duration::from_millis(500));
118 time_passed = now.elapsed().as_micros();
119 let time_used = time_passed - last_time;
120
121 let current = {
122 let mut count = 0;
123 for ct in counters.iter() {
124 let num = ct.read().unwrap();
125 count += *num
126 }
127 count
128 };
129
130 if last == current {
131 wait -= 1;
132 }
133 let speed = ((current - last) * 8) as f64 / time_used as f64;
134
135 #[cfg(debug_assertions)]
136 debug!("Transfered {current} bytes in {time_passed} us, speed {speed}");
137
138 results[index] = speed;
139 index += 1;
140 last = current;
141 last_time = time_passed;
142 }
143
144 {
145 let mut f = stop.write().unwrap();
146 *f = true;
147 }
148 end.wait();
149
150 let mut all = 0.0;
151 for i in index - 20..index {
152 all += results[i];
153 }
154 let final_speed = all / 20.0;
155
156 let status = if wait <= 0 {
157 if last < 200 {
158 "失败"
159 } else {
160 "断流"
161 }
162 } else {
163 "正常"
164 }
165 .to_string();
166
167 match load {
168 0 => {
169 self.upload = final_speed;
170 self.upload_status = status;
171 }
172 _ => {
173 self.download = final_speed;
174 self.download_status = status;
175 }
176 }
177
178 Ok(true)
179 }
180}
181
182impl Client for SpeedtestNetTcpClient {
183 fn ping(&mut self) -> bool {
184 let mut count = 0;
185 let mut pings = [0u128; 6];
186 let mut ping_min = 10000000;
187
188 while count < 6 {
189 let ping = request_tcp_ping(&self.address);
190 if ping > 0 {
191 if ping < ping_min {
192 ping_min = ping
193 }
194 pings[count] = ping;
195 }
196 thread::sleep(Duration::from_millis(1000));
197 count += 1;
198 }
199
200 if pings == [0, 0, 0, 0, 0, 0] {
201 self.latency = 0.0;
202 self.jitter = 0.0;
203 return false;
204 }
205
206 let mut jitter_all = 0;
207 for p in pings {
208 if p > 0 {
209 jitter_all += p - ping_min;
210 }
211 }
212
213 self.latency = ping_min as f64 / 1_000.0;
214 self.jitter = jitter_all as f64 / 5_000.0;
215
216 #[cfg(debug_assertions)]
217 debug!("Ping {} ms", self.latency);
218
219 #[cfg(debug_assertions)]
220 debug!("Jitter {} ms", self.jitter);
221
222 true
223 }
224
225 fn download(&mut self) -> bool {
226 match self.run_load(1) {
227 Ok(_) => true,
228 Err(_) => false,
229 }
230 }
231
232 fn upload(&mut self) -> bool {
233 match self.run_load(0) {
234 Ok(_) => true,
235 Err(_) => false,
236 }
237 }
238
239 fn result(&self) -> SpeedTestResult {
240 SpeedTestResult::build(
241 self.upload,
242 self.upload_status.clone(),
243 self.download,
244 self.download_status.clone(),
245 self.latency,
246 self.jitter,
247 )
248 }
249}
250
251fn request_tcp_ping(address: &SocketAddr) -> u128 {
252 let now = Instant::now();
253 let r = TcpStream::connect_timeout(&address, Duration::from_micros(1_000_000));
254 let used = now.elapsed().as_micros();
255 match r {
256 Ok(_) => used,
257 Err(_e) => {
258 #[cfg(debug_assertions)]
259 debug!("Ping {_e}");
260
261 0
262 }
263 }
264}
265
266fn request_tcp_download(
267 address: SocketAddr,
268 counter: Arc<RwLock<u128>>,
269 start: Arc<Barrier>,
270 stop: Arc<RwLock<bool>>,
271 end: Arc<Barrier>,
272) {
273 let data_size = 15 * 1024 * 1024 * 1024 as u128;
274 let mut buffer = [0; 65536];
275
276 let url = Url::parse("http://bench.im").unwrap();
277 let mut stream = match make_connection(&address, &url) {
278 Ok(s) => s,
279 Err(_) => {
280 start.wait();
281 end.wait();
282 return;
283 }
284 };
285
286 start.wait();
287
288 #[cfg(debug_assertions)]
289 debug!("Download Start");
290
291 let request = format!("DOWNLOAD {data_size}\n").into_bytes();
292 match stream.write_all(&request) {
293 Ok(_) => {
294 if let Ok(size) = stream.read(&mut buffer) {
295 #[cfg(debug_assertions)]
296 debug!("Download Status: {size}");
297
298 if size == 0 {
299 #[cfg(debug_assertions)]
300 debug!("Download Error: Start failed");
301
302 end.wait();
303 return;
304 }
305
306 let mut ct = counter.write().unwrap();
307 *ct += size as u128;
308 } else {
309 end.wait();
310 return;
311 }
312 }
313 Err(_e) => {
314 #[cfg(debug_assertions)]
315 debug!("Download Error: {}", _e);
316
317 end.wait();
318 return;
319 }
320 }
321
322 while !*(stop.read().unwrap()) {
323 match stream.read(&mut buffer) {
324 Ok(size) => {
325 let mut ct = counter.write().unwrap();
326 *ct += size as u128;
327 }
328 Err(_e) => {
329 #[cfg(debug_assertions)]
330 debug!("Download Error: {}", _e);
331
332 end.wait();
333 return;
334 }
335 }
336 }
337 end.wait();
338}
339
340fn request_tcp_upload(
341 address: SocketAddr,
342 counter: Arc<RwLock<u128>>,
343 start: Arc<Barrier>,
344 stop: Arc<RwLock<bool>>,
345 end: Arc<Barrier>,
346) {
347 let data_size = 15 * 1024 * 1024 * 1024 as u128;
348 let request_chunk = "0123456789AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz-="
349 .repeat(1024)
350 .into_bytes();
351
352 let url = Url::parse("http://bench.im").unwrap();
353
354 let mut stream = match make_connection(&address, &url) {
355 Ok(s) => s,
356 Err(_) => {
357 start.wait();
358 end.wait();
359 return;
360 }
361 };
362
363 start.wait();
364 #[cfg(debug_assertions)]
365 debug!("Upload Start");
366
367 let request = format!("UPLOAD {data_size} 0\n").into_bytes();
368 match stream.write_all(&request) {
369 Ok(_) => {
370 let mut ct = counter.write().unwrap();
371 *ct += request.len() as u128;
372 }
373 Err(_e) => {
374 #[cfg(debug_assertions)]
375 debug!("Upload Error: {}", _e);
376
377 end.wait();
378 return;
379 }
380 }
381
382 while !*(stop.read().unwrap()) {
383 match stream.write(&request_chunk) {
384 Ok(size) => {
385 let mut ct = counter.write().unwrap();
386 *ct += size as u128;
387 }
388 Err(_e) => {
389 #[cfg(debug_assertions)]
390 debug!("Upload Error: {}", _e);
391
392 end.wait();
393 return;
394 }
395 }
396 }
397 end.wait();
398}