Skip to main content

metrics_sqlite/
lib.rs

1#![deny(missing_docs)]
2//! # Metrics SQLite backend
3
4#[macro_use]
5extern crate diesel;
6#[macro_use]
7extern crate diesel_migrations;
8use tracing::{debug, error, info, trace, warn};
9
10use diesel::prelude::*;
11use diesel::{insert_into, sql_query};
12
13use metrics::{GaugeValue, Key, KeyName, SetRecorderError, SharedString, Unit};
14
15use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
16use std::sync::Mutex;
17use std::{
18    collections::{HashMap, VecDeque},
19    path::{Path, PathBuf},
20    sync::mpsc::{Receiver, RecvTimeoutError, SyncSender},
21    thread::{self, JoinHandle},
22    time::{Duration, Instant, SystemTime},
23};
24use thiserror::Error;
25
/// Max number of items allowed in worker's queue before flushing regardless of flush duration
const FLUSH_QUEUE_LIMIT: usize = 1000;
/// Capacity of the bounded channel that carries events to the background worker.
const BACKGROUND_CHANNEL_LIMIT: usize = 8000;
/// Bound-variable ceiling assumed for a single SQL statement (named after
/// SQLite's default limit).
const SQLITE_DEFAULT_MAX_VARIABLES: usize = 999;
/// Bound variables consumed by one inserted metric row
/// (`timestamp`, `metric_key_id`, `value`).
const METRIC_FIELDS_PER_ROW: usize = 3;
/// Rows per `INSERT` statement, chosen so a single batch never binds more than
/// `SQLITE_DEFAULT_MAX_VARIABLES` variables.
const INSERT_BATCH_SIZE: usize = SQLITE_DEFAULT_MAX_VARIABLES / METRIC_FIELDS_PER_ROW;
/// Hard cap on metrics buffered in memory by the worker. If flushing to SQLite
/// keeps failing, the oldest metrics beyond this limit are dropped so a broken
/// database can never grow the queue without bound.
const QUEUE_HARD_LIMIT: usize = 100_000;
/// Number of consecutive failed flushes after which the worker rebuilds its
/// database connection, even if the error didn't look connection-fatal.
const RECONNECT_AFTER_FAILURES: u64 = 3;
/// Minimum delay between worker database reconnection attempts.
const RECONNECT_BACKOFF: Duration = Duration::from_secs(30);
/// Minimum delay between repeated error log lines of the same kind.
const ERROR_LOG_INTERVAL: Duration = Duration::from_secs(60);
43
/// Throttles repetitive error logging: at most one line is let through per
/// `interval`, and the number of lines swallowed in between is remembered so
/// the next emitted line can report it.
struct LogThrottle {
    interval: Duration,
    last_logged: Option<Instant>,
    suppressed: u64,
}
impl LogThrottle {
    /// Creates a throttle that has never logged, so its first use is always
    /// let through.
    const fn new(interval: Duration) -> Self {
        Self {
            interval,
            last_logged: None,
            suppressed: 0,
        }
    }

    /// Decides whether a log line may be written right now.
    ///
    /// Returns `Some(n)` when the line should be emitted, where `n` is how
    /// many lines were swallowed since the previous emission (the counter is
    /// reset). Returns `None` when still inside the quiet interval, in which
    /// case the swallowed-line counter is bumped.
    fn allow(&mut self) -> Option<u64> {
        let now = Instant::now();
        if let Some(previous) = self.last_logged {
            if now.duration_since(previous) < self.interval {
                // Still inside the quiet window: count it and stay silent.
                self.suppressed += 1;
                return None;
            }
        }
        self.last_logged = Some(now);
        let swallowed = self.suppressed;
        self.suppressed = 0;
        Some(swallowed)
    }

