use super::{
MetricsError, Result,
models::{Metric, MetricKey},
setup_db,
};
use diesel::prelude::*;
#[cfg(feature = "import_csv")]
use serde::Deserialize;
use std::{path::Path, time::Duration};
#[cfg(feature = "import_csv")]
use tracing::{error, trace};
/// Two consecutive samples further apart than this are treated as belonging
/// to different recording sessions (see `MetricsDb::process_sessions`).
const SESSION_TIME_GAP_THRESHOLD: Duration = Duration::from_secs(30);
/// One point of the numerical derivative of a metric series, as produced by
/// `MetricsDb::deriv_metrics_for_key`.
#[derive(Debug)]
pub struct DerivMetric {
    /// Timestamp (seconds) of the later of the two samples the slope was
    /// computed from.
    pub timestamp: f64,
    /// Key of the derived series: `"<source key>.deriv"`.
    pub key: String,
    /// Slope between the two neighboring samples (delta value / delta time).
    pub value: f64,
}
/// A contiguous run of metric samples, delimited by gaps longer than
/// `SESSION_TIME_GAP_THRESHOLD`.
#[derive(Debug, Copy, Clone)]
pub struct Session {
    /// Timestamp (seconds) of the first sample in the session.
    pub start_time: f64,
    /// Timestamp (seconds) of the last sample in the session.
    pub end_time: f64,
    /// `end_time - start_time`, clamped to zero.
    pub duration: Duration,
}

