metrics_sqlite/
metrics_db.rs

1//! Metrics DB, to use/query/etc metrics SQLite databases
2use super::{
3    models::{Metric, MetricKey},
4    setup_db, MetricsError, Result,
5};
6use diesel::prelude::*;
7#[cfg(feature = "import_csv")]
8use serde::Deserialize;
9use std::{path::Path, time::Duration};
10#[cfg(feature = "import_csv")]
11use tracing::{error, trace};
12
/// Largest gap between two consecutive samples that still keeps them in the same session.
///
/// A gap strictly greater than this threshold starts a new session.
const SESSION_TIME_GAP_THRESHOLD: Duration = Duration::from_secs(30);
15
/// Calculated metric type from `deriv_metrics_for_key()`
///
/// Each value is the rate of change between two consecutive samples of the source key.
#[derive(Debug, Clone, PartialEq)]
pub struct DerivMetric {
    /// Timestamp of the later of the two samples the rate was computed from
    pub timestamp: f64,
    /// Name of the derived metric: the source key with a `.deriv` suffix
    pub key: String,
    /// Rate of change of the source metric's value per unit of timestamp
    pub value: f64,
}
/// Describes a session, which is a subset of metrics data based on time gaps
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct Session {
    /// Timestamp session starts at
    pub start_time: f64,
    /// Timestamp session ends at
    pub end_time: f64,
    /// Duration of session
    pub duration: Duration,
}

