use std::collections::{BTreeMap, HashSet};
use std::ops::DerefMut;
use std::sync::{Arc, Mutex, Condvar};
use std::thread::{self, JoinHandle};
use std::time::{Instant, SystemTime, Duration};
use diesel::{self, prelude::*};
use log::{trace, error};
use platforms::target::OS;
use reqwest::blocking::Client;
use serde::{Serialize, Deserialize};
use serde_json::Value;
use uuid::Uuid;
use crate::brokers::Broker;
use crate::core::{EmptyResult, GenericResult, GenericError};
use crate::db::{self, schema::{settings, telemetry}, models};
use crate::util;
/// A single telemetry event in the form it is serialized to JSON.
///
/// Records are produced by `TelemetryRecordBuilder::build` and stored via `Telemetry::add`.
#[derive(Serialize, Clone)]
pub struct TelemetryRecord {
/// Record identifier; the builder fills it with a random UUID v4 string.
id: String,
/// Creation time as whole seconds since the Unix epoch.
time: u64,
/// Operating system name taken from `std::env::consts::OS`.
os: &'static str,
/// Package version taken from `CARGO_PKG_VERSION` at compile time.
version: &'static str,
/// Whether `INVESTMENTS_PRECOMPILED_BINARY` was set at compile time.
/// Skipped on serialization when `util::is_default` says so (presumably when `false` — confirm).
#[serde(skip_serializing_if = "util::is_default")]
precompiled: bool,
/// Set by the builder when running on Linux with process ID 1.
/// Skipped on serialization when `util::is_default` says so (presumably when `false` — confirm).
#[serde(skip_serializing_if = "util::is_default")]
container: bool,
/// The command string passed to `TelemetryRecordBuilder::build`.
command: String,
/// Sorted identifiers of the brokers added to the builder; omitted from the output when empty.
#[serde(skip_serializing_if = "Vec::is_empty")]
brokers: Vec<String>,
}
impl TelemetryRecord {
    /// Builds a test-only record for the `"command-mock"` command whose `id` is the decimal
    /// representation of the given number instead of a random UUID.
    #[cfg(test)]
    fn mock(id: usize) -> TelemetryRecord {
        TelemetryRecord {
            id: id.to_string(),
            ..TelemetryRecordBuilder::new().build("command-mock")
        }
    }
}
/// Collects the information that is known before a command finishes and turns it into a
/// `TelemetryRecord` via `build`.
pub struct TelemetryRecordBuilder {
/// Brokers registered so far; a set, so adding the same broker twice has no effect.
brokers: HashSet<Broker>,
}
impl TelemetryRecordBuilder {
    /// Creates a builder with no brokers registered.
    pub fn new() -> TelemetryRecordBuilder {
        TelemetryRecordBuilder {brokers: HashSet::new()}
    }

    /// Creates a builder with a single broker already registered.
    pub fn new_with_broker(broker: Broker) -> TelemetryRecordBuilder {
        let mut builder = Self::new();
        builder.add_broker(broker);
        builder
    }

    /// Registers a broker; registering the same broker again is a no-op.
    pub fn add_broker(&mut self, broker: Broker) {
        self.brokers.insert(broker);
    }