impl Session {
    /// Builds a session from its first and last sample timestamps.
    ///
    /// The span is clamped to zero when `end_time < start_time` or the
    /// difference is NaN, because `Duration::from_secs_f64` panics on a
    /// negative or non-finite input.
    pub fn new(start_time: f64, end_time: f64) -> Self {
        // `f64::max` returns the non-NaN operand, so a NaN span also
        // clamps to 0.0 here.
        let span = (end_time - start_time).max(0.0);
        Session {
            start_time,
            end_time,
            duration: Duration::from_secs_f64(span),
        }
    }
}
/// Lower-level Diesel query routines used by the `MetricsDb` methods below.
pub(crate) mod query;
/// Read-side handle to a metrics SQLite database.
///
/// The list of sessions is computed once, when the database is opened
/// (see `MetricsDb::new`).
pub struct MetricsDb {
    // Open Diesel connection to the underlying SQLite file.
    db: SqliteConnection,
    // Sessions detected from the timestamp stream at open time.
    sessions: Vec<Session>,
}
impl MetricsDb {
/// Opens (and, if necessary, initializes) the metrics database at `path`,
/// then pre-computes the recording sessions it contains.
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
    let mut connection = setup_db(path)?;
    let sessions = Self::process_sessions(&mut connection)?;
    Ok(MetricsDb {
        db: connection,
        sessions,
    })
}
/// Returns a copy of the sessions discovered when the database was opened.
pub fn sessions(&self) -> Vec<Session> {
    self.sessions.to_vec()
}
/// Resolves the session containing the given signpost metric.
/// Thin wrapper around `query::session_from_signpost`.
pub fn session_from_signpost(&mut self, metric: &str) -> Result<Session> {
    query::session_from_signpost(&mut self.db, metric)
}
/// Splits the full, time-ordered stream of metric timestamps into sessions:
/// consecutive samples more than `SESSION_TIME_GAP_THRESHOLD` apart start a
/// new session.
///
/// Returns `MetricsError::EmptyDatabase` when no metrics are recorded.
fn process_sessions(db: &mut SqliteConnection) -> Result<Vec<Session>> {
    use crate::schema::metrics::dsl::*;
    let timestamps = metrics
        .select(timestamp)
        .order(timestamp.asc())
        .load::<f64>(db)?;
    if timestamps.is_empty() {
        return Err(MetricsError::EmptyDatabase);
    }
    let gap = SESSION_TIME_GAP_THRESHOLD.as_secs_f64();
    let mut sessions: Vec<Session> = Vec::new();
    let mut current_start = timestamps[0];
    for pair in timestamps.windows(2) {
        if pair[1] - pair[0] > gap {
            sessions.push(Session::new(current_start, pair[0]));
            current_start = pair[1];
        }
    }
    // Always close out the final session. The previous `current_start < last`
    // guard silently dropped a trailing session made of a single sample, even
    // though single-sample sessions in the middle of the stream were kept.
    let last = *timestamps
        .last()
        .expect("timestamps checked non-empty above");
    sessions.push(Session::new(current_start, last));
    Ok(sessions)
}
/// Lists every distinct metric key recorded in the database.
pub fn available_keys(&mut self) -> Result<Vec<String>> {
    use crate::schema::metric_keys::dsl::*;
    Ok(metric_keys
        .select(key)
        .distinct()
        .load::<String>(&mut self.db)?)
}
/// Fetches a per-key summary of metric values for the session identified by
/// `signpost`, keyed by metric name.
pub fn average_metrics_from_signpost(
    &mut self,
    signpost: &str,
    keys: &[&str],
) -> Result<std::collections::HashMap<String, f64>> {
    let owned_keys: Vec<String> = keys.iter().map(|&k| k.to_owned()).collect();
    query::metrics_summary_for_signpost_and_keys(&mut self.db, signpost, owned_keys)
}
/// Loads all raw metrics recorded under `key_name`, optionally restricted to
/// the time range of `session`. Thin wrapper around `query::metrics_for_key`.
pub fn metrics_for_key(
    &mut self,
    key_name: &str,
    session: Option<&Session>,
) -> Result<Vec<Metric>> {
    query::metrics_for_key(&mut self.db, key_name, session)
}
/// Computes the numerical derivative of the metric series `key_name`,
/// optionally restricted to `session`.
///
/// Each output point is the slope between two neighboring samples, stamped
/// with the later sample's timestamp and keyed `"<key_name>.deriv"`. A
/// non-positive time delta yields a slope of 0.0. The result has one fewer
/// element than the source series (empty for fewer than two samples).
pub fn deriv_metrics_for_key(
    &mut self,
    key_name: &str,
    session: Option<&Session>,
) -> Result<Vec<DerivMetric>> {
    let samples = self.metrics_for_key(key_name, session)?;
    let deriv_key = format!("{key_name}.deriv");
    let mut derivs = Vec::with_capacity(samples.len().saturating_sub(1));
    for (prev, next) in samples.iter().zip(samples.iter().skip(1)) {
        let dt = next.timestamp - prev.timestamp;
        let slope = if dt > 0.0 {
            (next.value - prev.value) / dt
        } else {
            0.0
        };
        derivs.push(DerivMetric {
            timestamp: next.timestamp,
            key: deriv_key.clone(),
            value: slope,
        });
    }
    Ok(derivs)
}
#[cfg(feature = "export_csv")]
pub fn export_to_csv<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
use crate::schema::metric_keys::dsl::key;
use crate::schema::metrics::dsl::*;
use std::fs::File;
let out_file = File::create(path)?;
let mut csv_writer = csv::Writer::from_writer(out_file);
let query = crate::schema::metrics::table.inner_join(crate::schema::metric_keys::table);
let query = query
.order(timestamp.asc())
.select((id, timestamp, key, value));
for row in query.load::<JoinedMetric>(&mut self.db)? {
csv_writer.serialize(row)?;
}
csv_writer.flush()?;
Ok(())
}
#[cfg(feature = "import_csv")]
pub fn import_from_csv<S: AsRef<Path>, D: AsRef<Path>>(path: S, destination: D) -> Result<()> {
use crate::InnerState;
use csv::ReaderBuilder;
let db = setup_db(destination)?;
let mut reader = ReaderBuilder::new().from_path(path)?;
let mut inner = InnerState::new(Duration::from_secs(5), db);
let header = reader.headers()?.to_owned();
let mut flush_counter = 0u64;
for record in reader.records() {
match record {
Ok(record) => match record.deserialize::<MetricCsvRow>(Some(&header)) {
Ok(r) => {
if let Err(e) =
inner.queue_metric(Duration::from_secs_f64(r.timestamp), r.key, r.value)
{
error!(
"Skipping record due to error recording metric into DB: {:?}",
e
);
}
flush_counter += 1;
}
Err(e) => {
error!("Skipping record due to error parsing CSV row: {:?}", e);
}
},
Err(e) => {
error!("Skipping record due to error reading CSV record: {:?}", e);
}
}
if flush_counter % 200 == 0 {
trace!("Flushing");
inner.flush()?;
}
}
inner.flush()?;
Ok(())
}
}
#[cfg(feature = "import_csv")]
#[derive(Deserialize)]
struct MetricCsvRow<'a> {
#[allow(unused)]
id: u64,
timestamp: f64,
key: &'a str,
value: f64,
}
#[cfg(feature = "export_csv")]
#[derive(Queryable, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
struct JoinedMetric {
pub id: i64,
pub timestamp: f64,
pub key: String,
pub value: f64,
}