impl Session {
    /// Creates a new session with given start and end, calculating duration from them.
    ///
    /// `duration` is `end_time - start_time` in seconds. A `Duration` cannot represent a
    /// negative, NaN, infinite or overflowing span, so for such inputs the duration is
    /// reported as zero instead of panicking; `start_time` and `end_time` are stored as given.
    pub fn new(start_time: f64, end_time: f64) -> Self {
        // `Duration::from_secs_f64` panics on negative / non-finite input; use the fallible
        // constructor so a reversed or corrupt timestamp pair cannot abort the caller.
        let duration =
            Duration::try_from_secs_f64(end_time - start_time).unwrap_or(Duration::ZERO);
        Session {
            start_time,
            end_time,
            duration,
        }
    }
}
43pub(crate) mod query;
44
/// Metrics database, useful for querying stored metrics
pub struct MetricsDb {
    /// Connection to the underlying SQLite database
    db: SqliteConnection,
    /// Sessions computed from the stored timestamps when the database is opened
    sessions: Vec<Session>,
}
50
51impl MetricsDb {
52    /// Creates a new metrics DB with a given path of an SQLite database
53    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
54        let mut db = setup_db(path)?;
55        let sessions = Self::process_sessions(&mut db)?;
56        Ok(MetricsDb { db, sessions })
57    }
58
59    /// Returns sessions in the database, based on `SESSION_TIME_GAP_THRESHOLD`
60    pub fn sessions(&self) -> Vec<Session> {
61        self.sessions.clone()
62    }
63
64    /// Returns a session (timestamp range) from the first occurrence of the signpost to the latest metric
65    pub fn session_from_signpost(&mut self, metric: &str) -> Result<Session> {
66        query::session_from_signpost(&mut self.db, metric)
67    }
68
69    fn process_sessions(db: &mut SqliteConnection) -> Result<Vec<Session>> {
70        use crate::schema::metrics::dsl::*;
71        let timestamps = metrics
72            .select(timestamp)
73            .order(timestamp.asc())
74            .load::<f64>(db)?;
75        if timestamps.is_empty() {
76            return Err(MetricsError::EmptyDatabase);
77        }
78        let mut sessions: Vec<Session> = Vec::new();
79        let mut current_start = timestamps[0];
80        for pair in timestamps.windows(2) {
81            if pair[1] - pair[0] > SESSION_TIME_GAP_THRESHOLD.as_secs_f64() {
82                sessions.push(Session::new(current_start, pair[0]));
83                current_start = pair[1];
84            }
85        }
86        if let Some(last) = timestamps.last() {
87            if current_start < *last {
88                sessions.push(Session::new(current_start, *last));
89            }
90        }
91
92        Ok(sessions)
93    }
94
95    /// Returns list of metrics keys stored in the database
96    pub fn available_keys(&mut self) -> Result<Vec<String>> {
97        use crate::schema::metric_keys::dsl::*;
98        let r = metric_keys
99            .select(key)
100            .distinct()
101            .load::<String>(&mut self.db)?;
102        Ok(r)
103    }
104
105    /// Returns all metrics for given key in ascending timestamp order
106    pub fn metrics_for_key(
107        &mut self,
108        key_name: &str,
109        session: Option<&Session>,
110    ) -> Result<Vec<Metric>> {
111        query::metrics_for_key(&mut self.db, key_name, session)
112    }
113
114    /// Returns rate of change, the derivative, of the given metrics key's values
115    ///
116    /// f(t) = (x(t + 1) - x(t)) / ((t+1) - (t)
117    pub fn deriv_metrics_for_key(
118        &mut self,
119        key_name: &str,
120        session: Option<&Session>,
121    ) -> Result<Vec<DerivMetric>> {
122        let m = self.metrics_for_key(key_name, session)?;
123        let new_values: Vec<_> = m
124            .windows(2)
125            .map(|v| {
126                let dt = v[1].timestamp - v[0].timestamp;
127                let new_value = if dt > 0.0 {
128                    (v[1].value - v[0].value) / dt
129                } else {
130                    0.0
131                };
132                DerivMetric {
133                    timestamp: v[1].timestamp,
134                    key: format!("{key_name}.deriv"),
135                    value: new_value,
136                }
137            })
138            .collect();
139        Ok(new_values)
140    }
141
142    /// Exports DB contents to CSV file
143    #[cfg(feature = "export_csv")]
144    pub fn export_to_csv<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
145        use crate::schema::metric_keys::dsl::key;
146        use crate::schema::metrics::dsl::*;
147        use std::fs::File;
148        let out_file = File::create(path)?;
149        let mut csv_writer = csv::Writer::from_writer(out_file);
150        // join the 2 tables so we get a flat CSV with the actual key names
151        let query = crate::schema::metrics::table.inner_join(crate::schema::metric_keys::table);
152        let query = query
153            .order(timestamp.asc())
154            .select((id, timestamp, key, value));
155        for row in query.load::<JoinedMetric>(&mut self.db)? {
156            csv_writer.serialize(row)?;
157        }
158        csv_writer.flush()?;
159        Ok(())
160    }
161    /// Imports CSV file into a MetricsDb file
162    #[cfg(feature = "import_csv")]
163    pub fn import_from_csv<S: AsRef<Path>, D: AsRef<Path>>(path: S, destination: D) -> Result<()> {
164        use crate::InnerState;
165        use csv::ReaderBuilder;
166        let db = setup_db(destination)?;
167        let mut reader = ReaderBuilder::new().from_path(path)?;
168        let mut inner = InnerState::new(Duration::from_secs(5), db);
169        let header = reader.headers()?.to_owned();
170        let mut flush_counter = 0u64;
171        for record in reader.records() {
172            match record {
173                Ok(record) => match record.deserialize::<MetricCsvRow>(Some(&header)) {
174                    Ok(r) => {
175                        if let Err(e) =
176                            inner.queue_metric(Duration::from_secs_f64(r.timestamp), r.key, r.value)
177                        {
178                            error!(
179                                "Skipping record due to error recording metric into DB: {:?}",
180                                e
181                            );
182                        }
183                        flush_counter += 1;
184                    }
185                    Err(e) => {
186                        error!("Skipping record due to error parsing CSV row: {:?}", e);
187                    }
188                },
189                Err(e) => {
190                    error!("Skipping record due to error reading CSV record: {:?}", e);
191                }
192            }
193            if flush_counter.is_multiple_of(200) {
194                trace!("Flushing");
195                inner.flush()?;
196            }
197        }
198        inner.flush()?;
199        Ok(())
200    }
201}
/// One row of a metrics CSV file, as deserialized by `MetricsDb::import_from_csv`
#[cfg(feature = "import_csv")]
#[derive(Deserialize)]
struct MetricCsvRow<'a> {
    /// Sample ID column; parsed but never read by the importer, hence the `allow`
    #[allow(unused)]
    id: u64,
    /// Timestamp of the sample; the importer converts it to a `Duration` in seconds
    timestamp: f64,
    /// Key/name of the sample (presumably borrowed from the record being deserialized)
    key: &'a str,
    /// Value of the sample
    value: f64,
}
/// Metric model for CSV export
///
/// Holds one sample joined with its key name. The fields are declared in the same order as
/// the `(id, timestamp, key, value)` tuple selected by `MetricsDb::export_to_csv`; keep the
/// two in sync.
// NOTE(review): `export_to_csv` serializes this type, so the `export_csv` feature presumably
// enables `serde` as well — confirm in Cargo.toml.
#[cfg(feature = "export_csv")]
#[derive(Queryable, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
struct JoinedMetric {
    /// Unique ID of sample
    pub id: i64,
    /// Timestamp of sample
    pub timestamp: f64,
    /// Key/name of sample
    pub key: String,
    /// Value of sample
    pub value: f64,
}