metrics_sqlite/
metrics_db.rs1use super::{
3 models::{Metric, MetricKey},
4 setup_db, MetricsError, Result,
5};
6use diesel::prelude::*;
7#[cfg(feature = "import_csv")]
8use serde::Deserialize;
9use std::{path::Path, time::Duration};
10#[cfg(feature = "import_csv")]
11use tracing::{error, trace};
12
/// Gap between two consecutive metric timestamps above which they are treated as belonging to
/// different sessions (see `MetricsDb::process_sessions`, which compares with a strict `>`).
const SESSION_TIME_GAP_THRESHOLD: Duration = Duration::from_secs(30);
15
/// A single point of the time derivative of a metric, as produced by
/// `MetricsDb::deriv_metrics_for_key`.
#[derive(Debug, Clone, PartialEq)]
pub struct DerivMetric {
    /// Timestamp of the later of the two samples the derivative was computed from.
    pub timestamp: f64,
    /// Name of the derived series: the source key with a `.deriv` suffix.
    pub key: String,
    /// Rate of change of the source metric per unit of timestamp.
    pub value: f64,
}
/// A time span within the recorded metrics.
///
/// Start and end are `f64` seconds; `duration` is derived from them by [`Session::new`].
#[derive(Debug, Copy, Clone)]
pub struct Session {
    /// Start of the span, in seconds.
    pub start_time: f64,
    /// End of the span, in seconds.
    pub end_time: f64,
    /// Length of the span (`end_time - start_time`), saturated to a valid `Duration`.
    pub duration: Duration,
}