    /// Consumes the builder and produces a record for the given command.
    ///
    /// The record gets a fresh UUID v4 identifier, the current time in seconds since the Unix
    /// epoch (zero if the system clock is before the epoch) and the sorted list of broker IDs.
    pub fn build(self, command: &str) -> TelemetryRecord {
        // The hash set has no stable iteration order, so the IDs are sorted to make the output
        // deterministic.
        let mut broker_ids: Vec<String> = Vec::with_capacity(self.brokers.len());
        for broker in &self.brokers {
            broker_ids.push(broker.id().to_owned());
        }
        broker_ids.sort();

        let record_id = Uuid::new_v4().to_string();
        let os_name = std::env::consts::OS;

        let seconds_since_epoch = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .map(|elapsed| elapsed.as_secs())
            .unwrap_or_default();

        // Flagged as a container when running on Linux as process ID 1.
        let is_pid_one_on_linux = os_name == OS::Linux.as_str() && std::process::id() == 1;

        TelemetryRecord {
            id: record_id,
            time: seconds_since_epoch,
            os: os_name,
            version: env!("CARGO_PKG_VERSION"),
            precompiled: option_env!("INVESTMENTS_PRECOMPILED_BINARY").is_some(),
            container: is_pid_one_on_linux,
            command: command.to_owned(),
            brokers: broker_ids,
        }
    }
}
/// Deserializable telemetry settings.
///
/// Unknown fields are rejected; both fields are optional and fall back to their defaults.
#[derive(Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TelemetryConfig {
/// Optional user identifier.
/// NOTE(review): presumably forwarded (as a string) to `Telemetry::new` — confirm at the call site.
#[serde(default)]
pub user_id: Option<Uuid>,
/// Telemetry switch; defaults to `false`.
/// NOTE(review): this file never reads the flag — presumably honoured by the caller.
#[serde(default)]
pub disable: bool,
}
/// JSON body posted to the `/telemetry` endpoint by `TelemetrySender::send`.
#[derive(Serialize)]
struct TelemetryRequest {
/// Identifier returned by `Telemetry::user_id`.
user_id: String,
/// Stored record payloads, already parsed back into JSON values, ordered by database ID.
records: Vec<Value>,
}
/// Stores telemetry records in the database and sends previously stored ones in the background.
///
/// Closing (explicitly in tests, or on drop) waits for the sender up to its deadline and deletes
/// the records it managed to deliver.
pub struct Telemetry {
/// Database connection used for the `telemetry` and `settings` tables.
db: db::Connection,
/// User ID supplied by the caller; when present it takes precedence over the stored one.
user_id: Option<String>,
/// Background sender; `None` when there was nothing to send or after closing.
sender: Option<TelemetrySender>,
}
impl Telemetry {
pub fn new(
connection: db::Connection, user_id: Option<String>,
url: &str, flush_thresholds: BTreeMap<usize, Duration>, max_records: usize,
) -> GenericResult<Telemetry> {
let mut telemetry = Telemetry {
db: connection,
user_id,
sender: None,
};
if let Some((records, last_record_id)) = telemetry.load(max_records)? {
let user_id = telemetry.user_id()?;
let request = TelemetryRequest {user_id, records};
let mut deadline = Instant::now();
for (&threshold, &timeout) in flush_thresholds.iter().rev() {
if request.records.len() % threshold == 0 {
deadline += timeout;
break;
}
}
telemetry.sender.replace(TelemetrySender::new(url, request, last_record_id, deadline));
}
Ok(telemetry)
}
pub fn add(&self, record: TelemetryRecord) -> EmptyResult {
let payload = serde_json::to_string(&record)?;
diesel::insert_into(telemetry::table)
.values(models::NewTelemetryRecord {payload})
.execute(self.db.borrow().deref_mut())?;
Ok(())
}
fn load(&self, max_records: usize) -> GenericResult<Option<(Vec<Value>, i64)>> {
let records = telemetry::table
.select((telemetry::id, telemetry::payload))
.order_by(telemetry::id.asc())
.load::<(i64, String)>(self.db.borrow().deref_mut())?;
let mut records: &[_] = &records;
if records.len() > max_records {
let count = records.len() - max_records;
trace!("Dropping {} telemetry records.", count);
self.delete(records[count - 1].0)?;
records = &records[count..];
}
let mut payloads = Vec::with_capacity(records.len());
for record in records {
let payload = serde_json::from_str(&record.1).map_err(|e| format!(
"Failed to parse telemetry record: {}", e))?;
payloads.push(payload);
}
Ok(records.last().map(|record| (payloads, record.0)))
}
fn delete(&self, last_record_id: i64) -> EmptyResult {
diesel::delete(telemetry::table.filter(telemetry::id.le(last_record_id)))
.execute(self.db.borrow().deref_mut())?;
Ok(())
}
fn user_id(&self) -> GenericResult<String> {
self.db.borrow().transaction::<_, GenericError, _>(|db| {
let name = models::SETTING_USER_ID;
let user_id = settings::table
.select(settings::value)
.filter(settings::name.eq(name))
.get_result::<String>(db).optional()?;
Ok(match (user_id, self.user_id.clone()) {
(Some(user_id), None) => user_id,
(Some(user_id), Some(configured_user_id)) if user_id == configured_user_id => user_id,
(_, configured_user_id) => {
let user_id = configured_user_id.unwrap_or_else(|| Uuid::new_v4().to_string());
diesel::replace_into(settings::table)
.values(&models::NewSetting {name, value: &user_id})
.execute(db)?;
user_id
},
})
})
}
#[cfg(test)]
fn close(mut self) -> EmptyResult {
self.close_impl()
}
fn close_impl(&mut self) -> EmptyResult {
if let Some(sender) = self.sender.take() {
if let Some(last_record_id) = sender.wait() {
self.delete(last_record_id).map_err(|e| format!(
"Failed to delete telemetry records: {}", e))?;
}
}
Ok(())
}
}
impl Drop for Telemetry {
fn drop(&mut self) {
if let Err(err) = self.close_impl() {
error!("{}.", err)
}
}
}
/// Handle to the background thread that sends one telemetry request.
struct TelemetrySender {
/// The sending thread; joined only in test builds.
thread: JoinHandle<()>,
/// Shared slot the thread fills when it finishes: outer `None` means "not finished yet",
/// `Some(Some(id))` means the request succeeded for records up to `id`, `Some(None)` means it failed.
result: Arc<(Mutex<Option<Option<i64>>>, Condvar)>,
/// Point in time until which `wait()` is allowed to block.
deadline: Instant,
}
impl TelemetrySender {
    /// Spawns a thread that posts `request` to `url` and publishes the outcome: the given
    /// `last_record_id` on success, nothing on failure.
    fn new(url: &str, request: TelemetryRequest, last_record_id: i64, deadline: Instant) -> TelemetrySender {
        let result = Arc::new((Mutex::new(None), Condvar::new()));

        let shared = Arc::clone(&result);
        let url = url.to_owned();

        let thread = thread::spawn(move || {
            let outcome = TelemetrySender::send(&url, request).then_some(last_record_id);

            // Publish the outcome and wake the waiter while still holding the lock.
            let (mutex, ready) = &*shared;
            let mut slot = mutex.lock().unwrap();
            *slot = Some(outcome);
            ready.notify_one();
        });

        TelemetrySender {thread, result, deadline}
    }

