1use crate::{
2 command::{Command, DeclineReason, Operator},
3 Result,
4};
5use anyhow::{anyhow, Context};
6use log::{debug, info};
7use std::{
8 fmt,
9 io::{self, Write},
10 net::{TcpStream, ToSocketAddrs},
11 str::FromStr,
12 time::Duration,
13};
14
/// Amount of data moved in one direction together with the time window it
/// belongs to.
///
/// `Client::format_throughput` passes both fields to `util::to_bps` to obtain
/// the rate that is printed.
#[derive(Debug, Default)]
struct Throughput {
    /// Byte count recorded for this direction.
    bytes: u64,
    /// Measurement window configured for this direction.
    duration: Duration,
}
20
21#[derive(Default, Debug)]
22struct NetworkSpec {
23 downstream: Throughput,
24 upstream: Throughput,
25}
26
27pub struct Client {
28 operator: Operator,
29 spec: NetworkSpec,
30}
31
32impl Client {
33 pub fn new(addr: impl ToSocketAddrs + fmt::Debug) -> Result<Self> {
34 info!("Connecting to {:?}", addr);
35
36 Ok(Self {
37 operator: Operator::new(
38 TcpStream::connect_timeout(
39 &addr.to_socket_addrs().unwrap().next().unwrap(),
40 Duration::from_secs(3),
41 )
42 .context(format!("Addr:{:?}", addr))?,
43 ),
44 spec: NetworkSpec::default(),
45 })
46 }
47
48 pub fn duration(mut self, duration: Option<&str>) -> Self {
49 let duration =
50 Duration::from_secs(u64::from_str(duration.unwrap_or("3").as_ref()).unwrap());
51 self.spec.downstream.duration = duration;
52 self.spec.upstream.duration = duration;
53 self
54 }
55
56 pub fn run(mut self) -> Result<()> {
57 self.check_server_status()
58 .and_then(|_| self.ping_pon())
59 .and_then(|_| self.downstream())
60 .and_then(|_| self.upstream())
61 .and_then(|_| self.print_result(io::stdout()))
62 }
63
64 fn check_server_status(&mut self) -> Result<()> {
65 let cmd = self.operator.read()?;
66 match cmd {
67 Command::Ready => {
68 debug!("Receive server ready");
69 Ok(())
70 }
71 Command::Decline => match self.operator.read_decline_reason()? {
72 DeclineReason::Unknown => Err(anyhow!("Server decline speed test :(")),
73 DeclineReason::MaxThreadsExceed(max_threads) => Err(anyhow!(
74 "Server decline speed test. Cause: max threads exceeded({})",
75 max_threads
76 )),
77 },
78 _ => Err(anyhow!("Unexpected command {:?}", cmd)),
79 }
80 }
81
82 fn ping_pon(&mut self) -> Result<()> {
83 self.operator.ping_write_then_read().map(|r| {
84 debug!("Successfully ping to remote server");
85 r
86 })
87 }
88
89 fn downstream(&mut self) -> Result<()> {
90 info!(
91 "Start downstream duration: {} seconds",
92 self.spec.downstream.duration.as_secs()
93 );
94 self.operator
95 .request_downstream(self.spec.downstream.duration)?;
96 self.spec.downstream.bytes = self.operator.read_loop()?;
97 Ok(())
98 }
99
100 fn upstream(&mut self) -> Result<()> {
101 info!(
102 "Start upstream duration: {} seconds",
103 self.spec.upstream.duration.as_secs()
104 );
105 self.operator
106 .request_upstream(self.spec.upstream.duration)?;
107 self.spec.upstream.bytes = self.operator.write_loop(self.spec.upstream.duration)?;
108 Ok(())
109 }
110
111 fn print_result<W: Write>(&mut self, mut writer: W) -> Result<()> {
112 writeln!(
113 writer,
114 "Downstream: {}",
115 self.format_throughput(&self.spec.downstream)
116 )
117 .and(writeln!(
118 writer,
119 " Upstream: {}",
120 self.format_throughput(&self.spec.upstream)
121 ))
122 .map_err(anyhow::Error::from)
123 }
124
125 fn format_throughput(&self, throughput: &Throughput) -> String {
126 use crate::util::*;
127 format_bps(to_bps(throughput.bytes, throughput.duration))
128 }
129}