metrics_sqlite/
metrics_db.rs1use super::{
3 MetricsError, Result,
4 models::{Metric, MetricKey},
5 setup_db,
6};
7use diesel::prelude::*;
8#[cfg(feature = "import_csv")]
9use serde::Deserialize;
10use std::{path::Path, time::Duration};
11#[cfg(feature = "import_csv")]
12use tracing::{error, trace};
13
/// Largest gap between two consecutive metric timestamps that still belongs to the same
/// session; a strictly larger gap makes `MetricsDb::process_sessions` start a new session.
const SESSION_TIME_GAP_THRESHOLD: Duration = Duration::from_secs(30);
16
/// Rate of change of a metric between two consecutive samples, as produced by
/// `MetricsDb::deriv_metrics_for_key`.
///
/// `Clone` and `PartialEq` are derived so callers can copy and compare the returned values.
#[derive(Debug, Clone, PartialEq)]
pub struct DerivMetric {
    /// Timestamp of the later of the two samples the rate was computed from.
    pub timestamp: f64,
    /// Name of the source key with `.deriv` appended.
    pub key: String,
    /// Change in value divided by the elapsed time between the two samples
    /// (`0.0` when the elapsed time is not positive).
    pub value: f64,
}
/// A time span over the recorded metrics, delimited by a start and an end timestamp.
#[derive(Debug, Copy, Clone)]
pub struct Session {
    /// Timestamp at which the session starts, in seconds.
    pub start_time: f64,
    /// Timestamp at which the session ends, in seconds.
    pub end_time: f64,
    /// Elapsed time between `start_time` and `end_time`.
    pub duration: Duration,
}

