#![deny(missing_docs)]
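//! SQLite exporter for the `metrics` facade: records counters, gauges, and
//! histograms to a SQLite database from a background worker thread.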
#[macro_use]
extern crate diesel;
#[macro_use]
extern crate diesel_migrations;
use tracing::{debug, error, info, trace, warn};

use diesel::prelude::*;
use diesel::{insert_into, sql_query};

use metrics::{GaugeValue, Key, KeyName, SetRecorderError, SharedString, Unit};

use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
use std::sync::Arc;
use std::{
    collections::{HashMap, VecDeque},
    path::Path,
    sync::mpsc::{Receiver, RecvTimeoutError, SyncSender},
    thread::{self, JoinHandle},
    time::{Duration, Instant, SystemTime},
};
use thiserror::Error;

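/// Number of queued records that triggers a flush to SQLite ahead of the flush timer.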
const FLUSH_QUEUE_LIMIT: usize = 1000;
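/// Capacity of the bounded channel feeding the background worker thread.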
const BACKGROUND_CHANNEL_LIMIT: usize = 8000;

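/// Errors that can occur while recording, exporting, or querying metrics.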
#[derive(Debug, Error)]
pub enum MetricsError {
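    /// Failed to establish a connection to the SQLite database.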
    #[error("Database error: {0}")]
    DbConnectionError(#[from] ConnectionError),
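    /// Failed to run pending database migrations.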
    #[error("Migration error: {0}")]
    MigrationError(Box<dyn std::error::Error + Send + Sync>),
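    /// A database query failed.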
    #[error("Error querying DB: {0}")]
    QueryError(#[from] diesel::result::Error),
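    /// The database path is not valid UTF-8, so it cannot be used as a connection URL.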
    #[error("Invalid database path")]
    InvalidDatabasePath,
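    /// I/O error while exporting data (requires the `csv` feature).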
    #[cfg(feature = "csv")]
    #[error("IO Error: {0}")]
    IoError(#[from] std::io::Error),
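    /// CSV serialization error (requires the `csv` feature).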
    #[cfg(feature = "csv")]
    #[error("CSV Error: {0}")]
    CsvError(#[from] csv::Error),
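    /// The database contains no metrics.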
    #[error("Database has no metrics stored in it")]
    EmptyDatabase,
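    /// The requested metric key does not exist in the database.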
    #[error("Metric key {0} not found in database")]
    KeyNotFound(String),
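    /// The background exporter worker has stopped or crashed.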
    #[error("Exporter task has been stopped or crashed")]
    ExporterUnavailable,
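    /// The session bounded by the given signpost has zero duration.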
    #[error("Session for signpost `{0}` has zero duration")]
    ZeroLengthSession(String),
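    /// No metrics were recorded for the given key during the requested session.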
    #[error("No metrics recorded for `{0}` in requested session")]
    NoMetricsForKey(String),
}
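
/// Convenience `Result` alias defaulting to [`MetricsError`].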
pub type Result<T, E = MetricsError> = std::result::Result<T, E>;

mod metrics_db;
mod models;
mod recorder;
mod schema;

use crate::metrics_db::query;
use crate::recorder::Handle;
pub use metrics_db::{MetricsDb, Session};
pub use models::{Metric, MetricKey, NewMetric};

pub(crate) const MIGRATIONS: EmbeddedMigrations = embed_migrations!();

fn setup_db<P: AsRef<Path>>(path: P) -> Result<SqliteConnection> {
    let url = path
        .as_ref()
        .to_str()
        .ok_or(MetricsError::InvalidDatabasePath)?;
    let mut db = SqliteConnection::establish(url)?;
    db.run_pending_migrations(MIGRATIONS)
        .map_err(MetricsError::MigrationError)?;

    Ok(db)
}

enum RegisterType {
    Counter,
    Gauge,
    Histogram,
}

enum Event {
    Stop,
    DescribeKey(RegisterType, KeyName, Option<Unit>, SharedString),
    RegisterKey(RegisterType, Key, Arc<Handle>),
    IncrementCounter(Duration, Key, u64),
    AbsoluteCounter(Duration, Key, u64),
    UpdateGauge(Duration, Key, GaugeValue),
    UpdateHistogram(Duration, Key, f64),
    SetHousekeeping {
        retention_period: Option<Duration>,
        housekeeping_period: Option<Duration>,
        record_limit: Option<usize>,
    },
    RequestSummaryFromSignpost {
        signpost_key: String,
        keys: Vec<String>,
        tx: tokio::sync::oneshot::Sender<Result<HashMap<String, f64>>>,
    },
}

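/// Handle for communicating with an installed [SqliteExporter]'s background worker.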
pub struct SqliteExporterHandle {
    sender: SyncSender<Event>,
}

impl SqliteExporterHandle {
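    /// Requests the average value of each key in `with_keys` over the session
    /// marked by the `from_signpost` signpost metric.
    ///
    /// This blocks on a reply from the background worker, so it should not be
    /// called from an async executor thread.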
    pub fn request_average_metrics(
        &self,
        from_signpost: &str,
        with_keys: &[&str],
    ) -> Result<HashMap<String, f64>> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.sender
            .send(Event::RequestSummaryFromSignpost {
                signpost_key: from_signpost.to_string(),
                keys: with_keys.iter().map(|s| s.to_string()).collect(),
                tx,
            })
            .map_err(|_| MetricsError::ExporterUnavailable)?;
        match rx.blocking_recv() {
            Ok(result) => result,
            Err(_) => Err(MetricsError::ExporterUnavailable),
        }
    }
}

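/// Exporter that persists metrics from the `metrics` facade to a SQLite
/// database via a background worker thread.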
pub struct SqliteExporter {
    thread: Option<JoinHandle<()>>,
    sender: SyncSender<Event>,
}

struct InnerState {
    db: SqliteConnection,
    last_housekeeping: Instant,
    housekeeping: Option<Duration>,
    retention: Option<Duration>,
    record_limit: Option<usize>,
    flush_duration: Duration,
    last_flush: Instant,
    last_values: HashMap<Key, f64>,
    counters: HashMap<Key, u64>,
    key_ids: HashMap<String, i64>,
    queue: VecDeque<NewMetric>,
}

impl InnerState {
    fn new(flush_duration: Duration, db: SqliteConnection) -> Self {
        InnerState {
            db,
            last_housekeeping: Instant::now(),
            housekeeping: None,
            retention: None,
            record_limit: None,
            flush_duration,
            last_flush: Instant::now(),
            last_values: HashMap::new(),
            counters: HashMap::new(),
            key_ids: HashMap::new(),
            queue: VecDeque::with_capacity(FLUSH_QUEUE_LIMIT),
        }
    }

    fn set_housekeeping(
        &mut self,
        retention: Option<Duration>,
        housekeeping_duration: Option<Duration>,
        record_limit: Option<usize>,
    ) {
        self.retention = retention;
        self.housekeeping = housekeeping_duration;
        self.last_housekeeping = Instant::now();
        self.record_limit = record_limit;
    }

    fn should_housekeep(&self) -> bool {
        match self.housekeeping {
            Some(duration) => self.last_housekeeping.elapsed() > duration,
            None => false,
        }
    }

    fn housekeep(&mut self) -> Result<(), diesel::result::Error> {
        SqliteExporter::housekeeping(&mut self.db, self.retention, self.record_limit, false);
        self.last_housekeeping = Instant::now();
        Ok(())
    }

    fn should_flush(&self) -> bool {
        if self.last_flush.elapsed() > self.flush_duration {
            debug!("Flushing due to {}s timeout", self.flush_duration.as_secs());
            true
        } else {
            self.queue.len() >= FLUSH_QUEUE_LIMIT
        }
    }

    fn flush(&mut self) -> Result<(), diesel::result::Error> {
        use crate::schema::metrics::dsl::metrics;
        let db = &mut self.db;
        let queue = self.queue.drain(..);
        db.transaction::<_, diesel::result::Error, _>(|db| {
            for rec in queue {
                insert_into(metrics).values(&rec).execute(db)?;
            }
            Ok(())
        })?;
        self.last_flush = Instant::now();
        Ok(())
    }

    fn queue_metric(&mut self, timestamp: Duration, key: &str, value: f64) -> Result<()> {
        let metric_key_id = match self.key_ids.get(key) {
            Some(id) => *id,
            None => {
                debug!("Looking up {}", key);
                let key_id = MetricKey::key_by_name(key, &mut self.db)?.id;
                self.key_ids.insert(key.to_string(), key_id);
                key_id
            }
        };
        let metric = NewMetric {
            timestamp: timestamp.as_secs_f64(),
            metric_key_id,
            value: value as _,
        };
        self.queue.push_back(metric);
        Ok(())
    }

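    /// Summarizes the requested metric keys over the session marked by
    /// `signpost`, delegating to the query helpers in `metrics_db`.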
    pub fn metrics_summary_for_signpost_and_keys(
        &mut self,
        signpost: String,
        metrics: Vec<String>,
    ) -> Result<HashMap<String, f64>> {
        query::metrics_summary_for_signpost_and_keys(&mut self.db, &signpost, metrics)
    }
}

fn run_worker(
    db: SqliteConnection,
    receiver: Receiver<Event>,
    flush_duration: Duration,
) -> JoinHandle<()> {
    thread::Builder::new()
        .name("metrics-sqlite: worker".to_string())
        .spawn(move || {
            let mut state = InnerState::new(flush_duration, db);
            info!("SQLite worker started");
            loop {
                let (should_flush, should_exit) = match receiver.recv_timeout(flush_duration) {
                    Ok(Event::Stop) => {
                        info!("Stopping SQLiteExporter worker, flushing & exiting");
                        (true, true)
                    }
                    Ok(Event::SetHousekeeping {
                        retention_period,
                        housekeeping_period,
                        record_limit,
                    }) => {
                        state.set_housekeeping(
                            retention_period,
                            housekeeping_period,
                            record_limit,
                        );
                        (false, false)
                    }
                    Ok(Event::DescribeKey(_key_type, key, unit, desc)) => {
                        info!("Describing key {:?}", key);
                        if let Err(e) = MetricKey::create_or_update(
                            key.as_str(),
                            unit,
                            Some(desc.as_ref()),
                            &mut state.db,
                        ) {
                            error!("Failed to create key entry: {:?}", e);
                        }
                        (false, false)
                    }
                    Ok(Event::RegisterKey(_key_type, _key, _handle)) => {
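                        // No database work is needed at registration time.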
                        (false, false)
                    }
                    Ok(Event::IncrementCounter(timestamp, key, value)) => {
                        let key_str = key.name().to_string();
                        let entry = state.counters.entry(key).or_insert(0);
                        let value = {
                            *entry += value;
                            *entry
                        };
                        if let Err(e) = state.queue_metric(timestamp, &key_str, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::AbsoluteCounter(timestamp, key, value)) => {
                        let key_str = key.name().to_string();
                        state.counters.insert(key, value);
                        if let Err(e) = state.queue_metric(timestamp, &key_str, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::UpdateGauge(timestamp, key, value)) => {
                        let key_str = key.name().to_string();
                        let entry = state.last_values.entry(key).or_insert(0.0);
                        let value = match value {
                            GaugeValue::Absolute(v) => {
                                *entry = v;
                                *entry
                            }
                            GaugeValue::Increment(v) => {
                                *entry += v;
                                *entry
                            }
                            GaugeValue::Decrement(v) => {
                                *entry -= v;
                                *entry
                            }
                        };
                        if let Err(e) = state.queue_metric(timestamp, &key_str, value) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::UpdateHistogram(timestamp, key, value)) => {
                        let key_str = key.name().to_string();
                        if let Err(e) = state.queue_metric(timestamp, &key_str, value) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::RequestSummaryFromSignpost {
                        signpost_key,
                        keys,
                        tx,
                    }) => {
                        match state.flush() {
                            Ok(()) => match state
                                .metrics_summary_for_signpost_and_keys(signpost_key, keys)
                            {
                                Ok(metrics) => {
                                    if tx.send(Ok(metrics)).is_err() {
                                        error!("Failed to respond with metrics results, discarding");
                                    }
                                }
                                Err(e) => {
                                    if let Err(e) = tx.send(Err(e)) {
                                        error!("Failed to respond with metrics error result, discarding: {e:?}");
                                    }
                                }
                            },
                            Err(e) => {
                                let err = MetricsError::from(e);
                                error!("Failed to flush pending metrics before summary request: {err:?}");
                                if let Err(send_err) = tx.send(Err(err)) {
                                    error!("Failed to respond with metrics flush error result, discarding: {send_err:?}");
                                }
                            }
                        }
                        (false, false)
                    }
                    Err(RecvTimeoutError::Timeout) => {
                        debug!("Flushing due to {}s timeout", flush_duration.as_secs());
                        (true, false)
                    }
                    Err(RecvTimeoutError::Disconnected) => {
                        warn!("SQLiteExporter channel disconnected, exiting worker");
                        (true, true)
                    }
                };
                if should_flush {
                    if let Err(e) = state.flush() {
                        error!("Error flushing metrics: {}", e);
                    }
                }
                if state.should_housekeep() {
                    if let Err(e) = state.housekeep() {
                        error!("Failed running house keeping: {:?}", e);
                    }
                }
                if should_exit {
                    break;
                }
            }
        })
        .unwrap()
}

impl SqliteExporter {
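    /// Creates a new `SqliteExporter` backed by the SQLite database at `path`,
    /// creating the file and running any pending migrations as needed.
    ///
    /// Queued metrics are flushed every `flush_interval`, or earlier if the
    /// internal queue fills up. If `keep_duration` is `Some`, records older
    /// than that duration are deleted (and the database vacuumed) at startup.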
    pub fn new<P: AsRef<Path>>(
        flush_interval: Duration,
        keep_duration: Option<Duration>,
        path: P,
    ) -> Result<Self> {
        let mut db = setup_db(path)?;
        Self::housekeeping(&mut db, keep_duration, None, true);
        let (sender, receiver) = std::sync::mpsc::sync_channel(BACKGROUND_CHANNEL_LIMIT);
        let thread = run_worker(db, receiver, flush_interval);
        let exporter = SqliteExporter {
            thread: Some(thread),
            sender,
        };
        Ok(exporter)
    }

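    /// Configures periodic housekeeping on the background worker.
    ///
    /// Every `periodic_duration`, records older than `retention` are removed
    /// and, if `record_limit` is set, the oldest rows are trimmed to keep the
    /// table near that limit. Passing `None` for `periodic_duration` disables
    /// periodic housekeeping.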
    pub fn set_periodic_housekeeping(
        &self,
        periodic_duration: Option<Duration>,
        retention: Option<Duration>,
        record_limit: Option<usize>,
    ) {
        if let Err(e) = self.sender.send(Event::SetHousekeeping {
            retention_period: retention,
            housekeeping_period: periodic_duration,
            record_limit,
        }) {
            error!("Failed to set house keeping settings: {:?}", e);
        }
    }

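    /// Removes records older than `keep_duration`, optionally VACUUMing the
    /// database afterwards, and trims the oldest rows when the row count
    /// exceeds `record_limit` (deleting an extra 25% as headroom). Errors are
    /// logged rather than returned.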
    fn housekeeping(
        db: &mut SqliteConnection,
        keep_duration: Option<Duration>,
        record_limit: Option<usize>,
        vacuum: bool,
    ) {
        use crate::schema::metrics::dsl::*;
        use diesel::dsl::count;
        if let Some(keep_duration) = keep_duration {
            match SystemTime::UNIX_EPOCH.elapsed() {
                Ok(now) => {
                    let cutoff = now - keep_duration;
                    trace!("Deleting data older than {}s", keep_duration.as_secs());
                    if let Err(e) =
                        diesel::delete(metrics.filter(timestamp.le(cutoff.as_secs_f64())))
                            .execute(db)
                    {
                        error!("Failed to remove old metrics data: {}", e);
                    }
                    if vacuum {
                        if let Err(e) = sql_query("VACUUM").execute(db) {
                            error!("Failed to vacuum SQLite DB: {:?}", e);
                        }
                    }
                }
                Err(e) => {
                    error!(
                        "System time error, skipping metrics-sqlite housekeeping: {}",
                        e
                    );
                }
            }
        }
        if let Some(record_limit) = record_limit {
            trace!("Checking for records over the {} limit", record_limit);
            match metrics.select(count(id)).first::<i64>(db) {
                Ok(records) => {
                    let records = records as usize;
                    if records > record_limit {
                        let excess = records - record_limit + (record_limit / 4);
                        trace!(
                            "Exceeded limit! {} > {}, deleting {} oldest",
                            records,
                            record_limit,
                            excess
                        );
                        let query = format!(
                            "DELETE FROM metrics WHERE id IN \
                             (SELECT id FROM metrics ORDER BY timestamp ASC LIMIT {excess});"
                        );
                        if let Err(e) = sql_query(query).execute(db) {
                            error!("Failed to delete excess records: {:?}", e);
                        }
                    }
                }
                Err(e) => {
                    error!("Failed to get record count: {:?}", e);
                }
            }
        }
    }

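    /// Installs this exporter as the global recorder for the `metrics` facade
    /// and returns a [SqliteExporterHandle] for querying the background worker.
    ///
    /// A minimal usage sketch (assuming the crate is named `metrics_sqlite`
    /// and `metrics.db` is writable in the working directory):
    ///
    /// ```no_run
    /// use metrics_sqlite::SqliteExporter;
    /// use std::time::Duration;
    ///
    /// let handle = SqliteExporter::new(Duration::from_secs(1), None, "metrics.db")
    ///     .expect("failed to open metrics database")
    ///     .install()
    ///     .expect("failed to install recorder");
    /// metrics::counter!("hits").increment(1);
    /// ```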
    pub fn install(self) -> Result<SqliteExporterHandle, SetRecorderError<Self>> {
        let handle = SqliteExporterHandle {
            sender: self.sender.clone(),
        };
        metrics::set_global_recorder(self)?;
        Ok(handle)
    }
}

impl Drop for SqliteExporter {
    fn drop(&mut self) {
        let _ = self.sender.send(Event::Stop);
        if let Some(worker) = self.thread.take() {
            let _ = worker.join();
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::SqliteExporter;
    use std::time::{Duration, Instant};

    #[test]
    fn test_threading() {
        use std::thread;
        SqliteExporter::new(Duration::from_millis(500), None, "metrics.db")
            .unwrap()
            .install()
            .unwrap();
        let joins: Vec<thread::JoinHandle<()>> = (0..5)
            .map(|_| {
                thread::spawn(move || {
                    let start = Instant::now();
                    loop {
                        metrics::gauge!("rate").set(1.0);
                        metrics::counter!("hits").increment(1);
                        metrics::histogram!("histogram").record(5.0);
                        if start.elapsed().as_secs() >= 5 {
                            break;
                        }
                    }
                })
            })
            .collect();
        for j in joins {
            j.join().unwrap();
        }
    }
}