metrics_sqlite/
metrics_db.rs1use super::{models::Metric, setup_db, Result};
3use crate::models::MetricKey;
4use crate::MetricsError;
5use diesel::prelude::*;
6#[cfg(feature = "import_csv")]
7use serde::Deserialize;
8use std::path::Path;
9use std::time::Duration;
10
11const SESSION_TIME_GAP_THRESHOLD: Duration = Duration::from_secs(30);
13
14#[derive(Debug)]
16pub struct DerivMetric {
17 pub timestamp: f64,
18 pub key: String,
19 pub value: f64,
20}
21#[derive(Debug, Copy, Clone)]
23pub struct Session {
24 pub start_time: f64,
26 pub end_time: f64,
28 pub duration: Duration,
30}
31impl Session {
32 pub fn new(start_time: f64, end_time: f64) -> Self {
34 Session {
35 start_time,
36 end_time,
37 duration: Duration::from_secs_f64(end_time - start_time),
38 }
39 }
40}
41pub struct MetricsDb {
43 db: SqliteConnection,
44 sessions: Vec<Session>,
45}
46
47impl MetricsDb {
48 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
50 let mut db = setup_db(path)?;
51 let sessions = Self::process_sessions(&mut db)?;
52 Ok(MetricsDb { db, sessions })
53 }
54
55 pub fn sessions(&self) -> Vec<Session> {
57 self.sessions.clone()
58 }
59
60 fn process_sessions(db: &mut SqliteConnection) -> Result<Vec<Session>> {
61 use crate::schema::metrics::dsl::*;
62 let timestamps = metrics
63 .select(timestamp)
64 .order(timestamp.asc())
65 .load::<f64>(db)?;
66 if timestamps.is_empty() {
67 return Err(MetricsError::EmptyDatabase);
68 }
69 let mut sessions: Vec<Session> = Vec::new();
70 let mut current_start = timestamps[0];
71 for pair in timestamps.windows(2) {
72 if pair[1] - pair[0] > SESSION_TIME_GAP_THRESHOLD.as_secs_f64() {
73 sessions.push(Session::new(current_start, pair[0]));
74 current_start = pair[1];
75 }
76 }
77 if let Some(last) = timestamps.last() {
78 if current_start < *last {
79 sessions.push(Session::new(current_start, *last));
80 }
81 }
82
83 Ok(sessions)
84 }
85
86 pub fn available_keys(&mut self) -> Result<Vec<String>> {
88 use crate::schema::metric_keys::dsl::*;
89 let r = metric_keys
90 .select(key)
91 .distinct()
92 .load::<String>(&mut self.db)?;
93 Ok(r)
94 }
95
96 pub fn metrics_for_key(
98 &mut self,
99 key_name: &str,
100 session: Option<&Session>,
101 ) -> Result<Vec<Metric>> {
102 use crate::schema::metrics::dsl::*;
103 let metric_key = self.metric_key_for_key(key_name)?;
104 let query = metrics
105 .order(timestamp.asc())
106 .filter(metric_key_id.eq(metric_key.id));
107 let r = match session {
108 Some(session) => query
109 .filter(timestamp.ge(session.start_time))
110 .filter(timestamp.le(session.end_time))
111 .load::<Metric>(&mut self.db)?,
112 None => query.load::<Metric>(&mut self.db)?,
113 };
114 Ok(r)
115 }
116
117 fn metric_key_for_key(&mut self, key_name: &str) -> Result<MetricKey> {
118 use crate::schema::metric_keys::dsl::*;
119 let query = metric_keys.filter(key.eq(key_name));
120 let keys = query.load::<MetricKey>(&mut self.db)?;
121 keys.into_iter()
122 .next()
123 .ok_or_else(|| MetricsError::KeyNotFound(key_name.to_string()))
124 }
125
126 pub fn deriv_metrics_for_key(
130 &mut self,
131 key_name: &str,
132 session: Option<&Session>,
133 ) -> Result<Vec<DerivMetric>> {
134 let m = self.metrics_for_key(key_name, session)?;
135 let new_values: Vec<_> = m
136 .windows(2)
137 .map(|v| {
138 let new_value =
139 (v[1].value - v[0].value) / (v[1].timestamp - v[0].timestamp);
140 DerivMetric {
141 timestamp: v[1].timestamp,
142 key: format!("{}.deriv", key_name),
143 value: new_value,
144 }
145 })
146 .collect();
147 Ok(new_values)
148 }
149
150 #[cfg(feature = "export_csv")]
152 pub fn export_to_csv<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
153 use crate::schema::metric_keys::dsl::key;
154 use crate::schema::metrics::dsl::*;
155 use std::fs::File;
156 let out_file = File::create(path)?;
157 let mut csv_writer = csv::Writer::from_writer(out_file);
158 let query = crate::schema::metrics::table.inner_join(crate::schema::metric_keys::table);
160 let query = query
161 .order(timestamp.asc())
162 .select((id, timestamp, key, value));
163 for row in query.load::<JoinedMetric>(&mut self.db)? {
164 csv_writer.serialize(row)?;
165 }
166 csv_writer.flush()?;
167 Ok(())
168 }
169 #[cfg(feature = "import_csv")]
171 pub fn import_from_csv<S: AsRef<Path>, D: AsRef<Path>>(path: S, destination: D) -> Result<()> {
172 use crate::InnerState;
173 use csv::ReaderBuilder;
174 let db = setup_db(destination)?;
175 let mut reader = ReaderBuilder::new().from_path(path)?;
176 let mut inner = InnerState::new(Duration::from_secs(5), db);
177 let header = reader.headers()?.to_owned();
178 let mut flush_counter = 0u64;
179 for record in reader.records() {
180 match record {
181 Ok(record) => match record.deserialize::<MetricCsvRow>(Some(&header)) {
182 Ok(r) => {
183 if let Err(e) =
184 inner.queue_metric(Duration::from_secs_f64(r.timestamp), r.key, r.value)
185 {
186 error!(
187 "Skipping record due to error recording metric into DB: {:?}",
188 e
189 );
190 }
191 flush_counter += 1;
192 }
193 Err(e) => {
194 error!("Skipping record due to error parsing CSV row: {:?}", e);
195 }
196 },
197 Err(e) => {
198 error!("Skipping record due to error reading CSV record: {:?}", e);
199 }
200 }
201 if flush_counter % 200 == 0 {
202 trace!("Flushing");
203 inner.flush()?;
204 }
205 }
206 inner.flush()?;
207 Ok(())
208 }
209}
210#[cfg(feature = "import_csv")]
211#[derive(Deserialize)]
212struct MetricCsvRow<'a> {
213 #[allow(unused)]
214 id: u64,
215 timestamp: f64,
216 key: &'a str,
217 value: f64,
218}
219#[cfg(feature = "export_csv")]
221#[derive(Queryable, Debug)]
222#[cfg_attr(feature = "serde", derive(serde::Serialize))]
223struct JoinedMetric {
224 pub id: i64,
226 pub timestamp: f64,
228 pub key: String,
230 pub value: f64,
232}