gbps/
monitor.rs

1use std::io::Read;
2use std::io::Write;
3
/// Configuration for sending protocol monitoring data
///
/// Holds the switch that enables reporting together with the two parts of
/// the monitoring URL: the address to connect to and the request path.
#[derive(Clone, Debug)]
pub struct MonitoringConfig {
    /// Enable sending data
    enabled: bool,
    /// Monitoring host, used both as the TCP address to connect to and as
    /// the value of the HTTP `Host` header
    host: String,
    /// URL context, i.e. the request path placed in the `POST` request line
    context: String,
}
14
15impl MonitoringConfig {
16    /// Creates a new monitoring configuration
17    ///
18    /// # Arguments
19    ///
20    /// * `enabled` - Share monitoring data
21    /// * `url` - URL of monitoring host
22    pub fn new(enabled: bool, url: &str) -> MonitoringConfig {
23        // remove leading protocol
24        let protocol_removed = if url.starts_with("http://") {
25            &url[7..] }
26        else {
27            url
28        };
29        // separate host and context
30        let (host, context) = match protocol_removed.find("/") {
31            Some(index) => (&protocol_removed[..index], &protocol_removed[index..]),
32            None => (url, "/")
33        };
34        MonitoringConfig {
35            enabled,
36            host: host.to_owned(),
37            context: context.to_owned(),
38        }
39    }
40
41    pub fn enabled(&self) -> bool {
42        self.enabled
43    }
44
45    /// Send monitoring data
46    ///
47    /// # Arguments
48    ///
49    /// * `pid` - Identifier of sending process
50    /// * `peers` - List of peers in the view of the process
51    pub fn send_data(&self, pid: &str, peers: Vec<String>) {
52        let pid = pid.to_owned();
53        let host = self.host.clone();
54        let context = self.context.clone();
55        std::thread::spawn(move || {
56            let peers_str = peers.iter()
57                .map(|peer| format!("\"{}\"", peer))
58                .collect::<Vec<String>>().join(",");
59            let json = format!(
60                "{{\
61                \"id\":\"{}\",\
62                \"peers\":[{}],\
63                \"messages\":[{}]\
64            }}", pid, peers_str, "");
65            //println!("send_data:\n{}", json);
66            match MonitoringConfig::post(&host, &context, json) {
67                Ok(()) => log::debug!("Peer {}: monitoring data sent", pid),
68                Err(e) => log::warn!("Peer {} could not send monitoring data to {}: {}", pid, host, e),
69            }
70        });
71    }
72
73    fn post(host: &str, context: &str, json: String) -> std::io::Result<()> {
74
75        let bytes = json.as_bytes();
76
77        let mut stream = std::net::TcpStream::connect(host)?;
78
79        let mut request_data = String::new();
80        request_data.push_str(&format!("POST {} HTTP/1.1", context));
81        request_data.push_str("\r\n");
82        request_data.push_str(&format!("Host: {}", host));
83        request_data.push_str("\r\n");
84        request_data.push_str("Accept: */*");
85        request_data.push_str("\r\n");
86        request_data.push_str("Content-Type: application/json; charset=UTF-8");
87        request_data.push_str("\r\n");
88        request_data.push_str(&format!("Content-Length: {}", bytes.len()));
89        request_data.push_str("\r\n");
90        request_data.push_str("Connection: close");
91        request_data.push_str("\r\n");
92        request_data.push_str("\r\n");
93        request_data.push_str(&json);
94
95        //println!("request_data = {:?}", request_data);
96
97        let _request = stream.write_all(request_data.as_bytes())?;
98        //println!("request = {:?}", request);
99
100        let mut buf = String::new();
101        let _result = stream.read_to_string(&mut buf)?;
102        //println!("result = {}", result);
103        log::debug!("buf = {}", buf);
104
105        Ok(())
106    }
107}
108
109impl Default for MonitoringConfig {
110    fn default() -> Self {
111        MonitoringConfig {
112            enabled: false,
113            host: "".to_string(),
114            context: "".to_string(),
115        }
116    }
117}