impl Session {
    /// Builds a session spanning `start_time..=end_time` (both in seconds).
    ///
    /// `duration` is `end_time - start_time`. When that difference cannot be represented as
    /// a [`Duration`] (negative, NaN, infinite or overflowing), the duration is zero instead
    /// of panicking, so inconsistent timestamps read from a database cannot abort the caller.
    pub fn new(start_time: f64, end_time: f64) -> Self {
        // `Duration::from_secs_f64` panics on negative / non-finite input; the fallible
        // variant lets us degrade to a zero-length duration instead.
        let duration = Duration::try_from_secs_f64(end_time - start_time).unwrap_or_default();
        Session {
            start_time,
            end_time,
            duration,
        }
    }
}
44pub(crate) mod query;
45
46pub struct MetricsDb {
48 db: SqliteConnection,
49 sessions: Vec<Session>,
50}
51
52impl MetricsDb {
53 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
55 let mut db = setup_db(path)?;
56 let sessions = Self::process_sessions(&mut db)?;
57 Ok(MetricsDb { db, sessions })
58 }
59
60 pub fn sessions(&self) -> Vec<Session> {
62 self.sessions.clone()
63 }
64
65 pub fn session_from_signpost(&mut self, metric: &str) -> Result<Session> {
67 query::session_from_signpost(&mut self.db, metric)
68 }
69
70 fn process_sessions(db: &mut SqliteConnection) -> Result<Vec<Session>> {
71 use crate::schema::metrics::dsl::*;
72 let timestamps = metrics
73 .select(timestamp)
74 .order(timestamp.asc())
75 .load::<f64>(db)?;
76 if timestamps.is_empty() {
77 return Err(MetricsError::EmptyDatabase);
78 }
79 let mut sessions: Vec<Session> = Vec::new();
80 let mut current_start = timestamps[0];
81 for pair in timestamps.windows(2) {
82 if pair[1] - pair[0] > SESSION_TIME_GAP_THRESHOLD.as_secs_f64() {
83 sessions.push(Session::new(current_start, pair[0]));
84 current_start = pair[1];
85 }
86 }
87 if let Some(last) = timestamps.last() {
88 if current_start < *last {
89 sessions.push(Session::new(current_start, *last));
90 }
91 }
92
93 Ok(sessions)
94 }
95
96 pub fn available_keys(&mut self) -> Result<Vec<String>> {
98 use crate::schema::metric_keys::dsl::*;
99 let r = metric_keys
100 .select(key)
101 .distinct()
102 .load::<String>(&mut self.db)?;
103 Ok(r)
104 }
105
106 pub fn average_metrics_from_signpost(
110 &mut self,
111 signpost: &str,
112 keys: &[&str],
113 ) -> Result<std::collections::HashMap<String, f64>> {
114 query::metrics_summary_for_signpost_and_keys(
115 &mut self.db,
116 signpost,
117 keys.iter().map(|s| s.to_string()).collect(),
118 )
119 }
120
121 pub fn metrics_for_key(
123 &mut self,
124 key_name: &str,
125 session: Option<&Session>,
126 ) -> Result<Vec<Metric>> {
127 query::metrics_for_key(&mut self.db, key_name, session)
128 }
129
130 pub fn deriv_metrics_for_key(
134 &mut self,
135 key_name: &str,
136 session: Option<&Session>,
137 ) -> Result<Vec<DerivMetric>> {
138 let m = self.metrics_for_key(key_name, session)?;
139 let new_values: Vec<_> = m
140 .windows(2)
141 .map(|v| {
142 let dt = v[1].timestamp - v[0].timestamp;
143 let new_value = if dt > 0.0 {
144 (v[1].value - v[0].value) / dt
145 } else {
146 0.0
147 };
148 DerivMetric {
149 timestamp: v[1].timestamp,
150 key: format!("{key_name}.deriv"),
151 value: new_value,
152 }
153 })
154 .collect();
155 Ok(new_values)
156 }
157
158 #[cfg(feature = "export_csv")]
160 pub fn export_to_csv<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
161 use crate::schema::metric_keys::dsl::key;
162 use crate::schema::metrics::dsl::*;
163 use std::fs::File;
164 let out_file = File::create(path)?;
165 let mut csv_writer = csv::Writer::from_writer(out_file);
166 let query = crate::schema::metrics::table.inner_join(crate::schema::metric_keys::table);
168 let query = query
169 .order(timestamp.asc())
170 .select((id, timestamp, key, value));
171 for row in query.load::<JoinedMetric>(&mut self.db)? {
172 csv_writer.serialize(row)?;
173 }
174 csv_writer.flush()?;
175 Ok(())
176 }
177 #[cfg(feature = "import_csv")]
179 pub fn import_from_csv<S: AsRef<Path>, D: AsRef<Path>>(path: S, destination: D) -> Result<()> {
180 use crate::InnerState;
181 use csv::ReaderBuilder;
182 let db = setup_db(destination)?;
183 let mut reader = ReaderBuilder::new().from_path(path)?;
184 let mut inner = InnerState::new(Duration::from_secs(5), db);
185 let header = reader.headers()?.to_owned();
186 let mut flush_counter = 0u64;
187 for record in reader.records() {
188 match record {
189 Ok(record) => match record.deserialize::<MetricCsvRow>(Some(&header)) {
190 Ok(r) => {
191 if let Err(e) =
192 inner.queue_metric(Duration::from_secs_f64(r.timestamp), r.key, r.value)
193 {
194 error!(
195 "Skipping record due to error recording metric into DB: {:?}",
196 e
197 );
198 }
199 flush_counter += 1;
200 }
201 Err(e) => {
202 error!("Skipping record due to error parsing CSV row: {:?}", e);
203 }
204 },
205 Err(e) => {
206 error!("Skipping record due to error reading CSV record: {:?}", e);
207 }
208 }
209 if flush_counter % 200 == 0 {
210 trace!("Flushing");
211 inner.flush()?;
212 }
213 }
214 inner.flush()?;
215 Ok(())
216 }
217}
/// One data row of a metrics CSV file as read by `MetricsDb::import_from_csv`.
///
/// `key` borrows from the CSV record it was deserialized from, so a row must not outlive
/// that record.
#[cfg(feature = "import_csv")]
#[derive(Deserialize)]
struct MetricCsvRow<'a> {
    // Deserialized so the column is accepted, but the importer never reads it.
    #[allow(unused)]
    id: u64,
    /// Sample time in seconds (converted with `Duration` on import).
    timestamp: f64,
    /// Name of the metric key.
    key: &'a str,
    /// Recorded value.
    value: f64,
}
/// A metric row joined with its key name, as written by `MetricsDb::export_to_csv`.
///
/// The field order must match the `(id, timestamp, key, value)` tuple selected by the export
/// query, because `Queryable` maps columns to fields by position.
#[cfg(feature = "export_csv")]
#[derive(Queryable, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
struct JoinedMetric {
    /// Row id from the `metrics` table.
    pub id: i64,
    /// Sample timestamp.
    pub timestamp: f64,
    /// Key name from the `metric_keys` table.
    pub key: String,
    /// Recorded value.
    pub value: f64,
}