// metrics_sqlite/lib.rs
1#![deny(missing_docs)]
2//! # Metrics SQLite backend
3
4#[macro_use]
5extern crate diesel;
6#[macro_use]
7extern crate diesel_migrations;
8use tracing::{debug, error, info, trace, warn};
9
10use diesel::prelude::*;
11use diesel::{insert_into, sql_query};
12
13use metrics::{GaugeValue, Key, KeyName, SetRecorderError, SharedString, Unit};
14
15use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
16use std::sync::Arc;
17use std::{
18    collections::{HashMap, VecDeque},
19    path::{Path, PathBuf},
20    sync::mpsc::{Receiver, RecvTimeoutError, SyncSender},
21    thread::{self, JoinHandle},
22    time::{Duration, Instant, SystemTime},
23};
24use thiserror::Error;
25
/// Max number of items allowed in worker's queue before flushing regardless of flush duration
const FLUSH_QUEUE_LIMIT: usize = 1000;
// Capacity of the bounded channel between recorder handles and the worker thread.
const BACKGROUND_CHANNEL_LIMIT: usize = 8000;
// SQLite's default cap on bound variables per statement (SQLITE_MAX_VARIABLE_NUMBER).
const SQLITE_DEFAULT_MAX_VARIABLES: usize = 999;
// Bound fields per inserted row: timestamp, metric_key_id, value (see `NewMetric` usage below).
const METRIC_FIELDS_PER_ROW: usize = 3;
// Rows per batched INSERT so each statement stays under SQLite's variable limit.
const INSERT_BATCH_SIZE: usize = SQLITE_DEFAULT_MAX_VARIABLES / METRIC_FIELDS_PER_ROW;
32
/// Error type for any db/vitals related errors
///
/// Covers connection/migration/query failures from diesel, optional CSV/IO
/// errors (feature-gated), and domain errors raised by summary queries.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Error with database
    #[error("Database error: {0}")]
    DbConnectionError(#[from] ConnectionError),
    /// Error migrating database
    #[error("Migration error: {0}")]
    MigrationError(Box<dyn std::error::Error + Send + Sync>),
    /// Error querying metrics DB
    #[error("Error querying DB: {0}")]
    QueryError(#[from] diesel::result::Error),
    /// Error if the path given is invalid
    #[error("Invalid database path")]
    InvalidDatabasePath,
    /// IO Error with reader/writer
    #[cfg(feature = "csv")]
    #[error("IO Error: {0}")]
    IoError(#[from] std::io::Error),
    /// Error writing CSV
    #[cfg(feature = "csv")]
    #[error("CSV Error: {0}")]
    CsvError(#[from] csv::Error),
    /// Attempted to query the database but found no records
    #[error("Database has no metrics stored in it")]
    EmptyDatabase,
    /// Given metric key name wasn't found in the DB
    #[error("Metric key {0} not found in database")]
    KeyNotFound(String),
    /// Attempting to communicate with exporter but it's gone away
    #[error("Exporter task has been stopped or crashed")]
    ExporterUnavailable,
    /// Session derived from the signpost has zero duration
    #[error("Session for signpost `{0}` has zero duration")]
    ZeroLengthSession(String),
    /// No metrics available for the requested key inside the derived session
    #[error("No metrics recorded for `{0}` in requested session")]
    NoMetricsForKey(String),
}
72
73impl MetricsError {
74    /// Check if this error indicates a malformed/corrupt database
75    fn is_malformed_db(&self) -> bool {
76        self.to_string().contains("malformed")
77    }
78}
79
80/// Metrics result type
81pub type Result<T, E = MetricsError> = std::result::Result<T, E>;
82
83mod metrics_db;
84mod models;
85mod recorder;
86mod schema;
87
88use crate::metrics_db::query;
89use crate::recorder::Handle;
90pub use metrics_db::{MetricsDb, Session};
91pub use models::{Metric, MetricKey, NewMetric};
92
93pub(crate) const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
94
// Row type for `PRAGMA quick_check;` — SQLite returns one text column named
// `quick_check`, whose value is "ok" for a healthy database.
#[derive(QueryableByName)]
struct PragmaCheckResult {
    #[diesel(sql_type = diesel::sql_types::Text)]
    #[diesel(column_name = quick_check)]
    result: String,
}
101
102/// Remove database file and its WAL/SHM sidecar files
103fn remove_db_files(path: &Path) {
104    let db_path = PathBuf::from(path);
105    for suffix in &["", "-wal", "-shm"] {
106        let mut file_path = db_path.clone().into_os_string();
107        file_path.push(suffix);
108        let file_path = PathBuf::from(file_path);
109        if file_path.exists() {
110            if let Err(e) = std::fs::remove_file(&file_path) {
111                error!("Failed to remove {}: {}", file_path.display(), e);
112            } else {
113                info!("Removed corrupt database file: {}", file_path.display());
114            }
115        }
116    }
117}
118
119fn setup_db<P: AsRef<Path>>(path: P) -> Result<SqliteConnection> {
120    let url = path
121        .as_ref()
122        .to_str()
123        .ok_or(MetricsError::InvalidDatabasePath)?;
124    let mut db = SqliteConnection::establish(url)?;
125
126    // Enable WAL mode for better concurrent access
127    sql_query("PRAGMA journal_mode=WAL;").execute(&mut db)?;
128
129    // Set busy timeout to 5 seconds to handle lock contention gracefully
130    sql_query("PRAGMA busy_timeout = 5000;").execute(&mut db)?;
131
132    db.run_pending_migrations(MIGRATIONS)
133        .map_err(MetricsError::MigrationError)?;
134
135    // Check for corruption that may not surface until queries run
136    let check: String = sql_query("PRAGMA quick_check;")
137        .get_result::<PragmaCheckResult>(&mut db)?
138        .result;
139    if check != "ok" {
140        return Err(MetricsError::QueryError(
141            diesel::result::Error::DatabaseError(
142                diesel::result::DatabaseErrorKind::Unknown,
143                Box::new(format!("database disk image is malformed: {check}")),
144            ),
145        ));
146    }
147
148    Ok(db)
149}
150
151/// Like `setup_db`, but if the database is malformed, removes it and retries once.
152fn setup_db_or_reset<P: AsRef<Path>>(path: P) -> Result<SqliteConnection> {
153    let path = path.as_ref();
154    match setup_db(path) {
155        Ok(db) => Ok(db),
156        Err(err) if err.is_malformed_db() => {
157            warn!(
158                "Database is malformed, removing and recreating: {}",
159                path.display()
160            );
161            remove_db_files(path);
162            setup_db(path)
163        }
164        Err(err) => Err(err),
165    }
166}
// Kind of metric a key was described/registered as (mirrors the three
// metric types handled by the worker's event loop).
enum RegisterType {
    Counter,
    Gauge,
    Histogram,
}
172
// Messages sent from the recorder/handles to the background worker thread.
enum Event {
    // Flush pending metrics and shut the worker down.
    Stop,
    // Create or update a key's unit/description in the DB.
    DescribeKey(RegisterType, KeyName, Option<Unit>, SharedString),
    // Key registration; currently ignored by the worker.
    RegisterKey(RegisterType, Key, Arc<Handle>),
    // Increment a counter by a delta at the given timestamp.
    IncrementCounter(Duration, Key, u64),
    // Set a counter to an absolute value at the given timestamp.
    AbsoluteCounter(Duration, Key, u64),
    // Apply a gauge update (absolute/increment/decrement).
    UpdateGauge(Duration, Key, GaugeValue),
    // Record a histogram sample.
    UpdateHistogram(Duration, Key, f64),
    // Configure periodic housekeeping; `None` disables the respective setting.
    SetHousekeeping {
        retention_period: Option<Duration>,
        housekeeping_period: Option<Duration>,
        record_limit: Option<usize>,
    },
    // Flush, then compute a summary for the session derived from the latest
    // `signpost_key` occurrence and reply on `tx`.
    RequestSummaryFromSignpost {
        signpost_key: String,
        keys: Vec<String>,
        tx: tokio::sync::oneshot::Sender<Result<HashMap<String, f64>>>,
    },
}
192
/// Handle for continued communication with sqlite exporter
pub struct SqliteExporterHandle {
    // Channel into the background worker; shared with the installed recorder.
    sender: SyncSender<Event>,
}
197impl SqliteExporterHandle {
198    /// Request average metrics from a signpost to latest from exporter's DB
199    pub fn request_average_metrics(
200        &self,
201        from_signpost: &str,
202        with_keys: &[&str],
203    ) -> Result<HashMap<String, f64>> {
204        let (tx, rx) = tokio::sync::oneshot::channel();
205        self.sender
206            .send(Event::RequestSummaryFromSignpost {
207                signpost_key: from_signpost.to_string(),
208                keys: with_keys.iter().map(|s| s.to_string()).collect(),
209                tx,
210            })
211            .map_err(|_| MetricsError::ExporterUnavailable)?;
212        match rx.blocking_recv() {
213            Ok(metrics) => Ok(metrics?),
214            Err(_) => Err(MetricsError::ExporterUnavailable),
215        }
216    }
217}
218
/// Exports metrics by storing them in an SQLite database at a periodic interval
pub struct SqliteExporter {
    // Worker thread handle; taken and joined in `Drop`.
    thread: Option<JoinHandle<()>>,
    // Channel to the worker; `install()` clones it into a `SqliteExporterHandle`.
    sender: SyncSender<Event>,
}
// All state owned by the background worker thread.
struct InnerState {
    // Open connection to the metrics database.
    db: SqliteConnection,
    // When housekeeping last ran (or settings last changed).
    last_housekeeping: Instant,
    // Interval between housekeeping runs; None disables periodic housekeeping.
    housekeeping: Option<Duration>,
    // How long to keep metrics before deletion; None keeps everything.
    retention: Option<Duration>,
    // Max record count before the oldest rows are pruned; None is unlimited.
    record_limit: Option<usize>,
    // How often queued metrics are written to SQLite.
    flush_duration: Duration,
    // When the queue was last flushed (also reset by an empty-queue flush).
    last_flush: Instant,
    // Running gauge values per key, so increments/decrements accumulate.
    last_values: HashMap<Key, f64>,
    // Running counter totals per key.
    counters: HashMap<Key, u64>,
    // Cache of metric key name -> DB row id, to avoid repeated lookups.
    key_ids: HashMap<String, i64>,
    // Metrics awaiting the next flush.
    queue: VecDeque<NewMetric>,
}
impl InnerState {
    // Create worker state with an empty queue and all timers starting now.
    fn new(flush_duration: Duration, db: SqliteConnection) -> Self {
        InnerState {
            db,
            last_housekeeping: Instant::now(),
            housekeeping: None,
            retention: None,
            record_limit: None,
            flush_duration,
            last_flush: Instant::now(),
            last_values: HashMap::new(),
            counters: HashMap::new(),
            key_ids: HashMap::new(),
            queue: VecDeque::with_capacity(FLUSH_QUEUE_LIMIT),
        }
    }
    // Update housekeeping settings and restart the housekeeping timer.
    fn set_housekeeping(
        &mut self,
        retention: Option<Duration>,
        housekeeping_duration: Option<Duration>,
        record_limit: Option<usize>,
    ) {
        self.retention = retention;
        self.housekeeping = housekeeping_duration;
        self.last_housekeeping = Instant::now();
        self.record_limit = record_limit;
    }
    // True when periodic housekeeping is enabled and its interval has elapsed.
    fn should_housekeep(&self) -> bool {
        match self.housekeeping {
            Some(duration) => self.last_housekeeping.elapsed() > duration,
            None => false,
        }
    }
    // Run housekeeping and restart its timer. `SqliteExporter::housekeeping`
    // logs its own errors and returns nothing, so this always returns Ok —
    // the Result type is kept for signature symmetry with `flush`.
    fn housekeep(&mut self) -> Result<(), diesel::result::Error> {
        SqliteExporter::housekeeping(&mut self.db, self.retention, self.record_limit, false);
        self.last_housekeeping = Instant::now();
        Ok(())
    }
    // True when the flush interval has elapsed or the queue hit its size cap.
    fn should_flush(&self) -> bool {
        if self.last_flush.elapsed() > self.flush_duration {
            true
        } else if self.queue.len() >= FLUSH_QUEUE_LIMIT {
            debug!("Flushing due to queue size ({} items)", self.queue.len());
            true
        } else {
            false
        }
    }
    // Write all queued metrics to SQLite in a single transaction, batching
    // inserts to stay under SQLite's bound-variable limit. On failure the
    // drained metrics are put back (the queue is empty at that point, so
    // original order is preserved) and the error is returned to the caller.
    fn flush(&mut self) -> Result<(), diesel::result::Error> {
        use crate::schema::metrics::dsl::metrics;
        if self.queue.is_empty() {
            self.last_flush = Instant::now();
            return Ok(());
        }
        let drain_buffer: Vec<NewMetric> = self.queue.drain(..).collect();
        let db = &mut self.db;
        let transaction_result = db.transaction::<_, diesel::result::Error, _>(|db| {
            // .max(1) guards against a zero chunk size should the constants
            // ever be misconfigured (chunks() panics on 0).
            let chunk_size = INSERT_BATCH_SIZE.max(1);
            for chunk in drain_buffer.chunks(chunk_size) {
                insert_into(metrics).values(chunk).execute(db)?;
            }
            Ok(())
        });
        match transaction_result {
            Ok(()) => {
                self.last_flush = Instant::now();
                Ok(())
            }
            Err(e) => {
                self.queue.extend(drain_buffer);
                Err(e)
            }
        }
    }
    // Append a sample to the in-memory queue, resolving (and caching) the
    // key's DB row id on first use.
    fn queue_metric(&mut self, timestamp: Duration, key: &str, value: f64) -> Result<()> {
        let metric_key_id = match self.key_ids.get(key) {
            Some(key) => *key,
            None => {
                debug!("Looking up {}", key);
                let key_id = MetricKey::key_by_name(key, &mut self.db)?.id;
                self.key_ids.insert(key.to_string(), key_id);
                key_id
            }
        };
        let metric = NewMetric {
            timestamp: timestamp.as_secs_f64(),
            metric_key_id,
            value: value as _,
        };
        self.queue.push_back(metric);
        Ok(())
    }

    // --- Summary/Average additions

    // Query a summary for the given metric keys over the session derived from
    // `signpost` (delegates to `query::metrics_summary_for_signpost_and_keys`).
    // Callers should flush first so queued samples are visible to the query.
    pub fn metrics_summary_for_signpost_and_keys(
        &mut self,
        signpost: String,
        metrics: Vec<String>,
    ) -> Result<HashMap<String, f64>> {
        query::metrics_summary_for_signpost_and_keys(&mut self.db, &signpost, metrics)
    }
}
340
// Spawn the background worker thread that owns the DB connection and drains
// the event channel, flushing queued metrics periodically (every
// `flush_duration`) or when the queue grows too large.
fn run_worker(
    db: SqliteConnection,
    receiver: Receiver<Event>,
    flush_duration: Duration,
) -> JoinHandle<()> {
    thread::Builder::new()
        .name("metrics-sqlite: worker".to_string())
        .spawn(move || {
            let mut state = InnerState::new(flush_duration, db);
            info!("SQLite worker started");
            loop {
                // Check if we need to flush based on elapsed time.
                // NOTE: sampled *before* the blocking recv below, so under a
                // steady event stream this is what triggers periodic flushes.
                let time_based_flush = state.last_flush.elapsed() >= flush_duration;

                // Each arm reports (should_flush, should_exit).
                let (should_flush, should_exit) = match receiver.recv_timeout(flush_duration) {
                    Ok(Event::Stop) => {
                        info!("Stopping SQLiteExporter worker, flushing & exiting");
                        (true, true)
                    }
                    Ok(Event::SetHousekeeping {
                        retention_period,
                        housekeeping_period,
                        record_limit,
                    }) => {
                        state.set_housekeeping(retention_period, housekeeping_period, record_limit);
                        (false, false)
                    }
                    Ok(Event::DescribeKey(_key_type, key, unit, desc)) => {
                        info!("Describing key {:?}", key);
                        if let Err(e) = MetricKey::create_or_update(
                            key.as_str(),
                            unit,
                            Some(desc.as_ref()),
                            &mut state.db,
                        ) {
                            error!("Failed to create key entry: {:?}", e);
                        }
                        (false, false)
                    }
                    Ok(Event::RegisterKey(_key_type, _key, _handle)) => {
                        // we currently don't do anything with register...
                        (false, false)
                    }
                    Ok(Event::IncrementCounter(timestamp, key, value)) => {
                        // Counters are stored as running totals: add the delta
                        // to the in-memory total and queue the new total.
                        let key_name = key.name();
                        let entry = state.counters.entry(key.clone()).or_insert(0);
                        let value = {
                            *entry += value;
                            *entry
                        };
                        if let Err(e) = state.queue_metric(timestamp, key_name, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }

                        (state.should_flush(), false)
                    }
                    Ok(Event::AbsoluteCounter(timestamp, key, value)) => {
                        // Absolute set: overwrite the running total and queue it.
                        let key_name = key.name();
                        state.counters.insert(key.clone(), value);
                        if let Err(e) = state.queue_metric(timestamp, key_name, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::UpdateGauge(timestamp, key, value)) => {
                        // Gauges keep a running value (starting at 0.0) so
                        // Increment/Decrement accumulate across events.
                        let key_name = key.name();
                        let entry = state.last_values.entry(key.clone()).or_insert(0.0);
                        let value = match value {
                            GaugeValue::Absolute(v) => {
                                *entry = v;
                                *entry
                            }
                            GaugeValue::Increment(v) => {
                                *entry += v;
                                *entry
                            }
                            GaugeValue::Decrement(v) => {
                                *entry -= v;
                                *entry
                            }
                        };
                        if let Err(e) = state.queue_metric(timestamp, key_name, value) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::UpdateHistogram(timestamp, key, value)) => {
                        // Histogram samples are queued as-is.
                        let key_name = key.name();
                        if let Err(e) = state.queue_metric(timestamp, key_name, value) {
                            error!("Error queueing metric: {:?}", e);
                        }

                        (state.should_flush(), false)
                    }
                    Ok(Event::RequestSummaryFromSignpost {
                        signpost_key,
                        keys,
                        tx,
                    }) => {
                        // Flush first so queued samples are visible to the
                        // summary query; reply with whatever Result we got.
                        // Send failures just mean the requester gave up waiting.
                        match state.flush() {
                            Ok(()) => match state
                                .metrics_summary_for_signpost_and_keys(signpost_key, keys)
                            {
                                Ok(metrics) => {
                                    if tx.send(Ok(metrics)).is_err() {
                                        error!(
                                            "Failed to respond with metrics results, discarding"
                                        );
                                    }
                                }
                                Err(e) => {
                                    if let Err(e) = tx.send(Err(e)) {
                                        error!(
                                            "Failed to respond with metrics error result, discarding: {e:?}"
                                        );
                                    }
                                }
                            },
                            Err(e) => {
                                let err = MetricsError::from(e);
                                error!(
                                    "Failed to flush pending metrics before summary request: {err:?}"
                                );
                                if let Err(send_err) = tx.send(Err(err)) {
                                    error!(
                                        "Failed to respond with metrics flush error result, discarding: {send_err:?}"
                                    );
                                }
                            }
                        }
                        (false, false)
                    }
                    Err(RecvTimeoutError::Timeout) => {
                        // No events for a full flush interval: flush anyway.
                        (true, false)
                    }
                    Err(RecvTimeoutError::Disconnected) => {
                        warn!("SQLiteExporter channel disconnected, exiting worker");
                        (true, true)
                    }
                };

                // Flush if time-based flush is triggered OR if event-based flush is triggered
                if time_based_flush || should_flush {
                    if time_based_flush {
                        debug!("Flushing due to elapsed time ({}s)", flush_duration.as_secs());
                    }
                    if let Err(e) = state.flush() {
                        error!("Error flushing metrics: {}", e);
                    }
                }
                if state.should_housekeep() {
                    if let Err(e) = state.housekeep() {
                        error!("Failed running house keeping: {:?}", e);
                    }
                }
                if should_exit {
                    break;
                }
            }
        })
        .unwrap()
}
503
impl SqliteExporter {
    /// Creates a new `SqliteExporter` that stores metrics in an SQLite database file.
    ///
    /// `flush_interval` specifies how often metrics are flushed to SQLite/disk
    ///
    /// `keep_duration` specifies how long data is kept before deleting; that
    /// initial cleanup (including a `VACUUM`) is performed once, here in `new()`
    ///
    /// A malformed database file is removed and recreated (see `setup_db_or_reset`).
    pub fn new<P: AsRef<Path>>(
        flush_interval: Duration,
        keep_duration: Option<Duration>,
        path: P,
    ) -> Result<Self> {
        let mut db = setup_db_or_reset(path)?;
        Self::housekeeping(&mut db, keep_duration, None, true);
        let (sender, receiver) = std::sync::mpsc::sync_channel(BACKGROUND_CHANNEL_LIMIT);
        let thread = run_worker(db, receiver, flush_interval);
        let exporter = SqliteExporter {
            thread: Some(thread),
            sender,
        };
        Ok(exporter)
    }

    /// Sets optional periodic housekeeping, None to disable (disabled by default)
    /// ## Notes
    /// Periodic housekeeping can affect metric recording, causing some data to be dropped during housekeeping.
    /// Record limit if set will cause anything over limit + 25% of the limit to be removed
    pub fn set_periodic_housekeeping(
        &self,
        periodic_duration: Option<Duration>,
        retention: Option<Duration>,
        record_limit: Option<usize>,
    ) {
        // Best-effort: a send failure means the worker is gone; just log it.
        if let Err(e) = self.sender.send(Event::SetHousekeeping {
            retention_period: retention,
            housekeeping_period: periodic_duration,
            record_limit,
        }) {
            error!("Failed to set house keeping settings: {:?}", e);
        }
    }

    /// Run housekeeping.
    ///
    /// Does nothing if None was given for keep_duration in `new()`
    ///
    /// All failures are logged rather than returned; `vacuum` additionally
    /// runs `VACUUM` after retention-based deletion (used once at startup).
    fn housekeeping(
        db: &mut SqliteConnection,
        keep_duration: Option<Duration>,
        record_limit: Option<usize>,
        vacuum: bool,
    ) {
        use crate::schema::metrics::dsl::*;
        use diesel::dsl::count;
        if let Some(keep_duration) = keep_duration {
            match SystemTime::UNIX_EPOCH.elapsed() {
                Ok(now) => {
                    // NOTE(review): would panic if keep_duration exceeded the
                    // time since the epoch — practically unreachable.
                    let cutoff = now - keep_duration;
                    trace!("Deleting data {}s old", keep_duration.as_secs());
                    if let Err(e) =
                        diesel::delete(metrics.filter(timestamp.le(cutoff.as_secs_f64())))
                            .execute(db)
                    {
                        error!("Failed to remove old metrics data: {}", e);
                    }
                    if vacuum {
                        if let Err(e) = sql_query("VACUUM").execute(db) {
                            error!("Failed to vacuum SQLite DB: {:?}", e);
                        }
                    }
                }
                Err(e) => {
                    error!(
                        "System time error, skipping metrics-sqlite housekeeping: {}",
                        e
                    );
                }
            }
        }
        if let Some(record_limit) = record_limit {
            trace!("Checking for records over {} limit", record_limit);
            match metrics.select(count(id)).first::<i64>(db) {
                Ok(records) => {
                    let records = records as usize;
                    if records > record_limit {
                        let excess = records - record_limit + (record_limit / 4); // delete excess + 25% of limit
                        trace!(
                            "Exceeded limit! {} > {}, deleting {} oldest",
                            records,
                            record_limit,
                            excess
                        );
                        // Raw SQL because diesel's SQLite backend lacks
                        // DELETE ... LIMIT; `excess` is a usize, not user input.
                        let query = format!("DELETE FROM metrics WHERE id IN (SELECT id FROM metrics ORDER BY timestamp ASC LIMIT {excess});");
                        if let Err(e) = sql_query(query).execute(db) {
                            error!("Failed to delete excessive records: {:?}", e);
                        }
                    }
                }
                Err(e) => {
                    error!("Failed to get record count: {:?}", e);
                }
            }
        }
    }

    /// Install recorder as `metrics` crate's Recorder
    ///
    /// Consumes the exporter (the global recorder owns it) and returns a
    /// handle sharing the worker channel for later queries.
    pub fn install(self) -> Result<SqliteExporterHandle, SetRecorderError<Self>> {
        let handle = SqliteExporterHandle {
            sender: self.sender.clone(),
        };
        metrics::set_global_recorder(self)?;
        Ok(handle)
    }
}
616impl Drop for SqliteExporter {
617    fn drop(&mut self) {
618        let _ = self.sender.send(Event::Stop);
619        let _ = self.thread.take().unwrap().join();
620    }
621}
622
#[cfg(test)]
mod tests {
    use crate::SqliteExporter;
    use std::time::{Duration, Instant};

    /// Smoke test: several threads record metrics concurrently through the
    /// globally-installed recorder while the worker flushes periodically.
    #[test]
    fn test_threading() {
        use std::thread;
        // Keep the test database out of the working tree.
        let db_path = std::env::temp_dir().join("metrics_sqlite_test_threading.db");
        SqliteExporter::new(Duration::from_millis(500), None, &db_path)
            .unwrap()
            .install()
            .unwrap();
        let joins: Vec<thread::JoinHandle<()>> = (0..5)
            .map(|_| {
                thread::spawn(move || {
                    let start = Instant::now();
                    loop {
                        metrics::gauge!("rate").set(1.0);
                        metrics::counter!("hits").increment(1);
                        metrics::histogram!("histogram").record(5.0);
                        // Run long enough to cross several 500ms flush
                        // intervals (was 5s; 2s keeps the test faster).
                        if start.elapsed() >= Duration::from_secs(2) {
                            break;
                        }
                    }
                })
            })
            .collect();
        for j in joins {
            j.join().unwrap();
        }
        // Best-effort cleanup; the exporter (installed globally) still holds
        // the file open, which is fine on Unix and harmless to ignore elsewhere.
        let _ = std::fs::remove_file(&db_path);
    }
}