impl Session {
    /// Builds a session covering `start_time..=end_time` (both in seconds).
    ///
    /// The duration is `end_time - start_time`. This never panics: a negative or NaN
    /// difference yields `Duration::ZERO`, and a difference too large to represent
    /// (including positive infinity) yields `Duration::MAX`.
    pub fn new(start_time: f64, end_time: f64) -> Self {
        let span = end_time - start_time;
        // `Duration::from_secs_f64` panics on negative, non-finite or overflowing input;
        // timestamps come from stored data, so saturate instead of aborting the caller.
        let duration = Duration::try_from_secs_f64(span).unwrap_or(if span > 0.0 {
            Duration::MAX
        } else {
            Duration::ZERO
        });
        Session {
            start_time,
            end_time,
            duration,
        }
    }
}
43pub(crate) mod query;
44
/// Handle to a metrics SQLite database together with the sessions detected in it.
pub struct MetricsDb {
    /// Open SQLite connection returned by `setup_db`.
    db: SqliteConnection,
    /// Sessions computed by `process_sessions` when the handle was created in `new`.
    sessions: Vec<Session>,
}
50
51impl MetricsDb {
52 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
54 let mut db = setup_db(path)?;
55 let sessions = Self::process_sessions(&mut db)?;
56 Ok(MetricsDb { db, sessions })
57 }
58
59 pub fn sessions(&self) -> Vec<Session> {
61 self.sessions.clone()
62 }
63
64 pub fn session_from_signpost(&mut self, metric: &str) -> Result<Session> {
66 query::session_from_signpost(&mut self.db, metric)
67 }
68
69 fn process_sessions(db: &mut SqliteConnection) -> Result<Vec<Session>> {
70 use crate::schema::metrics::dsl::*;
71 let timestamps = metrics
72 .select(timestamp)
73 .order(timestamp.asc())
74 .load::<f64>(db)?;
75 if timestamps.is_empty() {
76 return Err(MetricsError::EmptyDatabase);
77 }
78 let mut sessions: Vec<Session> = Vec::new();
79 let mut current_start = timestamps[0];
80 for pair in timestamps.windows(2) {
81 if pair[1] - pair[0] > SESSION_TIME_GAP_THRESHOLD.as_secs_f64() {
82 sessions.push(Session::new(current_start, pair[0]));
83 current_start = pair[1];
84 }
85 }
86 if let Some(last) = timestamps.last() {
87 if current_start < *last {
88 sessions.push(Session::new(current_start, *last));
89 }
90 }
91
92 Ok(sessions)
93 }
94
95 pub fn available_keys(&mut self) -> Result<Vec<String>> {
97 use crate::schema::metric_keys::dsl::*;
98 let r = metric_keys
99 .select(key)
100 .distinct()
101 .load::<String>(&mut self.db)?;
102 Ok(r)
103 }
104
105 pub fn metrics_for_key(
107 &mut self,
108 key_name: &str,
109 session: Option<&Session>,
110 ) -> Result<Vec<Metric>> {
111 query::metrics_for_key(&mut self.db, key_name, session)
112 }
113
114 pub fn deriv_metrics_for_key(
118 &mut self,
119 key_name: &str,
120 session: Option<&Session>,
121 ) -> Result<Vec<DerivMetric>> {
122 let m = self.metrics_for_key(key_name, session)?;
123 let new_values: Vec<_> = m
124 .windows(2)
125 .map(|v| {
126 let dt = v[1].timestamp - v[0].timestamp;
127 let new_value = if dt > 0.0 {
128 (v[1].value - v[0].value) / dt
129 } else {
130 0.0
131 };
132 DerivMetric {
133 timestamp: v[1].timestamp,
134 key: format!("{key_name}.deriv"),
135 value: new_value,
136 }
137 })
138 .collect();
139 Ok(new_values)
140 }
141
/// Writes every stored metric to a CSV file at `path`.
///
/// Rows are the `(id, timestamp, key, value)` columns of `metrics` inner-joined with
/// `metric_keys`, ordered by ascending timestamp. The file is created first (via
/// `File::create`), then the rows are loaded and serialized, and finally the writer is
/// flushed.
///
/// # Errors
///
/// Returns an error if the file cannot be created, the query fails, or a row cannot be
/// serialized or flushed.
#[cfg(feature = "export_csv")]
pub fn export_to_csv<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
    use crate::schema::metric_keys::dsl::key;
    use crate::schema::metrics::dsl::*;
    use std::fs::File;

    let mut writer = csv::Writer::from_writer(File::create(path)?);

    let rows = crate::schema::metrics::table
        .inner_join(crate::schema::metric_keys::table)
        .order(timestamp.asc())
        .select((id, timestamp, key, value))
        .load::<JoinedMetric>(&mut self.db)?;

    rows.into_iter()
        .try_for_each(|row| writer.serialize(row))?;

    writer.flush()?;
    Ok(())
}
/// Imports metrics from the CSV file at `path` into the database at `destination`.
///
/// The CSV is expected to have a header row naming the `MetricCsvRow` fields (`id`,
/// `timestamp`, `key`, `value`). Each row is queued on an `InnerState` wrapping the
/// destination database; the queue is flushed after every 200 parsed rows and once more at
/// the end.
///
/// Rows that cannot be read, cannot be parsed, carry a timestamp that is not a valid
/// non-negative number of seconds, or fail to queue are logged with `error!` and skipped;
/// they do not abort the import.
///
/// # Errors
///
/// Returns an error if the destination database cannot be set up, the CSV file or its
/// header cannot be read, or a flush fails.
#[cfg(feature = "import_csv")]
pub fn import_from_csv<S: AsRef<Path>, D: AsRef<Path>>(path: S, destination: D) -> Result<()> {
    use crate::InnerState;
    use csv::ReaderBuilder;

    /// Number of parsed rows between intermediate flushes.
    const FLUSH_EVERY: u64 = 200;

    let db = setup_db(destination)?;
    let mut reader = ReaderBuilder::new().from_path(path)?;
    // NOTE(review): the 5 s argument is passed straight to `InnerState::new`; its meaning is
    // defined there.
    let mut inner = InnerState::new(Duration::from_secs(5), db);
    let header = reader.headers()?.to_owned();
    let mut flush_counter = 0u64;

    for record in reader.records() {
        let record = match record {
            Ok(record) => record,
            Err(e) => {
                error!("Skipping record due to error reading CSV record: {:?}", e);
                continue;
            }
        };
        let row = match record.deserialize::<MetricCsvRow>(Some(&header)) {
            Ok(row) => row,
            Err(e) => {
                error!("Skipping record due to error parsing CSV row: {:?}", e);
                continue;
            }
        };
        // `Duration::from_secs_f64` would panic on negative, NaN or overflowing input; the
        // CSV is external data, so treat such a timestamp as one more bad row.
        let stamp = match Duration::try_from_secs_f64(row.timestamp) {
            Ok(stamp) => stamp,
            Err(e) => {
                error!(
                    "Skipping record due to invalid timestamp {}: {:?}",
                    row.timestamp, e
                );
                continue;
            }
        };
        if let Err(e) = inner.queue_metric(stamp, row.key, row.value) {
            error!(
                "Skipping record due to error recording metric into DB: {:?}",
                e
            );
        }
        flush_counter += 1;
        // Only check after the counter advanced, so skipped rows never trigger extra flushes.
        if flush_counter.is_multiple_of(FLUSH_EVERY) {
            trace!("Flushing");
            inner.flush()?;
        }
    }

    inner.flush()?;
    Ok(())
}
201}
/// One CSV row as read by `MetricsDb::import_from_csv`, deserialized against the file's
/// header record.
#[cfg(feature = "import_csv")]
#[derive(Deserialize)]
struct MetricCsvRow<'a> {
    // The column is deserialized but the importer never reads it, hence the lint allowance.
    #[allow(unused)]
    id: u64,
    /// Metric timestamp in seconds.
    timestamp: f64,
    /// Metric key name, borrowed from the CSV record being deserialized.
    key: &'a str,
    /// Metric value.
    value: f64,
}
/// Row shape loaded by `MetricsDb::export_to_csv`: a metric joined with its key name.
///
/// The fields mirror the `(id, timestamp, key, value)` selection of that query, in the same
/// order. `Serialize` is only derived when the `serde` feature is enabled, and the exporter
/// serializes these rows — NOTE(review): presumably `export_csv` enables `serde`; confirm in
/// Cargo.toml.
#[cfg(feature = "export_csv")]
#[derive(Queryable, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
struct JoinedMetric {
    /// `id` column of the `metrics` table.
    pub id: i64,
    /// `timestamp` column of the `metrics` table.
    pub timestamp: f64,
    /// `key` column of the joined `metric_keys` table.
    pub key: String,
    /// `value` column of the `metrics` table.
    pub value: f64,
}