metrics_sqlite/
metrics_db.rs1use super::{
3 models::{Metric, MetricKey},
4 setup_db, MetricsError, Result,
5};
6use diesel::prelude::*;
7#[cfg(feature = "import_csv")]
8use serde::Deserialize;
9use std::{path::Path, time::Duration};
10#[cfg(feature = "import_csv")]
11use tracing::{error, trace};
12
13const SESSION_TIME_GAP_THRESHOLD: Duration = Duration::from_secs(30);
15
16#[derive(Debug)]
18pub struct DerivMetric {
19 pub timestamp: f64,
20 pub key: String,
21 pub value: f64,
22}
23#[derive(Debug, Copy, Clone)]
25pub struct Session {
26 pub start_time: f64,
28 pub end_time: f64,
30 pub duration: Duration,
32}
33impl Session {
34 pub fn new(start_time: f64, end_time: f64) -> Self {
36 Session {
37 start_time,
38 end_time,
39 duration: Duration::from_secs_f64(end_time - start_time),
40 }
41 }
42}
43pub(crate) mod query;
44
45pub struct MetricsDb {
47 db: SqliteConnection,
48 sessions: Vec<Session>,
49}
50
51impl MetricsDb {
52 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
54 let mut db = setup_db(path)?;
55 let sessions = Self::process_sessions(&mut db)?;
56 Ok(MetricsDb { db, sessions })
57 }
58
59 pub fn sessions(&self) -> Vec<Session> {
61 self.sessions.clone()
62 }
63
64 pub fn session_from_signpost(&mut self, metric: &str) -> Result<Session> {
66 query::session_from_signpost(&mut self.db, metric)
67 }
68
69 fn process_sessions(db: &mut SqliteConnection) -> Result<Vec<Session>> {
70 use crate::schema::metrics::dsl::*;
71 let timestamps = metrics
72 .select(timestamp)
73 .order(timestamp.asc())
74 .load::<f64>(db)?;
75 if timestamps.is_empty() {
76 return Err(MetricsError::EmptyDatabase);
77 }
78 let mut sessions: Vec<Session> = Vec::new();
79 let mut current_start = timestamps[0];
80 for pair in timestamps.windows(2) {
81 if pair[1] - pair[0] > SESSION_TIME_GAP_THRESHOLD.as_secs_f64() {
82 sessions.push(Session::new(current_start, pair[0]));
83 current_start = pair[1];
84 }
85 }
86 if let Some(last) = timestamps.last() {
87 if current_start < *last {
88 sessions.push(Session::new(current_start, *last));
89 }
90 }
91
92 Ok(sessions)
93 }
94
95 pub fn available_keys(&mut self) -> Result<Vec<String>> {
97 use crate::schema::metric_keys::dsl::*;
98 let r = metric_keys
99 .select(key)
100 .distinct()
101 .load::<String>(&mut self.db)?;
102 Ok(r)
103 }
104
105 pub fn average_metrics_from_signpost(
109 &mut self,
110 signpost: &str,
111 keys: &[&str],
112 ) -> Result<std::collections::HashMap<String, f64>> {
113 query::metrics_summary_for_signpost_and_keys(
114 &mut self.db,
115 signpost,
116 keys.iter().map(|s| s.to_string()).collect(),
117 )
118 }
119
120 pub fn metrics_for_key(
122 &mut self,
123 key_name: &str,
124 session: Option<&Session>,
125 ) -> Result<Vec<Metric>> {
126 query::metrics_for_key(&mut self.db, key_name, session)
127 }
128
129 pub fn deriv_metrics_for_key(
133 &mut self,
134 key_name: &str,
135 session: Option<&Session>,
136 ) -> Result<Vec<DerivMetric>> {
137 let m = self.metrics_for_key(key_name, session)?;
138 let new_values: Vec<_> = m
139 .windows(2)
140 .map(|v| {
141 let dt = v[1].timestamp - v[0].timestamp;
142 let new_value = if dt > 0.0 {
143 (v[1].value - v[0].value) / dt
144 } else {
145 0.0
146 };
147 DerivMetric {
148 timestamp: v[1].timestamp,
149 key: format!("{key_name}.deriv"),
150 value: new_value,
151 }
152 })
153 .collect();
154 Ok(new_values)
155 }
156
157 #[cfg(feature = "export_csv")]
159 pub fn export_to_csv<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
160 use crate::schema::metric_keys::dsl::key;
161 use crate::schema::metrics::dsl::*;
162 use std::fs::File;
163 let out_file = File::create(path)?;
164 let mut csv_writer = csv::Writer::from_writer(out_file);
165 let query = crate::schema::metrics::table.inner_join(crate::schema::metric_keys::table);
167 let query = query
168 .order(timestamp.asc())
169 .select((id, timestamp, key, value));
170 for row in query.load::<JoinedMetric>(&mut self.db)? {
171 csv_writer.serialize(row)?;
172 }
173 csv_writer.flush()?;
174 Ok(())
175 }
176 #[cfg(feature = "import_csv")]
178 pub fn import_from_csv<S: AsRef<Path>, D: AsRef<Path>>(path: S, destination: D) -> Result<()> {
179 use crate::InnerState;
180 use csv::ReaderBuilder;
181 let db = setup_db(destination)?;
182 let mut reader = ReaderBuilder::new().from_path(path)?;
183 let mut inner = InnerState::new(Duration::from_secs(5), db);
184 let header = reader.headers()?.to_owned();
185 let mut flush_counter = 0u64;
186 for record in reader.records() {
187 match record {
188 Ok(record) => match record.deserialize::<MetricCsvRow>(Some(&header)) {
189 Ok(r) => {
190 if let Err(e) =
191 inner.queue_metric(Duration::from_secs_f64(r.timestamp), r.key, r.value)
192 {
193 error!(
194 "Skipping record due to error recording metric into DB: {:?}",
195 e
196 );
197 }
198 flush_counter += 1;
199 }
200 Err(e) => {
201 error!("Skipping record due to error parsing CSV row: {:?}", e);
202 }
203 },
204 Err(e) => {
205 error!("Skipping record due to error reading CSV record: {:?}", e);
206 }
207 }
208 if flush_counter.is_multiple_of(200) {
209 trace!("Flushing");
210 inner.flush()?;
211 }
212 }
213 inner.flush()?;
214 Ok(())
215 }
216}
217#[cfg(feature = "import_csv")]
218#[derive(Deserialize)]
219struct MetricCsvRow<'a> {
220 #[allow(unused)]
221 id: u64,
222 timestamp: f64,
223 key: &'a str,
224 value: f64,
225}
226#[cfg(feature = "export_csv")]
228#[derive(Queryable, Debug)]
229#[cfg_attr(feature = "serde", derive(serde::Serialize))]
230struct JoinedMetric {
231 pub id: i64,
233 pub timestamp: f64,
235 pub key: String,
237 pub value: f64,
239}