    /// Runs `emit` with the swallowed-line count when a log line is due;
    /// otherwise does nothing beyond recording that one more line was
    /// suppressed.
    fn log_if_due(&mut self, emit: impl FnOnce(u64)) {
        if let Some(swallowed) = self.allow() {
            emit(swallowed);
        }
    }
}
85
/// Error type for any db/vitals related errors
///
/// Covers opening and migrating the SQLite database, querying it, optional CSV
/// export (behind the `csv` feature) and communication with the exporter's
/// background worker.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Failed to establish a connection to the database
    #[error("Database error: {0}")]
    DbConnectionError(#[from] ConnectionError),
    /// Running the embedded schema migrations failed
    #[error("Migration error: {0}")]
    MigrationError(Box<dyn std::error::Error + Send + Sync>),
    /// A query or statement against the metrics DB failed
    #[error("Error querying DB: {0}")]
    QueryError(#[from] diesel::result::Error),
    /// The given database path could not be converted to a UTF-8 string
    #[error("Invalid database path")]
    InvalidDatabasePath,
    /// IO Error with reader/writer
    #[cfg(feature = "csv")]
    #[error("IO Error: {0}")]
    IoError(#[from] std::io::Error),
    /// Error writing CSV
    #[cfg(feature = "csv")]
    #[error("CSV Error: {0}")]
    CsvError(#[from] csv::Error),
    /// Attempted to query the database but found no records
    #[error("Database has no metrics stored in it")]
    EmptyDatabase,
    /// Given metric key name wasn't found in the DB
    #[error("Metric key {0} not found in database")]
    KeyNotFound(String),
    /// Attempting to communicate with exporter but it's gone away
    #[error("Exporter task has been stopped or crashed")]
    ExporterUnavailable,
    /// Session derived from the signpost has zero duration
    #[error("Session for signpost `{0}` has zero duration")]
    ZeroLengthSession(String),
    /// No metrics available for the requested key inside the derived session
    #[error("No metrics recorded for `{0}` in requested session")]
    NoMetricsForKey(String),
}
125
126impl MetricsError {
127    /// Check if this error indicates a malformed/corrupt database
128    fn is_malformed_db(&self) -> bool {
129        self.to_string().contains("malformed")
130    }
131}
132
/// Metrics result type
///
/// Shorthand for `std::result::Result` whose error type defaults to
/// [`MetricsError`]; a different error type can still be named explicitly.
pub type Result<T, E = MetricsError> = std::result::Result<T, E>;
135
136mod metrics_db;
137mod models;
138mod recorder;
139mod schema;
140
141use crate::metrics_db::query;
142pub use metrics_db::{MetricsDb, Session};
143pub use models::{Metric, MetricKey, NewMetric};
144
/// Schema migrations embedded into the binary by `embed_migrations!`; they are
/// applied to every connection opened through `setup_db`.
pub(crate) const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
146
/// One result row of `PRAGMA quick_check;`, used by `setup_db` to detect a
/// corrupt database.
#[derive(QueryableByName)]
struct PragmaCheckResult {
    // Text of the `quick_check` column; `setup_db` treats anything other than
    // "ok" as corruption.
    #[diesel(sql_type = diesel::sql_types::Text)]
    #[diesel(column_name = quick_check)]
    result: String,
}
153
154/// Remove database file and its WAL/SHM sidecar files
155fn remove_db_files(path: &Path) {
156    let db_path = PathBuf::from(path);
157    for suffix in &["", "-wal", "-shm"] {
158        let mut file_path = db_path.clone().into_os_string();
159        file_path.push(suffix);
160        let file_path = PathBuf::from(file_path);
161        if file_path.exists() {
162            if let Err(e) = std::fs::remove_file(&file_path) {
163                error!("Failed to remove {}: {}", file_path.display(), e);
164            } else {
165                info!("Removed corrupt database file: {}", file_path.display());
166            }
167        }
168    }
169}
170
171fn setup_db<P: AsRef<Path>>(path: P) -> Result<SqliteConnection> {
172    let url = path
173        .as_ref()
174        .to_str()
175        .ok_or(MetricsError::InvalidDatabasePath)?;
176    let mut db = SqliteConnection::establish(url)?;
177
178    // Enable WAL mode for better concurrent access
179    sql_query("PRAGMA journal_mode=WAL;").execute(&mut db)?;
180
181    // Set busy timeout to 5 seconds to handle lock contention gracefully
182    sql_query("PRAGMA busy_timeout = 5000;").execute(&mut db)?;
183
184    db.run_pending_migrations(MIGRATIONS)
185        .map_err(MetricsError::MigrationError)?;
186
187    // Check for corruption that may not surface until queries run
188    let check: String = sql_query("PRAGMA quick_check;")
189        .get_result::<PragmaCheckResult>(&mut db)?
190        .result;
191    if check != "ok" {
192        return Err(MetricsError::QueryError(
193            diesel::result::Error::DatabaseError(
194                diesel::result::DatabaseErrorKind::Unknown,
195                Box::new(format!("database disk image is malformed: {check}")),
196            ),
197        ));
198    }
199
200    Ok(db)
201}
202
203/// Like `setup_db`, but if the database is malformed, removes it and retries
204/// once. The boolean in the success result is `true` when a reset actually
205/// happened, so callers can drop any cached state that referenced the old
206/// database (notably `metric_keys` row ids).
207fn setup_db_or_reset<P: AsRef<Path>>(path: P) -> Result<(SqliteConnection, bool)> {
208    let path = path.as_ref();
209    match setup_db(path) {
210        Ok(db) => Ok((db, false)),
211        Err(err) if err.is_malformed_db() => {
212            warn!(
213                "Database is malformed, removing and recreating: {}",
214                path.display()
215            );
216            remove_db_files(path);
217            setup_db(path).map(|db| (db, true))
218        }
219        Err(err) => Err(err),
220    }
221}
/// Kind of metric being described in an `Event::DescribeKey` message.
///
/// NOTE(review): the worker currently binds this as `_key_type` and does not
/// use it.
enum RegisterType {
    // Counter metric.
    Counter,
    // Gauge metric.
    Gauge,
    // Histogram metric.
    Histogram,
}
227
/// Messages sent over the channel to the background SQLite worker thread.
///
/// The `Duration` carried by the metric variants is the sample timestamp; the
/// worker stores it as fractional seconds (presumably time since the UNIX
/// epoch, since housekeeping compares it against that clock — confirm with
/// the recorder).
enum Event {
    // Flush whatever is queued and terminate the worker loop.
    Stop,
    // Create or update the stored key entry (name, unit, description).
    DescribeKey(RegisterType, KeyName, Option<Unit>, SharedString),
    // Add the value to the running total for the key and record the new total.
    IncrementCounter(Duration, Key, u64),
    // Replace the running total for the key and record it.
    AbsoluteCounter(Duration, Key, u64),
    // Apply an absolute/increment/decrement gauge update and record the result.
    UpdateGauge(Duration, Key, GaugeValue),
    // Record a single histogram sample as-is.
    UpdateHistogram(Duration, Key, f64),
    // Replace the worker's periodic housekeeping settings.
    SetHousekeeping {
        retention_period: Option<Duration>,
        housekeeping_period: Option<Duration>,
        record_limit: Option<usize>,
    },
    // Flush pending metrics, then answer on `tx` with the summary computed for
    // `keys` starting from `signpost_key` (or with the error that occurred).
    RequestSummaryFromSignpost {
        signpost_key: String,
        keys: Vec<String>,
        tx: tokio::sync::oneshot::Sender<Result<HashMap<String, f64>>>,
    },
}
246
247/// Handle for continued communication with sqlite exporter
248pub struct SqliteExporterHandle {
249    sender: SyncSender<Event>,
250}
251impl SqliteExporterHandle {
252    /// Request average metrics from a signpost to latest from exporter's DB
253    pub fn request_average_metrics(
254        &self,
255        from_signpost: &str,
256        with_keys: &[&str],
257    ) -> Result<HashMap<String, f64>> {
258        let (tx, rx) = tokio::sync::oneshot::channel();
259        self.sender
260            .send(Event::RequestSummaryFromSignpost {
261                signpost_key: from_signpost.to_string(),
262                keys: with_keys.iter().map(|s| s.to_string()).collect(),
263                tx,
264            })
265            .map_err(|_| MetricsError::ExporterUnavailable)?;
266        match rx.blocking_recv() {
267            Ok(metrics) => Ok(metrics?),
268            Err(_) => Err(MetricsError::ExporterUnavailable),
269        }
270    }
271}
272
/// Exports metrics by storing them in an SQLite database at a periodic interval
///
/// All database work happens on a dedicated background worker thread; this
/// type only forwards events to it over a bounded channel.
pub struct SqliteExporter {
    // Join handle of the background worker thread.
    thread: Option<JoinHandle<()>>,
    // Sending half of the bounded event channel to the worker.
    sender: SyncSender<Event>,
    // Rate limiter for repeated error logs — presumably for failed sends to
    // the worker; its use is outside this part of the file.
    send_error_throttle: Mutex<LogThrottle>,
}
/// State owned exclusively by the background worker thread.
struct InnerState {
    // Connection used for every database operation the worker performs.
    db: SqliteConnection,
    // Location of the database, kept so the connection can be rebuilt.
    db_path: PathBuf,
    // When housekeeping last ran (or when its settings were last changed).
    last_housekeeping: Instant,
    // Interval between periodic housekeeping runs; `None` disables them.
    housekeeping: Option<Duration>,
    // Retention period handed to housekeeping.
    retention: Option<Duration>,
    // Record limit handed to housekeeping.
    record_limit: Option<usize>,
    // Maximum time between flushes of the queue.
    flush_duration: Duration,
    // When the queue was last flushed successfully (or found empty).
    last_flush: Instant,
    // Current value of each gauge, so increments/decrements can be applied.
    last_values: HashMap<Key, f64>,
    // Running total of each counter.
    counters: HashMap<Key, u64>,
    // Cache of metric key name -> `metric_keys` row id.
    key_ids: HashMap<String, i64>,
    // Metrics waiting to be written to the database.
    queue: VecDeque<NewMetric>,
    // Failed flushes in a row since the last success or reconnect.
    consecutive_flush_failures: u64,
    // When a reconnect was last attempted, for backoff.
    last_reconnect: Option<Instant>,
}
295impl InnerState {
296    fn new(flush_duration: Duration, db: SqliteConnection, db_path: PathBuf) -> Self {
297        InnerState {
298            db,
299            db_path,
300            last_housekeeping: Instant::now(),
301            housekeeping: None,
302            retention: None,
303            record_limit: None,
304            flush_duration,
305            last_flush: Instant::now(),
306            last_values: HashMap::new(),
307            counters: HashMap::new(),
308            key_ids: HashMap::new(),
309            queue: VecDeque::with_capacity(FLUSH_QUEUE_LIMIT),
310            consecutive_flush_failures: 0,
311            last_reconnect: None,
312        }
313    }
314    fn set_housekeeping(
315        &mut self,
316        retention: Option<Duration>,
317        housekeeping_duration: Option<Duration>,
318        record_limit: Option<usize>,
319    ) {
320        self.retention = retention;
321        self.housekeeping = housekeeping_duration;
322        self.last_housekeeping = Instant::now();
323        self.record_limit = record_limit;
324    }
325    fn should_housekeep(&self) -> bool {
326        match self.housekeeping {
327            Some(duration) => self.last_housekeeping.elapsed() > duration,
328            None => false,
329        }
330    }
331    fn housekeep(&mut self) -> Result<(), diesel::result::Error> {
332        SqliteExporter::housekeeping(&mut self.db, self.retention, self.record_limit, false);
333        self.last_housekeeping = Instant::now();
334        Ok(())
335    }
336    fn should_flush(&self) -> bool {
337        if self.last_flush.elapsed() > self.flush_duration {
338            true
339        } else if self.queue.len() >= FLUSH_QUEUE_LIMIT {
340            debug!("Flushing due to queue size ({} items)", self.queue.len());
341            true
342        } else {
343            false
344        }
345    }
346    fn flush(&mut self) -> Result<(), diesel::result::Error> {
347        if self.queue.is_empty() {
348            self.last_flush = Instant::now();
349            return Ok(());
350        }
351        // Operate on the queue in-place: pass the deque's two backing slices
352        // directly to the insert. On failure we leave the queue alone, so a
353        // broken database stays at zero memcpy cost per attempt — the cascade
354        // that filled the channel in the original incident is what made each
355        // failed flush O(queue_size) by draining and re-extending.
356        let (front, back) = self.queue.as_slices();
357        match Self::insert_metrics(&mut self.db, [front, back]) {
358            Ok(()) => {
359                self.queue.clear();
360                self.last_flush = Instant::now();
361                self.consecutive_flush_failures = 0;
362                Ok(())
363            }
364            Err(e) => {
365                self.consecutive_flush_failures += 1;
366                // Queue is intact; just cap memory.
367                self.enforce_queue_cap();
368                // A broken transaction manager never heals on its own, so the
369                // connection has to be rebuilt; also rebuild after repeated
370                // failures of any kind as a backstop.
371                if Self::is_connection_fatal(&e)
372                    || self.consecutive_flush_failures >= RECONNECT_AFTER_FAILURES
373                {
374                    self.reconnect();
375                }
376                Err(e)
377            }
378        }
379    }
380
381    /// Inserts every metric in a single transaction, batched to stay under
382    /// SQLite's bound-variable limit. Accepts multiple slices so a `VecDeque`
383    /// can be inserted in-place without copying into a contiguous buffer.
384    fn insert_metrics<'a, S>(
385        db: &mut SqliteConnection,
386        slabs: S,
387    ) -> Result<(), diesel::result::Error>
388    where
389        S: IntoIterator<Item = &'a [NewMetric]>,
390    {
391        use crate::schema::metrics::dsl::metrics;
392        db.transaction::<_, diesel::result::Error, _>(|db| {
393            let chunk_size = INSERT_BATCH_SIZE.max(1);
394            for slab in slabs {
395                for chunk in slab.chunks(chunk_size) {
396                    insert_into(metrics).values(chunk).execute(db)?;
397                }
398            }
399            Ok(())
400        })
401    }
402
403    /// Drops the oldest queued metrics if the queue has grown past its hard
404    /// limit, so a database that stays unreachable can't exhaust memory.
405    fn enforce_queue_cap(&mut self) {
406        if self.queue.len() > QUEUE_HARD_LIMIT {
407            let overflow = self.queue.len() - QUEUE_HARD_LIMIT;
408            self.queue.drain(..overflow);
409            warn!(
410                "metrics-sqlite queue exceeded {} items while flushing kept failing, dropped {} oldest metrics",
411                QUEUE_HARD_LIMIT, overflow
412            );
413        }
414    }
415
416    /// True for errors that leave the connection permanently unusable, where
417    /// the only recovery is to rebuild it.
418    fn is_connection_fatal(e: &diesel::result::Error) -> bool {
419        matches!(e, diesel::result::Error::BrokenTransactionManager)
420    }
421
422    /// Rebuilds the SQLite connection after a fatal error, subject to a backoff
423    /// so a permanently broken database can't cause a reconnect storm.
424    fn reconnect(&mut self) {
425        if let Some(last) = self.last_reconnect {
426            if last.elapsed() < RECONNECT_BACKOFF {
427                return;
428            }
429        }
430        self.last_reconnect = Some(Instant::now());
431        warn!(
432            "metrics-sqlite database connection is broken, reconnecting to {}",
433            self.db_path.display()
434        );
435        match setup_db_or_reset(&self.db_path) {
436            Ok((db, was_reset)) => {
437                self.db = db;
438                // Cached key ids may be stale if the database was recreated.
439                self.key_ids.clear();
440                self.consecutive_flush_failures = 0;
441                if was_reset {
442                    // The database was malformed and recreated, so any rows we
443                    // had queued reference `metric_key_id` values from the old
444                    // `metric_keys` table. Inserting them now would orphan or
445                    // misattribute them in the join, so drop the queue.
446                    let dropped = self.queue.len();
447                    if dropped > 0 {
448                        warn!(
449                            "metrics-sqlite database was recreated; dropping {dropped} queued metrics with stale key ids"
450                        );
451                        self.queue.clear();
452                    }
453                }
454                info!("metrics-sqlite database connection re-established");
455            }
456            Err(e) => {
457                error!("metrics-sqlite failed to reconnect to database: {:?}", e);
458            }
459        }
460    }
461    fn queue_metric(&mut self, timestamp: Duration, key: &str, value: f64) -> Result<()> {
462        let metric_key_id = match self.key_ids.get(key) {
463            Some(key) => *key,
464            None => {
465                debug!("Looking up {}", key);
466                let key_id = MetricKey::key_by_name(key, &mut self.db)?.id;
467                self.key_ids.insert(key.to_string(), key_id);
468                key_id
469            }
470        };
471        let metric = NewMetric {
472            timestamp: timestamp.as_secs_f64(),
473            metric_key_id,
474            value: value as _,
475        };
476        self.queue.push_back(metric);
477        Ok(())
478    }
479
480    // --- Summary/Average additions
481
482    pub fn metrics_summary_for_signpost_and_keys(
483        &mut self,
484        signpost: String,
485        metrics: Vec<String>,
486    ) -> Result<HashMap<String, f64>> {
487        query::metrics_summary_for_signpost_and_keys(&mut self.db, &signpost, metrics)
488    }
489}
490
491fn run_worker(
492    db: SqliteConnection,
493    db_path: PathBuf,
494    receiver: Receiver<Event>,
495    flush_duration: Duration,
496) -> JoinHandle<()> {
497    thread::Builder::new()
498        .name("metrics-sqlite: worker".to_string())
499        .spawn(move || {
500            let mut state = InnerState::new(flush_duration, db, db_path);
501            let mut flush_error_throttle = LogThrottle::new(ERROR_LOG_INTERVAL);
502            let mut queue_error_throttle = LogThrottle::new(ERROR_LOG_INTERVAL);
503            info!("SQLite worker started");
504            loop {
505                // Check if we need to flush based on elapsed time
506                let time_based_flush = state.last_flush.elapsed() >= flush_duration;
507
508                let mut should_flush = false;
509                let mut should_exit = false;
510                match receiver.recv_timeout(flush_duration) {
511                    Ok(Event::Stop) => {
512                        info!("Stopping SQLiteExporter worker, flushing & exiting");
513                        should_flush = true;
514                        should_exit = true;
515                    }
516                    Ok(Event::SetHousekeeping {
517                        retention_period,
518                        housekeeping_period,
519                        record_limit,
520                    }) => {
521                        state.set_housekeeping(retention_period, housekeeping_period, record_limit);
522                    }
523                    Ok(Event::DescribeKey(_key_type, key, unit, desc)) => {
524                        info!("Describing key {:?}", key);
525                        if let Err(e) = MetricKey::create_or_update(
526                            key.as_str(),
527                            unit,
528                            Some(desc.as_ref()),
529                            &mut state.db,
530                        ) {
531                            error!("Failed to create key entry: {:?}", e);
532                        }
533                    }
534                    Ok(Event::IncrementCounter(timestamp, key, value)) => {
535                        let key_name = key.name();
536                        let entry = state.counters.entry(key.clone()).or_insert(0);
537                        let value = {
538                            *entry += value;
539                            *entry
540                        };
541                        if let Err(e) = state.queue_metric(timestamp, key_name, value as _) {
542                            queue_error_throttle.log_if_due(|suppressed| {
543                                if suppressed > 0 {
544                                    error!(
545                                        "Error queueing metric: {:?} ({} similar errors suppressed in the last {}s)",
546                                        e,
547                                        suppressed,
548                                        ERROR_LOG_INTERVAL.as_secs()
549                                    );
550                                } else {
551                                    error!("Error queueing metric: {:?}", e);
552                                }
553                            });
554                        }
555                        should_flush = state.should_flush();
556                    }
557                    Ok(Event::AbsoluteCounter(timestamp, key, value)) => {
558                        let key_name = key.name();
559                        state.counters.insert(key.clone(), value);
560                        if let Err(e) = state.queue_metric(timestamp, key_name, value as _) {
561                            queue_error_throttle.log_if_due(|suppressed| {
562                                if suppressed > 0 {
563                                    error!(
564                                        "Error queueing metric: {:?} ({} similar errors suppressed in the last {}s)",
565                                        e,
566                                        suppressed,
567                                        ERROR_LOG_INTERVAL.as_secs()
568                                    );
569                                } else {
570                                    error!("Error queueing metric: {:?}", e);
571                                }
572                            });
573                        }
574                        should_flush = state.should_flush();
575                    }
576                    Ok(Event::UpdateGauge(timestamp, key, value)) => {
577                        let key_name = key.name();
578                        let entry = state.last_values.entry(key.clone()).or_insert(0.0);
579                        let value = match value {
580                            GaugeValue::Absolute(v) => {
581                                *entry = v;
582                                *entry
583                            }
584                            GaugeValue::Increment(v) => {
585                                *entry += v;
586                                *entry
587                            }
588                            GaugeValue::Decrement(v) => {
589                                *entry -= v;
590                                *entry
591                            }
592                        };
593                        if let Err(e) = state.queue_metric(timestamp, key_name, value) {
594                            queue_error_throttle.log_if_due(|suppressed| {
595                                if suppressed > 0 {
596                                    error!(
597                                        "Error queueing metric: {:?} ({} similar errors suppressed in the last {}s)",
598                                        e,
599                                        suppressed,
600                                        ERROR_LOG_INTERVAL.as_secs()
601                                    );
602                                } else {
603                                    error!("Error queueing metric: {:?}", e);
604                                }
605                            });
606                        }
607                        should_flush = state.should_flush();
608                    }
609                    Ok(Event::UpdateHistogram(timestamp, key, value)) => {
610                        let key_name = key.name();
611                        if let Err(e) = state.queue_metric(timestamp, key_name, value) {
612                            queue_error_throttle.log_if_due(|suppressed| {
613                                if suppressed > 0 {
614                                    error!(
615                                        "Error queueing metric: {:?} ({} similar errors suppressed in the last {}s)",
616                                        e,
617                                        suppressed,
618                                        ERROR_LOG_INTERVAL.as_secs()
619                                    );
620                                } else {
621                                    error!("Error queueing metric: {:?}", e);
622                                }
623                            });
624                        }
625                        should_flush = state.should_flush();
626                    }
627                    Ok(Event::RequestSummaryFromSignpost {
628                        signpost_key,
629                        keys,
630                        tx,
631                    }) => {
632                        match state.flush() {
633                            Ok(()) => match state
634                                .metrics_summary_for_signpost_and_keys(signpost_key, keys)
635                            {
636                                Ok(metrics) => {
637                                    if tx.send(Ok(metrics)).is_err() {
638                                        error!(
639                                            "Failed to respond with metrics results, discarding"
640                                        );
641                                    }
642                                }
643                                Err(e) => {
644                                    if let Err(e) = tx.send(Err(e)) {
645                                        error!(
646                                            "Failed to respond with metrics error result, discarding: {e:?}"
647                                        );
648                                    }
649                                }
650                            },
651                            Err(e) => {
652                                let err = MetricsError::from(e);
653                                error!(
654                                    "Failed to flush pending metrics before summary request: {err:?}"
655                                );
656                                if let Err(send_err) = tx.send(Err(err)) {
657                                    error!(
658                                        "Failed to respond with metrics flush error result, discarding: {send_err:?}"
659                                    );
660                                }
661                            }
662                        }
663                    }
664                    Err(RecvTimeoutError::Timeout) => {
665                        should_flush = true;
666                    }
667                    Err(RecvTimeoutError::Disconnected) => {
668                        warn!("SQLiteExporter channel disconnected, exiting worker");
669                        should_flush = true;
670                        should_exit = true;
671                    }
672                }
673
674                // Flush if time-based flush is triggered OR if event-based flush is triggered
675                if time_based_flush || should_flush {
676                    if time_based_flush {
677                        debug!("Flushing due to elapsed time ({}s)", flush_duration.as_secs());
678                    }
679                    if let Err(e) = state.flush() {
680                        if let Some(suppressed) = flush_error_throttle.allow() {
681                            if suppressed > 0 {
682                                error!(
683                                    "Error flushing metrics: {} ({} similar errors suppressed in the last {}s)",
684                                    e,
685                                    suppressed,
686                                    ERROR_LOG_INTERVAL.as_secs()
687                                );
688                            } else {
689                                error!("Error flushing metrics: {}", e);
690                            }
691                        }
692                    }
693                }
694                if state.should_housekeep() {
695                    if let Err(e) = state.housekeep() {
696                        error!("Failed running house keeping: {:?}", e);
697                    }
698                }
699                if should_exit {
700                    break;
701                }
702            }
703        })
704        .unwrap()
705}
706
707impl SqliteExporter {
708    /// Creates a new `SqliteExporter` that stores metrics in an SQLite database file.
709    ///
710    /// `flush_interval` specifies how often metrics are flushed to SQLite/disk
711    ///
712    /// `keep_duration` specifies how long data is kept before deleting, performed new()
713    pub fn new<P: AsRef<Path>>(
714        flush_interval: Duration,
715        keep_duration: Option<Duration>,
716        path: P,
717    ) -> Result<Self> {
718        let path = path.as_ref().to_path_buf();
719        let (mut db, _was_reset) = setup_db_or_reset(&path)?;
720        Self::housekeeping(&mut db, keep_duration, None, true);
721        let (sender, receiver) = std::sync::mpsc::sync_channel(BACKGROUND_CHANNEL_LIMIT);
722        let thread = run_worker(db, path, receiver, flush_interval);
723        let exporter = SqliteExporter {
724            thread: Some(thread),
725            sender,
726            send_error_throttle: Mutex::new(LogThrottle::new(ERROR_LOG_INTERVAL)),
727        };
728        Ok(exporter)
729    }
730
731    /// Sets optional periodic housekeeping, None to disable (disabled by default)
732    /// ## Notes
733    /// Periodic housekeeping can affect metric recording, causing some data to be dropped during housekeeping.
734    /// Record limit if set will cause anything over limit + 25% of the limit to be removed
735    pub fn set_periodic_housekeeping(
736        &self,
737        periodic_duration: Option<Duration>,
738        retention: Option<Duration>,
739        record_limit: Option<usize>,
740    ) {
741        if let Err(e) = self.sender.send(Event::SetHousekeeping {
742            retention_period: retention,
743            housekeeping_period: periodic_duration,
744            record_limit,
745        }) {
746            error!("Failed to set house keeping settings: {:?}", e);
747        }
748    }
749
750    /// Run housekeeping.
751    ///
752    /// Does nothing if None was given for keep_duration in `new()`
753    fn housekeeping(
754        db: &mut SqliteConnection,
755        keep_duration: Option<Duration>,
756        record_limit: Option<usize>,
757        vacuum: bool,
758    ) {
759        use crate::schema::metrics::dsl::*;
760        use diesel::dsl::count;
761        if let Some(keep_duration) = keep_duration {
762            match SystemTime::UNIX_EPOCH.elapsed() {
763                Ok(now) => {
764                    let cutoff = now - keep_duration;
765                    trace!("Deleting data {}s old", keep_duration.as_secs());
766                    if let Err(e) =
767                        diesel::delete(metrics.filter(timestamp.le(cutoff.as_secs_f64())))
768                            .execute(db)
769                    {
770                        error!("Failed to remove old metrics data: {}", e);
771                    }
772                    if vacuum {
773                        if let Err(e) = sql_query("VACUUM").execute(db) {
774                            error!("Failed to vacuum SQLite DB: {:?}", e);
775                        }
776                    }
777                }
778                Err(e) => {
779                    error!(
780                        "System time error, skipping metrics-sqlite housekeeping: {}",
781                        e
782                    );
783                }
784            }
785        }
786        if let Some(record_limit) = record_limit {
787            trace!("Checking for records over {} limit", record_limit);
788            match metrics.select(count(id)).first::<i64>(db) {
789                Ok(records) => {
790                    let records = records as usize;
791                    if records > record_limit {
792                        let excess = records - record_limit + (record_limit / 4); // delete excess + 25% of limit
793                        trace!(
794                            "Exceeded limit! {} > {}, deleting {} oldest",
795                            records, record_limit, excess
796                        );
797                        let query = format!(
798                            "DELETE FROM metrics WHERE id IN (SELECT id FROM metrics ORDER BY timestamp ASC LIMIT {excess});"
799                        );
800                        if let Err(e) = sql_query(query).execute(db) {
801                            error!("Failed to delete excessive records: {:?}", e);
802                        }
803                    }
804                }
805                Err(e) => {
806                    error!("Failed to get record count: {:?}", e);
807                }
808            }
809        }
810    }
811
812    /// Logs a failure to hand an event to the worker, rate-limited so that a
813    /// full channel (sustained backpressure) can't flood the logs.
814    fn log_send_failure(&self, context: &str, err: &dyn std::fmt::Debug) {
815        if let Ok(mut throttle) = self.send_error_throttle.lock() {
816            if let Some(suppressed) = throttle.allow() {
817                if suppressed > 0 {
818                    error!(
819                        "Error sending metric {} to SQLite worker: {:?} ({} similar errors suppressed in the last {}s)",
820                        context,
821                        err,
822                        suppressed,
823                        ERROR_LOG_INTERVAL.as_secs()
824                    );
825                } else {
826                    error!(
827                        "Error sending metric {} to SQLite worker: {:?}",
828                        context, err
829                    );
830                }
831            }
832        }
833    }
834
835    /// Install recorder as `metrics` crate's Recorder
836    pub fn install(self) -> Result<SqliteExporterHandle, SetRecorderError<Self>> {
837        let handle = SqliteExporterHandle {
838            sender: self.sender.clone(),
839        };
840        metrics::set_global_recorder(self)?;
841        Ok(handle)
842    }
843}
844impl Drop for SqliteExporter {
845    fn drop(&mut self) {
846        let _ = self.sender.send(Event::Stop);
847        let _ = self.thread.take().unwrap().join();
848    }
849}
850
#[cfg(test)]
mod tests {
    use crate::{
        InnerState, LogThrottle, NewMetric, QUEUE_HARD_LIMIT, SqliteExporter, setup_db_or_reset,
    };
    use std::time::{Duration, Instant};

    /// Builds an `InnerState` on top of a fresh database inside a temporary
    /// directory. The `TempDir` is handed back so the caller can keep it
    /// alive for as long as the state is in use.
    fn test_state() -> (InnerState, tempfile::TempDir) {
        let tmp = tempfile::tempdir().unwrap();
        let db_file = tmp.path().join("metrics.db");
        let (conn, _was_reset) = setup_db_or_reset(&db_file).unwrap();
        let state = InnerState::new(Duration::from_secs(5), conn, db_file);
        (state, tmp)
    }

    /// Pushes `count` metrics onto the state's queue. Each one carries its
    /// insertion index (`0..count`) as its value so tests can tell which
    /// entries are still present afterwards.
    fn fill_queue(state: &mut InnerState, count: usize) {
        for tag in 0..count {
            state.queue.push_back(NewMetric {
                timestamp: 0.0,
                metric_key_id: 1,
                value: tag as f64,
            });
        }
    }

    #[test]
    fn enforce_queue_cap_drops_oldest_when_over_limit() {
        const OVERFLOW: usize = 250;
        let (mut state, _dir) = test_state();
        // Overfill the queue past the hard limit.
        let total = QUEUE_HARD_LIMIT + OVERFLOW;
        fill_queue(&mut state, total);

        state.enforce_queue_cap();

        // Exactly the limit remains, and the survivors are the newest ones:
        // the first `OVERFLOW` entries were discarded.
        assert_eq!(state.queue.len(), QUEUE_HARD_LIMIT);
        let oldest = state.queue.front().unwrap();
        assert_eq!(oldest.value, OVERFLOW as f64);
        let newest = state.queue.back().unwrap();
        assert_eq!(newest.value, (total - 1) as f64);

        // Enforcing again on a queue that is not over the limit changes nothing.
        state.enforce_queue_cap();
        assert_eq!(state.queue.len(), QUEUE_HARD_LIMIT);
    }

    #[test]
    fn failed_flush_leaves_queue_intact() {
        use diesel::connection::SimpleConnection;
        const QUEUED: usize = 5_000;
        let (mut state, _dir) = test_state();
        // Make every insert fail by removing the target table. This is done
        // directly from the test so no test-only hook has to exist in the
        // crate's public API.
        state
            .db
            .batch_execute("DROP TABLE metrics")
            .expect("setup: drop metrics table");
        fill_queue(&mut state, QUEUED);
        let before = state.queue.len();

        let outcome = state.flush();
        assert!(outcome.is_err(), "flush should fail");

        // A failed flush must not lose anything: same length, same ends.
        assert_eq!(state.queue.len(), before);
        assert_eq!(state.queue.front().unwrap().value, 0.0);
        assert_eq!(state.queue.back().unwrap().value, 4_999.0);
        assert_eq!(state.consecutive_flush_failures, 1);
    }

    #[test]
    fn reconnect_drops_queue_when_database_is_recreated() {
        use crate::setup_db;
        use diesel::connection::SimpleConnection;
        use std::io::{Seek, SeekFrom, Write};
        let (mut state, _dir) = test_state();
        fill_queue(&mut state, 100);

        // Checkpoint and truncate the WAL so everything lives in the main DB
        // file; otherwise WAL contents could hide the corruption made below.
        state
            .db
            .batch_execute("PRAGMA wal_checkpoint(TRUNCATE)")
            .expect("setup: wal checkpoint");
        // Swap in an in-memory connection to close the handle on the real DB
        // file before corrupting it. Windows cannot `DeleteFile` a file that
        // still has an open handle, so without the swap the malformed-reset
        // path would silently fail there and the queue would never be cleared
        // (POSIX unlink works through open handles). The placeholder only
        // keeps `InnerState` valid until `reconnect` replaces it.
        state.db = setup_db(":memory:").expect("setup: in-memory placeholder");
        // Leave the 100-byte SQLite header valid but scribble over what
        // follows, so the next `PRAGMA quick_check` hits the malformed path.
        // The garbage region is long enough to cover whichever page holds the
        // schema, whatever the page size is.
        {
            let garbage = [0xffu8; 16 * 1024];
            let mut db_file = std::fs::OpenOptions::new()
                .write(true)
                .open(&state.db_path)
                .expect("setup: open db file");
            db_file
                .seek(SeekFrom::Start(100))
                .expect("setup: seek past header");
            db_file
                .write_all(&garbage)
                .expect("setup: write garbage pages");
            db_file.sync_all().expect("setup: sync garbage to disk");
            // `db_file` is closed at the end of this scope, before reconnecting.
        }

        state.reconnect();

        // The database was rebuilt, so `metric_keys` ids start fresh; rows
        // queued against the old ids have to be discarded rather than orphaned.
        let queue_cleared = state.queue.is_empty();
        assert!(queue_cleared, "queue should be cleared after reset");
        assert_eq!(state.consecutive_flush_failures, 0);
        // The replacement connection works.
        assert!(state.flush().is_ok());
    }

    #[test]
    fn reconnect_rebuilds_connection_with_backoff() {
        const FAILURES: u64 = 5;
        let (mut state, _dir) = test_state();

        state.consecutive_flush_failures = FAILURES;
        state.reconnect();
        // Reconnecting successfully resets the failure counter and stamps the
        // time of the attempt.
        assert_eq!(state.consecutive_flush_failures, 0);
        let first_attempt = state.last_reconnect.expect("reconnect should run");
        // The new connection can flush.
        assert!(state.flush().is_ok());

        // An immediate retry falls inside the backoff window: it must be a
        // no-op that leaves both the counter and the timestamp alone.
        state.consecutive_flush_failures = FAILURES;
        state.reconnect();
        assert_eq!(state.consecutive_flush_failures, FAILURES);
        assert_eq!(state.last_reconnect, Some(first_attempt));
    }

    #[test]
    fn log_throttle_suppresses_and_reports_count() {
        let interval = Duration::from_millis(50);
        let past_interval = Duration::from_millis(60);
        let mut throttle = LogThrottle::new(interval);

        // The very first request goes through and has nothing to report.
        assert_eq!(throttle.allow(), Some(0));
        // Requests inside the interval are all rejected.
        let mut burst = 0;
        while burst < 7 {
            assert_eq!(throttle.allow(), None);
            burst += 1;
        }
        // After waiting out the interval, the next request goes through and
        // carries the number of rejected requests.
        std::thread::sleep(past_interval);
        assert_eq!(throttle.allow(), Some(7));
        // Reporting clears the suppressed count.
        std::thread::sleep(past_interval);
        assert_eq!(throttle.allow(), Some(0));
    }

    #[test]
    fn test_threading() {
        use std::thread;
        // NOTE: this writes `metrics.db` relative to the working directory
        // and installs the exporter as the process-wide recorder.
        let exporter = SqliteExporter::new(Duration::from_millis(500), None, "metrics.db").unwrap();
        exporter.install().unwrap();

        // Five threads record metrics as fast as they can for five seconds.
        let mut workers: Vec<thread::JoinHandle<()>> = Vec::new();
        for _ in 0..5 {
            workers.push(thread::spawn(move || {
                let run_for = Duration::from_secs(5);
                let started = Instant::now();
                loop {
                    metrics::gauge!("rate").set(1.0);
                    metrics::counter!("hits").increment(1);
                    metrics::histogram!("histogram").record(5.0);
                    if started.elapsed() >= run_for {
                        break;
                    }
                }
            }));
        }
        workers
            .into_iter()
            .for_each(|worker| worker.join().unwrap());
    }
}