// metrics_sqlite/lib.rs

1#![deny(missing_docs)]
2//! # Metrics SQLite backend
3
4#[macro_use]
5extern crate diesel;
6#[macro_use]
7extern crate diesel_migrations;
8#[macro_use]
9extern crate log;
10
11use diesel::prelude::*;
12use diesel::{insert_into, sql_query};
13
14use metrics::{GaugeValue, Key, KeyName, SetRecorderError, SharedString, Unit};
15
16use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
17use std::sync::Arc;
18use std::{
19    collections::{HashMap, VecDeque},
20    path::Path,
21    sync::mpsc::{Receiver, RecvTimeoutError, SyncSender},
22    thread::{self, JoinHandle},
23    time::{Duration, Instant, SystemTime},
24};
25use thiserror::Error;
26
/// Max number of items allowed in worker's queue before flushing regardless of flush duration
const FLUSH_QUEUE_LIMIT: usize = 1000;
/// Capacity of the recorder -> worker sync channel; senders block once this many events are queued
const BACKGROUND_CHANNEL_LIMIT: usize = 8000;
30
/// Error type for any db/vitals related errors
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Error establishing the database connection
    #[error("Database error: {0}")]
    DbConnectionError(#[from] ConnectionError),
    /// Error migrating database (boxed because the harness returns a dynamic error)
    #[error("Migration error: {0}")]
    MigrationError(Box<dyn std::error::Error + Send + Sync>),
    /// Error querying metrics DB
    #[error("Error querying DB: {0}")]
    QueryError(#[from] diesel::result::Error),
    /// Error if path given is invalid (not representable as UTF-8)
    #[error("Invalid database path")]
    InvalidDatabasePath,
    /// IO Error with reader/writer
    #[cfg(feature = "csv")]
    #[error("IO Error: {0}")]
    IoError(#[from] std::io::Error),
    /// Error writing CSV
    #[cfg(feature = "csv")]
    #[error("CSV Error: {0}")]
    CsvError(#[from] csv::Error),
    /// Attempted to query database but found no records
    #[error("Database has no metrics stored in it")]
    EmptyDatabase,
    /// Given metric key name wasn't found in the DB
    #[error("Metric key {0} not found in database")]
    KeyNotFound(String),
}
/// Metrics result type (error type defaults to [`MetricsError`])
pub type Result<T, E = MetricsError> = std::result::Result<T, E>;
63
64mod metrics_db;
65mod models;
66mod recorder;
67mod schema;
68
69use crate::recorder::Handle;
70pub use metrics_db::{MetricsDb, Session};
71pub use models::{Metric, MetricKey, NewMetric};
72
73pub(crate) const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
74
75fn setup_db<P: AsRef<Path>>(path: P) -> Result<SqliteConnection> {
76    let url = path
77        .as_ref()
78        .to_str()
79        .ok_or(MetricsError::InvalidDatabasePath)?;
80    let mut db = SqliteConnection::establish(url)?;
81    db.run_pending_migrations(MIGRATIONS)
82        .map_err(|e| MetricsError::MigrationError(e))?;
83
84    Ok(db)
85}
/// Which kind of metric a describe/register event refers to.
enum RegisterType {
    Counter,
    Gauge,
    Histogram,
}
91
/// Messages sent from the recorder side to the background worker thread.
enum Event {
    /// Flush outstanding metrics and shut the worker down
    Stop,
    /// Create or update a key's unit & description in the DB
    DescribeKey(RegisterType, KeyName, Option<Unit>, SharedString),
    /// A key/handle was registered (currently ignored by the worker)
    RegisterKey(RegisterType, Key, Arc<Handle>),
    /// Add a delta to a counter; the `Duration` is the sample timestamp
    /// (seconds since UNIX epoch — housekeeping compares against epoch-based cutoffs)
    IncrementCounter(Duration, Key, u64),
    /// Set a counter to an absolute value at the given timestamp
    AbsoluteCounter(Duration, Key, u64),
    /// Apply an absolute/increment/decrement update to a gauge
    UpdateGauge(Duration, Key, GaugeValue),
    /// Record one histogram sample
    UpdateHistogram(Duration, Key, f64),
    /// Replace the worker's retention/housekeeping configuration
    SetHousekeeping {
        retention_period: Option<Duration>,
        housekeeping_period: Option<Duration>,
        record_limit: Option<usize>,
    },
}
106
/// Exports metrics by storing them in a SQLite database at a periodic interval
pub struct SqliteExporter {
    // Worker thread handle; Option so Drop can take() it and join
    thread: Option<JoinHandle<()>>,
    // Bounded channel to the worker; sends block when the worker falls behind
    sender: SyncSender<Event>,
}
/// Mutable state owned by the background worker thread.
struct InnerState {
    // Open SQLite connection; the worker is its sole user
    db: SqliteConnection,
    // When housekeeping last ran (or when its config was last set)
    last_housekeeping: Instant,
    // Period between housekeeping runs; None disables periodic housekeeping
    housekeeping: Option<Duration>,
    // Max age of records kept by housekeeping; None keeps everything
    retention: Option<Duration>,
    // Max record count enforced by housekeeping; None means unlimited
    record_limit: Option<usize>,
    // How often queued metrics are written to SQLite
    flush_duration: Duration,
    // When the queue was last flushed to the database
    last_flush: Instant,
    // Running gauge values keyed by metric key
    last_values: HashMap<Key, f64>,
    // Running counter totals keyed by metric key
    counters: HashMap<Key, u64>,
    // Cache of key name -> DB row id to avoid repeated lookups
    key_ids: HashMap<String, i64>,
    // Metrics awaiting the next flush
    queue: VecDeque<NewMetric>,
}
125impl InnerState {
126    fn new(flush_duration: Duration, db: SqliteConnection) -> Self {
127        InnerState {
128            db,
129            last_housekeeping: Instant::now(),
130            housekeeping: None,
131            retention: None,
132            record_limit: None,
133            flush_duration,
134            last_flush: Instant::now(),
135            last_values: HashMap::new(),
136            counters: HashMap::new(),
137            key_ids: HashMap::new(),
138            queue: VecDeque::with_capacity(FLUSH_QUEUE_LIMIT),
139        }
140    }
141    fn set_housekeeping(
142        &mut self,
143        retention: Option<Duration>,
144        housekeeping_duration: Option<Duration>,
145        record_limit: Option<usize>,
146    ) {
147        self.retention = retention;
148        self.housekeeping = housekeeping_duration;
149        self.last_housekeeping = Instant::now();
150        self.record_limit = record_limit;
151    }
152    fn should_housekeep(&self) -> bool {
153        match self.housekeeping {
154            Some(duration) => self.last_housekeeping.elapsed() > duration,
155            None => false,
156        }
157    }
158    fn housekeep(&mut self) -> Result<(), diesel::result::Error> {
159        SqliteExporter::housekeeping(&mut self.db, self.retention, self.record_limit, false);
160        self.last_housekeeping = Instant::now();
161        Ok(())
162    }
163    fn should_flush(&self) -> bool {
164        if self.last_flush.elapsed() > self.flush_duration {
165            debug!("Flushing due to {}s timeout", self.flush_duration.as_secs());
166            true
167        } else {
168            self.queue.len() >= FLUSH_QUEUE_LIMIT
169        }
170    }
171    fn flush(&mut self) -> Result<(), diesel::result::Error> {
172        use crate::schema::metrics::dsl::metrics;
173        // trace!("Flushing {} records", self.queue.len());
174        let db = &mut self.db;
175        let queue = self.queue.drain(..);
176        db.transaction::<_, diesel::result::Error, _>(|db| {
177            for rec in queue {
178                insert_into(metrics).values(&rec).execute(db)?;
179            }
180            Ok(())
181        })?;
182        self.last_flush = Instant::now();
183        Ok(())
184    }
185    fn queue_metric(&mut self, timestamp: Duration, key: &str, value: f64) -> Result<()> {
186        let metric_key_id = match self.key_ids.get(key) {
187            Some(key) => *key,
188            None => {
189                debug!("Looking up {}", key);
190                let key_id = MetricKey::key_by_name(key, &mut self.db)?.id;
191                self.key_ids.insert(key.to_string(), key_id);
192                key_id
193            }
194        };
195        let metric = NewMetric {
196            timestamp: timestamp.as_secs_f64(),
197            metric_key_id,
198            value: value as _,
199        };
200        self.queue.push_back(metric);
201        Ok(())
202    }
203}
204
/// Spawns the background worker thread that owns the DB connection and
/// services `Event`s until told to stop or until the channel disconnects.
///
/// # Panics
/// Panics if the OS refuses to spawn the thread (`thread::Builder::spawn`).
fn run_worker(
    db: SqliteConnection,
    receiver: Receiver<Event>,
    flush_duration: Duration,
) -> JoinHandle<()> {
    thread::Builder::new()
        .name("metrics-sqlite: worker".to_string())
        .spawn(move || {
            let mut state = InnerState::new(flush_duration, db);
            info!("SQLite worker started");
            loop {
                // Each arm yields (should_flush, should_exit); the timeout on
                // recv doubles as the periodic flush trigger when idle.
                let (should_flush, should_exit) = match receiver.recv_timeout(flush_duration) {
                    Ok(Event::Stop) => {
                        info!("Stopping SQLiteExporter worker, flushing & exiting");
                        (true, true)
                    }
                    Ok(Event::SetHousekeeping {
                        retention_period,
                        housekeeping_period,
                        record_limit,
                    }) => {
                        state.set_housekeeping(retention_period, housekeeping_period, record_limit);
                        (false, false)
                    }
                    Ok(Event::DescribeKey(_key_type, key, unit, desc)) => {
                        info!("Describing key {:?}", key);
                        if let Err(e) = MetricKey::create_or_update(
                            key.as_str(),
                            unit,
                            Some(desc.as_ref()),
                            &mut state.db,
                        ) {
                            error!("Failed to create key entry: {:?}", e);
                        }
                        (false, false)
                    }
                    Ok(Event::RegisterKey(_key_type, _key, _handle)) => {
                        // we currently don't do anything with register...
                        (false, false)
                    }
                    Ok(Event::IncrementCounter(timestamp, key, value)) => {
                        // NOTE(review): metrics are stored by key *name* only;
                        // labels on `Key` are not part of the DB identity — confirm intended.
                        let key_str = key.name().to_string();
                        // Counters are cumulative: persist the running total, not the delta.
                        let entry = state.counters.entry(key).or_insert(0);
                        let value = {
                            *entry += value;
                            *entry
                        };
                        if let Err(e) = state.queue_metric(timestamp, &key_str, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }

                        (state.should_flush(), false)
                    }
                    Ok(Event::AbsoluteCounter(timestamp, key, value)) => {
                        let key_str = key.name().to_string();
                        // Absolute set replaces the running total so later
                        // increments build on the new value.
                        state.counters.insert(key, value);
                        if let Err(e) = state.queue_metric(timestamp, &key_str, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::UpdateGauge(timestamp, key, value)) => {
                        let key_str = key.name().to_string();
                        // Gauges track a running value; increments/decrements
                        // are applied relative to the last stored value (0.0 if new).
                        let entry = state.last_values.entry(key).or_insert(0.0);
                        let value = match value {
                            GaugeValue::Absolute(v) => {
                                *entry = v;
                                *entry
                            }
                            GaugeValue::Increment(v) => {
                                *entry += v;
                                *entry
                            }
                            GaugeValue::Decrement(v) => {
                                *entry -= v;
                                *entry
                            }
                        };
                        if let Err(e) = state.queue_metric(timestamp, &key_str, value) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::UpdateHistogram(timestamp, key, value)) => {
                        // Histogram samples are stored raw, one row per observation.
                        let key_str = key.name().to_string();
                        if let Err(e) = state.queue_metric(timestamp, &key_str, value) {
                            error!("Error queueing metric: {:?}", e);
                        }

                        (state.should_flush(), false)
                    }
                    Err(RecvTimeoutError::Timeout) => {
                        // Idle: flush whatever accumulated during the interval.
                        debug!("Flushing due to {}s timeout", flush_duration.as_secs());
                        (true, false)
                    }
                    Err(RecvTimeoutError::Disconnected) => {
                        // All senders dropped (exporter gone) — final flush, then exit.
                        warn!("SQLiteExporter channel disconnected, exiting worker");
                        (true, true)
                    }
                };
                if should_flush {
                    if let Err(e) = state.flush() {
                        error!("Error flushing metrics: {}", e);
                    }
                }
                if state.should_housekeep() {
                    if let Err(e) = state.housekeep() {
                        error!("Failed running house keeping: {:?}", e);
                    }
                }
                if should_exit {
                    break;
                }
            }
        })
        .unwrap()
}
322
impl SqliteExporter {
    /// Creates a new `SqliteExporter` that stores metrics in a SQLite database file.
    ///
    /// `flush_interval` specifies how often metrics are flushed to SQLite/disk
    ///
    /// `keep_duration` specifies how long data is kept before deleting; that
    /// purge (including a VACUUM) is performed once during `new()`
    pub fn new<P: AsRef<Path>>(
        flush_interval: Duration,
        keep_duration: Option<Duration>,
        path: P,
    ) -> Result<Self> {
        let mut db = setup_db(path)?;
        // One-shot cleanup; vacuum=true reclaims disk space up front.
        Self::housekeeping(&mut db, keep_duration, None, true);
        let (sender, receiver) = std::sync::mpsc::sync_channel(BACKGROUND_CHANNEL_LIMIT);
        // Worker takes ownership of the connection from here on.
        let thread = run_worker(db, receiver, flush_interval);
        let exporter = SqliteExporter {
            thread: Some(thread),
            sender,
        };
        Ok(exporter)
    }

    /// Sets optional periodic house keeping, None to disable (disabled by default)
    /// ## Notes
    /// Periodic house keeping can affect metric recording, causing some data to be dropped during house keeping.
    /// Record limit if set will cause anything over limit + 25% of limit to be removed
    pub fn set_periodic_housekeeping(
        &self,
        periodic_duration: Option<Duration>,
        retention: Option<Duration>,
        record_limit: Option<usize>,
    ) {
        // Fire-and-forget: failure means the worker is gone, so just log it.
        if let Err(e) = self.sender.send(Event::SetHousekeeping {
            retention_period: retention,
            housekeeping_period: periodic_duration,
            record_limit,
        }) {
            error!("Failed to set house keeping settings: {:?}", e);
        }
    }

    /// Run housekeeping: deletes rows older than `keep_duration` (skipped when
    /// None) and trims the table to `record_limit` (skipped when None).
    ///
    /// All failures are logged rather than returned; `vacuum` additionally
    /// compacts the DB file after the retention purge.
    fn housekeeping(
        db: &mut SqliteConnection,
        keep_duration: Option<Duration>,
        record_limit: Option<usize>,
        vacuum: bool,
    ) {
        use crate::schema::metrics::dsl::*;
        use diesel::dsl::count;
        if let Some(keep_duration) = keep_duration {
            // Timestamps are stored as seconds since UNIX epoch, so the cutoff
            // is "now minus retention" in the same scale.
            match SystemTime::UNIX_EPOCH.elapsed() {
                Ok(now) => {
                    let cutoff = now - keep_duration;
                    trace!("Deleting data {}s old", keep_duration.as_secs());
                    if let Err(e) =
                        diesel::delete(metrics.filter(timestamp.le(cutoff.as_secs_f64())))
                            .execute(db)
                    {
                        error!("Failed to remove old metrics data: {}", e);
                    }
                    if vacuum {
                        if let Err(e) = sql_query("VACUUM").execute(db) {
                            error!("Failed to vacuum SQLite DB: {:?}", e);
                        }
                    }
                }
                Err(e) => {
                    // System clock before 1970; nothing sane to delete against.
                    error!(
                        "System time error, skipping metrics-sqlite housekeeping: {}",
                        e
                    );
                }
            }
        }
        if let Some(record_limit) = record_limit {
            trace!("Checking for records over {} limit", record_limit);
            match metrics.select(count(id)).first::<i64>(db) {
                Ok(records) => {
                    let records = records as usize;
                    if records > record_limit {
                        let excess = records - record_limit + (record_limit / 4); // delete excess + 25% of limit
                        trace!(
                            "Exceeded limit! {} > {}, deleting {} oldest",
                            records,
                            record_limit,
                            excess
                        );
                        // String-built SQL is safe here: `excess` is a usize,
                        // never attacker-controlled text.
                        let query = format!("DELETE FROM metrics WHERE id IN (SELECT id FROM metrics ORDER BY timestamp ASC LIMIT {});", excess);
                        if let Err(e) = sql_query(query).execute(db) {
                            error!("Failed to delete excessive records: {:?}", e);
                        }
                    }
                }
                Err(e) => {
                    error!("Failed to get record count: {:?}", e);
                }
            }
        }
    }

    /// Install recorder as `metrics` crate's global Recorder
    pub fn install(self) -> Result<(), SetRecorderError> {
        metrics::set_boxed_recorder(Box::new(self))
    }
}
431impl Drop for SqliteExporter {
432    fn drop(&mut self) {
433        let _ = self.sender.send(Event::Stop);
434        let _ = self.thread.take().unwrap().join();
435    }
436}
437
#[cfg(test)]
mod tests {
    use crate::SqliteExporter;
    use std::time::{Duration, Instant};

    /// Smoke test: installs the exporter as the global recorder and hammers
    /// it from five threads for ~5 seconds to exercise the worker channel,
    /// counter/gauge/histogram paths, and periodic flushing.
    ///
    /// NOTE(review): writes `metrics.db` into the current directory and
    /// leaves it behind; installing the global recorder also means this test
    /// cannot coexist with other recorder-installing tests in one process.
    #[test]
    fn test_threading() {
        use std::thread;
        SqliteExporter::new(Duration::from_millis(500), None, "metrics.db")
            .unwrap()
            .install()
            .unwrap();
        let joins: Vec<thread::JoinHandle<()>> = (0..5)
            .map(|_| {
                thread::spawn(move || {
                    let start = Instant::now();
                    loop {
                        metrics::gauge!("rate", 1.0);
                        metrics::increment_counter!("hits");
                        metrics::histogram!("histogram", 5.0);
                        if start.elapsed().as_secs() >= 5 {
                            break;
                        }
                    }
                })
            })
            .collect();
        for j in joins {
            j.join().unwrap();
        }
    }
}