// metrics_sqlite/lib.rs

1#![deny(missing_docs)]
2//! # Metrics SQLite backend
3
4#[macro_use]
5extern crate diesel;
6#[macro_use]
7extern crate diesel_migrations;
8use tracing::{debug, error, info, trace, warn};
9
10use diesel::prelude::*;
11use diesel::{insert_into, sql_query};
12
13use metrics::{GaugeValue, Key, KeyName, SetRecorderError, SharedString, Unit};
14
15use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
16use std::sync::Arc;
17use std::{
18    collections::{HashMap, VecDeque},
19    path::Path,
20    sync::mpsc::{Receiver, RecvTimeoutError, SyncSender},
21    thread::{self, JoinHandle},
22    time::{Duration, Instant, SystemTime},
23};
24use thiserror::Error;
25
/// Max number of items allowed in worker's queue before flushing regardless of flush duration
const FLUSH_QUEUE_LIMIT: usize = 1000;
/// Capacity of the bounded channel between recorder callers and the background
/// worker; `sync_channel` senders block once this many events are in flight.
const BACKGROUND_CHANNEL_LIMIT: usize = 8000;
29
/// Error type for any db/vitals related errors.
///
/// Covers connection/migration/query failures from diesel, path and CSV
/// problems, and domain errors raised by the summary queries.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Error with database
    #[error("Database error: {0}")]
    DbConnectionError(#[from] ConnectionError),
    /// Error migrating database
    #[error("Migration error: {0}")]
    MigrationError(Box<dyn std::error::Error + Send + Sync>),
    /// Error querying metrics DB
    #[error("Error querying DB: {0}")]
    QueryError(#[from] diesel::result::Error),
    /// Error if the path given is invalid (not representable as UTF-8)
    #[error("Invalid database path")]
    InvalidDatabasePath,
    /// IO Error with reader/writer
    #[cfg(feature = "csv")]
    #[error("IO Error: {0}")]
    IoError(#[from] std::io::Error),
    /// Error writing CSV
    #[cfg(feature = "csv")]
    #[error("CSV Error: {0}")]
    CsvError(#[from] csv::Error),
    /// Attempted to query the database but found no records
    #[error("Database has no metrics stored in it")]
    EmptyDatabase,
    /// Given metric key name wasn't found in the DB
    #[error("Metric key {0} not found in database")]
    KeyNotFound(String),
    /// Attempting to communicate with exporter but it's gone away
    #[error("Exporter task has been stopped or crashed")]
    ExporterUnavailable,
    /// Session derived from the signpost has zero duration
    #[error("Session for signpost `{0}` has zero duration")]
    ZeroLengthSession(String),
    /// No metrics available for the requested key inside the derived session
    #[error("No metrics recorded for `{0}` in requested session")]
    NoMetricsForKey(String),
}
/// Metrics result type; the error parameter defaults to [`MetricsError`]
pub type Result<T, E = MetricsError> = std::result::Result<T, E>;
71
72mod metrics_db;
73mod models;
74mod recorder;
75mod schema;
76
77use crate::metrics_db::query;
78use crate::recorder::Handle;
79pub use metrics_db::{MetricsDb, Session};
80pub use models::{Metric, MetricKey, NewMetric};
81
/// Diesel migrations embedded into the binary at build time; applied in `setup_db`.
pub(crate) const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
83
84fn setup_db<P: AsRef<Path>>(path: P) -> Result<SqliteConnection> {
85    let url = path
86        .as_ref()
87        .to_str()
88        .ok_or(MetricsError::InvalidDatabasePath)?;
89    let mut db = SqliteConnection::establish(url)?;
90    db.run_pending_migrations(MIGRATIONS)
91        .map_err(MetricsError::MigrationError)?;
92
93    Ok(db)
94}
/// Which metric kind a describe/register event refers to.
enum RegisterType {
    Counter,
    Gauge,
    Histogram,
}
100
/// Messages sent from the recorder / public API to the background worker thread.
enum Event {
    /// Ask the worker to flush pending metrics and terminate.
    Stop,
    /// Create or update a key's unit/description metadata in the DB.
    DescribeKey(RegisterType, KeyName, Option<Unit>, SharedString),
    /// Key registration notice (currently ignored by the worker).
    RegisterKey(RegisterType, Key, Arc<Handle>),
    /// Add the `u64` delta to a counter. The `Duration` is the sample
    /// timestamp expressed as time since the UNIX epoch.
    IncrementCounter(Duration, Key, u64),
    /// Set a counter to an absolute value.
    AbsoluteCounter(Duration, Key, u64),
    /// Apply an absolute/increment/decrement update to a gauge.
    UpdateGauge(Duration, Key, GaugeValue),
    /// Record a single histogram sample.
    UpdateHistogram(Duration, Key, f64),
    /// Update the worker's retention / periodic-housekeeping settings.
    SetHousekeeping {
        retention_period: Option<Duration>,
        housekeeping_period: Option<Duration>,
        record_limit: Option<usize>,
    },
    /// Compute a summary of `keys` for the session derived from the latest
    /// `signpost_key` record and reply on the oneshot `tx`.
    RequestSummaryFromSignpost {
        signpost_key: String,
        keys: Vec<String>,
        tx: tokio::sync::oneshot::Sender<Result<HashMap<String, f64>>>,
    },
}
120
121/// Handle for continued communication with sqlite exporter
122pub struct SqliteExporterHandle {
123    sender: SyncSender<Event>,
124}
125impl SqliteExporterHandle {
126    /// Request average metrics from a signpost to latest from exporter's DB
127    pub fn request_average_metrics(
128        &self,
129        from_signpost: &str,
130        with_keys: &[&str],
131    ) -> Result<HashMap<String, f64>> {
132        let (tx, rx) = tokio::sync::oneshot::channel();
133        self.sender
134            .send(Event::RequestSummaryFromSignpost {
135                signpost_key: from_signpost.to_string(),
136                keys: with_keys.iter().map(|s| s.to_string()).collect(),
137                tx,
138            })
139            .map_err(|_| MetricsError::ExporterUnavailable)?;
140        match rx.blocking_recv() {
141            Ok(metrics) => Ok(metrics?),
142            Err(_) => Err(MetricsError::ExporterUnavailable),
143        }
144    }
145}
146
/// Exports metrics by storing them in an SQLite database at a periodic interval
pub struct SqliteExporter {
    // Background worker thread; taken and joined in Drop.
    thread: Option<JoinHandle<()>>,
    // Bounded channel into the worker; cloned into `SqliteExporterHandle`s.
    sender: SyncSender<Event>,
}
/// State owned by the background worker thread.
struct InnerState {
    db: SqliteConnection,
    // Housekeeping bookkeeping: time of last run, period (None = disabled),
    // retention window, and maximum row count.
    last_housekeeping: Instant,
    housekeeping: Option<Duration>,
    retention: Option<Duration>,
    record_limit: Option<usize>,
    // Flush policy: flush every `flush_duration`, or earlier once `queue`
    // reaches FLUSH_QUEUE_LIMIT.
    flush_duration: Duration,
    last_flush: Instant,
    // In-memory aggregation: latest gauge value and running counter total per key.
    last_values: HashMap<Key, f64>,
    counters: HashMap<Key, u64>,
    // Cache of metric name -> DB key id to avoid repeated lookups.
    key_ids: HashMap<String, i64>,
    // Samples waiting for the next flush to SQLite.
    queue: VecDeque<NewMetric>,
}
165impl InnerState {
166    fn new(flush_duration: Duration, db: SqliteConnection) -> Self {
167        InnerState {
168            db,
169            last_housekeeping: Instant::now(),
170            housekeeping: None,
171            retention: None,
172            record_limit: None,
173            flush_duration,
174            last_flush: Instant::now(),
175            last_values: HashMap::new(),
176            counters: HashMap::new(),
177            key_ids: HashMap::new(),
178            queue: VecDeque::with_capacity(FLUSH_QUEUE_LIMIT),
179        }
180    }
181    fn set_housekeeping(
182        &mut self,
183        retention: Option<Duration>,
184        housekeeping_duration: Option<Duration>,
185        record_limit: Option<usize>,
186    ) {
187        self.retention = retention;
188        self.housekeeping = housekeeping_duration;
189        self.last_housekeeping = Instant::now();
190        self.record_limit = record_limit;
191    }
192    fn should_housekeep(&self) -> bool {
193        match self.housekeeping {
194            Some(duration) => self.last_housekeeping.elapsed() > duration,
195            None => false,
196        }
197    }
198    fn housekeep(&mut self) -> Result<(), diesel::result::Error> {
199        SqliteExporter::housekeeping(&mut self.db, self.retention, self.record_limit, false);
200        self.last_housekeeping = Instant::now();
201        Ok(())
202    }
203    fn should_flush(&self) -> bool {
204        if self.last_flush.elapsed() > self.flush_duration {
205            debug!("Flushing due to {}s timeout", self.flush_duration.as_secs());
206            true
207        } else {
208            self.queue.len() >= FLUSH_QUEUE_LIMIT
209        }
210    }
211    fn flush(&mut self) -> Result<(), diesel::result::Error> {
212        use crate::schema::metrics::dsl::metrics;
213        // trace!("Flushing {} records", self.queue.len());
214        let db = &mut self.db;
215        let queue = self.queue.drain(..);
216        db.transaction::<_, diesel::result::Error, _>(|db| {
217            for rec in queue {
218                insert_into(metrics).values(&rec).execute(db)?;
219            }
220            Ok(())
221        })?;
222        self.last_flush = Instant::now();
223        Ok(())
224    }
225    fn queue_metric(&mut self, timestamp: Duration, key: &str, value: f64) -> Result<()> {
226        let metric_key_id = match self.key_ids.get(key) {
227            Some(key) => *key,
228            None => {
229                debug!("Looking up {}", key);
230                let key_id = MetricKey::key_by_name(key, &mut self.db)?.id;
231                self.key_ids.insert(key.to_string(), key_id);
232                key_id
233            }
234        };
235        let metric = NewMetric {
236            timestamp: timestamp.as_secs_f64(),
237            metric_key_id,
238            value: value as _,
239        };
240        self.queue.push_back(metric);
241        Ok(())
242    }
243
244    // --- Summary/Average additions
245
246    pub fn metrics_summary_for_signpost_and_keys(
247        &mut self,
248        signpost: String,
249        metrics: Vec<String>,
250    ) -> Result<HashMap<String, f64>> {
251        query::metrics_summary_for_signpost_and_keys(&mut self.db, &signpost, metrics)
252    }
253}
254
/// Spawns the background worker thread that owns the DB connection and
/// services `Event`s until `Stop` arrives or the channel disconnects.
///
/// The channel receive uses `flush_duration` as a timeout so queued metrics
/// still get flushed when no events arrive.
fn run_worker(
    db: SqliteConnection,
    receiver: Receiver<Event>,
    flush_duration: Duration,
) -> JoinHandle<()> {
    thread::Builder::new()
        .name("metrics-sqlite: worker".to_string())
        .spawn(move || {
            let mut state = InnerState::new(flush_duration, db);
            info!("SQLite worker started");
            loop {
                // Each arm yields (should_flush, should_exit) for the
                // post-processing below.
                let (should_flush, should_exit) = match receiver.recv_timeout(flush_duration) {
                    Ok(Event::Stop) => {
                        info!("Stopping SQLiteExporter worker, flushing & exiting");
                        (true, true)
                    }
                    Ok(Event::SetHousekeeping {
                        retention_period,
                        housekeeping_period,
                        record_limit,
                    }) => {
                        state.set_housekeeping(retention_period, housekeeping_period, record_limit);
                        (false, false)
                    }
                    Ok(Event::DescribeKey(_key_type, key, unit, desc)) => {
                        info!("Describing key {:?}", key);
                        // Persist unit/description metadata; failure is logged
                        // but doesn't stop the worker.
                        if let Err(e) = MetricKey::create_or_update(
                            key.as_str(),
                            unit,
                            Some(desc.as_ref()),
                            &mut state.db,
                        ) {
                            error!("Failed to create key entry: {:?}", e);
                        }
                        (false, false)
                    }
                    Ok(Event::RegisterKey(_key_type, _key, _handle)) => {
                        // we currently don't do anything with register...
                        (false, false)
                    }
                    Ok(Event::IncrementCounter(timestamp, key, value)) => {
                        // Counters are aggregated in memory; the stored sample
                        // is the running total after this increment.
                        let key_str = key.name().to_string();
                        let entry = state.counters.entry(key).or_insert(0);
                        let value = {
                            *entry += value;
                            *entry
                        };
                        if let Err(e) = state.queue_metric(timestamp, &key_str, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }

                        (state.should_flush(), false)
                    }
                    Ok(Event::AbsoluteCounter(timestamp, key, value)) => {
                        // Absolute set: replace the running total and record it.
                        let key_str = key.name().to_string();
                        state.counters.insert(key, value);
                        if let Err(e) = state.queue_metric(timestamp, &key_str, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::UpdateGauge(timestamp, key, value)) => {
                        // Gauges keep their latest value in `last_values` so
                        // increments/decrements apply relative to it.
                        let key_str = key.name().to_string();
                        let entry = state.last_values.entry(key).or_insert(0.0);
                        let value = match value {
                            GaugeValue::Absolute(v) => {
                                *entry = v;
                                *entry
                            }
                            GaugeValue::Increment(v) => {
                                *entry += v;
                                *entry
                            }
                            GaugeValue::Decrement(v) => {
                                *entry -= v;
                                *entry
                            }
                        };
                        if let Err(e) = state.queue_metric(timestamp, &key_str, value) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::UpdateHistogram(timestamp, key, value)) => {
                        // Histogram samples are stored raw, one row per sample.
                        let key_str = key.name().to_string();
                        if let Err(e) = state.queue_metric(timestamp, &key_str, value) {
                            error!("Error queueing metric: {:?}", e);
                        }

                        (state.should_flush(), false)
                    }
                    Ok(Event::RequestSummaryFromSignpost {
                        signpost_key,
                        keys,
                        tx,
                    }) => {
                        // Flush first so queued samples are visible to the
                        // summary query; reply on `tx` either way.
                        match state.flush() {
                            Ok(()) => match state
                                .metrics_summary_for_signpost_and_keys(signpost_key, keys)
                            {
                                Ok(metrics) => {
                                    if tx.send(Ok(metrics)).is_err() {
                                        error!(
                                            "Failed to respond with metrics results, discarding"
                                        );
                                    }
                                }
                                Err(e) => {
                                    if let Err(e) = tx.send(Err(e)) {
                                        error!(
                                            "Failed to respond with metrics error result, discarding: {e:?}"
                                        );
                                    }
                                }
                            },
                            Err(e) => {
                                let err = MetricsError::from(e);
                                error!(
                                    "Failed to flush pending metrics before summary request: {err:?}"
                                );
                                if let Err(send_err) = tx.send(Err(err)) {
                                    error!(
                                        "Failed to respond with metrics flush error result, discarding: {send_err:?}"
                                    );
                                }
                            }
                        }
                        (false, false)
                    }
                    Err(RecvTimeoutError::Timeout) => {
                        // No events within the flush interval: flush anyway.
                        debug!("Flushing due to {}s timeout", flush_duration.as_secs());
                        (true, false)
                    }
                    Err(RecvTimeoutError::Disconnected) => {
                        // All senders dropped; flush what we have and exit.
                        warn!("SQLiteExporter channel disconnected, exiting worker");
                        (true, true)
                    }
                };
                if should_flush {
                    if let Err(e) = state.flush() {
                        error!("Error flushing metrics: {}", e);
                    }
                }
                if state.should_housekeep() {
                    if let Err(e) = state.housekeep() {
                        error!("Failed running house keeping: {:?}", e);
                    }
                }
                if should_exit {
                    break;
                }
            }
        })
        .unwrap()
}
410
411impl SqliteExporter {
412    /// Creates a new `SqliteExporter` that stores metrics in an SQLite database file.
413    ///
414    /// `flush_interval` specifies how often metrics are flushed to SQLite/disk
415    ///
416    /// `keep_duration` specifies how long data is kept before deleting, performed new()
417    pub fn new<P: AsRef<Path>>(
418        flush_interval: Duration,
419        keep_duration: Option<Duration>,
420        path: P,
421    ) -> Result<Self> {
422        let mut db = setup_db(path)?;
423        Self::housekeeping(&mut db, keep_duration, None, true);
424        let (sender, receiver) = std::sync::mpsc::sync_channel(BACKGROUND_CHANNEL_LIMIT);
425        let thread = run_worker(db, receiver, flush_interval);
426        let exporter = SqliteExporter {
427            thread: Some(thread),
428            sender,
429        };
430        Ok(exporter)
431    }
432
433    /// Sets optional periodic housekeeping, None to disable (disabled by default)
434    /// ## Notes
435    /// Periodic housekeeping can affect metric recording, causing some data to be dropped during housekeeping.
436    /// Record limit if set will cause anything over limit + 25% of the limit to be removed
437    pub fn set_periodic_housekeeping(
438        &self,
439        periodic_duration: Option<Duration>,
440        retention: Option<Duration>,
441        record_limit: Option<usize>,
442    ) {
443        if let Err(e) = self.sender.send(Event::SetHousekeeping {
444            retention_period: retention,
445            housekeeping_period: periodic_duration,
446            record_limit,
447        }) {
448            error!("Failed to set house keeping settings: {:?}", e);
449        }
450    }
451
452    /// Run housekeeping.
453    ///
454    /// Does nothing if None was given for keep_duration in `new()`
455    fn housekeeping(
456        db: &mut SqliteConnection,
457        keep_duration: Option<Duration>,
458        record_limit: Option<usize>,
459        vacuum: bool,
460    ) {
461        use crate::schema::metrics::dsl::*;
462        use diesel::dsl::count;
463        if let Some(keep_duration) = keep_duration {
464            match SystemTime::UNIX_EPOCH.elapsed() {
465                Ok(now) => {
466                    let cutoff = now - keep_duration;
467                    trace!("Deleting data {}s old", keep_duration.as_secs());
468                    if let Err(e) =
469                        diesel::delete(metrics.filter(timestamp.le(cutoff.as_secs_f64())))
470                            .execute(db)
471                    {
472                        error!("Failed to remove old metrics data: {}", e);
473                    }
474                    if vacuum {
475                        if let Err(e) = sql_query("VACUUM").execute(db) {
476                            error!("Failed to vacuum SQLite DB: {:?}", e);
477                        }
478                    }
479                }
480                Err(e) => {
481                    error!(
482                        "System time error, skipping metrics-sqlite housekeeping: {}",
483                        e
484                    );
485                }
486            }
487        }
488        if let Some(record_limit) = record_limit {
489            trace!("Checking for records over {} limit", record_limit);
490            match metrics.select(count(id)).first::<i64>(db) {
491                Ok(records) => {
492                    let records = records as usize;
493                    if records > record_limit {
494                        let excess = records - record_limit + (record_limit / 4); // delete excess + 25% of limit
495                        trace!(
496                            "Exceeded limit! {} > {}, deleting {} oldest",
497                            records,
498                            record_limit,
499                            excess
500                        );
501                        let query = format!("DELETE FROM metrics WHERE id IN (SELECT id FROM metrics ORDER BY timestamp ASC LIMIT {excess});");
502                        if let Err(e) = sql_query(query).execute(db) {
503                            error!("Failed to delete excessive records: {:?}", e);
504                        }
505                    }
506                }
507                Err(e) => {
508                    error!("Failed to get record count: {:?}", e);
509                }
510            }
511        }
512    }
513
514    /// Install recorder as `metrics` crate's Recorder
515    pub fn install(self) -> Result<SqliteExporterHandle, SetRecorderError<Self>> {
516        let handle = SqliteExporterHandle {
517            sender: self.sender.clone(),
518        };
519        metrics::set_global_recorder(self)?;
520        Ok(handle)
521    }
522}
523impl Drop for SqliteExporter {
524    fn drop(&mut self) {
525        let _ = self.sender.send(Event::Stop);
526        let _ = self.thread.take().unwrap().join();
527    }
528}
529
#[cfg(test)]
mod tests {
    use crate::SqliteExporter;
    use std::time::{Duration, Instant};

    /// Smoke-tests the exporter under concurrent load: several threads hammer
    /// the global recorder while the worker flushes on a 500 ms interval.
    #[test]
    fn test_threading() {
        use std::thread;
        // Write to the OS temp dir so repeated test runs don't leave a
        // metrics.db file in the working directory.
        let db_path = std::env::temp_dir().join("metrics_sqlite_test_threading.db");
        SqliteExporter::new(Duration::from_millis(500), None, &db_path)
            .unwrap()
            .install()
            .unwrap();
        let joins: Vec<thread::JoinHandle<()>> = (0..5)
            .map(|_| {
                thread::spawn(move || {
                    let start = Instant::now();
                    // Busy-loop for ~5 s, long enough to span several flush
                    // intervals.
                    loop {
                        metrics::gauge!("rate").set(1.0);
                        metrics::counter!("hits").increment(1);
                        metrics::histogram!("histogram").record(5.0);
                        if start.elapsed().as_secs() >= 5 {
                            break;
                        }
                    }
                })
            })
            .collect();
        for j in joins {
            j.join().unwrap();
        }
    }
}
561}