netspeed/
client.rs

1use crate::{
2    command::{Command, DeclineReason, Operator},
3    Result,
4};
5use anyhow::{anyhow, Context};
6use log::{debug, info};
7use std::{
8    fmt,
9    io::{self, Write},
10    net::{TcpStream, ToSocketAddrs},
11    str::FromStr,
12    time::Duration,
13};
14
/// A transferred byte count paired with the time span it is measured against.
#[derive(Debug, Default)]
struct Throughput {
    /// Number of bytes transferred.
    bytes: u64,
    /// Time span associated with the transfer.
    duration: Duration,
}
20
21#[derive(Default, Debug)]
22struct NetworkSpec {
23    downstream: Throughput,
24    upstream: Throughput,
25}
26
27pub struct Client {
28    operator: Operator,
29    spec: NetworkSpec,
30}
31
32impl Client {
33    pub fn new(addr: impl ToSocketAddrs + fmt::Debug) -> Result<Self> {
34        info!("Connecting to {:?}", addr);
35
36        Ok(Self {
37            operator: Operator::new(
38                TcpStream::connect_timeout(
39                    &addr.to_socket_addrs().unwrap().next().unwrap(),
40                    Duration::from_secs(3),
41                )
42                .context(format!("Addr:{:?}", addr))?,
43            ),
44            spec: NetworkSpec::default(),
45        })
46    }
47
48    pub fn duration(mut self, duration: Option<&str>) -> Self {
49        let duration =
50            Duration::from_secs(u64::from_str(duration.unwrap_or("3").as_ref()).unwrap());
51        self.spec.downstream.duration = duration;
52        self.spec.upstream.duration = duration;
53        self
54    }
55
56    pub fn run(mut self) -> Result<()> {
57        self.check_server_status()
58            .and_then(|_| self.ping_pon())
59            .and_then(|_| self.downstream())
60            .and_then(|_| self.upstream())
61            .and_then(|_| self.print_result(io::stdout()))
62    }
63
64    fn check_server_status(&mut self) -> Result<()> {
65        let cmd = self.operator.read()?;
66        match cmd {
67            Command::Ready => {
68                debug!("Receive server ready");
69                Ok(())
70            }
71            Command::Decline => match self.operator.read_decline_reason()? {
72                DeclineReason::Unknown => Err(anyhow!("Server decline speed test :(")),
73                DeclineReason::MaxThreadsExceed(max_threads) => Err(anyhow!(
74                    "Server decline speed test. Cause: max threads exceeded({})",
75                    max_threads
76                )),
77            },
78            _ => Err(anyhow!("Unexpected command {:?}", cmd)),
79        }
80    }
81
82    fn ping_pon(&mut self) -> Result<()> {
83        self.operator.ping_write_then_read().map(|r| {
84            debug!("Successfully ping to remote server");
85            r
86        })
87    }
88
89    fn downstream(&mut self) -> Result<()> {
90        info!(
91            "Start downstream duration: {} seconds",
92            self.spec.downstream.duration.as_secs()
93        );
94        self.operator
95            .request_downstream(self.spec.downstream.duration)?;
96        self.spec.downstream.bytes = self.operator.read_loop()?;
97        Ok(())
98    }
99
100    fn upstream(&mut self) -> Result<()> {
101        info!(
102            "Start upstream duration: {} seconds",
103            self.spec.upstream.duration.as_secs()
104        );
105        self.operator
106            .request_upstream(self.spec.upstream.duration)?;
107        self.spec.upstream.bytes = self.operator.write_loop(self.spec.upstream.duration)?;
108        Ok(())
109    }
110
111    fn print_result<W: Write>(&mut self, mut writer: W) -> Result<()> {
112        writeln!(
113            writer,
114            "Downstream: {}",
115            self.format_throughput(&self.spec.downstream)
116        )
117        .and(writeln!(
118            writer,
119            "  Upstream: {}",
120            self.format_throughput(&self.spec.upstream)
121        ))
122        .map_err(anyhow::Error::from)
123    }
124
125    fn format_throughput(&self, throughput: &Throughput) -> String {
126        use crate::util::*;
127        format_bps(to_bps(throughput.bytes, throughput.duration))
128    }
129}