metrics_sqlite/
lib.rs

#![deny(missing_docs)]
//! # Metrics SQLite backend
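//!
//! A minimal end-to-end sketch (the file path below is hypothetical):
//!
//! ```no_run
//! use std::time::Duration;
//! use metrics_sqlite::SqliteExporter;
//!
//! // Create the exporter and install it as the global `metrics` recorder.
//! let _handle = SqliteExporter::new(Duration::from_secs(1), None, "/tmp/metrics.db")
//!     .expect("failed to open metrics database")
//!     .install()
//!     .expect("a global recorder was already installed");
//!
//! // Anything recorded through the `metrics` macros now lands in SQLite.
//! metrics::counter!("hits").increment(1);
//! ```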

#[macro_use]
extern crate diesel;
#[macro_use]
extern crate diesel_migrations;
use tracing::{debug, error, info, trace, warn};

use diesel::prelude::*;
use diesel::{insert_into, sql_query};

use metrics::{GaugeValue, Key, KeyName, SetRecorderError, SharedString, Unit};

use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
use std::sync::Arc;
use std::{
    collections::{HashMap, VecDeque},
    path::Path,
    sync::mpsc::{Receiver, RecvTimeoutError, SyncSender},
    thread::{self, JoinHandle},
    time::{Duration, Instant, SystemTime},
};
use thiserror::Error;
/// Max number of items allowed in worker's queue before flushing regardless of flush duration
const FLUSH_QUEUE_LIMIT: usize = 1000;
const BACKGROUND_CHANNEL_LIMIT: usize = 8000;
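// SQLite's historical default limit is 999 bound variables per statement (newer
// builds allow more); at three fields per row, 999 / 3 = 333 rows per batch stays
// safely under it.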
const SQLITE_DEFAULT_MAX_VARIABLES: usize = 999;
const METRIC_FIELDS_PER_ROW: usize = 3;
const INSERT_BATCH_SIZE: usize = SQLITE_DEFAULT_MAX_VARIABLES / METRIC_FIELDS_PER_ROW;

/// Error type for any db/vitals related errors
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Error with database
    #[error("Database error: {0}")]
    DbConnectionError(#[from] ConnectionError),
    /// Error migrating database
    #[error("Migration error: {0}")]
    MigrationError(Box<dyn std::error::Error + Send + Sync>),
    /// Error querying metrics DB
    #[error("Error querying DB: {0}")]
    QueryError(#[from] diesel::result::Error),
    /// Error if the path given is invalid
    #[error("Invalid database path")]
    InvalidDatabasePath,
    /// IO Error with reader/writer
    #[cfg(feature = "csv")]
    #[error("IO Error: {0}")]
    IoError(#[from] std::io::Error),
    /// Error writing CSV
    #[cfg(feature = "csv")]
    #[error("CSV Error: {0}")]
    CsvError(#[from] csv::Error),
    /// Attempted to query the database but found no records
    #[error("Database has no metrics stored in it")]
    EmptyDatabase,
    /// Given metric key name wasn't found in the DB
    #[error("Metric key {0} not found in database")]
    KeyNotFound(String),
    /// Attempting to communicate with exporter but it's gone away
    #[error("Exporter task has been stopped or crashed")]
    ExporterUnavailable,
    /// Session derived from the signpost has zero duration
    #[error("Session for signpost `{0}` has zero duration")]
    ZeroLengthSession(String),
    /// No metrics available for the requested key inside the derived session
    #[error("No metrics recorded for `{0}` in requested session")]
    NoMetricsForKey(String),
}

/// Metrics result type
pub type Result<T, E = MetricsError> = std::result::Result<T, E>;

mod metrics_db;
mod models;
mod recorder;
mod schema;

use crate::metrics_db::query;
use crate::recorder::Handle;
pub use metrics_db::{MetricsDb, Session};
pub use models::{Metric, MetricKey, NewMetric};

pub(crate) const MIGRATIONS: EmbeddedMigrations = embed_migrations!();

fn setup_db<P: AsRef<Path>>(path: P) -> Result<SqliteConnection> {
    let url = path
        .as_ref()
        .to_str()
        .ok_or(MetricsError::InvalidDatabasePath)?;
    let mut db = SqliteConnection::establish(url)?;

    // Enable WAL mode for better concurrent access
    sql_query("PRAGMA journal_mode=WAL;").execute(&mut db)?;

    // Set busy timeout to 5 seconds to handle lock contention gracefully
    sql_query("PRAGMA busy_timeout = 5000;").execute(&mut db)?;

    db.run_pending_migrations(MIGRATIONS)
        .map_err(MetricsError::MigrationError)?;

    Ok(db)
}

enum RegisterType {
    Counter,
    Gauge,
    Histogram,
}

enum Event {
    Stop,
    DescribeKey(RegisterType, KeyName, Option<Unit>, SharedString),
    RegisterKey(RegisterType, Key, Arc<Handle>),
    IncrementCounter(Duration, Key, u64),
    AbsoluteCounter(Duration, Key, u64),
    UpdateGauge(Duration, Key, GaugeValue),
    UpdateHistogram(Duration, Key, f64),
    SetHousekeeping {
        retention_period: Option<Duration>,
        housekeeping_period: Option<Duration>,
        record_limit: Option<usize>,
    },
    RequestSummaryFromSignpost {
        signpost_key: String,
        keys: Vec<String>,
        tx: tokio::sync::oneshot::Sender<Result<HashMap<String, f64>>>,
    },
}

/// Handle for continued communication with the sqlite exporter
pub struct SqliteExporterHandle {
    sender: SyncSender<Event>,
}
impl SqliteExporterHandle {
    /// Request average metrics over the span from the given signpost to the latest record in the exporter's DB
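    ///
    /// A sketch of a call (the signpost and key names here are hypothetical):
    ///
    /// ```no_run
    /// # fn demo(handle: &metrics_sqlite::SqliteExporterHandle) {
    /// let averages = handle
    ///     .request_average_metrics("session_start", &["cpu_load", "frame_time"])
    ///     .expect("exporter is still running");
    /// for (key, avg) in &averages {
    ///     println!("{key}: {avg}");
    /// }
    /// # }
    /// ```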
    pub fn request_average_metrics(
        &self,
        from_signpost: &str,
        with_keys: &[&str],
    ) -> Result<HashMap<String, f64>> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.sender
            .send(Event::RequestSummaryFromSignpost {
                signpost_key: from_signpost.to_string(),
                keys: with_keys.iter().map(|s| s.to_string()).collect(),
                tx,
            })
            .map_err(|_| MetricsError::ExporterUnavailable)?;
        match rx.blocking_recv() {
            Ok(metrics) => Ok(metrics?),
            Err(_) => Err(MetricsError::ExporterUnavailable),
        }
    }
}

/// Exports metrics by storing them in an SQLite database at a periodic interval
pub struct SqliteExporter {
    thread: Option<JoinHandle<()>>,
    sender: SyncSender<Event>,
}

struct InnerState {
    db: SqliteConnection,
    last_housekeeping: Instant,
    housekeeping: Option<Duration>,
    retention: Option<Duration>,
    record_limit: Option<usize>,
    flush_duration: Duration,
    last_flush: Instant,
    last_values: HashMap<Key, f64>,
    counters: HashMap<Key, u64>,
    key_ids: HashMap<String, i64>,
    queue: VecDeque<NewMetric>,
}
impl InnerState {
    fn new(flush_duration: Duration, db: SqliteConnection) -> Self {
        InnerState {
            db,
            last_housekeeping: Instant::now(),
            housekeeping: None,
            retention: None,
            record_limit: None,
            flush_duration,
            last_flush: Instant::now(),
            last_values: HashMap::new(),
            counters: HashMap::new(),
            key_ids: HashMap::new(),
            queue: VecDeque::with_capacity(FLUSH_QUEUE_LIMIT),
        }
    }
    fn set_housekeeping(
        &mut self,
        retention: Option<Duration>,
        housekeeping_duration: Option<Duration>,
        record_limit: Option<usize>,
    ) {
        self.retention = retention;
        self.housekeeping = housekeeping_duration;
        self.last_housekeeping = Instant::now();
        self.record_limit = record_limit;
    }
    fn should_housekeep(&self) -> bool {
        match self.housekeeping {
            Some(duration) => self.last_housekeeping.elapsed() > duration,
            None => false,
        }
    }
    fn housekeep(&mut self) -> Result<(), diesel::result::Error> {
        SqliteExporter::housekeeping(&mut self.db, self.retention, self.record_limit, false);
        self.last_housekeeping = Instant::now();
        Ok(())
    }
    fn should_flush(&self) -> bool {
        if self.last_flush.elapsed() > self.flush_duration {
            true
        } else if self.queue.len() >= FLUSH_QUEUE_LIMIT {
            debug!("Flushing due to queue size ({} items)", self.queue.len());
            true
        } else {
            false
        }
    }
    fn flush(&mut self) -> Result<(), diesel::result::Error> {
        use crate::schema::metrics::dsl::metrics;
        if self.queue.is_empty() {
            self.last_flush = Instant::now();
            return Ok(());
        }
        let drain_buffer: Vec<NewMetric> = self.queue.drain(..).collect();
        let db = &mut self.db;
        let transaction_result = db.transaction::<_, diesel::result::Error, _>(|db| {
            let chunk_size = INSERT_BATCH_SIZE.max(1);
            for chunk in drain_buffer.chunks(chunk_size) {
                insert_into(metrics).values(chunk).execute(db)?;
            }
            Ok(())
        });
        match transaction_result {
            Ok(()) => {
                self.last_flush = Instant::now();
                Ok(())
            }
            Err(e) => {
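                // The transaction rolled back, so nothing was persisted; put the
                // drained rows back so a later flush can retry them.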
                self.queue.extend(drain_buffer);
                Err(e)
            }
        }
    }
    fn queue_metric(&mut self, timestamp: Duration, key: &str, value: f64) -> Result<()> {
        let metric_key_id = match self.key_ids.get(key) {
            Some(id) => *id,
            None => {
                debug!("Looking up {}", key);
                let key_id = MetricKey::key_by_name(key, &mut self.db)?.id;
                self.key_ids.insert(key.to_string(), key_id);
                key_id
            }
        };
        let metric = NewMetric {
            timestamp: timestamp.as_secs_f64(),
            metric_key_id,
            value: value as _,
        };
        self.queue.push_back(metric);
        Ok(())
    }

    // --- Summary/Average additions

    fn metrics_summary_for_signpost_and_keys(
        &mut self,
        signpost: String,
        metrics: Vec<String>,
    ) -> Result<HashMap<String, f64>> {
        query::metrics_summary_for_signpost_and_keys(&mut self.db, &signpost, metrics)
    }
}

fn run_worker(
    db: SqliteConnection,
    receiver: Receiver<Event>,
    flush_duration: Duration,
) -> JoinHandle<()> {
    thread::Builder::new()
        .name("metrics-sqlite: worker".to_string())
        .spawn(move || {
            let mut state = InnerState::new(flush_duration, db);
            info!("SQLite worker started");
            loop {
                // Check up front whether enough time has elapsed to warrant a flush,
                // since the recv below can block for up to `flush_duration`
                let time_based_flush = state.last_flush.elapsed() >= flush_duration;

                let (should_flush, should_exit) = match receiver.recv_timeout(flush_duration) {
                    Ok(Event::Stop) => {
                        info!("Stopping SQLiteExporter worker, flushing & exiting");
                        (true, true)
                    }
                    Ok(Event::SetHousekeeping {
                        retention_period,
                        housekeeping_period,
                        record_limit,
                    }) => {
                        state.set_housekeeping(retention_period, housekeeping_period, record_limit);
                        (false, false)
                    }
                    Ok(Event::DescribeKey(_key_type, key, unit, desc)) => {
                        info!("Describing key {:?}", key);
                        if let Err(e) = MetricKey::create_or_update(
                            key.as_str(),
                            unit,
                            Some(desc.as_ref()),
                            &mut state.db,
                        ) {
                            error!("Failed to create key entry: {:?}", e);
                        }
                        (false, false)
                    }
                    Ok(Event::RegisterKey(_key_type, _key, _handle)) => {
                        // we currently don't do anything with register...
                        (false, false)
                    }
                    Ok(Event::IncrementCounter(timestamp, key, value)) => {
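                        // Counters accumulate in-memory so each stored row holds the
                        // running total rather than a per-event delta.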
                        let key_name = key.name();
                        let entry = state.counters.entry(key.clone()).or_insert(0);
                        let value = {
                            *entry += value;
                            *entry
                        };
                        if let Err(e) = state.queue_metric(timestamp, key_name, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }

                        (state.should_flush(), false)
                    }
                    Ok(Event::AbsoluteCounter(timestamp, key, value)) => {
                        let key_name = key.name();
                        state.counters.insert(key.clone(), value);
                        if let Err(e) = state.queue_metric(timestamp, key_name, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::UpdateGauge(timestamp, key, value)) => {
                        let key_name = key.name();
                        let entry = state.last_values.entry(key.clone()).or_insert(0.0);
                        let value = match value {
                            GaugeValue::Absolute(v) => {
                                *entry = v;
                                *entry
                            }
                            GaugeValue::Increment(v) => {
                                *entry += v;
                                *entry
                            }
                            GaugeValue::Decrement(v) => {
                                *entry -= v;
                                *entry
                            }
                        };
                        if let Err(e) = state.queue_metric(timestamp, key_name, value) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::UpdateHistogram(timestamp, key, value)) => {
                        let key_name = key.name();
                        if let Err(e) = state.queue_metric(timestamp, key_name, value) {
                            error!("Error queueing metric: {:?}", e);
                        }

                        (state.should_flush(), false)
                    }
                    Ok(Event::RequestSummaryFromSignpost {
                        signpost_key,
                        keys,
                        tx,
                    }) => {
                        match state.flush() {
                            Ok(()) => match state
                                .metrics_summary_for_signpost_and_keys(signpost_key, keys)
                            {
                                Ok(metrics) => {
                                    if tx.send(Ok(metrics)).is_err() {
                                        error!(
                                            "Failed to respond with metrics results, discarding"
                                        );
                                    }
                                }
                                Err(e) => {
                                    if let Err(e) = tx.send(Err(e)) {
                                        error!(
                                            "Failed to respond with metrics error result, discarding: {e:?}"
                                        );
                                    }
                                }
                            },
                            Err(e) => {
                                let err = MetricsError::from(e);
                                error!(
                                    "Failed to flush pending metrics before summary request: {err:?}"
                                );
                                if let Err(send_err) = tx.send(Err(err)) {
                                    error!(
                                        "Failed to respond with metrics flush error result, discarding: {send_err:?}"
                                    );
                                }
                            }
                        }
                        (false, false)
                    }
                    Err(RecvTimeoutError::Timeout) => (true, false),
                    Err(RecvTimeoutError::Disconnected) => {
                        warn!("SQLiteExporter channel disconnected, exiting worker");
                        (true, true)
                    }
                };

                // Flush if either the time-based or the event-based trigger fired
                if time_based_flush || should_flush {
                    if time_based_flush {
                        debug!("Flushing due to elapsed time ({}s)", flush_duration.as_secs());
                    }
                    if let Err(e) = state.flush() {
                        error!("Error flushing metrics: {}", e);
                    }
                }
                if state.should_housekeep() {
                    if let Err(e) = state.housekeep() {
                        error!("Failed running house keeping: {:?}", e);
                    }
                }
                if should_exit {
                    break;
                }
            }
        })
        .unwrap()
}

impl SqliteExporter {
    /// Creates a new `SqliteExporter` that stores metrics in an SQLite database file.
    ///
    /// `flush_interval` specifies how often metrics are flushed to SQLite/disk
    ///
    /// `keep_duration` specifies how long data is kept before deletion; this cleanup is performed during `new()`
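    ///
    /// A minimal construction sketch (the path below is hypothetical):
    ///
    /// ```no_run
    /// use std::time::Duration;
    /// use metrics_sqlite::SqliteExporter;
    ///
    /// let exporter = SqliteExporter::new(
    ///     Duration::from_secs(1),                      // flush once per second
    ///     Some(Duration::from_secs(7 * 24 * 60 * 60)), // keep one week of history
    ///     "/tmp/metrics.db",
    /// ).expect("failed to open metrics database");
    /// ```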
    pub fn new<P: AsRef<Path>>(
        flush_interval: Duration,
        keep_duration: Option<Duration>,
        path: P,
    ) -> Result<Self> {
        let mut db = setup_db(path)?;
        Self::housekeeping(&mut db, keep_duration, None, true);
        let (sender, receiver) = std::sync::mpsc::sync_channel(BACKGROUND_CHANNEL_LIMIT);
        let thread = run_worker(db, receiver, flush_interval);
        let exporter = SqliteExporter {
            thread: Some(thread),
            sender,
        };
        Ok(exporter)
    }

    /// Sets optional periodic housekeeping; pass `None` to disable (disabled by default)
    /// ## Notes
    /// Periodic housekeeping can affect metric recording, causing some data to be dropped while housekeeping runs.
    /// If a record limit is set, exceeding it removes the excess records plus an additional 25% of the limit
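    ///
    /// For example (the values below are illustrative, not recommendations):
    ///
    /// ```no_run
    /// # use std::time::Duration;
    /// # fn demo(exporter: &metrics_sqlite::SqliteExporter) {
    /// exporter.set_periodic_housekeeping(
    ///     Some(Duration::from_secs(600)),          // run housekeeping every 10 minutes
    ///     Some(Duration::from_secs(24 * 60 * 60)), // drop rows older than a day
    ///     Some(1_000_000),                         // cap the table at ~1M rows
    /// );
    /// # }
    /// ```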
    pub fn set_periodic_housekeeping(
        &self,
        periodic_duration: Option<Duration>,
        retention: Option<Duration>,
        record_limit: Option<usize>,
    ) {
        if let Err(e) = self.sender.send(Event::SetHousekeeping {
            retention_period: retention,
            housekeeping_period: periodic_duration,
            record_limit,
        }) {
            error!("Failed to set house keeping settings: {:?}", e);
        }
    }

    /// Run housekeeping.
    ///
    /// Time-based deletion is skipped if `keep_duration` is `None`; the record-limit check still runs
    fn housekeeping(
        db: &mut SqliteConnection,
        keep_duration: Option<Duration>,
        record_limit: Option<usize>,
        vacuum: bool,
    ) {
        use crate::schema::metrics::dsl::*;
        use diesel::dsl::count;
        if let Some(keep_duration) = keep_duration {
            match SystemTime::UNIX_EPOCH.elapsed() {
                Ok(now) => {
                    let cutoff = now - keep_duration;
                    trace!("Deleting data {}s old", keep_duration.as_secs());
                    if let Err(e) =
                        diesel::delete(metrics.filter(timestamp.le(cutoff.as_secs_f64())))
                            .execute(db)
                    {
                        error!("Failed to remove old metrics data: {}", e);
                    }
                    if vacuum {
                        if let Err(e) = sql_query("VACUUM").execute(db) {
                            error!("Failed to vacuum SQLite DB: {:?}", e);
                        }
                    }
                }
                Err(e) => {
                    error!(
                        "System time error, skipping metrics-sqlite housekeeping: {}",
                        e
                    );
                }
            }
        }
        if let Some(record_limit) = record_limit {
            trace!("Checking for records over {} limit", record_limit);
            match metrics.select(count(id)).first::<i64>(db) {
                Ok(records) => {
                    let records = records as usize;
                    if records > record_limit {
                        let excess = records - record_limit + (record_limit / 4); // delete excess + 25% of limit
                        trace!(
                            "Exceeded limit! {} > {}, deleting {} oldest",
                            records,
                            record_limit,
                            excess
                        );
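                        // Drop the oldest rows via raw SQL: the subquery selects the
                        // `excess` oldest ids by timestamp for deletion.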
                        let query = format!("DELETE FROM metrics WHERE id IN (SELECT id FROM metrics ORDER BY timestamp ASC LIMIT {excess});");
                        if let Err(e) = sql_query(query).execute(db) {
                            error!("Failed to delete excessive records: {:?}", e);
                        }
                    }
                }
                Err(e) => {
                    error!("Failed to get record count: {:?}", e);
                }
            }
        }
    }

    /// Installs this exporter as the `metrics` crate's global recorder, returning a handle for further communication
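    ///
    /// A sketch of a typical setup (the path below is hypothetical):
    ///
    /// ```no_run
    /// # use std::time::Duration;
    /// # use metrics_sqlite::SqliteExporter;
    /// let handle = SqliteExporter::new(Duration::from_secs(1), None, "/tmp/metrics.db")
    ///     .expect("failed to open metrics database")
    ///     .install()
    ///     .expect("a global recorder was already installed");
    /// ```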
    pub fn install(self) -> Result<SqliteExporterHandle, SetRecorderError<Self>> {
        let handle = SqliteExporterHandle {
            sender: self.sender.clone(),
        };
        metrics::set_global_recorder(self)?;
        Ok(handle)
    }
}

impl Drop for SqliteExporter {
    fn drop(&mut self) {
        let _ = self.sender.send(Event::Stop);
        if let Some(thread) = self.thread.take() {
            let _ = thread.join();
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::SqliteExporter;
    use std::time::{Duration, Instant};

    #[test]
    fn test_threading() {
        use std::thread;
        SqliteExporter::new(Duration::from_millis(500), None, "metrics.db")
            .unwrap()
            .install()
            .unwrap();
        let joins: Vec<thread::JoinHandle<()>> = (0..5)
            .map(|_| {
                thread::spawn(move || {
                    let start = Instant::now();
                    loop {
                        metrics::gauge!("rate").set(1.0);
                        metrics::counter!("hits").increment(1);
                        metrics::histogram!("histogram").record(5.0);
                        if start.elapsed().as_secs() >= 5 {
                            break;
                        }
                    }
                })
            })
            .collect();
        for j in joins {
            j.join().unwrap();
        }
    }
}