Skip to main content

metrics_sqlite/
metrics_db.rs

//! Metrics DB: open, query, and export/import metrics SQLite databases.
2use super::{
3    MetricsError, Result,
4    models::{Metric, MetricKey},
5    setup_db,
6};
7use diesel::prelude::*;
8#[cfg(feature = "import_csv")]
9use serde::Deserialize;
10use std::{path::Path, time::Duration};
11#[cfg(feature = "import_csv")]
12use tracing::{error, trace};
13
/// Largest gap allowed between two consecutive samples of the same session.
///
/// When the time between neighbouring samples exceeds this value, a new session begins.
const SESSION_TIME_GAP_THRESHOLD: Duration = Duration::from_secs(30);
16
/// One point of a discrete derivative, as produced by `MetricsDb::deriv_metrics_for_key`.
#[derive(Debug, Clone, PartialEq)]
pub struct DerivMetric {
    /// Timestamp of the later sample in the pair the rate was computed from
    pub timestamp: f64,
    /// Key name: the source metric key with a `.deriv` suffix
    pub key: String,
    /// Rate of change between the two samples (value delta per unit of timestamp)
    pub value: f64,
}
/// Describes a session: a time range of metrics data, typically delimited by time gaps
/// between samples.
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct Session {
    /// Timestamp session starts at
    pub start_time: f64,
    /// Timestamp session ends at
    pub end_time: f64,
    /// Duration of session (`end_time - start_time`, clamped; see [`Session::new`])
    pub duration: Duration,
}
impl Session {
    /// Creates a new session with the given start and end, deriving `duration` from them.
    ///
    /// `duration` is `end_time - start_time`, clamped so that construction never panics:
    /// a negative or NaN difference yields [`Duration::ZERO`], and a difference too large
    /// for [`Duration`] (including infinity) yields [`Duration::MAX`].
    pub fn new(start_time: f64, end_time: f64) -> Self {
        let elapsed = end_time - start_time;
        // `Duration::from_secs_f64` panics on negative, NaN, infinite or overflowing input,
        // so those cases are handled before calling it.
        let duration = if elapsed.is_nan() || elapsed <= 0.0 {
            Duration::ZERO
        } else if elapsed >= u64::MAX as f64 {
            Duration::MAX
        } else {
            Duration::from_secs_f64(elapsed)
        };
        Session {
            start_time,
            end_time,
            duration,
        }
    }
}
44pub(crate) mod query;
45
46/// Metrics database, useful for querying stored metrics
47pub struct MetricsDb {
48    db: SqliteConnection,
49    sessions: Vec<Session>,
50}
51
52impl MetricsDb {
53    /// Creates a new metrics DB with a given path of an SQLite database
54    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
55        let mut db = setup_db(path)?;
56        let sessions = Self::process_sessions(&mut db)?;
57        Ok(MetricsDb { db, sessions })
58    }
59
60    /// Returns sessions in the database, based on `SESSION_TIME_GAP_THRESHOLD`
61    pub fn sessions(&self) -> Vec<Session> {
62        self.sessions.clone()
63    }
64
65    /// Returns a session (timestamp range) from the last occurrence of the signpost to the latest metric
66    pub fn session_from_signpost(&mut self, metric: &str) -> Result<Session> {
67        query::session_from_signpost(&mut self.db, metric)
68    }
69
70    fn process_sessions(db: &mut SqliteConnection) -> Result<Vec<Session>> {
71        use crate::schema::metrics::dsl::*;
72        let timestamps = metrics
73            .select(timestamp)
74            .order(timestamp.asc())
75            .load::<f64>(db)?;
76        if timestamps.is_empty() {
77            return Err(MetricsError::EmptyDatabase);
78        }
79        let mut sessions: Vec<Session> = Vec::new();
80        let mut current_start = timestamps[0];
81        for pair in timestamps.windows(2) {
82            if pair[1] - pair[0] > SESSION_TIME_GAP_THRESHOLD.as_secs_f64() {
83                sessions.push(Session::new(current_start, pair[0]));
84                current_start = pair[1];
85            }
86        }
87        if let Some(last) = timestamps.last() {
88            if current_start < *last {
89                sessions.push(Session::new(current_start, *last));
90            }
91        }
92
93        Ok(sessions)
94    }
95
96    /// Returns list of metrics keys stored in the database
97    pub fn available_keys(&mut self) -> Result<Vec<String>> {
98        use crate::schema::metric_keys::dsl::*;
99        let r = metric_keys
100            .select(key)
101            .distinct()
102            .load::<String>(&mut self.db)?;
103        Ok(r)
104    }
105
106    /// Returns average values for the given metric keys within the session defined by a signpost.
107    ///
108    /// Also includes `session.duration` in the results.
109    pub fn average_metrics_from_signpost(
110        &mut self,
111        signpost: &str,
112        keys: &[&str],
113    ) -> Result<std::collections::HashMap<String, f64>> {
114        query::metrics_summary_for_signpost_and_keys(
115            &mut self.db,
116            signpost,
117            keys.iter().map(|s| s.to_string()).collect(),
118        )
119    }
120
121    /// Returns all metrics for given key in ascending timestamp order
122    pub fn metrics_for_key(
123        &mut self,
124        key_name: &str,
125        session: Option<&Session>,
126    ) -> Result<Vec<Metric>> {
127        query::metrics_for_key(&mut self.db, key_name, session)
128    }
129
130    /// Returns rate of change, the derivative, of the given metrics key's values
131    ///
132    /// f(t) = (x(t + 1) - x(t)) / ((t+1) - (t)
133    pub fn deriv_metrics_for_key(
134        &mut self,
135        key_name: &str,
136        session: Option<&Session>,
137    ) -> Result<Vec<DerivMetric>> {
138        let m = self.metrics_for_key(key_name, session)?;
139        let new_values: Vec<_> = m
140            .windows(2)
141            .map(|v| {
142                let dt = v[1].timestamp - v[0].timestamp;
143                let new_value = if dt > 0.0 {
144                    (v[1].value - v[0].value) / dt
145                } else {
146                    0.0
147                };
148                DerivMetric {
149                    timestamp: v[1].timestamp,
150                    key: format!("{key_name}.deriv"),
151                    value: new_value,
152                }
153            })
154            .collect();
155        Ok(new_values)
156    }
157
158    /// Exports DB contents to CSV file
159    #[cfg(feature = "export_csv")]
160    pub fn export_to_csv<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
161        use crate::schema::metric_keys::dsl::key;
162        use crate::schema::metrics::dsl::*;
163        use std::fs::File;
164        let out_file = File::create(path)?;
165        let mut csv_writer = csv::Writer::from_writer(out_file);
166        // join the 2 tables so we get a flat CSV with the actual key names
167        let query = crate::schema::metrics::table.inner_join(crate::schema::metric_keys::table);
168        let query = query
169            .order(timestamp.asc())
170            .select((id, timestamp, key, value));
171        for row in query.load::<JoinedMetric>(&mut self.db)? {
172            csv_writer.serialize(row)?;
173        }
174        csv_writer.flush()?;
175        Ok(())
176    }
177    /// Imports CSV file into a MetricsDb file
178    #[cfg(feature = "import_csv")]
179    pub fn import_from_csv<S: AsRef<Path>, D: AsRef<Path>>(path: S, destination: D) -> Result<()> {
180        use crate::InnerState;
181        use csv::ReaderBuilder;
182        let db = setup_db(destination)?;
183        let mut reader = ReaderBuilder::new().from_path(path)?;
184        let mut inner = InnerState::new(Duration::from_secs(5), db);
185        let header = reader.headers()?.to_owned();
186        let mut flush_counter = 0u64;
187        for record in reader.records() {
188            match record {
189                Ok(record) => match record.deserialize::<MetricCsvRow>(Some(&header)) {
190                    Ok(r) => {
191                        if let Err(e) =
192                            inner.queue_metric(Duration::from_secs_f64(r.timestamp), r.key, r.value)
193                        {
194                            error!(
195                                "Skipping record due to error recording metric into DB: {:?}",
196                                e
197                            );
198                        }
199                        flush_counter += 1;
200                    }
201                    Err(e) => {
202                        error!("Skipping record due to error parsing CSV row: {:?}", e);
203                    }
204                },
205                Err(e) => {
206                    error!("Skipping record due to error reading CSV record: {:?}", e);
207                }
208            }
209            if flush_counter % 200 == 0 {
210                trace!("Flushing");
211                inner.flush()?;
212            }
213        }
214        inner.flush()?;
215        Ok(())
216    }
217}
/// One CSV row read by `import_from_csv`; columns are matched by header name.
#[cfg(feature = "import_csv")]
#[derive(Deserialize)]
struct MetricCsvRow<'a> {
    /// Row ID from the source file. It must parse for the row to be accepted, but it is never
    /// passed to `queue_metric`, hence the `dead_code` allowance. `i64` mirrors the exported
    /// `JoinedMetric::id` column so exported files round-trip.
    #[allow(dead_code)]
    id: i64,
    /// Sample timestamp, in seconds
    timestamp: f64,
    /// Metric key name, borrowed from the CSV record
    key: &'a str,
    /// Sample value
    value: f64,
}
/// One exported CSV row: a metric sample joined with its key name.
///
/// Diesel's `Queryable` maps selected columns to fields by position, so the field order must
/// stay in sync with the `(id, timestamp, key, value)` tuple selected in `export_to_csv`.
/// NOTE(review): `Serialize` is only derived under the `serde` feature, yet `export_to_csv`
/// serializes this type; confirm that `export_csv` enables `serde`.
#[cfg(feature = "export_csv")]
#[derive(Debug, Queryable)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
struct JoinedMetric {
    /// Row ID of the sample in the `metrics` table
    pub id: i64,
    /// Sample timestamp
    pub timestamp: f64,
    /// Name of the metric key, resolved through the join with `metric_keys`
    pub key: String,
    /// Sample value
    pub value: f64,
}