vigil_reporter/
lib.rs

1//! rs-vigil-reporter Vigil Reporter for Rust.
2
3#[macro_use]
4extern crate log;
5#[macro_use]
6extern crate serde_derive;
7
8use std::cmp::max;
9use std::convert::TryFrom;
10use std::io;
11use std::thread;
12use std::time::Duration;
13
14use base64::engine::general_purpose::STANDARD as base64_encoder;
15use base64::Engine;
16use http_req::{
17    request::{Method, Request},
18    uri::Uri,
19};
20use serde_json;
21use sys_info::{cpu_num, loadavg, mem_info};
22
/// Prefix placed at the start of every log line emitted by this crate.
static LOG_NAME: &str = "Vigil Reporter";

/// Timeout applied to the connect, read and write phases of each report HTTP request.
pub const HTTP_CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
26
/// Reporter configuration, borrowing all of its string settings from the caller.
///
/// Created through [`Reporter::new`], completed with the [`ReporterBuilder`] setters and
/// [`ReporterBuilder::build`], then started with [`Reporter::run`].
pub struct Reporter<'a> {
    /// Base URL of the reporting endpoint; the `/reporter/<probe>/<node>/` path is appended.
    url: &'a str,
    /// Secret sent as the password part of the HTTP Basic `Authorization` header.
    token: &'a str,
    /// Probe identifier used in the report path (`None` until set on the builder).
    probe_id: Option<&'a str>,
    /// Node identifier used in the report path (`None` until set on the builder).
    node_id: Option<&'a str>,
    /// Replica identifier sent in the report payload (`None` until set on the builder).
    replica_id: Option<&'a str>,
    /// Delay between two consecutive reports.
    interval: Duration,
}
35
36pub struct ReporterBuilder<'a> {
37    reporter: Reporter<'a>,
38}
39
/// Owned state moved into the background reporting thread.
///
/// Everything is stored as owned values so the thread does not borrow from the
/// [`Reporter`] that spawned it.
struct ReporterManager {
    /// Full URL that reports are posted to.
    report_uri: String,
    /// Replica identifier included in every report payload.
    replica_id: String,
    /// Delay between two consecutive reports.
    interval: Duration,
    /// Value of the `User-Agent` request header.
    useragent: String,
    /// Value of the `Authorization` request header.
    authorization: String,
}
47
48#[derive(Serialize, Debug)]
49struct ReportPayload<'a> {
50    replica: &'a str,
51    interval: u64,
52    load: ReportPayloadLoad,
53}
54
55#[derive(Serialize, Debug)]
56struct ReportPayloadLoad {
57    cpu: f32,
58    ram: f32,
59}
60
61impl<'a> Reporter<'a> {
62    pub fn new(url: &'a str, token: &'a str) -> ReporterBuilder<'a> {
63        ReporterBuilder {
64            reporter: Reporter {
65                url: url,
66                token: token,
67                probe_id: None,
68                node_id: None,
69                replica_id: None,
70                interval: Duration::from_secs(30),
71            },
72        }
73    }
74
75    pub fn run(&self) -> Result<(), ()> {
76        debug!("{}: Will run using URL: {}", LOG_NAME, self.url);
77
78        // Build thread manager context?
79        match (self.probe_id, self.node_id, self.replica_id) {
80            (Some(probe_id), Some(node_id), Some(replica_id)) => {
81                let manager = ReporterManager {
82                    report_uri: format!("{}/reporter/{}/{}/", self.url, probe_id, node_id),
83                    replica_id: replica_id.to_owned(),
84                    interval: self.interval,
85
86                    useragent: format!(
87                        "rs-{}/{}",
88                        env!("CARGO_PKG_NAME"),
89                        env!("CARGO_PKG_VERSION")
90                    ),
91
92                    authorization: format!(
93                        "Basic {}",
94                        base64_encoder.encode(&format!(":{}", self.token))
95                    ),
96                };
97
98                // Spawn thread
99                thread::Builder::new()
100                    .name("vigil-reporter".to_string())
101                    .spawn(move || manager.run())
102                    .or(Err(()))
103                    .and(Ok(()))
104            }
105            _ => Err(()),
106        }
107    }
108}
109
110impl<'a> ReporterBuilder<'a> {
111    pub fn build(self) -> Reporter<'a> {
112        if self.reporter.probe_id.is_none() {
113            panic!("missing probe_id");
114        }
115        if self.reporter.node_id.is_none() {
116            panic!("missing node_id");
117        }
118        if self.reporter.replica_id.is_none() {
119            panic!("missing replica_id");
120        }
121
122        self.reporter
123    }
124
125    pub fn probe_id(mut self, probe_id: &'a str) -> ReporterBuilder<'a> {
126        self.reporter.probe_id = Some(probe_id);
127
128        self
129    }
130
131    pub fn node_id(mut self, node_id: &'a str) -> ReporterBuilder<'a> {
132        self.reporter.node_id = Some(node_id);
133
134        self
135    }
136
137    pub fn replica_id(mut self, replica_id: &'a str) -> ReporterBuilder<'a> {
138        self.reporter.replica_id = Some(replica_id);
139
140        self
141    }
142
143    pub fn interval(mut self, interval: Duration) -> ReporterBuilder<'a> {
144        self.reporter.interval = interval;
145
146        self
147    }
148}
149
150impl ReporterManager {
151    pub fn run(&self) {
152        debug!("{}: Now running", LOG_NAME);
153
154        // Schedule first report after 10 seconds
155        thread::sleep(Duration::from_secs(10));
156
157        loop {
158            if self.report().is_err() == true {
159                warn!(
160                    "{}: Last report failed, trying again sooner than usual",
161                    LOG_NAME
162                );
163
164                // Try reporting again after half the interval (this report failed)
165                thread::sleep(self.interval / 2);
166
167                self.report().ok();
168            }
169
170            thread::sleep(self.interval);
171        }
172    }
173
174    fn report(&self) -> Result<(), ()> {
175        debug!("{}: Will dispatch request", LOG_NAME);
176
177        // Generate report payload
178        let payload = ReportPayload {
179            replica: &self.replica_id,
180            interval: self.interval.as_secs(),
181            load: ReportPayloadLoad {
182                cpu: Self::get_load_cpu(),
183                ram: Self::get_load_ram(),
184            },
185        };
186
187        debug!(
188            "{}: Will send request to URL: {} with payload: {:?}",
189            LOG_NAME, &self.report_uri, payload
190        );
191
192        // Encode payload to string
193        let payload_json = serde_json::to_vec(&payload).or(Err(()))?;
194
195        // Generate request URI
196        let request_uri = Uri::try_from(self.report_uri.as_str()).or(Err(()))?;
197
198        // Acquire report response
199        let mut response_sink = io::sink();
200
201        let response = Request::new(&request_uri)
202            .connect_timeout(Some(HTTP_CLIENT_TIMEOUT))
203            .read_timeout(Some(HTTP_CLIENT_TIMEOUT))
204            .write_timeout(Some(HTTP_CLIENT_TIMEOUT))
205            .method(Method::POST)
206            .header("User-Agent", &self.useragent)
207            .header("Authorization", &self.authorization)
208            .header("Content-Type", "application/json")
209            .header("Content-Length", &payload_json.len())
210            .body(&payload_json)
211            .send(&mut response_sink);
212
213        match response {
214            Ok(response) => {
215                let status_code = response.status_code();
216
217                if status_code.is_success() {
218                    debug!("{}: Request succeeded", LOG_NAME);
219
220                    // Return with success
221                    return Ok(());
222                } else {
223                    warn!("{}: Got non-OK status code: {}", LOG_NAME, status_code);
224                }
225            }
226            Err(err) => error!("{}: Failed dispatching request: {}", LOG_NAME, err),
227        }
228
229        // Return with error
230        Err(())
231    }
232
233    fn get_load_cpu() -> f32 {
234        match (cpu_num(), loadavg()) {
235            (Ok(cpu_num_value), Ok(loadavg_value)) => {
236                (loadavg_value.one / (max(cpu_num_value, 1) as f64)) as f32
237            }
238            _ => 0.00,
239        }
240    }
241
242    fn get_load_ram() -> f32 {
243        if let Ok(mem_info_value) = mem_info() {
244            1.00 - ((mem_info_value.avail as f32) / (mem_info_value.total as f32))
245        } else {
246            0.00
247        }
248    }
249}