    /// Blocks until the sending thread publishes its outcome or the deadline passes.
    ///
    /// Returns the ID of the last delivered record, or `None` if sending failed or did not finish
    /// in time.
    fn wait(self) -> Option<i64> {
        let outcome = {
            let (mutex, ready) = &*self.result;
            let guard = mutex.lock().unwrap();

            // A deadline that has already passed yields a zero timeout.
            let remaining = self.deadline.saturating_duration_since(Instant::now());

            let (mut slot, _) = ready
                .wait_timeout_while(guard, remaining, |slot| slot.is_none())
                .unwrap();

            slot.take().flatten()
        };

        // The thread is joined only in test builds, and only after the outcome has been taken.
        // Otherwise the handle is simply dropped, presumably so that a slow server never delays
        // the program.
        if cfg!(test) {
            self.thread.join().unwrap();
        }

        outcome
    }

    /// Posts the request as JSON to `{base_url}/telemetry`.
    ///
    /// Returns `true` only when the server answers with a success status; all failures are
    /// reported through trace logging.
    fn send(base_url: &str, request: TelemetryRequest) -> bool {
        let url = format!("{}/telemetry", base_url);
        trace!("Sending telemetry ({} records)...", request.records.len());

        let response = match Client::new().post(url).json(&request).send() {
            Ok(response) => response,
            Err(e) => {
                trace!("Failed to send telemetry: {}.", e);
                return false;
            },
        };

        let status = response.status();
        if !status.is_success() {
            trace!("Telemetry server returned an error: {}.", status);
            return false;
        }

        // In test builds the response body is read to the end before reporting success.
        if cfg!(test) {
            let _ = response.bytes();
        }

        trace!("Telemetry has been successfully sent.");
        true
    }
}
// Tests for telemetry storage and sending, run against a mockito HTTP server and a temporary
// database.
#[cfg(test)]
mod tests {
use super::*;
use mockito::{Server, Mock};
// Walks one database through several consecutive `Telemetry` sessions and checks, after each
// session, how many requests the server saw and which records are still stored.
#[test]
fn telemetry() {
let mut server = Server::new();
let url = server.url();
let (_database, connection) = db::new_temporary();
// Every session shares the same connection, keeps at most 5 stored records and gives the
// sender 10 ms of extra time (a threshold of 1 divides any record count).
let new_telemetry = || {
Telemetry::new(connection.clone(), None, &url, btreemap!{
1 => Duration::from_millis(10),
}, 5).unwrap()
};
// Records expected to be in the database, oldest first.
let mut expected = vec![];
let mut mock = broken_server(&mut server).expect(0);
// Session 1: the database is empty, so no request is sent. The generated user ID is remembered
// for the request bodies expected later. Four records get stored.
let user_id = {
let telemetry = new_telemetry();
let user_id = telemetry.user_id().unwrap();
for id in 0..4 {
let record = TelemetryRecord::mock(id);
telemetry.add(record.clone()).unwrap();
expected.push(record);
}
telemetry.close().unwrap();
user_id
};
mock.assert();
// Session 2: the 4 stored records are sent, the server answers 500, so nothing is deleted and
// 4 more records are added (8 in total).
compare(connection.clone(), &expected); {
let telemetry = new_telemetry();
for id in 4..8 {
let record = TelemetryRecord::mock(id);
telemetry.add(record.clone()).unwrap();
expected.push(record);
}
telemetry.close().unwrap();
}
mock = mock.expect(1);
mock.assert();
// Session 3: 8 stored records exceed the limit of 5, so the 3 oldest are dropped on load. The
// send fails again and 4 more records are added (9 in total).
compare(connection.clone(), &expected); {
let telemetry = new_telemetry();
expected.drain(..3);
for id in 8..12 {
let record = TelemetryRecord::mock(id);
telemetry.add(record.clone()).unwrap();
expected.push(record);
}
telemetry.close().unwrap();
}
mock = mock.expect(2);
mock.assert();
// Session 4: the 4 oldest of the 9 records will be dropped on load, so they are removed from
// `expected` before building the body the healthy server must receive (the remaining 5).
compare(connection.clone(), &expected); expected.drain(..4);
mock = healthy_server(&mut server, &user_id, &expected); {
let telemetry = new_telemetry();
for id in 12..16 {
let record = TelemetryRecord::mock(id);
telemetry.add(record.clone()).unwrap();
expected.push(record);
}
telemetry.close().unwrap();
}
mock.assert();
// The 5 delivered records are deleted on close; only the 4 added during the session remain.
expected.drain(..5);
// Session 5: the server's response body sleeps 100 ms against a 10 ms deadline, so the records
// are expected to stay in the database.
compare(connection.clone(), &expected); mock = unreachable_server(&mut server);
{
let telemetry = new_telemetry();
let record = TelemetryRecord::mock(16);
telemetry.add(record.clone()).unwrap();
expected.push(record);
telemetry.close().unwrap();
}
mock.assert();
// Session 6: the 5 stored records are delivered to a healthy server and deleted on close,
// leaving only the record added during this session.
compare(connection.clone(), &expected); mock = healthy_server(&mut server, &user_id, &expected);
{
let telemetry = new_telemetry();
let record = TelemetryRecord::mock(17);
telemetry.add(record.clone()).unwrap();
expected.push(record);
telemetry.close().unwrap();
}
mock.assert();
expected.drain(..5);
compare(connection.clone(), &expected); }
// Registers a mock that answers `POST /telemetry` with status 500.
fn broken_server(server: &mut Server, ) -> Mock {
server.mock("POST", "/telemetry")
.with_status(500)
.create()
}
// Registers a mock that answers `POST /telemetry` with status 200, but only for a JSON request
// whose body is exactly the serialized `TelemetryRequest` for `user_id` and `expected`.
fn healthy_server(server: &mut Server, user_id: &str, expected: &[TelemetryRecord]) -> Mock {
let expected_request = TelemetryRequest {
user_id: user_id.to_owned(),
records: expected.iter().map(|record| {
serde_json::to_value(record).unwrap()
}).collect(),
};
let expected_body = serde_json::to_string(&expected_request).unwrap();
server.mock("POST", "/telemetry")
.match_header("content-type", "application/json")
.match_body(expected_body.as_str())
.with_status(200)
.create()
}
// Registers a mock that answers `POST /telemetry` with status 200 and a chunked body whose
// callback sleeps for 100 ms, which is longer than the 10 ms the test gives the sender.
fn unreachable_server(server: &mut Server, ) -> Mock {
server.mock("POST", "/telemetry")
.with_status(200)
.with_chunked_body(|_| {
thread::sleep(Duration::from_millis(100));
Ok(())
})
.create()
}
// Asserts that the payloads stored in the database, ordered by ID, equal the JSON serialization
// of `expected`.
fn compare(connection: db::Connection, expected: &[TelemetryRecord]) {
let actual = telemetry::table
.select(telemetry::payload)
.order_by(telemetry::id.asc())
.load::<String>(connection.borrow().deref_mut()).unwrap();
let expected: Vec<String> = expected.iter()
.map(|record| serde_json::to_string(record).unwrap())
.collect();
assert_eq!(actual, expected);
}
}