1#[macro_use]
4extern crate log;
5#[macro_use]
6extern crate serde_derive;
7
8use std::cmp::max;
9use std::convert::TryFrom;
10use std::io;
11use std::thread;
12use std::time::Duration;
13
14use base64::engine::general_purpose::STANDARD as base64_encoder;
15use base64::Engine;
16use http_req::{
17 request::{Method, Request},
18 uri::Uri,
19};
20use serde_json;
21use sys_info::{cpu_num, loadavg, mem_info};
22
/// Prefix placed at the start of every log message emitted by the reporter.
static LOG_NAME: &str = "Vigil Reporter";

/// Timeout applied to the connect, read and write phases of each report request.
pub const HTTP_CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
26
/// Settings for a reporter, created through `Reporter::new` and the builder it returns.
///
/// Every string field borrows from the caller for the lifetime `'a`.
pub struct Reporter<'a> {
    /// Base URL of the reporting endpoint; `run` appends
    /// `/reporter/<probe_id>/<node_id>/` to it.
    url: &'a str,
    /// Secret sent as the password part of the HTTP Basic `Authorization`
    /// header (the username part is left empty).
    token: &'a str,
    /// Probe identifier, used as the first path segment after `/reporter/`.
    probe_id: Option<&'a str>,
    /// Node identifier, used as the second path segment after `/reporter/`.
    node_id: Option<&'a str>,
    /// Replica identifier, sent as the `replica` field of each report payload.
    replica_id: Option<&'a str>,
    /// Pause between two consecutive reports; also sent, in whole seconds,
    /// as the `interval` field of each report payload.
    interval: Duration,
}
35
/// Builder returned by `Reporter::new`; its setter methods fill in the
/// identifiers and interval before `build` hands back the `Reporter`.
pub struct ReporterBuilder<'a> {
    /// The reporter being configured.
    reporter: Reporter<'a>,
}
39
/// Owned state moved into the background reporting thread.
///
/// Unlike `Reporter`, every field is owned, so the value can be moved into
/// the spawned thread's closure.
struct ReporterManager {
    /// Full URL that each report is posted to.
    report_uri: String,
    /// Replica identifier sent in each report payload.
    replica_id: String,
    /// Pause between two consecutive reports.
    interval: Duration,
    /// Value of the `User-Agent` request header.
    useragent: String,
    /// Value of the `Authorization` request header.
    authorization: String,
}
47
/// Body of a report request, serialized to JSON before being posted.
#[derive(Serialize, Debug)]
struct ReportPayload<'a> {
    /// Replica identifier of the reporting process.
    replica: &'a str,
    /// Reporting interval, in whole seconds.
    interval: u64,
    /// Current CPU and RAM load figures.
    load: ReportPayloadLoad,
}
54
/// Load figures embedded in a `ReportPayload`.
#[derive(Serialize, Debug)]
struct ReportPayloadLoad {
    /// One-minute load average divided by the CPU count
    /// (0.0 when the values cannot be read).
    cpu: f32,
    /// Share of memory in use, computed as `1 - avail / total`
    /// (0.0 when the values cannot be read).
    ram: f32,
}
60
61impl<'a> Reporter<'a> {
62 pub fn new(url: &'a str, token: &'a str) -> ReporterBuilder<'a> {
63 ReporterBuilder {
64 reporter: Reporter {
65 url: url,
66 token: token,
67 probe_id: None,
68 node_id: None,
69 replica_id: None,
70 interval: Duration::from_secs(30),
71 },
72 }
73 }
74
75 pub fn run(&self) -> Result<(), ()> {
76 debug!("{}: Will run using URL: {}", LOG_NAME, self.url);
77
78 match (self.probe_id, self.node_id, self.replica_id) {
80 (Some(probe_id), Some(node_id), Some(replica_id)) => {
81 let manager = ReporterManager {
82 report_uri: format!("{}/reporter/{}/{}/", self.url, probe_id, node_id),
83 replica_id: replica_id.to_owned(),
84 interval: self.interval,
85
86 useragent: format!(
87 "rs-{}/{}",
88 env!("CARGO_PKG_NAME"),
89 env!("CARGO_PKG_VERSION")
90 ),
91
92 authorization: format!(
93 "Basic {}",
94 base64_encoder.encode(&format!(":{}", self.token))
95 ),
96 };
97
98 thread::Builder::new()
100 .name("vigil-reporter".to_string())
101 .spawn(move || manager.run())
102 .or(Err(()))
103 .and(Ok(()))
104 }
105 _ => Err(()),
106 }
107 }
108}
109
110impl<'a> ReporterBuilder<'a> {
111 pub fn build(self) -> Reporter<'a> {
112 if self.reporter.probe_id.is_none() {
113 panic!("missing probe_id");
114 }
115 if self.reporter.node_id.is_none() {
116 panic!("missing node_id");
117 }
118 if self.reporter.replica_id.is_none() {
119 panic!("missing replica_id");
120 }
121
122 self.reporter
123 }
124
125 pub fn probe_id(mut self, probe_id: &'a str) -> ReporterBuilder<'a> {
126 self.reporter.probe_id = Some(probe_id);
127
128 self
129 }
130
131 pub fn node_id(mut self, node_id: &'a str) -> ReporterBuilder<'a> {
132 self.reporter.node_id = Some(node_id);
133
134 self
135 }
136
137 pub fn replica_id(mut self, replica_id: &'a str) -> ReporterBuilder<'a> {
138 self.reporter.replica_id = Some(replica_id);
139
140 self
141 }
142
143 pub fn interval(mut self, interval: Duration) -> ReporterBuilder<'a> {
144 self.reporter.interval = interval;
145
146 self
147 }
148}
149
150impl ReporterManager {
151 pub fn run(&self) {
152 debug!("{}: Now running", LOG_NAME);
153
154 thread::sleep(Duration::from_secs(10));
156
157 loop {
158 if self.report().is_err() == true {
159 warn!(
160 "{}: Last report failed, trying again sooner than usual",
161 LOG_NAME
162 );
163
164 thread::sleep(self.interval / 2);
166
167 self.report().ok();
168 }
169
170 thread::sleep(self.interval);
171 }
172 }
173
174 fn report(&self) -> Result<(), ()> {
175 debug!("{}: Will dispatch request", LOG_NAME);
176
177 let payload = ReportPayload {
179 replica: &self.replica_id,
180 interval: self.interval.as_secs(),
181 load: ReportPayloadLoad {
182 cpu: Self::get_load_cpu(),
183 ram: Self::get_load_ram(),
184 },
185 };
186
187 debug!(
188 "{}: Will send request to URL: {} with payload: {:?}",
189 LOG_NAME, &self.report_uri, payload
190 );
191
192 let payload_json = serde_json::to_vec(&payload).or(Err(()))?;
194
195 let request_uri = Uri::try_from(self.report_uri.as_str()).or(Err(()))?;
197
198 let mut response_sink = io::sink();
200
201 let response = Request::new(&request_uri)
202 .connect_timeout(Some(HTTP_CLIENT_TIMEOUT))
203 .read_timeout(Some(HTTP_CLIENT_TIMEOUT))
204 .write_timeout(Some(HTTP_CLIENT_TIMEOUT))
205 .method(Method::POST)
206 .header("User-Agent", &self.useragent)
207 .header("Authorization", &self.authorization)
208 .header("Content-Type", "application/json")
209 .header("Content-Length", &payload_json.len())
210 .body(&payload_json)
211 .send(&mut response_sink);
212
213 match response {
214 Ok(response) => {
215 let status_code = response.status_code();
216
217 if status_code.is_success() {
218 debug!("{}: Request succeeded", LOG_NAME);
219
220 return Ok(());
222 } else {
223 warn!("{}: Got non-OK status code: {}", LOG_NAME, status_code);
224 }
225 }
226 Err(err) => error!("{}: Failed dispatching request: {}", LOG_NAME, err),
227 }
228
229 Err(())
231 }
232
233 fn get_load_cpu() -> f32 {
234 match (cpu_num(), loadavg()) {
235 (Ok(cpu_num_value), Ok(loadavg_value)) => {
236 (loadavg_value.one / (max(cpu_num_value, 1) as f64)) as f32
237 }
238 _ => 0.00,
239 }
240 }
241
242 fn get_load_ram() -> f32 {
243 if let Ok(mem_info_value) = mem_info() {
244 1.00 - ((mem_info_value.avail as f32) / (mem_info_value.total as f32))
245 } else {
246 0.00
247 }
248 }
249}