metrics_sqlite/
metrics_db.rs

1//! Metrics DB, to use/query/etc metrics SQLite databases
2use super::{models::Metric, setup_db, Result};
3use crate::models::MetricKey;
4use crate::MetricsError;
5use diesel::prelude::*;
6#[cfg(feature = "import_csv")]
7use serde::Deserialize;
8use std::path::Path;
9use std::time::Duration;
10
11/// Threshold to separate samples into sessions by
12const SESSION_TIME_GAP_THRESHOLD: Duration = Duration::from_secs(30);
13
14/// Calculated metric type from deriv_metrics_for_key()
15#[derive(Debug)]
16pub struct DerivMetric {
17    pub timestamp: f64,
18    pub key: String,
19    pub value: f64,
20}
21/// Describes a session, which is a sub-set of metrics data based on time gaps
22#[derive(Debug, Copy, Clone)]
23pub struct Session {
24    /// Timestamp session starts at
25    pub start_time: f64,
26    /// Timestamp session ends at
27    pub end_time: f64,
28    /// Duration of session
29    pub duration: Duration,
30}
31impl Session {
32    /// Creates a new session with given start & end, calculating duration from them
33    pub fn new(start_time: f64, end_time: f64) -> Self {
34        Session {
35            start_time,
36            end_time,
37            duration: Duration::from_secs_f64(end_time - start_time),
38        }
39    }
40}
41/// Metrics database, useful for querying stored metrics
42pub struct MetricsDb {
43    db: SqliteConnection,
44    sessions: Vec<Session>,
45}
46
47impl MetricsDb {
48    /// Creates a new metrics DB with given path of a SQLite database
49    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
50        let mut db = setup_db(path)?;
51        let sessions = Self::process_sessions(&mut db)?;
52        Ok(MetricsDb { db, sessions })
53    }
54
55    /// Returns sessions in database, based on `SESSION_TIME_GAP_THRESHOLD`
56    pub fn sessions(&self) -> Vec<Session> {
57        self.sessions.clone()
58    }
59
60    fn process_sessions(db: &mut SqliteConnection) -> Result<Vec<Session>> {
61        use crate::schema::metrics::dsl::*;
62        let timestamps = metrics
63            .select(timestamp)
64            .order(timestamp.asc())
65            .load::<f64>(db)?;
66        if timestamps.is_empty() {
67            return Err(MetricsError::EmptyDatabase);
68        }
69        let mut sessions: Vec<Session> = Vec::new();
70        let mut current_start = timestamps[0];
71        for pair in timestamps.windows(2) {
72            if pair[1] - pair[0] > SESSION_TIME_GAP_THRESHOLD.as_secs_f64() {
73                sessions.push(Session::new(current_start, pair[0]));
74                current_start = pair[1];
75            }
76        }
77        if let Some(last) = timestamps.last() {
78            if current_start < *last {
79                sessions.push(Session::new(current_start, *last));
80            }
81        }
82
83        Ok(sessions)
84    }
85
86    /// Returns list of metrics keys stored in the database
87    pub fn available_keys(&mut self) -> Result<Vec<String>> {
88        use crate::schema::metric_keys::dsl::*;
89        let r = metric_keys
90            .select(key)
91            .distinct()
92            .load::<String>(&mut self.db)?;
93        Ok(r)
94    }
95
96    /// Returns all metrics for given key in ascending timestamp order
97    pub fn metrics_for_key(
98        &mut self,
99        key_name: &str,
100        session: Option<&Session>,
101    ) -> Result<Vec<Metric>> {
102        use crate::schema::metrics::dsl::*;
103        let metric_key = self.metric_key_for_key(key_name)?;
104        let query = metrics
105            .order(timestamp.asc())
106            .filter(metric_key_id.eq(metric_key.id));
107        let r = match session {
108            Some(session) => query
109                .filter(timestamp.ge(session.start_time))
110                .filter(timestamp.le(session.end_time))
111                .load::<Metric>(&mut self.db)?,
112            None => query.load::<Metric>(&mut self.db)?,
113        };
114        Ok(r)
115    }
116
117    fn metric_key_for_key(&mut self, key_name: &str) -> Result<MetricKey> {
118        use crate::schema::metric_keys::dsl::*;
119        let query = metric_keys.filter(key.eq(key_name));
120        let keys = query.load::<MetricKey>(&mut self.db)?;
121        keys.into_iter()
122            .next()
123            .ok_or_else(|| MetricsError::KeyNotFound(key_name.to_string()))
124    }
125
126    /// Returns rate of change, the derivative, of the given metrics key's values
127    ///
128    /// f(t) = (x(t + 1) - x(t)) / ((t+1) - (t)
129    pub fn deriv_metrics_for_key(
130        &mut self,
131        key_name: &str,
132        session: Option<&Session>,
133    ) -> Result<Vec<DerivMetric>> {
134        let m = self.metrics_for_key(key_name, session)?;
135        let new_values: Vec<_> = m
136            .windows(2)
137            .map(|v| {
138                let new_value =
139                    (v[1].value - v[0].value) / (v[1].timestamp - v[0].timestamp);
140                DerivMetric {
141                    timestamp: v[1].timestamp,
142                    key: format!("{}.deriv", key_name),
143                    value: new_value,
144                }
145            })
146            .collect();
147        Ok(new_values)
148    }
149
150    /// Exports DB contents to CSV file
151    #[cfg(feature = "export_csv")]
152    pub fn export_to_csv<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
153        use crate::schema::metric_keys::dsl::key;
154        use crate::schema::metrics::dsl::*;
155        use std::fs::File;
156        let out_file = File::create(path)?;
157        let mut csv_writer = csv::Writer::from_writer(out_file);
158        // join the 2 tables so we get a flat CSV with the actual key names
159        let query = crate::schema::metrics::table.inner_join(crate::schema::metric_keys::table);
160        let query = query
161            .order(timestamp.asc())
162            .select((id, timestamp, key, value));
163        for row in query.load::<JoinedMetric>(&mut self.db)? {
164            csv_writer.serialize(row)?;
165        }
166        csv_writer.flush()?;
167        Ok(())
168    }
169    /// Imports CSV file into a MetricsDb file
170    #[cfg(feature = "import_csv")]
171    pub fn import_from_csv<S: AsRef<Path>, D: AsRef<Path>>(path: S, destination: D) -> Result<()> {
172        use crate::InnerState;
173        use csv::ReaderBuilder;
174        let db = setup_db(destination)?;
175        let mut reader = ReaderBuilder::new().from_path(path)?;
176        let mut inner = InnerState::new(Duration::from_secs(5), db);
177        let header = reader.headers()?.to_owned();
178        let mut flush_counter = 0u64;
179        for record in reader.records() {
180            match record {
181                Ok(record) => match record.deserialize::<MetricCsvRow>(Some(&header)) {
182                    Ok(r) => {
183                        if let Err(e) =
184                            inner.queue_metric(Duration::from_secs_f64(r.timestamp), r.key, r.value)
185                        {
186                            error!(
187                                "Skipping record due to error recording metric into DB: {:?}",
188                                e
189                            );
190                        }
191                        flush_counter += 1;
192                    }
193                    Err(e) => {
194                        error!("Skipping record due to error parsing CSV row: {:?}", e);
195                    }
196                },
197                Err(e) => {
198                    error!("Skipping record due to error reading CSV record: {:?}", e);
199                }
200            }
201            if flush_counter % 200 == 0 {
202                trace!("Flushing");
203                inner.flush()?;
204            }
205        }
206        inner.flush()?;
207        Ok(())
208    }
209}
210#[cfg(feature = "import_csv")]
211#[derive(Deserialize)]
212struct MetricCsvRow<'a> {
213    #[allow(unused)]
214    id: u64,
215    timestamp: f64,
216    key: &'a str,
217    value: f64,
218}
219/// Metric model for CSV export
220#[cfg(feature = "export_csv")]
221#[derive(Queryable, Debug)]
222#[cfg_attr(feature = "serde", derive(serde::Serialize))]
223struct JoinedMetric {
224    /// Unique ID of sample
225    pub id: i64,
226    /// Timestamp of sample
227    pub timestamp: f64,
228    /// Key/name of sample
229    pub key: String,
230    /// Value of sample
231    pub value: f64,
232}